一、问题描述

首先说一下背景,项目的基本逻辑是今天计划数据会提前一到两天发过来(创建类型报文),kafka 接收到创建类型的报文消息后会在数据库创建对应记录;之后时间来到今日会实时推送更新报文用来不断更新数据;今天有业务部门反应数据不对疑似今日数据没有更新,收到通知后我赶紧查看该业务涉及的消费者日志。首先 jps 一下看到任务还在,然后 tail -F 日志文件发现“消费正常”。

二、问题定位

正当我一筹莫展的时候,我突然发现消费者提交的偏移量不对,因为涉事主题的消息偏移量和消费者提交的偏移量相差几十万。到这里已经初步定位问题:那就是消息积压了,消费者还没有消费今天的数据因此导致今日数据没有更新。这时候只需要定位一下为什么会导致消息积压,首先绝对不是生产消息与消费消息不对等造成的,虽然每条消息的处理时间在1s左右,但在开发过程中已经测试过正常业务数据产生也是在1s,虽然存在生产消息异常导致一段时间内(因为测试,但也是在极短的时间)产生大量消息,但也不至于相差这么多。因此我只能拉取近一天的日志,希望能从中得到一些有用的线索。当我尝试在日志中搜索 ERROR 时,我发现了问题的所在

从日志能看出来该主题发生过再均衡,消费者疑似被剔除消费者组后又重新假如,但也继续了消费。初步判断可能这次偏移量的提交因为网络问题没有提交上,因为我是异步提交所以就没有了重试机制,但程序并没有停止,按道理不会出现这个问题呀!当我继续查找下一个 ERROR 时我发现居然是同样的错误,而且两次 ERROR 的间隔是五分钟,最后我发现了一个致胜的线索,那就是消费者重新加入消费者组再次消费时偏移量回去了,而且回到了上一次提交的位置而两个偏移量相差 500,到这里我就已经明白本次事故的原因了。各位看官明白了吗?

三、问题解决

本次问题原因一句话描述:消费者在规定时间内没有处理完该处理的消息会向 coordinator 发起 leave group 请求主动离开消费者组,当处理完消息准备提交偏移量时发现自己已经被踢了,于是重新加入当前消费者组从最新的偏移量出开始消费,同样这次依然在规定时间消费不完,导致陷入了死循环,就产生了消息积压。

熟悉 kafka 消费者的小伙伴或许已经知道了解决方案,因为 kafka 消费者有四个重要的参数分别是:max.poll.records、max.poll.interval.ms、session.timeout.ms、heartbeat.interval.ms;本次事故是因为前两个参数!!!我们知道 kafka 消费者代码通过 poll 拉取数据,为了提高消费速率采用批量的形式拉取,那么一次拉取多少就是通过 max.poll.records 决定的(还有一个单次拉取的消息总大小,但道理都是一样)默认 500,同时 max.poll.interval.ms 决定两次 poll 的最大间隔,也就是说一次 poll 拉取的数据必须要在这个间隔内消费完,否则消费者主动离队触发 rebalance,rebalance 后re-join前的偏移量提交将不被接受。上面说过一条消息处理时间大概在1s,因此如果一次拉取超过300条数据,就需要五分钟来处理,这样就超过两次 poll 的最大间隔,从日志也看出来 re-join 后开始消费的偏移量正好在报错之前的 500 位置,而多次报错间隔也在六七分钟左右远远超过 max.poll.interval.ms 间隔;从上游业务部分获取到消息,因此今天有试运行演练,在晚上九点的时候(这个九点是一个特殊的业务时间不解释)发送过一大波测试数据,因此一次 poll 就拉满了 max.poll.records 的默认值 500,而处理时间也就超过了五分钟导致了这次事故。

解决方案就很简单了,降低最大拉取次数或提高两次poll的时间间隔,最终我将参数修改如下

max.poll.records=100
max.poll.interval.ms=600000

一次最多拉取 100 条,最大处理时间为 10 分钟,经过两天的观察再也没有出现类似的错误了。本次事故总结:没有考虑到消息处理时间的问题,下次在开发消费者代码时需要评估最大拉取条数、单挑消息处理时间和最大处理时间三者的平衡,需要满足即使拉取最大条数也可以在最大时间间隔内消费完

四、问题衍生

这次事故让我重新去了解 kafka 的消费者,也提出了更多的问题,如:

问题一:poll 方法中传的时间作用是什么?

初学者可以会以为这个时间是每次 poll 之后停留的时间(因为消费者是不断地轮训嘛),但仔细想想似乎并不是,因为我们通常通过 while 循环来不断的调用 poll,消息没有消费完是不会进入下一次循环的。这个时间主要是在没有消息或者消息没有满足最大条数或最大容量时poll停留的时间。比如我们 poll 传的时间是 3s,如果当前主题没有最新的消息或者消息很少很小远达不到拉取的条件时,当超过 3s 就返回,也就是说这个时间是确保数据量比较小的时候也可以较为及时的消费数据,不然能攒到满足条件时不知道过了多少年!!!

问题二:session.timeout.ms、heartbeat.interval.ms 作用是什么?

消费者需要和 coordinator 维持一个心跳,当 coordinator 长时间接收不到心跳数据时该消费者就会被认定为离线,会被踢出并触发 rebalance 将被踢出消费者组之前所持有的分区交给其它组内其它消费者消费,保证数据可以被及时的消费(杜绝占着茅坑不拉屎现象)。但也不能因为消费者只一次没有成功发送心跳就把它给踢了那该多冤啊(主要是 rebalanca 期间该主题无法被消费),因此就多给了几次机会。heartbeat.interval.ms 的作用就是消费者发送心跳的间隔;session.timeout.ms 的作用就是消费者最长发送心跳的间隔,如果超过 session.timeout.ms 还没有收到心跳就会被踢出来;通常建议

session.timeout.ms = 3 * heartbeat.interval.ms

也就是事不过三,防止因为网络波动导致偶尔的提交失败;但这两个值不益设置过大,会丧失对消费者的感知(离线很久你都不知道)

需要注意消费者心跳机制在kafka 新老版本是不一致的(貌似是0.10左右),老版本的心跳机制是同步的,类似偏移量的提交,因此在使用老版本时候心跳机制是和消息的处理时间相挂钩;新版本的心跳机制则改为异步线程,对开发者是无感知的,这样的设计才是最好的即使你 process 线程处理时间再长也不影响心跳的发送,开发者只需要控制好 poll 相关的参数即可,新版本的心跳机制也就受网络因素了。

问题三:处理大消息的若干种方法

本次事故主要就是处理消息太慢了,问题很多的小伙伴就想了,极限情况下假如一条就需要处理五分钟该怎么处理呢?这个时候在调整 max.poll.* 等参数不明显的情况下我们可以考虑异步处理,代码如下:

对于有很多大消息甚至每一条消息都需要异步处理开启一个线程也不是很好,若处理每条消息处理时间都很长导致需要创建很多线程,而一个进程可以创建的线程数是有限的。这种情况就需要考虑处理逻辑、消息模式是否存在优化空间

问题四:比较下面的代码和上面代码的区别

很多小伙伴看不出这两个代码的区别,主要是假如一次 poll 拉取了若干条消息,循环处理时每处理一条消息提交一次偏移量和循环结束提交偏移量有什么区别?结果一样吗?过程一样吗?处理过程中异常恢复结果一样吗?回答这些问题本质就是提交偏移量到底是提交一个批次最后一条消息的偏移量还是每条消息的偏移量?答案是一次 poll 最后一个偏移量,也就是说两种方式最终的结果都是一样的,都可以把消费到的数据偏移量正确提交上去。

但是过程大不一样,我们可以发现如果每处理一条消息提交一次返回的回调信息并不是当前消息的偏移量而是最后一条消息的偏移量,也就是说假如一次 poll 拉取了 10 条数据,每处理一条消息提交一次偏移量其实是提交了十次第十条消息的偏移量,虽然最终结果一样,但如果处理过程中发生错误会产生数据的丢失。因此每处理一条消息提交一次的方式相当于先提交后消费,这种方式是会丢数据的。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐