// --------------------- partition discovery loop ---------------------

List discoveredPartitions;

// throughout the loop, we always eagerly check if we are still running before

// performing the next operation, so that we can escape the loop as soon as possible

while (running) {

if (LOG.isDebugEnabled()) {

LOG.debug(“Consumer subtask {} is trying to discover new partitions …”, getRuntimeContext().getIndexOfThisSubtask());

}

try {

discoveredPartitions = partitionDiscoverer.discoverPartitions();

} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {

// the partition discoverer may have been closed or woken up before or during the discovery;

// this would only happen if the consumer was canceled; simply escape the loop

break;

}

// no need to add the discovered partitions if we were closed during the meantime

if (running && !discoveredPartitions.isEmpty()) {

kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);

}

// do not waste any time sleeping if we’re not running anymore

if (running && discoveryIntervalMillis != 0) {

try {

Thread.sleep(discoveryIntervalMillis);

} catch (InterruptedException iex) {

// may be interrupted if the consumer was canceled midway; simply escape the loop

break;

}

}

}

} catch (Exception e) {

discoveryLoopErrorRef.set(e);

} finally {

// calling cancel will also let the fetcher loop escape

// (if not running, cancel() was already called)

if (running) {

cancel();

}

}

}

}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

它定义了一个线程池对象,去动态发现kafka新增的topic(支持正则形式指定消费的topic),或者动态发现kafka新增的分区。

接着肯定是启动动态发现分区或者topic线程,并且启动kafkaFetcher。

discoveryLoopThread.start();

kafkaFetcher.runFetchLoop();

// --------------------------------------------------------------------

// make sure that the partition discoverer is properly closed

partitionDiscoverer.close();

discoveryLoopThread.join();

接着,我们进入kafkaFetcher的runFetchLoop方法,映入眼帘的是

// kick off the actual Kafka consumer

consumerThread.start();

这个线程是在构建kafka09Fetcher的时候创建的

this.consumerThread = new KafkaConsumerThread(

LOG,

handover,

kafkaProperties,

unassignedPartitionsQueue,

createCallBridge(),

getFetcherName() + " for " + taskNameWithSubtasks,

pollTimeout,

useMetrics,

consumerMetricGroup,

subtaskMetricGroup);

KafkaConsumerThread 继承自Thread,然后在其run方法里,首先看到的是

// this is the means to talk to FlinkKafkaConsumer’s main thread

final Handover handover = this.handover;

这个handover的作用呢暂且不提,接着分析run方法里面内容

1,获取消费者

try {

this.consumer = getConsumer(kafkaProperties);

}

2,检测分区并且会重分配新增的分区

try {

if (hasAssignedPartitions) {

newPartitions = unassignedPartitionsQueue.pollBatch();

}

else {

// if no assigned partitions block until we get at least one

// instead of hot spinning this loop. We rely on a fact that

// unassignedPartitionsQueue will be closed on a shutdown, so

// we don’t block indefinitely

newPartitions = unassignedPartitionsQueue.getBatchBlocking();

}

if (newPartitions != null) {

reassignPartitions(newPartitions);

}

3,消费数据

// get the next batch of records, unless we did not manage to hand the old batch over

if (records == null) {

try {

records = consumer.poll(pollTimeout);

}

catch (WakeupException we) {

continue;

}

}

4,通过handover将数据发出去

try {

handover.produce(records);

records = null;

}

由于被kafkaConsumerThread打断了kafkaFetcher的runFetchLoop方法的分析,我们在这里继续

1,拉取handover.producer生产的数据

while (running) {

// this blocks until we get the next records

// it automatically re-throws exceptions encountered in the consumer thread

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

总结

这份面试题几乎包含了他在一年内遇到的所有面试题以及答案,甚至包括面试中的细节对话以及语录,可谓是细节到极致,甚至简历优化和怎么投简历更容易得到面试机会也包括在内!也包括教你怎么去获得一些大厂,比如阿里,腾讯的内推名额!

某位名人说过成功是靠99%的汗水和1%的机遇得到的,而你想获得那1%的机遇你首先就得付出99%的汗水!你只有朝着你的目标一步一步坚持不懈的走下去你才能有机会获得成功!

成功只会留给那些有准备的人!

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!
面试题以及答案,甚至包括面试中的细节对话以及语录,可谓是细节到极致,甚至简历优化和怎么投简历更容易得到面试机会也包括在内!也包括教你怎么去获得一些大厂,比如阿里,腾讯的内推名额!

某位名人说过成功是靠99%的汗水和1%的机遇得到的,而你想获得那1%的机遇你首先就得付出99%的汗水!你只有朝着你的目标一步一步坚持不懈的走下去你才能有机会获得成功!

成功只会留给那些有准备的人!

[外链图片转存中…(img-R8klNFiw-1711974435799)]

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐