最近用kafka 做一个监控
通过flume采集数据推给kafka producer ,再由consumer来消费,过了一天发现消息队列有堆积,
去查日志发现报错:
Auto offset commit failed for group 0: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
查了一下
大概的意思是消费者的消费速度小于生产者的产生速度,所以导致了消息堆积,但是通过之前的测试可以知道.消费者的速度其实是远大于生产者的. 那么是什么原因呢? 真相只有一个:重复消费!
consumer在堆积的消息队列中拿出部分消息来消费,但是这个拿取是随机的(不确定,目前看来是),如果生产者一次产生的数据量过大,1秒钟5W条,那么consumer是有可能在规定的时间内(session.timeout.ms)消费不完的.
如果设置的是自动提交(enable.auto.commit),就会出现提交失败的情况,提交失败就会回滚,这部分数据就相当于没有被消费过,然后consumer继续去拿数据如果还没消费完就还是回滚,这样循环下去.(但是我发现有时候数据还是会被消费的,比如说数据小于两万条/次,所以我说取得数量是随机的,应该可以设置).
那么就目前解决方法来看:
一个是增加session.timeout.ms的时间,
一个是设置不要自动提交(enable.auto.commit=false).
最后我采取的方案是不要自动提交,因为如果随着我增加session.timeout.ms的时间,取得数据量也增加了那还是会超时(而且我也不知道怎么设置每次取得数量)
最后成功解决问题.
接触kafka时间不足一天,以上是本人一些愚见.
附spring-kafka配置图
所有评论(0)