问题描述

最近在开发flink程序时,需要开窗计算人次,在反复测试中发现flink的并行度会影响数据准确性,当kafka的分区数为6时,如果flink的并行度小于6,会有一定程度的数据丢失。而当flink 并行度等于kafka分区数的时候,则不会出现该问题。

例如Parallelism = 3,则会丢失近1/2的数据,但是丢失数据并不确定,会在0–1/2直接浮动。

问题查证

  1. 针对该问题,初步怀疑是source端没有取到所有kafka分区数据;

为验证该假设编写如下代码,将source数据直接sink为文件,手动统计文件内数据数量

val path = new Path("bi-stream-store-full/file/opt/huamei.txt")    
val sink: FileSink[String] = FileSink
      .forRowFormat(path, new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .build())
      .build()
rtpPersonEventEnterBase.filter(_.contains("harmay")).sinkTo(sink)

注: 本地执行时,Path类中传入的路径为项目的根路径,而非执行module的根路径

根据统计输出文件中的数据,可以得知source端可以获取全部分区数据,故此该假设不成立

  1. 既然source取到了所有数据,则怀疑是开窗计算中,因为数据延迟导致数据被丢弃,以如下代码验证假设。

针对所有数据,开24小时窗口,并且允许24小时内的迟到数据。如果数据正常,则只会有少数迟到数据。反之会有大量迟到数据。

rtpPersonEventEnter
  .map((_,1))
  .keyBy(x => (x._1.siteId))
  .window(TumblingEventTimeWindows.of(Time.hours(24)))
  .allowedLateness(Time.hours(24))
  .sum(1)
  .map(x => (x._1.siteId,x._1.date,x._2))
  .print()

代码执行结果验证该假设,当窗口关闭输出结果后,会有大量的迟到数据到来,从而导致大量的update结果输出。

数据缺失的原因找到了,那是如何造成大量数据迟到的呢?

问题源码定位

具体原因则需要到源码中寻找,首先要看的是FlinkKafkaConsumer,该类为flink消费kafka消息的主类。该类主要任务是创建consumer,为分配partition,本文主旨是探究分区消费方式,故此只关注partition的分配。该代码中的关键点如下:

        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
        if (restoredState != null) {
            for (KafkaTopicPartition partition : allPartitions) {
                if (!restoredState.containsKey(partition)) {
                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                }
            }

            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
                    restoredState.entrySet()) {
                // seed the partition discoverer with the union state while filtering out
                // restored partitions that should not be subscribed by this subtask
                if (KafkaTopicPartitionAssigner.assign(
                                restoredStateEntry.getKey(),
                                getRuntimeContext().getNumberOfParallelSubtasks())
                        == getRuntimeContext().getIndexOfThisSubtask()) {
                    subscribedPartitionsToStartOffsets.put(
                            restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            }

以下代码展示:KafkaTopicPartitionAssigner.assign的分配规律:对topic取hashcode值,然后和最大整数做一个&运算,然后再加上分区的编号,最后再除以并行度就得到flink的substask的值

public class KafkaTopicPartitionAssigner {

   /**
    * Returns the index of the target subtask that a specific Kafka partition should be
    * assigned to.
    *
    * <p>The resulting distribution of partitions of a single topic has the following contract:
    * <ul>
    *     <li>1. Uniformly distributed across subtasks</li>
    *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
    *     subtask indices) by using the partition id as the offset from a starting index
    *     (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
    *     determined using the topic name).</li>
    * </ul>
    *
    * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
    * contract to locally filter out partitions that it should not subscribe to, guaranteeing
    * that all partitions of a single topic will always be assigned to some subtask in a
    * uniformly distributed manner.
    *
    * @param partition the Kafka partition
    * @param numParallelSubtasks total number of parallel subtasks
    *
    * @return index of the target subtask that the Kafka partition should be assigned to.
    */
   public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
      int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

      // here, the assumption is that the id of Kafka partitions are always ascending
      // starting from 0, and therefore can be used directly as the offset clockwise from the start index
      return (startIndex + partition.getPartition()) % numParallelSubtasks;
   }
}

该段代码获取所有kafka分区,然后根据规则分配给各个task。各task获取到需要消费的topic和offset后,开始消费数据,在run方法中调用kafkaFetcher.runFetchLoop(),该方法内核心代码如下:

        try {
            // kick off the actual Kafka consumer
            consumerThread.start();
            while (running) {
                // this blocks until we get the next records
                // it automatically re-throws exceptions encountered in the consumer thread
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                // get the records for each topic partition
                for (KafkaTopicPartitionState<T, TopicPartition> partition :
                        subscribedPartitionStates()) {

                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());

                    partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }

可以看到该方法在获取到数据时,会根据partition做一个分配。并通过partitionConsumerRecordsHandler

方法输出records。

至此可以说找到了问题根源,当一个task的source负责消费多个kafka partition数据时,**handover.pollNext()通过调用consumer.poll()**方法获取records后(consumer.poll默认获取500条记录),会将数据根据partition分开,然后循环输出。这就导致如果先输出的分区内的数据推动watermark关闭窗口,则后面输出分区内的数据就都会被抛弃,故此会导致数据丢失bug。而且因为partiiton的先后输出不确定,就会导致数据结果虽然缺失,但是每次缺失情况并不相同。

解放方案

找到了问题原因,接下来就要寻找解决方案。

因为该bug是一个task消费多个分区数据导致,故此最简单的方案就是保持job的parallelism与kafka分区数相同。如果该topic 数据量非常大,则可以直接指定全局parallelisms = partitions,如果该topic数据量不大,则可以在source算子上指定parallelisms = partitions。

或者使用Kafka Connector的一个特性,在comsumer上配置watermark,具体见watermark-strategies-and-the-kafka-connector

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐