Kafka原理

2017年09月22日 22:39:45317人阅读  评论(0)  收藏  举报

  分类:

Kafka(1)  

目录(?)[+]

Kafka

这段时间研究RabbitMQ、Kafka、RocketMQ消息队列,发现对她们原理的介绍都过于简单,所以整理了众多资料,写成下面这边Kafka的原理。主要内容包括:

 

功能与使用场景:特性、适用场景、

基本概念:Broker、Topic、Partition、Partition、Producer、Consumer、Consumer Group

kafka模型:producer-broker-consumer模型,集群部署模型

Topic 、 Partition 、 消息文件

Producer:负载均衡、消息发送流程、Ack机制、消息要锁、平滑扩容机制、同步VS异步、消息发送机制

Consumer:模型、设计原理、Api

Replications  :容错性、副本、ISR、同步复制、异步复制、分配Replica的算法

Leader:leader、选主

性能相关:顺序写磁盘、数据拷贝、充分利用Page Cache、支持多Disk Drive、批处理、数据压缩降低网络负担、高效序列化方式、数据清理

 

 

一、  功能和适用场景

Kafka是一个基于分布式的消息push-subscribe系统,它被设计成快速、可扩展的、持久的。与其他消息发布-订阅系统类似,Kafka在主题当中保存消息的信息。生产者向主题写入数据,消费者从主题读取数据。

Kafka设计中将topic的每个分区当作一个具有顺序的日志。同处于一个分区中的消息都被设置了一个唯一的偏移量。Kafka只会保持跟踪未读消息,一旦消息被置为已读状态,Kafka就不会再去管理它了。

 

1.  Kafka是一个分布式数据流平台。

在系统或者应用之间建立实施的数据流管道,可靠的传输数据。建立实时的流式应用,用来传输数据流或者对数据流做出响应。kafka提供以下三个核心能力。

1)  可以发布、订阅数据流中的记录。这一点比较类似于消息队列。

2)  可以以容错的方式存储数据流。

3)  可以及时处理数据流

2. Kafka特性

1)  高吞吐量、低延迟

kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。

2)  可扩展性

kafka集群支持热扩展。

3)  持久性、可靠性

消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。

4)  容错性

允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。

5)  高并发

支持数千个客户端同时读写。

 

3.  适用场景

1)  MQ

对于一些常规的消息系统是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势。kafka只能使用作为“常规”的消息系统,未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)。

2)  Websiteactivitytracking

kafka可以作为“网站活性跟踪”的最佳工具;可以将网页/用户操作等信息发送到kafka中。并实时监控,或者离线统计分析等。

3)  Metrics

Kafka通常被用于可操作的监控数据。这包括从分布式应用程序来的聚合统计用来生产集中的运营数据提要。

4)  LogAggregation

kafka的特性决定它非常适合作为“日志收集中心”;application可以将操作日志批量、异步的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支。此时consumer端可以使Hadoop等其他系统化的存储和分析系统

 

二、  术语

1.  Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker。

2.  Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。

3.  Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition。

4.  Producer

负责发布消息到Kafka broker。

5.  Consumer

消息消费者,向Kafka broker读取消息的客户端。

6.  ConsumerGroup

各个consumer可以组成一个Group,每个消息只能被Group中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。可为每个Consumer指定group name,若不指定group name则属于默认的group。

 

三、  kafka模型

1.   producer-broker-consumer

结构图如下:

 

 

2.   集群部署模型

 

 

四、   Topic & Partition

1.  模型

一个partiton由segment list组成,消息总是被append到最新的一个日志文件尾部;读取消息的时候也是按照顺序读取的。通过删除过期的文件来清理历史数据。

2.  Topic &Partition

消息发送时需要指定topic,topic由是由一些Partition Logs(分区日志)组成。partition的名称规则为:topic名称+有序序号,第一个序号从0开始计。partition是实际物理上的概念,而topic是逻辑上的概念。其组织结构如下图所示:

以Partition的形式存放日志由两个好处:

l 方便在集群中扩展,一个topic可以由多个分区组层,每个分区可以放在多个borker上,可以做到适合任意大小的消息量。

l 可以提高并发,消费时是针对consumer消费的,通过增加partation、consumer可以提升性能。

3.  Partation

1)  Segment

partition还可以细分为segment。如果以partition为最小存储单位,当producer不断发送消息,必然会引起partition文件的无限扩张,这对于消息文件的维护以及已被消费消息的清理带来严重的影响,所以以segment为单位将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment文件中消息数量不一定相等)。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。

 

2)  消息消费

一个partition只能被一个消费者消费,一个消费者可以同时消费多个partition。如果partition数量小于consumer数量,就会有消费者无法消费消息。

推荐partition的数量要大于同时运行的consumer线程数量。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,partition数量越大,就要为kafka分配更大的heap space。

 

3)  消息清理

Kafka集群会保存所有的消息,不管消息有没有被消费。通过设定消息的过期时间,让过期的数据自动被清除掉,以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。

启用清理策略(直接删除,删除后的消息不可恢复):

og.cleanup.policy=delete

清理超过指定时间清理:log.retention.hours=72

超过指定大小后,删除旧的消息:log.retention.bytes=1024*1024*1024

 

4)  分区容错

Kafka的数据是持久化的,并且是能够容错的。Kafka允许用户为每个topic设置副本数量,副本是以partation为单位的。例如:你的副本数量设置为3,一份数据会被存放在3台不同的broker上,就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。

 

4. Log

segment由索引文件(“.index”文件)和数据文件(“.log”文件)组成。文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:

00000000000000000000.index

00000000000000000000.log

00000000000000170410.index

00000000000000170410.log

00000000000000239430.index

00000000000000239430.log

以上面的segment文件为例,展示出segment:00000000000000170410的“.index”文件和“.log”文件的对应的关系,如下图:

 

“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例,在“.log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。

如何从partition中通过offset查找message呢?以上图为例,读取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。

要是读取offset=170418的消息,从00000000000000170410.log文件中的1325的位置进行读取,那么怎么知道何时读完本条消息,否则就读到下一条消息的内容了?这个就需要联系到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4Bytes)、magic(1 Byte)、attributes(1 Byte)、keylength(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。

 

五、   Producer

1.  负载均衡

producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。事实上,消息被路由到哪个partition上,由producer客户端决定。比如可以采用、random、key-hash、轮询等。

 

2.  流程图

 

3.  ack机制

发送消息时指定request.required.acks。

acks=0,producer不等待来自broker同步完成的确认就继续发送下一条(批)消息。提供最低的延迟但最弱的耐久性保证,因为其没有任何确认机制。acks值为0会得到最大的系统吞吐量。

acks=1,producer在leader已成功收到的数据并得到确认后发送下一条消息。等待leader的确认后就返回,而不管partion的follower是否已经完成。

acks=-1,producer在所有follower副本确认接收到数据后才算一次发送完成。此选项提供最好的数据可靠性,只要有一个同步副本保持存活,kafka保证信息将不会丢失。

 

 

4.  压缩

Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩之后,在Consumer端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为0,则表示消息未被压缩。

 

 

5.  平滑扩容机制

·      感知broker变化

Producer是从broker获取元数据的,没有直接和zookeeper进行通信。

在创建Produer对象时,需要指定broker地址(通过metadata.broker.list指定,部分broker的地址即可)。

客户会定时(topic.metadata.refresh.interval.ms设置)从metadata.broker.list中获取元数据(Broker的topic、partition、replicas等)。metadata.broker.list列表中的机器只要有一台正常服务,producer就能获取元数据。topic.metadata.refresh.interval.ms=-1,只有在发送失败时,才会重新刷新matedata。topic.metadata.refresh.interval.ms=0,每次发送完后更新元数据。

获取元数据后,producer可以写数据到非metadata.broker.list列表中的broker。

当发现有新的borker时,客户端会创建一个新的SyncProduer对象连接到该broker,并将该对象放到ProducerPool中,等待发送消息,这样Producer端就支持了Broker动态的增减。

 

·      数据分发

有key的分发分发逻辑:对key求hash,然后对partition数量求模Utils.abs(key.hashCode)%numPartitions。

没有key时的分发逻辑:每隔topic.metadata.refresh.interval.ms随机选择一个partition。这个时间窗口内的所有记录发送到这个partition;发送数据出错后再重新选择一个partition。

 

·      发送失败的错误处理:

producer的send函数默认没有返回值。出错处理有EventHandler实现。DefaultEventHandler的错误处理如下:

获取出错的数据。等待一个间隔时间,由配置retry.backoff.ms决定这段时间长短。重新获取元数据。重新发送数据。

重试次数由message.send.max.retries决定。所有重试全部失败时,DefaultEventHandler会抛出异常。

 

 

6.  同步VS异步

1.  机制

通过配置producer.type的值来确定是异步还是同步,默认为同步。async/sync默认是sync。

异步提供了批量发送的功能。当满足以下其中一个条件的时候就触发发送:batch.num.messages异步发送是每次批量发送的条目。queue.buffering.max.ms异步发送的时发送时间间隔,单位是毫秒。

Producer的这种在内存缓存消息,当累计达到阀值时批量发送请求,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。但是如果在达到阀值前,producer不可用了,缓存的数据将会丢失。

异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker。

 

2.  同步发送模型

 

 

3.  异步发送模型

 

Kafka中Producer异步发送消息是基于同步发送消息的接口来实现的,异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个BlockingQueue队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker。

 

 

7.  消息发送机制

Producer在发布消息到某个Partition时,通过ZooKeeper找到该Partition的Leader,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log,每个Follower都从Leaderpull数据,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。

为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。

 

 

 

 

六、  Consumer

1.  模型

 

 

2.  设计原理

1.  Consumer Group

在kafka中,producers将消息推送给broker端,consumer在和broker建立连接之后,主动去pull(或者说fetch)消息。这种模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以控制每次消费的数,批量消费。

kafka只支持Topic,消息消费以Consumer Group为单位,每个Consumer Group中可以有多个consumer,每个consumer是一个线程。发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡。如果所有的consumer都具有不同的group,那这就是"发布-订阅",消息将会广播给所有的消费者。

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,一个consumer可以消费多个partitions中的消息。

2.  消费

在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset。由此可见,consumer客户端也很轻量级。consumer端也可以重置offset来重新消费消息。

kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。从Topic角度来说,消息仍不是有序的。

kafka的设计原理决定,对于一个topic,同一个group中partitions数目大于consumer数目时,某些consumer将无法得到消息。

 

3.  Api

Kafka提供了两套consumer api,分为high-levelapi和sample-api。

1)  Sample-api

Sample-api是一个底层的API,它维持了一个和单一broker的连接,并且这个API是完全无状态的,每次请求都需要指定offset值,这套API也是最灵活的。

2)  High-levelAPI

High-levelAPI封装了对集群中一系列broker的访问,可以透明的消费一个topic。它自己维持了已消费消息的状态,即每次消费的都是下一个消息。

High-levelAPI还支持以组的形式消费topic,如果consumers有同一个组名,那么kafka就相当于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不同的组名,那么此时kafka就相当与一个广播服务,会把topic中的所有消息广播到每个consumer。

 

 

七、  Replications

1.  容错性

备份机制是Kafka0.8版本的新特性,备份机制的出现大大提高了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状态同步。

默认为同步复制。

2.  副本

Kafka中topic的每个partition有一个预写式的日志文件

 

LEO(LogEnd Offset)表示每个partition的log最后一条Message的位置。

HW俗称高水位(High Watermark)是指consumer能够看到的此partition的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。

下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:

 

 

为了提高消息的可靠性,Kafka每个topic的partition有N个replicas(副本),由offsets.topic.replication.factor指定。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个broker失效情况下仍然保证服务可用。

在Kafka中复制时,确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,follower会被动定期地去复制leader上的数据。

Broker数目为4,topic有3个分区的示意图如下:

 

3.  ISR

1)  ISR

ISR (In-SyncReplicas),指副本同步队列,只有ISR里的成员才有被选为leader的可能。每个Partition都会由Leader维护这样一个ISR列表,该列表中包含了所有与Leader同步的Replica。

副本数影响Kafka的吞吐率,同时也增强了数据的可靠性。默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中都需要指定大于1的副本个数。

2)  ISR、OSR

ISR是所有副本中的一个子集。follower从leader同步数据有一些延迟,包括延迟时间(replica.lag.time.max.ms)和延迟条数(replica.lag.max.messages)两个维度, 0.10.x版本中只支持replica.lag.time.max.ms这个维度,任意一个超过阈值,Leader都会把follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。副本总数=ISR+OSR。

当producer发起瞬时高峰流量,producer一次发送的消息超过replica.lag.max.messages条时,此时follower都会被认为是与leader副本不同步了,从而被踢出了ISR。但实际上这些follower都是存活状态的且没有性能问题。那么在之后追上leader,并被重新加入了ISR。于是就会出现它们不断地剔出ISR然后重新回归ISR。这个参数是broker全局的。设置太大了,影响真正“落后”follower的移除;设置的太小了,导致follower的频繁进出。无法给定一个合适的replica.lag.max.messages的值,所以新版本的Kafka移除了这个参数。

 

3)  复制

Kafka既不是完全的同步复制,也不是完全的异步复制,而是基于ISR的动态复制方案。每次数据写入时,只有ISR中的所有Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。

这种方案,与同步复制非常接近。但不同的是,这个ISR是由Leader动态维护的。每次改变ISR后,Leader都会将最新的ISR持久化到Zookeeper中。由于Leader可移除不能及时与之同步的Follower,故与同步复制相比可避免最慢的Follower拖慢整体速度,也即ISR提高了系统可用性。

从Consumer的角度而言,ISR中的所有Replica都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。

ISR可动态调整,极限情况下,可以只包含Leader,极大提高了可容忍的宕机的Follower的数量。与Majority Quorum方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。

Kafka使用ISR很好的均衡了确保数据不丢失以及吞吐率。

 

4)  ISR相关配置说明

Broker的min.insync.replicas参数指定了Broker所要求的ISR最小长度为1,也即极限情况下ISR可以只包含Leader,但此时如果Leader宕机,则该Partition不可用,可用性得不到保证。

Producer可以通过acks参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。而如果将该值设置为0,则Producer发送完数据后,立即认为该数据发送成功,不作任何等待,而实际上该数据可能发送失败,并且Producer的Retry机制将不生效。更推荐的做法是,将acks设置为all或者-1,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

 

 

4.  同步复制

同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式影响吞吐率。消息发送到Leader中后,所有的follower开始拉取消息;通过使用一个单独的通道,保证了消息的顺序。每个follow副本在将消息写入日志后会向leader副本发送反馈信息。复制完成后,lead副本向producer发送反馈信息。

 

5.  异步复制

异步复制方式下,数据只要被leader写入log就被认为已经commit,follower异步的从leader复制数据。如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。

 

6.  分配Replica的算法

假设将所有Broker(假设共n个Broker)和待分配的Partition。将第i个Partition分配到第i%n个Broker上。将第i个Partition的第j个Replica分配到第(i+j) %n个Broker上

 

 

 

八、  leader

1.  leader

leader处理所有的read-write请求,follower需要和leader保持同步。leader负责跟踪所有的follower状态,如果follower落后太多或者失效,leader将会把它从replicas同步列表中删除。

如果ISR中的follower故障了,leader会在配置的时间超时后将其从ISR列表中删掉。当故障的follower恢复了时,它首先将日志截断到最后的checkpoint(最后提交消息的offset),然后从checkpoint开始,从leader获取所有消息。当follower完全同步了leader时,leader将其加回到ISR列表中。

 

等待响应的场景中,在将信息写入本地日志时或向producer发送反馈信息前,如果leader故障了,producer会将消息发送给新的leader。

新leader的选择跟所有的follower ISR注册它们自己到ZooKeeper的顺序有关。最先注册的将会成为新的leader,它的log end offset(LEO)成为最后提交消息的offset(也被称为high watermark(HW))。剩下的follower称为新的候选leader。每个副本向ZooKeeper注册一个监听器,在leader变化时会收到通知。当新的leader被选出,被通知的副本不再是leader时,它将日志截断到最后提交消息的offset,并开始赶上新的leader。新leader在配置的时间超时或者所有活跃的副本都同步完成了时,将当前ISR写入ZooKeeper,并开启自己的消息读取和写入。

 

2.  选主

1)  基本原则

如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。这就需要做一个折中,如果leader在一个消息被commit前等待更多的follower确认,那么在它挂掉之后就有更多的follower可以成为新的leader,但这也会造成吞吐率的下降。

选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader。

 

2)  Majority vote(少数服从多数选举算法)

这种模式下,如果我们有2f+1个副本,那么在commit之前必须保证有f+1个replica复制完消息,同时为了保证能正确选举出新的leader,失败的副本数不能超过f个。

Majority vote优势:系统的延迟取决于最快的几台机器,也就是说比如副本数为3,那么延迟就取决于最快的那个follower而不是最慢的那个。

Majority vote劣势:为了保证leader选举的正常进行,它所能容忍的失败的follower数比较少,如果要容忍1个follower挂掉,那么至少要3个以上的副本,如果要容忍2个follower挂掉,必须要有5个以上的副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。

这种算法更多用在Zookeeper这种共享集群配置的系统中,而很少在需要大量数据的系统中使用的原因。HDFS的HA功能也是基于Majorityvote的方式,但是其数据存储并不是采用这样的方式。

实际上,leader选举的算法非常多,比如Zookeeper的Zab、Raft以及Viewstamped Replication。而Kafka所使用的leader选举算法更像是微软的PacificA算法。

 

3)  kafka选举Leader

Kafka在Zookeeper中为每一个partition动态的维护了一个ISR ,leader负责维护和跟踪ISR中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息,并在所有follower复制消息以后ack producer。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower“落后”太多或者失效,leader将会把它从ISR中删除。用这种方式来使ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能。

如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

在ISR中至少有一个replica时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

1.       等待ISR中任意一个replica“活”过来,并且选它作为leader

2.       选择第一个“活”过来的replica(并不一定是在ISR中)作为leader

这就需要在可用性和一致性当中作出抉择。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中所有的replica都无法“活”过来了,或者数据丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保障已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源。默认情况下,Kafka采用第二种策略,即unclean.leader.election.enable=true,也可以将此参数设置为false来启用第一种策略。

在这种模式下,对于f+1个副本,一个Kafka topic能保证在不丢失已经commit消息的前提下,容忍f个副本的失败。为了容忍f个副本的失败,Majorityvote的方式和ISR在commit前需要等待的副本的数量是一样的,但是ISR需要的总的副本的个数几乎是Majority vote的方式的一半。

 

4)  ISR管理

Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护:

Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的Leader Selector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。

leader有单独的线程定期检测ISR中follower是否脱离ISR,如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

 

 

5)  Controller对Broker failure的处理过程

Controller在Zookeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的Znode会自动被删除,Zookeeper会触发Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。

Controller设置Partition的过程:

a)  从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。

b)  决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

c)  将新的Leader、ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state, 同时发起LeaderAndIsrRequest通知所有的replicas。注意,该操作只有Controller版本在a至c的过程中无变化时才会执行,否则跳转到a。

 

 

 

3.  其他

如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin:Messages are rejected since there are fewer in-sync replicas than required。

producer.type=sync,request.required.acks=-1,replication.factor>=2, min.insync.replicas>=2的情况下,不会丢失数据。

 

 

九、  消息传送机制

1)  消息原语

at most once:最多一次,发送一次,无论成败,将不会重发。消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理。那么此后"未处理"的消息将不能被fetch到,这就是at most once。

at least once:消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。消费者fetch消息,然后处理消息,然后保存offset。如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是at least once,原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。

exactly once:消息只会发送一次。kafka中并没有严格的去实现,我们认为这种策略在kafka中是没有必要的。

通常情况下at-least-once是kafka的首选。(相比at most once而言,重复接收数据总比丢失数据要好)。

 

 

十、  性能优化

1. 顺序写磁盘

磁盘驱动器线性写的速度远远大于随机写。(甚至网上有些文章提到:顺序写磁盘快于随机写内存。)比如:在一个67200rpm SATARAID-5的磁盘阵列上线性写的速度大概是600M/秒,但是随机写的速度只有100K/秒,两者相差将近6000倍。

线性读写在大多数应用场景下是可以预测的,因此,操作系统利用read-ahead和write-behind技术来从大的数据块中预取数据,或者将多个逻辑上的写操作组合成一个大写物理写操作中。(磁盘写入速度的数据来源于网络,个人没有验证过。)

 

2. 数据拷贝

1)  一般将数据从文件传到socket

通过系统调用将文件数据读入到内核态Buffer(DMA拷贝);应用程序将内存态Buffer数据读入到用户态Buffer(CPU拷贝);接着用户程序通过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝);最后通过DMA拷贝将数据拷贝到NIC Buffer。与此同时,还伴随着四次上下文切换。

 

 

 

 

 

 

2)  sendfile

kafka是通过sendfile来完成数据拷贝的。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NICBuffer,无需CPU拷贝。

 

因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,因此大大提高了性能。

 

 

3.  充分利用Page Cache

1)  使用Page Cache的优点

I/O Scheduler会将连续的小块写组装成大块的物理写从而提高性能

I/O Scheduler会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间

充分利用所有空闲内存(非JVM内存)。如果使用应用层Cache(即JVM堆内存),会增加GC负担

读操作可直接在Page Cache内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过Page Cache)交换数据。

进程重启时,不会影响Page Cache中的数据。

 

2)  使用Page Cache问题

Broker收到数据后,写磁盘时只是将数据写入Page Cache,并不保证数据一定完全写入磁盘。从这一点看,可能会机器宕机时,PageCache内的数据未写入磁盘从而造成数据丢失。但是这种丢失只发生在机器断电等造成操作系统不工作的场景。如果为了保证这种情况下数据不丢失而每次强制将Page Cache中的数据Flush到磁盘,会降低性能。

Kafka提供了flush.messages和flush.ms两个参数将Page Cache中的数据强制Flush到磁盘,但Kafka并不建议使用。

4.  支持多Disk Drive

Broker的log.dirs配置项,允许配置多个文件夹。如果机器上有多个Disk Drive,可将不同的Disk挂载到不同的目录,然后将这些目录都配置到log.dirs里。Kafka会尽可能将不同的Partition分配到不同的目录,也即不同的Disk上,从而充分利用了多Disk的优势。

 

5.  批处理

批处理是一种常用的用于提高I/O性能的方式。对Kafka而言,批处理既减少了网络传输的Overhead,又提高了写磁盘的效率。

Kafka 0.8.2开始新的Producer API,虽然从send接口来看,一次只能发送一个ProducerRecord,但是send方法并非立即将消息发送出去,而是通过batch.size和linger.ms控制实际发送频率,从而实现批量发送。

由于每次网络传输,除了传输消息本身以外,还要传输非常多的网络协议本身的一些内容(称为Overhead),所以将多条消息合并到一起传输,可有效减少网络传输的Overhead,进而提高了传输效率。

 

6.  数据压缩降低网络负载

Kafka从0.7开始,支持将数据压缩后再传输给Broker,Broker端直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch到数据后再解压缩。Kafka的压缩不仅减少了Producer到Broker的网络传输负载,同时也降低了Broker磁盘操作的负载,也降低了Consumer与Broker间的网络传输量,从而极大得提高了传输效率,提高了吞吐量。

 

7.  高效的序列化方式

Kafka消息的Key和Payload(或者说Value)的类型可自定义,只需同时提供相应的序列化器和反序列化器即可。因此用户可以通过使用快速且紧凑的序列化-反序列化方式(如Avro,ProtocalBuffer)来减少实际网络传输和磁盘存储的数据规模,从而提高吞吐率。这里要注意,如果使用的序列化方法太慢,即使压缩比非常高,最终的效率也不一定高。

 

8.  数据清理

kafka是定时清理的,通过删除整个文件的方式去删除Partition的旧数据,也避免了对文件的随机写操作。可以通过log.cleanup.policy,log.retention.hours或log.retention.bytes配置。

 

 

 

十一、   参考资料

1.  博客

1)  kafka数据可靠性深度解读

https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162320&idx=1&sn=4f76117e40a43f64621503901604fa74&key=238113c46368d35bf72e6a9deea8bda2aee2e3dc04b4952960a09930a97eeb20c96e3387cab799fe86b2764fcce4e4724d458ddb5610ae3462ef3b5ccf2b9c7c41ae742c5cf572c6c078dc0cd72fb88c&ascene=0&uin=OTc5Njc2MTQw&devicetype=iMac+MacBookPro13%2C2+OSX+OSX+10.12.5+build(16F73)&version=12020810&nettype=WIFI&fontScale=100&pass_ticket=a6skFIfeaCdNPXml2KGIPB9F46GmXU1DeUMez5zEXZe4i8v28JoHCxN86bG0sbAT

 

2)  Kafka设计解析(三)- Kafka High Availability(下)

http://www.jasongj.com/2015/06/08/KafkaColumn3/

 

3)  Kafka设计解析(六)- Kafka高性能关键技术解析

http://www.infoq.com/cn/articles/kafka-analysis-part-6

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐