首先,说明一下情况,开发环境可以正常消费消息的,到了测试环境,刚开始一两天还能正常消费消息,但是某一天突然,消费不到消息,日志再没打印收到消息的情况。

一.下载OffsetExplorer2工具

官网下载地址:https://www.kafkatool.com/download.html

1.配置kafak连接地址

在consumer下找到自己的消费者组:

说明:目前已经修复该问题,lag为0,在未修复的时候消息的堆积量是大于0的;正常情况下offset+lag=end;

从以上可以排查到消息,投递成功,但是迟迟未消费。

二.服务器命令消费

./kafka-console-consumer.sh --bootstrap-server ip --topic topic名 --group group名

通过在服务器上运行上面的命令发现可以正常消费消息,此时在怀疑是不是客户端版本原因,经过查看,kafak服务端版本2.0.0低于客户端版本,客户端版本是2.5.3;

三.新建消费者demo

这里相关配置就不贴了,主要看降低版本后消费者是否能消费消息

package com.lgh.redis;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 描述
 *
 * @author LGH
 * createTime 2022-07-04 11:48:30
 * lastModify LGH
 * lastModifyTime 2022-07-04 11:48:30
 * group 开发小组
 */
@Slf4j
@Component
public class DemoKafkaListener {
    @KafkaListener(topics = "TOPIC", groupId = "GROUP_TOPIC")
    public void receiveMessage(List<String> messages) {
        for (String message : messages) {
            log.info("kafka接收到消息:{}", message);
        }
    }
}

启动发现真能收到消息,至此怀疑是客户端版本原因,虽然理由有点牵强,因为在同样的客户端下有其它topic收到了消息。感觉还是没啥头绪,知道昨天晚上,我去看日志,发现有大约一分钟的时间,这个消费者收到了消息,但是在那段时间日志打出了这么一段日志:

Attempt to heartbeat failed since group is rebalancing

这是kafka重平衡机制,这里不做过多赘述,主要是说明问题排查的过程,相当于是有新的消费者加入了,但是查看nacos注册中心,又只有这一台服务器呀。

四.服务器查看客户端的ip地址

./kafka-consumer-groups.sh --bootstrap-server ip:端口 --group 消费者组名 --describe

经过上面的命令查看,发现客户端ip并非我nacos那台服务器的ip,然后到这个客户端ip的服务器上查看日志,发现程序一直在报错,并且kafka一直在重复推消息,都没有注册进到nacos里。接下来的操作就是重启正确注册进nacos即可,此时消费者终于能正常消费消息了,说到底还是队友惹的祸,不过也感谢队友,通过这一次让我对kafka有了更深的认识。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐