大数据面试重点之kafka(六)
大数据面试重点之kafka(六)
大数据面试重点之kafka(六)
Kafka分区分配算法
可回答:Kafka的partition分区策略问过的一些公司:阿里云,小米参考答案:
1、生产者分区分配策略
生产者在将消息发送到某个Topic ,需要经过拦截器、序列化器和分区器( Partitioner )的一系列作用之后才能发送到对应的Broker,在发往Broker之前是需要确定它所发往的分区。
如果消息如果消息
指定了partition字段,那么就不需要分区器。
没有指定partition字段,那么就需要依赖分区器,根据key这个字段来
ProducerRecord
ProducerRecord
计算partition的值。分区器的作用就是为消息分配分区。
- public class ProducerRecord<K, V> {
- // 该消息需要发往的主题
- private final String topic;
- // 该消息需要发往的主题中的某个分区,如果该字段有值,则分区器不起作用,直接发往指定的分区
- // 如果该值为null,则利用分区器进行分区的选择
- private final Integer partition;
- private final Headers headers;
- // 如果partition字段为null,则使用分区器进行分区选择时会用到该key字段,该值可为空
- private final K key;
- private final V value;
- private final Long timestamp; 12
Kafka 中提供的默认分区器是 DefaultPartitioner ,它实现了Partitioner接口(用户可以实现这个接口来自定义分区器),其中的partition方法就是用来实现具体的分区分配逻辑:
如果在发消息的时候指定了分区,则消息投递到指定的分区。
如果没有指定分区,但是消息的key不为空,则使用称之为 murmur 的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区。
1 public class DefaultPartitioner implements Partitioner { 2
3 private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
4
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 首先通过cluster从元数据中获取topic所有的分区信息
- List partitions = cluster.partitionsForTopic(topic);
- // 拿到该topic的分区数
- int numPartitions = partitions.size();
- // 如果消息记录中没有指定key
- if (keyBytes == null) {
- // 则获取一个自增的值
- int nextValue = nextValue(topic);
- // 通过cluster拿到所有可用的分区(可用的分区这里指的是该分区存在首领副本)
- List availablePartitions = cluster.availablePartitionsForTopic(topic);
- // 如果该topic存在可用的分区
- if (availablePartitions.size() > 0) {
- // 那么将nextValue转成正数之后对可用分区数进行取余
- int part = Utils.toPositive(nextValue) % availablePartitions.size();
- // 然后从可用分区中返回一个分区
- return availablePartitions.get(part).partition();
- } else { // 如果不存在可用的分区
- // 那么就从所有不可用的分区中通过取余的方式返回一个不可用的分区
- return Utils.toPositive(nextValue) % numPartitions; 25 }
- } else { // 如果消息记录中指定了key
- // 则使用该key进行hash操作,然后对所有的分区数进行取余操作,这里的hash算法采用的是murmur2算法,然后再转成正数
- //toPositive方法很简单,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 30 }
31 }
43 if (currentCounter != null) {
44 // 之后把这个随机数返回
45 counter = currentCounter;
46 }
47 }
48 // 一旦存入了随机数之后,后续的请求均在该随机数的基础上+1之后进行返回
2、消费者分区分配策略
32
- // nextValue方法可以理解为是在消息记录中没有指定key的情况下,需要生成一个数用来代替key的hash值
- // 方法就是最开始先生成一个随机数,之后在这个随机数的基础上每次请求时均进行+1的操作
- private int nextValue(String topic) {
- // 每个topic都对应着一个计数
- AtomicInteger counter = topicCounterMap.get(topic);
- if (null == counter) { // 如果是第一次,该topic还没有对应的计数
- // 那么先生成一个随机数
- counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
- // 然后将该随机数与topic对应起来存入map中
- AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
49
50
51
return counter.getAndIncrement();
}
消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只 能被组中的一个消费者实例消费。
如果分区数大于或者等于组中的消费者实例数,一个消费者会负责多个分区。
如果分区数小于组中的消费者实例数,有些消费者将处于空闲状态并且无法接收消息。
如果多个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以 控制读取消息的Offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,这就相当于 多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序。
81 Range策略
(默认分配策略)对应的实现类是 org.apache.kafka.clients.consumer.RangeAssignor
range
。
首先,将分区按数字顺序排行序,消费者按名称的字典序排序。
然后,用分区总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费 者将多负责一个分区。
假设,有1个主题、10个分区、3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者C1将会多消费一个分区,分配结果是:
C1将消费T1主题的0、1、2、3分区。C2将消费T1主题的4、5、6分区。
C3将消费T1主题的7、8、9分区。
假设,有11个分区,分配结果是:
C1将消费T1主题的0、1、2、3分区。C2将消费T1主题的4、5、 6、7分区。C2将消费T1主题的8、9、10分区。
假如,有2个主题(T0和T1),分别有3个分区,分配结果是:
C1将消费T1主题的 0、1 分区,以及T1主题的 0、1 分区。
275
C2将消费T1主题的 2、3 分区,以及T2主题的 2、3 分区。
a. public Map<String, List> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {
b. // 主题与消费者的映射
a. Map<String, List> consumersPerTopic = consumersPerTopic(subscriptions);
b. Map<String, List> assignment = new HashMap<>();
c. for (String memberId : subscriptions.keySet())
d. assignment.put(memberId, new ArrayList()); 7
- for (Map.Entry<String, List> topicEntry : consumersPerTopic.entrySet()) {
- String topic = topicEntry.getKey(); // 主 题
- List consumersForTopic = topicEntry.getValue(); // 消费者列表
11 - // partitionsPerTopic表示主题和分区数的映射
- // 获取主题下有多少个分区
- Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
- if (numPartitionsForTopic == null)
- continue;
17 - // 消费者按字典序排序
- Collections.sort(consumersForTopic); 20
- // 分区数量除以消费者数量
- int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
- // 取模,余数就是额外的分区
- int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
25 - List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
- for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
276
- int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
- int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
- // 分配分区
31
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start
- length));
32 }
33 }
34 return assignment;
35 }
36
82 RoundRobin策略
RoundRobin基于轮询算法,对应的实现类是
org.apache.kafka.clients.consumer.RoundRobinAssignor
首先,将所有主题的分区组成 TopicAndPartition 列表。
然后对TopicAndPartition列表按照hashCode进行排序某个topic。
假设,有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,分配结果是:
C0将消费T0主题的0、2分区,以及T1主题的1分区。C1将消费T0主题的1分区,以及T1主题的0、2分区。
Kafka蓄水池机制
问过的一些公司:深信服参考答案:
Kafka中如何保证数据一致性?
可回答:Kafka的一致性
问过的一些公司:字节,美团参考答案:
不论是旧的Leader还是新选举产生的Leader,Consumer都能读到一样的数据,Kafka是通过引入
HW(High Water Mark)机制来保证数据一致性。
假设分区的副本为3,其中副本0是Leader,副本1和副本2是follower,并且在ISR列表里面,虽然副本0已 经写入了Message4,但是Consumer只能卖取到Message2。因为所有的ISR都同步了Message2,只有High Water Mark以上的消息才支持Consumer读取,而High Water Mark取决于ISR列表里面偏移量最小的分
区,对应于上图的副本2,这个很类似于木桶原理。
这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致 性。试想,一个消费者从当前Leader(副本0)读取并处理了Message4,这个时候Leader挂掉了,选举了 副本1为新的Leader,这时候另一个消费者再去从新的Leader读取消息,发现这个消息其实并不存在,这 就导致了数据不一致性的问题。
当然,引入了High Water Mark 机制,会导数Broker间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕),延迟时间可以通过参数replica.lag.time.max.ms参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
Kafka新旧API区别
问过的一些公司:腾讯参考答案:
1、高级API
优点:
高级API写起来简单
不需要去自行去管理offset,系统通过zookeeper自行管理不需要管理分区,副本等情况,系统自动管理
消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据
可以使用group来区分对访问同一个topic的不同程序访问分离开来(不同的group记录不同的
offset,这样不同程序读取同一个topic才不会因为offset互相影响)
缺点:
不能自行控制offset(对于某些特殊需求来说) 不能细化控制如分区、副本、zk 等
2、低级API
优点:
能够开发者自己控制offset,想从哪里读取就从哪里读取自行控制连接分区,对分区自定义进行负载均衡
对zookeeper 的依赖性降低(如:offset 不一定非要靠zk 存储,自行存储offset 即可,比如存在文件或者内存中)
缺点:
太过复杂,需要自行控制offset,连接哪个分区,找到分区leader等
Kafka消息在磁盘上的组织方式
问过的一些公司:字节参考答案:
Kafka中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一 个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会 根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所 说的偏移量(oGset),具有4个分区的主题的逻辑结构
如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩 展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止Log过大,Kafka又引入了日志分 段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对 较小的文件,这样也便于消息的维护和清理。
事实上,Log和LogSegment也不是纯粹物理意义上的概念,Log在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为 后缀的事务索引文件)。
Kafka在哪些地方会有选举过程,使用什么工具支持选举?
问过的一些公司:字节参考答案:
Kafka中的选举大致可以分为三大类:控制器的选举(先到先得)、分区leader的选举(ISR)以及消费 者相关的选举
1、控制器的选举
在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态等工作。比如当某个分区的leader副本出现故障时,由控制器 负责为该分区选举新的leader副本。再比如当检测到某个分区的ISR集合发生变化时,由控制器负责通知 所有broker更新其元数据信息。
Kafka Controller的选举是依赖Zookeeper来实现的,在Kafka集群中哪个broker能够成功创建/controller这个临时(EPHEMERAL)节点他就可以成为Kafka Controller。
2、分区Leader的选举
分区leader副本的选举由Kafka Controller负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对 外提供服务)的时候都需要执行leader的选举动作。
基本思路是按照AR(Assigned Repllicas:分区中的所有副本)集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的 情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根 据AR的顺序而不是ISR的顺序进行选举的。这个说起来比较抽象,有兴趣的读者可以手动关闭/开启某个 集群中的broker来观察一下具体的变化。
还有一些情况也会发生分区leader的选举,比如当分区进行重分配(reassign)的时候也需要执行leader 的选举动作。这个思路比较简单:从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的 ISR列表中。
再比如当发生优先副本(preferred replica partition leader election)的选举时,直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本。
还有一种情况就是当某节点被优雅地关闭(也就是执行ControlledShutdown)时,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。这里的具体思路为:从AR列表中找 到第一个存活的副本,且这个副本在目前的ISR列表中,与此同时还要确保这个副本不处于正在被关闭 的节点上。
3、消费者相关的选举
组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简 单,分两种情况分析。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个 重新选举leader的过程又更“随意”了,相关代码如下:
- private val members = new mutable.HashMap[String, MemberMetadata]
- var leaderId = members.keys.head 3
解释一下这2行代码:在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费 者的member_id,而value是消费者相关的元数据信息。leaderId表示leader消费者的member_id,它的取 值为HashMap中的第一个键值对的key,这种选举的方式基本上和随机无异。总体上来说,消费组的leader选举过程是很随意的。
用过Kafka的对partition.assignment.strategy(取值为RangeAssignor、RoundRobinAssignor、StickyAssignor等)这个参数都并不陌生。每个消费者都可以设置自己的分区分配策略,对消费组而言需 要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这 个分区分配的选举并非由leader消费者决定,而是根据消费组内的各个消费者投票来决定的。
Kafka搭建过程要配置什么参数?
问过的一些公司:Bigo 参考答案:
Kafka的重要配置是在server.propertis文件中,具体如下:
1 #broker的全局唯一编号,不能重复
290
- broker.id=0
- #删除topic功能使能
- delete.topic.enable=true
- #处理网络请求的线程数量
- num.network.threads=3
- #用来处理磁盘IO的现成数量
- num.io.threads=8
- #发送套接字的缓冲区大小
- socket.send.buffer.bytes=102400
- #接收套接字的缓冲区大小
- socket.receive.buffer.bytes=102400
- #请求套接字的缓冲区大小
- socket.request.max.bytes=104857600
- #kafka运行日志存放的路径
- log.dirs=/opt/module/kafka/logs
- #topic在当前broker上的分区个数
- num.partitions=1
- #用来恢复和清理data下数据的线程数量
- num.recovery.threads.per.data.dir=1
- #segment文件保留的最长时间,超时将被删除
- log.retention.hours=168
- #配置连接Zookeeper集群地址
- zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka 25
基本上熟悉上面的一些参数就可以了,下面放一些更详细的,有需要可以看看。
1、Kafka配置参数
broker.id:broker的id,id是唯一的非负整数,集群的broker.id不能重复。log.dirs:kafka存放数据的路径。可以是多个,多个使用逗号分隔即可。port:server 接 受 客 户 端 连 接 的 端 口 , 默 认 6667 zookeeper.connect:zookeeper集群连接地址。
格式如:zookeeper.connect=server01:2181,server02:2181,server03:2181。
如 果 需 要 指 定 zookeeper 集 群 的 路 径 位 置 , 可 以 : zookeeper.connect=server01:2181,server02:2181,server03:2181/kafka/cluster。这样设置后,在启动kafka 集群前,需要在zookeeper集群创建这个路径/kafka/cluster。
message.max.bytes:server可以接受的消息最大尺寸。默认1000000。
重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来 说太大。
num.network.threads:server用来处理网络请求的线程数,默认3。num.io.threads:server用来处理请求的I/O线程数。这个线程数至少等于磁盘的个数。background.threads:用于后台处理的线程数。例如文件的删除。默认4。
queued.max.requests:在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数。默 认500。
host.name:broker的hostname
如果hostname已经设置的话,broker将只会绑定到这个地址上;如果没有设置,它将绑定到所有接口, 并发布一份到ZK
advertised.host.name:如果设置,则就作为broker 的hostname发往producer、consumers以及其他
brokers
advertised.port:此端口将给与producers、consumers、以及其他brokers,它会在建立连接时用到; 它仅在实际端口和server需要绑定的端口不一样时才需要设置。
socket.send.buffer.bytes:SO_SNDBUFF 缓存大小,server进行socket 连接所用,默认1001024。
socket.receive.buffer.bytes:SO_RCVBUFF缓存大小,server进行socket连接时所用。默认100 * 1024。
socket.request.max.bytes:server允许的最大请求尺寸;这将避免server溢出,它应该小于Java heap size。
num.partitions:如果创建topic时没有给出划分partitions个数,这个数字将是topic下partitions数目的默 认数值。默认1。
log.segment.bytes:topic partition的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的;这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件。此设 置可以由每个topic基础设置时进行覆盖。默认1014 1024 1024
log.roll.hours:即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。 这个设置也可以有topic层面的设置进行覆盖。默认247
log.cleanup.policy:log清除策略。默认delete。
log.retention.minutes和log.retention.hours:每个日志文件删除之前保存的时间。默认数据保存时间对所 有topic都一样。
log.retention.minutes 和 log.retention.bytes 都是用来设置删除日志文件的,无论哪个属性已经溢出。这个属性设置可以在topic基本设置时进行覆盖。
log.retention.bytes:每个topic下每个partition保存数据的总量。
注意,这是每个partitions的上限,因此这个数值乘以partitions的个数就是每个topic保存的数据总量。如 果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。
注意,这项设置可以由每个topic设置时进行覆盖。
log.retention.check.interval.ms:检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要 求。默认5min。
log.cleaner.enable:当这个属性设置为false时,一旦日志的保存时间或者大小达到上限时,就会被删 除;如果设置为true,则当保存属性达到上限时,就会进行log compaction。默认false。
log.cleaner.threads:进行日志压缩的线程数。默认1。
log.cleaner.io.max.bytes.per.second:进行log compaction时,log cleaner可以拥有的最大I/O数目。这项设置限制了cleaner,以避免干扰活动的请求服务。
log.cleaner.io.buffer.size:log cleaner清除过程中针对日志进行索引化以及精简化所用到的缓存大小。最好设置大点,以提供充足的内存。默认500 1024 1024。
log.cleaner.io.buffer.load.factor:进行log cleaning时所需要的I/O chunk尺寸。你不需要更改这项设置。默认512*1024。
log.cleaner.io.buffer.load.factor:log cleaning中所使用的hash表的负载因子;你不需要更改这个选项。默认0.9
log.cleaner.backoff.ms:进行日志是否清理检查的时间间隔,默认15000。log.cleaner.min.cleanable.ratio:这项配置控制log compactor试图清理日志的频率(假定log compaction
是打开的)。
默认避免清理压缩超过50%的日志。这个比率绑定了备份日志所消耗的最大空间(50%的日志备份时压 缩率为50%)。更高的比率则意味着浪费消耗更少,也就可以更有效的清理更多的空间。这项设置在每 个topic设置中可以覆盖。
log.cleaner.delete.retention.ms:保存时间;保存压缩日志的最长时间;也是客户端消费消息的最长时 间,与log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据;会被topic创建时 的指定时间覆盖。
log.index.size.max.bytes:每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log segment。默认10 1024 1024。
log.index.interval.bytes:当执行一次fetch后,需要一定的空间扫描最近的offset,设置的越大越好,一般 使用默认值就可以。默认4096。
log.flush.interval.messages:log文件“sync”到磁盘之前累积的消息条数。
因为磁盘IO操作是一个慢操作,但又是一个“数据可靠性”的必要手段,所以检查是否需要固化到硬盘的 时间间隔。需要在“数据可靠性”与“性能”之间做必要的权衡,如果此值过大,将会导致每次“发sync”的 时间过长(IO阻塞),如果此值过小,将会导致“fsync”的时间较长(IO阻塞),导致”发sync“的次数较 多,这也就意味着整体的client请求有一定的延迟,物理server故障,将会导致没有fsync的消息丢失。
log.flush.scheduler.interval.ms:检查是否需要fsync的时间间隔。默认Long.MaxValue
log.flush.interval.ms:仅仅通过interval来控制消息的磁盘写入时机,是不足的,这个数用来控制” fsync“的时间间隔,如果消息量始终没有达到固化到磁盘的消息数,但是离上次磁盘同步的时间间隔达 到阈值,也将触发磁盘同步。
log.delete.delay.ms:文件在索引中清除后的保留时间,一般不需要修改。默认60000。
auto.create.topics.enable:是否允许自动创建topic。如果是true,则produce或者fetch 不存在的topic 时,会自动创建这个topic。否则需要使用命令行创建topic。默认true。
controller.socket.timeout.ms:partition管理控制器进行备份时,socket的超时时间。默认30000。controller.message.queue.size:controller-to-broker-channles的buffer尺寸,默认Int.MaxValue。default.replication.factor:默认备份份数,仅指自动创建的topics。默认1。
replica.lag.time.max.ms:如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个follower,并认为这个follower已经挂了,默认10000。
replica.lag.max.messages:如果一个replica没有备份的条数超过这个数值,则leader将移除这个follower,并认为这个follower已经挂了,默认4000。
replica.socket.timeout.ms:leader 备份数据时的socket网络请求的超时时间,默认301000
replica.socket.receive.buffer.bytes:备份时向leader发送网络请求时的socket receive buffer。默认641024。
replica.fetch.max.bytes:备份时每次fetch的最大值。默认1024*1024。replica.fetch.max.bytes:leader发出备份请求时,数据到达leader的最长等待时间。默认500。replica.fetch.min.bytes:备份时每次fetch之后回应的最小尺寸。默认1。num.replica.fetchers:从leader备份数据的线程数。默认1。
replica.high.watermark.checkpoint.interval.ms:每个replica检查是否将最高水位进行固化的频率。默认5000.
fetch.purgatory.purge.interval.requests:fetch 请求清除时的清除间隔,默认1000 producer.purgatory.purge.interval.requests:producer请求清除时的清除间隔,默认1000
zookeeper.session.timeout.ms:zookeeper 会 话 超 时 时 间 。 默 认 6000 zookeeper.connection.timeout.ms:客户端等待和zookeeper建立连接的最大时间。默认6000 zookeeper.sync.time.ms:zk follower落后于zk leader的最长时间。默认2000
controlled.shutdown.enable:是否能够控制broker的关闭。如果能够,broker将可以移动所有leaders到 其他的broker上,在关闭之前。这减少了不可用性在关机过程中。默认true。
controlled.shutdown.max.retries:在执行不彻底的关机之前,可以成功执行关机的命令数。默认3. controlled.shutdown.retry.backoff.ms:在关机之间的backoff时间。默认5000
auto.leader.rebalance.enable:如果这是true,控制者将会自动平衡brokers对于partitions的leadership。 默认true。
leader.imbalance.per.broker.percentage:每个broker所允许的leader最大不平衡比率,默认10。leader.imbalance.check.interval.seconds: 检 查 leader 不 平 衡 的 频 率 , 默 认 300 offset.metadata.max.bytes: 允 许 客 户 端 保 存 他 们 offsets 的 最 大 个 数 。 默 认 4096 max.connections.per.ip:每个ip地址上每个broker可以被连接的最大数目。默认Int.MaxValue。max.connections.per.ip.overrides:每个ip或者hostname默认的连接的最大覆盖。connections.max.idle.ms: 空 连 接 的 超 时 限 制 , 默 认 600000 log.roll.jitter.{ms,hours}: 从 logRollTimeMillis 抽 离 的 jitter 最 大 数 目 。 默 认 0 num.recovery.threads.per.data.dir:每个数据目录用来日志恢复的线程数目。默认1。unclean.leader.election.enable:指明了是否能够使不在ISR中replicas设置用来作为leader。默认true delete.topic.enable:能够删除topic,默认false。
offsets.topic.num.partitions:默认50。由于部署后更改不受支持,因此建议使用更高的设置来进行生产
(例如100-200)。
offsets.topic.retention.minutes:存在时间超过这个时间限制的offsets都将被标记为待删除。默认1440。offsets.retention.check.interval.ms:offset管理器检查陈旧offsets的频率。默认600000。
offsets.topic.replication.factor:topic的offset的备份份数。建议设置更高的数字保证更高的可用性。默认 3
offset.topic.segment.bytes:offsets topic的segment尺寸。默认104857600
offsets.load.buffer.size:这项设置与批量尺寸相关,当从offsets segment中读取时使用。默认5242880
offsets.commit.required.acks:在offset commit可以接受之前,需要设置确认的数目,一般不需要更改。默认-1。
2、Kafka生产者配置参数
boostrap.servers:用于建立与kafka集群连接的host/port组。
数据将会在所有servers上均衡加载,不管哪些server是指定用于bootstrapping。 这 个 列 表 格 式 :host1:port1,host2:port2,… acks:此配置实际上代表了数据备份的可用性。
acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用
acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
buffer.memory:producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度, producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。
compression.type:producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
retries:设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户 端接收到发送错误时的重试没有什么不同。
允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失 败第二个发送成功,则第二条消息会比第一条消息出现要早。
batch.size:producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。这 项配置控制默认的批量处理消息字节数。
client.id:当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的 目的,除了记录和跟踪。
linger.ms:producer组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。通常来 说,这只有在记录产生速度大于发送速度的时候才能发生。
max.request.size:请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消 息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目, 以防发出巨量的请求。
receive.buffer.bytes:TCP receive缓存大小,当阅读数据时使用。send.buffer.bytes:TCP send缓存大小,当发送数据时使用。
timeout.ms:此配置选项控制server等待来自followers的确认的最大时间。如果确认的请求数目在此时间 内没有实现,则会返回一个错误。这个超时限制是以server端度量的,没有包含请求的网络延迟。
block.on.buffer.full:当我们内存缓存用尽时,必须停止接收新消息记录或者抛出错误。
默认情况下,这个设置为真,然而某些阻塞可能不值得期待,因此立即抛出错误更好。设置为false则会 这样:producer会抛出一个异常错误:BufferExhaustedException, 如果记录已经发送同时缓存已满。
metadata.fetch.timeout.ms:是指我们所获取的一些元素据的第一个时间数据。元素据包含:topic, host,partitions。此项配置是指当等待元素据fetch成功完成所需要的时间,否则会抛出异常给客户端。
metadata.max.age.ms:以微秒为单位的时间,是在我们强制更新metadata的时间间隔。即使我们没有看 到任何partition leadership改变。
metric.reporters:类的列表,用于衡量指标。实现MetricReporter接口,将允许增加一些类,这些类在新 的衡量指标产生时就会改变。JmxReporter总会包含用于注册JMX统计
metrics.num.samples:用于维护metrics的样本数。
metrics.sample.window.ms:metrics系统维护可配置的样本数量,在一个可修正的window size。这项配置配置了窗口大小,例如。我们可能在30s的期间维护两个样本。当一个窗口推出后,我们会擦除并重 写最老的窗口。
recoonect.backoff.ms:连接失败时,当我们重新连接时的等待时间。这避免了客户端反复重连。retry.backoff.ms:在试图重试失败的produce请求之前的等待时间。避免陷入发送-失败的死循环中。
3、Kafka消费者配置参数
group.id:用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes
都是属于同一个consumer group。
zookeeper.connect:指定zookeeper的连接的字符串,格式是hostname:port, hostname:port… consumer.id:不需要设置,一般自动产生
socket.timeout.ms:网络请求的超时限制。真实的超时限制是max.fetch.wait+socket.timeout.ms。默认3000
socket.receive.buffer.bytes:socket用于接收网络请求的缓存大小。默认641024。fetch.message.max.bytes:每次fetch请求中,针对每次fetch消息的最大字节数。默认10241024
这些字节将会督导用于每个partition的内存中,因此,此设置将会控制consumer所使用的memory大 小。
这个fetch请求尺寸必须至少和server允许的最大消息尺寸相等,否则,producer可能发送的消息尺寸大 于consumer所能消耗的尺寸。
num.consumer.fetchers:用于fetch数据的fetcher线程数。默认1
auto.commit.enable:如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提 交的offset将在进程挂掉时,由新的consumer使用。默认true。
auto.commit.interval.ms:consumer向zookeeper提交offset的频率,单位是秒。默认60*1000。
queued.max.message.chunks:用于缓存消息的最大数目,每个chunk必须和fetch.message.max.bytes相 同。默认2。
rebalance.max.retries:当新的consumer加入到consumer group时,consumers集合试图重新平衡分配到每个consumer的partitions数目。如果consumers集合改变了,当分配正在执行时,这个重新平衡会失败 并重入。默认4
fetch.min.bytes:每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等 待,直到足够的数据才会返回。
fetch.wait.max.ms:如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之 前,server会阻塞的最大时间。默认100
rebalance.backoff.ms:在重试reblance之前backoff时间。默认2000
refresh.leader.backoff.ms:在试图确定某个partition的leader是否失去他的leader地位之前,需要等待的backoff时间。默认200
auto.offset.reset:zookeeper中没有初始化的offset时,如果offset是以下值的回应: lastest:自动复位offset为lastest的offset
earliest:自动复位offset为earliest的offset none:向consumer抛出异常
consumer.timeout.ms:如果没有消息可用,即使等待特定的时间之后也没有,则抛出超时异常
exclude.internal.topics:是否将内部topics的消息暴露给consumer。默认true。
paritition.assignment.strategy:选择向consumer 流分配partitions的策略,可选值:range,roundrobin。默认range。
client.id:是用户特定的字符串,用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请 求的应用。
zookeeper.session.timeout.ms:zookeeper 会话的超时限制。默认6000
如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产 生
zookeeper.connection.timeout.ms:客户端在建立通zookeeper连接中的最大等待时间。默认6000 zookeeper.sync.time.ms:ZK follower 可 以 落 后 ZK leader 的 最 大 时 间 。 默 认 1000 offsets.storage:用于存放offsets的地点: zookeeper或者kafka。默认zookeeper。
offset.channel.backoff.ms:重新连接offsets channel或者是重试失败的offset的fetch/commit请求的backoff
时间。默认1000
offsets.channel.socket.timeout.ms:当读取offset的fetch/commit请求回应的socket 超时限制。此超时限制是被consumerMetadata请求用来请求offset管理。默认10000。
offsets.commit.max.retries:重试offset commit的次数。这个重试只应用于offset commits在shut-down之间。默认5。
dual.commit.enabled:如果使用“kafka”作为offsets.storage,你可以二次提交offset到zookeeper(还有一次 是提交到kafka)。
在zookeeper-based的offset storage到kafka-based的offset storage迁移时,这是必须的。对任意给定的consumer group来说,比较安全的建议是当完成迁移之后就关闭这个选项
partition.assignment.strategy:在“range”和“roundrobin”策略之间选择一种作为分配partitions给consumer 数据流的策略。
循环的partition分配器分配所有可用的partitions以及所有可用consumer线程。它会将partition循环的分 配到consumer线程上。如果所有consumer实例的订阅都是确定的,则partitions的划分是确定的分布。
循环分配策略只有在以下条件满足时才可以
- 每个topic在每个consumer实力上都有同样数量的数据流。
- 订阅的topic的集合对于consumer group中每个consumer实例来说都是确定的
Kafka的单播和多播
问过的一些公司:猿辅导参考答案:
1、单播
一条消息只能被某一个消费者消费的模式称为单播。要实现消息单播,只要让这些消费者属于同一个消 费者组即可。当生产者发送一条消息时,两个消费者中只有一个能收到消息。
2、多播
一条消息能够被多个消费者消费的模式称为多播。之所以不称之为广播,是因为一条消息只能被Kafka 同一个分组下某一个消费者消费,而不是所有消费者都能消费,所以从严格意义上来讲并不能算是广播 模式,当然如果希望实现广播模式只要保证每个消费者均属于不同的消费者组。针对Kafka同一条消息 只能被同一个消费者组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费 者组即可。然后通过生产者发送几条消息,可以看到不同消费者组的消费者同时能消费到消息,然而同 一个消费者组下的消费者却只能有一个消费者能消费到消息。
Kafka的高水位和Leader Epoch
问过的一些公司:ebay 参考答案:
高水位(High Watermark),通常被用在流式处理领域(比如Apache Flink、Apache Spark等),以表征元素或事件在基于时间层面上的进度。一个比较经典的表述为:流式系统保证在水位t时刻,创建时间
(event time) = t’且t’ ≤ t的所有事件都已经到达或被观测到。在Kafka中,水位的概念反而与时间无关,而是与位置信息相关。严格来说,它表示的就是位置信息,即位移(oGset)。通俗的说下HW作 用:Kafka使用HW值来决定副本备份的进度
Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同 可分为3类:
leader副本:响应clients端读写请求的副本
follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader
同步后面会提到
每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是leader副本。
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,leader LEO和follower LEO的更新是有区别的。
HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的 所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新也是有 区别的。
通过下图我们来了解下LEO和HW两者的关系:
上图中,HW值是7,表示位移是0 ~ 7的所有消息都已经处于“已备份状态”(committed),而LEO值是15,那么8~14的消息就是尚未完全备份(fully replicated)——为什么没有15?因为刚才说过了,LEO指向的是下一条消息到来时的位移,故上图使用虚线框表示。我们总说consumer无法消费未提交消息。这 句话如果用以上名词来解读的话,应该表述为:consumer无法消费分区下leader副本中位移值大于分区 HW的任何消息。这里需要特别注意分区HW就是leader副本的HW值。
1、follower副本何时更新LEO?
如前所述,follower副本只是被动地向leader副本请求数据,具体表现为follower副本不停地向leader副本 所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么follower副本的LEO是 何时更新的呢?Kafka有两套follower副本LEO:1. 一套LEO保存在follower副本所在broker的副本管理机中;2. 另一套LEO保存在leader副本所在broker的副本管理机中——换句话说,leader副本机器上保存了所有的follower副本的LEO。
为什么要保存两套?这是因为Kafka使用前者帮助follower副本更新其HW值;而利用后者帮助leader副本 更新其HW使用。下面我们分别看下它们被更新的时机。
83 follower副本端的follower副本LEO何时更新?
follower副本端的LEO值就是其底层日志的LEO值,也就是说每当新写入一条消息,其LEO值就会被更新 (类似于LEO += 1)。当follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从而自动地更新LEO值。
84 leader副本端的follower副本LEO何时更新?
leader副本端的follower副本LEO的更新发生在leader在处理follower FETCH请求时。一旦leader接收到follower发送的FETCH请求,它首先会从自己的log中读取相应的数据,但是在给follower返回数据之前它 先去更新follower的LEO(即上面所说的第二套LEO)。
2、follower副本何时更新HW?
follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它会尝试更新它自己的HW值。具 体算法就是比较当前LEO值与FETCH响应中leader的HW值,取两者的小者作为新的HW值。这告诉我们一 个事实:如果follower的LEO值超过了leader的HW值,那么follower HW值是不会越过leader HW值的。
3、leader副本何时更新LEO?
和follower更新LEO道理相同,leader写log时就会自动地更新它自己的LEO值。
4、leader副本何时更新HW值?
前面说过了,leader的HW值就是分区HW值,因此何时更新这个值是我们最关心的,因为它直接影响了 分区数据对于consumer的可见性 。以下4种情况下leader会尝试去更新分区HW——切记是尝试,有可能因为不满足条件而不做任何更新:
副本成为leader副本时:当某个副本成为了分区的leader副本,Kafka会尝试去更新分区HW。这是 显而易见的道理,毕竟分区leader发生了变更,这个副本的状态是一定要检查的!不过,本文讨论 的是当系统稳定后且正常工作时备份机制可能出现的问题,故这个条件不在我们的讨论之列。broker出现崩溃导致副本被踢出ISR时:若有broker崩溃则必须查看下是否会波及此分区,因此检查 下分区HW值是否需要更新是有必要的。本文不对这种情况做深入讨论
producer向leader副本写入消息时:因为写入消息会更新leader的LEO,故有必要再查看下HW值是 否也需要修改
leader处理follower FETCH请求时:当leader处理follower的FETCH请求时首先会从底层的log读取数据,之后会尝试更新分区HW值
特别注意上面4个条件中的最后两个。它揭示了一个事实——当Kafka broker都正常工作时,分区HW值的更新时机有两个:leader处理PRODUCE请求时和leader处理FETCH请求时。另外,leader是如何更新它的 HW值的呢?前面说过了,leader broker上保存了一套follower副本的LEO以及它自己的LEO。当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小 的LEO值作为HW值。这里的满足条件主要是指副本要满足以下两个条件之一:
处于ISR中
副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默认是10s)
乍看上去好像这两个条件说得是一回事,毕竟ISR的定义就是第二个条件描述的那样。但某些情况下
Kafka的确可能出现副本已经“追上”了leader的进度,但却不在ISR中——比如某个从failure中恢复的副
本。如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本 已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——这肯定是不允 许的,因为分区HW实际上就是ISR中所有副本LEO的最小值。
下面举个实际的例子。我们假设有一个topic,单分区,副本因子是2,即一个leader副本和一个follower 副本。我们看下当producer发送一条消息时,broker端的副本到底会发生什么事情以及分区HW是如何被 更新的。
下图是初始状态,稍微解释一下:初始时leader和follower的HW和LEO都是0(严格来说源代码会初始化 LEO为-1,不过这不影响之后的讨论)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此时,producer没有发送任何消息给leader,而follower已经开始不断地给leader发送FETCH请求了,但因为没有数据因此什么都不会发生。值得一提的是,follower发送过来的FETCH请求因
为无数据而暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强 制完成。倘若在寄存期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH请求,让leader继 续处理之。
因为FETCH请求发送和PRODUCE请求处理的时机会影响后面的一些内容。因此后续我们也将分两种情况 来讨论分区HW的更新。
第一种情况:follower发送FETCH请求在leader处理完PRODUCE请求之后
producer给该topic分区发送了一条消息。此时的状态如下图所示:
如图所示,leader接收到PRODUCE请求主要做两件事情:
把消息写入写底层log(同时也就自动地更新了leader的LEO)
尝试更新leader HW值(前面leader副本何时更新HW值一节中的第三个条件触发)。我们已经假设此时follower尚未发送FETCH请求,那么leader端保存的remote LEO依然是0,因此leader会比较它自己的LEO值和remote LEO值,发现最小值是0,与当前HW值相同,故不会更新分区HW值
所以,PRODUCE请求处理完成后leader端的HW值依然是0,而LEO是1,remote LEO是1。假设此时follower发送了FETCH请求(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队), 那么状态变更如下图所示:
本例中当follower发送FETCH请求时,leader端的处理依次是:
a. 读取底层log数据
b. 更新remote LEO = 0(为什么是0? 因为此时follower还没有写入这条消息。leader如何确认follower 还未写入呢?这是通过follower发来的FETCH请求中的fetch offset来确定的)
c. 尝试更新分区HW——此时leader LEO = 1,remote LEO = 0,故分区HW值= min(leader LEO, follower
remote LEO) = 0
a. 把数据和当前分区HW值(依然是0)发送给follower副本而follower副本接收到FETCH response后依次执行下列操作:
- 写入本地log(同时更新follower LEO)
- 更新follower HW——比较本地LEO和当前leader HW取小者,故follower HW = 0
此时,第一轮FETCH RPC结束,我们会发现虽然leader和follower都已经在log中保存了这条消息,但分区HW值尚未被更新。实际上,它是在第二轮FETCH RPC中被更新的,如下图所示:
img
上图中,follower发来了第二轮FETCH请求,leader端接收到后仍然会依次执行下列操作: - 读取底层log数据
- 更新remote LEO = 1(这次为什么是1了? 因为这轮FETCH RPC携带的fetch offset是1,那么为什么这轮携带的就是1了呢,因为上一轮结束后follower LEO被更新为1了)
- 尝试更新分区HW——此时leader LEO = 1,remote LEO = 1,故分区HW值= min(leader LEO, follower remote LEO) = 1。注意分区HW值此时被更新了!!!
- 把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给follower副本
同样地,follower副本接收到FETCH response后依次执行下列操作:
85 写入本地log,当然没东西可写,故follower LEO也不会变化,依然是1 - 更新follower HW——比较本地LEO和当前leader LEO取小者。由于此时两者都是1,故更新follower
HW = 1
producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,表明consumer能够消费offset = 0的这条消息。下面我们来分析下PRODUCE和FETCH请求交互的第二种情况。
第二种情况:FETCH请求保存在purgatory中PRODUCE请求到来
这种情况实际上和第一种情况差不多。前面说过了,当leader无法立即满足FECTH返回要求的时候(比如 没有数据),那么该FETCH请求会被暂存到leader端的purgatory中,待时机成熟时会尝试再次处理它。不 过Kafka不会无限期地将其缓存着,默认有个超时时间(500ms),一旦超时时间已过,则这个请求会被 强制完成。不过我们要讨论的场景是在寄存期间,producer发送PRODUCE请求从而使之满足了条件从而 被唤醒。此时,leader端处理流程如下: - leader写入本地log(同时自动更新leader LEO)
- 尝试唤醒在purgatory中寄存的FETCH请求
- 尝试更新分区HW
至于唤醒后的FETCH请求的处理与第一种情况完全一致,故这里不做详细展开了。
以上所有的东西其实就想说明一件事情:Kafka使用HW值来决定副本备份的进度,而HW值的更新通常 需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引起的问题包括:
备份数据丢失备份数据不一致
我们来看下上述的两种情况:
86 数据丢失
如前所述,使用HW值来确定备份进度时其值的更新是在下一轮RPC中完成的。现在翻到上面使用两种不 同颜色标记的步骤处思考下, 如果follower副本在蓝色标记的第一步与紫色标记的第二步之间发生崩
溃,那么就有可能造成数据的丢失。我们举个例子来看下。
上图中有两个副本:A和B。开始状态是A是leader。我们假设producer端min.insync.replicas设置为1,那 么当producer发送两条消息给A后,A写入到底层log,此时Kafka会通知producer说这两条消息写入成
功。
但是在broker端,leader和follower底层的log虽都写入了2条消息且分区HW已经被更新到2,但follower HW尚未被更新(也就是上面紫色颜色标记的第二步尚未执行)。倘若此时副本B所在的broker宕机,那 么重启回来后B会自动把LEO调整到之前的HW值,故副本B会做日志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1,此时follower副本底层log中就只有一条消息,即offset = 0的消息。
B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的leader, 而当A重启回来后也会执行日志截断,将HW调整回1。这样,位移=1的消息就从两个副本的log中被删 除,即永远地丢失了。
这个场景丢失数据的前提是在min.insync.replicas=1时,一旦消息被写入leader端log即被认为是“已提交”,而延迟一轮FETCH RPC更新HW值的设计使得follower HW值是异步延迟更新的,倘若在这个过程中
leader发生变更,那么成为新leader的follower的HW值就有可能是过期的,使得clients端认为是成功提交 的消息被删除。
87 leader/follower数据离散
除了可能造成的数据丢失以外,这种设计还有一个潜在的问题,即造成leader端log和follower端log的数 据不一致。比如leader端保存的记录序列是r1,r2,r3,r4,r5, ;而follower端保存的序列可能是
r1,r3,r4,r5,r6 。这也是非法的场景,因为顾名思义,follower必须追随leader,完整地备份leader端的数
据。
我们依然使用一张图来说明这种场景是如何发生的:
这种情况的初始状态与情况1有一些不同的:A依然是leader,A的log写入了2条消息,但B的log只写入了
1条消息。分区HW更新到2,但B的HW还是1,同时producer端的min.insync.replicas = 1。
这次我们让A和B所在机器同时挂掉,然后假设B先重启回来,因此成为leader,分区HW = 1。假设此时producer发送了第3条消息(绿色框表示)给B,于是B的log中offset = 1的消息变成了绿色框表示的消息,同时分区HW更新到2(A还没有回来,就B一个副本,故可以直接更新HW而不用理会A)之后A重启回来, 需要执行日志截断,但发现此时分区HW=2而A之前的HW值也是2,故不做任何调整。此后A和B将以这种 状态继续正常工作。
显然,这种场景下,A和B底层log中保存在offset = 1的消息是不同的记录,从而引发不一致的情形出现。
关于上述问题的解决方案:
造成上述两个问题的根本原因在于HW值被用于衡量副本备份的成功与否以及在出现failture时作为日志 截断的依据,但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间 发生的任何崩溃都可能导致HW值的过期。鉴于这些原因,Kafka 0.11引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规 避这些问题。
所谓leader epoch实际上是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移。因此假设有两 对值:
(0, 0)和(1, 120)
则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。
leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。
当leader写底层log时它会尝试更新整个缓存——如果这个leader首次写消息,则会在缓存中增加一个条 目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移, 这就不会发生数据不一致和丢失的情况。
下面使用图的方式来说明下利用leader epoch如何规避上述两种情况
88 规避数据丢失
上图左半边已经给出了简要的流程描述,这里不详细展开具体的leader epoch实现细节(比如OffsetsForLeaderEpochRequest的实现),我们只需要知道每个副本都引入了新的状态来保存自己当leader时开始写入的第一条消息的offset以及leader版本。这样在恢复的时候完全使用这些信息而非水位 来判断是否需要截断日志。
89 规避数据不一致
更多推荐
所有评论(0)