从消息的流转梳理kafka的整体架构
kafka 消息流转 生产者 broker集群 消费者
写在前文
关于kafka架构知识的梳理之前想了很多怎么去写,总感觉有很多知识点,每个知识点都有大量的文章去讲解,但感觉知识总是碎片化接收,看了这个忘了那个,最近终于 下定决心,对kafka进行一次梳理,也为了接下来的面试准备,所以此文从一条消息的流转去看整个kafka的架构,以及架构中使用到的技术,也算是对自己知识的一次加强。
看完记得一键三连 哦~~~
看完记得一键三连 哦~~~
看完记得一键三连 哦~~~
前置
虽然kafka生成要舍弃zookeeper,但是从官网版本来看,也是从3.3.X版本才开始提供kraft启动版本。本文还是基于3.0版本。
文中关于一些图片,因为看到有些已经画的很好了,就直接拿过来用了,毕竟,对于画图这事,对我很难。。。。。。
关于kafka的整体架构,这里就不再说了,记住1点,消息中间件,都是生产者、broker、消费者三者,所以消息的流转,其实就是从生产者到broker,再从broker到达消费者的过程。
生产端
生产者怎么知道往哪发?
要发送消息,总要知道往哪里发送吧?在生产端启动时肯定会配置kafka中broker的地址(kafka.bootstrap-servers),在生产者启动时会根据配置地址发送元数据请求,broker会响应包含主题的分区信息、每个分区的leader以及ISR(In-Sync Replicas)等详细信息,并在本地进行缓存,并且定期去更新元数据缓存。
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), config.getLong("metadata.max.idle.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
//调用bootstrap方法,使用之前解析出的broker地址列表来启动元数据获取过程
this.metadata.bootstrap(addresses);
}
retryBackoffMs
:重试间隔时间metadata.max.age.ms
:元数据的最大生存时间,在此时间后将自动刷新元数据metadata.max.idle.ms
:元数据在无消息发送时的最大空闲时间,超过这个时间也会触发元数据刷新logContext
、clusterResourceListeners
和Time.SYSTEM
:这些参数与日志记录、集群资源监听以及时间戳相关
生产者发送消息流程
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
流程说明
RecordAccumulator(ConcurrentMap<TopicPartition, Deque> batches;)为什么是一个双端队列?
- 高效的插入和删除操作:双端队列允许从队列的头部和尾部进行插入和删除操作,这对 RecordAccumulator 来说非常重要。生产者可以将新的消息追加到队尾,同时可以从队头快速移除已经发送的消息。
- 支持批量处理:双端队列能够有效地支持批量处理消息,生产者可以将多条消息组织成一个批次一次性添加到队列中,提高发送消息的效率。
- 顺序写入和读取:双端队列保证了消息的顺序性,生产者发送的消息会按照先后顺序存储并发送。这有助于维护消息的顺序,确保消费者按照正确的顺序接收消息。
- 动态内存管理:双端队列相对灵活,能够动态分配和释放内存,适应不同大小的消息。这对 RecordAccumulator 来说很有用,因为它需要处理各种大小的消息。
提升生产者发送消息的速度
1、缩小batch.size(不建议)或者调整linger.ms。这两个参数就是控制什么时候去RecordAccumulator中拉取消息。
2、修改RecordAccumulator缓冲区大小
3、其实还有一个compression.type。控制消息进行压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd。
4、调整ack策略(涉及到消息的可靠性)。
topic分区
发送消息是按照topic进行发送的,在发送过程中可以指定topic下的partition分区,也可以采用默认的分区(这里涉及到分区的分配策略)。
topic为什么要分区(有了topic为什么还要partition)?
topic可以看做是消息的逻辑分区,它可以跨broker存在,从表面上看生产者将消息发送给topic,然后从topic中获取消息。而partition可以看做是消息的物理分区,在kafka的架构中,消息其实最终是由partition进行顺序存储的,每个分区根据需要位于不同的broker上面,而消息最终其实也是从broker上的leader partition上进行获取。
总结:
1、提升吞吐量。一个topic多个分区,其实也是提升了消息的并行处理能力。对消费者来说多个partition也可以提升消息的消费能力。
2、负载均衡。分区和消费者的数量存在对应关系,每个partition在一个broker上进行存储,可以看做是对消息的承载进行了负载均衡。
3、增加可扩展性。对于后续扩展来说,只需要增加分区数量就可以完成对kafka集群的扩展。
消息是怎么选择分区进行存储的?
默认分区器:DefaultPartitioner
分区策略:
public class DefaultPartitioner implements Partitioner {
//获取要写入的分区ID
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
1、指明partition的情况下,直接将指明的值作为partition值
2、没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
3、既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。
上面的是kafka默认分区器,如果项目有特殊要求,可以自定义分区器。
消息可靠性
知道了消息要发送到哪很重要,消息发送到了没有也很重要,也就是消息的可靠性!
ack机制
kafka对生产者发送消息提供了ack机制:
0:生产者发送过来的数据,不需要等数据落盘应答
1:生产者发送过来的数据,Leader收到数据后应答。
-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。
通过ack的设置,我们可以确定的知道消息是否到达broker,然后做对应的后续处理。要注意的是ack的设置只是一方面,如果想要确保消息的完全可靠性,还和分区副本数量有关!因为对于ack=-1来说,要等ISR中的所有分区副本节点也完成数据同步才会进行ack!
针对ack的值不一样,在kafka中定义的语义概念也不一样,主要有
1、最多一次(ack=0),可能会丢,但数据不会重复。
2、至少一次(ack=-1 && 分区副本大于等于2 && ISR中分区副本节点数量大于等于2),保证消息不丢失,但消息可能重复。
上面两种看着好像都不行啊,要么会丢,要么会重复,别急,kafka还提供了**精确一次(Exactly Once)**的语义。
精确一次(Exactly Once):要用到kafka提供的事务和幂等性特性。
什么是幂等性?
就是不管你生产者发送了多少条消息,到broker这里就给你持久化一条,保证不重复。
kafka怎么判断消息唯一?
Producer ID(PID) + partition ID +Sequence Number !
PID:生产者ID,每个生产者在连接到Kafka集群时会被分配一个唯一的Producer ID (PID),用于标识这个生产者。要注意的是如果生产者重启,这个ID是可能会变的!
Partition ID :分区序号
Sequence Number:Producer为每条消息分配的全局唯一的标识符,用于确保消息在生产者端的顺序和唯一性。它是在消息发送时由Producer分配的,并随着每条新消息递增。
幂等性保证了消息只会发一次,也就是不会重复,所以
精确一次:幂等性 + 至少一次
注意:幂等性是要手动开启的,启参数 enable.idempotence 默认为 true,false 关闭。
事务消息
当然除了上面的幂等性+至少一次,kafka还提供了事务消息
注意:开启事务,必须先开启幂等性。事务消息必须先自定义个一个transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务。
事务消息和幂等性+至少一次有什么区别:
1、事务消息可以允许一个事务中提交多条消息,写入到不同分区
2、事务消息因为提前定义了事务ID,因此生产者重启后还可以继续处理未完成的事务,但幂等性+至少一次,生产者重启后,可能因为生产者ID变化导致消息重复。
3、事务消息可以跨操作,因为是自定义提交,灵活性也更高。
消息的顺序性
消息有时候不仅要求精确一次,有些场景下还会要求消息必须有顺序性,kafka中怎么保证消息顺序性。
我们知道消息是被顺序写入日志文件的,也就是说kafka中天然的就保证在同一个分区中,消息的顺序性和发送消息的顺序有关,那我们要做的就是让消息都进入到同一个分区,然后只让一个消费者单线程的消费这个分区即可。
怎么保证消息都进入同一个分区?
参考消息分区策略和自定义分区策略。
要注意的是如果kafka开启了幂等性,max.in.flight.requests.per.connection需要设置小于等于5。没有开启幂等性设置为1即可。
设置小于5的说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
broker端
消息终于到broker了,看看在broker上消息是怎么被处理的。
消息在broker上面是怎么进行存储的?
broker基础知识
先了解下broker:
通常情况下都会搭建broker集群,也就是存在多个broker,那broker集群中谁做主?
broker的controller选举
broker集群会选举出一个controller角色的broker,controller节点主要用于管理分区副本分配、分区leader选举、集群broker的上下线管理等。
controller选举:
1、当kafka启动时每个broker都会向zookeeper注册节点(/controller节点下为自己的ID),在首次启动时,基本上就是谁先注册成功,谁就是controller。其余的broker没有成为controller,则对/controller节点进行监听。
2、controller节点负责将所有broker的信息(ID、主机名、端口等)上传到zk中,同时将分区的分配方案(分区leader及副本信息等)也上传至zk,其他broker进行信息同步。
如果controller宕机,此时其他broker进行节点注册,通常情况下,会由节点序号最小的broker称为controller。这是因为每个broker向zk进行注册时,zk都会创建一个临时递增序号的节点,而broker节点号最小,意味着可以很好的避免分区不一致情况。
如果非controller broker节点宕机,此时该broker上的分区leader也是宕机状态,此时将由controller从zk上查询该分区的ISR列表,再根据从前往后的顺序在ISR中选举出新的分区leader。
ISR、AR、OSR
每个broker中都会存在很多个分区,不同的topic根据副本数量将会在broker集群中分配到不同的broker中,但是在一个topic下只会存在一个分区leader,其余的分区将成为follower。
ISR:与leader保持同步的follower集合。如果follower长时间未向leader发送同步请求,则该follower将被剔除ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR:表示与leader同步延迟过多的分区集合。
AR:AR=ISR+OSR。
分区leader和follower
LEO、HW
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
分区可能存在的故障:
1、leader发生故障
leader发生故障后,将由broker中的controller进行重新选举出一个leader,此时为了保证数据一致性,其他的follower将各自log中大于HW的日志截掉,然后从新leader中进行数据同步。(注意只是保证了各个分区的数据一致性,不保证数据不丢失)
2、followe发生故障
follower发生故障后,会从ISR中剔除,此后leader将继续与该follower保持联系,如果follower后续恢复,将自身log日志中的大于hw的日志截掉,重新从leader中进行数据同步,等待该follower中的leo大于hw,将再次加入ISR。
leader和follower之间是怎么进行数据同步的?follower宕机后怎么和leader保持联系?一直宕机会一直进行等待么?
kafka中数据同步是通过 日志复制的机制来保证的。当leader接收到消息,会将消息追加到本地文件中,然后将消息发送给所有的follower,各个follower再将消息追加到自己的日志文件中。
消息写到日志中,这也是生产者发送消息的ack机制的提现。
日志同步方式:
1、同步复制:
2、异步复制
kafka中默认是异步复制,要修改为同步复制的话,涉及参数min.insync.replicas(设置大于1),生产者ack机制设置为all。
leader与follower通信
Leader节点会定期检测所有的Follower节点的状态,以确保数据的同步和可靠性。这个定期检测的时间间隔可以通过参数replica.lag.time.max.ms
来控制。参数replica.lag.time.max.ms
定义了Leader节点等待Follower节点恢复的最大时间间隔。如果Follower节点在超过这个时间间隔仍未能与Leader节点保持同步,Leader节点可能会视该节点为失效节点并触发相应的处理机制。默认情况下,replica.lag.time.max.ms
的值为10000毫秒(10秒)。
这个时间到期后,如果follower节点恢复,则进行数据同步,如果还是没有恢复,leader节点则尝试重新建立与该Follower节点的数据连接,replica.socket.timeout.ms: 这个参数指定了与Follower节点建立连接的Socket超时时间。默认值为30000毫秒(30秒)。如果在这个时间内无法与Follower节点建立连接,则连接会被视为失败。
确定follower节点彻底失联,此时将有controller节点进行自动故障转移(重新选举 leader,其他副本将被重新进行分配),当然极端情况下可能需要人工进行干预,手动调整分区副本分配。
分区的自平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但broker节点宕机后会重新分配分区leader和follower的重新分配,如果broker宕机后,其他机器承载的分区leader就会相应增加,导致部分broker读写请求压力过大, 即使后续其他broker恢复,上面的分区也都是follower分区,造成集群的负载不均衡。
解决方案:
分区自自动平衡机制。
auto.leader.rebalance.enable,默认是true。自动Leader Partition 平衡
leader.imbalance.per.broker.percentage,默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
leader.imbalance.check.interval.seconds,默认值300秒。检查leader负载是否平衡的间隔时间。
增加副本
其实就是通过设置更多的副本,从而达到负载均衡的效果。
自平衡的方案可以根据配置规则自动调整分区和副本的分布,减少人工干预,提高管理效率。但是leader的选举代价也 比较大, 尤其是大规模集群下可能带来性能影响。
增加副本因子的方案提高容错性、读取性能、和数据可靠性,但相应的也会占用更多的资源,增加副本数量也会导致数据的传输和同步开销。
broker接收消息后干了什么?
文件存储
我们知道消息最终是要落盘的,这也是kafka保证消息持久化的方式,每个partition都会在本地对应一个log文件,这个log文件就是partition接收到的消息载体,消息是被顺序写入到log文件的。
当然为了避免所有消息都写入到一个文件中,造成数据定位效率低下,kafka对日志采取了分片和索引机制,也就是将每个partition下的log分为多个segement。每个segement下面包含 .index,.log,.timeindex等文件,这些文件的命名规则topic+分区序号。index和log文件则以当前第一条消息的offset命名。
.log:日志文件
.index:偏移量索引文件。
index文件中定义的是稀疏索引文件,大约每往log中写入4kb(log.index.interval.bytes参数控制),就往index文件中写入一条记录,index中保存的是日志的相对offset(从第一条消息的offset相对),这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。通过相对offset找到log文件中的log位置,再往下遍历就能快速找到具体offset的消息。
.timeindex :时间戳文件
文件不停地写入,总有一天会把磁盘撑满,kafka中提供了的日志保存时间,默认为7天,可以设置日志保存时间:
log.retention.hours,最低优先级小时,默认 7 天。
log.retention.minutes,分钟。
log.retention.ms,最高优先级毫秒。
log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟
日志过了设置时间,怎么处理?
log.cleanup.policy:delete/compact
1、delete删除策略:所有数据启用删除策略,
(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
2、compact日志压缩:对于相同key的不同value值,只保留最后一个版本。这种情况下压缩后的offset可能是不连续的。
为什么快?
1、kafka分布式集群,分区分布式,并行度高
2、读数据采取稀疏索引,定位快
3、顺序写磁盘
4、页缓存和零copy
kafka中的零拷贝主要体现:
内核缓冲区:Kafka生产者将数据写入到操作系统内核缓冲区中,而不是直接写入到用户空间缓冲区。
sendfile系统调用:Kafka使用sendfile系统调用将数据从内核缓冲区直接发送到网络套接字缓冲区中,避免了数据的复制。sendfile系统调用是一种高效的数据传输方式,可以直接将文件中的数据发送到网络中,而不需要将其复制到用户空间缓冲区中。
零拷贝读取:Kafka消费者从网络套接字缓冲区中读取数据时,可以使用mmap系统调用将网络缓冲区映射到用户空间内存中,从而避免了数据的复制。
简单来说就是mmap从磁盘读取消息,sendfile发送给消费者。
消费端
消息终于要到消费端了。
消息推拉方式
先了解下消费者是怎么获取消息的,其实就分为两种,一种你给我(推),一种我找你(拉)。
pull模式:kafka默认采用的是pull模式。由消费者主动拉取消息。
push模式:由broker主动推送消息,这种模式很难兼顾消费者的消费速度,容易造成消费端消息堆积。
在讲解拉消息过程前,先了解下消费者在kafka中是什么架构。
在kafka架构中,消费端是以组的形式存在, 多个消费者通过设置同一个groupId形成一个消费者组,而消息分区也是面对消费者组,也就是说分区是面对消费者组的,在一个消费者组中,一个分区只会分配给一个消费者进行消费。不同的消费者组互不影响,消费者组可以看作是逻辑上的一个消费者。
要注意的是消费者组中的消费者数量与分区数量是有关系的,如果消费者数量大于分区数量,会有消费者空闲,因为没有多余的分区分配给它。
group coordinator
有了消费者组,消费者与分区的关系是怎么确定的?怎么确定哪个分区由哪个消费者消费?消费者宕机后分区会重新进行分配么?
kafka架构中针对消费者组引入了一个协调者(coordinator),coordinator节点是位于broker的,但coordinator是和消费者组对应的,也就是说有多少个消费者组,就会有多少个coordinator。
coordinator位于哪个borker?
我们知道启动kafka后,会默认创建么__consumer_offsets topic,这个topic下有50个分区,而coordinator所处分区,是由消费者组的groupId 的hashcode值%50,得到分区序号,这个分区序号位于哪个broker,就选择哪个broker作为coordinator。
coordinator的作用?
Group Coordinator是Apache Kafka中的一个重要概念,负责管理和协调消费者组(Consumer Group)的成员,以及处理相关的协调任务。其主要作用包括以下几个方面:
- 消费者组成员管理:Group Coordinator负责管理消费者组中的成员,包括新成员加入和已有成员离开。它会跟踪消费者组的当前成员列表,并处理成员动态变化带来的协调工作。
- 分区分配:Group Coordinator负责协调消费者组成员之间的分区分配。在消费者加入或离开时,Group Coordinator会进行分区再分配,确保每个消费者都被分配到合适的分区以进行数据消费。消费者组的再平衡其实就是分区分配。
- 偏移量提交:消费者组在消费消息时需要提交偏移量(offset)以记录消费进度,Group Coordinator负责接收和存储消费者提交的偏移量信息。这样可以确保消费者组能够从正确位置继续消费数据。
- 偏移量查找:当消费者启动时,需要从上次提交的偏移量处开始消费数据。Group Coordinator提供了查找偏移量的功能,使消费者能够准确地定位到上次消费的位置进行继续消费。
- 心跳检测:消费者会定期向Group Coordinator发送心跳请求,以确认消费者仍然活跃。如果某个消费者长时间未响应心跳请求,则认为该消费者已经离开消费者组,需要重新进行分区分配。
coordinator与消费者怎么保持联系?
每一个消费者会定期给coordinator发送心跳(heartbeat.interval.ms默认3s),一旦超时(session.timeout.ms),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms),也会触发再平衡。
coordinator怎么计算分区分配?
- 消费者组成员信息:首先,Coordinator需要收集当前消费者组的所有成员信息,包括消费者ID、订阅的主题列表等。
- 主题分区信息:Coordinator获取与消费者组订阅的主题相关的分区信息,包括每个主题的分区数量、每个分区的副本分布等。
- 分区分配算法:Kafka提供了几种不同的分区分配算法,常用的有以下几种:
- Round Robin(轮询):按照消费者组内成员的顺序依次分配分区。
- Range(范围):将所有分区按顺序排列,然后将连续的分区范围分配给消费者。多个topic下,可能造成数据倾斜,因为有消费者可能多消费了分区,造成消息堆积等问题
- Sticky(粘性):尽量保持消费者之前的分区分配不变,只在必要时进行调整。
- 计算分区分配:根据选定的分区分配算法,Coordinator会根据消费者组成员信息和主题分区信息来计算新的分区分配方案。这个计算过程通常涉及到以下步骤:
- 确定每个消费者需要处理的分区数量。
- 根据分区分配算法为每个消费者分配合适的分区。
- 处理消费者加入或离开消费者组时的分区再平衡,使得每个消费者负责的分区数尽可能均衡。
- 发送分区分配方案:一旦计算出新的分区分配方案,Coordinator会将这个方案发送给各个消费者,要求它们按照新的分区分配方案重新加入消费者组并开始消费对应的分区。
偏移量维护
在了解偏移量维护前,先看看消费者是怎么消费消息的,因为消费完消息才是会提交偏移量。
消费者采用的是pull模式,因此是消费者发送消费请求,那消费者多久拉取一次消息?
消费者拉取消息的频率是由 Kafka 中的 fetch.min.bytes 和 fetch.max.wait.ms 参数控制的。
fetch.min.bytes:指定了消费者从代理服务器获取消息的最小字节数。如果当前可用数据量不足 fetch.min.bytes,则消费者将等待更多数据到达,直到满足或超过设定的最小字节数为止。
fetch.max.wait.ms:表示了消费者等待消息的超时时间,即最长等待时间。当当前可用数据不足 fetch.min.bytes 时,消费者会等待至 fetch.max.wait.ms 设置的时间,然后立即返回已经获取到的消息,而不必等到达到 fetch.min.bytes。
较小的 fetch.min.bytes 可能导致消费者更频繁地向代理服务器请求数据,而较大的 fetch.min.bytes 则可能导致消费者在一次请求中获取更多数据,但需要等待更长的时间。
除了上述参数外,还有一些其他与消息拉取频率相关的参数,如 fetch.max.bytes(每次请求获取的最大字节数)、max.poll.records(每次调用 poll() 方法最多获取的记录数)等,也会影响消费者的消息拉取频率。
从上面的几个参数中,可以看出消费者拉取消息用的是长轮询的方式。
offset存在哪里
consumer将offset保存在kafka的__consumer_offsets的topic中,采用 key 和 value 的方式存储数据。key 是 group.id+topic+
分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
offset提交方式:
自动提交
enable.auto.commit:是否开启自动提交offset功能,默认是true
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
手动提交
分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
• commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
• commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
offset提交其实就是消费者将offset发送给broker中的coordinator,由coordinator在__consumer_offsets中进行维护,而kafka则定期对__consumer_offsets主题进行压缩,保留最新的offset信息,而旧的偏移量信息则会被删除,以减少主题的存储空间占用。
log.cleaner.enable
:该参数用于控制是否启用日志压缩功能,默认值为true
。设置为true
表示允许 Kafka 自动执行日志压缩操作。log.cleanup.policy
:该参数指定了主题的日志清理策略,包括delete
、compact
和compact,delete
等选项。对于__consumer_offsets
主题,通常会采用compact
策略,以实现日志压缩。log.retention.hours
和log.retention.minutes
:这两个参数指定了数据在日志中的保留时间,默认为 7 天。超过指定时间后,旧的偏移量信息将被清理和删除。
kafka还提供了消费者指定offset和时间戳开始消费。
消息的重复消费和漏消费
重复消费是指消费者多次消费了同一消息,造成重复消费的可能有 生产者 重复投递、消费者出现问题,导致一条消息被多个消费消费。
先不说生产者重复投递这个问题(参考生产者发送消息语义),消费者怎么避免重复消费和漏消费?
重复消费:消费者端首先要做好幂等消费,也就是你的逻辑要支持幂等性,再好的架构都挡不住意外的发生,所以做好提前预防才是最重要的。
重复消费的可能有很多,比如开启了自动offset提交,如果消费者已经消费,但是还没自动提交offset,消费者重启,导致重复消费。
漏消费:手动提交offset,提交的早了,消费者又挂了,重启后消息丢失。
怎么避免:
手动提交和业务逻辑绑定成原子操作。就是说你的业务逻辑确认完成后再去提交offset,同时做好逻辑的幂等性。
消费端消息堆积:
1、、消息堆积也要看情况,如果是消费者消费速度就是比较慢,例如业务逻辑复杂,耗时严重,这时候可以考虑增加消费者(前提是分区数量要足够,如果分区数量较少,可以考虑增加分区数)。
2、如果是消费者消费速度也可以,就是慢,还可以考虑排查是不是每次拉取的消息数量太少,增加批量消息拉取数量。
max.poll.records:一次 poll 拉取数据返回消息的最大条数,默认是 500 条。
fetch.max.bytes:默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
关于kafka去除zookeeper
从kafka2.8之后就宣称要去除zookeeper,但看官方文档,直到3.2的版本才开始提供了kraft启动脚本。
zookeeper的主要作用还是controller节点的选举以及元数据的同步,去除zk,kafka减少了外部依赖,同时在kafka集群扩展时,也不再受zk瓶颈的影响,但同时controller节点的选举也不再是动态选举,而是由配置文件决定,让我们更具有针对性的加强controller节点的配置。
更多推荐
所有评论(0)