Note: the source code discussed in this article is based on Flink 1.13.


Preface

I have recently been looking into how to monitor the lag of Flink's Kafka consumption. While researching online, I found that the Kafka connector can be modified to emit a lag metric for monitoring, so I took the opportunity to read through the Kafka connector source code and wrote up this post.


I. Version Information

<dependency> 
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId> 
  <version>1.13.0</version> 
</dependency>
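
For context, the rest of this article assumes the connector is wired up roughly like the minimal sketch below (broker address, topic name, and group id are placeholders, not values taken from the Flink source):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.setProperty("group.id", "demo-group");              // placeholder consumer group

// SimpleStringSchema deserializes the record value as a UTF-8 string
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpointing also determines the offset commit mode, see below
env.addSource(consumer).print();
env.execute("flink-kafka-consumer-demo");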

II. Class Diagram

The class diagram below shows the inheritance hierarchy of FlinkKafkaConsumer.
[Class diagram: inheritance hierarchy of FlinkKafkaConsumer]


III. Source Code Analysis

The open() method of FlinkKafkaConsumerBase

1. Determine the offset commit mode. offsetCommitMode has three values: ON_CHECKPOINTS, KAFKA_PERIODIC, and DISABLED. With ON_CHECKPOINTS the offsets are recorded in the checkpoint snapshot and committed back to Kafka when the checkpoint completes; with KAFKA_PERIODIC the Kafka client periodically auto-commits the offsets; with DISABLED no offsets are committed at all. A configuration sketch follows the snippet below.

this.offsetCommitMode =
                OffsetCommitModes.fromConfiguration(
                        getIsAutoCommitEnabled(),
                        enableCommitOnCheckpoints,
                        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
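
As a hedged configuration sketch (continuing the placeholder setup from section I), the three modes roughly come about as follows:

// ON_CHECKPOINTS: checkpointing enabled, commit-on-checkpoints left at its default (true)
env.enableCheckpointing(60_000);
consumer.setCommitOffsetsOnCheckpoints(true);

// KAFKA_PERIODIC: checkpointing disabled, Kafka's own auto commit enabled in the properties
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");

// DISABLED: checkpointing enabled but setCommitOffsetsOnCheckpoints(false),
// or checkpointing disabled and enable.auto.commit=false
consumer.setCommitOffsetsOnCheckpoints(false);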

2. Create the partition discoverer, which is used to automatically pick up new partitions.

 this.partitionDiscoverer =
                createPartitionDiscoverer(
                        topicsDescriptor,
                        getRuntimeContext().getIndexOfThisSubtask(),
                        getRuntimeContext().getNumberOfParallelSubtasks());

3. Open the partition discoverer. In essence this just creates a KafkaConsumer from the Kafka configuration properties.

this.partitionDiscoverer.open();

public void open() throws Exception {
        closed = false;
        initializeConnections();
}

protected void initializeConnections() {
        this.kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
}

4. Assign each subtask the topics and partitions it should consume.

final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

The discoverPartitions method

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
        // proceed only if the discoverer is neither closed nor woken up
        if (!closed && !wakeup) {
            try {
                List<KafkaTopicPartition> newDiscoveredPartitions;
                // for a fixed topic list, fetch the partition metadata of those topics directly;
                // for a topic pattern, list all topics, drop the ones that do not match, and then
                // fetch the partition metadata of the remaining topics
                if (topicsDescriptor.isFixedTopics()) {
                    newDiscoveredPartitions =
                            getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
                } else {
                    List<String> matchedTopics = getAllTopics();
                    Iterator<String> iter = matchedTopics.iterator();
                    while (iter.hasNext()) {
                        if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                            iter.remove();
                        }
                    }
                    if (matchedTopics.size() != 0) {
                        newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                    } else {
                        newDiscoveredPartitions = null;
                    }
                }
                // if no partitions were discovered, throw an exception saying none could be found
                if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                    throw new RuntimeException(
                            "Unable to retrieve any partitions with KafkaTopicsDescriptor: "
                                    + topicsDescriptor);
                } else {
                    // keep only the partitions that this subtask should subscribe to
                    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                    KafkaTopicPartition nextPartition;
                    while (iter.hasNext()) {
                        nextPartition = iter.next();
                        if (!setAndCheckDiscoveredPartition(nextPartition)) {
                            iter.remove();
                        }
                    }
                }
                // return the topic partitions this subtask should subscribe to
                return newDiscoveredPartitions;
            } catch (WakeupException e) {
                wakeup = false;
                throw e;
            }
        } else if (!closed && wakeup) {
            wakeup = false;
            throw new WakeupException();
        } else {
            throw new ClosedException();
        }
    }
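
setAndCheckDiscoveredPartition decides whether the current subtask owns a newly discovered partition. The ownership check delegates to KafkaTopicPartitionAssigner.assign; its core idea is roughly the following (simplified sketch, not a verbatim copy of the Flink source):

// Partitions of a topic are spread round-robin across the subtasks, starting at an
// index derived from the topic name, so different topics start at different subtasks.
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

A partition is kept only when this result equals getIndexOfThisSubtask(), which is the same check applied again below when start offsets are restored from state.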

5. Initialize the partitions of each topic together with the starting offset each partition should be read from, and store them in subscribedPartitionsToStartOffsets.

    // check whether there is restored state to recover from
 if (restoredState != null) {
            // for each discovered partition missing from restoredState, default to reading from the earliest offset
            for (KafkaTopicPartition partition : allPartitions) {
                if (!restoredState.containsKey(partition)) {
                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                }
            }
            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
                    restoredState.entrySet()) {
                // if this partition is assigned to the current subtask, record it together with its start offset in subscribedPartitionsToStartOffsets
                if (KafkaTopicPartitionAssigner.assign(
                                restoredStateEntry.getKey(),
                                getRuntimeContext().getNumberOfParallelSubtasks())
                        == getRuntimeContext().getIndexOfThisSubtask()) {
                    subscribedPartitionsToStartOffsets.put(
                            restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            }
            // if configured, filter out of subscribedPartitionsToStartOffsets any partitions whose topics no longer match the topics descriptor
            if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
                subscribedPartitionsToStartOffsets
                        .entrySet()
                        .removeIf(
                                entry -> {
                                    if (!topicsDescriptor.isMatchingTopic(
                                            entry.getKey().getTopic())) {
                                        LOG.warn(
                                                "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                                                entry.getKey());
                                        return true;
                                    }
                                    return false;
                                });
            }

            LOG.info(
                    "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    subscribedPartitionsToStartOffsets.size(),
                    subscribedPartitionsToStartOffsets);
        } else {
            // no restored state: decide the start offsets according to the startup mode
            switch (startupMode) {
                // SPECIFIC_OFFSETS: throw if no specific offsets were provided
                case SPECIFIC_OFFSETS:
                    if (specificStartupOffsets == null) {
                        throw new IllegalStateException(
                                "Startup mode for the consumer set to "
                                        + StartupMode.SPECIFIC_OFFSETS
                                        + ", but no specific offsets were specified.");
                    }
                    // otherwise iterate over all discovered partitions
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        Long specificOffset = specificStartupOffsets.get(seedPartition);
                        // if specificStartupOffsets contains this partition, store specificOffset - 1
                        // (the state holds the last offset already read, so reading resumes at the user-supplied offset)
                        if (specificOffset != null) {
                            subscribedPartitionsToStartOffsets.put(
                                    seedPartition, specificOffset - 1);
                        } else {
                            // otherwise fall back to the committed group offset
                            subscribedPartitionsToStartOffsets.put(
                                    seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                        }
                    }
                    break;
                // TIMESTAMP: throw if no startup timestamp was provided
                case TIMESTAMP:
                    if (startupOffsetsTimestamp == null) {
                        throw new IllegalStateException(
                                "Startup mode for the consumer set to "
                                        + StartupMode.TIMESTAMP
                                        + ", but no startup timestamp was specified.");
                    }
                    // otherwise look up each partition's offset by timestamp; if no offset is found, default to the latest offset
                    for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset :
                            fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp)
                                    .entrySet()) {
                        subscribedPartitionsToStartOffsets.put(
                                partitionToOffset.getKey(),
                                (partitionToOffset.getValue() == null)
                                        ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                        : partitionToOffset.getValue() - 1);
                    }
                    break;
                default:
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(
                                seedPartition, startupMode.getStateSentinel());
                    }
            }
            // if subscribedPartitionsToStartOffsets is not empty, log a message per startup mode; in SPECIFIC_OFFSETS mode,
            // partitions that fall back to the committed group offset are logged separately; if it is empty,
            // log that this subtask has no partitions to read from
            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    case EARLIEST:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case LATEST:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case TIMESTAMP:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                startupOffsetsTimestamp,
                                subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case SPECIFIC_OFFSETS:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                specificStartupOffsets,
                                subscribedPartitionsToStartOffsets.keySet());

                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets =
                                new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                                subscribedPartitionsToStartOffsets.entrySet()) {
                            if (subscribedPartition.getValue()
                                    == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                            }
                        }

                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
                            LOG.warn(
                                    "Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"
                                            + "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    partitionsDefaultedToGroupOffsets.size(),
                                    partitionsDefaultedToGroupOffsets);
                        }
                        break;
                    case GROUP_OFFSETS:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info(
                        "Consumer subtask {} initially has no partitions to read from.",
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }
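
The startupMode branches above correspond to the consumer's public setters. A hedged usage sketch (offset and timestamp values are placeholders):

consumer.setStartFromGroupOffsets();            // GROUP_OFFSETS (the default)
consumer.setStartFromEarliest();                // EARLIEST
consumer.setStartFromLatest();                  // LATEST
consumer.setStartFromTimestamp(1620000000000L); // TIMESTAMP, epoch milliseconds

// SPECIFIC_OFFSETS: partitions missing from the map fall back to the committed
// group offset, which is exactly the GROUP_OFFSET sentinel branch above
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("demo-topic", 0), 42L);
consumer.setStartFromSpecificOffsets(specificOffsets);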

6. Initialize the deserializer.

this.deserializer.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
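
As a hedged sketch of the deserializer side, a custom KafkaDeserializationSchema that simply turns the record value into a UTF-8 string could look like this (the class name is illustrative, not part of Flink):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ValueAsStringSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // never terminate the stream based on the payload
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        byte[] value = record.value();
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}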

The run() method of FlinkKafkaConsumerBase

1. Check that subscribedPartitionsToStartOffsets has been set by open(); if it is null, throw an exception.

 if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

2. Initialize the counters for successful and failed offset commits.

this.successfulCommits =
                this.getRuntimeContext()
                        .getMetricGroup()
                        .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
this.failedCommits =
            this.getRuntimeContext()
            .getMetricGroup()
            .counter(COMMITS_FAILED_METRICS_COUNTER);

3. Get the index of this subtask.

        final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

4. Initialize the offset commit callback.

 this.offsetCommitCallback =
                new KafkaCommitCallback() {
                    @Override
                    public void onSuccess() {
                        successfulCommits.inc();
                    }

                    @Override
                    public void onException(Throwable cause) {
                        LOG.warn(
                                String.format(
                                        "Consumer subtask %d failed async Kafka commit.",
                                        subtaskIndex),
                                cause);
                        failedCommits.inc();
                    }
                };

5. If this subtask has no partitions to read from, mark the source as temporarily idle.

if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

6. Create the kafkaFetcher, which is the core of the interaction between Flink and Kafka.

this.kafkaFetcher =
                createFetcher(
                        sourceContext,
                        subscribedPartitionsToStartOffsets,
                        watermarkStrategy,
                        (StreamingRuntimeContext) getRuntimeContext(),
                        offsetCommitMode,
                        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                        useMetrics);

Inside createFetcher, adjustAutoCommitConfig ensures that when the offset commit mode is DISABLED or ON_CHECKPOINTS, the enable.auto.commit property is forced to false (a sketch of this helper follows the snippet below).

@Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics)
            throws Exception {

        adjustAutoCommitConfig(properties, offsetCommitMode);

        return new KafkaFetcher<>(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics);
    }
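
adjustAutoCommitConfig itself is small; its behavior is roughly the following (sketch):

// When offsets are committed on checkpoints, or not committed at all, Kafka's own
// auto commit is switched off so that the two mechanisms do not overwrite each other.
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS
            || offsetCommitMode == OffsetCommitMode.DISABLED) {
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}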

The KafkaFetcher constructor

public KafkaFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics)
            throws Exception {
        super(
                sourceContext, // the context for emitting records and watermarks
                assignedPartitionsWithInitialOffsets, // starting offsets of the assigned topic partitions
                watermarkStrategy, // the watermark strategy
                processingTimeProvider, // service used mainly for emitting watermarks periodically
                autoWatermarkInterval, // the watermark emission interval
                userCodeClassLoader, // class loader, mainly for deserializing the watermark assigners
                consumerMetricGroup, // the metric group
                useMetrics); // whether to report metrics
        // the deserializer
        this.deserializer = deserializer;
        // passes batches of records (or exceptions) from the producer thread to the consumer thread
        this.handover = new Handover();
        // the thread that actually runs the KafkaConsumer and hands record batches to the fetcher
        this.consumerThread =
                new KafkaConsumerThread(
                        LOG,
                        handover,
                        kafkaProperties,
                        unassignedPartitionsQueue,
                        getFetcherName() + " for " + taskNameWithSubtasks,
                        pollTimeout,
                        useMetrics,
                        consumerMetricGroup,
                        subtaskMetricGroup);
        // the collector that emits records as a batch
        this.kafkaCollector = new KafkaCollector();
    }
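
The Handover mentioned above is essentially a single-slot, blocking exchange point between the KafkaConsumerThread (producer side) and the fetcher thread (consumer side). A minimal sketch of the idea, not Flink's actual class (which additionally handles wake-ups and closing):

import org.apache.kafka.clients.consumer.ConsumerRecords;

final class SimpleHandover {
    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;
    private Throwable error;

    // Called by the consumer thread after each KafkaConsumer#poll(); blocks while the
    // previous batch has not been taken yet, which naturally throttles polling.
    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null && error == null) {
                lock.wait();
            }
            next = records;
            lock.notifyAll();
        }
    }

    // Called by the fetcher thread; blocks until a batch or an error is available.
    public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (error != null) {
                throw new Exception("Error in the Kafka consumer thread", error);
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }

    // Called by either side to propagate a failure to the other thread.
    public void reportError(Throwable t) {
        synchronized (lock) {
            error = t;
            lock.notifyAll();
        }
    }
}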

7. Check whether dynamic partition discovery is enabled; if so, run runWithPartitionDiscovery, otherwise run runFetchLoop directly.

if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            kafkaFetcher.runFetchLoop();
        } else {
            runWithPartitionDiscovery();
        }

runWithPartitionDiscovery ultimately also calls runFetchLoop, but it first starts a separate thread that periodically discovers new partitions according to the discoveryIntervalMillis setting.

 private void runWithPartitionDiscovery() throws Exception {
        final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
        // create and start the partition discovery thread
        createAndStartDiscoveryLoop(discoveryLoopErrorRef);
        // run the fetch loop
        kafkaFetcher.runFetchLoop();
        // make sure the partition discoverer is woken up so the discovery thread can exit
        partitionDiscoverer.wakeup();
        // wait for the discovery thread to finish
        joinDiscoveryLoopThread();
        final Exception discoveryLoopError = discoveryLoopErrorRef.get();
        if (discoveryLoopError != null) {
            throw new RuntimeException(discoveryLoopError);
        }
    }
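
createAndStartDiscoveryLoop starts a separate thread whose body is roughly the following (simplified sketch; the wake-up and close handling of the real code is trimmed):

// Every discoveryIntervalMillis, ask the partition discoverer for new partitions
// and hand them to the running fetcher.
Thread discoveryLoopThread = new Thread(() -> {
    try {
        while (running) {
            List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
            if (running && !discovered.isEmpty()) {
                kafkaFetcher.addDiscoveredPartitions(discovered);
            }
            if (running && discoveryIntervalMillis != 0) {
                Thread.sleep(discoveryIntervalMillis);
            }
        }
    } catch (Exception e) {
        discoveryLoopErrorRef.set(e); // surfaced after runFetchLoop() returns
    }
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
discoveryLoopThread.start();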

The runFetchLoop method pulls data in a loop.

@Override
    public void runFetchLoop() throws Exception {
        try {
            // start the consumer thread
            consumerThread.start();
            while (running) {
                // take the next batch from the handover; this blocks until a batch is available
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
                // iterate over all subscribed partitions
                for (KafkaTopicPartitionState<T, TopicPartition> partition :
                        subscribedPartitionStates()) {
                    // get the records belonging to this partition
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
                    // deserialize the records into kafkaCollector, emit them, update the offset,
                    // and generate timestamps and watermarks; an end-of-stream signal stops the fetch loop
                    partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        } finally {
            // shut down the consumer thread and clean up
            consumerThread.shutdown();
        }
        // wait for the consumer thread to terminate
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
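
partitionConsumerRecordsHandler is where deserialization and emission actually happen; in Flink 1.13 it looks roughly like this (sketch):

protected void partitionConsumerRecordsHandler(
        List<ConsumerRecord<byte[], byte[]>> partitionRecords,
        KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {

    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        // one Kafka record may be deserialized into zero or more elements
        deserializer.deserialize(record, kafkaCollector);

        // emit the elements, advance the partition's offset, and generate timestamps/watermarks
        emitRecordsWithTimestamps(
                kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());

        // the deserializer can signal end of stream, which stops the fetch loop
        if (kafkaCollector.isEndOfStreamSignalled()) {
            running = false;
            break;
        }
    }
}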

The initializeState method of FlinkKafkaConsumerBase

This method restores the offset information from the checkpoint.

public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();
        // fetch the stored offset information from state
        this.unionOffsetStates =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                createStateSerializer(getRuntimeContext().getExecutionConfig())));

        if (context.isRestored()) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // copy the offsets from unionOffsetStates into restoredState
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }
            LOG.info(
                    "Consumer subtask {} restored state: {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    restoredState);
        } else {
            LOG.info(
                    "Consumer subtask {} has no restore state.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }

The snapshotState method of FlinkKafkaConsumerBase

This method takes the checkpoint snapshot of the current offsets.

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        // the source is no longer running, so only log that the snapshot is skipped
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            // clear unionOffsetStates
            unionOffsetStates.clear();
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            // the fetcher is not initialized yet, so store the start offsets prepared in open() into unionOffsetStates
            if (fetcher == null) {
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                        subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    subscribedPartition.getKey(), subscribedPartition.getValue()));
                }
                // remember the restoredState under this checkpoint id

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
            // the fetcher exists: snapshot the current partition offsets and store them into unionOffsetStates
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {         
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
            }
            // to avoid a memory leak, drop the oldest pending checkpoint entries
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

The notifyCheckpointComplete method of FlinkKafkaConsumerBase

This method is called when a checkpoint completes; in ON_CHECKPOINTS mode it commits the corresponding offsets back to Kafka.

public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // only one commit operation must be in progress
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        checkpointId);
            }

            try {
                // if the checkpointId is not found among the pending checkpoints, log a warning and return
                final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn(
                            "Consumer subtask {} received confirmation for unknown checkpoint id {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            checkpointId);
                    return;
                }

                @SuppressWarnings("unchecked")
                Map<KafkaTopicPartition, Long> offsets =
                        (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

                // drop older pending checkpoints
                for (int i = 0; i < posInMap; i++) {
                    pendingOffsetsToCommit.remove(0);
                }

                if (offsets == null || offsets.size() == 0) {
                    LOG.debug(
                            "Consumer subtask {} has empty checkpoint state.",
                            getRuntimeContext().getIndexOfThisSubtask());
                    return;
                }
                // commit the offsets to Kafka asynchronously
                fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            } catch (Exception e) {
                if (running) {
                    throw e;
                }
            }
        }
    }
