一次关于Kafka消息不消费的记录
kafka消息投递成功,消费者端一直消费不成功
首先,说明一下情况,开发环境可以正常消费消息的,到了测试环境,刚开始一两天还能正常消费消息,但是某一天突然,消费不到消息,日志再没打印收到消息的情况。
一.下载OffsetExplorer2工具
官网下载地址:https://www.kafkatool.com/download.html
1.配置kafak连接地址
在consumer下找到自己的消费者组:
说明:目前已经修复该问题,lag为0,在未修复的时候消息的堆积量是大于0的;正常情况下offset+lag=end;
从以上可以排查到消息,投递成功,但是迟迟未消费。
二.服务器命令消费
./kafka-console-consumer.sh --bootstrap-server ip --topic topic名 --group group名
通过在服务器上运行上面的命令发现可以正常消费消息,此时在怀疑是不是客户端版本原因,经过查看,kafak服务端版本2.0.0低于客户端版本,客户端版本是2.5.3;
三.新建消费者demo
这里相关配置就不贴了,主要看降低版本后消费者是否能消费消息
package com.lgh.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 描述
*
* @author LGH
* createTime 2022-07-04 11:48:30
* lastModify LGH
* lastModifyTime 2022-07-04 11:48:30
* group 开发小组
*/
@Slf4j
@Component
public class DemoKafkaListener {
@KafkaListener(topics = "TOPIC", groupId = "GROUP_TOPIC")
public void receiveMessage(List<String> messages) {
for (String message : messages) {
log.info("kafka接收到消息:{}", message);
}
}
}
启动发现真能收到消息,至此怀疑是客户端版本原因,虽然理由有点牵强,因为在同样的客户端下有其它topic收到了消息。感觉还是没啥头绪,知道昨天晚上,我去看日志,发现有大约一分钟的时间,这个消费者收到了消息,但是在那段时间日志打出了这么一段日志:
Attempt to heartbeat failed since group is rebalancing
这是kafka重平衡机制,这里不做过多赘述,主要是说明问题排查的过程,相当于是有新的消费者加入了,但是查看nacos注册中心,又只有这一台服务器呀。
四.服务器查看客户端的ip地址
./kafka-consumer-groups.sh --bootstrap-server ip:端口 --group 消费者组名 --describe
经过上面的命令查看,发现客户端ip并非我nacos那台服务器的ip,然后到这个客户端ip的服务器上查看日志,发现程序一直在报错,并且kafka一直在重复推消息,都没有注册进到nacos里。接下来的操作就是重启正确注册进nacos即可,此时消费者终于能正常消费消息了,说到底还是队友惹的祸,不过也感谢队友,通过这一次让我对kafka有了更深的认识。
更多推荐
所有评论(0)