Real-Time Data Warehouse (3): Dynamically Routing Business Data from ODS to DWD
Dynamically routing business data from ODS to DWD
Data is read from the business-data ODS layer in Kafka and processed: dimension data is saved to HBase, while fact data is written back to Kafka as the business-data DWD layer.
Dimension data is generally written to a store that supports fast primary-key lookups, such as HBase, Redis, or MySQL. This calls for a dynamic configuration scheme: the routing configuration is persisted long-term, and whenever it changes the streaming job picks up the change automatically. For example, a configuration row might map inserts on a given source table to an HBase dimension table, together with the column list and primary key to use when writing it.
This is implemented here with a broadcast stream.
(1) Main program code
package com.yyds.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.yyds.app.function.MyDeserialization;
import com.yyds.app.function.TableProcessFunction;
import com.yyds.bean.TableProcess;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.OutputTag;
public class BaseDBApp {
public static void main(String[] args) throws Exception {
//TODO 1. Get the execution environment
System.setProperty("HADOOP_USER_NAME","root");
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Enable checkpointing: one checkpoint every 5 seconds
env.enableCheckpointing(5000L);
// Checkpointing consistency semantics (exactly-once)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set the timeout
//env.getCheckpointConfig().setAlignmentTimeout(10000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// Restart strategy
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
// Retain the latest checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// State backend
env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));
//TODO 2. Create a stream from the Kafka ods_base_db topic
String sourceTopic = "ods_base_db";
String groupId = "base_db_app_2022";
FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtils.getKafkaConsumer(sourceTopic, groupId);
DataStreamSource<String> kafkaDS = env.addSource(kafkaConsumer);
//TODO 3. Convert each record to a JSONObject and filter out delete records (main stream)
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
String type = value.getString("type");
return !"delete".equals(type);
}
});
//TODO 4. Consume the config table with Flink CDC and turn it into a broadcast stream
DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
.hostname("centos01")
.port(3306)
.username("root")
.password("123456")
.databaseList("flink-realtime")
.tableList("flink-realtime.table_process") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据 注意:指定的时候需要使用"db.table"的方式
.deserializer(new MyDeserialization())
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> tableProcessDS = env.addSource(mySQLSource);
// Broadcast state descriptor
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>(
"map-state",
String.class,
TableProcess.class
);
BroadcastStream<String> broadcastStream = tableProcessDS.broadcast(mapStateDescriptor);
//TODO 5. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
//TODO 6. Process the connected stream (split it)
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag"){};
SingleOutputStreamOperator<JSONObject> kafkaDataStream = connectedStream.process(new TableProcessFunction(
hbaseTag,
mapStateDescriptor
));
//TODO 7. Extract the Kafka stream and the HBase side-output stream
DataStream<JSONObject> hbaseDataStream = kafkaDataStream.getSideOutput(hbaseTag);
kafkaDataStream.print("kafka----------");
hbaseDataStream.print("hbase----------");
//TODO 8. Write the Kafka stream to its Kafka topics and the HBase stream to Phoenix tables
//TODO 9. Submit the job
env.execute("BaseDBApp");
}
}
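TODO 8 above (writing the main stream back to Kafka and the side-output stream to Phoenix) is left unimplemented in this part. The snippet below is a minimal sketch of the Kafka half, not the series' final implementation: it assumes a broker at centos01:9092 and uses the FlinkKafkaProducer / KafkaSerializationSchema API from the same Kafka connector already used for the source, routing each record to the topic stored in its sinkTable field.
// Sketch only (assumed details): would be placed at TODO 8 inside main().
// Extra imports needed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer,
// org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema,
// org.apache.kafka.clients.producer.ProducerRecord, java.nio.charset.StandardCharsets, java.util.Properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "centos01:9092"); // assumed broker address
kafkaDataStream.addSink(new FlinkKafkaProducer<>(
        "dwd_default_topic", // fallback topic, assumed name
        new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, Long timestamp) {
                // Route each record to the DWD topic chosen by TableProcessFunction
                return new ProducerRecord<>(
                        element.getString("sinkTable"),
                        element.toJSONString().getBytes(StandardCharsets.UTF_8));
            }
        },
        producerProps,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
Exactly-once delivery would additionally require tuning Kafka transaction timeouts, which is why this sketch uses AT_LEAST_ONCE.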
(2) BroadcastProcessFunction
package com.yyds.app.function;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yyds.bean.TableProcess;
import com.yyds.common.FlinkConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private Connection connection;
private OutputTag<JSONObject> hbaseTag;
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
public TableProcessFunction(OutputTag<JSONObject> hbaseTag,
MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.hbaseTag = hbaseTag;
this.mapStateDescriptor = mapStateDescriptor;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName(FlinkConfig.PHOENIX_DRIVER);
connection = DriverManager.getConnection(
FlinkConfig.PHOENIX_SERVER
);
}
// Broadcast (config) stream side
@Override
public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
//{"database":"","before":{},"after":{},"type":"insert","tableName":""}
// 1. Parse the change record
String data = JSONObject.parseObject(value).getString("after");
TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
// 2. For HBase sinks, create the Phoenix table if it does not already exist
if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
checkTable(
tableProcess.getSinkTable(),
tableProcess.getSinkColumns(),
tableProcess.getSinkPk(),
tableProcess.getSinkExtend()
);
}
// 3. Put the config into the broadcast state
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
broadcastState.put(key, tableProcess);
}
// Main (data) stream side
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
// 1. Read the broadcast state
ReadOnlyBroadcastState<String, TableProcess> readOnlyBroadcastState = ctx.getBroadcastState(mapStateDescriptor);
//{"database":"","before":{},"after":{},"type":"insert","tableName":""}
String tableName = value.getString("tableName");
String type = value.getString("type");
String key = tableName + "-" + type;
TableProcess tableProcess = readOnlyBroadcastState.get(key);
if (tableProcess != null) {
// 2. Keep only the configured sink columns
JSONObject data = value.getJSONObject("after");
filterColumn(data, tableProcess.getSinkColumns());
// 3. Split the stream
// Attach the HBase table / Kafka topic name to the record
value.put("sinkTable", tableProcess.getSinkTable());
String sinkType = tableProcess.getSinkType();
if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
// Kafka data goes to the main output
out.collect(value);
} else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
// HBase data goes to the side output
ctx.output(hbaseTag, value);
}
} else {
System.out.println("key = " + key + " ,不存在");
}
}
// Keep only the columns listed in sinkColumns
private void filterColumn(JSONObject data, String sinkColumns) {
String[] split = sinkColumns.split(",");
List<String> columns = Arrays.asList(split);
// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
// while (iterator.hasNext()){
// Map.Entry<String, Object> next = iterator.next();
// if(!columns.contains(next.getKey())){
// iterator.remove();
// }
// }
data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
}
// Create the table, e.g. create table if not exists mydb.test(id varchar primary key,name varchar,sex varchar)
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
PreparedStatement preparedStatement = null;
try {
// Default values for the primary key and the table-creation extension
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null) {
sinkExtend = "";
}
StringBuffer sql = new StringBuffer("create table if not exists ")
.append(FlinkConfig.HBASE_SCHEMA)
.append(".")
.append(sinkTable)
.append(" ( ");
String[] fields = sinkColumns.split(",");
for (int i = 0; i < fields.length; i++) {
String field = fields[i];
// Is this the primary key column?
if (sinkPk.equals(field)) {
sql.append(field).append(" varchar primary key");
} else {
sql.append(field).append(" varchar");
}
// Append a comma unless this is the last column
if (i < fields.length - 1) {
sql.append(",");
}
}
sql.append(")").append(sinkExtend);
System.out.println(sql);
// Execute the DDL
preparedStatement = connection.prepareStatement(sql.toString());
preparedStatement.execute();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(sinkTable + " table creation failed!");
}finally {
if(preparedStatement != null){
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
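The HBase side of TODO 8 is likewise not shown in this part. Below is a minimal sketch under the assumption that each side-output record is upserted into the Phoenix table named by its sinkTable field; the class name DimSinkSketch and the commit handling are assumptions, not the series' actual dimension sink.
package com.yyds.app.function;
import com.alibaba.fastjson.JSONObject;
import com.yyds.common.FlinkConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;
// Sketch only: writes the side-output (dimension) stream into Phoenix/HBase
public class DimSinkSketch extends RichSinkFunction<JSONObject> {
    private Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(FlinkConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(FlinkConfig.PHOENIX_SERVER);
    }
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // "after" holds the columns already trimmed by filterColumn()
        JSONObject data = value.getJSONObject("after");
        // upsert into FLINK_REALTIME.<sinkTable> (col1,col2,...) values ('v1','v2',...)
        String sql = "upsert into " + FlinkConfig.HBASE_SCHEMA + "." + value.getString("sinkTable")
                + " (" + String.join(",", data.keySet()) + ") values ('"
                + data.values().stream().map(Object::toString).collect(Collectors.joining("','"))
                + "')";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.execute();
            connection.commit(); // the Phoenix connection does not auto-commit
        }
    }
}
The side-output stream extracted in step 7 would then be attached with hbaseDataStream.addSink(...) at TODO 8.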
(3) Other classes
package com.yyds.bean;
import lombok.Data;
@Data
public class TableProcess {
// Sink-type constants for dynamic routing
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
// Source table
String sourceTable;
// Operation type: insert, update, delete
String operateType;
// Sink type: hbase or kafka
String sinkType;
// Sink table (or Kafka topic)
String sinkTable;
// Sink columns
String sinkColumns;
// Primary key column
String sinkPk;
// Table-creation extension clause
String sinkExtend;
}
package com.yyds.common;
public class FlinkConfig {
// Phoenix schema name
public static final String HBASE_SCHEMA = "FLINK_REALTIME";
// Phoenix JDBC driver
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
// Phoenix connection URL
public static final String PHOENIX_SERVER =
"jdbc:phoenix:centos01,centos02,centos03:2181";
}
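The main program also imports MyKafkaUtils (and MyDeserialization), which are not listed in this article. As a reference, here is a minimal sketch of what getKafkaConsumer could look like; the broker address is an assumption, and the real utility class in the series may differ.
package com.yyds.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class MyKafkaUtils {
    private static final String BOOTSTRAP_SERVERS = "centos01:9092"; // assumed broker list
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", groupId);
        // Read the topic as plain JSON strings; parsing happens in the job itself
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}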