Flink 1.11.1: Making the Table SQL Kafka Connector Support Upsert Writes
Flink version: 1.11.1
Goal
When using Flink Table SQL, make the Kafka connector sink support upsert writes.
Test Code
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
val fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tblEnv = StreamTableEnvironment.create(senv, fsSettings)
val createTable =
"""
|CREATE TABLE ck1 (
| id VARCHAR,
| create_date TIMESTAMP,
| write_date TIMESTAMP,
| code VARCHAR
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = 'action.log',
| 'scan.startup.mode' = 'latest-offset',
| 'properties.bootstrap.servers' = '192.168.10.16:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'json'
|)
""".stripMargin
val createTable1 =
"""
|CREATE TABLE ck2 (
| id VARCHAR,
| create_date TIMESTAMP,
| write_date TIMESTAMP,
| code VARCHAR
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = 'action.log.log',
| 'scan.startup.mode' = 'latest-offset',
| 'properties.bootstrap.servers' = '192.168.10.16:9092',
| 'properties.group.id' = 'testGroup1',
| 'format' = 'json'
|)
""".stripMargin
tblEnv.executeSql(createTable)
tblEnv.executeSql(createTable1)
val nt_order_detail =
"""
|CREATE TABLE ck_interval(
|  id VARCHAR PRIMARY KEY NOT ENFORCED,
|  create_date TIMESTAMP,
| code VARCHAR
|)
|WITH (
| 'connector' = 'kafka',
| 'topic' = 'test_ck.public.ck_interval',
| 'scan.startup.mode' = 'latest-offset',
| 'properties.bootstrap.servers' = '192.168.10.16:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'csv'
|)
""".stripMargin
tblEnv.executeSql(nt_order_detail)
val sqlQuery =
"""
|INSERT INTO ck_interval
|SELECT id, create_date, code FROM (
|  SELECT p.id, p.create_date, b.code
|  FROM (SELECT id, create_date, write_date, code FROM ck1) AS p
|  LEFT JOIN (SELECT id, create_date, write_date, code FROM ck2) AS b
|  ON b.id = p.id
|) a
|""".stripMargin
tblEnv.executeSql(sqlQuery)
Flink's Built-in Kafka Connector
Flink's built-in Kafka connector throws the following error when the job tries to perform an upsert write:
org.apache.flink.table.api.TableException:
Table sink 'default_catalog.default_database.ck_interval' doesn't support consuming update and delete
changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(id0 = id)], select=[id, create_date, id0, code],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
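The root cause is that the stock sink declares an insert-only changelog mode regardless of what the planner requests, so any plan that produces update or delete rows is rejected. In Flink 1.11.1 the relevant method in KafkaDynamicSinkBase is essentially this (a sketch, not verbatim source):

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // the built-in Kafka sink only consumes INSERT rows
    return ChangelogMode.insertOnly();
}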
Defining a New Kafka Connector Sink That Supports Upsert
Copy each of the following files to create the new classes:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java -> flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertSinkBase.java
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java -> flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertTableFactoryBase.java
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java -> flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertSink.java
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java -> flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertTableFactory.java
Modify:
NTKafkaDynamicUpsertTableFactory.java
public static final String IDENTIFIER = "nt_kafka";
This constant defines the new connector name.
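The planner matches 'connector' = 'nt_kafka' in a table's WITH clause to this factory via factoryIdentifier(), which simply returns the constant above (a sketch mirroring the stock KafkaDynamicTableFactory):

@Override
public String factoryIdentifier() {
    // the connector name used in DDL: 'connector' = 'nt_kafka'
    return IDENTIFIER;
}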
NTKafkaDynamicUpsertSinkBase.java
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // UPSERT mode: accept every row kind the planner requests except
    // UPDATE_BEFORE, so updates arrive as single UPDATE_AFTER records
    ChangelogMode.Builder builder = ChangelogMode.newBuilder();
    for (RowKind kind : requestedMode.getContainedKinds()) {
        if (kind != RowKind.UPDATE_BEFORE) {
            builder.addContainedKind(kind);
        }
    }
    return builder.build();
}
This declares the Kafka sink's write mode as upsert: it consumes INSERT, UPDATE_AFTER, and DELETE rows and drops only the UPDATE_BEFORE images.
The other two files need no specific changes beyond adjusting class names and imports to reference the new copies; the key spot is sketched below.
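Concretely, the copied factory's sink-creation hook must hand back the new sink class. A minimal sketch of the override in NTKafkaDynamicUpsertTableFactory, with the parameter list following the 1.11.1 base class (not verbatim source):

@Override
protected NTKafkaDynamicUpsertSinkBase createKafkaTableSink(
        DataType consumedDataType,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
        EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
    // return the upsert-capable sink instead of the stock KafkaDynamicSink
    return new NTKafkaDynamicUpsertSink(
            consumedDataType, topic, properties, partitioner, encodingFormat);
}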
Build and Package
Before building, register the new factory for Java SPI discovery: edit
flink-connectors/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
and add:
org.apache.flink.streaming.connectors.kafka.table.NTKafkaDynamicUpsertTableFactory
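After the edit, the services file should list one fully-qualified factory class per line, keeping the stock entry that is already there:

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.NTKafkaDynamicUpsertTableFactory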
Then build:
> mvn package -Dmaven.test.skip=true -Dcheckstyle.skip=true
Replace the JAR and Test
1. Replace the Kafka connector JAR under Flink's lib directory with the newly built flink-connector-kafka_2.11-1.11.1.jar.
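For example, assuming Flink is installed at $FLINK_HOME and the JAR sits in the module's default Maven output directory (both paths are assumptions):

> cp flink-connectors/flink-connector-kafka/target/flink-connector-kafka_2.11-1.11.1.jar $FLINK_HOME/lib/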
2. Test with the new connector name:
val nt_order_detail =
"""
|CREATE TABLE ck_interval(
|  id VARCHAR PRIMARY KEY NOT ENFORCED,
|  create_date TIMESTAMP,
| code VARCHAR
|)
|WITH (
| 'connector' = 'nt_kafka',
| 'topic' = 'test_ck.public.ck_interval',
| 'scan.startup.mode' = 'latest-offset',
| 'properties.bootstrap.servers' = '192.168.10.16:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'csv'
|)
""".stripMargin
3. The job submits successfully.
4. The test data is written to the sink topic as expected.