Real-Time Data Warehouse (2): From the Business Database to ODS (Kafka)
Use Flink CDC to sync the business database into the ODS layer (Kafka).
(1) Main code
package com.yyds.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.yyds.app.function.MyDeserialization;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Enable checkpointing, one checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // Use exactly-once checkpoint semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint alignment timeout (optional)
        // env.getCheckpointConfig().setAlignmentTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // Restart strategy (optional)
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));

        // 2. Build the CDC SourceFunction and read the data
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // .tableList("flink.base_trademark") // Optional: if not set, all tables in the databases above are read; tables must be given as "db.table"
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(mySQLSource);

        // 3. Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtils.getKafkaProducer(sinkTopic));

        // 4. Start the job
        env.execute("FlinkCDCWithMyDeserialization");
    }
}
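The source above starts reading from the latest binlog position (StartupOptions.latest()), so rows that already exist in MySQL are not captured. If the ODS topic should first be bootstrapped with a full snapshot of the existing data, the builder can be switched to StartupOptions.initial(). A minimal sketch of the changed source definition, assuming the same connection settings as above:

// Variant of the source above: take a snapshot of the existing rows first,
// then continue reading changes from the binlog (same connection settings assumed).
DebeziumSourceFunction<String> snapshotSource = MySQLSource.<String>builder()
        .hostname("centos01")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("flink")
        .deserializer(new MyDeserialization())
        .startupOptions(StartupOptions.initial())
        .build();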
(2) Custom deserializer
package com.yyds.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * Custom deserializer
 */
public class MyDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Wraps each change record as a JSON string:
     * {
     *     "database": "",
     *     "tableName": "",
     *     "type": "c u d",
     *     "before": {
     *         "": "",
     *         "": "",
     *         "": ""
     *     },
     *     "after": {
     *         "": "",
     *         "": "",
     *         "": ""
     *     }
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject res = new JSONObject();

        // Get the database and table name from the record topic ("server.db.table")
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();

        // Get the "before" image
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        // Get the "after" image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        // Get the operation type (READ, DELETE, UPDATE, CREATE) and rename "create" to "insert"
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        // Put all fields into the result JSON object
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJson);
        res.put("after", afterJson);
        res.put("type", type);

        // Emit the record
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
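For illustration, an insert into the table flink.base_trademark mentioned in the tableList comment above, with hypothetical columns id and tm_name, would be emitted by this deserializer roughly as:

{"database":"flink","tableName":"base_trademark","before":{},"after":{"id":1,"tm_name":"apple"},"type":"insert"}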
(3) Utility class
package com.yyds.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyKafkaUtils {

    private static String brokers = "centos01:9092,centos02:9092,centos03:9092";

    // Build a Kafka producer that writes plain strings to the given topic
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(
                brokers,
                topic,
                new SimpleStringSchema()
        );
    }

    // Build a Kafka consumer that reads plain strings from the given topic with the given consumer group
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return new FlinkKafkaConsumer<String>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}
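To check that change records actually reach the ods_base_db topic, the getKafkaConsumer helper above can be used from a small verification job. A minimal sketch, where the class name and consumer group id are assumptions:

package com.yyds.app.ods;

import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OdsBaseDbCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the ODS topic with the utility class above and print each JSON record
        env.addSource(MyKafkaUtils.getKafkaConsumer("ods_base_db", "ods_base_db_check"))
           .print();
        env.execute("OdsBaseDbCheck");
    }
}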