目录

一、Kafka Broker

1. 工作原理

2. Kafka副本

2.1 副本基本信息

2.2 Leader选举流程

2.3 Leader和Follower故障处理细节

2.4 分区和副本分配

3. 文件存储

3.1 文件存储机制

3.2 文件清理策略

3.3 Kafka的高效读写数据

二、Kafka 消费者

1. 消费方式

2. 消费者组

​编辑

3. 分区分配策略

4. 消费者offset的存储

三、Kafka Eagle监控


一、Kafka Broker

1. 工作原理

1)每一个Broker节点启动后都会向ZK注册,存储在/kafka/brokers/ids中,上图例子中[0,1,2]

2)Controller对应最优先启动注册的节点。

3)最优先启动注册(上图例子中 Broker0)的Controller通过zk的kafka/brokers/ids监听Brokers节点的变化。

4)Controller管理Leader选举,选举规则:在isr中存活为前提,按照AR(Assigned Replicas、Kafka分区中的所有副本统称[broker0,1,2])中排在前面的优先。例如 ar[1,0,2], isr [1,0,2],那么leader 就会按照1,0,2的顺序轮询。其中,ISR(In-Sync Replicas、同步副本集,是AR的子集)为所有Follower和Leader之间通讯正常的所有节点。

5)Controller将节点信息上传到ZK,存储在/brokers/topics/first/partitions/0/state: "leader":1 ,"isr":[1,0,2 ]

6)其他Controller从zk同步相关数据,防止leader挂掉,随时上位

7)生产环境中若Leader挂掉了,/kafka/brokers/ids 变化为[0,2]

8)Controller监听节点变化

9)从ZK获取新的Leader和isr信息,进行重新选举, 更新zk,"leader":0 ,"isr":[0,2 ]

2. Kafka副本

2.1 副本基本信息

(1)Kafka 副本作用:提高数据可靠性。

(2)Kafka 默认副本 1 个,生产环境一般配置为 2 个。太多副本一方面会增加磁盘存储空间,另一方面增加网络上数据传输,降低效率。

(3)Kafka 中副本包含Leader 和 Follower。Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据,消费者对Leader进行消费。

(4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。 AR = ISR + OSR

ISR:表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间参数默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

OSR:Follower 与 Leader 副本同步时,延迟过多的副本(被踢出ISR的副本)。

2.2 Leader选举流程

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。Controller本身不决定leader的归属,只是一个代理的作用。

选举规则:在isr中存活为前提,按照AR排在前面的优先。

eg. AR = [0,1,2], isr = [0.1,2] 此时leader = 0,若broker0宕机,则Isr = [1,2],ar=[0,1,2],leader = 1

2.3 Leader和Follower故障处理细节

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1

HW(High Watermark):所有副本中最小的LEO

eg.下图broker0的 HW = 5, LEO = 8

上图,broker0为Leader,其余为Follower。

  • 若此时broker0发生宕机(故障)

(1) Leader发生故障之后,会从ISR中选出一个新的Leader(ISR变为[1,2],再根据AR顺序选择)

(2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件大于等于HW的部分截掉,然后从新的Leader同步数据。(若broker1被选举为新leader,则broker2中5,6部分数据被删除,保留0,1,2,3,4与新leader同步)

Leader故障的故障处理只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

  • 若此时broker1发生宕机(故障)

(1) Follower发生故障后会被临时踢出ISR(ISR变为[0,2])

(2) 这个期间Leader和Follower继续接收数据 

(3)待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW(5),并将log文件大于等于HW的部分(5及以后的数据)截取掉,从HW开始向Leader进行同步。

(4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

2.4 分区和副本分配

分配策略:(尽量保障负载均衡

  1. 尽量将副本平均分配在所有的broker上
  2. 每个broker上分配到的leader副本尽可能一样多
  3. 分区的多个副本被分配在不同的broker上

eg.现有4个broker,创建1个新的topic,16 个分区,3 个副本,自动分配情况如下。

当然,也可以自定义修改分配策略,这里不作介绍。详情可参考Kafka分区副本重分配源码分析_罗纳尔光的博客-CSDN博客

        正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上来保证每台机器的读写吞吐量都是均匀的。但如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

负载不均衡示例:

3. 文件存储

3.1 文件存储机制

Kafka文件存储是通过本地落盘的方式存储的。由于Kafka中消息是以topic进行分类的,Topic是逻辑上的概念,而partition是物理上的概念。

  • 一个Topic分为多个partition,每个partition对应一个log文件,该log文件中存储的就是Producer生产的数据。
  • Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片索引机制,每个partition分为多个segment。
  • 每个segment包括:“.index”文件、“.log”文件和“.timeindex”等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。目录为/kafka/datas 。

“.index”为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。且文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大。该文件包含offset和position(用于定位.log文件数据的位置).log中存放的是真实的数据。 一个Segment默认大小为1G

index文件与log文件结构示意图

如何快速定位offset = 3的数据?

  • 若要找到offset = 3的数据,首先根据目标offset定位Segment文件,通过二分查找.index文件到查找到当前消息具体的偏移,如上图所示,3大于0小于6,则定位到第一个segment文件中。
  • 找到小于等于目标offset的最大offset对应的索引项。通过第一个.index文件的相对偏移量,定位位置3。
  • 通过position定位到log文件,向下遍历找到目标Record。获取到position = 756之后,定位到.log文件向下得到,在此例中为message3。

3.2 文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认 7 天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级毫秒。(不常用)
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

清理策略分为deletecompact(压缩)。

1. delete 日志删除:将过期数据删除

  • log.cleanup.policy = delete 所有数据启用删除策略

(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

如果一个 segment 中有一部分数据过期,一部分没有过期,那这个segment就不会被删除。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

2.compact 日志压缩:对于相同key的不同value值,只保留最后一个版本。

  • log.cleanup.policy = compact 所有数据启用压缩策略

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。 

3.3 Kafka的高效读写数据

1)Kafka 本身是分布式集群,可以采用分区技术,并行度高

2)顺序写磁盘

Kafka的producer生产数据,需要写入到log文件中,写的过程是追加到文件末端,顺序写的方式,官网有数据表明,同样的磁盘,顺序写能够到600M/s,而随机写只有200K/s,这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

3)零拷贝

Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。

零拷贝技术只用将磁盘文件的数据复制到页面缓存(PageCache)中一次,然后将数据从页面缓存直接发送到网络中。下图为传统数据读取操作,需要有4步。

非零拷贝技术

  • 操作系统将数据从磁盘文件中读取到内核空间的页面缓存
  • 应用程序将数据从内核空间读入到用户空间缓冲区
  • 应用程序将读到的数据写回内核空间并放入到socket缓冲区
  • 操作系统将数据从socket缓冲区复制到网卡接口,此时数据通过网络发送给消费者

Kafka重度依赖底层操作系统(Linux)提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。 

eg. 如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存(PageCache),10次表示10个消费者各自读取一次页面缓存。

二、Kafka 消费者

1. 消费方式

  • pull(拉):主动获取数据。
  • push(推):被动获取数据。

Kafka的consumer采用pull(拉)的方式来从broker中读取数据。

push(推)的模式中,消息发送速率由broker决定,很难适应消费速率不同的消费者,容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull(拉)可能会出现broker中没有数据,消费者陷入循环,此时可设置timeout,取数据为空时,过一段时间再消费。

2. 消费者组

一个消费者可以消费单个或多个分区数据。

同一个消费者组中的消费者,同一时刻只能有一个消费者消费。

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组(都有一个groupid),即消费者组是逻辑上的一个订阅者。

3. 分区分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由那个consumer消费的问题。

Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。

Kafka可以同时使用多个分区分配策略。

Range分区策略:

按照每个topic进行平均分配。partitions数/consumer数计算每个Consumer会得到多少个分区,如果没有除尽,多出来的分区则按照字典序挨个分配给消费者。按照此方式以此分配每一个topic给订阅的消费者,最后完成topic分区的分配。

eg.1一个Topic,7 个分区,3 个消费者,排序后的分区将会 是C0:0,1,2;C1:3,4;C2:5,6; 7/3 = 2余1,除不尽,消费者C0便会多消费1个分区。

 eg. t0,t1两个Topic,均有3个Partition,C0,C1两个Consumer,分配情况如下:

注意:如果有N多个topic,那么针对每个topic,消费者C0都将多消费1个分区,topic越多,C0消 费的分区会比其他消费者明显多消费 N 个分区。

RoundRobin分区策略:

RoundRobin 针对集群中所有Topic而言。按照分区的字典对分区和消费者进行排序,对分区进行循环遍历,遇到自己订阅的则消费,否则向下轮询下一个消费者。即按照分区轮询消费者,继而消息被消费。

eg.假设存在三个topic:t0/t1/t2,分别拥有1/2/3个分区,共有6个分区,分别为t0-0;t1-0/t1-1;t2-0/t2-1/t2-2,这里假设我们有三个Consumer,C0、C1、C2,订阅情况为C0:t0,C1:t0、t1,C2:t0/t1/t2。

Round Robin策略

Sticky 粘性分区:

分配的结果带有“粘性”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

4. 消费者offset的存储

由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复以后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了那个offset,以便故障恢复后继续消费。

zookeeper节点存储数据详细信息

从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为:__consumer_offsets

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key是group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是每个group.id+topic+分区号就保留最新数据。

具体工作中存在重复消费漏消费可能

重复消费:已经消费了数据,但是 offset 没提交。

eg.

1) Consumer 每5s提交offset

2)如果提交offset后的2s,consumer挂了

3)再次重启consumer,则从上一次提交的 offset处继续消费,导致重复消费。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如 MySQL)。

三、Kafka Eagle监控

Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况。

  •  Eagle的数据看板

 

参考资料:

1.【尚硅谷】2022版Kafka3.x教程(从入门到调优,深入全面)_哔哩哔哩_bilibili

2.  看完这篇Kafka,你也许就会了Kafka_心的步伐的博客-CSDN博客

3.  Kafka分区副本分配规则

4.  Kafka分区副本重分配源码分析_罗纳尔光的博客-CSDN博客

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐