消息中间件(四)之-kafka重复消费问题
其实kafka的重复消费问题究其底层根本原因就是:已经消费了数据,但是offset没提交(kafka没有或者不知道该数据已经被消费)。 基于这种原因总结以下几个易造成重复消费的配置:原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe().
其实kafka的重复消费问题究其底层根本原因就是:已经消费了数据,但是offset没提交(kafka没有或者不知道该数据已经被消费)。 基于这种原因总结以下几个易造成重复消费的配置:
原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。
原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如:
try {
consumer.unsubscribe();
} catch (Exception e) {
}
try {
consumer.close();
} catch (Exception e) {
}
上面代码会导致部分offset没提交,下次启动时会重复消费。
原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
问题描述:
我们系统压测过程中出现下面问题:异常rebalance,而且平均间隔3到5分钟就会触发rebalance,分析日志发现比较严重。错误日志如下:
08-09 11:01:11 131 pool-7-thread-3 ERROR [] -
commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker
给回收了,因为kafka认为这个消费者死了,那么为什么呢?
问题分析:
这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms
,
该属性意思为kafka消费者在每一轮poll()
调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()
没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。
如上图,在while循环里,我们会循环调用poll拉取broker中的最新消息。每次拉取后,会有一段处理时长,处理完成后,会进行下一轮poll。引入该配置的用途是,限制两次poll之间的间隔,消息处理逻辑太重,每一条消息处理时间较长,但是在这次poll()到下一轮poll()时间不能超过该配置间隔,协调器会明确地让使用者离开组,并触发新一轮的再平衡。max.poll.interval.ms
默认间隔时间为300s。
测试对应表格:
kafka的生产消费具体参数配置实例如下:
kafka.consumer.zookeeper.connect=zookeeper-ip:2181
kafka.consumer.servers=kafka-ip:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.servers=kafka-ip:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
详细参数讲解描述请参考:https://blog.csdn.net/qq_33689414/article/details/80621572
如有披露或问题欢迎留言或者入群探讨
更多推荐
所有评论(0)