(1) Case Introduction

In this case study, data from a MongoDB database is ingested into Kafka in real time via Flink CDC, and the dimension-table data is written to MySQL by consuming the Kafka topic. The MySQL dimension table is then read, joined with the consumed Kafka data via a Flink SQL join, and the result is written to ClickHouse.

(Figure: overall pipeline architecture.)

(2) Maven Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mini-program</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-mongo-realdata</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
<!--        <flink.version>1.12.7</flink.version>-->
<!--        <flink.scala.version>2.11</flink.scala.version>-->
        <flink.version>1.13.3</flink.version>
        <flink.scala.version>2.12</flink.scala.version>
        <flink-cdc-version>2.1.0</flink-cdc-version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.1-patch</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver -->

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2.1</version>
           <!-- <exclusions>
               <exclusion>
                   <groupId>org.apache.kafka</groupId>
                   <artifactId>kafka-clients</artifactId>
               </exclusion>
            </exclusions>-->
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
     <!--       <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>-->
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.16</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.13.2-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

(3) Data Preparation

JSON documents for the test01 collection


{
	"audio": "12345",
	"detail": "hello world01",
	"location": {
        "alt": 1209.2559999999994,
        "lat": 39.38871949999998,
        "long": 106.9585184
    },
	"ppx": {
        "x": -4833.720000125517,
        "y": 944.3920672436675,
        "z": 1209.2559999999994
    },
	"hight": 90.203,
	"userid": 8,
	"time": "2022/11/14 09:59:52",
	"send_message": [
		{
			"message": "how are you!",
			"time": "2022/11/14 09:58:50"
		},
		{
			"message": "what?",
			"time": "2022/11/14 09:59:35"
		}
	]
}


{
	"audio": "67890",
	"detail": "hello world02",
	"location": {
        "alt": 1209.2559999999994,
        "lat": 39.38871949999998,
        "long": 106.9585184
    },
	"ppx": {
        "x": -4833.720000125517,
        "y": 944.3920672436675,
        "z": 1209.2559999999994
    },
	"hight": 80.203,
	"userid": 7,
	"time": "2022/11/13 09:59:52",
	"send_message": [
		{
			"message": "how are you!",
			"time": "2022/11/14 09:58:50"
		},
		{
			"message": "what?",
			"time": "2022/11/14 09:59:35"
		}
	]
}

JSON documents for the user collection


{
	"id": 7,
	"name": "abel",
	"register_time": "2021/04/12 15:08:31",
	"age": 36
}


{
	"id": 8,
	"name": "piter",
	"register_time": "2020/05/09 06:59:52",
	"age": 45
}

Insert the documents into MongoDB from the mongo shell with db.test01.insertOne(<json document>) and db.user.insertOne(<json document>).
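
If you prefer to load the sample documents from code rather than from the mongo shell, a minimal sketch using the MongoDB Java sync driver is shown below. This is only an illustration: the driver (mongodb-driver-sync or mongo-java-driver) is not declared in the POM above and would have to be added, and the connection string is an assumed placeholder for an instance you are allowed to write to. The test01 documents can be inserted the same way through the test01 collection.

package com.ambitfly.mongo.demo;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class InsertSampleUserData {
    public static void main(String[] args) throws Exception {
        // The connection string is an assumption; use credentials that can write to the "test" database.
        try (MongoClient client = MongoClients.create("mongodb://10.10.21.229:27017")) {
            MongoCollection<Document> user = client.getDatabase("test").getCollection("user");
            user.insertOne(Document.parse(
                    "{\"id\": 7, \"name\": \"abel\", \"register_time\": \"2021/04/12 15:08:31\", \"age\": 36}"));
            user.insertOne(Document.parse(
                    "{\"id\": 8, \"name\": \"piter\", \"register_time\": \"2020/05/09 06:59:52\", \"age\": 45}"));
        }
    }
}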

(4) Mongo to Kafka

Introduction to Flink Mongo CDC

GitHub repository

https://github.com/ambitfly/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md

The MongoDB CDC connector works by posing as a replica within a MongoDB cluster; leveraging the cluster's high-availability mechanism, that replica can obtain the complete oplog (operation log) event stream from the primary node.

Prerequisites

MongoDB version
MongoDB version >= 3.6

Cluster deployment
Replica set or sharded cluster

Storage engine
WiredTiger storage engine.

Replica set protocol version
Replica set protocol version 1 (pv1).
Starting with version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

Required privileges
The MongoDB Kafka Connector requires the changeStream and read privileges.
You can use the following example to perform a simple grant:
For more fine-grained authorization, refer to the MongoDB documentation on database user roles.

use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" }, //read role includes changeStream privilege 
    { role: "readAnyDatabase", db: "admin" } //for snapshot reading
  ]
});

Flink Mongo CDC DataStream Demo

Flink Mongo CDC DataStream code
package com.ambitfly.mongo.demo;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCDataStreamDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String host = "10.10.21.229:27017";
        String username = "flinkuser";
        String password = "flinkpw";
        String databaseList = "test";
        String collectionList = "test.test01,test.user";

        DebeziumSourceFunction<String> source = MongoDBSource.<String>builder()
                .hosts(host)
                .username(username)
                .password(password)
                .databaseList(databaseList.split(","))
                .collectionList(collectionList.split(","))
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> stringDataStreamSource = env.addSource(source);

        stringDataStreamSource.print();

        env.execute();
    }
}

Run result

(Screenshot of the console output.)

Flink Mongo CDC SQL Demo

Preparing the Flink SQL statements

Source Table SQL

CREATE TABLE mongoCDCSourceTable (
    _id STRING,
	audio STRING,
	detail STRING,
	location ROW<
		alt DOUBLE,
		lat DOUBLE,
		long DOUBLE
		>,
	ppx ROW<
		x DOUBLE,
		y DOUBLE,
		z DOUBLE
		>,
	hight DOUBLE,
	userid INT,
	`time` STRING,
	send_message ARRAY<
		ROW<
			message STRING,
			`time` STRING
			>
		>,
	not_exist ROW<
		not_exist1 STRING,
		not_exist2 STRING
	>,
     PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '10.10.21.229:27017',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'test',
    'collection' = 'test01'
)

Sink Table SQL

CREATE TABLE printSinkTable(
    _id STRING,
	audio STRING,
	detail STRING,
	location_alt DOUBLE,
	location_lat DOUBLE,
	location_long DOUBLE,
	ppx_x DOUBLE,
	ppx_y DOUBLE,
	ppx_z DOUBLE,
	hight DOUBLE,
	userid INT,
	`time` STRING,
	send_message_messages ARRAY<STRING>,
	send_message_times ARRAY<STRING>,
	not_exist_not_exist1 STRING,
	not_exist_not_exist2 STRING
) WITH (
    'connector' = 'print'
)

Insert SQL

INSERT INTO printSinkTable
SELECT
	_id
	,audio
	,detail
	,location.alt
	,location.lat
	,location.long
	,ppx.x
	,ppx.y
	,ppx.z
	,hight
	,userid
	,`time`
	,collect_list(send_message_message)
	,collect_list(send_message_time)
	,not_exist.not_exist1
	,not_exist.not_exist2
FROM mongoCDCSourceTable,UNNEST(send_message) as t(send_message_message,send_message_time)
group by 
	_id
	,audio
	,detail
	,location.alt
	,location.lat
	,location.long
	,ppx.x
	,ppx.y
	,ppx.z
	,hight
	,userid
	,`time`
	,not_exist.not_exist1
	,not_exist.not_exist2

CollectList UDF

We need to define a custom aggregate function, collect_list.

package com.ambitfly.mongo.demo.function;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

public class CollectList extends AggregateFunction<String[], List<String>> {

    // Retract one value from the accumulator.
    public void retract(List<String> acc, String column) {
        acc.remove(column);
    }

    // Accumulate one value into the accumulator.
    public void accumulate(List<String> acc, String column) {
        acc.add(column);
    }

    @Override
    public String[] getValue(List<String> acc) {
        return acc.toArray(new String[0]);
    }

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    public void resetAccumulator(List<String> acc) {
        acc.clear();
    }
}

Flink Mongo CDC SQL code
package com.ambitfly.mongo.demo;

import com.ambitfly.mongo.demo.function.CollectList;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

import java.util.concurrent.ExecutionException;

public class FlinkCDCSQLDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .build();


        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        tableEnvironment.executeSql(
                "CREATE TABLE mongoCDCSourceTable (\n" +
                "\t_id STRING,\n" +
                "\taudio STRING,\n" +
                "\tdetail STRING,\n" +
                "\tlocation ROW<\n" +
                "\t\talt DOUBLE,\n" +
                "\t\tlat DOUBLE,\n" +
                "\t\tlong DOUBLE\n" +
                "\t\t>,\n" +
                "\tppx ROW<\n" +
                "\t\tx DOUBLE,\n" +
                "\t\ty DOUBLE,\n" +
                "\t\tz DOUBLE\n" +
                "\t\t>,\n" +
                "\thight DOUBLE,\n" +
                "\tuserid INT,\n" +
                "\t`time` STRING,\n" +
                "\tsend_message ARRAY<\n" +
                "\t\tROW<\n" +
                "\t\t\tmessage STRING,\n" +
                "\t\t\t`time` STRING\n" +
                "\t\t\t>\n" +
                "\t\t>,\n" +
                "\tnot_exist ROW<\n" +
                "\t\tnot_exist1 STRING,\n" +
                "\t\tnot_exist2 STRING\n" +
                "\t>,\n" +
                "PRIMARY KEY(_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'mongodb-cdc',\n" +
                "    'hosts' = '10.10.21.229:27017',\n" +
                "    'username' = 'flinkuser',\n" +
                "    'password' = 'flinkpw',\n" +
                "    'database' = 'test',\n" +
                "    'collection' = 'test01'\n" +
                ")");

        tableEnvironment.executeSql(
                "CREATE TABLE printSinkTable(\n" +
                "\t_id STRING,\n" +
                "\taudio STRING,\n" +
                "\tdetail STRING,\n" +
                "\tlocation_alt DOUBLE,\n" +
                "\tlocation_lat DOUBLE,\n" +
                "\tlocation_long DOUBLE,\n" +
                "\tppx_x DOUBLE,\n" +
                "\tppx_y DOUBLE,\n" +
                "\tppx_z DOUBLE,\n" +
                "\thight DOUBLE,\n" +
                "\tuserid INT,\n" +
                "\t`time` STRING,\n" +
                "\tsend_message_messages ARRAY<STRING>,\n" +
                "\tsend_message_times ARRAY<STRING>,\n" +
                "\tnot_exist_not_exist1 STRING,\n" +
                "\tnot_exist_not_exist2 STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");

        // Register the custom aggregate function collect_list
        tableEnvironment.createTemporaryFunction("collect_list", CollectList.class);


        TableResult tableResult = tableEnvironment.executeSql(
                "INSERT INTO printSinkTable\n" +
                        "SELECT\n" +
                        "\t_id\n" +
                        "\t,audio\n" +
                        "\t,detail\n" +
                        "\t,location.alt\n" +
                        "\t,location.lat\n" +
                        "\t,location.long\n" +
                        "\t,ppx.x\n" +
                        "\t,ppx.y\n" +
                        "\t,ppx.z\n" +
                        "\t,hight\n" +
                        "\t,userid\n" +
                        "\t,`time`\n" +
                        "\t,collect_list(send_message_message)\n" +
                        "\t,collect_list(send_message_time)\n" +
                        "\t,not_exist.not_exist1\n" +
                        "\t,not_exist.not_exist2\n" +
                        "FROM mongoCDCSourceTable,UNNEST(send_message) as t(send_message_message,send_message_time)\n" +
                        "group by \n" +
                        "\t_id\n" +
                        "\t,audio\n" +
                        "\t,detail\n" +
                        "\t,location.alt\n" +
                        "\t,location.lat\n" +
                        "\t,location.long\n" +
                        "\t,ppx.x\n" +
                        "\t,ppx.y\n" +
                        "\t,ppx.z\n" +
                        "\t,hight\n" +
                        "\t,userid\n" +
                        "\t,`time`\n" +
                        "\t,not_exist.not_exist1\n" +
                        "\t,not_exist.not_exist2");

        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
}

Run result

(Screenshot of the console output.)

Custom deserializers for the Mongo CDC DataStream

A simple JSON deserializer

The structure is shown below: fullDocument holds the actual document, operationType is one of insert, update, or delete, and documentKey holds the primary key.

{
    "db": "xxx",
    "tableName": "xxx",
    "fullDocument":{},
    "documentKey":{},
    "operationType": ""
}

Code

package com.ambitfly.mongo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Map;
import java.util.Properties;


public class MyMongoDeserializationSchema implements DebeziumDeserializationSchema<String>, CustomConverter {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        String topic = sourceRecord.topic();

        String[] fields = topic.split("\\.");

        // Extract the database and collection names from the record topic
        if (fields.length >= 2) {
            result.put("db",fields[0]);
            result.put("tableName",fields[1]);
        }

        Struct value = (Struct) sourceRecord.value();

        Schema schema1 = value.schema();
/*        List<Field> fields1 = schema1.fields();


        for (Field field : fields1) {
            System.out.println(field.name()+":"+value.getString(field.name()));
        }*/
        // Extract the fullDocument payload
        String fullDocument = value.getString("fullDocument");

        if(fullDocument!=null) {
            JSONObject fullDocumentJson = (JSONObject) JSONObject.toJSON(JSON.parse(fullDocument));
            result.put("fullDocument",fullDocumentJson);
        }



        // Extract the documentKey payload
        String documentKey = value.getString("documentKey");
        if (documentKey != null) {
            JSONObject documentKeyJson = (JSONObject) JSONObject.toJSON(JSON.parse(documentKey));
            result.put("documentKey",documentKeyJson);
        }


        // Extract the operation type
        String operationType = value.getString("operationType");
        if(operationType != null){
            result.put("operationType",operationType);
        }

        collector.collect(result.toJSONString());

    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    @Override
    public void configure(Properties properties) {

    }

    @Override
    public void converterFor(ConvertedField convertedField, ConverterRegistration converterRegistration) {

    }
}
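
To see how this deserializer fits into the Mongo-to-Kafka step, the sketch below (an illustration, not code from the article) swaps it in for StringDebeziumDeserializationSchema in the earlier DataStream demo and writes the resulting JSON records to Kafka with FlinkKafkaProducer. The Kafka bootstrap address and the topic name mongo_test_cdc are assumptions.

package com.ambitfly.mongo.demo;

import com.ambitfly.mongo.MyMongoDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MongoToKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Same MongoDB source as the DataStream demo, but with the custom JSON deserializer.
        DebeziumSourceFunction<String> source = MongoDBSource.<String>builder()
                .hosts("10.10.21.229:27017")
                .username("flinkuser")
                .password("flinkpw")
                .databaseList("test")
                .collectionList("test.test01", "test.user")
                .deserializer(new MyMongoDeserializationSchema())
                .build();

        // Kafka producer properties; the broker address is an assumption.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Write each JSON change record as a plain string to an assumed topic.
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
                "mongo_test_cdc",
                new SimpleStringSchema(),
                kafkaProps);

        env.addSource(source).addSink(kafkaSink);
        env.execute("mongo-to-kafka-sketch");
    }
}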

Debezium-format JSON deserializer

Debezium is a CDC (Changelog Data Capture) tool that can stream changes in real time from MySQL, PostgreSQL, Oracle, Microsoft SQL Server, and many other databases into Kafka. Debezium provides a unified format schema for changelogs and supports serializing messages with JSON and Apache Avro. Flink can interpret Debezium JSON and Avro messages as INSERT/UPDATE/DELETE messages in the Flink SQL system, and it can also encode INSERT/UPDATE/DELETE messages from Flink SQL as Debezium JSON or Avro messages and emit them to external systems such as Kafka. However, Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single update message, so it encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Debezium messages.

The structure is as follows

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op": "d"  
}

Code

package com.ambitfly.mongo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Properties;


public class MyMongoDebeziumSchema implements DebeziumDeserializationSchema<String>, CustomConverter {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        String topic = sourceRecord.topic();

        String[] fields = topic.split("\\.");

        // Extract the database and collection names from the record topic
        if (fields.length >= 2) {
            result.put("db",fields[0]);
            result.put("tableName",fields[1]);
        }

        Struct value = (Struct) sourceRecord.value();

        Schema schema1 = value.schema();

        // Extract the fullDocument payload
        String fullDocument = value.getString("fullDocument");

        if(fullDocument!=null) {
            JSONObject fullDocumentJson = (JSONObject) JSONObject.toJSON(JSON.parse(fullDocument));
            result.put("fullDocument",fullDocumentJson);
        }



        // Extract the documentKey payload
        String documentKey = value.getString("documentKey");
        String _id = "";
        if (documentKey != null) {
            JSONObject documentKeyJson = (JSONObject) JSONObject.toJSON(JSON.parse(documentKey));
            _id = documentKeyJson.getJSONObject("_id").getString("$oid");
            result.put("documentKey",documentKeyJson);
        }

        result.put("_id",_id);

        // Extract the operation type
        String operationType = value.getString("operationType");
        if(operationType != null){
            result.put("operationType",operationType);
            JSONObject debeziumJson = new JSONObject();
            if("insert".equals(operationType)){
                debeziumJson.put("before",null);
                debeziumJson.put("after",result);
                debeziumJson.put("op","c");
                collector.collect(debeziumJson.toJSONString());
            }else if ("delete".equals(operationType)){
                debeziumJson.put("before",result);
                debeziumJson.put("after",null);
                debeziumJson.put("op","d");
                collector.collect(debeziumJson.toJSONString());
            }else if ("update".equals(operationType)){
                debeziumJson.put("before",result);
                debeziumJson.put("after",null);
                debeziumJson.put("op","d");
                collector.collect(debeziumJson.toJSONString());

                debeziumJson.put("before",null);
                debeziumJson.put("after",result);
                debeziumJson.put("op","c");
                collector.collect(debeziumJson.toJSONString());
            }
        }

    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    @Override
    public void configure(Properties properties) {

    }

    @Override
    public void converterFor(ConvertedField convertedField, ConverterRegistration converterRegistration) {

    }
}
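
Because the messages produced by MyMongoDebeziumSchema follow the Debezium envelope, they can be read back from Kafka with Flink SQL using the built-in debezium-json format. The sketch below is an illustration under the same assumptions as above (topic mongo_test_cdc, broker localhost:9092); it declares only the scalar fields of the before/after payload, and fields that are not declared (such as fullDocument and documentKey) are simply ignored by the format.

package com.ambitfly.mongo.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaDebeziumJsonReadSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnvironment = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Kafka source table in debezium-json format; topic and broker are assumptions.
        tableEnvironment.executeSql(
                "CREATE TABLE mongoKafkaSourceTable (\n" +
                "    _id STRING,\n" +
                "    db STRING,\n" +
                "    tableName STRING,\n" +
                "    operationType STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'mongo_test_cdc',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'format' = 'debezium-json'\n" +
                ")");

        // Print sink for a quick check of the resulting changelog.
        tableEnvironment.executeSql(
                "CREATE TABLE debeziumPrintTable (\n" +
                "    _id STRING,\n" +
                "    db STRING,\n" +
                "    tableName STRING,\n" +
                "    operationType STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");

        tableEnvironment.executeSql(
                        "INSERT INTO debeziumPrintTable SELECT * FROM mongoKafkaSourceTable")
                .getJobClient().get().getJobExecutionResult().get();
    }
}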

To be continued!

mongo->kafka

kafka->mysql

kafka->clickhouse
