mongo实时导入到clickhouse案例(包含复杂嵌套json的解析)
本案例是把Mongo数据库的数据通过FlinkCDC实时导入到Kafka,消费Kafka数据把维表数据写入到MySQL。读取MySQL维表数据和消费Kafka的数据通过Flink SQL Join后导入到ClickHouse。
(一)案例介绍
本案例是把Mongo数据库的数据通过FlinkCDC实时导入到Kafka,消费Kafka数据把维表数据写入到MySQL。读取MySQL维表数据和消费Kafka的数据通过Flink SQL Join后导入到ClickHouse。
(二) maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the flink-mongo-realdata module.
  Every artifact is declared exactly once, and every Scala-suffixed Flink
  artifact uses ${flink.scala.version} so that a single Scala binary version
  ends up on the classpath.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mini-program</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink-mongo-realdata</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Previously used combination: Flink 1.12.7 with Scala 2.11 -->
        <flink.version>1.13.3</flink.version>
        <flink.scala.version>2.12</flink.scala.version>
        <!-- Version of the Flink CDC connectors; referenced by flink-connector-mongodb-cdc below -->
        <flink-cdc-version>2.2.1</flink-cdc-version>
    </properties>

    <dependencies>
        <!-- Flink core / DataStream runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table / SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors -->
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mongodb-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>${flink-cdc-version}</version>
            <!-- <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.13.2-SNAPSHOT</version>
        </dependency>

        <!-- JDBC drivers and connection pools -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.1-patch</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.16</version>
        </dependency>

        <!-- Miscellaneous -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Builds an additional jar-with-dependencies artifact during the package phase -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(三) 数据准备
test01表 json数据
{
"audio": "12345",
"detail": "hello world01",
"location": {
"alt": 1209.2559999999994,
"lat": 39.38871949999998,
"long": 106.9585184
},
"ppx": {
"x": -4833.720000125517,
"y": 944.3920672436675,
"z": 1209.2559999999994
},
"hight": 90.203,
"userid": 8,
"time": "2022/11/14 09:59:52",
"send_message": [
{
"message": "how are you!",
"time": "2022/11/14 09:58:50"
},
{
"message": "what?",
"time": "2022/11/14 09:59:35"
}
]
}
{
"audio": "67890",
"detail": "hello world02",
"location": {
"alt": 1209.2559999999994,
"lat": 39.38871949999998,
"long": 106.9585184
},
"ppx": {
"x": -4833.720000125517,
"y": 944.3920672436675,
"z": 1209.2559999999994
},
"hight": 80.203,
"userid": 7,
"time": "2022/11/13 09:59:52",
"send_message": [
{
"message": "how are you!",
"time": "2022/11/14 09:58:50"
},
{
"message": "what?",
"time": "2022/11/14 09:59:35"
}
]
}
user表 json数据
{
"id": 7,
"name": "abel",
"register_time": "2021/04/12 15:08:31",
"age": 36
}
{
"id": 8,
"name": "piter",
"register_time": "2020/05/09 06:59:52",
"age": 45
}
使用db.test01.insertOne(json数据) 和 db.user.insertOne(json数据) 分别把两张表的数据插入到mongo数据库即可
(四) Mongo到Kafka
Flink Mongo CDC介绍
github地址
https://github.com/ambitfly/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md
MongoDB CDC连接器通过伪装成MongoDB集群里的一个副本,利用MongoDB集群的高可用机制,从主节点获取完整的oplog(operation log)事件流。
依赖条件
MongoDB版本
MongoDB version >= 3.6
集群部署
副本集 或 分片集群
Storage Engine
WiredTiger存储引擎。
副本集协议版本
副本集协议版本1 (pv1) 。
从4.0版本开始,MongoDB只支持pv1。 pv1是MongoDB 3.2或更高版本创建的所有新副本集的默认值。
需要的权限
MongoDB Kafka Connector需要changeStream 和 read 权限。
您可以使用下面的示例进行简单授权:
更多详细授权请参考MongoDB数据库用户角色。
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" }, //read role includes changeStream privilege
{ role: "readAnyDatabase", db: "admin" } //for snapshot reading
]
});
Flink Mongo CDC DataStream Demo
Flink Mongo CDC DataStream代码
package com.ambitfly.mongo.demo;
import com.ambitfly.mongo.MyMongoDebeziumSchema;
import com.ambitfly.utils.PropertiesUtil;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal DataStream job: builds a MongoDB CDC source for the {@code test} database
 * (collections {@code test.test01} and {@code test.user}) and prints every record
 * produced by {@link StringDebeziumDeserializationSchema}.
 */
public class FlinkCDCDataStreamDemo {

    /**
     * Builds and runs the job with parallelism 1.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Comma-separated lists are split into the arrays the source builder receives.
        final String[] databases = "test".split(",");
        final String[] collections = "test.test01,test.user".split(",");

        final DebeziumSourceFunction<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("10.10.21.229:27017")
                .username("flinkuser")
                .password("flinkpw")
                .databaseList(databases)
                .collectionList(collections)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        env.addSource(mongoSource).print();
        env.execute();
    }
}
运行结果
Flink Mongo CDC SQL方式 Demo
Flink SQL语句准备
Source Table SQL
-- CDC source over collection test.test01.
-- Embedded documents are declared as ROW, the embedded array as ARRAY<ROW<...>>.
CREATE TABLE mongoCDCSourceTable (
    _id          STRING,
    audio        STRING,
    detail       STRING,
    location     ROW<alt DOUBLE, lat DOUBLE, long DOUBLE>,
    ppx          ROW<x DOUBLE, y DOUBLE, z DOUBLE>,
    hight        DOUBLE,
    userid       INT,
    `time`       STRING,
    send_message ARRAY<ROW<message STRING, `time` STRING>>,
    -- not_exist does not appear in the sample test01 documents
    not_exist    ROW<not_exist1 STRING, not_exist2 STRING>,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector'  = 'mongodb-cdc',
    'hosts'      = '10.10.21.229:27017',
    'username'   = 'flinkuser',
    'password'   = 'flinkpw',
    'database'   = 'test',
    'collection' = 'test01'
)
Sink Table SQL
-- Print sink holding the flattened form of a test01 document:
-- nested fields become <parent>_<child> columns and the send_message
-- array is split into one array of messages and one array of times.
CREATE TABLE printSinkTable (
    _id                   STRING,
    audio                 STRING,
    detail                STRING,
    location_alt          DOUBLE,
    location_lat          DOUBLE,
    location_long         DOUBLE,
    ppx_x                 DOUBLE,
    ppx_y                 DOUBLE,
    ppx_z                 DOUBLE,
    hight                 DOUBLE,
    userid                INT,
    `time`                STRING,
    send_message_messages ARRAY<STRING>,
    send_message_times    ARRAY<STRING>,
    not_exist_not_exist1  STRING,
    not_exist_not_exist2  STRING
) WITH (
    'connector' = 'print'
)
Insert SQL
-- Flatten each document: expand send_message into one row per element,
-- then group back by all scalar columns and collect the element fields
-- into two arrays with the collect_list UDF.
INSERT INTO printSinkTable
SELECT
    _id,
    audio,
    detail,
    location.alt,
    location.lat,
    location.long,
    ppx.x,
    ppx.y,
    ppx.z,
    hight,
    userid,
    `time`,
    collect_list(send_message_message),
    collect_list(send_message_time),
    not_exist.not_exist1,
    not_exist.not_exist2
FROM mongoCDCSourceTable
CROSS JOIN UNNEST(send_message) AS t (send_message_message, send_message_time)
GROUP BY
    _id,
    audio,
    detail,
    location.alt,
    location.lat,
    location.long,
    ppx.x,
    ppx.y,
    ppx.z,
    hight,
    userid,
    `time`,
    not_exist.not_exist1,
    not_exist.not_exist2
CollectList UDF函数
需要自己自定义一个聚合函数,collect_list
package com.ambitfly.mongo.demo.function;
import org.apache.flink.table.functions.AggregateFunction;
import java.util.ArrayList;
import java.util.List;
/**
 * Aggregate function that gathers the string values of a group into an array.
 *
 * <p>The accumulator is a {@code List<String>} to which each value is appended;
 * the result is that list copied into a {@code String[]}.
 */
public class CollectList extends AggregateFunction<String[], List<String>> {

    /**
     * Removes the first occurrence of a retracted value from the accumulator.
     *
     * @param acc   accumulator of the current group
     * @param value value being retracted; nothing happens if it is not present
     */
    public void retract(List<String> acc, String value) {
        acc.remove(value);
    }

    /**
     * Appends a value to the accumulator.
     *
     * @param acc   accumulator of the current group
     * @param value value to add
     */
    public void accumulate(List<String> acc, String value) {
        acc.add(value);
    }

    /**
     * Returns the collected values as an array, in accumulator order.
     *
     * @param list accumulator of the current group
     * @return a new array holding the accumulator's elements
     */
    @Override
    public String[] getValue(List<String> list) {
        return list.toArray(new String[0]);
    }

    /**
     * Creates an empty accumulator.
     *
     * @return a new, empty, mutable list
     */
    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    /**
     * Empties the accumulator so it can be reused.
     *
     * @param list accumulator to clear
     */
    public void resetAccumulator(List<String> list) {
        list.clear();
    }
}
Flink Mongo CDC SQL方式代码
package com.ambitfly.mongo.demo;
import com.ambitfly.mongo.demo.function.CollectList;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import java.util.concurrent.ExecutionException;
/**
 * Flink SQL demo: reads collection {@code test.test01} through the {@code mongodb-cdc}
 * connector, flattens the nested document with {@code UNNEST} plus the {@code collect_list}
 * UDF, and writes the result to a {@code print} sink.
 */
public class FlinkCDCSQLDemo {

    /** DDL of the MongoDB CDC source table. */
    private static final String SOURCE_TABLE_DDL =
            "CREATE TABLE mongoCDCSourceTable (\n" +
            "\t_id STRING,\n" +
            "\taudio STRING,\n" +
            "\tdetail STRING,\n" +
            "\tlocation ROW<\n" +
            "\t\talt DOUBLE,\n" +
            "\t\tlat DOUBLE,\n" +
            "\t\tlong DOUBLE\n" +
            "\t\t>,\n" +
            "\tppx ROW<\n" +
            "\t\tx DOUBLE,\n" +
            "\t\ty DOUBLE,\n" +
            "\t\tz DOUBLE\n" +
            "\t\t>,\n" +
            "\thight DOUBLE,\n" +
            "\tuserid INT,\n" +
            "\t`time` STRING,\n" +
            "\tsend_message ARRAY<\n" +
            "\t\tROW<\n" +
            "\t\t\tmessage STRING,\n" +
            "\t\t\t`time` STRING\n" +
            "\t\t\t>\n" +
            "\t\t>,\n" +
            "\tnot_exist ROW<\n" +
            "\t\tnot_exist1 STRING,\n" +
            "\t\tnot_exist2 STRING\n" +
            "\t>,\n" +
            "PRIMARY KEY(_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector' = 'mongodb-cdc',\n" +
            " 'hosts' = '10.10.21.229:27017',\n" +
            " 'username' = 'flinkuser',\n" +
            " 'password' = 'flinkpw',\n" +
            " 'database' = 'test',\n" +
            " 'collection' = 'test01'\n" +
            ")";

    /** DDL of the print sink holding the flattened columns. */
    private static final String SINK_TABLE_DDL =
            "CREATE TABLE printSinkTable(\n" +
            "\t_id STRING,\n" +
            "\taudio STRING,\n" +
            "\tdetail STRING,\n" +
            "\tlocation_alt DOUBLE,\n" +
            "\tlocation_lat DOUBLE,\n" +
            "\tlocation_long DOUBLE,\n" +
            "\tppx_x DOUBLE,\n" +
            "\tppx_y DOUBLE,\n" +
            "\tppx_z DOUBLE,\n" +
            "\thight DOUBLE,\n" +
            "\tuserid INT,\n" +
            "\t`time` STRING,\n" +
            "\tsend_message_messages ARRAY<STRING>,\n" +
            "\tsend_message_times ARRAY<STRING>,\n" +
            "\tnot_exist_not_exist1 STRING,\n" +
            "\tnot_exist_not_exist2 STRING\n" +
            ") WITH (\n" +
            " 'connector' = 'print'\n" +
            ")";

    /** Statement that flattens the source rows into the sink. */
    private static final String INSERT_SQL =
            "INSERT INTO printSinkTable\n" +
            "SELECT\n" +
            "\t_id\n" +
            "\t,audio\n" +
            "\t,detail\n" +
            "\t,location.alt\n" +
            "\t,location.lat\n" +
            "\t,location.long\n" +
            "\t,ppx.x\n" +
            "\t,ppx.y\n" +
            "\t,ppx.z\n" +
            "\t,hight\n" +
            "\t,userid\n" +
            "\t,`time`\n" +
            "\t,collect_list(send_message_message)\n" +
            "\t,collect_list(send_message_time)\n" +
            "\t,not_exist.not_exist1\n" +
            "\t,not_exist.not_exist2\n" +
            "FROM mongoCDCSourceTable,UNNEST(send_message) as t(send_message_message,send_message_time)\n" +
            "group by \n" +
            "\t_id\n" +
            "\t,audio\n" +
            "\t,detail\n" +
            "\t,location.alt\n" +
            "\t,location.lat\n" +
            "\t,location.long\n" +
            "\t,ppx.x\n" +
            "\t,ppx.y\n" +
            "\t,ppx.z\n" +
            "\t,hight\n" +
            "\t,userid\n" +
            "\t,`time`\n" +
            "\t,not_exist.not_exist1\n" +
            "\t,not_exist.not_exist2";

    /**
     * Creates both tables, registers the UDF, submits the INSERT job and blocks
     * until that job terminates.
     *
     * @param args unused
     * @throws ExecutionException   if the submitted job fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        tableEnvironment.executeSql(SOURCE_TABLE_DDL);
        tableEnvironment.executeSql(SINK_TABLE_DDL);

        // Register the user-defined aggregate referenced by INSERT_SQL.
        tableEnvironment.createTemporaryFunction("collect_list", CollectList.class);

        TableResult tableResult = tableEnvironment.executeSql(INSERT_SQL);
        // await() waits for the INSERT job to finish without dereferencing the
        // Optional returned by getJobClient().
        tableResult.await();
    }
}
运行结果
自定义Mongo CDC DataStream的反序列化器
简单的json反序列化器
结构如下,fullDocument为具体数据,operationType有insert、update、delete,documentKey为主键
{
"db": "xxx",
"tableName": "xxx",
"fullDocument":{},
"documentKey":{},
"operationType": ""
}
代码
package com.ambitfly.mongo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Turns a MongoDB CDC {@link SourceRecord} into a flat JSON string with the keys
 * {@code db}, {@code tableName}, {@code fullDocument}, {@code documentKey} and
 * {@code operationType}. A key is left out when its source value is absent.
 */
public class MyMongoDeserializationSchema implements DebeziumDeserializationSchema<String>, CustomConverter {

    /**
     * Builds the JSON envelope for one record and emits it.
     *
     * @param sourceRecord change record whose value is read as a {@link Struct}
     * @param collector    receives exactly one JSON string per record
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        final JSONObject envelope = new JSONObject();

        // The first two dot-separated topic segments are used as database and table name.
        final String[] topicParts = sourceRecord.topic().split("\\.");
        if (topicParts.length >= 2) {
            envelope.put("db", topicParts[0]);
            envelope.put("tableName", topicParts[1]);
        }

        final Struct payload = (Struct) sourceRecord.value();

        // Both fields arrive as JSON text and are embedded as nested objects.
        putJsonIfPresent(envelope, "fullDocument", payload.getString("fullDocument"));
        putJsonIfPresent(envelope, "documentKey", payload.getString("documentKey"));

        final String operationType = payload.getString("operationType");
        if (operationType != null) {
            envelope.put("operationType", operationType);
        }

        collector.collect(envelope.toJSONString());
    }

    /** Parses {@code rawJson} and stores it under {@code key}; does nothing when it is null. */
    private static void putJsonIfPresent(JSONObject target, String key, String rawJson) {
        if (rawJson == null) {
            return;
        }
        JSONObject parsed = (JSONObject) JSONObject.toJSON(JSON.parse(rawJson));
        target.put(key, parsed);
    }

    /** @return the type information for the emitted {@code String} records */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    /** No configuration is consumed. */
    @Override
    public void configure(Properties properties) {
    }

    /** No converter is registered. */
    @Override
    public void converterFor(ConvertedField convertedField, ConverterRegistration converterRegistration) {
    }
}
Debezium Format json反序列化器
Debezium是一个CDC (Changelog Data Capture)工具,可以把MySQL, PostgreSQL, Oracle, Microsoft SQL Server和许多其他数据库中的变更实时地流式传输到Kafka中。Debezium为变更日志提供了统一的格式模式,并支持使用JSON和Apache Avro序列化消息。Flink支持将Debezium JSON和Avro消息解释为INSERT/UPDATE/DELETE消息到Flink SQL系统。Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Debezium JSON或Avro消息,并发送到Kafka等外部系统。然而,目前Flink不能将UPDATE_BEFORE和UPDATE_AFTER合并到一个更新消息中。因此,Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Debezium消息。
结构如下
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op": "u"
}
代码
package com.ambitfly.mongo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Properties;
/**
 * Converts MongoDB CDC records into Debezium-style JSON messages with the keys
 * {@code before}, {@code after} and {@code op}.
 *
 * <p>The row payload contains {@code db}, {@code tableName}, {@code fullDocument},
 * {@code documentKey}, {@code _id} and {@code operationType}. Inserts are emitted as
 * {@code op=c}, deletes as {@code op=d}. Updates and replaces are emitted as a
 * {@code d} message followed by a {@code c} message; both carry the same payload,
 * because only {@code fullDocument} is read from the record.
 */
public class MyMongoDebeziumSchema implements DebeziumDeserializationSchema<String>, CustomConverter {

    /**
     * Builds the row payload for one record and emits the matching Debezium message(s).
     * Records without an operation type, or with one not listed in the class comment,
     * produce no output.
     *
     * @param sourceRecord change record whose value is read as a {@link Struct}
     * @param collector    receives zero, one or two JSON strings
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        // The first two dot-separated topic segments are used as database and table name.
        String[] topicParts = sourceRecord.topic().split("\\.");
        if (topicParts.length >= 2) {
            result.put("db", topicParts[0]);
            result.put("tableName", topicParts[1]);
        }

        Struct value = (Struct) sourceRecord.value();

        String fullDocument = value.getString("fullDocument");
        if (fullDocument != null) {
            result.put("fullDocument", parseJsonObject(fullDocument));
        }

        String documentKey = value.getString("documentKey");
        String _id = "";
        if (documentKey != null) {
            JSONObject documentKeyJson = parseJsonObject(documentKey);
            _id = extractId(documentKeyJson);
            result.put("documentKey", documentKeyJson);
        }
        result.put("_id", _id);

        String operationType = value.getString("operationType");
        if (operationType == null) {
            return;
        }
        result.put("operationType", operationType);

        switch (operationType) {
            case "insert":
                collector.collect(debeziumMessage("c", null, result));
                break;
            case "delete":
                collector.collect(debeziumMessage("d", result, null));
                break;
            case "update":
            case "replace":
                // A replace also changes the stored document, so it is handled like
                // an update: retract the row, then insert it again.
                collector.collect(debeziumMessage("d", result, null));
                collector.collect(debeziumMessage("c", null, result));
                break;
            default:
                // Any other operation type carries no row change to forward.
                break;
        }
    }

    /** Parses JSON text into a {@link JSONObject}. */
    private static JSONObject parseJsonObject(String rawJson) {
        return (JSONObject) JSONObject.toJSON(JSON.parse(rawJson));
    }

    /**
     * Returns the document id as a string: the {@code $oid} value when {@code _id} is an
     * object containing that key, otherwise the string form of whatever {@code _id}
     * holds, or the empty string when there is no {@code _id}.
     */
    private static String extractId(JSONObject documentKeyJson) {
        Object rawId = documentKeyJson.get("_id");
        if (rawId == null) {
            return "";
        }
        if (rawId instanceof JSONObject && ((JSONObject) rawId).containsKey("$oid")) {
            return ((JSONObject) rawId).getString("$oid");
        }
        return String.valueOf(rawId);
    }

    /** Serializes one Debezium-style message with the given operation code and row images. */
    private static String debeziumMessage(String op, JSONObject before, JSONObject after) {
        JSONObject debeziumJson = new JSONObject();
        debeziumJson.put("before", before);
        debeziumJson.put("after", after);
        debeziumJson.put("op", op);
        return debeziumJson.toJSONString();
    }

    /** @return the type information for the emitted {@code String} records */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    /** No configuration is consumed. */
    @Override
    public void configure(Properties properties) {
    }

    /** No converter is registered. */
    @Override
    public void converterFor(ConvertedField convertedField, ConverterRegistration converterRegistration) {
    }
}
未完待续!
mongo->kafka
kafka->mysql
kafka->clickhouse
更多推荐
所有评论(0)