kafka系列之Coordinator(14)
Coordinator与消费者在数据库设计过程中,我们经常会有这样的情况下某个基础表会被多个视图或者存储过程引用修改基础表的时候,我们必须小心翼翼地,因为不会有任何提示告诉我们,如果继续修改,会不会造成视图或者存储过程有问题即便我们知道有问题,我们也没有办法去让视图和存储过程刷新得到表最新的信息如果你在创建视图中使用了select *,就会导致各种各样的bug"协调者"有些陌生,所谓协调者,在 K
Coordinator与消费者
在数据库设计过程中,我们经常会有这样的情况下
- 某个基础表会被多个视图或者存储过程引用
- 修改基础表的时候,我们必须小心翼翼地,因为不会有任何提示告诉我们,如果继续修改,会不会造成视图或者存储过程有问题
- 即便我们知道有问题,我们也没有办法去让视图和存储过程刷新得到表最新的信息
-
如果你在创建视图中使用了select *,就会导致各种各样的bug
-
"协调者"有些陌生,所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责Group Rebalance 以及提供位移管理和组成员管理等。
-
Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
Coordinator
- 所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢
- 不同的group id会被哈希到不同的分区上,从而不同的broker能充当不同group的Coordinator
Coordinator的确定与分区分配
前面我们说到一个问题,那就是一个group内部,1个parition只能被1个consumer消费
,其实看到这里我们就知道应该有这样一个组件来负责partition的分配,而且前面学习消费者组机制的时候还提到过分区的三种分配策略。
对于每一个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。也就是说1个consumer group对应一个coordinattor
下面我们有一个group有3个consumer: c0, c1, c2
GroupCoordinatorRequest 请求
- 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
- 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
其实这个过程是发送了一个GroupCoordinatorRequest
定位请求去寻找coordinator
首先,Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。
此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。
这个 Broker,就是我们要找的 Coordinator
JoinGroup
所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,并把组成员信息以及订阅信息发给leader
其他consumer作为follower,然后由这个leader进行partition分配
SyncGroup
leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition
一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给Coordinator,Coordinator给它返回null
follower发送 null的 SyncGroupRequest 给Coordinator,Coordinator回给它partition分配的结果。
确定的意义
-
Consumer 应用程序,特别是 Java Consumer API,能够自动发现并连接正确的 Coordinator,我们不用操心这个问题。知晓这个算法的最大意义在于,它能够帮助我们解决定位问题。当 Consumer Group 出现问题,需要快速排查 Broker 端日志时,我们能够根据这个算法准确定位 Coordinator 对应的 Broker,不必一台 Broker 一台 Broker 地盲查。
-
例如提交offset失败,可能是由于Coordinator 所咋 broker 所在节点出了问题,这个时候我们就可以根据这个算法快速找到 Coordinator所在,然后查看日志。
-
或者是broker rebalance 太频繁,去找Coordinator所在的broker日志,会有类似于"(Re)join group" 之类的日志
源码解析Coordinator
下面这张图就是整个kafka 的源码结构,前面我们已经分析过clients
clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。
config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。
connect 目录:保存 Connect 组件的源代码。我在开篇词里提到过,Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。
core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。
streams 目录:保存 Streams 组件的源代码。Kafka Streams 是实现 Kafka 实时流处理的组件。
下面我们看一下core目录下的代码,这里我们几乎看到kafka 服务端的几乎全部代码,都在这里了,我们比较熟悉的的有controller
,我们主要看一下今天的主角coordinator
整个coordinator
下面按照功能分为了两大块,第一块就是group 也就是为我们的消费者组服务的,第二块就是transaction,主要是为分布式事务服务的。
我们看到这个group包下面总共也没几个类。
下面我们看一下整个请求的流程,我们前面学习幂等性生产
的时候说过了,客户端发起的请求,在服务端进行处理的入口是在KafkaApis
这个类里面的,客户端发起不同的请求,KafkaApis
里面就有不同的方法来处理对应的请求。
我们今天要了解的就是上面这个几个方法。
handleFindCoordinatorRequest
代码这里有一点需要注意的是,这里判断了这个请求是处理事务的还是消费者组的
后面就是和我们前面介绍的就一样了(groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME
方法里面计算出了partition信息,其实就是那个partition,这个方法的实现的话,大致如下
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
找到了partition之后就开始找这个partition的lead 分区了,也就是我们下面图上的第二段代码。这样我们的Coordinator节点就找到了
handleJoinGroupRequest
下面这个代码其实就比较简单了,其实就是一个consumer 注册的过程,调用了groupCoordinator.handleJoinGroup() 的方法,将我们的consumer加入到我们的消费组里面去了
同时将确定了consumer group 的leader consumer,然后返回给了客户端,也就是我们的sendResponseCallback 方法。
handleSyncGroupRequest
下面就是comsumer 同步分区的分配信息,最后返回给客户端的assignmentMap,就是分配的结果。
handleOffsetCommitRequest
这个就是处理consumer 客户端提交offset 的方法了,这个代码有点长,我这里就帖几处比较重要的地方了
这里就是获取我们提交的offset 信息了
根据我们的请求,来判断offset 存在那里,老版本是存储在ZK里
最后是调用GroupCoordinator的handleCommitOffsets方法进行offset 的提交
总结
- Coordinator 在整个kafka 的消费者体系中的作用,负责Group Rebalance 以及提供位移管理和组成员管理等
- Coordinator 是如何进行和客户端进行交互完成相应的职能
更多推荐
所有评论(0)