Kafka系列文章

大数据 - Kafka系列《一》- Kafka基本概念-CSDN博客

目录

2.1 分布式系统的三大特性:

2.2 Kafka的优先考虑的特性:

情况1:

情况2:

情况3:

2.2 数据一致性困难 

🌰问题1:分区副本间动态不一致

🌰问题2:消费者所见不一致

2.3一致性问题解决方案(HW-高水位线)

🌰解决“消费者所见不一致” (消费者只允许看到HW以下的message)

🌰解决“分区副本数据最终不一致” (follower数据按HW截断)

2.4 HW方案的天生缺陷

2.5 Leader-Epoch机制的引入

1. leader-epoch的含义

2. leader epoch 具体的工作机制

3.🌰leader epoch 如何解决HW的备份缺陷


2.1 分布式系统的三大特性:

C - Consistency:一致性

A - Availability:可用性

P - Partition tolerance:分区容错性

CAP是分布式系统的基础理论,这三者在分布式系统中一般不能同时很好的被满足,最多满足其中的两个特性。

一致性: 就是指分布式系统的各个节点时时刻刻要保持数据的一致性。客户端每次读操作,要么读到的是最新的数据,要么读取失败。

可用性:客户端一直可以正常访问并得到系统的正常响应。、

分区容错性:指的是分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。

2.2 Kafka的优先考虑的特性:

Kafka作为一个商业级消息中间件,分区容错性(数据可靠性)和可用性是优先考虑的重点,兼顾数据一致性。

  • 分区容错性(可靠性):副本机制

  • 可用性:leader挂了,follower会立刻顶上去

情况1:

(当三个分区没有副本时->满足一致性、可用性)

情况2:

(当三个分区有副本时->满足可用性,分区容错性)

 

情况3:

(当三个分区有副本时,并只能对接leader->一致性提升,可用性、分区容错性下降)

 

2.2 数据一致性困难 

kafka 从 0.8.0 版本开始引入了分区副本引入了数据冗余

用CAP理论来说,就是通过副本及副本leader动态选举机制提高了kafka的 分区容错性可用性

但从而也带来了数据一致性的巨大困难!

kafka让分区多副本同步的基本手段是: follower副本定期向leader请求数据同步!

既然是定期同步,则leader和follower之间必然存在各种数据不一致的情景!

🌰问题1:分区副本间动态不一致

 

🌰问题2:消费者所见不一致

如果此时leader宕机,follower1或follower2被选为新的leader,则leader换届前后,消费者所能读取到的数据发生了不一致;

 🌰问题3:分区副本间最终不一致

2.3一致性问题解决方案(HW-高水位线)

动态过程中的副本数据不一致,是很难解决的;

kafka先尝试着解决上述“消费者所见不一致”及“副本间数据最终不一致”的问题;

解决方案的核心思想

  • 在动态不一致的过程中,维护一条步进式的“临时一致线”(既所谓的High Watermark);高水位线

  • 高水位线HW = ISR副本中最小LEO(副本的最大消息偏移量+1);

  • 底层逻辑就是:offset<HW的message,是各副本间一致的且安全的!

🌰解决“消费者所见不一致” (消费者只允许看到HW以下的message)
🌰解决“分区副本数据最终不一致” (follower数据按HW截断)

2.4 HW方案的天生缺陷

如前所述,看似HW解决了“分区数据最终不一致”的问题,以及“消费者所见不一致”的问题,但其实,这里面存在一个巨大的隐患,导致:

  • “分区数据最终不一致”的问题依然存在

  • producer设置acks=all后,依然有可能丢失数据的问题

产生如上结果的根源是:HW高水位线的更新,与数据同步的进度,存在迟滞! 

 

第一次fetch请求,分leader端和follower端:

leader端:

  1. 读取底层log数据。

  2. 根据fetch带过来的offset=0的数据(就是follower的LEO,因为follower还没有写入数据,因此LEO=0),更新remote LEO为0。

  3. 一轮结束后尝试更新HW,做min(leader LEO,remote LEO)的计算,结果为0。

  4. 把读取到的三条log数据,加上leader HW=0,一起发给follower副本。

follower端:

  1. 写入数据到log文件,更新自己的LEO=3。

  2. 更新HW,做min(leader HW,follower LEO)的计算,由于leader HW=0,因此更新后HW=0。

可以看出,第一次fetch请求后,leader和follower都成功写入了三条消息,但是HW都依然是0,对消费者来说都是不可见的,还需要第二次fetch请求。

 

第二次fetch请求,分leader端和follower端:

leader端:

  1. 读取底层log数据。

  2. 根据fetch带过来的offset=3的数据(上一次请求写入了数据,因此LEO=3),更新remote LEO为3。

  3. 尝试更新HW,做min(leader LEO,remote LEO)的计算,结果为3。

  4. 把读取到的log数据(其实没有数据),加上leader HW=3,一起发给follower副本。

follower端:

  1. 写入数据到log文件,没有数据可以写,LEO依然是3。

  2. 更新HW,做min(leader HW,follower LEO)的计算,由于leader HW=3,因此更新后HW=3。

这个时候,才完成数据的写入,并且分区HW(分区HW指的就是leader副本的HW)更新为3,代表消费者可以消费offset=0,1,2的三条消息了,上面的过程就是kafka处理消息写入和备份的全流程。

从以上步骤可看出,leader 中保存的 remote LEO 值的更新(也即HW的更新)总是需要额外一轮 fetch RPC 请求才 能完成,这意味着在 leader 切换过程中,会存在数据丢失以及数据不一致的问题! 

 问题发生:在水位线同步之前,leader挂了

如上图所示:

  • 状态起始:最新消息c已同步,但是水位线还没开始同步

  • 在此时leader崩溃(即 follower 没能通过下一轮请求来更新 HW 值)

  • follower成为了leader,会自动将 LEO 值调整到之前的 HW 值,即会进行日志截断

  • 然后,原来的leader重启上线,会向新的leader发送请求请求,收到 fetch 响应后,拿到 HW 值,并更新本地 HW 值,发现我也要截取,悲剧发生了,数据丢了

如上图所示:

  • 状态起始:最新消息c已同步,但是水位线还没开始同步

  • 在此时leader崩溃(即 follower 没能通过下一轮请求来更新 HW 值)

  • follower成为了leader,会自动将 LEO 值调整到之前的 HW 值,即会进行日志截断

  • 在截断日志之后,也就是这个d被截断了之后,我又加了一条数据是e

  • 然后,原来的leader重启上线,会向新的leader发送请求请求,收到 fetch 响应后,拿到 HW 值,并更新本地 HW 值,发现我的数据和leader的数据一样,好的,我就不用截取了,我更新HW就好了,就这样,一个新的悲剧又发生了,数据不一致了

只要新一届leader在老leader重启上线前,接收了新的数据,就可能发生上图中的场景,根源也在于HW的更新落后于数据同步进度 

 

2.5 Leader-Epoch机制的引入

为了解决 HW 更新时机是异步延迟的,而 HW 又是决定日志是否备份成功的标志,从而造成数据丢失和数据不一致的现象,Kafka 引入了 leader epoch 机制;

在每个副本日志目录下都创建一个 leader-epoch-checkpoint 文件,用于保存 leader 的 epoch 信息;

1. leader-epoch的含义

如下,leader epoch 长这样:

它的格式为 (epoch offset),epoch指的是 leader 版本,它是一个单调递增的一个正整数值,每次 leader 变更,epoch 版本都会 +1,offset 是每一代 leader 写入的第一条消息的位移值,比如:

(0,0)

(1,300)

以上第2个版本是从位移300开始写入消息,意味着第一个版本写入了 0-299 的消息。

2. leader epoch 具体的工作机制
  • 当副本成为 leader 时:

这时,如果此时生产者有新消息发送过来,会首先更新leader epoch 以及LEO ,并添加到 leader-epoch-checkpoint 文件中;

  • 当副本变成 follower 时:

发送LeaderEpochRequest请求给leader副本,该请求包括了follower中最新的epoch 版本;

leader返回给follower的响应中包含了一个LastOffset,如果 follower last epoch = leader last epoch(纪元相同),则 LastOffset = leader LEO,否则取follower last epoch 中最小的 leader epoch 的 start offset 值;

举个例子:假设 follower last epoch = 1,此时 leader 有 (1, 20) (2, 80) (3, 120),则 LastOffset = 80;

follwer 拿到 LastOffset 之后,会对比当前 LEO 值是否大于 LastOffset,如果当前 LEO 大于 LastOffset,则从 LastOffset 截断日志;

follower 开始发送 fetch 请求给 leader 保持消息同步。

3.🌰leader epoch 如何解决HW的备份缺陷
  • 解决数据丢失和数据不一致的问题

 

 如上图所示:

follower当选leader后,收到纪元消息,发现 LastOffset等于当前 LEO 值,故不用进行日志截断。

follower重启后同步消息,发现自己也不用截取,数据一致,齐活儿

当然,如果说后来增加消息以后,也不需要截取,直接同步数据就行(当ack=-1)

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐