Flink实时数仓_DWD层数据准备
第1章 需求分析及实现思路1.1 分层需求分析在之前介绍实时数仓概念时讨论过,建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。我们这里从 kafka 的 ods 层读取用户行为日志以及业务数据,并进行简单处理,写回到 kafka 作为 dwd 层。1.2 每层的职能分层数据描述生成计算工具存储媒介ODS原始数据,日志和业务数
第1章 需求分析及实现思路
1.1 分层需求分析
在之前介绍实时数仓概念时讨论过,建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。
我们这里从 kafka 的 ods 层读取用户行为日志以及业务数据,并进行简单处理,写回到 kafka 作为 dwd 层。
1.2 每层的职能
分层 | 数据描述 | 生成计算工具 | 存储媒介 |
---|---|---|---|
ODS | 原始数据,日志和业务数据 日志服务器 | maxwell | kafka |
DWD | 根据数据对象为单位进行分流,比如订单、页面访问等等 | FLINK | kafka |
DIM | 维度数据 | FLINK | HBase/Redis |
DWM | 对于部分数据对象进行进一步加工,比如独立访问、跳出行为依旧是明细数据 | FLINK | kafka |
DWS | 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表 | FLINK | Clickhouse |
ADS | 把 Clickhouse 中的数据根据可视化需要进行筛选聚合 | Clickhouse | 可视化展示 |
3 . DWD 层数据准备实现思路
➢ 功能 1:环境搭建
➢ 功能 2:计算用户行为日志 DWD 层
➢ 功能 3:计算业务数据 DWD 层
一、用户行为日志 DWD 层
我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。利用侧输出流实现数据拆分,将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。
- 页面日志输出到主流,
- 启动日志输出到启动侧输出流,
- 曝光日志输出到曝光侧输出流
2 识别新老访客
保存每个 mid 的首次访问日期,每条进入该算子的访问记录,都会把 mid 对应的首次访问时间读取出来,跟当前日期进行比较,只有首次访问时间不为空,且首次访问时间早于当日的,则认为该访客是老访客,否则是新访客。同时如果是新访客且没有访问记录的话,会写入首次访问时间。
/**
* Desc: 准备用户行为日志的DWD层
*/
public class BaseLogApp {
private static final String TOPIC_START = "dwd_start_log";
private static final String TOPIC_DISPLAY = "dwd_display_log";
private static final String TOPIC_PAGE = "dwd_page_log";
public static void main(String[] args) throws Exception {
//TODO 1.准备环境
//1.1 创建Flink流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2设置并行度
env.setParallelism(1);
//1.3设置Checkpoint
//每5000ms开始一次checkpoint,模式是EXACTLY_ONCE(默认)
//env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(60000);
//env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/baselogApp"));
//System.setProperty("HADOOP_USER_NAME","atguigu");
//TODO 2.从Kafka中读取数据
String topic = "ods_base_log";
String groupId = "base_log_app_group";
//2.1 调用Kafka工具类,获取FlinkKafkaConsumer
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//TODO 3.对读取到的数据格式进行转换 String->json
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
return jsonObject;
}
}
);
//jsonObjDS.print("json>>>>>>>>");
/*
TODO 4.识别新老访客 前端也会对新老状态进行记录,有可能会不准,咱们这里是再次做一个确认
保存mid某天方法情况(将首次访问日期作为状态保存起来),等后面该设备在有日志过来的时候,从状态中获取日期
和日志产生日志进行对比。如果状态不为空,并且状态日期和当前日期不相等,说明是老访客,如果is_new标记是1,那么对其状态进行修复
*/
//4.1 根据mid对日志进行分组
KeyedStream<JSONObject, String> midKeyedDS = jsonObjDS.keyBy(
data -> data.getJSONObject("common").getString("mid")
);
//4.2 新老方法状态修复 状态分为算子状态和键控状态,我们这里要记录某一个设备的访问,使用键控状态比较合适
SingleOutputStreamOperator<JSONObject> jsonDSWithFlag = midKeyedDS.map(
new RichMapFunction<JSONObject, JSONObject>() {
//定义该mid访问状态
private ValueState<String> firstVisitDateState;
//定义日期格式化对象
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
//对状态以及日期格式进行初始化
firstVisitDateState = getRuntimeContext().getState(
new ValueStateDescriptor<String>("newMidDateState", String.class)
);
sdf = new SimpleDateFormat("yyyyMMdd");
}
@Override
public JSONObject map(JSONObject jsonObj) throws Exception {
//获取当前日志标记状态
String isNew = jsonObj.getJSONObject("common").getString("is_new");
//获取当前日志访问时间戳
Long ts = jsonObj.getLong("ts");
if ("1".equals(isNew)) {
//获取当前mid对象的状态
String stateDate = firstVisitDateState.value();
//对当前条日志的日期格式进行抓换
String curDate = sdf.format(new Date(ts));
//如果状态不为空,并且状态日期和当前日期不相等,说明是老访客
if (stateDate != null && stateDate.length() != 0) {
//判断是否为同一天数据
if (!stateDate.equals(curDate)) {
isNew = "0";
jsonObj.getJSONObject("common").put("is_new", isNew);
}
} else {
//如果还没记录设备的状态,将当前访问日志作为状态值
firstVisitDateState.update(curDate);
}
}
return jsonObj;
}
}
);
//jsonDSWithFlag.print(">>>>>>>>>>>");
//TODO 5 .分流 根据日志数据内容,将日志数据分为3类, 页面日志、启动日志和 曝光日志。
// 页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流
// 侧输出流:1)接收迟到数据 2)分流
//定义启动侧输出流标签
OutputTag<String> startTag = new OutputTag<String>("start"){};
//定义曝光侧输出流标签
OutputTag<String> displayTag = new OutputTag<String>("display"){};
SingleOutputStreamOperator<String> pageDS = jsonDSWithFlag.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<String> out) throws Exception {
//获取启动日志标记
JSONObject startJsonObj = jsonObj.getJSONObject("start");
//将json格式转换为字符串,方便向侧输出流输出以及向kafka中写入
String dataStr = jsonObj.toString();
//判断是否为启动日志
if (startJsonObj != null && startJsonObj.size() > 0) {
//如果是启动日志,输出到启动侧输出流
ctx.output(startTag, dataStr);
} else {
//如果不是启动日志,获取曝光日志标记(曝光日志中也携带了页面)
JSONArray displays = jsonObj.getJSONArray("displays");
//判断是否为曝光日志
if (displays != null && displays.size() > 0) {
//如果是曝光日志,遍历输出到侧输出流
for (int i = 0; i < displays.size(); i++) {
//获取每一条曝光事件
JSONObject displaysJsonObj = displays.getJSONObject(i);
//获取页面id
String pageId = jsonObj.getJSONObject("page").getString("page_id");
//给每一条曝光事件加pageId
displaysJsonObj.put("page_id", pageId);
ctx.output(displayTag, displaysJsonObj.toString());
}
} else { //如果不是启动日志 说明是页面日志 ,输出到主流
out.collect(dataStr);
}
}
}
}
);
//获取侧输出流
DataStream<String> startDS = pageDS.getSideOutput(startTag);
DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
//打印输出
pageDS.print("page>>>>");
startDS.print("start>>>>");
displayDS.print("display>>>>");
//TODO 6.将不同流的数据写回到kafka的不同topic中
FlinkKafkaProducer<String> startSink = MyKafkaUtil.getKafkaSink(TOPIC_START);
startDS.addSink(startSink);
FlinkKafkaProducer<String> displaySink = MyKafkaUtil.getKafkaSink(TOPIC_DISPLAY);
displayDS.addSink(displaySink);
FlinkKafkaProducer<String> pageSink = MyKafkaUtil.getKafkaSink(TOPIC_PAGE);
pageDS.addSink(pageSink);
env.execute();
}
}
二、业务数据 DWD 层
1、实现动态分流功能
业务数据的变化,我们可以通过 Maxwell 采集到,但是 MaxWell 是把全部数据统一写入一个 Topic 中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。
所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,在实时计算中,经过处理后, 一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase/ Redis / MySQL等。一般把事实数据写入流中,将事实数据写回 Kafka 作为业务数据的 DWD 层,进行进一步处理,最终形成宽表。
- 业务数据保存到 Kafka 的主题中
- 维度数据保存到 Hbase 的表中
但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现:
- ➢ 一种是用 Zookeeper 存储,通过 Watch 感知数据变化。
- ➢ 另一种是用 mysql 数据库存储,周期性的同步。
这里选择第二种方案,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。
/**
* Desc: 准备业务数据的DWD层
*/
public class BaseDBApp {
public static void main(String[] args) throws Exception {
//TODO 1.准备环境
//1.1 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 设置并新度
env.setParallelism(1);
//1.3 开启Checkpoint,并设置相关的参数
//env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(60000);
//env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/gmall/checkpoint/basedbapp"));
//重启策略
// 如果说没有开启重启Checkpoint,那么重启策略就是noRestart
// 如果说没有开Checkpoint,那么重启策略会尝试自动帮你进行重启 重启Integer.MaxValue
//env.setRestartStrategy(RestartStrategies.noRestart());
//TODO 2.从Kafka的ODS层读取数据
String topic = "ods_base_db_m";
String groupId = "base_db_app_group";
//2.1 通过工具类获取Kafka的消费者
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
DataStreamSource<String> jsonStrDS = env.addSource(kafkaSource);
//TODO 3.对DS中数据进行结构的转换 String-->Json
//jsonStrDS.map(JSON::parseObject);
SingleOutputStreamOperator<JSONObject> jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
//jsonStrDS.print("json>>>>");
//TODO 4.对数据进行ETL 如果table为空 或者 data为空,或者长度<3 ,将这样的数据过滤掉
SingleOutputStreamOperator<JSONObject> filteredDS = jsonObjDS.filter(
jsonObj -> {
boolean flag = jsonObj.getString("table") != null
&& jsonObj.getJSONObject("data") != null
&& jsonObj.getString("data").length() > 3;
return flag;
}
);
//filteredDS.print("json>>>>>");
//TODO 5. 动态分流 事实表放到主流,写回到kafka的DWD层;如果维度表,通过侧输出流,写入到Hbase
//5.1定义输出到Hbase的侧输出流标签
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE){};
//5.2 主流 写回到Kafka的数据
SingleOutputStreamOperator<JSONObject> kafkaDS = filteredDS.process(
new TableProcessFunction(hbaseTag)
);
//5.3获取侧输出流 写到Hbase的数据
DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);
kafkaDS.print("事实>>>>");
hbaseDS.print("维度>>>>");
//TODO 6.将维度数据保存到Phoenix对应的维度表中
hbaseDS.addSink(new DimSink());
//TODO 7.将事实数据写回到kafka的dwd层
FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(
new KafkaSerializationSchema<JSONObject>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
String sinkTopic = jsonObj.getString("sink_table");
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
return new ProducerRecord<>(sinkTopic,dataJsonObj.toString().getBytes());
}
}
);
kafkaDS.addSink(kafkaSink);
env.execute();
}
}
2) 程序流程分析
TableProcessFunction 是一个自定义算子,主要包括三条时间线任务
➢ 图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是open()这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。
➢ 图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在 open()方法中加入 timer定时器实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。
➢ 图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的 tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。
2、对业务数据进行分流处理的自定义处理函数
/**
* Desc: 用于对业务数据进行分流处理的自定义处理函数
*/
public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {
//因为要将维度数据通过侧输出流输出,所以我们在这里定义一个侧输出流标记
private OutputTag<JSONObject> outputTag;
//用于在内存中存放配置表信息的Map <表名:操作,tableProcess>
private Map<String, TableProcess> tableProcessMap = new HashMap<>();
//用于在内存中存放已经处理过的表(在phoenix中已经建过的表)
private Set<String> existsTables = new HashSet<>();
//声明Phoenix的连接对象
Connection conn = null;
………………
………………
//根据sinkType,将数据输出到不同的流
if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
//如果sinkType = hbase ,说明是维度数据,通过侧输出流输出
ctx.output(outputTag, jsonObj);
} else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
//如果sinkType = kafka ,说明是事实数据,通过主流输出
out.collect(jsonObj);
}
3、自定义函数 TableProcessFunction-open
生命周期方法,初始化连接,初始化配置表信息并开启定时任务,用于不断读取配置表信息
//在函数被调用的时候执行的方法,执行一次
@Override
public void open(Configuration parameters) throws Exception {
//初始化Phoenix连接
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
//初始化配置表信息
refreshMeta(); //========1.从MySQL数据库配置表中查询配置信息
//开启一个定时任务
// 因为配置表的数据可能会发生变化,每隔一段时间就从配置表中查询一次数据,更新到map,并检查建表
//从现在起过delay毫秒后,每隔period执行一次
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
refreshMeta();
}
}, 5000, 5000);
}
4、分流 Sink 之保存维度到 HBase(Phoenix) 、通过 Phoenix 向 Hbase 表中写数据
注意:为了开启 hbase 的 namespace 和 phoenix 的 schema 的映射,在程序中需要加这
个配置文件,另外在 linux 服务上,也需要在 hbase 以及 phoenix 的 hbase-site.xml 配置
文件中,加上以上两个配置,并使用 xsync 进行同步
/**
* Desc: 通过 Phoenix 向 Hbase 表中写数据
*/
public class DimSink extends RichSinkFunction<JSONObject> {
四、总结
DWD 的实时计算核心就是数据分流,其次是状态识别。在开发过程中我们实践了几个
灵活度较强算子,比如 RichMapFunction, ProcessFunction, RichSinkFunction。 那这
几个我们什么时候会用到呢?如何选择?
从对比表中能明显看出,Rich 系列能功能强大,ProcessFunction 功能更强大,但是相对的越全面的算子使用起来也更加繁琐。
更多推荐
所有评论(0)