Behavior Data: from the ODS Layer to the DWD Layer

The collected log data has already been written to Kafka, which serves as the ODS layer for behavior logs. The log data read from this Kafka ODS layer falls into three categories: page logs, startup logs, and display (exposure) logs. Although all three are user behavior data, their structures are completely different, so they have to be split and processed separately. The split streams are written back to different Kafka topics, which together form the log DWD layer.
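
For orientation, here is a minimal sketch of what records on ods_base_log can look like. Only the common, is_new, mid, start, page, page_id and displays fields are actually relied on by the job below; all other fields and the concrete values are illustrative assumptions.

Startup log (contains a "start" object):
{"common":{"mid":"mid_001","is_new":"1"},"start":{"entry":"icon"},"ts":1651234567000}

Page log (contains a "page" object, optionally with a "displays" array):
{"common":{"mid":"mid_001","is_new":"0"},"page":{"page_id":"home","during_time":8000},"displays":[{"item":"101","item_type":"sku_id"}],"ts":1651234570000}

Display (exposure) records are not separate input messages here; they are extracted from the "displays" array of a page log and enriched with the page_id.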

(1) Kafka utility class

The client itself already carries a flag marking new versus returning users, but it is not accurate enough, so the flag is re-verified in the real-time computation (step 4 of the job below). The job reads from and writes to Kafka through the following utility class.

package com.yyds.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyKafkaUtils {

    private static String brokers = "centos01:9092,centos02:9092,centos03:9092";

    // Create a FlinkKafkaProducer that writes plain strings to the given topic
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(
                brokers,
                topic,
                new SimpleStringSchema()
        );
    }

    // Create a FlinkKafkaConsumer that reads plain strings from the given topic with the given consumer group
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        return new FlinkKafkaConsumer<String>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}
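
The producer above uses the simple (brokers, topic, schema) constructor, which provides at-least-once delivery. If the DWD topics need end-to-end exactly-once to match the EXACTLY_ONCE checkpoint mode configured in the job below, the following is a rough sketch of an alternative helper; the method name, the transaction timeout value, and the serialization details are assumptions, not part of the original utility.

    // Additional imports needed for this variant:
    // import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    // import org.apache.kafka.clients.producer.ProducerConfig;
    // import org.apache.kafka.clients.producer.ProducerRecord;
    // import java.nio.charset.StandardCharsets;
    public static FlinkKafkaProducer<String> getExactlyOnceKafkaProducer(String topic) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes here is an assumption)
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");

        return new FlinkKafkaProducer<String>(
                topic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                        // Write the record value as UTF-8 bytes to the target topic
                        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
                    }
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }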

(2) Job implementation

package com.yyds.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class BaseLogApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Set up the execution environment
        System.setProperty("HADOOP_USER_NAME", "root");

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Enable checkpointing, one checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // Checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Optional: checkpoint alignment timeout
        //env.getCheckpointConfig().setAlignmentTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // Optional: restart strategy
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // State backend: store checkpoints in HDFS
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));


        //TODO 2. Consume the ods_base_log topic
        String sourceTopic = "ods_base_log";
        String groupId = "base_log_app_2022";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtils.getKafkaConsumer(sourceTopic, groupId);
        DataStreamSource<String> kafkaDS = env.addSource(kafkaConsumer);

        //TODO 3. Parse each record into a JSON object; route records that fail to parse to a side output
        OutputTag<String> dirtyTag = new OutputTag<String>("dirty") {};
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // Dirty data, typically 0.1% to 0.3% of the volume; send it to the side output
                    ctx.output(dirtyTag, value);
                }
            }
        });

        jsonObjDS.getSideOutput(dirtyTag).print("dirty>>>>>>>>>>>>>>>>>>>>>>");

        //TODO 4. Verify the new/returning-user flag with keyed state
        // Key by mid (device id)
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewTag = jsonObjDS
                .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
                .map(new RichMapFunction<JSONObject, JSONObject>() {
                    private ValueState<String> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<String>("value-state", String.class));
                    }

                    @Override
                    public JSONObject map(JSONObject value) throws Exception {
                        // Read the is_new flag from the record
                        String isNew = value.getJSONObject("common").getString("is_new");

                        // Only records flagged as new need to be checked
                        if ("1".equals(isNew)) {
                            String state = valueState.value();
                            if (state != null) {
                                // This mid has been seen before: correct the is_new flag
                                value.getJSONObject("common").put("is_new", "0");
                                return value;
                            } else {
                                // First time this mid appears: remember it
                                valueState.update("1");
                                return value;
                            }
                        } else {
                            return value;
                        }
                    }
                });


        //TODO 5. Split the stream with side outputs: page logs stay on the main stream, startup and display logs go to side outputs
        OutputTag<String> startTag = new OutputTag<String>("start") {};
        OutputTag<String> displayTag = new OutputTag<String>("display") {};
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewTag.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                // Startup log
                String start = value.getString("start");
                if (start != null && start.length() > 0) {
                    // Write the record to the startup-log side output
                    ctx.output(startTag, value.toString());
                } else {
                    // Page log goes to the main stream
                    out.collect(value.toString());

                    // Display (exposure) data
                    JSONArray displays = value.getJSONArray("displays");

                    if (displays != null && displays.size() > 0) {
                        // Get the page id
                        String pageId = value.getJSONObject("page").getString("page_id");

                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("page_id", pageId);
                            // Write each display record to the display side output
                            ctx.output(displayTag, display.toString());
                        }
                    }
                }
            }
        });


        //TODO 6. Extract the side outputs
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);


        //TODO 7. Print the three streams and write them to their corresponding Kafka topics
        startDS.print("start>>>>>>>>>>>>>>>>>>>>>>>>>>");
        displayDS.print("displayDS>>>>>>>>>>>>>>>>>>>>>>>>>>");
        pageDS.print("pageDS>>>>>>>>>>>>>>>>>>>>>>>>>>");

        startDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_start_log"));
        displayDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_display_log"));
        pageDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_page_log"));

        //TODO 8. Start the job
        env.execute("BaseLogApp");
    }
}
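
After starting the job and replaying some test logs into ods_base_log, the split can be verified by consuming the three DWD topics, for example with Kafka's console consumer (the script path assumes it is run from the Kafka installation directory on one of the brokers):

bin/kafka-console-consumer.sh --bootstrap-server centos01:9092 --topic dwd_start_log
bin/kafka-console-consumer.sh --bootstrap-server centos01:9092 --topic dwd_page_log
bin/kafka-console-consumer.sh --bootstrap-server centos01:9092 --topic dwd_display_log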
