实时数仓(一)行为数据ods到dwd层
行为数据ods到dwd层采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 Kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。(1) 识别新老用户工具类本身客户端业务有新老用户的标识,但是不够准
·
行为数据ods到dwd层
采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 Kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。
(1) 识别新老用户工具类
本身客户端业务有新老用户的标识,但是不够准确,需要用实时计算再次确认。
package com.yyds.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class MyKafkaUtils {
private static String brokers = "centos01:9092,centos02:9092,centos03:9092";
public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
return new FlinkKafkaProducer<String>(
brokers,
topic,
new SimpleStringSchema()
);
}
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);
return new FlinkKafkaConsumer<String>(
topic,
new SimpleStringSchema(),
properties
);
}
}
(2) 代码实现
package com.yyds.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class BaseLogApp {
public static void main(String[] args) throws Exception {
//TODO 1、获取执行环境
System.setProperty("HADOOP_USER_NAME","root");
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 开启 Checkpoint,每隔 5 秒钟做一次 Checkpoint
env.enableCheckpointing(5000L);
//指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置超时时间
//env.getCheckpointConfig().setAlignmentTimeout(10000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// 重启策略
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
//设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// 设置状态后端
env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));
//TODO 2、消费ods_base_log 主题数据
String sourceTopic = "ods_base_log";
String groupId = "base_log_app_2022";
FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtils.getKafkaConsumer(sourceTopic, groupId);
DataStreamSource<String> kafkaDS = env.addSource(kafkaConsumer);
//TODO 3、将每行数据转换为JSON对象
OutputTag<String> outputTag = new OutputTag<String>("dirty") {
};
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(value);
out.collect(jsonObject);
} catch (Exception e) {
// TODO 统计脏数据的信息,一般为千分之1 到 千分之3
// 写入侧输出流
ctx.output(outputTag,value);
}
}
});
jsonObjDS.getSideOutput(outputTag).print("dirty>>>>>>>>>>>>>>>>>>>>>>");
//TODO 4、新老用户校验 状态编程
// 按照mid进行分组
SingleOutputStreamOperator<JSONObject> jsonObjWithNewTag = jsonObjDS
.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
.map(new RichMapFunction<JSONObject, JSONObject>() {
private ValueState<String> valueState;
@Override
public void open(Configuration parameters) throws Exception {
ValueState<String> valueState = getRuntimeContext()
.getState(new ValueStateDescriptor<String>("value-state", String.class));
}
@Override
public JSONObject map(JSONObject value) throws Exception {
// 获取数据中的is_new标记
String isNew = value.getJSONObject("common").getString("is_new");
// 判断isNew是否为1
if ("1".equals(isNew)) {
String state = valueState.value();
if (state != null) {
// 修改isNew标记
value.getJSONObject("common").put("is_new", "0");
return value;
} else {
// 没有来过
valueState.update("1");
return value;
}
} else {
return value;
}
}
});
//TODO 5、分流处理 侧输出流 页面为主流 启动和曝光为侧输出流
OutputTag<String> stringOutputTag = new OutputTag<String>("start") {
};
OutputTag<String> displayTag = new OutputTag<String>("display") {
};
SingleOutputStreamOperator<String> pageDS = jsonObjWithNewTag.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
// 启动
String start = value.getString("start");
if (start != null && start.length() > 0) {
// 将数据写入到启动日志侧输出流
ctx.output(stringOutputTag, value.toString());
} else {
// 页面为主流
out.collect(value.toString());
// 曝光数据
JSONArray displays = value.getJSONArray("displays");
if (displays != null && displays.size() > 0) {
// 获取页面Id
String pageId = value.getJSONObject("page").getString("page_id");
for (int i = 0; i < displays.size(); i++) {
JSONObject display = displays.getJSONObject(i);
display.put("page_id", pageId);
// 将输出写入到曝光侧输出流
ctx.output(displayTag, display.toString());
}
}
}
}
});
//TODO 6、提取侧输出流
DataStream<String> startDS = pageDS.getSideOutput(stringOutputTag);
DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
//TODO 7、将三个流进行打印并输出到对应的kafka主题中
startDS.print("start>>>>>>>>>>>>>>>>>>>>>>>>>>");
displayDS.print("displayDS>>>>>>>>>>>>>>>>>>>>>>>>>>");
pageDS.print("pageDS>>>>>>>>>>>>>>>>>>>>>>>>>>");
startDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_start_log"));
displayDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_display_log"));
pageDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_page_log"));
//TODO 8、启动任务
env.execute("BaseLogApp");
}
}
更多推荐
已为社区贡献8条内容
所有评论(0)