Kafka — 全面解析Kafka以及消息传输流程(一)
1、Kafka概述1.1 Kafka 总体架构Kafka是分布式、分区的、多副本的、多订阅者,高吞吐率,支持水平扩展,基于zookeeper协调的分布式消息系统。常见用于web/nginx日志、访问日志,消息服务等。主要应用场景是:日志收集系统和消息系统。一个典型的 kafka 集群包含若干 Producer,若干个 Broker(kafka )、若干个 Consumer Group,以及一个 z
1、Kafka概述
1.1 Kafka 总体架构
Kafka是分布式、分区的、多副本的、多订阅者,高吞吐率,支持水平扩展,基于zookeeper协调的分布式消息系统。常见用于web/nginx日志、访问日志,消息服务等。主要应用场景是:日志收集系统和消息系统。
一个典型的 kafka 集群包含若干 Producer,若干个 Broker(kafka )、若干个 Consumer Group,以及一个 zookeeper 集群。kafka 通过 zookeeper 管理集群配置及服务协同。Producer 使用 push 模式将消息发布到 broker,consumer 通过监听使用 pull 模式从 broker 订阅并消费消息。
和其他 mq 中间件不同的是,producer 发送消息到 broker 的过程是 push,而 consumer 从 broker 消费消息的过程是 pull,consumer主动去拉数据,而不是 broker 把数据主动发送给 consumer。
1.2 概念
-
Topic
kafka中没有queue的概念,只有topic,topic 是逻辑存储概念,是一系列消息的集合。每条发送到 kafka 集群的消息都会属于一个topic。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。 -
partition
一个topic可以有多个partition。topic理解为表,而partition为分区,同mysql的分区概念。同一 topic 下的不同partition中包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka只保证在同一个分区内的消息是有序的。
设置多个partition后,这些partition会接近均匀的分布在kafka各个节点之上。每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition。如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的partition中。 -
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker; -
Consumer Group
逻辑上的订阅者,每个Consumer都从属于一个特定的Consumer Group,消息的单播和多播都是基于消费组来实现的,消费组中的消费者不是越多越好,消费者数量超过分区数量时,回导致消费者分配不到资源,造成资源浪费。
1.Kafka 支持多个 “消费者组” 读取同一个主题的消息
2.“消费者” 归属于 “消费者组”,同一个 “消费者组” 内,每个 “消费者” 仅接收主题的一部分分区的消息(消费者与分区的映射,称为消费者对分区的 “所有权关系”)
3.“消费者组” 增加 “消费者” 数量,能够实现消费能力的水平扩展,但 “消费者” 数量上限是分区的主题数量 -
Offset
每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。 -
Message
消息是Kafka中最基本的数据单元,主要由key和value构成;真正有效的消息是value数据,key只作为消息路由分区使用,kafka根据key决定将当前消息存储在哪个分区。
1.3 消息处理发送与分区存储
从kafka1.0以后默认异步发送,将消息放入后台队列中,然后由单独线程去从队列中取出消息然后发送。
消息分区路由
消息要保存到分区中,分区选择算法为:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
- 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
消息存储
kafka消息全部持久化到磁盘,其使用日志文件的方式来保存。Partition 以文件的形式存储在文件系统中,partition命名规则为:
<topic_name>-<partition_id>
消息消费
同一时刻,一条消息只能被group中的一个消费者实例消费,一个topic下的每个partition只从属于group中的一个消费者,不可能出现group中的两个消费者消费同一个分区。
为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,如果 分区为1、2、3,消费者也为1、2、3,那么会给每个消费者分配且仅分配一个分区来消费。如果消费者还有个4,那么4会空闲,因为每个分区都有了一个消费者。如果消费者为1、2,那么其中有一个消费者消费两个分区,另一个消费一个分区。
所以消费者的数量不要大于分区,否则会造成资源浪费。消费者的数量最好控制为分区的数量,这样能为消费者合理分配分区。
Kafka消息在分区内有序,消费者消费消息时也要按照分区内消息顺序进行消费,有序消费就要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。kafka消费者自己可以控制读取消息的offset,如果两个消费者负责同一个分区,就有可能C1读到2,C1还没处理完,C2已经读到3了,因为这就相当于多线程读取同一个消息,造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。
消费者的offset是保存在zookeeper中的,通过kafka监控工具可以看到。新建的group没有offset,这时候就可以选择none或latest、earlist策略。如果是earlist,新建group时,会从最早的一条消息开始取,即使这些消息可能被其他group已经消费过了,又因为kafka消息是全部持久化到磁盘,所以还能拿到。
多分区的优点
kafka使用分区将topic的消息打散到多个分区分布保存在不同的broker上,实现了producer和consumer消息处理的高吞吐量。Kafka的producer和consumer都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费。
- 分区不是越多越好
- 客户端/服务器端需要使用的内存就越多
Kafka0.8.2之后,在客户端producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本就越大。 - 文件句柄的开销
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。 - 降低高可用性
Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。其中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,controller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。
- 如何确定分区数量
可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)
说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。
Consumer个数与分区数的关系
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。
即分区数决定了同组消费者个数的上限。如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。
Consumer消费Partition的分配策略
Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。当以下事件发生时,Kafka 将会进行一次分区分配:
- 一个 Consumer Group 内新增消费者
- 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
- 订阅的主题新增分区
将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到本文提到的分区分配策略。
1.4 Rebalance
kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic的每个分区。
当出现以下几种情况时,kafka 会进行一次rebalance分区分配操作:
- 同一个 consumer group 内新增了消费者
- 消费者离开当前所属的 consumer group,比如主动停机或者宕机
- topic 新增了分区(也就是分区数量发生了变化)
1.5 零拷贝
“零拷贝”可以去掉没必要的数据复制操作,同时也会减少上下文切换次数。Linux 中通过 sendfile 系统调用来完成。Java 提供了访问这个系统调用的方法:
FileChannel.transferTo API
这样只需要一次拷贝,操作系统将数据直接从页缓存发送到网络上。
1.6 消息的文件存储机制
producer 不断发送消息,会引起 partition 文件的无限扩张,因此 kafka 通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是逻辑上的概念,一个 LogSegment 对应磁盘上的日志文件和一个索引文件,其中日志文件用来记录消息。索引文件是用来保存消息的索引。
segment file 由两部分组成,index file 和 data file,.index 和 .timeindex 文件为索引文件,.data 文件为数据文件。
segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值进行递增。
为了提高查找消息的性能,为每一个日志文件添加 2 个索引文件:OffsetIndex 和 TimeIndex,分别对应 .index 以及 .timeindex 文件, TimeIndex 索引文件格式是映射时间戳和相对 offset,index 中存储了索引以及物理偏移量。
例如查找 offset 为 7 的message:
- 首先用二分查找确定它是在哪个 LogSegment 中,自然是在第一个 Segment 中;
- 打开这个 Segment 的index文件,二分查找找到 offset 小于或者等于指定offset的索引条目中最大的那个offset,为图中 offset 为 6 的索引,通过索引文件知道 offset 为 6 的 Message 的position为1407,即在数据文件中的位置为1407;
- 打开数据文件,从位置为1407处开始顺序扫描直到找到 offset 为 7 的那条Message。
即 [6, 1407] 在 log 文件中,对应的是第 6 条记录,其物理偏移量(position)为 1407。得到 position 后,再到对应的 log 文件中,从 position 为1407 处开始顺序查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息。
这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。
1.7 日志的清除策略以及压缩策略
日志清除策略
- 根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
- 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。
kafka 会启动一个后台线程,定期检查是否存在可以删除的消息,通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。
日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动启动Cleaner 线程池,定期将相同的 key 进行合并,只保留最新的 value 值。
1.8 消息去重
生产者发送数据给 Leader,Leader 同步数据给 ISR 中的 Follower,同步到一半 Leader 宕机,此时选出新的 Leader,它可能具有部分此次提交的数据,而生产者收到发送失败响应后将重发数据,新的 Leader 接受数据则数据重复。因此 Kafka 只支持“At Most Once”和“At Least Once”,而不支持“Exactly Once”,消息去重需在具体的业务中实现。
- At most once:消息可能会丢,但绝不会重复传输
- At least once:消息绝不会丢,但可能会重复传输
- Exactly once:每条消息肯定会被传输一次且仅传输一次
Kafka 为了保证高可靠性,Producer 生产的消息有可能重复,那么,一旦生产的消息重复了,Consumer 端就会出现消息的重复消费,这就需要“去重”处理。去重的方式有两种:
- Broker 去重 — 服务端去重
Kafka 文档中提及 GUID(Globally Unique Identifier)的概念。通过客户端生成算法得到每个消息的 unique id,同时可映射至 Broker 上存储的地址,即通过 GUID 便可查询提取消息内容,也便于发送方的幂等性保证,需要在 Broker 上提供此去重处理模块,目前版本尚不支持。 - Consumer 去重—客户端去重
针对 GUID,如果从客户端的角度去重,那么需要引入缓存系统。Consumer 每 Pull 一条消息便去缓存中查询 GUID 是否存在,如果存在则为重复消息。但是,这种策略必然会增加系统的复杂度,同时,缓存的大小难以界定。不只是 Kafka,RabbitMQ 、RocketMQ 也只保障“At least once”,而无法凭借自身机制进行消息去重。因此,在实际应用中,用户须根据具体的业务场景和特点进行去重。
2、Kafka数据可靠性与持久性
谈及可靠性,最常规、最有效的策略就是 “副本(replication)机制” ,Kafka 实现高可靠性同样采用了该策略。通过调节副本相关参数,可使 Kafka 在性能和可靠性之间取得平衡。
2.1 Kafka文件存储机制
Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。然而 topic 在物理层面又能以 partition 为分组,一个 topic 可以分成若干个 partition。事实上,partition 并不是最终的存储粒度,partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成。
在分布式系统中,为了提高可靠性,最常用、也最有效的策略是“副本机制”,Kafka 也不例外。Kafka 为每个 Partition 维护一个AR(Assigned Replicas)列表,由 ISR(In-Sync Replicas,与 Leader 数据同步的 Replica)和 OSR(Outof-Sync Replicas,与 Leader 数据不同步的 Replica)组成。初始状态下,所有的 Replica 都在 ISR 中,但在 Kafka 工作的过程中,由于各种问题(网络、磁盘、内存)可能导致部分 Replica 的同步速度慢于参数“replica.lag.time.max.ms”指定的阈值,一旦出现这种情况,这部分 Replica 会被移出 ISR,降级至 OSR 中。
2.2 复制原理和同步方式
每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
leader 将负责维护和跟踪一个 ISR(In-Sync Replicas)列表,即同步副本队列,这个列表里面的副本与 leader 保持同步,状态一致。如果新的 leader 从 ISR 列表中的副本中选出,那么就可以保证新 leader 为优选。
2.3 同步副本 ISR
HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。
当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:
2.4 数据可靠性和持久性保证
可以通过 request.required.acks 参数来设置数据可靠性的级别:
-
request.required.acks = 1
这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。 -
request.required.acks = 0
producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。 -
request.required.acks = -1(all)
producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。
如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas 配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。
2.5 深入解读 HW 机制
acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?
一个 partition 中的 ISR 列表中,leader 的 HW 是所有 ISR 列表里副本中最小的那个的 LEO。类似于木桶原理,水位取决于最低那块短板。
如果 Leader 宕机,Follower1 被选为新的 Leader,而新的 Leader(原 Follower1 )并没有完全同步之前 Leader 的所有数据(少了一个消息 6),之后,新 Leader 又继续接受了新的数据,此时,原本宕机的 Leader 经修复后重新上线,它将发现新 Leader 中的数据和自己持有的数据不一致,怎么办呢?
为了保证一致性,必须有一方妥协,显然旧的 Leader 优先级较低,因此, 它会将自己的数据截断到宕机之前的 HW 位置(HW 之前的数据,与 Leader 一定是相同的),然后同步新 Leader 的数据。这便是所谓的 “截断机制”。
2.6 Leader 选举
Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader 保持同步,只有 ISR 里的成员才能有被选为 leader 的可能(通过参数配置:unclean.leader.election.enable=false)。在这种模式下,对于 f+1 个副本,一个 Kafka topic 能在保证不丢失已经 commit 消息的前提下容忍 f 个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。但与 “少数服从多数” 策略不同的是,Kafka ISR 列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。
当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:
- 等待 ISR 中任意一个 replica 恢复过来,并且选它作为 leader;
- 选择第一个恢复过来的 replica(并不一定是在 ISR 中)作为leader。
2.7 消息发送的可靠性
Kafka 新版 Java 客户端(kafka-clients-2.0.0)使用的是异步方式发送消息,即:消息提交给 Kafka Producer 的 send 方法后,实际上是将该消息放入了它本身的一个后台发送队列,之后由一个后台线程不断地从队列中取出消息进行发送,消息发送成功后会回调 send 方法的 callback(如果存在就调用)。
batch.size
当多条消息的目标 Partition 相同时,Producer 会尝试将它们组装成一个批量消息,以便减少请求的数量,有助于提升客户端和服务器的性能。批量消息的大小由参数“batch.size”控制。
batch.size:int 型,默认值 16384,单位字节,即默认大小 16KB。
参数“batch.size”的不宜过大或过小,过大浪费内存,过小则有可能降低吞吐量(大小为零将完全禁用批处理)。
linger.ms
由于采用批处理模式,如果消息到达的速度比后台线程发送到 Partition 的速度快,则会出现消息堆积的问题。不过,在某些情况下,客户端可能通过添加人工延迟来减少请求数量,也就是说,生产者将等待直到给定延迟才允许发送其它消息,从而可以将待发送的消息批量在一起,而不是立即发送。
linger.ms 设置批处理延迟的上限是,一旦我们得到一个分区的 batch.size 值的记录,它将立即发送,而不管该设置如何;但如果这个分区的累积字节数少于 batch.size,我们将“逗留”指定的等待时间,到达 linger.ms 指定的延迟后,即使累积的消息字节数少于 batch.size,也会发送 。
linger.ms 默认为 0(即没有延迟)。例如,设置 linger.ms=5 将具有减少发送请求数量的效果,
但是对于在没有负载的情况下发送的记录,将增加高达 5ms 的延迟。
buffer.memory 和 max.block.ms
当程序的发送速率大于后台线程发送到 Broker 的速率时,生产的消息会在发送方堆积,为此 Kafka 提供了相应的堆积控制策略,主要涉及参数 buffer.memory 和 max.block.ms。
buffer.memory;long 型,默认值 33554432,单位字节,即默认大小为 32MB。
Producer 可以用来缓冲等待发送给服务器的记录的总字节数。如果记录的发送速度快于传送到服务器的速度,那么缓冲区将被占满,Producer 将阻塞 max.block.ms,之后将抛出异常。此参数的大小应大致与 Producer 可使用的总内存相对应,但不是硬绑定,因为并非 Producer 使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用压缩)以及维护等请求。
max.block.ms:long 型,默认值 60000,单位毫秒,即大小为 60s。
在 buffer.memory 指定的缓存被占满后,Producer 相关的方法可阻塞的最大时间由 max.block.ms 控制,之后将抛出异常。比如,KafkaProducer.send() 和 KafkaProducer.partitionsFor()。注意,用户提供的序列化器或分区器中的阻塞将不计入该超时。
retries
Producer 发送消息存在失败的可能,比如,目标 Partition 对应的 Leader 突然故障。因此,为了保证可靠性,发送消息具有“重试机制”,由参数 retries 控制:
retries:int 型,默认值 0。
部分场景下,为了“绝对可靠”,会将失败重试的次数设置为一个很大的数值,如 Integer.MAX_VALUE。
避免消息重排序
Kafka 默认情况下是批量发送:当多条消息的目标 Partition 相同时,Producer 会尝试将它们组装成一个批量消息,以便减少请求的数量,有助于提升客户端和服务器的性能,批量消息的大小由参数“batch.size”控制。
这种方式存在一个潜在的问题,假设有两个批量消息的目标 Partition 相同,先后顺序为 T1、T2,当他们先后发送到 Broker 上的目标 Partition 时,T1 对应的批量消息因某种原因发送失败,T2 则成功;之后,由于重试机制的保障,T1 重试发送成功;但是,两个批量消息到达的顺序变成了 T2、T1。对于某些对消息顺序敏感的应用场景,这是很危险的。
为了解决应对上述场景,Kafka 提供了一个参数“max.in.flight.requests.per.connection”。通过它,客户端可以设置在单个连接上发送的未确认请求的最大数目。如果设置大于 1,并且发送失败,则存在由于重试(如果启用了重试机制)而导致消息重新排序的风险。
max.in.flight.requests.per.connection: int型,默认值5,官方介绍:
如果启用了重试机制,即 retries>0,为了规避消息被重新排序的风险,修改默认设置如下:
max.in.flight.requests.per.connection = 1
-
让消息无序
kafka有个max.in.flight.requests.per.connection参数,这个参数是用来调整每个分区的可写入的连接数。如果把retries参数设置为一个非零整数,同时把max.in.flight.requests.per.connection设置为一个大于1的整数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。 -
让消息有序
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries参数设置为0。可以把max.in.flight.requests.per.connection设置为1,这样在生产者尝试将第一批消息发送到分区上时,就不会有其他的消息发送给broker了。不过这样会严重影响生产的吞吐量,所以只有在对消息的顺序有严格的要求的情况下才能这么做。
2.8 消息接收的可靠性
默认情况下,客户端采用的是自动提交模式,由参数“enable.auto.commit” 控制,如下:
enable.auto.commit: boolean型,默认值True。
默认的自动提交模式存在潜在风险:可能导致消息丢失,比如客户端接收到一批消息并进行处理,在处理过程中,客户端 offset 定时提交的时间点到达,这批消息的 offset 被提交。但是,这批消息可能尚未处理完毕,更严重的是,处理过程中可能出现异常或错误,甚至是宕机。如是,这些尚未处理的消息将会丢失,因为 offset 已经提交,但实际上并未成功的被处理,Consumer 下次 Pull 消息时将从新的 offset 处读取。
基于上述分析,为了保证消息接收的可靠性,通常将参数“enable.auto.commit”设置为 false,防止程序自动提交。
2.9 消息存储的可靠性
Kafka 通过持久化消息到磁盘来保障消息存储的可靠性,但存在矛盾点:由于磁盘的 IO 瓶颈,过于频繁的 “刷盘” 会降低 Kafka 的性能;但是,“刷盘”的时间间隔过长又存在消息丢失的风险,降低可靠性。鉴于此,写磁盘的策略需要平衡性能和可靠性。Kafka 提供了两个参数来控制 Broker 的刷盘时机,如下:
log.flush.interval.ms:long型,默认值null,单位ms,用于控制日志刷盘的时间间隔,每隔多少时间将消息刷到磁盘上
log.flush.interval.messages:long型,默认值9223372036854775807,用于控制日志刷盘的消息量,即每积累多少条消息将消息刷
到磁盘上。
建议配置:
每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
3、Kafka生产消费基础流程
3.1 producer 发布消息
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。
其路由机制为:
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key,通过对 key 进行 hash 选出一个 patition;
- patition 和 key 都未指定,使用轮询选出一个 patition。
写入流程:
- producer 先从 ZooKeeper 的 “/brokers/…/state” 节点找到该 partition 的leader;
- producer 将消息发送给该 leader;
- leader 将消息写入本地 log;
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK;
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK;
3.2 Broker 存储消息
物理上把 topic 分成一个或多个 patition,每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)
3.3 Consumer 消费消息
high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 ZooKeeper 保存(下次消费时,该group 中的consumer将从offset记录的位置开始消费)。
注意:
- 如果消费线程大于 patition 数量,则有些线程将收不到消息;
- 如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;
- 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的。
consumer 采用 pull 模式从 broker 中读取数据
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
更多推荐
所有评论(0)