记一次生产Kafka出口流量暴涨问题

告警

接到告警,业务的Kafka出口流量异常,这个kafka一直是订单类在用,平时插入的数据不会达到这个规模。其次,入口带宽很少,出口带宽这么多,掐指一算,大概率是消费的服务出现问题了。一般入口带宽和出口带宽 不是消费组特别多的情况下,两个的带宽相差不会太悬殊。

查找凶手

iftop 看了一下大致流量情况

iftop

发现主要流量流出到3.843.85这两台机

tcpdump -i eth0 host <kafka ip> and host <xxx.xxx.3.84> -w test.cap 抓取了几秒这两个主机相关的包

基本确定了流量使用多的端口,主要是 3.141:9093(Kafka) 发送给 3.84:15630 的流量比较多。

tcp stream

看了下(3.84:15630 -> 3.141:9093 )的内容 基本可以确定是哪个topic和消费组。

不抓包的话 用netstat来简单过滤下,如果两个主机间建立的连接比较少的话,基本也能定位出是哪个端口,如下

[root@xxx-xxx-3.141 ~]#netstat -ano|grep xxx.xxx.3.84
tcp        0      0 xxx.xxx.3.141:9093         xxx.xxx.3.84:15556         ESTABLISHED keepalive (4.24/0/0)
tcp        0 4128896 xxx.xxx.3.141:9093         xxx.xxx.3.84:15630         ESTABLISHED on (0.20/0/0)

接下来就需要找出3.84:15630是哪个服务监听的端口。

到3.84这台机执行 lsof -i:端口 得到进程ID

然后用 ps aux | grep 或者 lsof -p 找出具体进程信息。

[root@xxx.xxx.3.84 ~]#lsof -i:15630
COMMAND     PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    3047564 java  442u  IPv4 77454498      0t0  TCP xxx.xxx.3.84:15630->xxx.xxx.3.141:copycat (ESTABLISHED)
[root@xxx.xxx.3.84 ~]#ps aux|grep 3047564
root     2518948  0.0  0.0 112712   956 pts/2    S+   15:14   0:00 grep --color=auto 3047564
java     3047564 99.7 28.5 9016636 2212048 ?     Ssl  Jan14 8903:50 /usr/java/jdk1.8.0_202/bin/java <手动打码进程名>

确定了进程后,找到维护该服务的开发者,咨询了下,该服务有哪些消费组。和前面抓包的数据印证。

在kafka center观察了下,该消费组有少量积压的数据。其实topic几秒新增数据在个位数,但是发送的数据如此之多,基本断定消费者拿了消息,可能处理出现了问题,并没有返回ack。

拿着消费组名字去代码搜了搜相关消费者:

    public void consumer(@Payload String msg , Acknowledgment ack) throws IOException {
        Optional<String> kafkaMessage  = Optional.ofNullable(msg);
        if(kafkaMessage.isPresent()){
            String message = kafkaMessage.get();
            if(Optional.ofNullable(message).isPresent()){
                //开始判断业务内容数据
                //判断业务标识
                JsonNode jsonNode = objectMapper.readTree(message);
                String businessMark = jsonNode.get("business_mark").asText();
                if(!DUSINESS_MARK.equals(businessMark)){
                    ack.acknowledge();
                    return;
                }
                //判断操作类型
                String action = jsonNode.get("action").asText();
                if(!DiscussDataEnum.DISCUSS_DATA_ADD.getType().equals(action) && !DiscussDataEnum.DISCUSS_DATA_EDIT.getType().equals(action)){
                    ack.acknowledge();
                    return;
                }
                //开始序列化对象kafka原始报文
                DiscussOriginalInfo discussOriginalInfo = JacksonUtil.getObjectMapper().readValue(message, new TypeReference<DiscussOriginalInfo>(){});
                if(null == discussOriginalInfo){
                    return;
                }
                log.info("===========> discuss data message kafkaMessage = {}", message);
                //开始序列化评论内容报文
                DiscussContentInfo discussContentInfo = JacksonUtil.getObjectMapper().readValue(discussOriginalInfo.getContent(), new TypeReference<DiscussContentInfo>(){});
                //将数据字符串转化形成对象
                discussOriginalInfo.setDiscussContentInfo(discussContentInfo);
                //开始进入对队列处理器
                aliDiscussService.AliDiscussProccess(discussOriginalInfo);
            }
        }
        ack.acknowledge();
    }

简单看了下,有序列化和反序列相关的代码,这部分是相当容易出问题的,如果数据不符合规范,出问题就常见的。

而这里没有异常捕捉和数据合规校验相关代码,有些判断不符合预期,return的时候也没返回ack。

基本断定是因为数据处理异常,没有返回ack,导致消息重复被拉取消费,不断循环。

和相关开发同志反馈,最后加入相关异常捕捉代码,问题得以解决。

从下图可以看出,4点左右上线新版本后,断层式下降。

出口流量

但是也发现另外一个问题,貌似连接数有点高。平时大家都是创建了线程池来读写kafka,而据我所知连接这个kafka的程序并没有很多,怎么有高达9k左右的连接数?

连接数

带着介个问题,于是又有了另外一个故障处理,请看下一篇 《Kafka连接数过多问题排查》

公众号 尹安灿

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐