Important | Flink SQL and Kafka Integration: What You Need to Know

jujusharp  •  Jun 2, 2019 9:13:23 PM




Integrating Flink with Kafka is a very common real-time processing scenario. Since Kafka 0.11 the producer supports transactions, which allows the Flink-Kafka integration to achieve complete end-to-end exactly-once processing. Although this adds data latency on the order of the checkpoint interval, exactly-once semantics are still very attractive. A publicly visible end-to-end case study is the recent one shared by OPPO; follow the Langjian WeChat public account (bigdatatip) and reply "oppo" to get it.

1. How Flink SQL integrates with Kafka

There are several ways to integrate Flink SQL with Kafka; Langjian summarizes them here:

1. Converting a DataStream to a table

Use the addSource and addSink APIs to build a DataStream, register it as a table, and then run SQL analysis on it.

The API comes in two main forms; a complete end-to-end sketch follows the two snippets below.

1. Register the DataStream directly as a table:

    // register the DataStream as Table "myTable" with fields "f0", "f1"
    tableEnv.registerDataStream("myTable", stream);
    // register the DataStream as table "myTable2" with fields "myLong", "myString"
    tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

2. Convert the DataStream into a Table:

    DataStream<Tuple2<Long, String>> stream = ...
    // Convert the DataStream into a Table with default fields "f0", "f1"
    Table table1 = tableEnv.fromDataStream(stream);
    // Convert the DataStream into a Table with fields "myLong", "myString"
    Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
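To show how the pieces fit together, here is a minimal end-to-end sketch of this approach (not from the original article): it assumes a local broker at localhost:9092 and hypothetical topics jsontest and test, and simply copies raw string records from one topic to the other via a registered table.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class DataStreamToTableSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.setProperty("group.id", "test");

            // addSource: read raw strings from an assumed "jsontest" topic
            DataStream<String> stream =
                    env.addSource(new FlinkKafkaConsumer010<>("jsontest", new SimpleStringSchema(), props));

            // register the stream as a table with one String field named "line"
            tEnv.registerDataStream("myTable", stream, "line");

            // run SQL against the registered table
            Table result = tEnv.sqlQuery("SELECT line FROM myTable");

            // convert the single-column result back to DataStream<String> and addSink it to another topic
            tEnv.toAppendStream(result, String.class)
                    .addSink(new FlinkKafkaProducer010<>("localhost:9092", "test", new SimpleStringSchema()));

            env.execute("datastream-to-table-sketch");
        }
    }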

2. TableSource and TableSink

Input and output tables can also be registered directly through the TableSource and TableSink interfaces, for example with

Kafka010JsonTableSource and Kafka010JsonTableSink.
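As a rough, hedged sketch of this style (these classes are being phased out, as noted in the summary), a Kafka010JsonTableSource might be built and registered roughly as follows; the broker address, topic name, and field names here are assumptions chosen to mirror the worked example, not part of the original article.

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
    import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
    import org.apache.flink.table.api.TableSchema;

    // ... inside main(), with tEnv created as in the other examples

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
    kafkaProps.setProperty("group.id", "test");

    // build a JSON table source for an assumed "jsontest" topic
    KafkaTableSource source = Kafka010JsonTableSource.builder()
            .forTopic("jsontest")
            .withKafkaProperties(kafkaProps)
            .withSchema(TableSchema.builder()
                    .field("fruit", Types.STRING)
                    .field("number", Types.INT)
                    .build())
            .build();

    // register it so SQL can refer to it as "source"
    tEnv.registerTableSource("source", source);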

3. Custom catalog

Tables can also be registered through a custom catalog. This approach is not covered here; a video tutorial will be posted to the Knowledge Planet group later.

    ExternalCatalog catalog = new InMemoryExternalCatalog();
    // register the ExternalCatalog catalog
    tableEnv.registerExternalCatalog("InMemCatalog", catalog);

4. The connector approach

This is the approach this article aims to explain clearly; the others will be shared later in the Knowledge Planet group.

At the moment this approach only supports Kafka, Elasticsearch, and the filesystem.

2. A worked example

Let's start with the example, then go through the details.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Schema;

    public class kafka2kafka {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            tEnv.connect(
                    new Kafka()
                            .version("0.10")   // "0.8", "0.9", "0.10", "0.11", and "universal"
                            .topic("jsontest")
                            .property("bootstrap.servers", "mt-mdh.local:9093")
                            .property("group.id", "test")
                            .startFromLatest()
            )
                    .withFormat(
                            new Json()
                                    .failOnMissingField(false)
                                    .deriveSchema()
                    )
                    .withSchema(
                            new Schema()
                                    .field("rowtime", Types.SQL_TIMESTAMP)
                                    .rowtime(new Rowtime()
                                            .timestampsFromField("eventtime")
                                            .watermarksPeriodicBounded(2000)
                                    )
                                    .field("fruit", Types.STRING)
                                    .field("number", Types.INT)
                    )
                    .inAppendMode()
                    .registerTableSource("source");

            tEnv.connect(
                    new Kafka()
                            .version("0.10")   // "0.8", "0.9", "0.10", "0.11", and "universal"
                            .topic("test")
                            .property("acks", "all")
                            .property("retries", "0")
                            .property("batch.size", "16384")
                            .property("linger.ms", "10")
                            .property("bootstrap.servers", "mt-mdh.local:9093")
                            .sinkPartitionerFixed()
            )
                    .inAppendMode()
                    .withFormat(
                            new Json().deriveSchema()
                    )
                    .withSchema(
                            new Schema()
                                    .field("fruit", Types.STRING)
                                    .field("total", Types.INT)
                                    .field("time", Types.SQL_TIMESTAMP)
                    )
                    .registerTableSink("sink");

            tEnv.sqlUpdate("insert into sink " +
                    "select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) " +
                    "from source " +
                    "group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");

            env.execute();
        }
    }

This example windows on event time and sums number per fruit. As you can see, using a connector involves quite a few configuration items, so let's break them down below. For the details, see the official documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings).
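To make the data flow concrete, here is a purely hypothetical sample (not from the original article) of what records on the jsontest input topic and the windowed results written to the test topic could look like, given the schema above; the exact timestamp encoding the JSON deserializer expects depends on the Flink version, so treat the eventtime and time values as illustrative only.

    // hypothetical input records on topic "jsontest"
    {"eventtime": "2019-06-02 12:00:01", "fruit": "apple",  "number": 2}
    {"eventtime": "2019-06-02 12:00:03", "fruit": "apple",  "number": 3}
    {"eventtime": "2019-06-02 12:00:04", "fruit": "banana", "number": 1}

    // possible output records on topic "test" once the [12:00:00, 12:00:05) tumbling window closes
    {"fruit": "apple",  "total": 5, "time": "2019-06-02 12:00:05"}
    {"fruit": "banana", "total": 1, "time": "2019-06-02 12:00:05"}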

1. Configuring the data source

    .connect(
        new Kafka()
            .version("0.11")    // required: valid connector versions are
                                // "0.8", "0.9", "0.10", "0.11", and "universal"
            .topic("...")       // required: topic name from which the table is read

            // optional: connector specific properties
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "testGroup")

            // optional: select a startup mode for Kafka offsets
            .startFromEarliest()
            .startFromLatest()
            .startFromSpecificOffsets(...)

            // optional: output partitioning from Flink's partitions into Kafka's partitions
            .sinkPartitionerFixed()                 // each Flink partition ends up in at-most one Kafka partition (default)
            .sinkPartitionerRoundRobin()            // a Flink partition is distributed to Kafka partitions round-robin
            .sinkPartitionerCustom(MyCustom.class)  // use a custom FlinkKafkaPartitioner subclass
    )
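If you go the sinkPartitionerCustom route, the class you pass in extends FlinkKafkaPartitioner. A minimal, hedged sketch (the class name MyCustom and the hash-on-first-field logic are made up for illustration) might look like this:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.types.Row;

    // Hypothetical partitioner: routes each row to a Kafka partition by hashing its first field.
    public class MyCustom extends FlinkKafkaPartitioner<Row> {

        @Override
        public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            Object firstField = record.getField(0);
            int hash = firstField == null ? 0 : firstField.hashCode();
            // always return one of the partitions Kafka reports for the target topic
            return partitions[Math.abs(hash % partitions.length)];
        }
    }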

2. Data formats

Three formats are currently supported: CSV, JSON, and Avro. We have to tell Flink how to map the fields of the JSON source data onto the table fields, and there are three ways to specify this, as shown below:

    .withFormat(
        new Json()
            .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

            // required: define the schema either by using type information which parses numbers to corresponding types
            .schema(Types.ROW(...))

            // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
            .jsonSchema(
                "{" +
                "  type: 'object'," +
                "  properties: {" +
                "    lon: {" +
                "      type: 'number'" +
                "    }," +
                "    rideTime: {" +
                "      type: 'string'," +
                "      format: 'date-time'" +
                "    }" +
                "  }" +
                "}"
            )

            // or use the table's schema
            .deriveSchema()
    )
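For the fruit/number fields of the running example, the first variant (explicit type information) could look roughly like the fragment below; this is only a sketch, with Types assumed to be org.apache.flink.api.common.typeinfo.Types as in the worked example.

    .withFormat(
        new Json()
            .failOnMissingField(false)
            // explicit format schema: field names plus their types
            .schema(Types.ROW_NAMED(
                new String[] {"eventtime", "fruit", "number"},
                Types.SQL_TIMESTAMP, Types.STRING, Types.INT))
    )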

In practice the most common choice is the third option: deriving the format schema directly from the table schema we declare.

3. Schema information

Besides the field names and types, the schema can also declare time-related attributes.

    .withSchema(
        new Schema()
            .field("MyField1", Types.SQL_TIMESTAMP)
                .proctime()     // optional: declares this field as a processing-time attribute
            .field("MyField2", Types.SQL_TIMESTAMP)
                .rowtime(...)   // optional: declares this field as an event-time attribute
            .field("MyField3", Types.BOOLEAN)
                .from("mf3")    // optional: original field in the input that is referenced/aliased by this field
    )
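Tied back to the running example, a schema that keeps the fruit/number fields and adds a processing-time attribute could look roughly like this sketch (the proctime field name is arbitrary):

    .withSchema(
        new Schema()
            .field("fruit", Types.STRING)
            .field("number", Types.INT)
            // processing-time attribute; usable in windows much like the rowtime field in the example above
            .field("proctime", Types.SQL_TIMESTAMP).proctime()
    )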

4. Output update modes

The available update modes are append mode, retract mode, and upsert mode. Append mode only emits insertions; retract and upsert modes are needed for queries whose results are updated (for example a non-windowed GROUP BY aggregation), and the Kafka connector currently only supports append mode.

.connect(...)
.inAppendMode() // otherwise: inUpsertMode() or inRetractMode()

5. Time-related configuration

When configuring the schema you can declare time attributes such as event time and processing time, and you can also configure the watermark strategy, including a custom watermark generator.

For event time, the following timestamp extraction options are supported:

    // Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
    .rowtime(
        new Rowtime()
            .timestampsFromField("ts_field")    // required: original field name in the input
    )

    // Converts the assigned timestamps from a DataStream API record into the rowtime attribute
    // and thus preserves the assigned timestamps from the source.
    // This requires a source that assigns timestamps (e.g., Kafka 0.10+).
    .rowtime(
        new Rowtime()
            .timestampsFromSource()
    )

    // Sets a custom timestamp extractor to be used for the rowtime attribute.
    // The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
    .rowtime(
        new Rowtime()
            .timestampsFromExtractor(...)
    )

The supported watermark generation strategies are:

    // Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
    // observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
    // are not late.
    .rowtime(
        new Rowtime()
            .watermarksPeriodicAscending()
    )

    // Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
    // Emits watermarks which are the maximum observed timestamp minus the specified delay.
    .rowtime(
        new Rowtime()
            .watermarksPeriodicBounded(2000)    // delay in milliseconds
    )

    // Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
    // underlying DataStream API and thus preserves the assigned watermarks from the source.
    .rowtime(
        new Rowtime()
            .watermarksFromSource()
    )

3. Summary

This article covered the various ways of combining Flink SQL with Kafka. For DataStream-level work the addSource/addSink approach is generally fine. For those who want to use Flink's Table/SQL API, note that KafkaJsonTableSource and KafkaJsonTableSink are gradually being deprecated; prefer the connector or catalog approach instead, and the latter in particular has proven very solid and convenient when building a platform.

For more Flink content, you are welcome to join Langjian's Knowledge Planet group and learn together with 750+ members.
