一、Topic、Partition、Broker关系

每一个topic都有多个Partition,每个partition内部是有序的,每个Partition负责存储这个Topic一部分的数据。

对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:

图片来源: https://kafka.apachecn.org/intro.html

每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。

在kafka集群中,每个Partition都有多个副本,其中一个副本叫做leader,其他的副本叫做follower,副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。如下图。

 

假设一个Topic拆分为了3个Partition,分别是Partition0,Partiton1,Partition2,此时每个Partition都有2个副本 。

多个副本之间数据是如何同步的?其实任何一个Partition,只有Leader是对外提供读写服务的,也就是说,如果有一个客户端往一个Partition写入数据,此时一般就是写入这个Partition的Leader副本。然后Leader副本接收到数据之后,Follower副本会不停的给他发送请求尝试去拉取最新的数据,拉取到自己本地后,写入磁盘中。如下图所示:

 二、 保证数据不丢失

1、Producer端怎么保证数据不丢失? 

Producer端有个设置acks参数。然后这个参数实际上有三种常见的值可以设置,分别是:0、1 和 -1(ALL)。

 https://kafka.apachecn.org/documentation.html

第一种选择是把acks参数设置为0,如果设置为0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)

如果你采用这种设置的话,那么你必须注意的一点是,可能你发送出去的消息还在半路。结果呢,Partition Leader所在Broker就直接挂了,然后结果你的客户端还认为消息发送成功了,此时就会导致这条消息就丢失了。

第二种选择是设置 acks = 1,意思就是说只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。

这种设置其实是kafka默认的设置,大家请注意,划重点!这是默认的设置

也就是说,默认情况下,你要是不管acks这个参数,只要Partition Leader写成功就算成功。

但是这里有一个问题,万一Partition Leader刚刚接收到消息而且写入本地磁盘了,Follower还没来得及同步过去,此时Leader所在的broker宕机了,此时也会导致这条消息丢失,因为Producer端已经认为发送成功了。

最后一种情况,就是设置acks=-1,这个意思就是说,Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。

如果说Partition Leader刚接收到了消息,并且已处理,但是Follower没有来得及同步Leader数据,此时Leader所在的broke宕机了,那么Producer端会感知到这个消息没发送成功,他会重试再次发送消息过去,知道成功。

此时可能Partition 2的Follower变成Leader了,此时ISR列表里只有最新的这个Follower转变成的Leader了,那么只要这个新的Leader接收消息就算成功了。

acks=-1 就可以代表数据一定不会丢失了吗?

当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=-1有用吗?

当然没用了,因为ISR(Kafka 动态维护一个同步状态的备份的集合(a set of in-sync replicas)里就一个Leader,他接收完消息后宕机,也会导致数据丢失。 

所以说,这个acks=-1,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。

这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失。

每个Leader会动态维护一个ISR列表,该列表里存储的是和Leader基本同步的Follower。如果有Follower由于网络、GC等原因而没有向Leader发起拉取数据请求,此时Follower相对于Leader是不同步的,则会被踢出ISR列表。所以说,ISR列表中的Follower都是跟得上Leader的副本
 

Kafka 不是用大多数投票选择 leader 。Kafka 动态维护了一个同步状态的备份的集合 (a set of in-sync replicas), 简称 ISR ,在这个集合中的节点都是和 leader 保持高度一致的,只有这个集合的成员才 有资格被选举为 leader,一条消息必须被这个集合 所有 节点读取并追加到日志中了,这条消息才能视为提交。这个 ISR 集合发生变化会在 ZooKeeper 持久化,正因为如此,这个集合中的任何一个节点都有资格被选为 leader 

2、Consumer端怎么保证数据不丢失? 

 关闭自动提交:enable.auto.commit:false。执行完手动提交,自己要保证幂等性,比如ConsumerA 得到了 PartitionA 的几条消息,进行了一定的处理,然后还未来得及向Broker 确认它消费完了这几条消息(未commit),它就挂了。Broker rebalance之后,把PartitionA 交给了ComsumerB订阅,那么 ConsumerB 也会得到  ConsumerA 处理了 但未提交 的 那几条消息。那这几条消息 就被 重复消费了。

enable.auto.commit的默认值是 true;就是默认采用自动提交的机制

If true the consumer's offset will be periodically committed in the background

Kafka版本[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。

 三、选举

1、 Broker Leader选举(Controller选举)

 在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器(Controller),其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建监听/controller节点watch对象,便于它们收到控制器变更的通知。

那么如果broker Leader节点由于网络原因与zookeeper断开连接或者异常宕机退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点/controller,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。

如果有一个broker加入集群中,那么控制器就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。

2、 副本Leader选举 

Kafka会在Zookeeper上针对每个partition维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。 所以当leader副本宕机后,会从ISR已同步副本中选取出一个作为leader。

如果所有的ISR副本都失败了怎么办?
此时有两种方法可选,
  1. 等待ISR集合中的副本复活,让ISR中第一个replica成为leader
  2. 选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。这可能会造成数据的丢失

  • 需要设置 unclean.leader.election.enable=true

四、Kafka中的HW、LEO、LSO等分别代表什么?

  1. Base Offset:是起始位移,该副本中第一条消息的offset,如下图,这里的起始位移是0,如果一个日志文件写满1G后(默认1G后会log rolling),这个起始位移就不是0开始了。
  2. HW(high watermark):副本的高水印值,replica中leader副本和follower副本都会有这个值,通过它可以得知副本中已提交或已备份消息的范围,leader副本中的HW,决定了消费者能消费的最新消息能到哪个offset。如下图所示,HW值为8,代表offset为[0,8]的9条消息都可以被消费到,它们是对消费者可见的,而[9,12]这4条消息由于未提交,对消费者是不可见的。注意HW最多达到LEO值时,这时可见范围不会包含HW值对应的那条消息了,如下图如果HW也是13,则消费的消息范围就是[0,12]。
  3. LEO(log end offset):日志末端位移,代表日志文件中下一条待写入消息的offset,这个offset上实际是没有消息的。不管是leader副本还是follower副本,都有这个值。当leader副本收到生产者的一条消息,LEO通常会自增1,而follower副本需要从leader副本fetch到数据后,才会增加它的LEO,最后leader副本会比较自己的LEO以及满足条件的follower副本上的LEO,选取两者中较小值作为新的HW,来更新自己的HW值。
  4. LSO(log start offset): 一般情况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 脚本、日志的清理和截断等操作进行修改。

五、零拷贝

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。

实际上,零拷贝是有广义和狭义之分,目前我们通常听到的零拷贝,包括上面这个定义减少不必要的拷贝次数都是广义上的零拷贝。其实了解到这点就足够了。

我们知道,减少不必要的拷贝次数,就是为了提高效率。那零拷贝之前,是怎样的呢?

传统的文件读写

传统的文件读写或者网络传输,通常需要将数据从内核态转换为用户态。应用程序读取用户态内存数据,写入文件 / Socket之前,需要从用户态转换为内核态之后才可以写入文件或者网卡当中。

 这里先分析下虚拟内存和物理页号和物理磁盘之间的关系,访问到某个地址的时候,通过页表中的有效位,可以得知此数据是否在内存中,如果不在,这也就是缺页中断,然后将磁盘对应的数据拷贝到内存中对应的页号上.

虚拟内存的最小单位是页,通常是4KB大小

 MMAP

 mmap是用来建立从虚拟空间磁盘空间的映射的,可以将一个虚拟空间地址映射到一个磁盘文件上,然后就可以读或者写,最后将页内存上的文件数据刷回到磁盘,也就是解除虚拟空间和磁盘空间的映射,这也是一种读写磁盘文件的方法。使用这种方式可以省去了用户空间和内核空间复制的开销,对文件的读取操作跨过了页缓存,减少了数据的拷贝次数,用内存读写取代I/O读写,提高了文件读取效率

当进行mmap系统调用的时候,将文件的内容的全部或一部分直接映射到进程的地址空间(虚拟内存),映射完成后,进程可以像访问普通内存一样做其他的操作,mmap并不分配物理地址空间,它只是占有进程的虚拟地址空间

当进程访问内核中的缓冲区时候,并没有实际拷贝数据,这时MMU在地址映射表中是无法找到与ptr相对应的物理地址的,也就是MMU失败,就会触发缺页中断。内核将文件的这一页数据读入到内核高速缓冲区中,并更新用户进程的页表,使页表指向内核缓冲中的这一页,实现了用户空间和内核空间数据的直接交换,可以看待为内核空间和用户空间共享的一段物理内存。

Kafka快的原因:

1、partition顺序追加写,充分利用磁盘特性,这是基础;

2、Producer生产的数据持久化到broker,采用mmap文件映射+ write,实现顺序的快速写入;

3、Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐