1

项目背景

flink消费kafka,根据数据业务特点解耦写入不同的主题中,这常见场景就是日志数据,性能数据等要这样处理,后续再消费不同的主题进行实时分析.

2

案例分析

案例:kafka数据格式为json类型,json中的value有数组,也有json的,数组元素是json。现在提供这么一个场景,数据中的元素为同一类信息,将info 数组中的每个元素单独作为一个json串输出,同时将tags的值输出。


// 数据样例如下
{"info":[{"name":"zhangsan","age":18,"gender":"male"},{"name":"lisi","age":20,"gender":"female"}],"tags":{"hobby":"goshopping","address":"beijing","career":"programer"},"event_ts":1697698020,"objectType":"test"}

3

解决思路

直奔主题,使用侧输出流。把重要的数据放在主流进行输出,其余的放在测流输出,测流可以有多个。

调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),它指定了侧输出流的id和类型。

4

具体实现

// 1.读kafka

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("cdh01:9092,cdh02:9092,cdh03:9092")
                .setTopics("test1116")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        SingleOutputStreamOperator<JSONObject> readKafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").map(JSON::parseObject);
// 2.分流

 Map<String, DataStream<JSONObject>> streams = splitStream(readKafkaStream);
 // 方法实现
  private static Map<String, DataStream<JSONObject>> splitStream(SingleOutputStreamOperator<JSONObject> stream) {
        // 主流不需要打标签
        OutputTag<JSONObject> tagsTag = new OutputTag<JSONObject>("tags", TypeInformation.of(JSONObject.class));
        // 输出ods数据
        SingleOutputStreamOperator<JSONObject> infoStream = stream
                .process(new ProcessFunction<JSONObject, JSONObject>() {
                    @Override
                    public void processElement(JSONObject obj,
                                               Context ctx,
                                               Collector<JSONObject> out) throws Exception {
                        // 1.info数据
                        JSONArray infosBeams = obj.getJSONArray("info");
                        if (infosBeams != null) {
                            for (int i = 0; i < infosBeams.size(); i++) {
                                JSONObject infoBeam = infosBeams.getJSONObject(i);
                                out.collect(infoBeam);
                            }
//                            obj.remove("info");//移除 info 这个 key, 因为后面的数据处理用不上
                        }
                        // 2.tags数据
                        JSONObject tags = obj.getJSONObject("tags");
                        if (tags != null) {
                            ctx.output(tagsTag, tags);
//                            obj.remove("tags");
                        }
                    }});

        SideOutputDataStream<JSONObject> tagsStream = infoStream.getSideOutput(tagsTag);
        Map<String, DataStream<JSONObject>> streams = new HashMap<>();
        streams.put("info", infoStream);
        streams.put("tags", tagsStream);
        return streams;
    }
// 3.输出打印

 writeToConsole(streams);
 // 方法实现
   private static void writeToConsole(Map<String, DataStream<JSONObject>> streams) {
      //主流
        streams.get("info").map(JSONAware::toJSONString).print();
      //侧流 
        streams.get("tags").map(JSONAware::toJSONString).print();
// 4.结果

// 主流
{"gender":"male","name":"zhangsan","age":18}
{"gender":"female","name":"lisi","age":20}
// 侧流
{"career":"programer","address":"beijing","hobby":"goshopping"}

5

笔记扩展

为什么要将整条数据拆分到不同的主题中?

首先整条数据内容多,不容易进行分析。拆解后数据变得清晰易懂,也容易管理。其次分析的过程相当于对业务进行归纳,抽象。

数据分流除了侧输出流还有没有别的方式?

如果数据结构简单,可以使用filter算子,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。缺点就是将原始数据流复制了多份,然后对每一份分别做筛选;这明显是不够高效的。

坚持分享,欢迎交流,大家共同进步!

微信公众号|大数据进阶小铺

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐