kafka内部实现原理

在这里插入图片描述

两种消费模式
  1. 点对点

点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,

而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

  1. 发布/订阅

发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

为什么需要kafka
  1. 解耦
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
  2. 冗余
    消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
  3. 扩展性
    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
  4. 峰值处理能力
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
  5. 可恢复性
    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  6. 顺序保证
    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
  7. 缓冲
    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  8. 异步通信
    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
什么是 Kafka

在流式计算中,Kafka一般用来缓存数据,Storm 通过消费 Kafka 的数据进行计算

  1. Apache Kafka 是一个开源消息系统,由 Scala 写成。是由 Apache 软件基金会开发的一个开源消息系统项目。
  2. Kafka 最初是由 LinkedIn公司开发,并于2011 年初开源。2012年 10 月从 ApacheIncubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
  3. Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。
  4. 无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,
    来保证系统可用性。
kafka架构

在这里插入图片描述

在这里插入图片描述

  1. Producer :消息生产者,就是向 kafka broker 发消息的客户端;
  2. Consumer :消息消费者,向 kafka broker 取消息的客户端;
  3. Topic :可以理解为一个队列;
  4. Consumer Group(CG):这是kafka 用来实现一个topic 消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic 可以有多个CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic;
  5. Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker
    可 以 容 纳 多 个 topic;
  6. Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给consumer,不保证一个 topic 的整体(多个partition 间)的顺序;
  7. Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然thefirst offset 就是00000000000.kafka。
安装Zookeeper

Kafka使用Zookeeper来存储集群元数据以及消费者元数据,如下所示:
在这里插入图片描述
在本书(译者:也就是《Kafka:The Definitive Guide》)完成时,Kafka已经测试通过Zookeeper的3.4.6稳定版,推荐使用该版本的Zookeeper。

Zookeeper集群

Zookeeper集群的机器数一般都是奇数的(3、5等等),拥有3个节点的Zookeeper集群可以容忍1个节点故障,5个节点的集群可以容忍2个节点故障。关于Zookeeper集群节点数,推荐使用5个节点,只是因为当需要更改集群配置时可以一次重启一台机器而不干扰集群正常运行;不推荐使用超过7个节点的Zookeeper集群,因为一致性协议会导致集群性能下降。
配置Zookeeper集群时,配置中需要指出集群的所有机器,同时每台机器需要在data目录中包含一个myid文件来指明该机器的ID。下面是一个配置例子,其中集群机器分别为hadoop102,hadoop103,hadoop104:

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=hadoop102:2888:3888
server.2=hadoop103:2888:3888
server.3=hadoop104:2888:3888

其中,initLimit是限制跟随者初始连接到群首的时间,syncLimt是限制跟随者落后于群首的时间跨度,这两个值都是以tickTime为单位的,例子中的initLimit为20 * 2000毫秒,也就是40秒。配置中也指明了集群机器的情况,机器的格式为server.X=hostname:peerPort:leaderPort,其中:

  • X:机器的ID,整数值;
  • hostname:机器的域名或IP;
  • peerPort:集群中机器相互通信的端口;
  • leaderPort:群首选举的端口;
Kafka 集群部署
  1. 解压安装包
[root@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/   
  1. 修改解压后的文件名称
[root@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
  1. 在/opt/module/kafka 目录下创建 logs 文件夹
[root@hadoop102 kafka]$ mkdir logs
  1. 修改配置文件
[root@hadoop102 kafka]$ cd config/ 
[root@hadoop102 config]$ vi server.properties

输入以下内容:
#broker 的全局唯一编号,不能重复
broker.id=0            
#删除 topic 功能使能
delete.topic.enable=true
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
  1. 配置环境变量
[root@hadoop102 module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka export
PATH=$PATH:$KAFKA_HOME/bin
[root@hadoop102 module]$ source /etc/profile
  1. 分发安装包
[root@hadoop102 module]$ xsync kafka/

注意:分发之后记得配置其他机器的环境变量
7. 分别在 hadoop103 和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2
注:broker.id 不得重复
8. 启动集群
依次在hadoop102、hadoop103、hadoop104 节点上启动 kafka

[root@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[root@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties & 
[root@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &

//后台启动:
[root@hadoop000 kafka]# bin/kafka-server-start.sh -daemon config/server.properties &
  1. 关闭集群
[root@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[root@hadoop103 kafka]$ bin/kafka-server-stop.sh stop 
[root@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
broker配置

上面的配置用来运行单机的Kafka已经足够了。Kafka中有很多参数可以调优,但大部分都可以使用默认参数,除非你有特殊的场景需求。

基本配置

当以集群形式来运行Kafka时有一些参数需要重新考虑,这些参数是Kafka的基本参数,如果以集群形式部署大部分需要做些修改。

broker.id:

每个Kafka broker都需要有一个整数标识,默认为0,但可以是任意值。需要注意的是,集群内broker的id必须不同。

port:

Kaka的监听端口。上面例子中使用9092作为监听端口,但可以修改成任意值。

zookeeper.connect:

Zookeeper连接的地址,例子中使用localhost的2181端口连接。但这个参数的完整格式是以逗号分隔的hostname:port/path字符串,其中:

  • hostname:Zookeeper的域名或IP地址;
  • port:Zookeeper的连接端口;
  • /path:Kafka的工作根目录,可选参数。
log.dirs

Kafka将消息持久化到磁盘,存放在这个参数指定的路径下。这个参数可以是逗号分隔的多个路径,如果指定超过1个路径,那么Kafka会基于最少使用的原则来实现分区日志均衡,但前提是同一个分区的日志放在相同的路径下。

num.recovery.threads.per.data.dir

Kafka使用一个线程池来处理目录的日志,这个线程池会用在:

  • 当成功启动时,用来打开每个分区的日志;
  • 当从故障恢复时,检查和恢复分区日志;
  • 当关闭时,优雅关闭日志。
    由于这个线程池只是在启动或关闭时使用,因此可以适当设置大一点。尤其是从故障恢复时,对于有大量日志的broker来说大的线程池可以并行处理,可以节省几个小时的时间!这个参数设置的是每个目录的线程数,因此总线程数与log.dirs有关。举个例子,假如设置num.recovery.threads.per.data.dir为8,log.dirs为3,那么总线程数量为24。
auto.create.topics.enable

这个参数默认为true,指明broker在如下场景会自动创建主题:

  • 生产者开始写消息到这个主题时;
  • 消费者开始从这个主题读消息时;
  • 当任何客户端查询这个主题的元信息时。

但在很多场景下这个特性可能并不需要,那么可以手动将这个参数设置为false。

默认主题参数

Kafka服务器配置指定了主题创建时的默认参数,当然我们也可以通过管理工具为每个主题来设置个性化参数值来覆盖默认值。下面来看看这些默认参数。

num.partitions

num.partitions参数决定了主题创建时的分区数,默认为1个分区。需要注意的是,主题的分区数只能增加而不能减少。分区机制使得主题可以均衡分布在Kafka集群,许多用户往往设置分区数等于broker数或者是broker数的几倍。下面是一些设置分区数的考虑因素:

  • 你希望主题的吞吐量是多少?
  • 你希望单个分区的吞吐量是多少?举个例子,假如消费者从分区读取消息写入数据库,而写入数据库上限为50MB/s,那么分区的上限可以限定在60MB/s。
  • 可以适当考虑生产者写入单个分区的速率。不过由于写入速度往往比读取处理速度快,因此瓶颈往往在读取并处理消息上。
  • 如果写入时是基于键值来决定分区的,那么应该考虑以后的量级而不仅仅当前的量级,因为扩容起来比较麻烦。
  • 考虑每个broker的分区数、磁盘容量和带宽。
  • 避免过度设计,因为更多的分区数会增加broker的内存和其他资源消耗,也增加群首选举的时间。

如果我们得出主题的预期吞吐以及分区的预期吞吐,那么可以得出分区数。举个例子,假如预期主题的吞吐为读写速率为1GB/s,而每个消费者的处理速度为50MB/s,那么至少需要20个分区。

log.segment.bytes

当写入消息时,消息会被追加到日志段文件中。如果日志段超过log.segment.bytes指定大小(默认为1G),那么会打开日志段进行追加。旧日志段关闭后,超过了一定时间会被过期删除。如果主题的写入速度很慢,那么这个参数最好做些调整。假如一个主题一天只有100MB的数据写入,假如log.segment.bytes默认1G,那么一个日志段需要10天才能写满并关闭;如果过期策略设置为604800000(也就是7天),那么一个日志段需要17天才能被删除。因为一个日志段需要10天写满,然后这个日志段需要等待7天才能保证它包含的所有消息过期。

log.segment.ms

这个参数也可以控制日志段文件的关闭,一旦设置,当日志段到达这个参数设置的时间限制,日志段会被关闭。此参数默认没有设置,也就是日志段只根据大小来关闭。

当使用log.segment.ms参数时,有一个场景需要注意,那就是如果有大量的分区而这些分区的日志段都没有到达指定大小,那么达到log.segment.ms时间时,这些分区的日志段会同时被关闭,可能会影响磁盘性能。

message.max.bytes

Kafka broker通过这个参数来限制消息大小,当生产者发送的消息超过这个大小时,发送消息会失败。注意这个参数限制的是发送者发送到broker的消息大小,如果发送前消息超过此阈值,但是压缩后消息小于此阈值,那么发送仍然会成功。

这个值默认为1M,我们可以调整这个值,但需要注意的是更大的消息会导致broker处理消息以及持久化消息性能下降。另外,Kafka中有另外一个参数fetch.message.max.bytes来限制消费者获取的消息大小,fetch.message.max.bytes小于message.max.bytes,那么可能会导致消费者无法消费消息而被卡住。

硬件考虑因素

选择硬件是技术,但更是艺术。Kafka本身不要求硬件,但是如果追求极致的性能,那么有几个因素需要考虑:磁盘吞吐及容量、内存、CPU、网络带宽等等。在实际情况中,我们需要考虑哪部分是瓶颈,然后根据预算重点优化。

磁盘吞吐

这个指标直接影响到生产者写入消息的性能。生产消息时,消息需要至少在一个broker上持久化,这样消费者才能读取。磁盘写入越快,写入延迟越小。

对于磁盘,一般会从传统磁盘与固态硬盘(solid-state disk)中选择。固态硬盘性能最好,而传统磁盘则更加便宜、容量更大。如果使用传统磁盘,我们可以对一个broker挂载多个磁盘,或者对于磁盘使用RAID来进行加速。

磁盘容量

磁盘容量需要根据保留的消息量得出。如果一个broker期望每天可写入1TB消息,而消息持久化策略为7天,那么1个broker需要至少7TB的磁盘容量。另外,最好考虑10%的额外容量来存储其他文件或者留点buffer。

当评估Kafka集群大小以及判断何时需要扩容时,磁盘容量是一个重要的参考因素。当评估Kafka集群容量或者考虑扩容时,磁盘容量是一个重要的参考因素。对于一个Kafka集群,我们可以设置一个主题多个分区,这样当一个broker磁盘不足时,我们可以通过增加broker并分配分区来解决问题。另外,磁盘容量也受集群的冗余(replication)策略影响。

内存

通常来说,消费者消费速度与生产者生产速度相当,也就是说消费者从分区末尾读取。在这种情况下,如果消费者可以从系统的页缓存(page cache)读取而不是磁盘,那么消费性能可以大大提升。因此,内存越大,能够用来做页缓存(page cache)的容量也就越多,消费者的性能越好。

Kafka本身不需要配置太多的JVM内存,一般5GB的堆就足够了。另外,不建议Kafka与其他应用同时部署,因为其他应用会分享系统的页缓存而导致Kafka消费性能下降。

网络带宽

当评估集群容量时,网络带宽也是一个重要的参考因素,网络带宽评估会稍微复杂点。首先,网络带宽分为inbound带宽与outbound带宽,如果一个生产者以1MB/s速度写入,但可能有多个消费者,那么outbound带宽可能是inbound的数倍。另外,集群的复制和镜像也会导致outbound带宽上升。网络带宽需要慎重考虑,否则可能会导致集群的复制落后,从而导致集群状态不稳定。

CPU

相对于磁盘与内存来说,CPU不是一个重要的考虑因素。在消息压缩的场景下,Kafka broker需要对于批量消息进行解压缩,然后验证校验和(checksum)以及赋予消息位移(offset),最后再压缩消息写入磁盘。这是CPU占用最多的地方。再提醒下,在选择硬件时,CPU不是一个主要的考虑因素。

Kafka集群

单机版的Kafka对于开发已经足够了,但如果在线上环境,我们通常以集群形态部署。下图是一个集群的例子:
在这里插入图片描述

我们需要多少个broker?

在评估集群broker数量时,第一个需要考虑的是集群的磁盘容量与单个broker的磁盘容量。如果集群期望保留10TB的数据,而单个broker可以存储2TB,那么我们需要5个broker。另外,使用复制的话会导致磁盘容量至少上升100%(准确的数字根据复制策略来判断),这意味着我们至少需要10个broker。

然后,我们需要考虑集群处理请求的速度。我们需要知道网络带宽是多少?是否能支持多个消费者?举个例子,如果在只有一个消费者的前提下,broker的网络带宽已经占用70%,那么增加一个消费者会导致这两个消费者不能及时消费消息,除非我们再增加一个broker。如果集群使用复制的话,那么相当于再增加一个数据消费者。磁盘读写速率与机器内存大小也会影响请求处理速度,因此实际情况中由于这两个因素而扩容的例子也不少。

broker配置

对于broker来说,加入Kafka集群的参数配置只有两项:

  • zookeeper.connect:这个参数指定集群存储元数据的zookeeper集群。
  • broker.id:在相同Kafka集群内,broker.id不能相同
系统调优

对于Linux来说,大部分发行版的默认参数已经能满足需求了,但如果想追求极致性能,我们可以重点关注虚拟内存(virtual memory)和网络子系统(network subsystem),这两个参数一般可以在/etc/sysctl.conf文件中配置。

虚拟内存

一般来说,Linux系统会根据系统负载来自动调整虚拟内存,但在Kafka的使用场景下,我们可以适当调整交换空间(swap space)和内存脏页(dirty memory page)的处理策略。

处于性能考虑,大部分应用(尤其是高吞吐应用)会尽可能避免系统交换(swapping),Kafka也不例外。一种避免交换的方法是,不配置交换空间。使用交换空间不是必须的,它只是避免了在内存不足的情况下系统杀死进程而已。我们可以设置vm.swappiness为一个非常小的值(例如1),通过这个参数我们可以控制操作系统倾向于减少页缓存,而不是使用swap。为什么不设置为1?以前这个值通常设置为0,意味着“不使用swap,除非内存不足”;但后来部分Linux发行版的内核修改了含义,变成了“任何情况下都不使用swap”。因此这里建议设置为1。

另外,我们也可以调整操作系统刷新脏页到磁盘的策略。首先,我们可以降低操作系统后台刷新脏页到磁盘的阈值。这个由vm.dirty_background_ratio来控制,默认为10,表示为脏页与总内存容量的比例。我们可以将其调整为5,这个调整通常不会有太大问题,因此Kafka一般使用SSD或者其他磁盘I/O优化方案(例如RAID),磁盘I/O速度非常快。但不应该设置成0,这样内核会一直刷新脏页。然后,我们可以把写入而导致同步刷新的阈值调高,这个值由vm.dirty_ratio控制,默认为20,同样表示为脏页与总内存容量的比例。将vm.dirty_ratio调整到60到80是一个可以考虑的方案。调大vm.dirty_ratio有两个比较小的风险,一个是脏页增多导致宕机数据损失可能性增加,另外则是一旦触发同步刷新,I/O停顿增加。对于宕机数据损失,我们可以使用集群复制策略来避免。

在调整完参数后,我们可以通过/proc/vmstat来观察系统在高负载情况下脏页的数量:

# cat /proc/vmstat | egrep "dirty|writeback"
nr_dirty 3875
nr_writeback 29
nr_writeback_temp 0
#
磁盘

在优化磁盘I/O时,选择完磁盘硬件之后,磁盘的文件系统也是一个重要的考虑因素。通常,我们使用EXT4(Fourth Extended File System)或者XFS(Extents File System)。其中,XFS成为了很多Linux发行版的文件系统,原因在于它性能优于EXT4而且不需要特别的参数调优。

但无论是选择哪个文件系统,建议在挂载磁盘时设置noatime。Linux中文件的元数据包含ctime(创建时间)、mtime(最近修改时间)和atime(最近访问时间),而在Kafka的使用场景下,atime是没有用到的,但每次读取时却要触发磁盘写入。

网络

调整Linux的网络栈对于高吞吐的应用来说是非常常见的,下面来看下一些常见的参数调优。

第一个调整socket的接收和发送缓冲区,通过调大这两个缓冲区,我们可以提高大量数据传输的性能。首先,net.core.wmem_default和net.core.rmem_default是socket读/写的默认大小,我们可以调整为131072,也就是128K;然后net.core.wmem_max和net.core.rmem_max是socket读/写缓冲区的最大值,我们可以调整为2097152,也就是2M。对于TCP的socket来说,我们需要额外设置两个参数,它们是net.ipv4.tcp_wmem和net.ipv4.tcp_rmem,这两个参数的值都是以空格分隔的三个整数,分别是最小、默认和最大值。其中,最大值不能超过net.core.wmem_max和net.core.rmem_max指定的值。我们可以将这两个TCP socket参数设置为“4096 65536 2048000”,也就是最小4K、默认64K、最大2M。

还有其他的一些网络参数可以调优:

  • 设置net.ipv4.tcp_window_scaling为1,这样可以使得传输数据更有效率,并在broker侧缓存数据;
  • 调大net.ipv4.tcp_max_syn_backlog(默认为1024),这样可以允许更多并发连接等待被接受(accept);
  • 调大net.core.netdev_max_backlog(默认为1000),这样可以允许在流量高峰时更多数据包在内核中排队等待处理。
线上考虑因素

当Kafka上线时,有一些额外因素需要考虑。

垃圾收集参数

对JVM垃圾收集参数调优,往往需要从应用本身特点出发,需要长时间的观察、修改参数并观察验证。有了G1垃圾收集器(Garbage first),垃圾收集参数调优工作简单了很多。G1会根据负载情况自适应,并在应用声明周期内提供始终一致的GC停顿时间,其实现内部会将整个堆分成多个区域,这样在垃圾收集时不需要对整个堆进行垃圾收集。

下面是G1两个比较重要的参数:

  • MaxGCPauseMills:指定垃圾回收的最大停顿时间,默认为200ms。G1会尽可能在保证垃圾回收时不超过这个阈值,但是在需要的情况下停顿时间会超过这个时间。
  • InitiatingHeapOccupancyPercent:指定多大的堆使用比例会触发垃圾收集,默认为45%。

Kafka本身使用内存非常高效,因此我们可以将这两个参数设置得更小。在64G机器内存,5G的Kafka内存情况下,我们可以设置MaxGCPauseMills为20ms,InitiatingHeapOccupancyPercent为35。

当前的Kafka启动脚本使用的是CMS垃圾收集器,我们可以修改使用G1:

# export JAVA_HOME=/usr/java/jdk1.8.0_51
# export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMills=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:DisableExplicitGC -Djava.awt.headless=true"
# # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
#
集群的机器分布

Kafka在分配分区到不同的broker时并没有考虑机架、网络远近等因素,因此很可能将一个分区的所有副本都分配到同一个机架上,这样当这个机架断电或故障时,这个分区就会丢失。因此最佳实践是,集群的broker都分布在不同的机架上,至少不共用一个单点基础设施(例如电源、交换机等等)。

Zookeeper

Kafka使用Zookeeper存储broker、主题及分区等集群元信息,而这些信息极少修改,因此Kafka与Zookeeper通信也很少。在实际情况中,往往有多个Kafka集群都使用同一个Zookeeper集群。

但是,对于消费者与Zookeeper来说,则不然。消费者在读取消息时,需要不断提交消费位移,而这个数据可以存放在Zookeeper或者Kafka中。对于拥有大量消费者的Kafka集群来说,如果使用Zookeeper来存储消费位移,那么会对Zookeeper造成相当大的压力。我们可以调整提交位移的间隔(例如设置为1分钟)来减轻Zookeeper的压力,但最好是使用Kafka来存储位移信息。

另外,Kafka依赖的Zookeeper集群不应该向其他应用来提供服务,因为这样容易因为其他应用的原因而导致Kafka集群不可用。

kafka的命令行操作
  1. 查看当前服务器的所有topic
[root@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop000:2181 --list
  1. 创建topic
[root@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop000:2181 --create --replication-factor 1 --partitions 3 --topic message

选项说明:
--topic 定义 topic 名
--replication-factor   定义副本数
--partitions 定义分区数
  1. 删除topic
[root@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除或者直接重启。
  1. 发送消息
[root@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic first
>hello world
  1. 消费消息
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop000:2181 --from-beginning --topic first 

新命令:
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop000:2181 --from-beginning --topic first

--from-beginning:会把 first 主题中以往所有的数据都读取出来。根据业务场景选择是
否增加该配置
  1. 查看某个topic详情
[root@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

Kafka生产者:写消息到Kafka

概要

当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?

举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过Kafka返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。

再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。

不同的业务需要使用不同的写入方式和配置。后面我们将会讨论这些API,现在先看下生产者写消息的基本流程:
在这里插入图片描述
流程如下:

  1. 首先,我们需要创建一个ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
  2. 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
  3. 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
  4. 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker。
  5. 当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,否则返回异常。
  6. 生产者接收到结果后,对于异常可能会进行重试。
创建Kafka生产者

创建Kafka生产者有三个基本属性:

  • bootstrap.servers:属性值是一个host:port的broker列表。这个属性指定了生产者建立初始连接的broker列表,这个列表不需要包含所有的broker,因为生产者建立初始连接后会从相应的broker获取到集群信息。但建议指定至少包含两个broker,这样一个broker宕机后生产者可以连接到另一个broker。
  • key.serializer:属性值是类的名称。这个属性指定了用来序列化键值(key)的类。Kafka broker只接受字节数组,但生产者的发送消息接口允许发送任何的Java对象,因此需要将这些对象序列化成字节数组。key.serializer指定的类需要实现org.apache.kafka.common.serialization.Serializer接口,Kafka客户端包中包含了几个默认实现,例如ByteArraySerializer、StringSerializer和IntegerSerializer。
  • value.serializer:属性值是类的名称。这个属性指定了用来序列化消息记录的类,与key.serializer差不多。

下面是一个样例代码:

private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);

创建完生产者后,我们可以发送消息。Kafka中有三种发送消息的方式:

  • 只发不管结果(fire-and-forget):只调用接口发送消息到Kafka服务器,但不管成功写入与否。由于Kafka是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息。
  • 同步发送(Synchronous send):调用send()方法返回一个Future对象,我们可以使用它的get()方法来判断消息发送成功与否。
  • 异步发送(Asynchronous send):调用send()时提供一个回调方法,当接收到broker结果后回调此方法。

本章的例子都是单线程发送的,但生产者对象是线程安全的,它支持多线程发送消息来提高吞吐。需要的话,我们可以使用多个生产者对象来进一步提高吞吐。

发送消息到Kafka

最简单的发送消息方式如下:

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

try {
  producer.send(record);
} catch (Exception e) {
  e.printStackTrace();
}

这里做了如下几件事:

  1. 我们创建了一个ProducerRecord,并且指定了主题以及消息的key/value。主题总是字符串类型的,但key/value则可以是任意类型,在本例中也是字符串。需要注意的是,这里的key/value的类型需要与serializer和生产者的类型匹配。
  2. 使用send()方法来发送消息,该方法会返回一个RecordMetadata的Future对象,但由于我们没有跟踪Future对象,因此并不知道发送结果。如前所述,这种方式可能会丢失消息。
  3. 虽然我们忽略了发送消息到broker的异常,但是我们调用send()方法时仍然可能会遇到一些异常,例如序列化异常、发送缓冲区溢出异常等等。
同步发送消息

同步发送方式可以简单修改如下:

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

try {
  producer.send(record).get();
} catch (Exception e) {
  e.printStackTrace();
}

注意到,这里使用了Future.get()来获取发送结果,如果发送消息失败则会抛出异常,否则返回一个RecordMetadata对象。发送失败异常包含:1)broker返回不可恢复异常,生产者直接抛出该异常;2)对于broker其他异常,生产者会进行重试,如果重试超过一定次数仍不成功则抛出异常。

可恢复异常指的是,如果生产者进行重试可能会成功,例如连接异常;不可恢复异常则是进行重试也不会成功的异常,例如消息内容过大。

异步发送消息

首先了解下什么场景下需要异步发送消息。假如生产者与broker之间的网络延时为10ms,我们发送100条消息,发送每条消息都等待结果,那么需要1秒的时间。而如果我们采用异步的方式,几乎没有任何耗时,而且我们还可以通过回调知道消息的发送结果。
异步发送消息的样例如下:

private class DemoProducerCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
	  e.printStackTrace();
	}
  }
}

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

producer.send(record, new DemoProducerCallback());

异步回调的类需要实现org.apache.kafka.clients.producer.Callback接口,这个接口只有一个onCompletion方法。当Kafka返回异常时,异常值不为null,代码中只是简单的打印,但我们可以采取其他处理方式。

生产者的配置

上面我们只配置了bootstrap.servers和序列化类,其实生产者还有很多配置,上面只是使用了默认值。下面来看下这些配置参数。

acks

acks控制多少个副本必须写入消息后生产者才能认为写入成功,这个参数对消息丢失可能性有很大影响。这个参数有三种取值:

  • acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
  • acks=1:生产者会在该分区的群首(leader)写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果群首宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
  • acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
buffer.memory

这个参数设置生产者缓冲发送的消息的内存大小,如果应用调用send方法的速度大于生产者发送的速度,那么调用会阻塞或者抛出异常,具体行为取决于block.on.buffer.full(这个参数在0.9.0.0版本被max.block.ms代替,允许抛出异常前等待一定时间)参数。

compresstion.type

默认情况下消息是不压缩的,这个参数可以指定使用消息压缩,参数可以取值为snappy、gzip或者lz4。snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。

retries

当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举群首的时间长,这样可以避免生产者过早结束重试导致失败。

batch.size

当多条消息发送到一个分区时,生产者会进行批量发送,这个参数指定了批量消息的大小上限(以字节为单位)。当批量消息达到这个大小时,生产者会一起发送到broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。

linger.ms

这个参数指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。

client.id

这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。

max.in.flight.requests.per.connection

这个参数指定生产者可以发送多少消息到broker并且等待响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为1,可以保证发送到broker的顺序和调用send方法顺序一致,即便出现失败重试的情况也是如此。

timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms

这些参数控制生产者等待broker的响应时间。request.timeout.ms指定发送数据的等待响应时间,metadata.fetch.timeout.ms指定获取元数据(例如获取分区的群首信息)的等待响应时间。timeout.ms则指定broker在返回结果前等待其他副本(与acks参数相关)响应的时间,如果时间到了但其他副本没有响应结果,则返回消息写入失败。

max.block.ms

这个参数指定应用调用send方法或者获取元数据方法(例如partitionFor)时的阻塞时间,超过此时间则抛出timeout异常。

max.request.size

这个参数限制生产者发送数据包的大小,数据包的大小与消息的大小、消息数相关。如果我们指定了最大数据包大小为1M,那么最大的消息大小为1M,或者能够最多批量发送1000条消息大小为1K的消息。另外,broker也有message.max.bytes参数来控制接收的数据包大小。在实际中,建议这些参数值是匹配的,避免生产者发送了超过broker限定的数据大小。

receive.buffer.bytes, send.buffer.bytes

这两个参数设置用来发送/接收数据的TCP连接的缓冲区,如果设置为-1则使用操作系统自身的默认值。如果生产者与broker在不同的数据中心,建议提高这个值,因为不同数据中心往往延迟比较大。

最后讨论下顺序保证。Kafka保证分区的顺序,也就是说,如果生产者以一定的顺序发送消息到Kafka的某个分区,那么Kafka在分区内部保持此顺序,而且消费者也按照同样的顺序消费。但是,应用调用send方法的顺序和实际发送消息的顺序不一定是一致的。举个例子,如果retries参数不为0,而max.in.flights.requests.per.session参数大于1,那么有可能第一个批量消息写入失败,但是第二个批量消息写入成功,然后第一个批量消息重试写入成功,那么这个顺序乱序的。因此,如果需要保证消息顺序,建议设置max.in.flights.requests.per.session为1,这样可以在第一个批量消息发送失败重试时,第二个批量消息需要等待。

序列化

上面提到了Kafka自带的序列化类,现在来看下如何使用其他的序列化策略。

自定义序列化

如果我们发送的消息不是整数或者字符串时,我们需要自定义序列化策略或者使用通用的Avro、Thrift或者Protobuf这些序列化方案。下面来看下如何使用自定义的序列化方案,以及存在的问题。
假如我们要发送的消息对象是这么一个Customer:

public class Customer {
    private int customerID;
    private String customerName;
    
    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }
    public int getID() {
        return customerID;
    }
    public String getName() {
        return customerName;
    } 
}

那么,自定义的序列化类实现样例如下:

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    /**
     We are serializing Customer as:
     4 byte int representing customerId
     4 byte int representing length of customerName in UTF-8 bytes (0 if name is
     Null)
     N bytes representing customerName in UTF-8
     */
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializeName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        } 
    }

    @Override
    public void close() {
        // nothing to close
    } 
}

我们将Customer的ID和名字进行了序列化,通过这个序列化对象,我们可以发送Customer的对象消息。但这样的序列化存在很多问题,比如想要将ID升级为Long型或者增加新的Customer域时,我们需要兼容新老消息。尤其是公司内多个团队同时消费Customer数据时,他们需要同时修改代码进行兼容。
因此,建议使用JSON、Apache Avro、Thrift或者Protobuf这些成熟的序列化/反序列化方案。下面来看下如何使用Avro来进行序列化。

使用Avro序列化

Apache Avro是一个语言无关的序列化方案,使用Avro的数据使用语言无关的结构来描述,例如JSON。Avro一般序列化成字节文件,当然也可以序列化成JSON形式。Kafka使用Avro的好处是,当写入消息需要升级协议时,消费者读取消息可以不受影响。

例如,原始的协议可能是这样的:

{
  "namespace": "customerManagement.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
      {"name": "id", "type": "int"},
      {"name": "name",  "type": "string""},
      {"name": "faxNumber", "type": ["null", "string"], "default": "null"}
  ] 
}

在这个例子中,id和name是必须的,而faxNumber是可选的,默认为null。

在使用这个格式一段时间后,我们需要升级协议,去掉faxNumber属性并增加email属性:

{
  "namespace": "customerManagement.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
      {"name": "id", "type": "int"},
      {"name": "name",  "type": "string""},
      {"name": "email", "type": ["null", "string"], "default": "null"}
  ] 
}

消费者在处理消息时,会通过getName()、getId()和getFaxNumber()来获取消息属性,对于新的消息,消费者获取的faxNumber会为null。如果消费者升级应用代码,调用getEmail而不是getFaxNumber,对于老的消息,getEmail会返回null。

这个例子体现了Avro的优势:即使修改消息的结构而不升级消费者代码,消费者仍然可以读取数据而不会抛出异常错误。不过需要注意下面两点:

  • 写入的消息格式与期待读取的格式需要兼容,关于兼容可以参考这个文档
  • 消费者需要知道写入数据的格式,对于Avro文件来说写入格式包含在文件中,对于Kafka我们接下来看下如何处理。
在Kafka中使用Avro消息

当使用Avro序列化成文件时,我们可以将数据的结构添加到文件中;但对于Kafka,如果对于每条Avro消息我们都附上消息结构,那么将会增加差不多一倍的开销。因此,我们可以使用模式注册中心(Schema Registry)的架构模式,将消息的结构存储在注册中心,这样消费者可以从注册中心获取数据的结构。模式注册中心不是Kafka项目的一部分,但有很多开源的方案可以考虑。

使用模式注册中心的话,我们需要存储消息的所有格式到注册中心,然后在消息记录中添加格式的ID,这样消费者可以通过这个ID从注册中心获取数据的模式以进行反序列化。看上去很麻烦,我们需要存储数据的结构以及在消费端拉取数据结构,但是不需要担心,这些工作已经由serializer/deserializer来完成了,应用只需要使用Avro提供的serializer即可。

整体处理流程如下:
在这里插入图片描述
下面是一个发送生成的Avro对象到Kafka的代码样例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);

// We keep producing new events until someone ctrl-c
while (true) {
    Customer customer = CustomerGenerator.getNext();
    System.out.println("Generated customer " + customer.toString());
    ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
    producer.send(record);
}

其中,

  1. 我们使用KafkaAvroSerializer来序列化对象,注意它可以处理原子类型,上面代码中使用其来序列化消息的key。
  2. schema.registry.url参数指定了注册中心的地址,我们将数据的结构存储在该注册中心。
  3. Customer是生成的对象,生产者发送的消息类型也为Customer。
  4. 我们创建一个包含Customer的记录并发送,序列化的工作由KafkaAvroSerializer来完成。

当然,我们也可以使用通用的Avrod对象而不是生成的Avro对象,对于这种情况,我们需要提供数据的结构:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);

String schemaString = "{\"namespace\": \"customerManagement.avro\",
                        \"type\": \"record\", " +
                        "\"name\": \"Customer\"," +
                        "\"fields\": [" +
                            "{\"name\": \"id\", \"type\": \"int\"}," +
                             "{\"name\": \"name\", \"type\": \"string\"}," +
                             "{\"name\": \"email\", \"type\": [\"null\",\"string\"], \"default\":\"null\" }" +
                       "]}";

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);

for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example " + nCustomers + "@example.com";
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomer);
    customer.put("name", name);
    customer.put("email", email);
   
    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String,GenericRecord>("customerContacts",name, customer);
    
    producer.send(data);
}

这里我们仍然使用KafkaAvroSerializer,也提供模式注册中心的地址;但我们现在需要自己提供Avro的模式,而这个之前是由Avro生成的对象来提供的,我们发送的对象是GenericRecord,在创建的时候我们提供了模式以及写入的数据。最后,serializer会知道如何从GenericRecord中取出模式并且存储到注册中心,然后序列化对象数据。

分区

我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:1)提供描述消息的额外信息;2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。

如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。

如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。注意的是,在计算消息与分区的映射关系时,使用的是全部的分区数而不仅仅是可用的分区数。这也意味着,如果某个分区不可用(虽然使用复制方案的话这极少发生),而消息刚好被分配到该分区,那么将会写入失败。另外,如果需要增加额外的分区,那么消息与分区的映射关系将会发生改变,因此尽量避免这种情况。

自定义分配器

现在来看下如何自定义一个分配器,下面将key为Banana的消息单独放在一个分区,与其他的消息进行分区隔离:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Banana"))
        return numPartitions; // Banana will always go to last partition
   
     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }
    
    public void close() {}
 
}

Kafka消费者:从Kafka中读取数据

Kafka消费者相关的概念
消费者与消费组

假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。

Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:
在这里插入图片描述
如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:
在这里插入图片描述
如果增加到4个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:
在这里插入图片描述
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
在这里插入图片描述
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:
在这里插入图片描述
在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来说它们属于不同的应用。

最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

消费组与分区重平衡

可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

创建Kafka消费者

读取Kafka消息只需要创建一个kafkaConsumer,创建过程与KafkaProducer非常相像。我们需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer和group.id。其中,bootstrap.servers与创建KafkaProducer的含义一样;key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象;group.id不是严格必须的,但通常都会指定,这个参数是消费者的消费组。
下面是一个代码样例:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
订阅主题

创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表,非常简单:

consumer.subscribe(Collections.singletonList("customerCountries"));

这个例子中只订阅了一个customerCountries主题。另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:

consumer.subscribe("test.*");
拉取循环

消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了一个非常简洁的poll方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。下面是一个代码样例:

try {
   while (true) {  //1)
       ConsumerRecords<String, String> records = consumer.poll(100);  //2)
       for (ConsumerRecord<String, String> record : records)  //3)
       {
           log.debug("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
           int updatedCount = 1;
           if (custCountryMap.countainsValue(record.value())) {
               updatedCount = custCountryMap.get(record.value()) + 1;
           }
           custCountryMap.put(record.value(), updatedCount)
           JSONObject json = new JSONObject(custCountryMap);
           System.out.println(json.toString(4))
       }
   }
} finally {
      consumer.close(); //4
}

其中,代码中标注了几点,说明如下:

  • 1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
  • 2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
  • 3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
  • 4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。

另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。

消费者配置

上面的例子中只设置了几个最基本的消费者参数,bootstrap.servers,group.id,key.deserializer和value.deserializer,虽然我们很多情况下只是使用默认设置就行,但了解一些比较重要的参数还是很有帮助的。

fetch.min.bytes

这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。

fetch.max.wait.ms

上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。

max.partition.fetch.bytes

这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

需要注意的是,max.partition.fetch.bytes必须要比broker能够接收的最大的消息(由max.message.size设置)大,否则会导致消费者消费不了消息。另外,在上面的样例可以看到,我们通常循环调用poll方法来读取消息,如果max.partition.fetch.bytes设置过大,那么消费者需要更长的时间来处理,可能会导致没有及时poll而会话过期。对于这种情况,要么减小max.partition.fetch.bytes,要么加长会话时间。

session.timeout.ms

这个参数设置消费者会话过期时间,默认为3秒。也就是说,如果消费者在这段时间内没有发送心跳,那么broker将会认为会话过期而进行分区重平衡。这个参数与heartbeat.interval.ms有关,heartbeat.interval.ms控制KafkaConsumer的poll()方法多长时间发送一次心跳,这个值需要比session.timeout.ms小,一般为1/3,也就是1秒。更小的session.timeout.ms可以让Kafka快速发现故障进行重平衡,但也加大了误判的概率(比如消费者可能只是处理消息慢了而不是宕机)。

auto.offset.reset

这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)。

enable.auto.commit

这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false。如果为true,你可能需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。

partition.assignment.strategy

我们已经知道当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:

  • 范围(Range):对于每个主题,每个消费者负责一定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有3个分区,那么使用这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第一个消费者会多出几个分区(由主题数决定)。
  • 轮询(RoundRobin):对于所有订阅的主题分区,按顺序一一的分配给消费者。用上面的例子来说,消费者C1负责第一个主题的分区0、分区2,以及第二个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。

partition.assignment.strategy设置了分配策略,默认为org.apache.kafka.clients.consumer.RangeAssignor(使用范围策略),你可以设置为org.apache.kafka.clients.consumer.RoundRobinAssignor(使用轮询策略),或者自己实现一个分配策略然后将partition.assignment.strategy指向该实现类。

client.id

这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。

max.poll.records

这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

receive.buffer.bytes、send.buffer.bytes

这两个参数控制读写数据时的TCP缓冲区,设置为-1则使用系统的默认值。如果消费者与broker在不同的数据中心,可以一定程度加大缓冲区,因为数据中心间一般的延迟都比较大。

提交(commit)与位移(offset)

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。

假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费,如下所示:
在这里插入图片描述
假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所示:
在这里插入图片描述
因此,提交位移的方式会对应用有比较大的影响,下面来看下不同的提交方式。

自动提交

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

提交当前位移

为了减少消息重复消费或者避免消息丢失,很多应用选择自己主动提交位移。设置auto.commit.offset为false,那么应用需要自己通过调用commitSync()来主动提交位移,该方法会提交poll返回的最后位移。

为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。而如果在处理消息时发生了重平衡,那么只有当前poll的消息会重复消费。下面是一个自动提交的代码样例:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}

上面代码poll消息,并进行简单的打印(在实际中有更多的处理),最后完成处理后进行了位移提交。

异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    
    consumer.commitAsync();
}

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

因此,基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        } 
    });
}

而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。

混合同步提交与异步提交

正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交,如下所示:

try {
    while (true) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records) {
           System.out.printf("topic = %s, partition = %s, offset = %d,
           customer = %s, country = %s\n",
           record.topic(), record.partition(),
           record.offset(), record.key(), record.value());
       }
       
       consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

在正常处理流程中,我们使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功。

提交特定位移

commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理。幸运的是,commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map。由于一个消费者可能会消费多个分区,所以这种方式会增加一定的代码复杂度,如下所示:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
} }

代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交)。

重平衡监听器(Rebalance Listener)

在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它可能需要将已经处理过的消息位移进行提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:

  • public void onPartitionRevoked(Collection partitions):此方法会在消费者停止消费消费后,在重平衡开始前调用。
  • public void onPartitionAssigned(Collection partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。

下面来看一个onPartitionRevoked9)的例子,该例子在消费者失去某个分区时提交位移(以便其他消费者可以接着消费消息并处理):

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
    
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
             System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
   log.error("Unexpected error", e);
} finally {
   try {
       consumer.commitSync(currentOffsets);
   } finally {
       consumer.close();
       System.out.println("Closed consumer and we are done");
   }
}

代码中实现了onPartitionsRevoked()方法,当消费者失去某个分区时,会提交已经处理的消息位移(而不是poll()的最大位移)。上面代码会提交所有的分区位移,而不仅仅是失去分区的位移,但这种做法没什么坏处。

从指定位移开始消费

在此之前,我们使用poll()来从最后的提交位移开始消费,但我们也可以从一个指定的位移开始消费。

如果想从分区开始端重新开始消费,那么可以使用seekToBeginning(TopicPartition tp);如果想从分区的最末端消费最新的消息,那么可以使用seekToEnd(TopicPartition tp)。而且,Kafka还支持我们从指定位移开始消费。从指定位移开始消费的应用场景有很多,其中最典型的一个是:位移存在其他系统(例如数据库)中,并且以其他系统的位移为准。

考虑这么个场景:我们从Kafka中读取消费,然后进行处理,最后把结果写入数据库;我们既不想丢失消息,也不想数据库中存在重复的消息数据。对于这样的场景,我们可能会按如下逻辑处理:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets);
    }
}

这个逻辑似乎没什么问题,但是要注意到这么个事实,在持久化到数据库成功后,提交位移到Kafka可能会失败,那么这可能会导致消息会重复处理。对于这种情况,我们可以优化方案,将持久化到数据库与提交位移实现为原子性操作,也就是要么同时成功,要么同时失败。但这个是不可能的,因此我们可以在保存记录到数据库的同时,也保存位移,然后在消费者开始消费时使用数据库的位移开始消费。这个方案是可行的,我们只需要通过seek()来指定分区位移开始消费即可。下面是一个改进的样例代码:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //在消费者负责的分区被回收前提交数据库事务,保存消费的记录和位移
        commitDBTransaction();
    }
    
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //在开始消费前,从数据库中获取分区的位移,并使用seek()来指定开始消费的位移
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    } 
}

    consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
    //在subscribe()之后poll一次,并从数据库中获取分区的位移,使用seek()来指定开始消费的位移
    consumer.poll(0);
    for (TopicPartition partition: consumer.assignment())
        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            processRecord(record);
            //保存记录结果
            storeRecordInDB(record);
            //保存位移
            storeOffsetInDB(record.topic(), record.partition(), record.offset());
        }
        //提交数据库事务,保存消费的记录以及位移
        commitDBTransaction();
    }

具体逻辑见代码注释,此处不再赘述。另外注意的是,seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移(当然我们可以在seek和poll之后提交位移覆盖)。

优雅退出

下面我们来讨论下消费者如何优雅退出。

在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException。如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。

下面是一个优雅退出的样例代码:

//注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。
Runtime.getRuntime().addShutdownHook(new Thread() {
          public void run() {
              System.out.println("Starting exit...");
              //调用消费者的wakeup方法通知主线程退出
              consumer.wakeup();
              try {
                  //等待主线程退出
                  mainThread.join();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          } 
});

...

try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println(System.currentTimeMillis() + "--  waiting for data...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp: consumer.assignment())
            System.out.println("Committing offset at position:" + consumer.position(tp));
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
    System.out.println("Closed consumer and we are done");
}
反序列化

如前所述,Kafka生产者负责将对象序列化成字节数组并发送到Kafka。消费者则需要将字节数组转换成对象,这就是反序列化做的事情。序列化与反序列化需要匹配,如果序列化使用IntegerSerializer,但使用StringDeserializer来反序列化,那么会反序列化失败。因此作为开发者,我们需要关注写入到主题使用的是什么序列化格式,并且保证写入的数据能够被消费者反序列化成功。如果使用Avro与模式注册中心(Schema Registry)来序列化与反序列化,那么事情会轻松许多,因为AvroSerializer会保证所有写入的数据都是结构兼容的,并且能够被反序列化出来。

下面先来看下如何自定义反序列化,后面会进一步讨论如何使用Avro。

自定义反序列化

首先,假设序列化的对象为Customer:

public class Customer {
     private int customerID;
     private String customerName;
     public Customer(int ID, String name) {
         this.customerID = ID;
         this.customerName = name;
     }
     public int getID() {
         return customerID;
     }
     public String getName() {
         return customerName;
     } 
}

根据之前的序列化策略,我们的反序列化代码如下:

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
     // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;
        try {
            if (data == null)
                return null;
            if (data.length < 8)
                throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            String nameSize = buffer.getInt();
            byte[] nameBytes = new Array[Byte](nameSize);
            buffer.get(nameBytes);
            name = new String(nameBytes, 'UTF-8');
            return new Customer(id, name);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }
    @Override
    public void close() {
            // nothing to close
    } 
}

消费者使用这个反序列化的代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer");

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe("customerCountries")
while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(100);
    for (ConsumerRecord<String, Customer> record : records)
    {
    System.out.println("current customer Id: " + record.value().getId() + " and current customer name: " + record.value().getName());
    } 
}

最后提醒下,我们并不推荐实现自定义的序列化与反序列化,因为往往这些方案并不成熟,难以维护和升级,而且容易出错。我们可以使用JSON、Thrift、Protobuf或者Avro的成熟的解决方案。

使用Avro反序列化

使用Avro反序列化

假设我们使用之前生产者Avro序列化时使用的Customer,那么使用Avro反序列化的话,我们的样例代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//使用KafkaAvroDeserializer来反序列化Avro消息
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
//这里增加了schema.registry.url参数,获取生产者注册的消息模式
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    //这里使用之前生产者使用的Avro生成的Customer类
    ConsumerRecords<String, Customer> records = consumer.poll(1000);
    for (ConsumerRecord<String, Customer> record: records) {
        System.out.println("Current customer name is: " + record.value().getName());
    }
    consumer.commitSync();
}
单个消费者

一般情况下我们都是使用消费组(即便只有一个消费者)来消费消息的,因为这样可以在增加或减少消费者时自动进行分区重平衡。这种方式是推荐的方式。在知道主题和分区的情况下,我们也可以使用单个消费者来进行消费。对于这种情况,我们需要自己给消费者分配消费分区,而不是让消费者订阅(成为消费组)主题。

下面是一个给单个消费者指定分区进行消费的代码样例:

List<PartitionInfo> partitionInfos = null;
//获取主题下所有的分区。如果你知道所指定的分区,可以跳过这一步
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    //为消费者指定分区
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}

除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑都是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过consumer.partitionsFor()来重新获取分区。

Kafka内部机制

探讨下以下三个问题:

  • Kafka复制机制是怎么工作的?
  • Kafka是怎么处理生产者和消费者的请求的?
  • Kafka是怎么存储文件或者索引的?
集群成员管理

Kafka使用Zookeeper管理集群成员状态,每一个broker都有一个唯一ID(在配置文件中指定或者自动生成),当broker启动时会在Zookeeper中注册相应的临时节点。如果集群中存在相同的ID,那么新的broker会启动失败。

Zookeeper中的节点注册路径为/broker/ids,Kafka的各个组件会监听此路径下的变更信息,当broker加入或者离开时,它们会收到通知。当节点离开(可能由于停机、网络故障、长GC等导致)时,Zookeeper中相应的节点会消失,但该broker的ID仍然会在某些数据结构中存在。比如,每个主题的副本列表会包含副本所在的broker ID,因此如果一个broker离开同时有一个新的broker拥有此相同的ID,那么新的broker会在集群中替代之前的broker,并且会被分配同样的主题和分区。

集群控制器(Controller)

集群控制器也是一个broker,在承担一般的broker职责外,它还负责选举分区的主副本(后面会讨论分区主副本的作用)。集群中的第一个broker通过在Zookeeper的/controller路径下创建一个临时节点来成为控制器。当其他broker启动时,也会试图创建一个临时节点,但是会收到一个“节点已存在”的异常,这样便知道当前已经存在集群控制器。这些broker会监听Zookeeper的这个控制器临时节点,当控制器发生故障时,该临时节点会消失,这些broker便会收到通知,然后尝试去创建临时节点成为新的控制器。

对于一个控制器来说,如果它发现集群中的一个broker离开时,它会检查该broker是否有分区的主副本,如果有则需要对这些分区选举出新的主副本。控制器会在分区的副本列表中选出一个新的主副本,并且发送请求给新的主副本和其他的跟随者;这样新的主副本便知道需要处理生产者和消费者的请求,而跟随者则需要向新的主副本同步消息。

如果控制器发现有新的broker(这个broker也有可能是之前宕机的)加入时,它会通过此broker的ID来检查该broker是否有分区副本存在,如果有则通知该broker向相应的分区主副本同步消息。

最后,每当选举一个新的控制器时,就会产生一个新的更大的控制器时间戳(controller epoch),这样集群中的broker收到控制器的消息时检查此时间戳,当发现消息来源于老的控制器,它们便会忽略,以避免脑裂(split brain)。

复制

复制,是Kafka架构的核心。通过复制机制,Kafka达到了高可用的要求,保证在少量节点发生故障时集群仍然可用。如前所述,每个主题都有若干个分区,而每个分区有多个副本,这些副本都存在broker中。Kafka中的副本有两种类型:

  • 主副本(leader replica):每个分区都有唯一的主副本,所有的生产者和消费者请求都由主副本处理,这样才能保证一致性。
  • 跟随者副本(follower replica):分区的其他副本为跟随者副本,跟随者副本不处理生产者或消费者请求,它们只是向主副本同步消息,当主副本所在的broker宕机后,跟随者副本会选举出新的主副本。

对于主副本来说,它还有一个职责,那就是关注跟随者副本的同步状态。每个跟随者副本都会保持与主副本同步,但在异常情况(例如网络阻塞、机器故障、重启等)下,跟随者的状态可能会同步失败。跟随者副本通过向主副本发送Fetch请求来进行同步(与消费者一样),请求中包含希望同步的下一个消息位移,该位移始终是有序的。比如,一个跟随者可能会按序请求消息1,消息2,消息3…如果跟随者请求消息N,那么主副本可以确定此跟随者已经接收到N-1及以前的消息了。因此根据请求中的位移信息,主副本知道跟随者的落后状态,如果副本超过10秒钟没有发送同步请求或者请求的位移属于10秒钟以前的位移,那么主副本会认为该跟随者是同步落后(out of sync)的。如果一个跟随者副本是同步落后的,那么在主副本发生故障时该跟随者不能成为新的主副本。而能够及时同步的跟随者副本则是in-sync状态的,这些跟随者副本有资格成为新的主副本。当然10秒钟不是固定的,可以通过replica.lag.time.max.ms来设置。

每个分区除了有主副本之外,还有一个备选主副本(preferred leader),它是主题初始创建时的主副本。在主题初始创建时,Kafka使用一定的算法来分散所有主题的主副本与跟随者副本,因此备份主副本通常能保证集群的流量是均衡分布的。如果备份主副本是in-sync状态的,那么在主副本发生故障后,它会自动成为新的主副本。

请求处理

Kafka的broker主要工作是,当作为分区的主副本时,处理来自生产者/消费者客户端、跟随者副本以及控制器的请求。对于此Kafka使用自定义的二进制协议来进行通信,请求的头部包含如下信息:

  • 请求类型
  • 请求版本(通过此参数broker可以正确处理不同版本客户端的请求)
  • 关联ID:标识请求的全局唯一数字,也会在响应结果和错误日志中呈现。
  • 客户端ID:用来识别请求的来源。

同一个客户端发送的所有请求都是按序处理的,这保证了消息的有序性。

对于监听的每一个端口,broker都会运行一个接收者(acceptor)线程来建立连接,并且把连接交给一个处理器(processor)线程处理,处理器线程的数量是可配置的。处理器线程接收客户端的请求,把请求放进请求队列,最后从响应队列中取出结果返回给客户端。当请求放进请求队列后,IO线程负责进行处理,大部分的请求都属于两种类型:

  • 生产请求(produce request):由生产者发送,包含需要写入的消息。
  • 拉取请求(fetch request):由消费者和跟随者副本发送。

整体架构如下所示:
在这里插入图片描述
生产请求和拉取请求都需要发送给分区的主副本处理,如果一个跟随者副本收到这两种请求,它会返回“Not a Leader for Partition”的异常信息,客户端收到异常信息后向正确的主副本发送请求。

客户端怎么知道哪个是主副本呢?通过使用另一种类型的请求来实现,那就是元信息请求(metadata request)。Kafka的任意一个broker都可以处理这种请求,请求中包含客户端感兴趣的主题列表,broker会返回这些主题的分区列表、分区的副本列表以及主副本信息。客户端收到这些信息后,会进行一定时间的缓存(由metadata.max.age.ms指定),当超过时间或者broker返回请求的异常后,会刷新此信息。整体交互处理如下:
在这里插入图片描述

生产请求(produce request)

acks参数控制多少个副本确认写入成功后生产者才认为消息生产成功。这个参数的取值可以为:

  • acks=0:消息发送完毕,生产者认为消息写入成功;
  • acks=1:主副本写入成功,生产者认为消息写入成功;
  • acks=all:所有in-sync副本写入成功,生产者认为消息写入成功。

如果主副本收到生产消息,它会执行一些检查逻辑,包含:

  • 发送的用户是否有权限写入主题?
  • 请求的acks参数取值是否合法(只允许0,1,all)?
  • 如acks设置为all,是否有足够的in-sync副本来安全写入消息?(我们可以配置如果in-sync副本低于一定数量,主副本拒绝写入消息)

检查通过后主副本会持久化消息到本地磁盘,在Linux系统上消息只会写入到文件系统的缓存,因此并没有保证一定写入到磁盘;Kafka依赖复制机制来保证数据不丢失。

一旦消息本地持久化后,如果acks=1那么会返回结果给客户端,如果acks=all那么会将请求放置在一个称为purgatory的缓冲区中等待其他的副本写入完成。

拉取请求(fetch request)

主副本处理拉取请求和处理生产请求差不多。客户端发送一个拉取请求,包含主题、分区和位移信息,主副本返回相应的数据。另外,客户端也指定返回的最大数据量,防止数据量过大造成客户端内存溢出。同时,客户端也指定返回的最小数据量,当消息数据量没有达到最小数据量时,请求会一直阻塞直到有足够的数据返回,如下所示:
在这里插入图片描述
指定最小的数据量在负载不高的情况下非常有用,通过这种方式可以减轻网络往返的额外开销。当然请求也不能永远的阻塞,客户端可以指定最大的阻塞时间,如果到达指定的阻塞时间,即便没有足够的数据也会返回。

有意思的是,不是所有主副本的数据都能够被读取。当数据被所有in-sync状态的副本写入成功后,它才能被客户端读取。对于主副本来说,这并不难实现,因为之前已经说过通过副本同步,它知道所有副本当前已经完成同步的消息位移。该机制描述如下:
在这里插入图片描述
但为什么要这么做呢?这是因为如果没有足够的副本持久化消息,该消息是不安全的。如果主副本发生故障,然后其他副本成为新的主副本,这些消息将会在Kafka中莫名其妙的消失。也就是说,存在一个消费组能够读到某个消息,但另外的消费组读不到这个消息,从而导致不一致的行为。

这样的机制也导致了新消息到达消费者应用的高延迟,特别是存在副本之间网络拥塞的情况。我们可以通过replica.lag.time.max.ms来指定一个副本落后多少仍被视为in-sync状态的,减小该值可以使得消费者无需等待延迟较大的副本写入。

在读取消息上,Kafka使用零复制(zero-copy)来提高性能。也就是说,Kafka将文件(更准确的说,是文件系统缓存)的消息直接传给网络通道,并没有使用中间的buffer。这避免了内存的字节拷贝和buffer维护,极大地提高了性能。

其他请求

我们讨论了Kafka中最常见的三种请求类型:元信息请求,生产请求和拉取请求。这些请求都是使用的是Kafka的自定义二进制协议。集群中broker间的通信请求也是使用同样的协议,这些请求是内部使用的,客户端不能发送。比如在选举分区主副本过程中,控制器会发送LeaderAndIsr请求给新的主副本和其他跟随副本。

物理存储

Kafka中基本的存储单元是一个分区副本。分区副本不能跨越多个broker,甚至不能跨越同一个broker的多个磁盘。当配置Kafka时,管理员需要设置log.dirs来指定分区存储的目录。
下面来看下存储的细节。

分区分配

当创建一个新主题时,Kafka首先需要决定如何分配分区到不同的broker。假设有6个broker,那么创建一个有10个分区且复制因子为3的主题的话,Kafka需要分配30个分区副本到这6个broker。分配策略主要的考虑因素如下:

  • 能将分区副本均衡分配到集群的broker中;
  • 对于每个分区,它的所有副本需要在不同的broker;
  • 果broker(0.10.0及更高版本)有机架信息,那么对于一个分区的所有副本,尽量分配这些副本到不同的机架。这保证了机架发生故障时集群仍然可用。

下面以一个例子来说明这几个策略。假设集群由6个broker,我们从一个随机的broker(不妨称为4号)开始以轮询的方式来分配分区的主副本。因此分区0的主副本在4号broker,分区1的主副本在5号broker,分区2的主副本在0号(总数量为6,下标从0开始)broker…然后对于每个分区的其他副本,从其主副本所在的broker开始分配。由于分区0的主副本在4号broker,分区0的第一个跟随者副本在5号broker,第二个跟随者副本在6号broker,以此类推…

如果考虑机架因素,那么我们需要首先根据broker的机架信息来对broker进行排序,而不是上面的按照序号递增来排序。假如0号、1号和2号broker在同一个机架,3号、4号和5号broker在另一个机架,那么broker的顺序可能为0,3,1,4,2,5。这个顺序穿插了不同机架的broker。那么对于这种情况,分区0的主副本在4号broker,分区1的主副本在2号broker…

在分配分区和副本到broker之后,下一步需要决定使用哪个目录来存储分区。策略非常简单:统计每个目录的分区数,把新分区分配到最少分区数的目录中。因此如果新增加一个磁盘,所有的新分区都会分配到在这个磁盘上,因为它的分区数最少。

文件管理

消息保留,是Kafka中一个很重要的概念。Kafka不会永远保留数据,也不会等待所有的消费组读取了消息才删除消息。只要数据量达到上限或者数据达到过期时间,Kafka会删除老的消息数据。

因为在一个大文件中查找需要清理的数据并进行删除是非常耗时而且容易出错的,Kafka将每个分区切割成段(segment)。默认每个段大小不超过1G,且只包含7天的数据。如果段的消息量达到1G,那么该段会关闭,同时打开一个新的段进行写入。

正在写入的段称为活跃段(active segment),活跃段不会被删除。因此,假如设置日志保留时间为1天,但是日志段包含5天的数据,那么我们实际上会保留5天的数据,因为活跃段正在使用而且在关闭前不会删除。

对于每个分区的每个段(包括不活跃的段),broker都会维护文件句柄,因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。

文件格式

每个段都单独存为一个文件,文件内存放消息和位移。磁盘上的数据格式、生产者发送的数据格式和消费者读取的数据格式都是一样的,使用相同的数据格式使得Kafka可以进行零拷贝优化,以及避免压缩和解压缩。

除了key/value和位移之外,每个消息还包含消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip或者LZ4)和时间戳(0.10.0新增)。

如果发送者发送压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:
在这里插入图片描述
因此如果生产者使用压缩,那么发送更大的批量消息可以得到更好的网络传输效率,并且节省磁盘存储空间。

索引

Kafka允许消费者从任意合法的位移拉取消息,也就是说如果消费者希望从位移为100的地方开始读取1MB的消息,broker需要在该分区的所有段中定位到该消息的位置然后开始读取数据。为了帮助broker迅速定位,Kafka对于每个分区都维护一个索引,该索引将位移映射到段以及段内偏移。

索引也是按照段来切割的,因此清理过期消息时相应的索引也可以很容易清理。另外,索引并没有维护校验和,因此如果索引损坏了,Kafka会重新读取段文件生成索引。

压缩(compaction)

正常情况下,Kafka存储一定数量的消息,并且如果消息超过一定时间,这些消息会被清除。此外,Kafka还支持压缩的消息保留策略,使用这种策略会使得对于主题内的每个键,Kafka只会保留最新的消息内容。显然,压缩的策略对同时包含键和消息内容的主题才生效,如果主题内的消息键为null,那么压缩的策略不会生效。

压缩是怎么实现的?

每个分区都可以分为两部分:

  • 净(clean):这部分消息之前已经被压缩过,对于每个key来说这部分只存在一个value。
  • 脏(dirty):在上一次压缩后写入的新消息。

如下所示:
在这里插入图片描述
如果使用压缩,那么每个broker会启动一个压缩管理器线程和若干个压缩线程,每个线程负责一个分区。
在压缩分区时,压缩线程会首先读取脏的部分,并且建立一个key的哈希和位移的映射,对于相同的键,只保留最新的位移。其中key的哈希大小为16字节,位移大小为8个字节。也就是说,一个映射只有24字节,假设消息大小为1KB,那么1GB的段有1百万条消息,建立这个段的映射只需要24MB的内存,映射的内存效率是非常高效的。

在配置Kafka时,管理员需要设置这些压缩线程可以使用的总内存。如果设置1GB的总内存同时有5个压缩线程,那么每个线程只有200MB的内存可用。在压缩线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka会一次压缩最老的几个脏段,然后在下一次再处理其他的脏段。

一旦建立完脏段的键与位移的映射后,压缩线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。

对于一个段,处理前和处理后的效果如下:
在这里插入图片描述

删除事件

对于只保留最新消息的压缩策略来说,Kafka还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个null的消息内容。当压缩线程发现这条消息时,它首先仍然进行一个正常的压缩并且保留这个包含null的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,压缩线程会删除这条消息,这个键会从分区中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。

什么时候压缩?

与过期清除的策略一样,压缩策略也不会对活跃段进行压缩。在0.10.0以及更老的版本,Kafka会在主题包含50%脏记录的时候进行压缩,目的是为了既不频繁压缩(影响性能),也不留存太多脏记录。

可靠的数据传输

可靠性是一个系统的属性,这种属性必须在系统设计之初就进行考虑。而Kafka支持不同程度的数据可靠性,这得取决于不同的使用场景。有的应用需要极强的数据可靠性,而有的应用则更倾向于性能和可扩展性…Kafka提供了非常灵活的配置和API来支持不同的用户场景。

也正是由于Kafka的灵活性,如果使用时不加以留意可能会导致问题。比如,你以为当前的系统是非常可靠的但实际却不然。下面我们来看下Kafka中的可靠性语义。

可靠性保证

当我们讨论可靠性的时候,我们总会提到保证这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。

Kafka中的可靠性保证有如下四点:

  • 一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息A,然后写入消息B,那么消费者会先读取消息A再读取消息B。
  • 当消息写入所有in-sync状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有in-sync状态副本写入才返回。
  • 旦消息已提交,那么只要有一个副本存活,数据不会丢失。
  • 消费者只能读取到已提交的消息。

使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际。下面我们先来看下Kafka的复制机制是如何保证可靠性的,然后进一步讨论可靠性配置。

复制

复制机制是Kafka可靠性保证的核心,前面已经提及,这里再进行总结。

每一个Kafka的主题都会分成几个分区,每个分区都单独存放在磁盘上。Kafka保证分区内的消息顺序。每个分区可以有若干个相同的副本,其中一个为主副本。生产者发送消息到主副本,消费者从主副本中读取消息。其他副本需要保持与主副本同步,如果主副本不可用,那么这些同步中的副本其中之一会成为新的主副本。

一个副本被认为是in-sync(也就是及时同步)状态,当且仅当它为主副本,或者是保持如下行为的其他副本:

  • 与Zookeeper的会话仍然活跃,即心跳间隔不超过6秒(时间可以配置)。
  • 向主副本发起拉取数据请求间隔不超过10秒(时间可以配置)。
  • 向主副本拉取的数据应当是10秒内产生的消息,也就是说不能拉取太老的消息。

如果一个副本不满足以上要求,那么它会变成out-of-sync状态直到它重新满足以上要求。

稍微有落后但仍然是in-sync状态的副本会拖慢生产者和消费者,因为它们需要等待所有in-sync状态的副本写入该消息。一旦某个副本变成out-of-sync状态,我们不需要等待它写入消息,这时候该副本不会造成任何性能影响,但同时由于in-sync状态副本减少,系统的冗余度也减少,数据丢失的可能性增大。

下面来看下实际中的具体应用及配置。

broker配置

Kafka中有三个broker的参数会影响数据可靠性。但就像其他的参数一样,这些参数可以在broker中设置(影响所有的主题),也可以在主题中设置(只影响该主题)。下面来看下这些参数。

复制因子(Replication Factor)

主题的复制因子配置参数为replication.factor;而对于自动创建的主题来说,broker中可以配置default.replication.factor来控制复制因子。

如果设置复制因子为N,那么我们能够容忍N-1个副本发生故障。因此更高的复制因子意味着更高的可用性以及数据可靠性;但另一方面,N个副本意味着需要存储N份相同的数据,需要N倍的存储空间。因此我们需要基于业务特点来决定复制因子。如果单个broker重启而导致主题不可用是可以接受的,那么复制因子为1就行。如果设置为2,那么我们可以容忍一个副本发生故障,但在此情况下系统只有1个副本运行,有丢失数据的风险。基于以上考虑,建议设置复制因子为3,对于大部分应用来说这种级别的容灾已经足够了,但特殊应用可能需要更高的冗余度,比如银行的关键数据存储会使用5个副本以防万一。

副本的位置也很重要。默认情况下,Kafka会将一个分区的不同副本放在不同的broker。但这仍然不够,如果所有的副本都在同一个机架上,那么无论复制因子为多少,该机架断电就会损失所有的数据。因此为了预防机架故障,在配置broker时建议配置broker.rack参数来指明机架信息,Kafka会利用此信息来将不同副本均衡到多个机架上。

有损主副本选举

unclean.leader.election.enable这个参数只能在broker级别设置,默认为true。这个参数有什么用?如前所述,如果分区的主副本不可用,那么in-sync副本中会选举出新的主副本,这样的选举是无损(clean)的,因为主副本切换没有导致数据丢失。但如果除了主副本外,没有in-sync副本存在呢?这种情况可能由于如下两种场景导致:

  • 有多个副本,除了主副本外其他副本均发生故障,但我们仍然往主副本中写入消息,这会导致其他副本同步落后。如果主副本不幸也发生故障,而在故障恢复中其他副本先恢复,那么现在集群中只有落后的副本可用。
  • 分区有多个副本,但由于网络原因,其他副本落后于主副本变成out-of-sync状态。假如主副本发生故障,集群中同样只有落后的副本可用。

对于这样的情况,我们需要从可用性与一致性之间作出选择:

  • 如果我们不允许落后副本成为新的主副本,那么集群是不可用的直到我们启动老的主副本。在实际情况中,这可能不能快速恢复,而且也不一定能恢复。
  • 如果我们允许落后副本成为新的主副本,那么丢失了部分已经提交的数据,而且导致消费者看到不一致的状态。

设置unclean.leader.election.enable为true,会允许落后副本成为新的主副本,这也是Kafka的默认设置。对于数据可靠性以及一致性非常重要的场景,用户可以设置为false。

最少in-sync副本数

对于最少in-sync副本数,主题与broker维度的参数都为min.insync.replicas。

上面可以看到,即便我们有多个副本,但最后可能只有一个副本(也就是主副本)是in-sync状态的。而且诡异的是,即便我们设置acks=all,也只有成功写入到一个副本中;也就是说,这里的all指的是所有in-sync状态副本,而不是所有副本。如果主副本故障,那么就面临上面说的问题,需要在一致性和可用性中做艰难取舍。

为了避免陷入这种情况,对于3个副本的分区来说,我们可以设置min.insync.replicas至少为2,这样可以保证数据至少写入了2份。如果系统中in-sync状态副本只剩下主副本,由于不满足min.insync.replicas,这时候生产者生产消息会失败,但消费者仍然可以读取之前的消息。这保证了一致性,牺牲了部分的可用性(但没有完全不可用)。

可靠的生产者

即使我们使用最可靠的方式来配置broker以保证数据不丢失,但如果使用生产者的方式不对,那么仍然可能会造成丢失。

下面从两个场景来说明这个问题:

  • 我们配置broker使用3个副本来进行冗余,并且禁止了有损主副本选举,这保证了一旦消息提交成功,那么消息将不会丢失。但是,假如我们配置生产者使用acks=1的确认方式,那么消息发送到主副本并且写入成功后,主副本返回结果,这时候生产者认为消息已经提交成功了。如果这时候主副本返回结果后就出现故障,并且没有把该消息同步到in-sync副本(in-sync副本允许有短暂的消息落后),那么in-sync副本会选举出新的主副本,而新的主副本并没有包含该消息。这样便出现生产者“认为”消息提交成功,但消息丢失的情况。
  • 我们配置broker使用3个副本来进行冗余,并且禁止了有损主副本选举,而且生产者使用acks=all的配置。假如在写入消息时,主副本发生故障(并进行新主副本选举),那么broker会返回“主副本不可用”的异常。如果我们程序没有处理这个错误并进行重试,那么消息将会丢失。这样的数据丢失并不是由于broker本身可靠性导致的,因为broker就没有成功收到这个消息。

因此对于可靠性非常重要的系统来说,在使用生产者写入消息时需要注意如下两点:

  • 使用正确的acks参数来满足业务的可靠性需要;
  • 正确处理写入消息的异常信息。

下面来对不同的acks取值进行深入的讨论。

消息确认

生产者可以选择三种acks取值:

  • acks=0:这种方式意味着,一旦生产者将消息通过网络传输成功,那么就认为消息已经写入到Kafka。假如这时候分区并不可用或者正在进行主副本选举,那么消息将会丢失,但生产者并不知晓。因为生产者在传输成功就进行后续处理了,没有关注Kafka的处理结果。当然,如果是消息序列化失败或者网卡发生故障,生产者还是能够得知这些错误的,因为这些错误发生在传输过程中。但这种方式也是性能最高的方式,虽然可能会导致消息丢失。
  • acks=1:这种方式意味着,生产者传输消息到主副本,并等待主副本写入磁盘(注意,可能只是写入到文件系统缓存,不一定刷新到磁盘),如果写入完成则返回成功,否则返回失败。如果发送消息时分区正在进行主副本选举,那么生产者会收到主副本不可用异常,在收到此异常信息后,生产者可以进行重试来避免消息丢失。但在主副本发生故障时,那些没有及时同步到其他副本的消息会丢失。
  • acks=all:这种方式意味着,生产者传输消息到主副本,并且在所有in-sync副本写入成功后主副本返回确认信息,否则返回错误。这个参数同时配合broker的min.insync.replica参数使用,能够实现“一旦返回成功,意味着最少写入N个副本”的语义,其中N为min.insync.replica的取值。如果生产者接收到错误,那么需要进行重试以防止消息丢失。这种方式也是最慢的处理方式,因为生产者需要等待所有in-sync副本写入成功。

下面对生产者遇到错误后的重试策略进行深入讨论。

配置生产者的重试机制

在使用生产者发送消息的时候,我们将会两类错误:

  • 可恢复错误:对于这种错误,生产者内部会进行重试。比如说broker返回主副本不可用异常,当生产者收到此异常后会进行重试。
  • 不可恢复错误:对于这种错误,即便生产者进行重试也不会成功,因此需要应用本身进行处理。比如说broker返回配置非法异常,当生产者收到此异常后进行重试也于事无补,需要应用自身进行处理。

如果你希望一条消息也不丢失,那么对于可恢复异常来说,可能会希望生产者一直重试直到成功。如果是因为主副本选举或者网络抖动而导致的异常,那么这种策略没什么问题。但如果网络一时半会恢复不了,我们也可以放弃重试并记录异常(比如记录日志、持久化到数据库等等),后续再进行处理。这取决于应用本身。注意到Kafka的跨机房复制工具(MirrorMaker)默认采取无限重试的策略,这是因为作为高可靠性的复制工具来说,它不应该丢失任何一条消息。

另外,生产者重试也可能会导致消息重复。假如消息发送到broker并且所有in-sync副本都写入成功,但在返回结果时网络发生故障,这时候生产者由于没收到回复认为消息没有发送成功,然后进行重试,这样便导致消息重复。异常处理和重试能够保证消息“最少一次”的语义,但无法保证“有且仅有一次”的语义(至少0.10.0版本Kafka如此)。应用本身如果需要实现“有且仅有一次”的语义,可以在消息中加入全局唯一标识符,这样在消费消息时可以进行去重。或者,应用生产幂等的消息,也就是说发送重复的消息没有影响,比如说“账户余额为110元”是幂等消息(因为发送多次也不会对账户造成影响),而“账户增加110元”则不是幂等消息(因为发送多次的结果并不一样)。

其他的错误处理

生产者的内部重试机制已经能解决大部分问题,但还有一些错误是需要应用进行处理的:

  • 不可恢复异常(比如消息大小非法、权限认证失败);
  • 消息发送到broker之前发生的异常(比如序列化异常);
  • 生产者达到重试上限的异常,或者由于消息重试导致消息堆积最终内存溢出的异常。

对于这些异常,我们可以记录日志,持久化到数据库或者简单的抛弃。但如果说我们的处理方式仍然为不断重试,那么建议把这样的重试策略下沉到生产者内部重试机制。

可靠的消费者

上面讨论了如何使得生产者更可靠,现在来看下消息消费端的可靠处理方式。之前说到,当消息变成已提交状态(也就是写入到所有in-sync副本)后,它才能被消费端读取。这保证了消费者读取到的数据始终是一致的,为了达到高可靠,消费者需要保证在消费消息时不丢失数据。

在处理分区消息时,消费者一般的处理流程为:拉取批量消息,处理完成后提交位移,然后再拉取下一批消息。提交位移保证了当前消费者发生故障或重启时,其他消费者可以接着上一次的消息位移来进行处理。需要提醒的是,消费端丢失消息的一个主要原因为:消费者拉取消息后还没处理完就提交位移,一旦在消息处理过程中发生故障,新的消费者会从已提交的位移接着处理,导致发生故障时的消息丢失。

下面来看下消费端处理流程中的一些需要注意的细节。

重要的可靠性配置

如果希望设计一个高可靠的消费者,那么消费者中有4个重要的属性需要慎重考虑。

第一个属性是group.id,这个属性在上文讨论过,大概的作用是:如果有多个消费者拥有相同的group.id并且订阅相同的主题,那么每个消费者会负责消费一部分的消息。如果消费组内存在多个消费者,那么一个消费者发生故障那么其他消费者可以接替其工作,保证高可用。

第二个属性是auto.offset.reset,这个属性在如下场景中起作用:当消费者读取消息,Kafka中没有提交的位移(比如消费者所属的消费组第一次启动)或者希望读取的位移不合法(比如消费组曾经长时间下线导致位移落后)时,消费者如何处理?当设置为earliest,消费者会从分区的起始端开始读取,这可能会导致消费者重复处理消息,但也将消息丢失可能性降低到最小;当设置为latest,消费者会从分区末端开始读取,这会导致消息丢失可能性加大,但会降低消息重复处理的概率。

第三个属性是enable.auto.commit,这个属性需要慎重考虑,那就是:你希望消费者定期自动提交位移,还是应用手动提交位移?自动提交位移可以让应用在处理消息时不用实现提交位移的逻辑,并且如果我们是在poll循环中使用相同的线程处理消息(poll循环详见这篇文章),那么自动提交位移可以保证在消息处理完成后才提交位移。如果我们在poll循环中使用另外的线程处理消息,那么自动提交位移可能会导致提交还没完成处理的消息位移。

第四个属性是auto.commit.interval.ms,它与第三个属性有关。如果选择了自动提交位移,那么这个属性控制提交位移的时间间隔。默认值是5秒,通常来说降低间隔可以降低消息重复处理的可能性。

手动提交位移

如果我们选择手动提交位移,下面来根据不同场景来讨论如何实现更可靠的消费者。

处理完消息后立即提交

如果在poll循环中进行消息处理,并且处理完后提交位移,那么提交位移的实现方式非常简单。对于这种场景,可以考虑使用自动提交而不是手动提交。

在处理消息过程中多次提交

消费者拉取批量消息后处理消息时,在处理过程中可以使用手动提交位移方式来多次更新位移。这种方式可以使得消息重复处理可能性降低。不过在这个场景中,如果不加以注意,那么可能会提交上一次拉取的最大位移而不是当前已经处理的消息位移。这上文-消费者 已经讨论过相应的处理方法,这里不再赘述。

重平衡

在设计应用时,我们需要记得正确处理重平衡。当重平衡发生时,消费者当前处理的分区可能会被回收,我们需要记得在回收前提交位移。

消费者的重试

在某些场景下,消费者拉取消息后进行处理时会遇到一些问题,可能希望这些消息可以延迟处理。比如,对于从Kafka拉取消息然后持久化到数据库的应用来说,如果某个时刻数据库不可用,我们可能希望延后重试。延后重试的策略可以分成如下两大类:

第一种处理方式是,我们提交已经处理成功的位移,然后将处理失败的消息存储到一个缓冲区,并不断进行重试处理这些消息。另外,在处理这些消息时可能poll循环仍然在继续,我们可以使用pause()方法来使得poll不会返回新的数据,这样使得重试更容易。

第二种处理方式是,我们把处理失败的消息写入到另外的主题,然后继续处理当前的消息。对于失败消息的主题,我们既可以使用同一个消费组进行处理,也可以使用不同的消费组进行处理。这种主题类似于其他消息系统使用的死信队列(dead-letter-queue)。

持久化状态

在某些场景下,我们可能需要在拉取消息时维护状态。比如,对于计算滑动平均数(moving average),我们每次拉取新消息时需要更新相应的平均数。当消费者重启时,我们不但需要从上一次提交的位移开始消费,同时还需要从相应的滑动平均数中恢复。一种处理方式是,我们提交位移时将滑动平均数写入到一个用于保存结果的主题,这样应用重启时可以获取上一次的处理结果。但由于Kafka不支持多操作的事务性,因此这种方式并不严谨。我们当然可以自己加以处理,但这个问题解决起来比较复杂,建议可以使用Kafka Streams这样的开源库。

消息处理时间长

Kafka不支持有且仅有一次的语义,但可以支持至少一次的语义。因此对于需要实现有且仅有一次语义的应用来说,我们需要自己额外处理。

一种常见的处理方式为,我们使用支持唯一键的外部系统(比如关系型数据库、Elasticsearch等)来进行结果去重。我们可以自己实现唯一键并且在消息中加入此属性,也可以根据消息的主题、分区以及位移信息来生成唯一键。另外,如果该外部系统支持事务,那么我们可以在一个事务中同时保存消息处理结果和位移。消费者重启时可以从该系统中获取位移,并且使用seek()方法来开始从相应的位移开始消费。

数据管道

当我们使用Kafka来构建数据管道的时候,通常有两种主要的场景:1)Kafka是数据的起点或终点,比如从Kafka传输数据到S3或者从MongoDB传输数据到Kafka;2)Kafka作为数据的中间缓冲区,比如构建Twitter到Elasticsearch的数据管道时,Twitter先把数据传输到Kafka,然后Kafka再将数据传输到Elasticsearch。

使用Kafka构建数据管道可以将数据的生产者和消费者进行解耦,并且能够保证高可靠以及高性能。另外在0.9版本,Kafka加入了Kafka Connect这个新的API,使得将Kafka集成到数据管道更加方便。

下面来看下数据管道的一些具体细节。

构建数据管道的考虑因素
时间线

在实际中,有一些系统的数据可能每天进行一次数据处理,有一些系统可能希望数据从产生到消费只有毫秒级延迟,而另外的系统则介于这两个极端之间。一个优秀的数据集成系统应当能满足不同场景的时间线要求,并且能够支持时间线的迁移(因为实际应用中需求是不断变化的)。Kafka具备这样的性质,既支持准实时的数据传输,也支持定时的批量数据传输,并且保证数据可靠存储以及水平扩展。在Kafka中,生产者可以根据需要来决定写入Kafka的时机,而一旦数据到达Kafka,消费者可以立即读取(其实消费者也可以定时批量读取,取决于使用场景)。

在这个场景中,Kafka充当数据的大缓冲区角色,并且解耦了生产者与消费者的时间敏感度要求:生产者可以实时产生数据而消费者定期消费数据,反之亦然。

可靠性

我们需要避免单点故障,并且在发生故障时能够快速的自动恢复。对于核心系统来说,即便是秒级的不可用也有可能造成巨大的损失,因此系统可用性极为重要。另外,数据传输可靠性也非常重要,一些系统能够容忍数据丢失,但更多情况下业务需要的是至少一次(at-least-once)的数据传输保证。至少一次意味着数据一旦生产成功,那么必定会到达终点,但有可能会出现数据重复出现的情况。在某些情况下,我们甚至需要有且仅有一次(exactly-once)的数据传输,这意味着数据一旦生产必须到达终点,而且不允许数据丢失或者重复。

Kafka本身能够提供至少一次的数据传输,而通过与外部系统(具备事务性质或者支持唯一键)结合使用能够保证数据有且仅有一次的语义。值得一提的是,Kafka Connect这个API让外部系统与Kafka结合更为方便,使得实现端到端的有且仅有一次的语义更简单。

高吞吐

数据管道一般需要支持高吞吐,而且更为重要的是在流量激增的情况下仍然能正常运行。通过使用Kafka,我们可以将生产者与消费者的处理能力进行解耦。如果某个时刻生产者的生产速度远超于消费者的消费速度,那么数据会存放在Kafka中直至消费,也就是说Kafka具备流量削峰的特性。另外,我们可以通过增加消费者或者生产者来分别提高两端的处理能力。

总的来说,Kafka是一个高吞吐的分布式系统,在集群情况下每秒处理百兆级别的数据并不是什么难事,我们也不需要担心在数据量增长的情况下系统不能横向扩展。另外,Kafka Connect使得数据处理不仅可以横向扩展,并且可以并行化,后面我们会深入讨论这一点。

数据格式

构建数据管道的一个重要考虑因素是不同数据格式的支持程度。在实际应用中,我们的数据库或者其他存储系统的存储格式通常是多种多样的,比如说可能源数据格式是XML或者关系型的,存储到Kafka中是Avro类型的,最后可能需要转换成JSON格式以便写入Elasticsearch。

Kafka能够满足不同的数据类型要求,在前面系列文章中,我们讨论过生产者和消费者如何使用不同的序列化/反序列化来支持多种数据格式。另外,Kafka Connect的内存数据具有自己的数据类型,但后面我们会进一步看到,我们可以通过增加可插拔的转换器来支持不同的数据格式。

有一点需要注意的是,数据源与数据终点的数据格式通常具有自己的数据结构(Schema),当数据源的数据结构改变时,我们可能需要同时更新数据终点的数据结构。一个经典的例子为,当我们构建MySQL到Hive的数据管道时,如果MySQL增加了一列,那么当我们写入新数据到Hive时需要保证新的列也以某种形式添加到Hive中。

在支持不同数据格式之外,一个通用的数据集成框架应当能支持数据源与数据终点的不同特性。比如,Syslog是一个主动推送数据的数据源,而关系型数据库则要求我们主动拉取它的数据;HDFS只支持数据追加,而其他系统则允许追加和更新。

数据转换

构建数据管道时我们有如下两种数据转换方案:

  • ELT(Extract-Transform-Load):这种方案意味着数据管道负责做数据转换,这样做的好处是可以节省目标系统的转换时间和存储空间。但这种方案也有一个缺点,那就是数据管道的转换与下游的依赖需要时刻保持同步。比如,如果我们构建MongoDB到MySQL的数据管道,并且在数据管道中进行数据过滤并且移除某些域,那么MySQL中只能看到部分数据;如果后续我们需要访问这些缺失的数据域,那么数据管道需要重建并且重新处理历史数据。
  • ELT(Extract-Load-Transform):这种方案意味着数据管道做最少的转换(大部分情况下只是转换数据格式),终点的数据与源数据基本一样,这样做的好处是目标系统拥有极大的处理灵活性(因为能看到几乎原始的数据),并且由于数据处理与过滤只在目标系统上进行,减轻追溯问题的复杂程度。这种方案的缺点是目标系统会消耗较多的存储空间,并且的转换也会消耗CPU资源。
安全性

对于数据管道来说,安全性包含如下几个方面:

  • 经过数据管道的数据是加密的吗?这个问题在跨数据中心时尤其突出。
  • 谁允许对数据管道进行修改?
  • 如果数据管道需要从访问受限的地方读取或写入数据,它是否能正确的进行身份验证?

Kafka支持对数据传输进行加密,以及支持身份验证(通过SASL)和授权。授权能够保证包含隐私数据的主题在未经授权的情况下不能被读取。另外,Kafka还提供授权与非授权的访问记录,并且能够跟踪主题中的事件来源以及谁进行了何种修改。

错误处理

认为数据始终是正确的是一件很危险的事情,我们需要提前考虑错误处理。例如,是否能阻止错误的记录进入管道?是否能从分析失败的记录恢复数据?错误记录是否能被修复以及重新处理?如果不良事件被当做正常事件处理了,但过了几天才发现,这会这么样?

由于Kafka能够在一段时间内保存所有事件,因此在需要的情况下我们可以回溯并且进行错误恢复。

耦合与敏捷

数据管道的一个重要作用就是将数据源与目标系统进行解耦,但在某些情况下如果不加以注意便会发生耦合:

  • 专门定制管道:有一些公司会针对不同的应用专门定制管道,比如使用Logstash转储日志到Elasticsearch,使用Flume转储日志到HDFS,使用GoldenGate从Oracle获取数据并写入HDFS,等等…这样做会将数据管道与特定的终端耦合在一起,并且意味着每一个新系统都需要搭建新的数据管道。
  • 结构元数据缺失:如果数据管道不包含结构元数据而且不允许结构变化,那么其实我们已经将产生数据的源系统与消费数据的目标系统耦合在一起。假如数据从Oracle数据库流向HDFS,DBA在数据库中添加了一列,在数据管道不包含结构元数据而且不允许结构变化的情况下,目标系统要么处理数据失败,要么需要升级应用代码。因此,数据管道应该能支持结构变化,每个独立的团队都可以根据需要来在合适的时刻修改应用逻辑。
  • 过度处理:前面已经提到,一些数据处理会在数据管道中进行,毕竟数据管道负责把数据转移到不同的系统。但如果数据管道进行了过度的处理(比如数据清洗、数据聚合),那么会导致下游使用数据的系统与数据管道耦合在一起。最好的处理方式应该为,数据管道尽可能保留元数据的属性,只是做简单的格式转换,允许下游系统来决定他们需要什么样的数据。

跨集群数据镜像

在之前系列文章中,我们讨论了一个Kafka集群的搭建、维护和使用,而在实际情况中我们往往拥有多个Kafka集群,而且这些Kafka集群很可能是相互隔离的。一般来说,这些集群之间不需要进行数据交流,但如果在某些情况下这些集群之间存在数据依赖,那么我们可能需要持续的将数据从一个集群复制到另一个集群。而由于“复制”这个术语已经被用于描述同一集群内的副本冗余,因此我们将跨集群的数据复制称为数据镜像(Mirroring)。另外,Kafka中内置的跨集群数据复制器称为MirrorMaker。

跨集群数据镜像的用户场景

以下为跨集群数据镜像的一些典型用户场景:

  • 区域集群与中心集群:很多公司往往有多个数据中心,而且每个数据中心维护独立的Kafka集群。一般的应用可能只需要跟本地集群通信即可,但存在一些应用需要所有集群的数据。比如,一个公司在每个城市都有一个数据中心,并且该中心维护相应城市的产品供需数据以及价格数据;这些数据需要汇总到一个中心集群以便进行公司维度的营利分析。
  • 数据冗余:为了防止一个集群故障导致应用不可用,我们需要把数据同步到另一个集群,这样当一个集群出现故障,可以把应用的流量切换到备份集群。
  • 云迁移:一般公司都维护有自己的数据中心,但随着云设施越来越便宜,很多公司会选择将服务迁移到云上。数据迁移与复制也是其中一个重要部分,我们可以使用Kafka Connect将数据库更新同步到本地Kafka集群,然后再把数据从本地Kafka集群同步到云上的Kafka集群。
多集群架构

上面列举了多集群的用户场景,现在来看下多集群的常见架构。但在讨论架构前,先来了解跨集群通信的一些现实因素。

跨集群通信的现实因素
  • 高延迟:由于集群间的距离较长以及网络拓扑节点增多,集群的通信延迟也会增加。
  • 带宽有限:广域网(WAN)带宽通常比机房内带宽要小得多,并且可用带宽可能无时无刻都在变化。
  • 高成本:无论是自己维护的集群还是云上的集群,集群间通信的成本都是非常高的。这是因为带宽有限并且增加带宽会带来昂贵的成本,而且服务提供商对于跨集群、跨区域、跨云的数据传输会额外收取费用。

Kafka的broker和生产者/消费者客户端都是基于一个集群来进行性能调优的,也就是说在低延迟和高吞吐的假设前提下,经过测试与验证从而得到了Kafka的超时和缓冲区默认值。因此,一般我们不推荐同一个集群的不同broker处于多个数据中心。大多数情况下,由于高延迟和网络错误,最好避免生产数据到另一个集群。当然,我们可以通过提高重试次数、增加缓冲区大小等手段来处理这些问题。

这么看,broker跨集群、生产者-broker跨集群这两种方案都被否决了,那么对于跨集群数据镜像,我们只剩下一种方案:broker-消费者跨集群。这种方案是最安全的,因为即便存在网络分区导致消费者不能消费数据,这些数据仍然保留在broker中,当网络恢复后消费者仍然可以读取。也就是说,无论网络状况如何,都不会造成数据丢失。另外,如果存在多个应用需要读取另一个集群的数据,我们可以在每个数据中心都搭建一个Kafka集群,使用集群数据镜像来只同步一次数据,然后应用从本地集群中消费数据,避免重复读取数据浪费广域网带宽。

下面是跨集群架构设计的一些准则:

  • 每个数据中心都应该至少有一个Kafka集群;
  • 集群间尽可能只同步一次数据;
  • 跨集群消费数据由于跨集群生产数据。
中心集群架构

下面是多个本地集群和一个中心集群的架构:
在这里插入图片描述
简单情况下只存在两个集群,即主集群和副本集群:
在这里插入图片描述
这种架构适用于,数据分布在多个数据中心而某些应用需要访问整体数据集。另外每个数据中心的应用可以处理本地数据,但无法访问全量数据。这种架构的主要优点在于,数据生产到本地,而且跨集群只复制一次数据(到中心集群)。只依赖本地数据的应用可以部署在本地集群,而依赖多数据中心的应用则部署在中心集群。这种架构也非常简单,因为数据流向是单向的,这使得部署、运维和监控非常容易。

它的主要缺点在于,区域的集群不能访问另一个集群的数据。比如,我们在每个城市维护一个Kafka集群来保存银行的用户信息和账户历史,并且将这些数据同步到中心集群以便做银行的商业分析。当用户访问本地的银行分支网站时,这些请求可以被分发到本地集群处理;但如果用户访问异地的银行分支网站时,要么该异地集群跟中心集群通信(此种方式不建议),要么直接拒绝请求(是的非常尴尬)。

多活架构

这种架构适用于多个集群共享数据,如下所示:
在这里插入图片描述
此架构主要优点在于,每个集群都可以处理用户的任何请求并且不阉割产品功能(与前一种架构对比),而且就近处理用户请求,响应时间可以大大降低。其次,由于数据冗余与弹性控制,一个集群出现故障,可以把用户请求导流到别的集群进行处理。

此架构主要缺点在于,由于多个集群都可以处理用户请求,异步的数据读取和更新无法保证全局数据一致性。下面列举一些可能会遇到的挑战:

  • 如果用户发送一个事件到一个集群,然后从其他集群读取事件信息,那么由于事件复制延迟,很有可能读取不到该事件。比如,用户添加一本书到心愿单后,访问心愿单却看不到添加的书。为了解决这个问题,研发人员可能会将用户与集群进行绑定,使用同一个集群来处理用户请求(当然在集群故障情况下会转移)。
  • 一个集群包含用户订购A的事件,另一个集群包含用户订购B的事件,而且这两个事件是几乎同时的。经过数据镜像后,每个数据中心都有这两个事件,而这两个事件可能是冲突的。我们需要决定哪个事件才是目前正确的最终事件么?如果需要,那么我们得制定规则来使得多个集群的应用都能得出相同的结论。或者我们可以认为这两个事件都是正确的,认为用户同时订购了A和B。亚马逊以前采取这种方式来处理冲突,但像证券交易这种机构不能采取这种方式。这个问题的解决方案是因地制宜的,我们需要知道的是一旦采取这种架构,冲突是无法避免的。

如果我们找到多集群异步读写的数据一致性问题,那么这种架构是最好的,因为它是可扩展的、弹性的,并且相对于冷热互备来说性价比也不错。

多活架构的另一个挑战是,如果存在多个数据中心,那么每一对中心都需要通信链路。也就是说,如果有5个数据中心,那么总共需要部署20个镜像进程来处理数据复制;如果考虑高可用,那么可能需要40个。

另外,我们需要避免事件被循环复制和处理。对于这个问题,我们可以将一个逻辑概念的主题拆分成多个物理主题,并且一个物理主题与一个数据中心对应。比如,users这个逻辑主题可以拆分成SF.users和NYC.users这两个物理主题,每个主题对应一个数据中心;NYC的镜像进程从SF的SF.users读取数据到本地,SF的镜像进程从NYC的NYC.users读取数据到本地。因此每个事件都只会被复制一次,而且每个数据中心都包含SF.users和NYC.users主题,并且包含全量的users数据。消费者如果需要获取全量的users数据,那么需要消费所有本地.users主题的数据。

需要提醒的是,Kafka正在计划添加记录头部,允许我们添加标记信息。我们在生产消息时可以加上数据中心的标记,这样也可以避免循环数据复制。当然,我们也可以自己在消息体中增加标记信息进行过滤,但缺点是当前的镜像工具并不支持,我们得自己开发复制逻辑。

冷热互备架构

有时候,多集群是为了防止单点故障。比如说,我们可能有两个集群,其中集群A处于服务状态,另一个集群B通过数据镜像来接收集群A所有的事件,当集群A不可用时,B可以启动服务。在这种场景中,集群B包含了数据的冷备份。架构如下所示:
在这里插入图片描述
这种架构的优点在于搭建简单并且适用于多种场景。我们只需搭建第二个集群,设置一个镜像进程来将源集群的所有事件同步到该集群即可,并且不用担心发生数据冲突。缺点在于,我们浪费了一个集群资源,因为集群故障通常很少发生。一些公司会选择搭建低配的备份集群,但这样会存在一个风险,那就是无法保证出现紧急情况时该备份集群是否能支撑所有服务;另一些公司则选择适当利用备份集群,那就是把一些读取操作转移到备份集群。

集群故障转移也具有一些挑战性。但无论我们选择何种故障转移方案,SRE团队都需要进行日常的故障演练。因为,即便今天故障转移是有效的,在进行系统升级之后很可能失效了。一个季度进行一次故障转移演练是最低限度,强大的SRE团队会演练更频繁,Netflix著名的Chaos Monkey玩的更溜,它会随机制造故障,也就是说故障每天都可能发生。

下面来看下故障转移比较具有挑战性的地方。

数据损失与不一致

很多Kafka的数据镜像解决方案都是异步的,也就是说备份集群不会包含主集群最新的消息。在一个高并发的系统中,备份集群可能落后主集群几百甚至上千条消息。假如集群每秒处理100万条消息,备份集群与主集群之间有5ms的落后,那么在理想情况下备份集群也落后将近5000条消息。因此,我们需要对故障转移时的数据丢失做好准备。当然在故障演练时,我们停止主集群之后,可以等待数据镜像进程接收完剩余的消息,再进行故障转移,避免数据丢失。另外,Kafka不支持事务,如果多个主题的数据存在关联性,那么在数据丢失的情况下可能会导致不一致,因此应用需要注意处理这种情况。

故障转移的开始消费位移

在故障转移中,其中一个挑战就是如何决定应用在备份集群的开始消费位移。下面来讨论几个可选的方案。

  • 自动位移重置:Kafka消费者可以配置没有已提交位移时的行为,要么从每个分区的起始端消费,要么从每个分区的最末端消费。如果我们的消费者提交位移到Zookeeper,而且没有对Zookeeper中的位移数据进行镜像备份,那么我们需要从这两个选项中做出选择。选择从起始端开始消费的话,可能会存在大量重复的消息;选择从最末端消费的话,可能会存在消息丢失。如果这两种情况可以忍受的话,那么建议选择这种方案,因为这种方案非常简单。
  • 复制位移主题:如果我们使用0.9或者更高版本的Kafka消费者,消费者会提交位移到一个特殊的主题,_consumer_offsets。如果我们复制这个主题到备份集群,那么备份集群的消费者可以从已提交的位移处开始消费。这种方案也很简单,但是有一些情况需要注意。首先,主集群和备份集群的消息位移不能保证是一样的。举个例子,我们在主集群中只保留3天的数据,在主题创建并且使用了一个星期之后,我们开始进行备份集群的数据镜像;在这个场景中,主集群的最新消息位移可能到达57000000,而备份集群的最新消息位移是0,并且由于主集群中老的数据已经被过期删除了,备份集群的消息位移跟主集群始终是不一样的。其次,即便我们在创建主题就进行数据镜像,由于生产者失败重试,仍然会导致不同集群的消息位移是不同的。最后,即便主集群和备份集群的消息位移完全一致,由于主集群和备份集群存在一定的消息落后并且Kafka不支持事务,消费者提交的消息位移可能在相应消息之前或之后到达。因此,在故障转移时消费者可能根据位移找不到匹配的消息,或者位移落后于主集群。总的来说,如果备份集群的提交位移比主集群的提交位移更老,或者由于重试导致备份集群的消息比主集群的消息多,那么会存在一定的数据重复消费;如果备份集群的提交位移没有匹配到相应的消息,那么我们可能仍然需要从主题起始端或者最末端进行消费。因此,这种方案能够减少数据重复消费或者数据丢失,但也不能完全避免。
  • 基于时间的故障转移:如果我们使用0.10.0或者更高版本的Kafka消费者,每条消息都会包含发送到Kafka的时间戳。而且,0.10.1.0或者更高版本的broker会建立一个索引,并且提供一个根据时间戳来查询位移的API。因此,假如我们知道故障在某个时间发生,比如说为早上4:05,那么我们可以让备份集群的消费者从早上4:03处开始消费数据,虽然这样会有两分钟的数据重复消费,但至少数据没有丢失。这个方案的唯一问题是,我们怎么告诉备份集群的消费者从特定时间点开始消费呢?一个解决思路是,我们在应用代码中支持指定开始消费的时间,然后使用API来获取该时间对应的位移,然后从该位移处开始消费处理。但如果应用代码没有支持这种功能,我们可以自己写一个小工具,该工具接收一个时间戳,然后使用API来获取所有主题分区的位移,最后提交这些位移,这样备份集群的消费组在启动时会自动获取位移,然后进行消费处理。这种方案是最优的。
  • 外部位移映射:在上面讨论复制位移主题的时候,曾提到一个最大的挑战是主集群和备份集群的消息位移不一致。基于这个问题,一些公司选择开发自己的数据镜像工具,并且使用外部存储系统来存储集群间的消息位移映射。比如,主集群中位移为495的消息对应于备份集群中位移为500的消息,那么在外部存储系统中记录(495,500),这样在故障转移时我们可以基于主集群的已提交位移和映射来得到备份集群中的提交位移。但这种方案没有解决位移比消息提前到达备份集群的问题。这种方案比较复杂,升级集群然后使用基于时间的故障转移可能更便捷。
故障转移之后

假如故障顺利转移到备份集群,并且备份集群正常工作,那么原主集群应该怎么处理呢?可能需要将其转化为备份集群。你可能会想,能不能简单修改数据镜像工具,让其换个同步方向,从新的主集群同步数据到老的主集群?这样会导致两个问题:

  • 我们如何得知从什么地方开始进行数据镜像呢?这个问题跟故障转移时消费者不知道消费位移的问题是一样的,而且解决方案也会存在消息重复或者丢失的问题。
  • 如前所述,老的主集群可能会包含备份集群没有同步的数据更新,如果只是简单的将新主集群的数据同步回来,那么这两个集群又会发生不一致的情况。

因此,最简单的解决方案是,清除老主集群的所有状态和数据,然后重新与新主集群进行数据镜像,这样可以保证这两个集群的状态是一致的。

其他事项

故障转移还有一个需要注意的地方是,应用如何切换与备份集群进行通信?如果我们在代码中直接硬编码主集群的broker,那么故障转移比较麻烦。因此,很多公司会创建一个DNS名称来解析到主集群的broker,当故障转移时将DNS解析到备份集群的broker。由于Kafka客户端只需要成功连接到集群的一个broker便可通过该broker发现整个集群,因此我们创建3个左右的DNS解析到broker即可。

延伸集群

延伸集群主要用来防止单个数据中心故障导致Kafka服务不可用,其解决方案为:将一个Kafka集群分布在多个数据中心。因此延伸集群与其他集群方案有本质的区别,它就是一个Kafka集群。在这种方案中,我们不需要数据镜像来同步,因为Kafka本身就有复制机制,并且是同步复制的。在生产者发送消息时,我们可以通过配置分区机架信息、min.isr、acks=all来使得数据写入到至少两个数据中心副本后,才返回成功。

这种方案的优点是,多个数据中心的数据是实时同步的,而且不存在资源浪费问题。由于集群跨数据中心,为了得到最好的服务性能,数据中心间需要搭建高质量的通信设施以便得到低延迟和高吞吐,部分公司可能无法提供。

另外需要注意的是,一般需要3个数据中心,因为Kafka依赖的Zookeeper需要奇数的节点来保证服务可用性,只要有超过一半的节点存活,服务即可用。如果我们只有两个数据中心,那么肯定其中一个数据中心拥有多数的Zookeeper节点,那么该数据中心发生故障的话服务便不可用;如果拥有三个数据中心并且Zookeeper节点均匀分布,那么其中一个数据中心发生故障,服务仍然可用。

MirrorMaker

Kafka内置了一个用于集群间做数据镜像的简单工具–MirrorMaker,它的核心是一个包含若干个消费者的消费组,该消费组从指定的主题中读取数据,然后使用生产者把这些消息推送到另一个集群。每个消费者负责一部分主题和分区,而生产者则只需要一个,被这些消费者共享;每隔60秒消费者会通知生产者发送消息数据,然后等待另一个集群的Kafka接收写入这些数据;最后这些消费者提交已写入消息的位移。MirrorMaker保证数据不丢失,而且在发生故障时不超过60秒的数据重复。内部架构如下所示:
在这里插入图片描述

如何配置

首先,MirrorMaker依赖消费者和生产者,因此消费者和生产者的配置属性对MirrorMaker也适用。另外,MirrorMaker也有自身的属性需要配置。先来看一个配置的代码样例:

bin/kafka-mirror-maker --consumer.config etc/kafka/consumer.properties --producer.config etc/kafka/producer.properties --new.consumer --num.streams=2 --whitelist ".\*"
  • consumer.config:这个配置文件指定了所有消费者的属性,其中bootstrap.servers属性指定了源集群,group.id指定了所有消费者的使用的消费组ID。另外,auto.commit.enable=false这个配置最好不要更改,因为MirrorMaker根据消息是否写入目标集群来决定是否提交位移,修改此属性可能会造成数据丢失。auto.offset.reset这个属性默认为latest,也就是说创建MirrorMaker时会从该时间点开始数据镜像,如果需要对历史数据进行数据镜像,可以设置成earliest。
  • producer.config:这个配置文件指定了MirrorMaker中生产者的属性,其中bootstrap.servers属性指定了目标写入集群。
  • new.consumer:MirrorMaker可以使用0.8版本或者新的0.9版本消费者,建议使用0.9版本消费者。
  • num.streams:指定消费者的数量。
  • whitelist:使用正则表达式来指定需要数据镜像的主题。上面的例子中指定对所有的主题进行数据镜像。
在生产环境中部署MirrorMaker

上面的例子展示了如何使用命令行启动MirrorMaker,当在生产环境中部署MirrorMaker时,你可能会使用nohub和输出重定向来将使得它在后台运行,不过MirrorMaker已经包含-daemon参数来指定后台运行模式。很多公司都有自己的部署运维系统,比如Ansible,Puppet,Chef,Salt等等。一个更为高级的部署方案是使用Docker来运行MirrorMaker,而且越来越流行。MirrorMaker本身是无状态的,不需要任何磁盘存储,并且这种方案可以使一台机器运行多个MirrorMaker(也就是说运行多个Docker)。对于一个MirrorMaker来说,它的吞吐瓶颈在于只有一个生产者,因此使用多个MirrorMaker可以提高吞吐,而使用Docker部署多个MirrorMaker尤其方便。另外,Docker也可以支持业务洪峰低谷的弹性伸缩。

如果允许的话,建议将MirrorMaker部署在目标集群内,这是因为如果一旦发生网络分区,消费者与源集群断开连接比生产者与目标集群断开连接要安全。如果消费者断开连接,那么只是当前读取不到数据,但是数据仍然在源集群内,并不会丢失;而生产者断开连接,MirrorMaker便生产不了数据,如果MirrorMaker本身处理不当,可能会丢失数据。

但对于在集群间需要加密传输数据的场景来说,将MirrorMaker部署在源集群也是个可以考虑的方案。这是因为在Kafka中使用SSL进行加密传输时,消费者相比生产者来说性能受影响更大。因此我们可以在源集群内部broker到MirrorMaker的消费者间不使用SSL加密,而在MirrorMaker跨集群生产数据时使用SSL加密,这样可以将SSL的性能影响降到最低。另外,尽量配置acks=all和足够的重试次数来降低数据丢失的风险,而且如果MirrorMaker一旦发送消息失败最好让其暂时退出,避免丢失数据。

为了降低目标集群和源集群的消息延迟,建议将MirrorMaker部署在两台不同的机器上并且使用相同的消费组,这样一台发生故障另外一台仍然可以保证服务正常。

kafka工作流程分析
  1. 生产过程分析
    1. 写入方式
      producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka吞吐率)。
    2. 分区(Partition)
      消息发送时都被发送到一个 topic,其本质就是一个目录,而topic 是由一些 Partition Logs(分区日志)组成;每个 Partition 中的消息都是有序的,生产的消息被不断追加到 Partitionlog 上,其中的每一个消息都被赋予了一个唯一的offset 值。(保证同一个分区内的数据有序)
      1. 分区原因:
        1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个
        topic又可以有多个Partition 组成,因此整个集群就可以适应任意大小的数据了;
        2)可以提高并发,因为可以以Partition 为单位读写了。

      2. 分区原则:
        1)指定了 patition,则直接使用;
        2)未指定 patition 但指定key,通过对 key 的 value 进行 hash 出一个patition;
        3)patition 和 key 都未指定,使用轮询选出一个patition。

    3. 副本
      同 一 个 partition 可 能 会 有 多 个 replication ( 对 应 server.properties 配 置 中 的
      default.replication.factor=N)。没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的 patition。引入 replication 之后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 作为 follower 从 leader 中复制数据。
    4. 写入流程
      1. producer 先从 zookeeper的 "/brokers/…/state"节点找到该 partition 的 leader
      2. producer 将消息发送给该 leader
      3. leader 将消息写入本地 log
      4. followers 从 leader pull 消息,写入本地log 后向 leader 发送 ACK
      5. leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后commit的offset)并向producer 发送ACK
        在这里插入图片描述
    5. Broker 保存消息
      1. 存储方式
        物理上把 topic 分成一个或多个 patition(对应server.properties中的 num.partitions=3 配置),每个 patition物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:
      2. 存储策略
        无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
        1)基于时间:log.retention.hours=168
        2)基于大小:log.retention.bytes=1073741824
        需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关, 所以这里删除过期文件与提高 Kafka 性能无关。
zookeeper存储结构

在这里插入图片描述
注意:producer 不在 zk中注册,消费者在 zk 中注册。

  1. Kafka 消费过程分析
    kafka 提供了两套 consumerAPI:高级 Consumer API 和低级 ConsumerAPI。
    1. 高级 API
      1)高级 API优点
      高级API 写起来简单不需要自行去管理 offset,系统通过 zookeeper 自行管理。
      不需要管理分区,副本等情况,.系统自动管理。
      消费者断线会自动根据上一次记录在zookeeper 中的offset 去接着获取数据(默认设置1 分钟更新一下 zookeeper 中存的 offset)
      可以使用 group 来区分对同一个topic 的不同程序访问分离开来(不同的 group 记录不同的 offset,这样不同程序读取同一个topic 才不会因为 offset 互相影响)
      2)高级 API 缺点
      不能自行控制 offset(对于某些特殊需求来说)
      不能细化控制如分区、副本、zk 等
    2. 低级 API
      1)低级 API优点:
      能够让开发者自己控制 offset,想从哪里读取就从哪里读取。自行控制连接分区,对分区自定义进行负载均衡对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储offset 即可, 比如存在文件或者内存中)
      2)低级 API 缺点:
      太过复杂,需要自行控制 offset,连接哪个分区,找到分区leader 等。
    3. 消费者组
      在这里插入图片描述
      消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
      在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。
    4. 消费方式
      consumer 采用 pull(拉)模式从 broker 中读取数据。
      push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由
      broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息, 典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
      对于 Kafka 而言,pull 模式更合适,它可简化broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
      pull模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
    5. 消费者组案例
      需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。
      案例实操:
      在hadoop102、hadoop103 上修改/opt/module/kafka/config/consumer.properties
      配置文件中的 group.id 属性为任意组名。
[root@hadoop103 config]$ vi consumer.properties group.id=01

在 hadoop102、hadoop103 上分别启动消费者

[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties

在 hadoop104 上启动生产者

[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world

查看hadoop102 和hadoop103 的接收者。
同一时刻只有一个消费者接收到消息。

再谈partition和Consumer Group

每个partition在同一时间只能被同一个consumer group中的一个consumer消费,但多个consumer group可以同时消费这个partition

鉴于此,实现多个消费者交叉消费同一个topic中不同的数据,只需要partition的数量大于等于消费者组中的consumer数量以实现下图效果:
在这里插入图片描述

SpringBoot整合kafka流入MySQL
  1. 核心代码:
package com.qwx.service;

import com.alibaba.fastjson.JSON;
import com.qwx.dao.StepInfoMapper;
import com.qwx.entity.StepInfo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class IndicatorService {

    private Logger LOG = LoggerFactory.getLogger(IndicatorService.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 注入KafkaTemplate
     * @param kafkaTemplate kafka模版类
     */
    @Autowired
    public IndicatorService(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    private StepInfoMapper mapper;


    @KafkaListener(topics = "message", groupId = "test")
    public void processMessage(ConsumerRecord<Integer, String> record) {
        LOG.info("kafka processMessage start");
        LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());

        // do something ...
        StepInfo  stepInfo = JSON.parseObject(record.value(), StepInfo.class);
        System.out.println(stepInfo);
        mapper.addStepInfo(stepInfo);
        LOG.info("kafka processMessage end");
    }



    public void sendMessage(String topic, StepInfo data) {
        LOG.info("kafka sendMessage start");
        String dataJson = JSON.toJSONString(data);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,dataJson);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data);
            }
        });
        LOG.info("kafka sendMessage end");
    }
}
  1. pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qwx</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>

        <!-- 热部署 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
            <scope>true</scope>
        </dependency>

        <!-- alibaba的druid数据库连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>

    </dependencies>
</project>
  1. application.yml
server:
  port: 8888

spring:
  kafka:
    bootstrap-servers: hadoop000:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1

    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
  application:
    name: spring-boot-mybatis-druid
  datasource:
    druid:
      url: jdbc:mysql://localhost:3306/dyip?serverTimezone=GMT%2B8
      username: root
      password: xuan
      # 配置初始化大小(默认0)、最小、最大(默认8)
      initial-size: 1
      min-idle: 1
      max-active: 10
      # 配置获取连接等待超时的时间
      max-wait: 3000
      # 是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大。 默认为false
      pool-prepared-statements: true
      # 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。
      max-open-prepared-statements: 20
      # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
      time-between-eviction-runs-millis: 60000


      # 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。
      # 如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
      validation-query: SELECT 1
      # 申请连接时执行validationQuery检测连接是否有效 默认为true
      test-on-borrow: true
      # 归还连接时执行validationQuery检测连接是否有效 默认为false
      test-on-return: false
      # 申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
      test-while-idle: true
mybatis:
  mapper-locations: classpath:mapper/*.xml

Broker:消息中间件处理节点;每个Kafka服务节点称之为一个Broker,一个Kafka集群由一个或多个Broker组成
Topic:一类特定数据集合的统称;可类比DB中Table的概念;逻辑概念
Producer:消息的生产者,向Broker发送消息的客户端
Consumer:消息的消费者,向Broker读取消息的客户端
Consumer Group:每一个Consumer隶属于一个特定的Consumer
Group,一条消息可以被不同Group中的Consumer消费,但同一Group内的消息只能被一个Consumer消费
Partition:是对Topic中所包含数据集的物理分区;物理概念
Replication:副本集;是Kafka高可用的一种保障机制

  • 完整原理描述:

一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。
Zookeerper中保存着每个topic下的每个partition在每个group中消费的offset。
新版kafka把这个offsert保存到了一个__consumer_offsert的topic下。这个__consumer_offsert 有50个分区,通过将group的id哈希值%50的值来确定要保存到那一个分区. 这样也是为了考虑到zookeeper不擅长大量读写的原因。
所以,如果要一个group用几个consumer来同时读取的话,需要多线程来读取,一个线程相当于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。
假设一个topic test 被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 除非用显式的用–from-beginnging 来获取从0开始数据。

  • 查看topic-group的offsert
    位置:zookeeper
    路径:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions
    在zookeeper的topic中有一个特殊的topic __consumer_offserts
    计算方法:(放入哪个partitions)
    int hashCode = Math.abs(“ttt”.hashCode());
    int partition = hashCode % 50;
    先计算group的hashCode,再除以分区数(50),可以得到partition的值
    使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter “kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter”

  • 参数:
    auto.offset.reset:默认值为largest,代表最新的消息,smallest代表从最早的消息开始读取,当consumer刚开始创建的时候没有offset这种情况,如果设置了largest,则为当收到最新的一条消息的时候开始记录offsert,若设置为smalert,那么会从头开始读partition

Topic:

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),如下图所示:
在这里插入图片描述
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),
因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。
例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示:
在这里插入图片描述
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

producer:

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)

import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties; 
public class JasonPartitioner<T> implements Partitioner {     
	public JasonPartitioner(VerifiableProperties verifiableProperties) {}     
	@Override    
	public int partition(Object key, int numPartitions) {        
		try {            
		  int partitionNum = Integer.parseInt((String) key);            
		  return Math.abs(Integer.parseInt((String) key) % numPartitions);        
		} catch (Exception e) {            
		  return Math.abs(key.hashCode() % numPartitions);        
		}    
	}
}

如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。

public void sendMessage() throws InterruptedException{  
	for(int i = 1; i <= 5; i++){        
	  List messageList = new ArrayList<KeyedMessage<String, String>>();        
	  for(int j = 0; j < 4; j++{            
	  messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));        
	  }        
	producer.send(messageList);    
	}  
	producer.close();
}

则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。

在这里插入图片描述

consumer group (本节所有描述都是基于Consumer hight level API而非low level API)。

使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。

下面这个例子更清晰地展示了Kafka Consumer Group的特性:
首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。

Partition & Replication
Partition(分区):

Partition是作用于具体的Topic而已的,而不是一个独立的概念。Partition能水平扩展客户端的读写性能,是高吞吐量的 保障。通俗的将,Partition就是一块保存具体数据的空间,本质就是磁盘上存放数据的文件夹,所以Partition是不能跨Broker存在,也不能在同一个Broker上跨磁盘。对于一个Topic,可以根据需要设定Partition的个数;Kafka默认的Partition个数num.partitions为1($KAFKA_HOME/config/server.properties),表示该Topic的所有数据均写入至一个文件夹下;用户也可以在新建Topic的时候通过显示的指定–partitions 参数实现自定义Partition个数。在数据持久化时,每条消息都是根据一定的分区规则路由到对应的Partition中,并append在log文件的尾部(这一点类似于HDFS);在同一个Partition中消息是顺序写入的且始终保持有序性;但是不同Partition之间不能保证消息的有序性(高吞吐量的保障)。

Kafka也支持动态增加一个已存在Topic的Partition个数,但不支持动态减少Partition个数。因为被减少Partition所对应的数据处理是个难题;由于Kafka的数据写模式的限制,所以如果要把这些Partition的历史数据集追加到有效的Partition的尾部,就会破坏了Kafka在Partition上消息的有序性,显然是不合理的;但如果按照时间戳重新构分区的数据文件,可操作性和难度都将是非常大的,所以目前并不支持动态减少Partition个数。

Partition是用来存储数据的,但并不是最小的数据存储单元。Partition下还可以细分成Segment,每个Partition是由一个或多个Segment组成。每个Segment分别对应两个文件:一个是以.index结尾的索引文件,另一个是以.log结尾的数据文件,且两个文件的文件名完全相同。所有的Segment均存在于所属Partition的目录下。

Segment的必要性:如果以partition作为数据存储的最小单元,那么partition将会是一个很大的数据文件,且数据量是持续递增的;当进行过期数据清理或消费指定offset数据时,操作如此的大文件将会是一个很严重的性能问题。

Replication(副本集)

Replication是Kafka架构中一个比较重要的概念,是系统高可用的一种保障。Replication逻辑上是作用于Topic的,但实际上是体现在每一个Partition上。例如:有一个Topic,分区(partitions)数为3(分别为a, b, c),副本因子(replication-factor)数也为3;其本质就是该Topic一共有3个a分区,3个b分区,3个c分区。这样的设计在某种意义上就很大程度的提高了系统的容错率。接着上述的例子想另外一个问题:一个Topic下a分区一共有三个,既然是副本集,那这三个所包含的数据都完全一样吗?作用都一样吗?说到这就不得不引出两个概念:

  • Leader Replica

概念:每一个Partition有且只有一个Replica可以作为Leader
职责:负责处理所有Producer、Consumer的请求;与此同时,Leader还负责监管和维护ISR(In-Sync Replicas:副本同步队列)中所有follower的滞后状态。

  • Follower Replica

概念:每个Partition中除了Leader以外的所有Replica均为follower
职责:不处理任何来自客户端的请求;只通过Fetch Request拉取leader replica的数据进行同步

Tips: leader partition(主分区) & leader replica(主副本集):其实这两个概念是一回事;因为副本集策略只是一种机制,是为了提高可用性而生的。这种策略就是作用于partition上的,通俗的说增加副本集个数其实就是增加同一个partition的备份个数;同样的对于主分区而言,就是同一个partition下所有备份中的主副本集。

注意:同一个topic下的不同partition之间是没有主次之分,都是同等重要且存储不同数据的。

命名规则 & 数据存储
Partition:

当新建一个topic,并指定partition个数后,会在log.dirs参($KAFKA_HOME/config/server.properties)所指定的目录下创建对应的分区目录,用来存储落到该分区上的数据。分区目录的命名格式为:topic名称 + 短横线 + 分区序号;序号默认从0开始,最大为分区数 - 1。

为了尽可能的提升服务的可用性和容错率,Kafka遵循如下的分区分配原则:

  • 所有的replica要尽可能的平均分配到集群中的每一台broker上
  • 尽可能保证同一个partition的leader和follower分在不同的broker上
  • 如果集群跨机架,尽可能的保证每个partition的replica分配到不同的机架上

Eg:集群中有四个节点,均在统一机架上,新建一个topic:demoTopic,指定分区个数为4,副本因子为3;则对应的partition目录分别为:demoTopic-0、demoTopic-1、demoTopic-2、demoTopic-3;具体如下图所示:
在这里插入图片描述
因为集群未跨机架,所以在这里主要验证一下前两条分区分配原则:四个主分区分别位于四个不同的broker上,且另外两个replica也随机分配到除leader所在节点以外的其他三个broker上;具体的分区分布图如下所示:
在这里插入图片描述

Segment

在Kafka0.10.1.0以前,每个Partition全局的第一个Segment文件名均是从0开始,后续每个Segment的文件名为上一个Segment文件中最后一条消息的offset值;数据的大小为64位,20位数字字符的长度,未用到的用0填充。同一个Segment的.index文件和.log文件的文件名完全相同;所以初始化每个Partition下的Segment的文件名如下所示:
在这里插入图片描述
这种命名格式的好处在于可以有效的规避单文件数据量过大导致的操作难问题,不仅如此,还可以方便、快速的定位数据。例如:要实现从指定offset处开始读取数据,只需要根据给定的offset值与对应Partition下的segment文件名所比对,就可以快速的定位目标数据所在的segment文件,然后根据目标segment的.index文件查找给定offset值所对应的实际磁盘偏移量,即可快速在.log中读取目标数据。

在Kafka 0.10.1.0以后,对于每个Segment文件,在原有的.index和.log文件的基础上,新增加一个.timeindex文件,通过该索引文件 可以实现基于时间戳操作消息的功能.

Kafka中所说的Offset本质上是一个逻辑值,代表的是目标数据对应在Partition上的偏移量;而数据在磁盘上的实际偏移量是存储在对应Segment的.index文件中。

数据同步

通过简单介绍replica之间的offset的变化和更新逻辑,来初步了解Kafka的数据同步机制。首先引入几个概念:

  • Offset相关概念:
    LEO(LogEndOffset):表示每个Partition中log最后一条message的位置
    HW(HighWatermark):表示Consumer能够看到该Partition的位置
  • Replica相关概念:
    SR(In-Sync Replicas):副本同步列表【包含Leader和Follower】
    OSR(Outof-Sync Replicas):由于同步落后而被剔除的副本列表
    AR(Assigned Replicas):所有副本集;AR = ISR + OSR

清楚LEO、HW和ISR之间的相互关系是了解Kafka底层数据同步的关键:Kafka取Partition所对应的ISR中最小的LEO作为整个Partition的HW;每个Partition都会有自己独立的HW,与此同时leader和follower都会负责维护和更新自己的HW。对于leader新写入的消息,Consumer不能立刻被发现并进行消费,leader会等待该消息被ISR中所有的replica同步更新HW后,此时leader才会更新该partition的HW为之前新写入消息的offset,此时该消息对外才可见。
LEO和HW的转化逻辑如下图所示:
在这里插入图片描述

可用性 & 一致性

在分布式架构中,服务的可用性和数据的一致性是一个绕不开的话题,Kafka也不例外。如上文所说:当leader接受到一条消息后,需要等待ISR中所有的replica都同步复制完成以后,该消息才能被消费。如果在同步的过程中,ISR中如果有follower replica的同步落后延迟超过了阈值,则会被leader从ISR中剔除;只要ISR中所有的replica均同步成功,则该消息就一定不会丢失。从数据的角度出发,这种方式很契合一致性的需求,但是当集群的节点数较多,ISR队里的副本数变大时,每条消息的同步时长可能并不是所有业务场景所能容忍的,所以Kafka在Producer阶段通过request.required.acks参数提供了不同类型的应答机制以方便用户在系统吞吐量和一致性之间进行权衡:

  • 1(Default):表示Producer在ISR中的Leader成功接收到消息后并确认后,则代表该消息以成功写入
  • 0:表示Producer将消息发送到Broker中后无需等待Broker的确认;即就是:只管发消息,不关注消息是否被成功接收
  • -1(all):表示Producer需要等待ISR中所有的Replica都确认收到消息才算写入成功;如果ISR中只剩下Leader,则等通过request.required.acks=1的效果

在老版本的Kafka(0.11.0.0以前)中,存在一个潜在的数据一致性问题:假如一个Partition有两个Replica,A(Leader)中包含的数据为a, b, c, d, e,LEO为5;B(Follower)包含的数据为a, b, c,LEO为3;此时该Partition的HW为3,Consumer可见的消息为a, b, c,系统对外表示正常;当follower还未来得及同步消息d、e时,leader挂了,此时B变成Leader,并且Producer重新发了两条消息f和g;因为此时系统中只有B一个存活,所以Partition对外的HW这会更新为5没有问题,Consumer可见的内容为a, b, c, f, g;此时A被唤醒并作为Follower开始从Leader中拉取数据,因为follower自身的HW等于Leader的HW,所以B没有拉去到任何数据,当Producer继续发送消息时,就会导致副本A、B的数据集不一致。这个问题在0.11.0.0中通过leader epoch机制来消除该问题。可以把epoch理解为代(版本)的概念,即每一次的leader对应一个唯一的epoch,如果leader更换,则对应的epoch值也会随之更换,而过期的epoch请求则都会被忽略。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐