Dynamic splitting of business data from ODS to DWD

Business data is read from the Kafka ODS layer (the ods_base_db topic). After processing, dimension data is saved to HBase, while fact data is written back to Kafka as the business-data DWD layer.
Dimension data is generally written to a store that supports efficient primary-key lookups, such as HBase, Redis, or MySQL. Deciding which tables are dimensions and where each one should be written calls for a dynamic configuration scheme: the configuration is persisted, and whenever it changes the streaming job should pick up the change automatically, without a restart.
This is implemented here with a broadcast stream: the configuration table is consumed with Flink CDC and broadcast to the operators processing the main stream.
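
Each row of the MySQL configuration table (flink-realtime.table_process) maps to one TableProcess object, the bean shown in section (3). For illustration, a dimension-table entry might look like the following; the table and column names here are made-up examples, not values from the original project:

// Hypothetical configuration row, expressed through the TableProcess bean (lombok @Data generates the setters).
// "base_trademark", its columns and the target table name are illustrative values only.
TableProcess tp = new TableProcess();
tp.setSourceTable("base_trademark");            // source table in the business MySQL database
tp.setOperateType("insert");                    // binlog operation this rule applies to
tp.setSinkType(TableProcess.SINK_TYPE_HBASE);   // dimension data is written to HBase (via Phoenix)
tp.setSinkTable("dim_base_trademark");          // target Phoenix table (or Kafka topic for facts)
tp.setSinkColumns("id,tm_name");                // columns to keep when filtering
tp.setSinkPk("id");                             // primary key used when creating the Phoenix table
tp.setSinkExtend("");                           // extra DDL clause appended to the CREATE TABLE statement

In TableProcessFunction below, such a row is stored in broadcast state under the key "base_trademark-insert"; main-stream records whose tableName and type match that key are filtered down to the configured columns and routed to the HBase side output.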

(1) Main program

package com.yyds.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.yyds.app.function.MyDeserialization;
import com.yyds.app.function.TableProcessFunction;
import com.yyds.bean.TableProcess;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.OutputTag;

public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Set up the execution environment
        System.setProperty("HADOOP_USER_NAME","root");

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Enable checkpointing, one checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // Checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Set the timeout
        //env.getCheckpointConfig().setAlignmentTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // Restart strategy
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
        // Retain the latest checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));

        //TODO 2. Consume the Kafka ods_base_db topic and create the main stream
        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_2022";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtils.getKafkaConsumer(sourceTopic, groupId);
        DataStreamSource<String> kafkaDS = env.addSource(kafkaConsumer);

        //TODO 3. Convert each record into a JSON object and filter out deletes (main stream)
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String type = value.getString("type");
                        return !"delete".equals(type);
                    }
                });


        //TODO 4. Use Flink CDC to consume the configuration table and turn it into a broadcast stream
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink-realtime")
                 .tableList("flink-realtime.table_process") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据  注意:指定的时候需要使用"db.table"的方式
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> tableProcessDS = env.addSource(mySQLSource);

        // Broadcast state descriptor
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>(
                "map-state",
                String.class,
                TableProcess.class
        );
        BroadcastStream<String> broadcastStream = tableProcessDS.broadcast(mapStateDescriptor);


        //TODO 5. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);


        //TODO 6. Process the connected stream (split it)
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag"){};
        SingleOutputStreamOperator<JSONObject> kafkaDataStream = connectedStream.process(new TableProcessFunction(
                hbaseTag,
                mapStateDescriptor
        ));



        //TODO 7. Extract the Kafka stream (main output) and the HBase stream (side output)
        DataStream<JSONObject> hbaseDataStream = kafkaDataStream.getSideOutput(hbaseTag);
        kafkaDataStream.print("kafka----------");
        hbaseDataStream.print("hbase----------");


        //TODO 8. Write the Kafka stream to its target Kafka topics and the HBase stream to Phoenix tables (left unimplemented here; see the sketch after this listing)



        //TODO 9. Launch the job
        env.execute("BaseDBApp");

    }
}
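
TODO 8 is left empty in the listing above. A minimal sketch of how the two sinks could be wired is shown below. DimSinkFunction and MyKafkaUtils.getKafkaProducer(...) are assumed helper classes whose names and signatures are hypothetical, and the snippet would additionally need imports for org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema and org.apache.kafka.clients.producer.ProducerRecord:

        //TODO 8 (sketch only, not the original implementation)
        // Dimension data: upsert each record into its Phoenix table with a custom sink.
        // DimSinkFunction is a hypothetical SinkFunction<JSONObject> that builds an upsert
        // statement from value.getString("sinkTable") and value.getJSONObject("after").
        hbaseDataStream.addSink(new DimSinkFunction());

        // Fact data: route each record to the Kafka topic carried in its "sinkTable" field.
        // MyKafkaUtils.getKafkaProducer(KafkaSerializationSchema) is assumed to exist alongside getKafkaConsumer.
        kafkaDataStream.addSink(MyKafkaUtils.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, Long timestamp) {
                return new ProducerRecord<>(
                        element.getString("sinkTable"),                           // target topic from the config
                        element.getJSONObject("after").toJSONString().getBytes()  // payload: the filtered "after" data
                );
            }
        }));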

(2) BroadcastProcessFunction

package com.yyds.app.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yyds.bean.TableProcess;
import com.yyds.common.FlinkConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;


public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private Connection connection;
    private OutputTag<JSONObject> hbaseTag;
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;


    public TableProcessFunction(OutputTag<JSONObject> hbaseTag,
                                MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.hbaseTag = hbaseTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(FlinkConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(
                FlinkConfig.PHOENIX_SERVER
        );

    }

    // Broadcast stream: configuration records from Flink CDC
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //{"database":"","before":{},"after":{},"type":"insert","tableName":""}
        // 1. Parse the record
        String data = JSONObject.parseObject(value).getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
        // 2. If the sink type is HBase, create the Phoenix table if it does not exist
        if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
            checkTable(
                    tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend()
            );
        }

        // 3. Put the configuration into broadcast state
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }


    // Main stream: business data from Kafka
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // 1. Read the broadcast state
        ReadOnlyBroadcastState<String, TableProcess> readOnlyBroadcastState = ctx.getBroadcastState(mapStateDescriptor);
        //{"database":"","before":{},"after":{},"type":"insert","tableName":""}
        String tableName = value.getString("tableName");
        String type = value.getString("type");
        String key = tableName + "-" + type;
        TableProcess tableProcess = readOnlyBroadcastState.get(key);


        if (tableProcess != null) {
            // 2. Filter the columns according to the configuration
            JSONObject data = value.getJSONObject("after");
            filterColumn(data, tableProcess.getSinkColumns());
            // 3. Split: write the target HBase table / Kafka topic into the record
            value.put("sinkTable", tableProcess.getSinkTable());
            String sinkType = tableProcess.getSinkType();
            if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                // Kafka data: emit to the main output
                out.collect(value);
            } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                // HBase data: emit to the side output
                ctx.output(hbaseTag, value);
            }
        } else {
            System.out.println("key = " + key + " ,不存在");
        }
    }

    // Keep only the fields listed in sinkColumns
    private void filterColumn(JSONObject data, String sinkColumns) {

        String[] split = sinkColumns.split(",");
        List<String> columns = Arrays.asList(split);
//        Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
//        while (iterator.hasNext()){
//            Map.Entry<String, Object> next = iterator.next();
//            if(!columns.contains(next.getKey())){
//                iterator.remove();
//            }
//        }
        data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
    }


    // Build the DDL, e.g. create table if not exists mydb.test(id varchar primary key,name varchar,sex varchar)
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        PreparedStatement preparedStatement = null;
        try {


            // Assign defaults for the primary key and the extend clause
            if (sinkPk == null) {
                sinkPk = "id";
            }
            if (sinkExtend == null) {
                sinkExtend = "";
            }

            StringBuilder sql = new StringBuilder("create table if not exists ")
                    .append(FlinkConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append(" ( ");


            String[] fields = sinkColumns.split(",");
            for (int i = 0; i < fields.length; i++) {
                String field = fields[i];
                // Is this field the primary key?
                if (sinkPk.equals(field)) {
                    sql.append(field).append(" varchar primary key");
                } else {
                    sql.append(field).append(" varchar");
                }

                // Append a comma unless this is the last field
                if (i < fields.length - 1) {
                    sql.append(",");
                }
            }
            sql.append(")").append(sinkExtend);
            System.out.println(sql);

            // Execute the DDL
            preparedStatement = connection.prepareStatement(sql.toString());
            preparedStatement.execute();
        } catch (SQLException e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!");
        }finally {
            if(preparedStatement != null){
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


}
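
For the hypothetical configuration row shown earlier (sink table dim_base_trademark, columns id,tm_name, primary key id, empty extend clause), checkTable would print and execute a statement like the following (illustrative only; FLINK_REALTIME comes from FlinkConfig.HBASE_SCHEMA):

// DDL produced by checkTable for the example configuration row above
create table if not exists FLINK_REALTIME.dim_base_trademark ( id varchar primary key,tm_name varchar)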

(3) Supporting classes

package com.yyds.bean;

import lombok.Data;

@Data
public class TableProcess {
    //Sink type constants for dynamic splitting
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //Source table
    String sourceTable;
    //Operation type: insert, update, delete
    String operateType;
    //Sink type: hbase or kafka
    String sinkType;
    //Sink table (or Kafka topic)
    String sinkTable;
    //Sink columns
    String sinkColumns;
    //Primary key field
    String sinkPk;
    //Table-creation extension clause
    String sinkExtend;
}

package com.yyds.common;

public class FlinkConfig {
    //Phoenix schema name
    public static final String HBASE_SCHEMA = "FLINK_REALTIME";
    //Phoenix JDBC driver
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    //Phoenix connection URL
    public static final String PHOENIX_SERVER =
            "jdbc:phoenix:centos01,centos02,centos03:2181";
}
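
The main program also references MyDeserialization and MyKafkaUtils, which are not shown in this post. A minimal sketch of MyDeserialization, assuming it flattens each Debezium SourceRecord into the JSON shape used in the comments above ({"database":"","before":{},"after":{},"type":"","tableName":""}), might look like the following; treat it as an illustration rather than the project's actual implementation:

package com.yyds.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Sketch only: turns each Debezium change record into the flattened JSON string
// {"database":"","before":{},"after":{},"type":"","tableName":""} assumed by BaseDBApp.
public class MyDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // The topic has the form mysql_binlog_source.<database>.<table>
        String[] parts = sourceRecord.topic().split("\\.");
        String database = parts[1];
        String tableName = parts[2];

        Struct value = (Struct) sourceRecord.value();

        // "before" image of the row (null for inserts)
        JSONObject beforeJson = new JSONObject();
        Struct before = value.getStruct("before");
        if (before != null) {
            for (Field field : before.schema().fields()) {
                beforeJson.put(field.name(), before.get(field));
            }
        }

        // "after" image of the row (null for deletes)
        JSONObject afterJson = new JSONObject();
        Struct after = value.getStruct("after");
        if (after != null) {
            for (Field field : after.schema().fields()) {
                afterJson.put(field.name(), after.get(field));
            }
        }

        // Operation type: Debezium reports inserts as CREATE, so normalize to "insert"
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        JSONObject result = new JSONObject();
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}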
