Kafka生产者

1 分区策略

1)分区的原因

(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;

(2)可以提高并发,因为可以以Partition为单位读写了。

2)分区的原则

我们需要将producer发送的数据封装成一个ProducerRecord对象。
在这里插入图片描述

The default partitioning strategy:
1.If a partition is specified in the record, use it
2.If no partition is specified but a key is present choose a partition based on a hash of the key
3.If no partition or key is present choose the sticky partition that changes when the batch is full.

(1) 指定 partition 情况下,直接将数据发到对应的partition;

(2) 没有指定 partition ,但指定 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余(key.hashCode % 分区数),得到对应的partition;

(3) 既没有指定 partition ,也没有指定 key 的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者过了间隔时间,kafka再随机一个分区进行使用(不包含正在使用的分区,在剩余的其他分区随机选择).

while (newPart == null || newPart.equals(oldPart)) {
	Integer random =Utils.toPositive(ThreadLocalRandom.current().nextInt());
	newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}

2 数据可靠性保证

1)生产者发送数据到topic partition的可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个partition 收到 producer 发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据
在这里插入图片描述

2)Topic partition存储数据的可靠性保证

(1)副本数据同步策略

方案优点缺点
半数以上完成同步,就发送ack延迟低选举新的leader时,容忍n台节点的故障,需要2n+1个副本
全部完成同步,才发送ack选举新的leader时,容忍n台节点的故障,需要n+1个副本延迟高

Kafka选择了第二种方案,原因如下:

  1. 同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
  2. 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。

(2)ISR(in+out=all)

采用第二种方案之后,设想以下情景:
leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?

Leader维护了一个动态的同步副本集合in-sync replica set (ISR),意为和leader保持同步的follower集合。

当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由replica.lag.time.max.ms参数设定(默认10s)。

若 Leader 发生故障之后,就会从 ISR 中选举新的 leader。(当follower被踢出ISR后,在OSR也会进行同步,当follower与leader保持同步后(LEO>=HW),又会进入ISR)

(3)ack应答级别

Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

acks参数配置:

0:代表 leader 接收到消息马上 返回ack(不需要写入磁盘),效率最高,但安全性最低,当 leader 故障时有可能丢失数据

1:(默认值)代表 lleader 接收到消息,且落盘成功后 返回ack,效率和安全性都适中。如果在 follower 同步完成之前 leader 故障,那么将会丢失数据

-1(all):代表 lleader 接收到消息,落盘成功,且 ISP 中的 follower 全部同步完成后 才返回ack,效率最慢,但安全性最高,不会丢失数据。但如果在 follower 同步完成后,leader 发送ack之前,leader发生故障,那么会造成数据重复。(当ISR只有一个leader时(且掉线了),会丢数据

3 Exactly Once语义

ACK = -1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义,最少一次<数据可能重复>,可以保证数据不丢失,但是不能保证数据不重复

ACK = 0,可以保证Producer每条消息只会被发送一次,即At Most Once语义,最多一次<数据可能丢失>,可以保证数据不重复,但是不能保证数据不丢失

0.11版本的Kafka,引入了一项重大特性:幂等性。

所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least Once + 幂等性 = Exactly Once

启用幂等性,将 Producer 的参数中enable.idempotence设置为true即可。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。

而Broker端会对 <PID, Partition, SeqNumber> 做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

#主键: producerid + partition + sequenceNumber
producerid: 在producer端口生成
partition: 分区号
sequenceNumber : 序列号。写到分区的第几条数据

该主键会缓存到partition所在的broker上。后续在发送数据的时候,会对比数据的主键与缓存的主键。

注:producer 重启 PID 会变化,不同的 Partition 也具有不同主键,所以,幂等性无法保证跨分区跨会话的Exactly Once

Kafka消费者

1 消费方式

consumer采用pull(拉)模式从broker中读取数据

优点是消费速度可以由消费者自由控制,逐条消费或者批量消费可以由消费者自由控制

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据
针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再消费,这段时长即为timeout。

2 分区分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

Kafka有三种分配策略,RoundRobin,Range,Sticky。

将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:

  • 同一个 Consumer Group 内新增消费者
  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
  • 订阅的主题新增分区

1)Range Assignor(默认策略)
range: 范围分配,以主题为单位,如果消费者没有订阅此主题,就不会收到此主题里的数据,不会导致消费错数据的情况,但是有可能导致消费者组里面的每个消费者消费分区数不均等

例子:
topic [ partition0,parition1,partition2,partition3,partition4 ]
consumer group [ consumer1,consumer2 ]

分配分区分为两步:

  1. 大致估算每个消费者分配多少分区: 分区数 / 消费者个数 = 5/2 =2
  2. 计算前面几个消费者多消费一个分区: 分区数 % 消费者个数= 5%2=1

结果:
consumer1 [ partition0,parition1,partition2 ]
consumer2 [ partition3,parition4 ]

2)RoundRobin Assignor
round_robin:轮循的方式,以消费者组为单位,将消费者组里的,消费者订阅的所有主题全部取出来,将主题里的每个分区排序,再对每个消费者进行轮循,

  1. 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。
  2. 如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候,此消费者将分配不到这个topic的任何分区

例子:
topic [ partition0,parition1,partition2,partition3,partition4 ]
consumer group [ consumer1,consumer2]
结果:
consumer1 [ partition0,parition2,partition4 ]
consumer2 [ partition1,parition3 ]

3)Sticky Assignor
StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

  1. 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
  2. 分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标

如果从新分配,会尽可能保留上一次,对原有消费者的分配结果,并在此基础上,对 新加入 或 删除 的消费者进行资源分配

3 offset的维护

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,

所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。(Group + Topic + Partition)

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,
从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,
该topic为 __consumer_offsets

4 故障处理细节

leader 和 follower故障处理细节(多去少补
在这里插入图片描述
LEO:指的是每个副本最大的offset;

HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。

(1)follower故障

follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的HW,先将log文件,高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步,等该 follower 的 LEO 大于等于该 Partition 的 HW,即follower 追上 leader 之后,就可以重新加入ISR了。

(2)leader故障

leader 发生故障之后,会从 ISR 中选出一个新的 leader,
之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的log文件高于 HW 的部分截掉,然后从新的leader同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐