Real-time sync from MySQL to ClickHouse with FlinkCDC via Kafka (custom Debezium format supporting inserts, deletes, and updates)
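This setup uses the FlinkCDC DataStream API to stream data from multiple MySQL databases and tables into a single Kafka topic in real time. A downstream Flink SQL job then splits the stream and writes it to ClickHouse. The FlinkCDC DataStream job uses a custom deserialization schema that emits Debezium-format JSON, so deletes and updates are propagated in addition to inserts. For more on the Debezium format, see the Flink documentation: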
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
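To make the format concrete, here is a minimal sketch of the envelope shapes that end up in Kafka. Flink's debezium-json format reads the `before`, `after`, and `op` fields. The mapping mirrors what the custom deserializer later in this article does (an update is written as a delete followed by a create). The class and method names are hypothetical, and the JSON is assembled by plain string concatenation only to keep the sketch dependency-free.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: which Debezium-style envelopes are emitted for each change type. */
final class DebeziumEnvelopeSketch {

    /** Builds one envelope; beforeJson/afterJson are JSON objects or the literal "null". */
    static String envelope(String beforeJson, String afterJson, String op) {
        return "{\"before\":" + beforeJson + ",\"after\":" + afterJson + ",\"op\":\"" + op + "\"}";
    }

    /** Maps a change type (CREATE, READ, UPDATE, DELETE) to the envelopes to emit. */
    static List<String> envelopesFor(String changeType, String beforeJson, String afterJson) {
        List<String> out = new ArrayList<>();
        switch (changeType) {
            case "CREATE":
            case "READ": // READ marks rows from the initial snapshot
                out.add(envelope("null", afterJson, "c"));
                break;
            case "DELETE":
                out.add(envelope(beforeJson, "null", "d"));
                break;
            case "UPDATE": // written as a delete of the old row plus a create of the new row
                out.add(envelope(beforeJson, "null", "d"));
                out.add(envelope("null", afterJson, "c"));
                break;
            default: // unknown change types are dropped
                break;
        }
        return out;
    }

    public static void main(String[] args) {
        String before = "{\"id\":1,\"name\":\"a\"}";
        String after = "{\"id\":1,\"name\":\"b\"}";
        envelopesFor("UPDATE", before, after).forEach(System.out::println);
    }
}
```

Running `main` prints two envelopes for the single update, which is the pair of records the downstream job will see.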
Dependencies
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${flink.scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${flink.scala.version}</artifactId>
<version>1.13.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${flink.scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>1.12.7-SNAPSHOT</version>
</dependency>
</dependencies>
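FlinkCDC DataStream main program
- With the MySQL FlinkCDC DataStream API, timestamps come out 8 hours ahead because of a time zone issue, so you need to correct them yourself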
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2.1 Enable checkpointing, one checkpoint every 5 seconds
env.enableCheckpointing(5000L);
// 2.2 Set the checkpoint consistency semantics
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 2.3 Retain the last checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.4 Set the restart strategy used when recovering from a checkpoint
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
// 2.5 Set the state backend
env.setStateBackend(new FsStateBackend("hdfs://master:8020/flink/cdc"));
// 2.6 Set the user name for HDFS access
System.setProperty("HADOOP_USER_NAME", "hadoop");
String hostname = "88.88.88.888";
int port = 3306;
String username = "root";
String password = "password";
String databaseList = "test";
String[] databases = databaseList.split(",");
String tableList = "test.user";
String[] tables = tableList.split(",");
MySqlSource<String> mySQLSource = MySqlSource.<String>builder()
.hostname(hostname)
.port(port)
.username(username)
.password(password)
.databaseList(databases)
.tableList(tables) // Optional; if omitted, all tables in the databases configured above are read
.startupOptions(StartupOptions.initial())
.deserializer(new MyDebeziumSchema())
.build();
DataStreamSource<String> mySQL_source = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
mySQL_source.print();
// Kafka Sink
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092");
properties.setProperty("transaction.timeout.ms", "300000");
String topicName = "mysql_test_user";
KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<>(
topicName, // target topic
element.getBytes(StandardCharsets.UTF_8)); // record contents
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
topicName, // target topic
serializationSchema, // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
mySQL_source.addSink(myProducer);
env.execute();
}
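Two of the producer settings above are easy to get wrong and only fail at runtime: `bootstrap.servers` must be a plain comma-separated `host:port` list, and with `EXACTLY_ONCE` the producer's `transaction.timeout.ms` must not exceed the broker's `transaction.max.timeout.ms` (15 minutes by default on the broker). The following is a minimal sketch of a pre-flight check. The class and method names are hypothetical, and the requirement that the timeout be longer than the checkpoint interval is a rule of thumb assumed here, not a limit enforced by Kafka.

```java
import java.util.Properties;

/** Sketch: sanity checks for the Kafka producer settings before the job starts. */
final class KafkaProducerConfigCheck {

    /** Kafka broker default for transaction.max.timeout.ms (15 minutes). */
    static final long BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS = 15 * 60 * 1000L;

    /** True if the value is a non-empty, comma-separated list of host:port entries. */
    static boolean isValidBootstrapServers(String value) {
        if (value == null || value.trim().isEmpty()) {
            return false;
        }
        for (String entry : value.split(",")) {
            String e = entry.trim();
            int colon = e.lastIndexOf(':');
            if (colon <= 0 || colon == e.length() - 1) {
                return false; // missing host or missing port
            }
            String host = e.substring(0, colon);
            String port = e.substring(colon + 1);
            // A stray "key=" prefix pasted into the value shows up as '=' in the host part.
            if (host.contains("=") || !port.chars().allMatch(Character::isDigit)) {
                return false;
            }
        }
        return true;
    }

    /** True if transaction.timeout.ms is set, longer than the checkpoint interval, and within the broker limit. */
    static boolean isTxnTimeoutPlausible(Properties props, long checkpointIntervalMs, long brokerMaxTimeoutMs) {
        String raw = props.getProperty("transaction.timeout.ms");
        if (raw == null) {
            return false; // treat "not set explicitly" as a failed check in this sketch
        }
        try {
            long timeout = Long.parseLong(raw.trim());
            return timeout > checkpointIntervalMs && timeout <= brokerMaxTimeoutMs;
        } catch (NumberFormatException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092");
        props.setProperty("transaction.timeout.ms", "300000");
        System.out.println(isValidBootstrapServers(props.getProperty("bootstrap.servers")));
        System.out.println(isTxnTimeoutPlausible(props, 5000L, BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS));
    }
}
```

If your brokers override `transaction.max.timeout.ms`, pass that value instead of the default constant.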
Custom Debezium deserializer
public class MyDebeziumSchema implements DebeziumDeserializationSchema<String>, CustomConverter {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// JSONObject result = new JSONObject();
// The topic has the form <serverName>.<database>.<table>
String topic = sourceRecord.topic();
String[] parts = topic.split("\\.");
String db = parts[1];
String tableName = parts[2];
Struct value = (Struct) sourceRecord.value();
// Read the "before" image of the row
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if(before != null){
// Read the column metadata
Schema schema = before.schema();
List<Field> fields = schema.fields();
for (Field field : fields) {
// Correct the time zone by subtracting 8 hours
if("io.debezium.time.Timestamp".equals(field.schema().name())){
Object o = before.get(field);
if(o!=null) {
Long v = (Long) o - 8*60*60*1000;
beforeJson.put(field.name(),v);
}else {
beforeJson.put(field.name(),null);
}
}
else {
beforeJson.put(field.name(),before.get(field));
}
}
// Add the database and table names
beforeJson.put("db",db);
beforeJson.put("tableName",tableName);
}
// Read the "after" image of the row
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if(after != null){
// Read the column metadata
Schema schema = after.schema();
List<Field> fields = schema.fields();
for (Field field : fields) {
if("io.debezium.time.Timestamp".equals(field.schema().name())){
// Correct the time zone by subtracting 8 hours
Object o = after.get(field);
if(o!=null) {
Long v = (Long) o - 8*60*60*1000;
afterJson.put(field.name(),v);
}else {
afterJson.put(field.name(),null);
}
}
else {
afterJson.put(field.name(),after.get(field));
}
}
// Add the database and table names (once, after the loop, as in the "before" branch)
afterJson.put("db",db);
afterJson.put("tableName",tableName);
}
// Determine the operation type
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String op = operation.toString();
// Build the Debezium-format JSON record
JSONObject debeziumJson = new JSONObject();
if(op.equals("UPDATE")){
// An update is emitted as a delete followed by a create
debeziumJson.put("before",beforeJson);
debeziumJson.put("after",null);
debeziumJson.put("op","d");
collector.collect(debeziumJson.toJSONString());
debeziumJson.put("before",null);
debeziumJson.put("after",afterJson);
debeziumJson.put("op","c");
collector.collect(debeziumJson.toJSONString());
}else if(op.equals("CREATE") || op.equals("READ")){ // READ marks rows from the initial full snapshot
debeziumJson.put("before",null);
debeziumJson.put("after",afterJson);
debeziumJson.put("op","c");
collector.collect(debeziumJson.toJSONString());
}else if(op.equals("DELETE")){
debeziumJson.put("before",beforeJson);
debeziumJson.put("after",null);
debeziumJson.put("op","d");
collector.collect(debeziumJson.toJSONString());
}
// collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
@Override
public void configure(Properties properties) {
}
@Override
public void converterFor(ConvertedField convertedField, ConverterRegistration converterRegistration) {
}
}
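The deserializer above hard-codes the 8-hour correction. As a sketch of an alternative, the offset can be derived from a named time zone with `java.time`. This assumes that an `io.debezium.time.Timestamp` value is the column's wall-clock time encoded as epoch milliseconds with no zone information (that is, as if it were UTC), and that the MySQL server's wall clock is in the zone you pass in. The class name, method name, and the choice of `Asia/Shanghai` are illustrative.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Sketch: convert a zone-less Debezium timestamp into true epoch milliseconds. */
final class DebeziumTimestampShift {

    /**
     * Reads debeziumMillis as a wall-clock time (encoded as if it were UTC),
     * then re-interprets that wall-clock time in sourceZone.
     */
    static long toEpochMillis(long debeziumMillis, ZoneId sourceZone) {
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(debeziumMillis), ZoneOffset.UTC);
        return wallClock.atZone(sourceZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Wall clock 2022-01-01 08:00:00 encoded as if UTC
        long raw = 1641024000000L;
        System.out.println(toEpochMillis(raw, ZoneId.of("Asia/Shanghai")));
    }
}
```

For a fixed UTC+8 zone this gives the same result as subtracting `8*60*60*1000`, but it also stays correct for zones with daylight saving time or historical offset changes.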
Loading the Kafka data into ClickHouse
public class KafkaToClickHouse {
public static void main(String[] args) throws ExecutionException, InterruptedException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
tableEnvironment.executeSql("CREATE TABLE kafkaTable ( \n" +
"id Int, \n" +
"name String,\n"+
"age Int, \n" +
"`time` BIGINT, \n" +
"db String, \n" +
"tableName String \n" +
") WITH (" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'mysql_test_user',\n" +
" 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset'," +
" 'debezium-json.ignore-parse-errors' = 'true' ," +
" 'format' = 'debezium-json'" +
")");
tableEnvironment.executeSql("CREATE TABLE test_user (" +
"id INT, \n" +
"name String, \n" +
"age Int, \n" +
"`time` TIMESTAMP, \n" +
"PRIMARY KEY (`id`) NOT ENFORCED\n" +
") WITH (" +
" 'connector' = 'clickhouse'," +
" 'url' = 'clickhouse://node2:8123'," +
" 'database-name' = 'default'," +
" 'table-name' = 'test_user'," +
" 'username' = 'default'," +
" 'password' = 'qwert', " +
" 'sink.ignore-delete' = 'false', " +
" 'catalog.ignore-primary-key' = 'false'," +
" 'sink.batch-size' = '1',\n" +
" 'sink.flush-interval' = '500'" +
")");
TableResult tableResult = tableEnvironment.executeSql("insert into test_user \n" +
"select " +
" id " +
" ,name " +
" ,age " +
" ,TO_TIMESTAMP(FROM_UNIXTIME(`time` / 1000, 'yyyy-MM-dd HH:mm:ss')) " +
"from kafkaTable " +
"where " +
" db = 'test' " +
" and tableName='user' ");
tableResult.getJobClient().get().getJobExecutionResult().get();
}
}
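Because every source table lands in the same topic, each ClickHouse target needs its own INSERT statement with a `db`/`tableName` filter like the one above. A minimal sketch of generating those statements from a small routing description follows. The class and method names are hypothetical, and it assumes the names come from trusted configuration (they are concatenated into SQL without escaping) and that the Kafka source table declares the columns being selected.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: build one filtered INSERT statement per (database, table) pair. */
final class PerTableInsertSql {

    /**
     * @param sinkTable   Flink table registered for the ClickHouse target
     * @param sourceTable Flink table registered for the shared Kafka topic
     * @param db          value of the db field to filter on
     * @param table       value of the tableName field to filter on
     * @param selectExprs column names or expressions, in sink column order
     */
    static String insertFor(String sinkTable, String sourceTable,
                            String db, String table, List<String> selectExprs) {
        return "INSERT INTO " + sinkTable
                + " SELECT " + String.join(", ", selectExprs)
                + " FROM " + sourceTable
                + " WHERE db = '" + db + "' AND tableName = '" + table + "'";
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList(
                "id", "name", "age",
                "TO_TIMESTAMP(FROM_UNIXTIME(`time` / 1000, 'yyyy-MM-dd HH:mm:ss'))");
        System.out.println(insertFor("test_user", "kafkaTable", "test", "user", cols));
    }
}
```

Each generated string could then be passed to `tableEnvironment.executeSql(...)`, or several could be grouped in a `StatementSet` so that they run as one job.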