核心概念

从概念上讲,topic只是一个逻辑概念,代表了一类消息,也可以认为是消息被发送到的地方。通常可以使用topic来区分实际业务。

Kafka中的topic通常会被多个消费者订阅,因此出于性能考虑,kafka并不是topic-meaaage的两极结构,而是采用了topic-partition-message的三级结构来分散负载。从本质上说,每个kafka的topic都有若干个partition组成。
topic是由多个partition组成的,kafka的partition是不可修改的有序消息序列,也可以说是有序的消息日志。每个partition都有自己专属的partition号,通常是从0开始的,用户对partition唯一的操作就是在消息序列尾部追加写入消息。Partition上的每条消息都会被分配一个唯一的序列号,该序列号被称为位移(offset)。该位移是从0开始顺序递增的整数,位移信息可以定位到partition下的一条消息。

kafka的Partition实际上没有太多的业务含义,它的引入就是单纯地为了提升系统的吞吐量,因此在创建kafka topic的时候可以根据集群实际配置设置具体的Partition数,实现整体性能的最大化。

Offset

Kafka Partition下的每条消息都会被分配一个位移值。实际上,kafka消费者也有位移的概念,但是这个两个位移值属于不同的概念。每条消息在某个Partition的位移是固定的,但消费该partition的消费者位移会随着消费进度不断前移,但终究不可能超过该分区最新一条消息的位移。
Kafka中的一条消息其实就是一个<topic,partition,offset>三元组(tuple),通过该元组可以在kafka集群中找到唯一对应的那条消息。

Replica

分布式系统实现高可靠性,目前主要的途径还是依靠冗余机制,简单说,就是备份多份日志。

Partition 是有序消息日志,那么一定不能只保存这一份日志,否则一旦保存 partition 的 kafka 服务器挂掉了,其上保存的日志也就都丢失了。这些备份日志在 kafka 中称为副本(replica)。Kafka 从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。

副本是相对于分区而言的,即副本是特定分区的副本。

一个分区中包含一个或多个副本,其中一个为 leader 副本,其余为 follower 副本,各个副本位于不同的 broker 节点中。
follower 副本是不能提供服务给客户端的,也就是说不负责响应客户端发来的消息写入和消息消费请求。它只是被动的向leader 副本获取数据,而一旦leader 副本所在的 broker 宕机,kafka 会从剩余的副本中选举出新的 leader 继续提供服务。

分区中所有的副本统称为 AR(Assigned Replicas)
一个分区的 AR 集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是操持不变,而分区的 ISR 集合中副本的顺序可能会发生改变。

副本存在的唯一目的就是防止数据丢失。

Leader 和 Follower

副本的 leader 和 follower 角色设定几乎完全取代了过去的主备提法。和传统的主备系统不同的是,leader-follower 系统中通常只用 leader 对外提供服务,follower 只是被动的追随 leader 的状态,保持与 leader 的同步。follower 存在的唯一价值就是充当 leader 的候补,一旦 leader 挂掉立即就会有一个 follower 被选举成新的 leader 接替他的工作。

Kafka 保证同一个 broker 节点不可能出现一个 partition 的多个副本,即 kafka 集群的一个 broker 中最多只能有一个 partition 的一个副本。如果同一个 Partition 的多个副本分配在同一个 broker,将无法实现冗余备份的效果。

ISR

ISR的全称是in-sync replica,也就是与leader副本保持一定程度同步的副本集合(包括leader副本在内)。ISR集合是AR集合中的一个子集。与leader副本同步滞后过多的副本(不包含leader副本)组成OSR(Out-of-Sync Replica)。由此可见AR=ISR+OSR。在正常的情况下,所有的follower副本都应该与leader副本保持一定程度的同步,及AR=ISR,OSR为空。

Kafka为Partition动态维护一个replica集合。该集合中的所有replica保存的消息日志都与leader replica保持同步状态。只有在这个集合中的replica才能被选举为leader。也只有这个集合中所有replica都收到同一条消息,kafka才会将这条消息置为“已提交”状态,即认为这条消息发送成功。之后才会更新分区的HW,进而消费者可以消费到这条消息。

正常情况下,Partition的所有replica(含leader replica)都应该与leader replica保持同步,即所有的replica都在ISR中。因为各种各样的原因。一小部分replica开始落后于leader replica的进度。当滞后到一定程度时,kafka会将这些replica踢出ISR,当这些replica重新追上leader的进度时,kafka会将他们加回到ISR中。这一切都是自动维护的,不需要用户进行人工干预,因而在保证了消息交付语义的同时还简化了用户的操作成本。

Kafka承诺只要ISR集合中至少存在一个replica,那些“已提交”状态的消息就不会丢失。这句话的两个关键点:
(1)ISR中至少存在一个“活着的”replica
(2)“已提交”消息

HW 与 LEO

详见 consumer 位移提交部分。

失效副本

正常情况下,分区的所有副本都处于 ISR 集合中,但是难免会有异常情况发生,从而某些副本被剥离出 ISR 集合中。在 ISR 集合之外,也就是处于同步失效功能失效(比如副本处于下线状态)的副本统称为失效副本,存在失效副本的分区也就称为同步失效分区,即 under-replicated 分区。

怎么判定一个分区是否有副本处于同步失效的状态昵?
Kafka 从0.9.x版本开始就通过唯一的 broker 端参数 replica.lag.time.max.ms 来抉择,默认值为10000,当 ISR 集合中的一个 follower 副本滞后 leader 副本的时间超过此参数指定的值时则判定为同步失败,需要将此 follower 副本剔除 ISR 集合。

Kafka 源码注释中说明了一般有两种情况会导致副本失效:

  • follower 副本进程卡住,在一段时间内根本没有向 leader 副本发起同步请求,比如频繁 Full GC。
  • follower 副本进程同步过慢,在一段时间内都无法追赶上 leader 副本,比如 I/O 开销过大。

在这里再补充一点,如果通过工具增加了副本因子,那么新增加的副本在赶上 leader 副本之前也都是处于失效状态的。如果一个 follower 副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上 leader 副本之前也处于失效状态。


Topic 的管理

创建Topic

通过查看 kafka-topics.sh 脚本的源码得知,创建 topic 实际是通过 kafka.admin.TopicCommand 进行的

当创建一个topic时会在zookeeper的 /brokers/topics 目录下创建一个同名的实节点,该节点中记录了该topic的分区副本分配方案。示例:
get /brokers/topics/Test
{"version":1,"partitions":{"2":[1,2],"1":[3,1],"0":[2,3]}}

其中 "2":[1,2] 表示分区2分配了两个副本,分别在brokerId为1和2的broker节点中。

zookeeper的/config/topics目录下与topic同名的实节点保存了该topic的Config参数。示例:
get /config/topics/Test
{"version":1,"config":{}}

其中 "config":{} 对应的数据即为 topic 的 Config 参数。

当创建一个topic时,无论是通过什么方式,实质上是在 zookeeper 中的 /brokers/topics 节点下创建与该topic对应的子节点并写入分区副本分配方案,并且在 /config/topics 节点子下创建与该 topic 对应的子节点并写入 topic 相关的配置信息(这个步骤可以省略不执行)。而 kafka 创建 topic 的实质动作是交由 controller 异步去完成的。

可以直接使用zookeeper的客户端在/brokers/topics节点下创建相应的topic节点并写入预先设定好的分配方案来创建一个新的topic。

分区副本的分配

在创建topic时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建,如果没有使用replica-assignment参数,那么就需要按照内部的逻辑来计算分配方案;
创建时未指定机架的分配策略,具体实现为kafka.admin.AdminUtils类的assignReplicasToBrokersRackUnaware方;指定机架的分配策略,具体实现为assignReplicasToBrokersRackAware方法。

查看Topic

(1)查看包含覆盖配置的topic,它只会列出包含了与集群不一样配置的topic。使用topics-with-overrides参数时只会显示使用describe指令的第一行信息
./kafka-topics.sh --zookeeper 196.163.10.103 --describe --topics-with-overrides
(2)找出包含失效副本的分区。包含失效副本的分区可能正在进行同步操作,也有可能同步发生异常,此时分区的 ISR 集合小于 AR 集合
./kafka-topics.sh --zookeeper 196.163.10.103 --describe --under-replicated-partitions

(3)查看topic中没有leader副本的分区,这些分区已经处于离线状态,对于外界的生产者和消费者来说处于不可用的状态
./kafka-topics.sh --zookeeper 196.163.10.103 --describe --unavailable-partitions

删除Topic

删除Topic可以使用kafka-topics.sh脚本中的--delete指令就可以删除topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
执行命令之后,会有相关提示信息,这个提示信息和broker端配置参数delete.topic.enable有关,默认值为true。必须将delete.topic.enable设置为true才能够删除topic。如果设置为false,那么删除主题的操作将会被忽略。

如果使用kafka-topics.sh脚本删除kafka内部topic,删除时会报错。截至kafka 2.0.0,kafka一共有2个内部topic,分别是__consumer_offsets和__transaction_state;删除一个不存在的topic也会报错。可以使用--if-exists参数来忽略异常。

使用 kafka-topics.sh 脚本删除 topic 的本质上只是在 Zookeeper 中的 /admin/delete_topics 路径下创建一个与待删除 topic 同名的节点,依次标记该 topic 为待删除的状态,与创建 topic 相同的是,真正删除 topic 的动作也是由 kafka 的 controller 负责完成的。

了解这一原理后,可以直接在/admin/delete_topics路径下创建同名的节点来删除指定的topic。
create /admin/delete_topics/topicName ""

还可以通过手动的方式来删除topic,topic的元数据存储在zookeeper中的/brokers/topics和/config/topics路径下,topic中的消息存储在log.dir或log.dirs配置的路径下,只需要手动删除这三处地方的内容即可。先删除zk节点信息,再删除data对应目录。
(1)单独删除/config/topics/,不会删除topic,重启kafka之后,topic依旧存在,但是/config/topics/节点下相应的节点不存在了,可能会存在其他问题
(2)单独删除/brokers/topics/,停止kafka时会报错,再次启动kafka后,对应topic不存在,但是data目录和/config/topics/依旧存在,可能会存在其他问题
(3)单独删除data目录,不会删除topic。停止topic时会报错,再次启动之后topic依旧存在,data目录会重新被创建,但是数据丢失

注意:删除topic是一个不可逆的操作。一旦删除之后,与其相关的所有消息数据会被全部删除。

Topic端参数

与topic相关的所有配置参数在broker层面都有对应参数。比如topic端参数cleanup.policy对应broker层面的log.cleanup.policy。如果没有修改过topic的任何配置参数,那么就会使用broker端的对应参数作为其默认值。


Partition 的管理

优先副本的选举

kafka 集群的 broker 节点不可避免地会遇到宕机或崩溃的问题,当分区的 leader 节点发生故障时,其中的一个 follower 节点就会成为新的 leader 节点,即使发生故障的 broker 恢复正常后,其上的分区副本也只能作为 follower 副本加入 ISR 中。这样就有可能会导致集群的负载失衡(集群中的一小部分 broker 上承载了大量的分区 leader),从而影响整体的健壮性和稳定性。

为了能够有效的治理负载失衡的情况,kafka 引入了优先副本(Preferred Replica)的概念。所谓的优先副本是在 AR 集合列表中的第一个副本理想情况下,优先副本就是该分区的 leader 副本。kafka 要确保所有 topic 的优先副本在 kafka 集群中均匀分布,这样就保证了所有分区的 leader 均衡分布。

优先副本的选举是指通过一定的方式促使优先副本选举为 leader 副本,以此来促进集群的负载均衡,这一行为也可以称为分区平。需要注意的是,分区平衡并不意味着 Kafka 集群的负载均衡,因为还要考虑集群中的分区分配是否均衡。

在 kafka 提供分区自动平衡的功能,与此对应的 broker 端参数是 auto.leader.rebalance.enable,默认值为 true,即此功能默认开启。如果开启分区自动平衡的功能,则 kafka 的控制器会启动一个定时任务,这个定时任务会轮询所有的 broker 节点,该定时任务的执行周期由参数 leader.imbalance.check.interval.seconds 控制,默认值为300s,计算每个 broker 节点的分区不平衡率(broker 中的不平衡率 = 非优先副本的 leader 个数 / 分区总数)是否超过 leader.imbalance.per.broker.percentage 参数配置的比值,默认值为10%,如果超过设定的比值则会自动执行优先副本的选举动作以分区平衡。

在生产环境中建议将 auto.leader.rebalance.enable 值修改为 false,因为可能会引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为因为执行的时间无法自主掌控,如果在关键时期执行关键任务的时刻执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险。

kafka-preferred-replica-election.sh 脚本提供了对分区 leader 副本进行重新平衡的功能。优先副本的选举过程是一个安全的过程,kafka 客户端可以自动感知分区 leader 副本的变更,具体使用方法如下:

./kafka-preferred-replica-election.sh -zookeeper 196.163.10.103
执行命令后会打印
Created preferred replica election path with [partition名称,……]
Successfully started preferred replica election for partitions Set([partition名称,……])

同时会在Controller节点的controller.log日志中打印如下关键日志:
Starting preferred replica leader election for partitions [partition名称,……]
Partition [partition名称] completed preferred replica leader election. New leader is [数字] 

如上示例会将集群上所有的分区都执行一遍优先副本的选举操作,如果集中包含了大量的分区,那么上面的这种使用方式有可能会失效。在优先副本的选举过程中,具体的元数据信息会被存入 zookeeper 的 /admin/preferred_replica_election 节点(在0.10.2.x版本中没有找到该节点),如果分区元数据超过了 zookeeper 节点所允许的大小,那么选举就会失败。默认情况下zookeeper 所有允许的节点数据大小为 1MB。使用 path-to-json-file 参数来指定一个 json 文件,可以对指定的分区进行优先副本的选举操作。json文件示例如下:
{
    "partition": [{
        "topic": "topicName",
        "partition": 0
    }, {
        "topic": "topicName",
        "partition": 1
    }]
}

Partition 的 leader 选举

Partition 的 leader 副本的选举由 Controller 负责具体实施。

(1)当创建分区(创建 topic 或在原先的topic上增加分区都有创建分区的动作)或分区上线(比如分区中原先的 leader 副本下线,此时分区需要重新选举一个新的 leader 上线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为 OfflinePartitionLeaderSelector。这种策略的基本思想是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中。如果 ISR 集合中没有可用的副本, 那么此时还要再检查一下所配置的 unclean.leader.election.enable 参数,默认值为false。如果这个参数配置为 true,那么表示允许从非 ISR 列表中的选举 leader,从 AR 列表中找到第一个存活的副本即为 leader。
(2)当进行分区重分配的时候也需要执行 leader 的选举动作,对应的选举策略为 ReassignedPartitionLeaderSelector。这个选举策略的思路比较简单:从重分配的 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中。
(3)当发生优先副本的选举时,直接将优先副本设置为 leader 即可。对应的选举策略为 PreferredReplicaPartitionLeaderSelector。
(4)当某节点被优雅地关闭(也就是执行 ControlledShutdown)时,位于这个节点的 leader 副本都会下线,所以与此对应的分区需要执行 leader 的选举,对应的选举策略为 ControlledShutdownLeaderSelector。基本思想是按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中,于此同时还要确保这个副本不处于正在被关闭的节点上。

分区重分配

当集群中的一个节点突然宕机下线时,如果节点上的分区是单副本的,那么这些分区就变得不可用了,在节点恢复前,相应的数据也就处于丢失状态;如果节点上的分区是多副本的,那么位于这个节点上的 leader 副本的角色会转移到集群的其他 follower 副本中。总而言之,宕机下线这个节点上的分区副本都处于失效的状态,kafka 并不会将这些失效的分区副本自动迁移到集群中的剩余的可用 broker 节点上
当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上;
当集群中新增 broker 节点时,只有新创建的 topic 分区才有可能被分配到这个节点上,而之前的 topic 分区并不会自动的分配到新加入的节点中。

为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在就集群扩容,broker 节点失效的场景下对分区进行迁移。

Partition 的四种状态

NonExistentPartition:这个状态表示该分区要么没有被创建过或曾经被创建过但后面被删除了。
NewPartition:分区创建之后就处于 NewPartition 状态。在这个状态中,分区分配了副本,但是还没有选举出 leader 和 ISR。
OnlinePartition:一旦分区的 leader 被推选出来,它就处于 OnlinePartition 状态。
OfflinePartition:如果 leader 选举出来后,leader 节点宕机了,那么该分区就处于 OfflinePartition 状态。

四种状态的相互转换:

NonExistentPartition -> NewPartition
首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存。
对于这个分区而言,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上。

OnlinePartition, OfflinePartition -> OnlinePartition
为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。

NewPartition, OnlinePartition -> OfflinePartition
标记分区状态为离线(offline)。

OfflinePartition -> NonExistentPartition
离线状态标记为不存在分区,表示该分区失败或者被删除。


KafkaAdminClient API

在0.11.0.0版本之前,可以通过kafak-core包(Kafka服务端代码)下的kafka.admin.AdminUtils和kafka.admin.AdminClient来实现部分kafka的管理功能,但他们都已经过时了,在未来的版本中会被删除。
从0.11.0.0版本开始,kafka提供了另一个工具类org.apache.kafka.clients.amdin.KafkaAdminClient来作为替代方案。KafkaAdminClient不仅可以用来管理broker、配置和ACL,还可以用来管理topic。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐