kafka重复消费问题
问题描述采用kafka读取消息进行处理时,consumer会重复读取afka队列中的数据。问题原因kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。而我们项目中的consumer消费能力比较低,导致取出的一批数据在session.timeout.ms时间内没有处理完成,自动提交offset失败,然后kafka会重新分配part
·
问题描述
采用kafka读取消息进行处理时,consumer会重复读取afka队列中的数据。
问题原因
kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。而我们项目中的consumer消费能力比较低,导致取出的一批数据在session.timeout.ms时间内没有处理完成,自动提交offset失败,然后kafka会重新分配partition给消费者,消费者又重新消费之前的一批数据,又出现了消费超时,所以会造成死循环,一直消费相同的数据。
解决方案
项目中使用的是spring-kafka,所以把kafka消费者的配置enable.auto.commit设为false,禁止kafka自动提交offset,从而使用spring-kafka提供的offset提交策略。spring-kafka中的offset提交策略可以保证一批消息数据没有完成消费的情况下,也能提交offset,从而避免了提交失败而导致永远重复消费的问题。
spring-kafka中的offset提交策略原理参考了总结kafka的consumer消费能力很低的情况下的处理方案中的分析:
>
首先来看看spring-kafka的消费线程逻辑
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
// we start the invoker here as there will be no rebalance calls to
// trigger it, but only if the container is not set to autocommit
// otherwise we will process records on a separate thread
if (!this.autoCommit) {
startInvoker();
}
}
上面可以看到,如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,他消费的数据不是直接从kafka里面直接取的,那么他消费的数据从哪里来呢?他是从一个spring-kafka自己创建的阻塞队列里面取的。
然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。
然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。
接着spring-kafka还会处理一些异常的情况,比如失败之后是不是需要commit offset这样的逻辑。
更多推荐
已为社区贡献1条内容
所有评论(0)