Using Flink CDC to sync the business database into the ODS layer (Kafka)

(1) Main code

package com.yyds.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.yyds.app.function.MyDeserialization;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {


    public static void main(String[] args) throws Exception {
        // Access HDFS (where checkpoints are stored) as the root user
        System.setProperty("HADOOP_USER_NAME","root");

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);



        // Enable checkpointing: take a checkpoint every 5 seconds
        env.enableCheckpointing(5000L);

        // Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Set the timeout
//        env.getCheckpointConfig().setAlignmentTimeout(10000L);

        // Allow at most 2 checkpoints in flight at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

        // Require at least 3 seconds between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);

        // Restart strategy
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));

        // Retain the latest checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );


        // Set the state backend (checkpoints are written to HDFS)
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));


        // 2. Build the SourceFunction with Flink CDC and read the data
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // .tableList("flink.base_trademark") // Optional; if omitted, all tables in the databases listed above are captured. Note: tables must be specified as "db.table"
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();


        DataStreamSource<String> streamSource = env.addSource(mySQLSource);


        // 3. Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtils.getKafkaProducer(sinkTopic));


        // 4. Start the job
        env.execute("FlinkCDCWithMyDeserialization");

    }
}
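
Because externalized checkpoints are retained on cancellation and the state backend points at HDFS, a cancelled job can later be resumed from its last retained checkpoint. A minimal sketch of the submit command, assuming the job is packaged as flink-cdc-ods.jar (the jar name, <job-id> and chk-<n> are placeholders, not values from the original project):

bin/flink run \
  -c com.yyds.app.ods.FlinkCDC \
  -s hdfs://centos01:8020/flinkCDC/ck/<job-id>/chk-<n> \
  flink-cdc-ods.jar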

(2) Custom deserializer

package com.yyds.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * Custom deserializer
 */
public class MyDeserialization implements DebeziumDeserializationSchema<String> {



    /**
     * Wraps each change record into a JSON string of the form:
     * {
     *     "database":"",
     *     "tableName":"",
     *     "type":"insert / update / delete",
     *     "before":{
     *         "":"",
     *         "":"",
     *         "":""
     *     },
     *     "after":{
     *         "":"",
     *         "":"",
     *         "":""
     *     }
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        JSONObject res = new JSONObject();

        // Get the database and table names (topic format: server.database.table)
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];



        Struct value = (Struct)sourceRecord.value();
        // Extract the "before" data
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if(before != null){
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(),beforeValue);
            }
        }


        // Extract the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if(after != null){
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(),afterValue);
            }
        }

        // Get the operation type: READ / CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if("create".equals(type)){
            type = "insert";
        }

        // Put the fields into the result JSON object
        res.put("database",database);
        res.put("tableName",tableName);
        res.put("before",beforeJson);
        res.put("after",afterJson);
        res.put("type",type);

        // Emit the record
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
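
For reference, a record produced by this deserializer for an insert into the flink.base_trademark table would look roughly like the following (the column names and values are made up for illustration; for inserts the "before" object is empty because the before struct is null):

{"database":"flink","tableName":"base_trademark","type":"insert","before":{},"after":{"id":12,"tm_name":"test"}}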

(3) Utility class

package com.yyds.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyKafkaUtils {


    private static String brokers = "centos01:9092,centos02:9092,centos03:9092";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
        return new FlinkKafkaProducer<String>(
                brokers,
                topic,
                new SimpleStringSchema()
        );
    }


    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);

        return new FlinkKafkaConsumer<String>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }


}
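
A minimal sketch of how a downstream job could read the ods_base_db topic back with this utility class (the package, class name and consumer group id below are only examples, not part of the original project):

package com.yyds.app.dwd;

import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OdsBaseDbConsumerDemo {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Consume the change-log JSON strings written by the FlinkCDC job and print them
        env.addSource(MyKafkaUtils.getKafkaConsumer("ods_base_db", "ods_base_db_demo_group"))
                .print();

        // 3. Start the job
        env.execute("OdsBaseDbConsumerDemo");
    }
}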
