本章我们讨论有关Kafka集群的容错性与高可用性话题

在Kafka中,复制的单元是分区,每一个主题中都有一个或者多个分区,每个分区都有一个领导者以及若干个追随者。当创建主题之后,需要指定分区及复制因子,常见的复制因子通常是3,即一个领导者,两个追随者。

在这里插入图片描述

在Kafka集群中,所有的读写操作都会路由到领导者,追随者只会定期从领导者请求获取最新消息,消费者并不会从追随者分区中获取消息,追随者存在的意义只在于数据冗余和故障转移。

分区故障转移

当某个代理节点出现宕机或者下线,位于当前上的领导者分区也会丢失,那么位于其余节点上的追随者将会被提升为领导者。但实际提升操作则依赖于以下前提:是否追随者已与领导者同步,如果没有,是否允许故障转移到未同步副本上。这里,我们简化问题:

在这里插入图片描述

Broker 3宕机,Partition 2在Broker 2上选取了新的领导者
在这里插入图片描述

Broker 1宕机,Partition 1丢失了领导者,故障转移到了Broker 2上。
在这里插入图片描述

Broker 1重新上线之后,创建了四个冗余分区追随者,此刻,所有的分区领导者均集中在Broker 2上。
在这里插入图片描述

Broker 3重新上线之后,同样的为每个分区创建了冗余分区追随者,对于每一个分区来说,都有了三个副本。但分区领导者还是集中在Broker 2上。
在这里插入图片描述

较之RabbitMQ,Kafka则提供了良好的再平衡领导者工具,Kafka引入了首选复本领导者的概念,当Kafka创建主题分区时,它会尽可能平均分布每个分区领导者,并且将各个节点上的第一个分区领导者标记为首选领导者。随着时间的推移,比如服务器重启、宕机或者网络中断等原因,这些首选领导者都变成了首选副本,就像上面说的这种极端场景,为了解决这个问题,Kafka提供了两种解决方案:

  • 主题配置auto.leader.rebalance.enable=true,允许控制节点重新分配领导关系给首选副本领导者以达到均匀分布的目的。

  • 管理员通过* kafka-preferred-replica-election.sh*脚本人工处理

在这里插入图片描述

In-Sync Replicas (ISR):同步副本集

ISR:所有与领导者同步的副本追随者集合。ISR总是包含着领导者及若干追随者。如果一个追随者在replica.lag.time.max.ms周期内与领导者保持一致,那么就可以任务是同步的(in-sync)或者最新的,同步或者最新意味着当前副本与领导者消息内容完全一致。

如果追随者满足以下条件,则将会被从ISR中删除:

默认情况,追随者会每隔replica.fetch.wait.max.ms(默认时间500ms)时间去发起主动拉取请求。为了更清晰的解释ISR的作用目的,我们重新回顾下生产者确认逻辑以及一些故障转移场景,

  • 无需确认,Acks=0

  • 领导者需持久化当前消息,Acks=1

  • 领导者和所有副本都需持久化当前消息,Acks=All

下面我们看下生产者ACKS配置如何影响ISR概念的。

Acks=1 and the ISR

Acks=1的前提下,领导者将不会等待追随者持久化消息,当领导者发生故障转移时,就可能导致数据丢失,下图中,Broker 3上分区落后领导者8s,存在7456条的消息延迟。Broker 1上的分区落后领导者1s,有123条消息延迟。因为配置acks=1,所以生产者会即刻收到消息确认。

在这里插入图片描述

Broker 2宕机,生产者连接错误,分区领导者故障转移至Broker 1,并丢失了123条消息,Broker 1上的追随者属于ISR,但却没有与领导者完全同步。

在这里插入图片描述

生产者可以在bootstrap.servers配置多个Broker ,并且向新的分区领导者发送请求。
在这里插入图片描述

此时,Broker 3上的分区追随者已落后太多(Broker节点间网络较差、或者自身存储出错等原因导致),因此它将被从ISR中除去。此刻,ISR中只剩下了分区领导者。
在这里插入图片描述

Broker 1宕机之后,故障转移到Broker 3,此时,Broker 3上的分区追随者已丢失了15286条消息。同时生产者连接失败。只有当unclean.leader.election.enable=true时,才允许故障转移到ISR之外的分区追随者上。否则,Kafka集群将不执行故障转移操作,所有的读写将被拒绝。这时,我们需要将Broker 1重新上线,以恢复分区领导关系。

在这里插入图片描述

Broker 1重新与Broker3建立关系,但此时,Broker 3已变成分区领导者
在这里插入图片描述

我们看到,除了在生产者建立新的连接和发现新的领导者时的短暂中断之外,在整个场景中都在发送消息,这种场景牺牲了数据的安全性以获取可用性。

Acks=all and the ISR

让我们重现上面的场景,但区别在于acks=all,生产者发送消息但无法立即收到消息确认,分区领导者必须等待所有的副本(Broker 3上4秒延迟,4056条消息待同步,Broker 1上34条消息待同步)执行完消息持久化。

在这里插入图片描述

4s延迟之后,所有的副本同步完成,消息确认发出。

在这里插入图片描述

Broker 3落后太多,从ISR中删除,此时延迟已大大降低,因此ISR中没有落后副本。Broker 2只需等待Broker 1获取到消息,而Broker 1只有500ms的落后延迟。

在这里插入图片描述

Broker 2宕机下线,分区数据无损的故障转移至Broker 1上。
在这里插入图片描述

生产者发现了新的分区领导者,并且开始发送消息,此时,消息延迟更进一步缩减,因为此时ISR中只有单个副本,尽管配置了acks=all,但如果ISR中只有一个副本,其并没有增加冗余。

在这里插入图片描述

Broker 1宕机之后,故障转移至Broker 3,此时,出现了14238条的消息丢失。
在这里插入图片描述

我们可以设置unclean.leader.election.enabletrue,默认情况下为false,使用unclean.leader.election.enable=trueacks=all确保了数据安全性,但仍然存在消息丢失的场景。可能有人会说,可以设置unclean.leader.election.enable = false,但这不是避免数据丢失的有效方式,更好的方式是确保数据冗余或者拒绝写入。

Acks=all, min.insync.replicas and the ISR

有了* min.insync.replicas配置选项,我们就可以将数据安全性提高一个档次。让我们看下最后的一个场景,但我们增加一个配置 min.insync.replicas=2.*

Broker 2拥有分区领导者,Broker 3从ISR中删除。
在这里插入图片描述

Broker 2宕机下线之后,故障无损转移到Broker 1上,此时,ISR只包含一个副本,小于我们配置的最小副本数,因此生产者会收到NotEnoughReplicas报错,拒绝写入。

在这里插入图片描述

这种配置侧重于消息的一致性而非可用性。

ISR的意义

ISR的意义在于平衡数据安全性与数据延迟之间的关系,它允许大多数复制失败,并且仍然提供可用性,同时最小化下线复制或落后副本在数据延迟方面的影响。我们可以通过调整replica.lag.time.max.ms配置来满足我们的需求,其表示当acks=all时,我们可以接受的最大可接受数据延迟(默认10s)。

对于RabbitMQ来说,其只需要一些镜像集合需要同步,慢的镜像可能会导致较长的延迟,而ISR则是一种巧妙解决高延迟问题的方式,使用ISR的风险在于消除了冗余,因为ISR允许只有领导者存在,当然,你可以通过配置min.insync.replicas避免ISR只有领导者的情况的发生。

客户端连接保证

客户端(生产者、消费者)可以通过bootstrap.servers配置连接多个Broker 节点,即便一个节点宕掉,客户端仍有其他备选节点可连。bootstrap.servers不必是领导者分区所在节点,相反的,bootstrap.servers只是作为客户端连接的桥头堡,客户端可以通过他们来获取领导者分区所在服务节点。

对于 RabbitMQ来说,客户端可以连接至任意节点, RabbitMQ集群的内部路由规则可以确保客户端访问到正确的节点。这意味着你可以通过负载均衡来访问RabbitMQ。Kafka则要求客户端必须连接到领导者分区所在的服务节点上,因此,对于Kafka来说,负载均衡并不适用。bootstrap.servers确保了客户端可以正确的访问服务节点,并且在故障发生时可以寻找到新的节点。

Kafka的共识架构

到目前为止,我们尚未讨论到Kafka集群是如何知晓集群中的节点何时失败的,或者选举是如何产生的,要想了解Kafka如何处理网络分区中断的问题,我们首先要了解Kafka的共识架构。

每一个Kafka节点都是伴随着一个ZooKeeper集群进行部署的,Zookeeper是一种分布式共识服务,允许分布式系统在给定状态下达成共识。它是分布式的,并且侧重的是分布式的一致性特性而非可用性。如何要实现正确的读和写,集群中的大部分节点必须可用。

ZooKeeper负责存储Kafka集群的状态

  • 主题列表,分区,配置,当前领导者副本,首选副本

  • 集群成员,每一个节点都会向Zookeeper发送心跳,当一定周期内Zookeeper收不到心跳时,Zookeeper就认为当前节点失败或者不可用

  • 选取控制节点及控制转移节点

控制节点也是Kafka集群节点之一,只不过其负责选取副本领导者,Zookeeper会将集群中的成员信息和主题变更信息发送给控制节点,控制节点需根据变更情况做出相应应对措施。

当一个新的主题被创建完成后,主题包含10分区及3个复制因子,那么控制节点必须为每个分区选择一个领导者,并且有选择的将这些领导者分布在不同的节点上。

对于每一个分区来说,它需要做以下事情:

  • 将ISR和领导者信息更新到ZooKeeper中

  • 发送LeaderAndISRCommand命令给包含当前分组副本的节点,告知节点的ISR和领导者信息

当包含副本领导者的节点宕机下线时,ZooKeeper会将对应信息发送给控制节点,控制节点则会选取新的领导者,之后控制节点会将领导者信息更新到ZooKeeper中去,并且通知其他节点新的领导变更信息。

每个领导者都负责维护ISR信息,通过replica.lag.time.max.ms来确定ISR集合中的成员信息,当ISR集合发生变更时,领导者同样也需要将变更信息更新到Zookeeper中去。

Zookeeper总是保存着变更的最新信息,以便当故障发生时可以实现新的领导者平稳过渡。
在这里插入图片描述

复制协议

了解复制细节可以更好的了解潜在的数据丢失场景

Fetch requests, Log End Offset (LEO) and the Highwater Mark (HW)

我们之前有说到,Kafka集群中的分区追随者会定期从领导者获取请求以实现消息同步,默认间隔为500ms,与此相反,RabbitMQ的队列间的同步请求则是由主队列发起而非镜像队列,主队列会主动将消息推送给镜像队列。

分区领导者及对应的追随者会各自存储LEO以及HW,其中LEO是每一个副本自身所有拥有的最新消息偏移量,而HW表示最后提交的偏移量,注意,如果一个消息需要执行提交,前提是必须已经持久化到了ISR中的每一个副本中,这也就是说,LEO可能会比HW略微靠前。

当领导者收到消息之后,它首先执行消息持久化。追随者发送消息拉取请求,同时包含自身HW信息,领导者会从LEO开始发送一批信息给请求一方,同时发送当前的HW信息,当领导者直到所有的追随者都已完成给定偏移量内的消息存储,它就会将HW往前移。只有领导者可以执行HW迁移操作,它才会在消息拉取响应中告知所有的追随者HW当前值。这也就是说,追随者不仅在消息上落后于领导者,在HW值上也同样落后于领导者一侧。

注意,这里说的“消息持久化”是指将消息保存到内存中而非磁盘中,基于性能的考虑,Kafka会定时异步存储消息到磁盘中,RabbitMQ也是定期将消息保存到磁盘中,两者区别在于,RabbitMQ只有在主队列和所有的镜像队列都完成了消息存盘操作才会发送消息确认给生产者,而Kafka则基于性能考虑的因素,一旦消息保存到内存中,就会发送确认信息。其实在这一点上,Kafka做了一种冒险策略,策略相信在很短的时间内数据冗余是可以弥补未能正确磁盘化内存中已确认的消息风险。

领导者故障转移

当领导者出现失败掉线时,ZooKeeper会通知控制节点,然后控制节点会重新选取出新的领导者,新的领导者会将新的HW标识为当前的LEO,同时ZooKeeper也会将新的领导者信息通知到其他追随者,基于不同的Kafka版本,每个追随者会执行以下动作:

  • 截断本地日志至HW位置,然后从HW偏移量处向新的领导者发起消息同步请求

  • 在选举领导者的同时,向领导者发起请求,用以获知当前HW的值,之后从HW偏移量位置开始定期开始消息同步请求。

在新的领导者选举之后,追随者需要截断本地日志,原因如下:

  • 当领导者发生故障转移时,ISR中的第一个追随者会向ZooKeeper注册赢取成为领导者。对于ISR中的每个追随者来说,他们的状态可能有两种,已经与之前的领导者保持了同步状态,或者落后于之前领导者正在努力同步。但当故障转移发生时,落后的追随者可能成为新的领导者,而Kafka要确保集群副本之间没有多个版本,为了避免多版本的发生,每一个追随者必须在新领导选举时,截断至新领导者的HW偏移量处,这也就是必须设置asks=all来确保集群数据一致性的原因。

  • 消息是周期性写入磁盘的,如果集群中的节点同时失败,各个副本写到磁盘中的消息偏移量各不相同,当节点重新上线之后,新的领导者选举之后,很有可能出现领导者落后于追随者记录。

重新加入集群

与kafka领导者故障转移处理方式一致,当副本重新加入到集群中时,会首先查找确认领导者,然后在领导者选举时截断本地日志至HW,RabbitMQ也采取同样的方式来处理新节点和重新加入的旧节点,两者均被视作新节点对待,即便包含了任何状态信息,也会采取抛弃处理的方式,如果采取了自动同步的策略,那么主队列会采取stop the world的方式来同步信息到新的镜像队列中去,在此期间,主队列不接受任何消息读写操作,这种处理策略对于大容量队列的可能是不小的问题。

Kafka是一种分布式日志,通常来讲,会比消息队列存储更多的消息。对于RabbitMQ来说,消息读取之后即刻被从队列中删除,所以活动队列的理想状态应该是相对小。但是Kafka是一种日志,其依据数据保留策略来保存消息,对于分布式日志来说,stop the world的方式完全不可行,相反的,如果追随者领先于领导者,那么Kafka追随者只是简单的截取日志至领导者的HW位置,而对于落后于领导者的追随者,其采取从当前LEO位置重新请求的方式实现与领导者的同步。

新副本或者重新加入副本都会首先落在ISR之外,并不参与消息提交。他们会尽可能快的实现与领导者的同步,之后才会被加入到ISR集合中来。

网络分区(网络中断)

较之RabbitMQ来说,Kafka拥有更多活动部件,因此当集群发生网络分区时,也会带来更复杂的表现行为,但Kafka本身就是作为集群而诞生的,因此应对网络分区的异常也是相当成熟有效的。

接下来我们讨论Kafka几种不同的网络分区场景。

场景一:追随者无法访问领导者,但可以访问ZooKeeper

在这里插入图片描述

Broker 3与Broker 1,2出现隔离,但仍旧可以正常访问ZooKeeper,replica.lag.time.max.ms 时间之后,Broker 3会从ISR中删除,当网络隔离消失后,Broker 3重新开始消息同步请求,并重新加入到ISR中,此刻ZooKeeper仍旧可以收到Broker 3的心跳,Kafka没有RabbitMQ的那种split brain或者暂停节点的问题,只是牺牲了数据冗余性。
在这里插入图片描述

场景二:领导者无法访问任何追随者,但可以正常访问Zookeeper

在这里插入图片描述

与场景一类似,ISR缩减至只有领导者,但没有split-brain问题,只是缺失数据冗余,Zookeeper可以正常接收心跳,Broker仍可认作是活动的。

在这里插入图片描述

场景三:追随者可以访问领导者,但无法访问Zookeeper

在这里插入图片描述

隔离节点仍旧可以与领导者保持同步,并且保持在ISR内,但ZooKeeper认为其已经下线,但因其只是一个追随者角色,所以原则上没有影响

场景四:领导者可以访问追随者,但无法访问Zookeeper

在这里插入图片描述

领导者无法访问Zookeeper,但可以访问追随者。

在这里插入图片描述

因领导者无法访问Zookeeper,过一段时间之后,Zookeeper会将节点标记为下线并通知控制节点,控制节点会从剩余的追随者中选取新的领导者,然后对于之前的领导者来说,它仍旧认为自己仍旧是领导者,并且继续执行acks=1的写入请求,追随者将不再执行从旧的领导者同步消息请求,之前的领导者将其当作下线节点,并试图将ISR集合缩减至自己本身,但因为之前的领导者无法访问Zookeeper,所以缩减ISR操作无法成功执行,并且开始拒绝写入操作。

acks=all消息也无法接收到消息确认,因为在最初的时候,ISR包含所有的副本,但副本将不再确认消息收到,之前的领导者试图ISR集合缩减至自己本身,但因为之前的领导者无法访问Zookeeper,所以缩减ISR操作无法成功执行,并且开始拒绝写入操作。

客户端将很快检测到领导关系的变化,并且开始向新的领导者发送写入请求,一旦网络分区隔离消除,之前的领导者会发现自己已不再是领导者,将执行本地日志截取操作,然后实现与新领导者的消息同步。任何写入到之前领导者并且没有同步到新领导者的消息将丢失。

在这里插入图片描述

场景五:追随者既不能访问Zookeeper也无法访问集群其他节点

在这里插入图片描述

场景六:领导者无法访问ZooKeeper且无法访问集群其他节点

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

The original leader becomes a follower after the network partition is resolved.

场景七:控制节点无法访问集群其他节点

一般来讲,控制节点无法访问某个节点会导致控制节点无法将领导者变更信息通知到该节点,可能会导致短暂的split-brain发生,更为常见的是在故障转移时,该节点是无法作为领导者候选节点。

场景八:控制节点无法访问Zookeeper

控制节点无法发送心跳信息给Zookeeper,Zookeeper会选取新的控制节点,一旦网络隔离消除,之前控制节点将变成普通集群节点

场景总结:

影响追随者的网络中断不会导致消息丢失只是牺牲了数据冗余,而影响领导者无法访问Zookeeper的则会导致acks=1的消息丢失,无法访问Zookeeper可以引起短时间的split-brainsmin.insync.replicas配置大于等于2可以避免类似场景六中的短时间网络中断引起的消息丢失。

消息丢失总结

Kafka中可能引起消息丢失的场景总结如下

  • 领导者发生故障转移的前提下发送acks=1的消息

  • 故障转移至未ISR外的追随者(即便设置了acks=all)

  • 领导者无法访问Zookeeper,但发送acks=1的消息

  • 完全孤立的领导者,ISR只剩领导者自身,在min.insync.replicas=1的配置下收到了消息,即便acks=all

  • 一个分区的所有节点同步失败,因为Kafka采取的是内存确认的方式,某些节点的消息可能尚未持久化到磁盘,当节点重新上线时,消息可能丢失。

RabbitMQ vs Kafka 持久性及高可用性总结

两者均提供了主-从复制的持久性及高可用性解决方案,但,RabbitMQ却有有一个致命弱点。重新连接集群节点会导致他们丢弃原有数据,并且消息同步是阻塞操作,这两个缺点使得RabbitMQ在大容量队列的持久性运用中非常成问题。要么牺牲冗余,要么牺牲可用性。减少冗余增加了大量数据丢失的风险。但是,如果队列可以保持很小,那么为确保冗余而导致的短期不可用(几秒钟)就可以通过连接重试实现。

Kafka并没有这样的问题,Kafka旨在领导者和追随者出现分歧的地方丢弃数据,并且复制操作非阻塞,复制的同时领导者可以继续接收消息读写,与此同时,追随者则努力与领导者同步,节点的加入对集群的影响相对微不足道。当然,Kafka并非十全十美,比如复制中的带宽问题就令人头疼,在多个追随者同时复制的场景下,带宽很容易成为瓶颈。

在集群节点同时故障时,在持久性问题上,RabbitMQ就具备了一些优势,这是因为RabbitMQ只有在主队列和所有镜像队列都完成了消息磁盘化操作才会发送确认消息。但这种操作会额外增加消息延迟:

  • fsyncs操作每几百毫秒执行一次

  • 当镜像下线时,需要花费一定的时钟时间来确认具体哪个镜像掉线,无形中增加了额外的延迟时间。

在消息持久化问题上,Kafka采取了一种冒险策略,策略认为只要消息保存到了多个节点的内存中,那么消息就是安全的,这种策略使得当所有节点同时失败时消息丢失成为必然。总的来说,Kafka已被证明可以用以处理大容量消息,并且Kafka就是为大容量消息处理而生,你可以根据需要将Kafka集群的持久性提升到11。在复制因子为5,min.insync.replicas=3的配置下,消息丢失几乎不可能。

如果实际消息队列并不是很大,那么RabbitMQ集群是一个不错的选择,如果速率理想,小队列也可以达到很快的处理效果。因为,在面对大容量队列时,你必须作一个抉择,是选择持久性还是选择可用性。而在消息容量不大的业务场景中,RabbitMQ的灵活性优点远盖过其自身集群设计的不足,因此,在这种业务场景中,RabbitMQ可谓是独领风骚。

(完)

原文链接
https://jack-vanlightly.com/blog/2018/9/2/rabbitmq-vs-kafka-part-6-fault-tolerance-and-high-availability-with-kafka

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐