Kafka剖析(一):Kafka背景及架构介绍


Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。InfoQ一直在紧密关注Kafka的应用以及发展,“Kafka剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。

背景介绍

Kafka创建背景

Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。


Kafka简介

近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

为何使用消息系统

  • 解耦

    在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

  • 冗余

    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

  • 扩展性

    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

  • 灵活性 & 峰值处理能力

    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  • 顺序保证

    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

  • 缓冲

    在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

  • 异步通信

    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

常用Message Queue对比

  • RabbitMQ

    RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

  • Redis

    Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

  • ZeroMQ

    ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。

  • ActiveMQ

    ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

  • Kafka/Jafka

    Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

Kafka架构

Terminology

  • Broker

    Kafka集群包含一个或多个服务器,这种服务器被称为broker

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • Partition

    Parition是物理上的概念,每个Topic包含一个或多个Partition.

  • Producer

    负责发布消息到Kafka broker

  • Consumer

    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group

    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Kafka拓扑结构

如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),如下图所示。

每个日志文件都是一个log entrie序列,每个log entrie包含一个4字节整型数值(值为N+5),1个字节的"magic value",4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:

message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte 
crc : 4 bytes 
payload : n bytes 

这个log entries并非由一个文件构成,而是分成多个segment,每个segment以该segment第一条消息的offset命名并以“.kafka”为后缀。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,如下图所示。

因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示。

  
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

Producer消息路由

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class JasonPartitioner<T> implements Partitioner {

    public JasonPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}

如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
        List messageList = new ArrayList<KeyedMessage<String, String>>();
        for(int j = 0; j < 4; j++){
            messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
        }
        producer.send(messageList);
    }
  producer.close();
}

则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。

Consumer Group

(本节所有描述都是基于Consumer hight level API而非low level API)。

使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。下图是Kafka在Linkedin的一种简化部署示意图。

下面这个例子更清晰地展示了Kafka Consumer Group的特性。首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。如下图所示。

Push vs. Pull

作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

Kafka delivery guarantee

有这么几种可能的delivery guarantee:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。

    当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。

    接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

  • 读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once

  • 读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)

  • 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)

总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。


Kafka High Availability (上)

Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高。因此,Kafka从0.8开始提供High Availability机制。本文从Data Replication和Leader Election两方面介绍了Kafka的HA机制。

Kafka为何需要High Available

为何需要Replication

在Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。

  • 如果Producer使用同步模式则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该Broker的数据的丢失。
  • 如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。

由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。

注意:本文所述Leader Election主要指Replica之间的Leader Election。为何需要Leader Election

引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。

因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。

Kafka HA设计解析

如何将所有Replica均匀分布到整个集群

为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。

Kafka分配Replica的算法如下:

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上
  3. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

Data Replication

Kafka的Data Replication需要解决如下问题:

  • 怎样Propagate消息
  • 在向Producer发送ACK前需要保证有多少个Replica已经收到该消息
  • 怎样处理某个Replica不工作的情况
  • 怎样处理Failed Replica恢复回来的情况
Propagate消息

Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。

为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。

Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。

Kafka Replication的数据流如下图所示:

ACK前需要保证有多少个备份

和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。

Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。

需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。

Leader Election算法

上文说明了Kafka是如何做Replication的,另外一个很重要的问题是当Leader宕机了,怎样在Follower中选举出新的Leader。因为Follower可能落后许多或者crash了,所以必须确保选择“最新”的Follower作为新的Leader。一个基本的原则就是,如果Leader不在了,新的Leader必须拥有原来的Leader commit过的所有消息。这就需要作一个折衷,如果Leader在标明一条消息被commit前等待更多的Follower确认,那在它宕机之后就有更多的Follower可以作为新的Leader,但这也会造成吞吐率的下降。

一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的Replica,如果要容忍2个Follower挂掉,必须要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种方式。

实际上,Leader Election算法非常多,比如ZooKeeper的ZabRaftViewstamped Replication。而Kafka所使用的Leader Election算法更像微软的PacificA算法。

Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。

虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。

如何处理所有Replica都不工作

上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  • 等待ISR中的任一个Replica“活”过来,并且选它作为Leader
  • 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader

这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

如何选举Leader

最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

但是该方法会有3个问题:

  • split-brain 这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
  • herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
  • ZooKeeper负载过重 每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。

Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

HA相关ZooKeeper结构

首先声明本节所示ZooKeeper结构中,实线框代表路径名是固定的,而虚线框代表路径名与业务相关

admin (该目录下znode只有在有相关操作时才会存在,操作结束时会将其删除)

/admin/preferred_replica_election数据结构

{
   "fields":[
      {
         "name":"version",
         "type":"int",
         "doc":"version id"
      },
      {
         "name":"partitions",
         "type":{
            "type":"array",
            "items":{
               "fields":[
                  {
                     "name":"topic",
                     "type":"string",
                     "doc":"topic of the partition for which preferred replica election should be triggered"
                  },
                  {
                     "name":"partition",
                     "type":"int",
                     "doc":"the partition for which preferred replica election should be triggered"
                  }
               ],
            }
            "doc":"an array of partitions for which preferred replica election should be triggered"
         }
      }
   ]
}

Example:

{
  "version": 1,
  "partitions":
     [
        {
            "topic": "topic1",
            "partition": 8         
        },
        {
            "topic": "topic2",
            "partition": 16        
        }
     ]            
}

/admin/reassign_partitions用于将一些Partition分配到不同的broker集合上。对于每个待重新分配的Partition,Kafka会在该znode上存储其所有的Replica和相应的Broker id。该znode由管理进程创建并且一旦重新分配成功它将会被自动移除。其数据结构如下:

{ 
"fields":[ 
{ 
"name":"version", 
"type":"int", 
"doc":"version id" 
}, 
{ 
"name":"partitions", 
"type":{ 
"type":"array", 
"items":{ 
"fields":[ 
{ 
"name":"topic", 
"type":"string", 
"doc":"topic of the partition to be reassigned" 
}, 
{ 
"name":"partition", 
"type":"int", 
"doc":"the partition to be reassigned" 
}, 
{ 
"name":"replicas", 
"type":"array", 
"items":"int", 
"doc":"a list of replica ids" 
} 
], 
} 
"doc":"an array of partitions to be reassigned to new replicas" 
} 
} 
] 
}
Example:
{
  "version": 1,
  "partitions":
     [
        {
            "topic": "topic3",
            "partition": 1,
            "replicas": [1, 2, 3]
        }
     ]            
}

/admin/delete_topics数据结构:

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "topics",
       "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"}
      } ]
}

Example:
{
  "version": 1,
  "topics": ["topic4", "topic5"]
}

brokers

broker(即/brokers/ids/[brokerId])存储“活着”的broker信息。数据结构如下:

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "host", "type": "string", "doc": "ip address or host name of the broker"},
      {"name": "port", "type": "int", "doc": "port of the broker"},
      {"name": "jmx_port", "type": "int", "doc": "port for jmx"}
    ]
}

Example:
{
    "jmx_port":-1,
    "host":"node1",
    "version":1,
    "port":9092
}

topic注册信息(/brokers/topics/[topic]),存储该topic的所有partition的所有replica所在的broker id,第一个replica即为preferred replica,对一个给定的partition,它在同一个broker上最多只有一个replica,因此broker id可作为replica id。数据结构如下:

Schema:
{ "fields" :
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "partitions",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to replica list"},
      }
    ]
}
Example:
{
    "version":1,
    "partitions":
        {"12":[6],
        "8":[2],
        "4":[6],
        "11":[5],
        "9":[3],
        "5":[7],
        "10":[4],
        "6":[8],
        "1":[3],
        "0":[2],
        "2":[4],
        "7":[1],
        "3":[5]}
}

partition state(/brokers/topics/[topic]/partitions/[partitionId]/state) 结构如下:

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "isr",
       "type": {"type": "array",
                "items": "int",
                "doc": "an array of the id of replicas in isr"}
      },
      {"name": "leader", "type": "int", "doc": "id of the leader replica"},
      {"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"},
      {"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"}
    ]
}

Example:
{
    "controller_epoch":29,
    "leader":2,
    "version":1,
    "leader_epoch":48,
    "isr":[2]
}

controller 
/controller -> int (broker id of the controller)存储当前controller的信息

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "brokerid", "type": "int", "doc": "broker id of the controller"}
    ]
}
Example:
{
    "version":1,
  "brokerid":8
}

/controller_epoch -> int (epoch)直接以整数形式存储controller epoch,而非像其它znode一样以JSON字符串形式存储。

broker failover过程简介

  1. Controller在ZooKeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在ZooKeeper对应的znode会自动被删除,ZooKeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker。
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
  3. 对set_p中的每一个Partition

    3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR

    3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

    3.3 将新的Leader,ISR和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1

  4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。

    broker failover顺序图如下所示。

Kafka设计解析(三):Kafka High Availability (下)

Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。InfoQ一直在紧密关注Kafka的应用以及发展,“Kafka剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。

本文在上篇文章基础上,更加深入讲解了Kafka的HA机制,主要阐述了HA相关各种场景,如Broker failover、Controller failover、Topic创建/删除、Broker启动、Follower从Leader fetch数据等详细处理过程。同时介绍了Kafka提供的与Replication相关的工具,如重新分配Partition等。

Broker Failover过程

Controller对Broker failure的处理过程

  1. Controller在ZooKeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在ZooKeeper对应的Znode会自动被删除,ZooKeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
  3. 对set_p中的每一个Partition:

    3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。

    3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

    3.3 将新的Leader,ISR和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。

  4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。

    Broker failover顺序图如下所示。

LeaderAndIsrRequest结构如下

LeaderAndIsrResponse结构如下

创建/删除Topic

  1. Controller在ZooKeeper的/brokers/topics节点上注册Watch,一旦某个Topic被创建或删除,则Controller会通过Watch得到新创建/删除的Topic的Partition/Replica分配。
  2. 对于删除Topic操作,Topic工具会将该Topic名字存于/admin/delete_topics。若delete.topic.enable为true,则Controller注册在/admin/delete_topics上的Watch被fire,Controller通过回调向对应的Broker发送StopReplicaRequest,若为false则Controller不会在/admin/delete_topics上注册Watch,也就不会对该事件作出反应。
  3. 对于创建Topic操作,Controller从/brokers/ids读取当前所有可用的Broker列表,对于set_p中的每一个Partition:

    3.1 从分配给该Partition的所有Replica(称为AR)中任选一个可用的Broker作为新的Leader,并将AR设置为新的ISR(因为该Topic是新创建的,所以AR中所有的Replica都没有数据,可认为它们都是同步的,也即都在ISR中,任意一个Replica都可作为Leader)

    3.2 将新的Leader和ISR写入/brokers/topics/[topic]/partitions/[partition]

  4. 直接通过RPC向相关的Broker发送LeaderAndISRRequest。

    创建Topic顺序图如下所示。

Broker响应请求流程

Broker通过kafka.network.SocketServer及相关模块接受各种请求并作出响应。整个网络通信模块基于Java NIO开发,并采用Reactor模式,其中包含1个Acceptor负责接受客户请求,N个Processor负责读写数据,M个Handler处理业务逻辑。

Acceptor的主要职责是监听并接受客户端(请求发起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了。其核心代码如下。

Processor主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有SocketChannel。Processor的run方法会循环从队列中取出新的SocketChannel并将其SelectionKey.OP_READ注册到selector上,然后循环处理已就绪的读(请求)和写(响应)。Processor读取完数据后,将其封装成Request对象并将其交给RequestChannel。

RequestChannel是Processor和KafkaRequestHandler交换数据的地方,它包含一个队列requestQueue用来存放Processor加入的Request,KafkaRequestHandler会从里面取出Request来处理;同时它还包含一个respondQueue,用来存放KafkaRequestHandler处理完Request后返还给客户端的Response。

Processor会通过processNewResponses方法依次将requestChannel中responseQueue保存的Response取出,并将对应的SelectionKey.OP_WRITE事件注册到selector上。当selector的select方法返回时,对检测到的可写通道,调用write方法将Response返回给客户端。

KafkaRequestHandler循环从RequestChannel中取Request并交给kafka.server.KafkaApis处理具体的业务逻辑。

LeaderAndIsrRequest响应过程

对于收到的LeaderAndIsrRequest,Broker主要通过ReplicaManager的becomeLeaderOrFollower处理,流程如下:

  1. 若请求中controllerEpoch小于当前最新的controllerEpoch,则直接返回ErrorMapping.StaleControllerEpochCode。
  2. 对于请求中partitionStateInfos中的每一个元素,即((topic, partitionId), partitionStateInfo):

    2.1 若partitionStateInfo中的leader epoch大于当前ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:

    2.1.1 若当前brokerid(或者说replica id)在partitionStateInfo中,则将该partition及partitionStateInfo存入一个名为partitionState的HashMap中

    2.1.2 否则说明该Broker不在该Partition分配的Replica list中,将该信息记录于log中

    2.2 否则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中

  3. 筛选出partitionState中Leader与当前Broker ID相等的所有记录存入partitionsTobeLeader中,其它记录存入partitionsToBeFollower中。
  4. 若partitionsTobeLeader不为空,则对其执行makeLeaders方。
  5. 若partitionsToBeFollower不为空,则对其执行makeFollowers方法。
  6. 若highwatermak线程还未启动,则将其启动,并将hwThreadInitialized设为true。
  7. 关闭所有Idle状态的Fetcher。

LeaderAndIsrRequest处理过程如下图所示

Broker启动过程

Broker启动后首先根据其ID在ZooKeeper的/brokers/idszonde下创建临时子节点(Ephemeral node),创建成功后Controller的ReplicaStateMachine注册其上的Broker Change Watch会被fire,从而通过回调KafkaController.onBrokerStartup方法完成以下步骤:

  1. 向所有新启动的Broker发送UpdateMetadataRequest,其定义如下。

  2. 将新启动的Broker上的所有Replica设置为OnlineReplica状态,同时这些Broker会为这些Partition启动high watermark线程。
  3. 通过partitionStateMachine触发OnlinePartitionStateChange。

Controller Failover

Controller也需要Failover。每个Broker都会在Controller Path (/controller)上注册一个Watch。当前Controller失败时,对应的Controller Path会自动消失(因为它是Ephemeral Node),此时该Watch被fire,所有“活”着的Broker都会去竞选成为新的Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由ZooKeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为ZooKeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注册。

Broker成功竞选为新Controller后会触发KafkaController.onControllerFailover方法,并在该方法中完成如下操作:

  1. 读取并增加Controller Epoch。
  2. 在ReassignedPartitions Patch(/admin/reassign_partitions)上注册Watch。
  3. 在PreferredReplicaElection Path(/admin/preferred_replica_election)上注册Watch。
  4. 通过partitionStateMachine在Broker Topics Patch(/brokers/topics)上注册Watch。
  5. delete.topic.enable设置为true(默认值是false),则partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注册Watch。
  6. 通过replicaStateMachine在Broker Ids Patch(/brokers/ids)上注册Watch。
  7. 初始化ControllerContext对象,设置当前所有Topic,“活”着的Broker列表,所有Partition的Leader及ISR等。
  8. 启动replicaStateMachine和partitionStateMachine。
  9. 将brokerState状态设置为RunningAsController。
  10. 将每个Partition的Leadership信息发送给所有“活”着的Broker。
  11. auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。
  12. delete.topic.enable设置为true且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

Partition重新分配

管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:

  1. 将ZooKeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 强制更新ZooKeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。
  3. 将RAR - OAR中的Replica设置为NewReplica状态。
  4. 等待直到RAR中所有的Replica都与其Leader同步。
  5. 将RAR中所有的Replica都设置为OnlineReplica状态。
  6. 将Cache中的AR设置为RAR。
  7. 若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加ZooKeeper中的leader epoch。
  8. 将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将ZooKeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR - RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。
  9. 将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。
  10. 将ZooKeeper中的AR设置为RAR。
  11. 删除/admin/reassign_partition

注意:最后一步才将ZooKeeper中的AR更新,因为这是唯一一个持久存储AR的地方,如果Controller在这一步之前crash,新的Controller仍然能够继续完成该过程。

以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配过程中ZooKeeper中的AR和Leader/ISR路径如下

ARleader/isrSttep
{1,2,3} 1/{1,2,3} (initial state)
{1,2,3,4,5,6} 1/{1,2,3} (step 2)
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
{1,2,3,4,5,6} 4/{4,5,6} (step 8)
{4,5,6} 4/{4,5,6} (step 10)

Follower从Leader Fetch数据

Follower通过向Leader发送FetchRequest获取消息,FetchRequest结构如下

从FetchRequest的结构可以看出,每个Fetch请求都要指定最大等待时间和最小获取字节数,以及由TopicAndPartition和PartitionFetchInfo构成的Map。实际上,Follower从Leader数据和Consumer从Broker Fetch数据,都是通过FetchRequest请求完成,所以在FetchRequest结构中,其中一个字段是clientID,并且其默认值是ConsumerConfig.DefaultClientId。

Leader收到Fetch请求后,Kafka通过KafkaApis.handleFetchRequest响应该请求,响应过程如下:

  1. replicaManager根据请求读出数据存入dataRead中。
  2. 如果该请求来自Follower则更新其相应的LEO(log end offset)以及相应Partition的High Watermark
  3. 根据dataRead算出可读消息长度(单位为字节)并存入bytesReadable中。
  4. 满足下面4个条件中的1个,则立即将相应的数据返回
    • Fetch请求不希望等待,即fetchRequest.macWait <= 0
    • Fetch请求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo为空
    • 有足够的数据可供返回,即bytesReadable >= fetchRequest.minBytes
    • 读取数据时发生异常
  5. 若不满足以上4个条件,FetchRequest将不会立即返回,并将该请求封装成DelayedFetch。检查该DeplayedFetch是否满足,若满足则返回请求,否则将该请求加入Watch列表

Leader通过以FetchResponse的形式将消息返回给Follower,FetchResponse结构如下

Replication工具

Topic Tool

$KAFKA_HOME/bin/kafka-topics.sh,该工具可用于创建、删除、修改、查看某个Topic,也可用于列出所有Topic。另外,该工具还可修改某个Topic的以下配置。

unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes

Replica Verification Tool

$KAFKA_HOME/bin/kafka-replica-verification.sh,该工具用来验证所指定的一个或多个Topic下每个Partition对应的所有Replica是否都同步。可通过topic-white-list这一参数指定所需要验证的所有Topic,支持正则表达式。

Preferred Replica Leader Election Tool

用途

有了Replication机制后,每个Partition可能有多个备份。某个Partition的Replica列表叫作AR(Assigned Replicas),AR中的第一个Replica即为“Preferred Replica”。创建一个新的Topic或者给已有Topic增加Partition时,Kafka保证Preferred Replica被均匀分布到集群中的所有Broker上。理想情况下,Preferred Replica会被选为Leader。以上两点保证了所有Partition的Leader被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由Leader完成,若Leader分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为Broker的宕机而被打破,该工具就是用来帮助恢复Leader分配的平衡。

事实上,每个Topic从失败中恢复过来后,它默认会被设置为Follower角色,除非某个Partition的Replica全部宕机,而当前Broker是该Partition的AR中第一个恢复回来的Replica。因此,某个Partition的Leader(Preferred Replica)宕机并恢复后,它很可能不再是该Partition的Leader,但仍然是Preferred Replica。

原理

1. 在ZooKeeper上创建/admin/preferred_replica_election节点,并存入需要调整Preferred Replica的Partition信息。

2. Controller一直Watch该节点,一旦该节点被创建,Controller会收到通知,并获取该内容。

3. Controller读取Preferred Replica,如果发现该Replica当前并非是Leader并且它在该Partition的ISR中,Controller向该Replica发送LeaderAndIsrRequest,使该Replica成为Leader。如果该Replica当前并非是Leader,且不在ISR中,Controller为了保证没有数据丢失,并不会将其设置为Leader。

用法

$KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

在包含8个Broker的Kafka集群上,创建1个名为topic1,replication-factor为3,Partition数为8的Topic,使用如下命令查看其Partition/Replica分布。

$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181

查询结果如下图所示,从图中可以看到,Kafka将所有Replica均匀分布到了整个集群,并且Leader也均匀分布。

手动停止部分Broker,topic1的Partition/Replica分布如下图所示。从图中可以看到,由于Broker 1/2/4都被停止,Partition 0的Leader由原来的1变为3,Partition 1的Leader由原来的2变为5,Partition 2的Leader由原来的3变为6,Partition 3的Leader由原来的4变为7。

再重新启动ID为1的Broker,topic1的Partition/Replica分布如下。可以看到,虽然Broker 1已经启动(Partition 0和Partition5的ISR中有1),但是1并不是任何一个Parititon的Leader,而Broker 5/6/7都是2个Partition的Leader,即Leader的分布不均衡——一个Broker最多是2个Partition的Leader,而最少是0个Partition的Leader。

运行该工具后,topic1的Partition/Replica分布如下图所示。由图可见,除了Partition 1和Partition 3由于Broker 2和Broker 4还未启动,所以其Leader不是其Preferred Repliac外,其它所有Partition的Leader都是其Preferred Replica。同时,与运行该工具前相比,Leader的分配更均匀——一个Broker最多是2个Parittion的Leader,最少是1个Partition的Leader。

启动Broker 2和Broker 4,Leader分布与上一步相比并未变化,如下图所示。

再次运行该工具,所有Partition的Leader都由其Preferred Replica承担,Leader分布更均匀——每个Broker承担1个Partition的Leader角色。

除了手动运行该工具使Leader分配均匀外,Kafka还提供了自动平衡Leader分配的功能,该功能可通过将auto.leader.rebalance.enable设置为true开启,它将周期性检查Leader分配是否平衡,若不平衡度超过一定阈值则自动由Controller尝试将各Partition的Leader设置为其Preferred Replica。检查周期由leader.imbalance.check.interval.seconds指定,不平衡度阈值由leader.imbalance.per.broker.percentage指定。

Kafka Reassign Partitions Tool

用途

该工具的设计目标与Preferred Replica Leader Election Tool有些类似,都旨在促进Kafka集群的负载均衡。不同的是,Preferred Replica Leader Election只能在Partition的AR范围内调整其Leader,使Leader分布均匀,而该工具还可以调整Partition的AR。

Follower需要从Leader Fetch数据以保持与Leader同步,所以仅仅保持Leader分布的平衡对整个集群的负载均衡来说是不够的。另外,生产环境下,随着负载的增大,可能需要给Kafka集群扩容。向Kafka集群中增加Broker非常简单方便,但是对于已有的Topic,并不会自动将其Partition迁移到新加入的Broker上,此时可用该工具达到此目的。某些场景下,实际负载可能远小于最初预期负载,此时可用该工具将分布在整个集群上的Partition重装分配到某些机器上,然后可以停止不需要的Broker从而实现节约资源的目的。

需要说明的是,该工具不仅可以调整Partition的AR位置,还可调整其AR数量,即改变该Topic的replication factor。

原理

该工具只负责将所需信息存入ZooKeeper中相应节点,然后退出,不负责相关的具体操作,所有调整都由Controller完成。

1. 在ZooKeeper上创建/admin/reassign_partitions节点,并存入目标Partition列表及其对应的目标AR列表。

2. Controller注册在/admin/reassign_partitions上的Watch被fire,Controller获取该列表。

3. 对列表中的所有Partition,Controller会做如下操作:

  • 启动RAR - AR中的Replica,即新分配的Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
  • 等待新的Replica与Leader同步
  • 如果Leader不在RAR中,从RAR中选出新的Leader
  • 停止并删除AR - RAR中的Replica,即不再需要的Replica
  • 删除/admin/reassign_partitions节点

用法

该工具有三种使用模式

  • generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)
  • execute模式,根据指定的reassign plan重新分配Partition
  • verify模式,验证重新分配Partition是否成功

下面这个例子将使用该工具将Topic的所有Partition重新分配到Broker 4/5/6/7上,步骤如下:

1. 使用generate模式,生成reassign plan

指定需要重新分配的Topic ({"topics":[{"topic":"topic1"}],"version":1}),并存入/tmp/topics-to-move.json文件中,然后执行如下命令

$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json 
--broker-list "4,5,6,7" --generate

结果如下图所示

2. 使用execute模式,执行reassign plan

将上一步生成的reassignment plan存入/tmp/reassign-plan.json文件中,并执行

$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file /tmp/reassign-plan.json --execute

此时,ZooKeeper上/admin/reassign_partitions节点被创建,且其值与/tmp/reassign-plan.json文件的内容一致。

3. 使用verify模式,验证reassign是否完成

执行verify命令

$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file /tmp/reassign-plan.json --verify

结果如下所示,从图中可看出topic1的所有Partititon都根据reassign plan重新分配成功。

接下来用Topic Tool再次验证。

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

结果如下图所示,从图中可看出topic1的所有Partition都被重新分配到Broker 4/5/6/7,且每个Partition的AR与reassign plan一致。

需要说明的是,在使用execute之前,并不一定要使用generate模式自动生成reassign plan,使用generate模式只是为了方便。事实上,某些场景下,generate模式生成的reassign plan并不一定能满足需求,此时用户可以自己设置reassign plan。

State Change Log Merge Tool

用途

该工具旨在从整个集群的Broker上收集状态改变日志,并生成一个集中的格式化的日志以帮助诊断状态改变相关的故障。每个Broker都会将其收到的状态改变相关的的指令存于名为state-change.log的日志文件中。某些情况下,Partition的Leader election可能会出现问题,此时我们需要对整个集群的状态改变有个全局的了解从而诊断故障并解决问题。该工具将集群中相关的state-change.log日志按时间顺序合并,同时支持用户输入时间范围和目标Topic及Partition作为过滤条件,最终将格式化的结果输出。

用法

bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger 
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log 
--topic topic1 --partitions 0,1,2,3,4,5,6,7

Kafka设计解析(四):Kafka Consumer解析

High Level Consumer

很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播)。因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。

Consumer Group

High Level Consumer将从某个Partition读取的最后一条消息的offset存于ZooKeeper中(Kafka从0.8.2版本开始同时支持将offset存于Zookeeper中与将offset存于专用的Kafka Topic中)。这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,而非某个Topic的。每一个High Level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group。ZooKeeper中Consumer相关节点如下图所示:

很多传统的Message Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证Queue的长度比较短,提高效率。而如上文所述,Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不同的是,Kafka还允许不同Consumer Group同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用 Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer在不同的 Consumer Group即可。下图展示了Kafka在LinkedIn的一种简化部署模型。

为了更清晰展示Kafka Consumer Group的特性,笔者进行了一项测试。创建一个Topic (名为topic1),再创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,然后通过 Producer向topic1发送Key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时 group2中的3个Consumer分别收到了Key为1,2,3的消息,如下图所示。

(点击放大图像)

注:上图中每个黑色区域代表一个Consumer实例,每个实例只创建一个MessageStream。实际上,本实验将Consumer应用程序打成jar包,并在4个不同的命令行终端中传入不同的参数运行。

High Level Consumer Rebalance

注:本节所讲述Rebalance相关内容均基于Kafka High Level Consumer。

Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个Consumer实例只会消费某一个或多个特定 Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以 Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个Consumer Group里的Consumer均匀消费数据,优势是每个Consumer不用都跟大量的Broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个Partition里的数据是有序的,这种设计可以保证每个Partition里的数据可以被有序消费。

如果某Consumer Group中Consumer(每个Consumer只创建1个MessageStream)数量少于Partition数量,则至少有一个 Consumer会消费多个Partition的数据,如果Consumer的数量与Partition数量相同,则正好一个Consumer消费一个 Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息。

如下例所示,如果topic1有0,1,2共三个Partition,当group1只有一个Consumer(名为consumer1)时,该 Consumer可消费这3个Partition的所有数据。

增加一个Consumer(consumer2)后,其中一个Consumer(consumer1)可消费2个Partition的数据(Partition 0和Partition 1),另外一个Consumer(consumer2)可消费另外一个Partition(Partition 2)的数据。

再增加一个Consumer(consumer3)后,每个Consumer可消费一个Partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2。

再增加一个Consumer(consumer4)后,其中3个Consumer可分别消费一个Partition的数据,另外一个Consumer(consumer4)不能消费topic1的任何数据。

此时关闭consumer1,其余3个Consumer可分别消费一个Partition的数据。

接着关闭consumer2,consumer3可消费2个Partition,consumer4可消费1个Partition。

再关闭consumer3,仅存的consumer4可同时消费topic1的3个Partition。

Consumer Rebalance的算法如下:

  • 将目标Topic下的所有Partirtion排序,存于PT
  • 对某Consumer Group下所有Consumer排序,存于CG,第i个Consumer记为Ci
  • N=size(PT)/size(CG),向上取整
  • 解除Ci对原来分配的Partition的消费权(i从0开始)
  • 将第i∗N到(i+1)∗N−1个Partition分配给Ci

目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发 Consumer Group的Rebalance,具体启动流程如下:

  • High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]
  • /consumers/[consumer group]/ids上注册Watch
  • /brokers/ids上注册Watch
  • 如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上也创建Watch
  • 强制自己在其Consumer Group内启动Rebalance流程

在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发 Consumer Rebalance。因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。

该方式有如下缺陷:

根据Kafka社区wiki,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(Coordinator)。大体思想是为所有Consumer Group的子集选举出一个Broker作为Coordinator,由它Watch Zookeeper,从而判断是否有Partition或者Consumer的增减,然后生成Rebalance命令,并检查是否这些Rebalance 在所有相关的Consumer中被执行成功,如果不成功则重试,若成功则认为此次Rebalance成功(这个过程跟Replication Controller非常类似)。具体方案将在后文中详细阐述。

Low Level Consumer

使用Low Level Consumer (Simple Consumer)的主要原因是,用户希望比Consumer Group更好的控制数据的消费。比如:

  • 同一条消息读多次
  • 只读取某个Topic的部分Partition
  • 管理事务,从而确保每条消息被处理一次,且仅被处理一次

与Consumer Group相比,Low Level Consumer要求用户做大量的额外工作。

  • 必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息
  • 应用程序需要通过程序获知每个Partition的Leader是谁
  • 必须处理Leader的变化

使用Low Level Consumer的一般流程如下

  • 查找到一个“活着”的Broker,并且找出每个Partition的Leader
  • 找出每个Partition的Follower
  • 定义好请求,该请求应该能描述应用程序需要哪些数据
  • Fetch数据
  • 识别Leader的变化,并对之作出必要的响应

Consumer重新设计

根据社区社区wiki,Kafka在0.9.*版本中,重新设计Consumer可能是最重要的Feature之一。本节会根据社区wiki介绍Kafka 0.9.*中对Consumer可能的设计方向及思路。

设计方向

简化消费者客户端

部分用户希望开发和使用non-java的客户端。现阶段使用non-java发SimpleConsumer比较方便,但想开发High Level Consumer并不容易。因为High Level Consumer需要实现一些复杂但必不可少的失败探测和Rebalance。如果能将消费者客户端更精简,使依赖最小化,将会极大的方便non- java用户实现自己的Consumer。

中心Coordinator

如上文所述,当前版本的High Level Consumer存在Herd Effect和Split Brain的问题。如果将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,那么这两个问题即可解决。同时还可大大减少 Zookeeper的负载,有利于Kafka Broker的Scale Out。

允许手工管理offset

一些系统希望以特定的时间间隔在自定义的数据库中管理Offset。这就要求Consumer能获取到每条消息的metadata,例如 Topic,Partition,Offset,同时还需要在Consumer启动时得到每个Partition的Offset。实现这些,需要提供新的 Consumer API。同时有个问题不得不考虑,即是否允许Consumer手工管理部分Topic的Offset,而让Kafka自动通过Zookeeper管理其它 Topic的Offset。一个可能的选项是让每个Consumer只能选取1种Offset管理机制,这可极大的简化Consumer API的设计和实现。

Rebalance后触发用户指定的回调

一些应用可能会在内存中为每个Partition维护一些状态,Rebalance时,它们可能需要将该状态持久化。因此该需求希望支持用户实现并指定一些可插拔的并在Rebalance时触发的回调。如果用户使用手动的Offset管理,那该需求可方便得由用户实现,而如果用户希望使用Kafka提供的自动Offset管理,则需要Kafka提供该回调机制。

非阻塞式Consumer API

该需求源于那些实现高层流处理操作,如filter by, group by, join等,的系统。现阶段的阻塞式Consumer几乎不可能实现Join操作。

如何通过中心Coordinator实现Rebalance

成功Rebalance的结果是,被订阅的所有Topic的每一个Partition将会被Consumer Group内的一个(有且仅有一个)Consumer拥有。每一个Broker将被选举为某些Consumer Group的Coordinator。某个Cosnumer Group的Coordinator负责在该Consumer Group的成员变化或者所订阅的Topic的Partititon变化时协调Rebalance操作。

Consumer

1) Consumer启动时,先向Broker列表中的任意一个Broker发送ConsumerMetadataRequest,并通过 ConsumerMetadataResponse获取它所在Group的Coordinator信息。ConsumerMetadataRequest 和ConsumerMetadataResponse的结构如下

ConsumerMetadataRequest
{
  GroupId                => String
}

ConsumerMetadataResponse
{
  ErrorCode              => int16
  Coordinator            => Broker
}

2)Consumer连接到Coordinator并发送 HeartbeatRequest,如果返回的HeartbeatResponse没有任何错误码,Consumer继续fetch数据。若其中包含 IllegalGeneration错误码,即说明Coordinator已经发起了Rebalance操作,此时Consumer停止fetch数据,commit offset,并发送JoinGroupRequest给它的Coordinator,并在JoinGroupResponse中获得它应该拥有的所有 Partition列表和它所属的Group的新的Generation ID。此时Rebalance完成,Consumer开始fetch数据。相应Request和Response结构如下

HeartbeatRequest
{
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}

HeartbeatResponse
{
  ErrorCode              => int16
}

JoinGroupRequest
{
  GroupId                     => String
  SessionTimeout              => int32
  Topics                      => [String]
  ConsumerId                  => String
  PartitionAssignmentStrategy => String
}

JoinGroupResponse
{
  ErrorCode              => int16
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicName [Partition]]
}
TopicName => String
Partition => int32

Consumer状态机

Down:Consumer停止工作

Start up & discover coordinator:Consumer检测其所在Group的Coordinator。一旦它检测到Coordinator,即向其发送JoinGroupRequest。

Part of a group:该状态下,Consumer已经是该Group的成员,并周期性发送HeartbeatRequest。如 HeartbeatResponse包含IllegalGeneration错误码,则转换到Stopped Consumption状态。若连接丢失,HeartbeatResponse包含NotCoordinatorForGroup错误码,则转换到 Rediscover coordinator状态。

Rediscover coordinator:该状态下,Consumer不停止消费而是尝试通过发送ConsumerMetadataRequest来探测新的Coordinator,并且等待直到获得无错误码的响应。

Stopped consumption:该状态下,Consumer停止消费并提交offset,直到它再次加入Group。

故障检测机制

Consumer成功加入Group后,Consumer和相应的Coordinator同时开始故障探测程序。Consumer向 Coordinator发起周期性的Heartbeat(HeartbeatRequest)并等待响应,该周期为 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms内未收到 HeartbeatResponse,或者发现相应的Socket channel断开,它即认为Coordinator已宕机并启动Coordinator探测程序。若Coordinator在 session.timeout.ms内没有收到一次HeartbeatRequest,则它将该Consumer标记为宕机状态并为其所在Group触发一次Rebalance操作。

Coordinator Failover过程中,Consumer可能会在新的Coordinator完成Failover过程之前或之后发现新的Coordinator并向其发送HeatbeatRequest。对于后者,新的Cooodinator可能拒绝该请求,致使该Consumer重新探测Coordinator并发起新的连接请求。如果该Consumer向新的Coordinator发送连接请求太晚,新的Coordinator可能已经在此之前将其标记为宕机状态而将之视为新加入的Consumer并触发一次Rebalance操作。

Coordinator

1)稳定状态下,Coordinator通过上述故障探测机制跟踪其所管理的每个Group下的每个Consumer的健康状态。

2)刚启动时或选举完成后,Coordinator从Zookeeper读取它所管理的Group列表及这些Group的成员列表。如果没有获取到Group成员信息,它不会做任何事情直到某个Group中有成员注册进来。

3)在Coordinator完成加载其管理的Group列表及其相应的成员信息之前,它将为 HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回 CoordinatorStartupNotComplete错误码。此时,Consumer会重新发送请求。

4)Coordinator会跟踪被其所管理的任何Consumer Group注册的Topic的Partition的变化,并为该变化触发Rebalance操作。创建新的Topic也可能触发Rebalance,因为 Consumer可以在Topic被创建之前就已经订阅它了。

Coordinator发起Rebalance操作流程如下所示。

Coordinator状态机

Down:Coordinator不再担任之前负责的Consumer Group的Coordinator

Catch up:该状态下,Coordinator竞选成功,但还未能做好服务相应请求的准备。

Ready:该状态下,新竞选出来的Coordinator已经完成从Zookeeper中加载它所负责管理的所有Group的metadata,并可开始接收相应的请求。

Prepare for rebalance:该状态下,Coordinator在所有HeartbeatResponse中返回IllegalGeneration错误码,并等待所有Consumer向其发送JoinGroupRequest后转到Rebalancing状态。

Rebalancing:该状态下,Coordinator已经收到了JoinGroupRequest请求,并增加其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功后,它会等待接收包含新的Consumer Generation ID的HeartbeatRequest,并转至Ready状态。

Coordinator Failover

如前文所述,Rebalance操作需要经历如下几个阶段

1)Topic/Partition的改变或者新Consumer的加入或者已有Consumer停止,触发Coordinator注册在Zookeeper上的watch,Coordinator收到通知准备发起Rebalance操作。

2)Coordinator通过在HeartbeatResponse中返回IllegalGeneration错误码发起Rebalance操作。

3)Consumer发送JoinGroupRequest

4)Coordinator在Zookeeper中增加Group的Generation ID并将新的Partition分配情况写入Zookeeper

5)Coordinator发送JoinGroupResponse

在这个过程中的每个阶段,Coordinator都可能出现故障。下面给出Rebalance不同阶段中Coordinator的Failover处理方式。

1)如果Coordinator的故障发生在第一阶段,即它收到Notification并未来得及作出响应,则新的Coordinator将从 Zookeeper读取Group的metadata,包含这些Group订阅的Topic列表和之前的Partition分配。如果某个Group所订阅的Topic数或者某个Topic的Partition数与之前的Partition分配不一致,亦或者某个Group连接到新的 Coordinator的Consumer数与之前Partition分配中的不一致,新的Coordinator会发起Rebalance操作。

2)如果失败发生在阶段2,它可能对部分而非全部Consumer发出带错误码的HeartbeatResponse。与第上面第一种情况一样,新的 Coordinator会检测到Rebalance的必要性并发起一次Rebalance操作。如果Rebalance是由Consumer的失败所触发并且Cosnumer在Coordinator的Failover完成前恢复,新的Coordinator不会为此发起新的Rebalance操作。

3)如果Failure发生在阶段3,新的Coordinator可能只收到部分而非全部Consumer的JoinGroupRequest。 Failover完成后,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。与第1种情况类似,它将发起新一轮的Rebalance操作。

4)如果Failure发生在阶段4,即它将新的Group Generation ID和Group成员信息写入Zookeeper后。新的Generation ID和Group成员信息以一个原子操作一次性写入Zookeeper。Failover完成后,Consumer会发送 HeartbeatRequest给新的Coordinator,并包含旧的Generation ID。此时新的Coordinator通过在HeartbeatResponse中返回IllegalGeneration错误码发起新的一轮 Rebalance。这也解释了为什么每次HeartbeatRequest中都需要包含Generation ID和Consumer ID。

5)如果Failure发生在阶段5,旧的Coordinator可能只向Group中的部分Consumer发送了 JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已经失效的Coordinator发送 HeartbeatRequest或者提交Offset时会检测到它已经失败。此时,它将检测新的Coordinator并向其发送带有新的 Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer将检测新的Coordinator并向其发送 JoinGroupRequest,这将促使新的Coordinator发起新一轮的Rebalance。

Kafka设计解析(五):Kafka Benchmark

性能测试及集群监控工具

Kafka提供了非常多有用的工具,如Kafka设计解析(三)- Kafka High Availability (下)中提到的运维类工具——Partition Reassign Tool,Preferred Replica Leader Election Tool,Replica Verification Tool,State Change Log Merge Tool。本章将介绍Kafka提供的性能测试工具,Metrics报告工具及Yahoo开源的Kafka Manager。

Kafka性能测试脚本

  • $KAFKA_HOME/bin/kafka-producer-perf-test.sh 该脚本被设计用于测试Kafka Producer的性能,主要输出4项指标,总共发送消息量(以MB为单位),每秒发送消息量(MB/second),发送消息总数,每秒发送消息数(records/second)。除了将测试结果输出到标准输出外,该脚本还提供CSV Reporter,即将结果以CSV文件的形式存储,便于在其它分析工具中使用该测试结果
  • $KAFKA_HOME/bin/kafka-consumer-perf-test.sh 该脚本用于测试Kafka Consumer的性能,测试指标与Producer性能测试脚本一样。

Kafka Metrics

Kafka使用Yammer Metrics来报告服务端和客户端的Metric信息。Yammer Metrics 3.1.0提供6种形式的Metrics收集——Meters,Gauges,Counters,Histograms,Timers,Health Checks。与此同时,Yammer Metrics将Metric的收集与报告(或者说发布)分离,可以根据需要自由组合。目前它支持的Reporter有Console Reporter,JMX Reporter,HTTP Reporter,CSV Reporter,SLF4J Reporter,Ganglia Reporter,Graphite Reporter。因此,Kafka也支持通过以上几种Reporter输出其Metrics信息。

使用JConsole查看单服务器Metrics

使用JConsole通过JMX,是在不安装其它工具(既然已经安装了Kafka,就肯定安装了Java,而JConsole是Java自带的工具)的情况下查看Kafka服务器Metrics的最简单最方便的方法之一。

首先必须通过为环境变量JMX_PORT设置有效值来启用Kafka的JMX Reporter。如export JMX_PORT=19797。然后即可使用JConsole通过上面设置的端口来访问某一台Kafka服务器来查看其Metrics信息,如下图所示。

使用JConsole的一个好处是不用安装额外的工具,缺点很明显,数据展示不够直观,数据组织形式不友好,更重要的是不能同时监控整个集群的Metrics。在上图中,在kafka.cluster->Partition->UnderReplicated->topic4下,只有2和5两个节点,这并非因为topic4只有这两个Partition的数据是处于复制状态的。事实上,topic4在该Broker上只有这2个Partition,其它Partition在其它Broker上,所以通过该服务器的JMX Reporter只看到了这两个Partition。

通过Kafka Manager查看整个集群的Metrics

Kafka Manager是Yahoo开源的Kafka管理工具。它支持如下功能:

  • 管理多个集群
  • 方便查看集群状态
  • 执行preferred replica election
  • 批量为多个Topic生成并执行Partition分配方案
  • 创建Topic
  • 删除Topic(只支持0.8.2及以上版本,同时要求在Broker中将delete.topic.enable设置为true)
  • 为已有Topic添加Partition
  • 更新Topic配置
  • 在Broker JMX Reporter开启的前提下,轮询Broker级别和Topic级别的Metrics
  • 监控Consumer Group及其消费状态
  • 支持添加和查看LogKafka

安装好Kafka Manager后,添加Cluster非常方便,只需指明该Cluster所使用的Zookeeper列表并指明Kafka版本即可,如下图所示。


Kafka Benchmark

这里要注意,此处添加Cluster是指添加一个已有的Kafka集群进入监控列表,而非通过Kafka Manager部署一个新的Kafka Cluster,这一点与Cloudera Manager不同。

Kafka的一个核心特性是高吞吐率,因此本文的测试重点是Kafka的吞吐率。

本文的测试共使用6台安装Red Hat 6.6的虚拟机,3台作为Broker,另外3台作为Producer或者Consumer。每台虚拟机配置如下:

  • CPU:8 vCPU, Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz,2 Sockets,4 Cores per socket,1 Thread per core
  • 内存:16 GB
  • 磁盘:500 GB

开启Kafka JMX Reporter并使用19797端口,利用Kafka-Manager的JMX polling功能监控性能测试过程中的吞吐率。

本文主要测试如下四种场景,测试的指标主要是每秒多少兆字节数据,每秒多少条消息。

Producer Only

这组测试不使用任何Consumer,只启动Broker和Producer。

Producer Number VS. Throughput

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,消息Payload为100字节。

测试项目:分别测试1,2,3个Producer时的吞吐量。

测试目标:如Kafka设计解析(一)- Kafka背景及架构介绍所介绍,多个Producer可同时向同一个Topic发送数据,在Broker负载饱和前,理论上Producer数量越多,集群每秒收到的消息量越大,并且呈线性增涨。本实验主要验证该特性。同时作为性能测试,本实验还将监控测试过程中单个Broker的CPU和内存使用情况

测试结果:使用不同个数Producer时的总吞吐率如下图所示

由上图可看出,单个Producer每秒可成功发送约128万条Payload为100字节的消息,并且随着Producer个数的提升,每秒总共发送的消息量线性提升,符合之前的分析。

性能测试过程中,Broker的CPU和内存使用情况如下图所示。

(点击放大图像)

由上图可知,在每秒接收约117万条消息(3个Producer总共每秒发送350万条消息,平均每个Broker每秒接收约117万条)的情况下,一个Broker的CPU使用量约为248%,内存使用量为601 MB。

Message Size VS. Throughput

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,3个Producer。

测试项目:分别测试消息长度为10,20,40,60,80,100,150,200,400,800,1000,2000,5000,10000字节时的集群总吞吐量。

测试结果:不同消息长度时的集群总吞吐率如下图所示:

由上图可知,消息越长,每秒所能发送的消息数越少,而每秒所能发送的消息的量(MB)越大。另外,每条消息除了Payload外,还包含其它Metadata,所以每秒所发送的消息量比每秒发送的消息数乘以100字节大,而Payload越大,这些Metadata占比越小,同时发送时的批量发送的消息体积越大,越容易得到更高的每秒消息量(MB/s)。其它测试中使用的Payload为100字节,之所以使用这种短消息(相对短)只是为了测试相对比较差的情况下的Kafka吞吐率。

Partition Number VS. Throughput

实验条件:3个Broker,1个Topic,无Replication,异步模式,3个Producer,消息Payload为100字节。

测试项目:分别测试1到9个Partition时的吞吐量。

测试结果:不同Partition数量时的集群总吞吐率如下图所示:

由上图可知,当Partition数量小于Broker个数(3个)时,Partition数量越大,吞吐率越高,且呈线性提升。本文所有实验中,只启动3个Broker,而一个Partition只能存在于1个Broker上(不考虑Replication。即使有Replication,也只有其Leader接受读写请求),故当某个Topic只包含1个Partition时,实际只有1个Broker在为该Topic工作。如之前文章所讲,Kafka会将所有Partition均匀分布到所有Broker上,所以当只有2个Partition时,会有2个Broker为该Topic服务。3个Partition时同理会有3个Broker为该Topic服务。换言之,Partition数量小于等于3个时,越多的Partition代表越多的Broker为该Topic服务。如前几篇文章所述,不同Broker上的数据并行插入,这就解释了当Partition数量小于等于3个时,吞吐率随Partition数量的增加线性提升。

当Partition数量多于Broker个数时,总吞吐量并未有所提升,甚至还有所下降。可能的原因是,当Partition数量为4和5时,不同Broker上的Partition数量不同,而Producer会将数据均匀发送到各Partition上,这就造成各Broker的负载不同,不能最大化集群吞吐量。而上图中当Partition数量为Broker数量整数倍时吞吐量明显比其它情况高,也证实了这一点。

Replica Number VS. Throughput

实验条件:3个Broker,1个Topic,6个Partition,异步模式,3个Producer,消息Payload为100字节。

测试项目:分别测试1到3个Replica时的吞吐率。

测试结果:如下图所示:

由上图可知,随着Replica数量的增加,吞吐率随之下降。但吞吐率的下降并非线性下降,因为多个Follower的数据复制是并行进行的,而非串行进行。

Consumer Only

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,消息Payload为100字节。

测试项目:分别测试1到3个Consumer时的集群总吞吐率。
测试结果:在集群中已有大量消息的情况下,使用1到3个Consumer时的集群总吞吐量如下图所示:

由上图可知,单个Consumer每秒可消费306万条消息,该数量远大于单个Producer每秒可消费的消息数量,这保证了在合理的配置下,消息可被及时处理。并且随着Consumer数量的增加,集群总吞吐量线性增加。

根据Kafka设计解析(四)- Kafka Consumer设计解析所述,多Consumer消费消息时以Partition为分配单位,当只有1个Consumer时,该Consumer需要同时从6个Partition拉取消息,该Consumer所在机器的I/O成为整个消费过程的瓶颈,而当Consumer个数增加至2个至3个时,多个Consumer同时从集群拉取消息,充分利用了集群的吞吐率。

Producer Consumer pair

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,消息Payload为100字节。

测试项目:测试1个Producer和1个Consumer同时工作时Consumer所能消费到的消息量。

测试结果:1,215,613 records/second。


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐