写在前面

通过前面对producer、服务端源码的剖析,consumer很多地方和前面类似,这里简单概述即可kafka版本:kafka-0.10.0.1。

学习计划:四月份完成kafka源码的分析,也算对kafka的基础知识点有了简单的回顾。
五月份:回到Flink篇章,这次重点了解资源调度、webui等相关基本的知识点,目的为后面自己编写Flink代码,如何分析调优,观察执行情况准备;一个小的任务:Flink从kafka写入Redis,数据一致性保证的测试。

1、consumer 初始化

阶段 1:Find Group Coordinator
查找 Group Coordinator的方式:

  • 先根据消费组 groupid的 hash值计算它所应该在__consumer_offsets中的分区编号;
  • 找到对应的分区号后,再寻找此分区 leader所在的 broker节点,则此节点即为自己的
    Group Coordinator;
    注:注意这里是消费组的 groupid

阶段 2:join the group

  • 此阶段的重要操作之一:选举消费组的 leader
    虽然也是 hashMap,但是仍然是随机
  • 此阶段的重要操作之二:选举分区分配策略 (这个策略不知道是不是和kafka版本有关,笔者使用的是kafka-0.10.0.1,似乎是由leader节点决定
    最终选举的分配策略基本上可以被看作各个消费者支持的最多的策略,具体的选举过程
    如下:
    1. 收集各个消费者支持的所有分配策略,组成候选的 candidates
    2. 每个消费者从候选集 candidates找出第一个自身支持的策略,为这个策略投上一票。
    3. 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
      其实,此逻辑并不需要 consumer来执行,而是由 Group Coordinator来执行。

阶段 3:SYNC GROUP
此阶段,主要是由消费组 leader将分区分配方案,通过 Group Coordinator来转发给组中各
个消费者

阶段 4:HEART BEAT
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。
各个消费者在消费数据的同时,保持与 Group Coordinator的心跳通信;,默认值是 3秒,
但是可以通过参数值进行修改。在这里插入图片描述

2、consumers选择 group coordinator

  1. consumer获取元数据信息,发送请求,进入sendGroupCoordinatorRequest()方法
    org.apache.kafka.clients.consumer.internals.AbstractCoordinator
    注:Discover the current coordinator for the group. Sends a GroupMetadata request to one of the brokers.这里consumer的ConsumerCoordinator用于和group coordinator进行交互,由消费者创建。
    在这里插入图片描述

  2. 进入handleGroupMetadataResponse,对返回的元数据信息进行处理, this.coordinator=Node,说明这个时候consumer知道group coordinator在哪个节点上
    在这里插入图片描述

3、consumer leader的选举

  1. consumer向group coordinator发送请求,事件类型JOIN_GROUP
    在这里插入图片描述

  2. 服务端接收请求,通过KafkaApis的handler处理,最终代码到doJoinGroup()方法,第一次进来,
    if the member id is unknown, register the member to the group
    在这里插入图片描述

  3. 进入GroupMetadata的add()方法,说明leader选举也是先到先得,和Controller也是一样的
    kafka.coordinator.GroupMetadata
    在这里插入图片描述

4、【assignment for the group】制定与下发

  1. 进入JoinGroupResponseHandler的handle()方法,如果自己是leader,则执行onJoinLeader()方法
    org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler
    在这里插入图片描述

  2. 进行onJoinLeader()方法
    位置1:指定分区方案,perform the leader synchronization and send back the assignment for the group
    位置2:发送请求给coordinator,事件类型SYNC_GROUP
    在这里插入图片描述

  3. 同样的,服务端通过KafkaApis接收并处理请求,进入doSyncGroup()方法,执行case AwaitingSync =>,下发leader的assignment给消费者组的其他消费,然后转变消费者组状态为Stable
    在这里插入图片描述

5、consumer消费数据

  1. 回到KafkaConsumer,执行fetcher.sendFetches();方法,事件类型为ApiKeys.FETCH
    在这里插入图片描述
  2. 这个时候就回到了【服务端副本同步】,follower去leader拉取数据,略
    在这里插入图片描述

6、consumer自动提交偏移量

  1. org.apache.kafka.clients.consumer.internals.ConsumerCoordinator中,开启自动提交,
    注:This class manages the coordination process with the consumer coordinator.
    在这里插入图片描述

  2. 同样最终发送请求到coordinator所在节点上,到自定义Topic,__consumer_offset对应的partition上,事件类型OFFSET_COMMIT,后续略
    在这里插入图片描述

7.consumer发送心跳

  1. 来到AbstractCoordinatorHeartbeatTask,实现org.apache.kafka.clients.consumer.internals.DelayedTask
    在这里插入图片描述

  2. 进入run()方法,consumer发送请求,事件类型ApiKeys.HEARTBEAT
    在这里插入图片描述

  3. 服务端进入kafka.coordinator.GroupCoordinator
    位置1:更新对应consumer上一次的心态时间
    位置2:时间轮机制,向时间轮中插入一个任务,用于检查心跳是否超时
    在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐