业务上需要将同一个时间段的两种数据收集到一起做一些计算,这两种数据分别存在于Kafka的两个Topic中。计算逻辑是这样的:

    使用两个DataStream分别消费两个Topic中的数据,对两条流先分别设置WaterMark,然后union,接着进行keyBy操作,最后使用Window将同一个时间窗口中的两种数据汇聚在一起进行计算。但是发现程序无论是在本地运行还是在yarn-cluster模式下运行,只要并行度不为1,程序都不能正常执行。明明显示已经收到了数据,但是Window就是不触发:

 

Window何时触发:

    Window是通过Trigger来触发的,时间使用EventTime时默认使用EventTimeTrigger:

    每条数据进来的时候都会通过WindowOperator中的processElement()方法走到onElement()方法中。如果当前WaterMark大于窗口结束时间,那么就会立即触发窗口计算,否则会使用窗口结束时间注册一个触发器(Timer),用于触发Window的计算(底层是Set结构,所以触发器会覆盖从而不会内存溢出):

 

WaterMark何时更新:

    在对流使用assignTimestampsAndWatermarks之后,会对流中的元素调用用户定义的方法,然后生成WaterMark:

再将WaterMark broadcast出去:

broadcast出去的目的是为了下游再检查WaterMark的时候,以最小的那个WaterMark作为总体的WaterMark:

这里可以看到,最小的watermark居然是个负值(在调试了好一会之后),也就是说至少有一个任务WaterMark一直没有更新,也就是说并行度为3的task里面有task没有接受到数据

 

Flink是如何消费Kafka数据的:

   

    Flink source用到了FlinkKafkaConsumer010,没有指定KafkaPartitioner的话,会通过FixedPartitioner来给出默认的partitioner方法,而默认的Flink partition的规则,就是Flink的并行度ID除以kafka partition length取余。程序的并行度为3,Kafka的Partition数量也为3,即每个Task消费一个Partition中的数据:

public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        return partitions[this.parallelInstanceId % partitions.length];
    }

然后使用命令检查发现,有两个Partition是没有数据的,所以导致两个Task的WaterMark一直更新不了:

 

解决方法:

    将WaterMark的并行度与Source的并行度设置不一致,使得数据进行Shuffle,从而使得Task都可以更新WaterMark,最终结果如下:

 

 

参考:

    https://www.jianshu.com/p/753e8cf803bb(问题定位:Flink水位线不触发问题)

    https://www.cnblogs.com/ljygz/p/11392952.html

    https://www.jianshu.com/p/c8c789ff5570(EventTimeTrigger解析)

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐