kafka源码学习(三)消费者源码
通过前面对producer、服务端源码的剖析,consumer很多地方和前面类似,这里简单概述即可kafka版本:kafka-0.10.0.1。学习计划:四月份完成kafka源码的分析,也算对kafka的基础知识点有了简单的回顾。五月份:回到Flink篇章,这次重点了解资源调度、webui等相关基本的知识点,目的为后面自己编写Flink代码,如何分析调优,观察执行情况准备;一个小的任务:Flink
写在前面
通过前面对producer、服务端源码的剖析,consumer很多地方和前面类似,这里简单概述即可kafka版本:kafka-0.10.0.1。
学习计划:四月份完成kafka源码的分析,也算对kafka的基础知识点有了简单的回顾。
五月份:回到Flink篇章,这次重点了解资源调度、webui等相关基本的知识点,目的为后面自己编写Flink代码,如何分析调优,观察执行情况准备;一个小的任务:Flink从kafka写入Redis,数据一致性保证的测试。
1、consumer 初始化
阶段 1:Find Group Coordinator
查找 Group Coordinator的方式:
- 先根据消费组 groupid的 hash值计算它所应该在__consumer_offsets中的分区编号;
- 找到对应的分区号后,再寻找此分区 leader所在的 broker节点,则此节点即为自己的
Group Coordinator;
注:注意这里是消费组的 groupid
阶段 2:join the group
- 此阶段的重要操作之一:选举消费组的 leader
虽然也是 hashMap,但是仍然是随机 - 此阶段的重要操作之二:选举分区分配策略 (
这个策略不知道是不是和kafka版本有关,笔者使用的是kafka-0.10.0.1,似乎是由leader节点决定
)
最终选举的分配策略基本上可以被看作各个消费者支持的最多的策略,具体的选举过程
如下:- 收集各个消费者支持的所有分配策略,组成候选的 candidates
- 每个消费者从候选集 candidates找出第一个自身支持的策略,为这个策略投上一票。
- 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
其实,此逻辑并不需要 consumer来执行,而是由 Group Coordinator来执行。
阶段 3:SYNC GROUP
此阶段,主要是由消费组 leader将分区分配方案,通过 Group Coordinator来转发给组中各
个消费者
阶段 4:HEART BEAT
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。
各个消费者在消费数据的同时,保持与 Group Coordinator的心跳通信;,默认值是 3秒,
但是可以通过参数值进行修改。
2、consumers选择 group coordinator
-
consumer获取元数据信息,发送请求,进入
sendGroupCoordinatorRequest()
方法
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
注:Discover the current coordinator for the group. Sends a GroupMetadata request to one of the brokers.这里consumer的ConsumerCoordinator
用于和group coordinator
进行交互,由消费者创建。
-
进入
handleGroupMetadataResponse
,对返回的元数据信息进行处理,this.coordinator=Node
,说明这个时候consumer知道group coordinator在哪个节点上
3、consumer leader的选举
-
consumer向group coordinator发送请求,事件类型
JOIN_GROUP
-
服务端接收请求,通过KafkaApis的handler处理,最终代码到
doJoinGroup()
方法,第一次进来,
if the member id is unknown, register the member to the group
-
进入
GroupMetadata
的add()方法,说明leader选举也是先到先得,和Controller也是一样的
kafka.coordinator.GroupMetadata
4、【assignment for the group】制定与下发
-
进入
JoinGroupResponseHandler
的handle()方法,如果自己是leader,则执行onJoinLeader()
方法
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler
-
进行onJoinLeader()方法
位置1:指定分区方案,perform the leader synchronization and send back the assignment for the group
位置2:发送请求给coordinator,事件类型SYNC_GROUP
-
同样的,服务端通过KafkaApis接收并处理请求,进入
doSyncGroup()
方法,执行case AwaitingSync =>
,下发leader的assignment
给消费者组的其他消费,然后转变消费者组状态为Stable
5、consumer消费数据
- 回到KafkaConsumer,执行
fetcher.sendFetches();
方法,事件类型为ApiKeys.FETCH
,
- 这个时候就回到了【服务端副本同步】,follower去leader拉取数据,略
6、consumer自动提交偏移量
-
在
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
中,开启自动提交,
注:This class manages the coordination process with the consumer coordinator.
-
同样最终发送请求到coordinator所在节点上,到自定义Topic,
__consumer_offset对应的partition上
,事件类型OFFSET_COMMIT
,后续略
7.consumer发送心跳
-
来到
AbstractCoordinator
的HeartbeatTask
,实现org.apache.kafka.clients.consumer.internals.DelayedTask
-
进入run()方法,consumer发送请求,事件类型
ApiKeys.HEARTBEAT
-
服务端进入
kafka.coordinator.GroupCoordinator
,
位置1:更新对应consumer上一次的心态时间
位置2:时间轮机制,向时间轮中插入一个任务,用于检查心跳是否超时
更多推荐
所有评论(0)