在做业务需求的中,由于对kafka 不够熟悉,导致出现了事故,通过这篇博客与大家分享一下在做整个需求,以及解决问题的过程。做个复盘总结,以免今后再犯错。

一、需求:订阅kafka 队列消息。将数据调用外部http接口同步过去。

接到这需求,第一时间反应很简单,无非要做两件事情。1、订阅消费指定kafka topic 消息 2、调用外部http 接口。
以下图为初始方案伪代码:
在这里插入图片描述
仔细琢磨,发现这样写有问题。老铁们有没看出有什么问题?

问题:1、直接消费kafka 消息调用外部接口,没做任何限流控制,当流量大的时候,很容易打垮下游接口。
2、http 接口请求响应需要时间。假如直接同步单线程处理,主线程需要等待响应后,才会ack 。这样当kafka 消息量大的时候必然会出现堆积。

方案优化伪代码如下:1、限流控制采用的是谷歌guava 框架 RateLimiter。 2、使用线程数异步发送请求。
在这里插入图片描述

本以为这方案非常完美。上线灰度时候瞬间打脸。kafka 消息消费太慢出现堆积。

在这里插入图片描述

猜想:为什么会出现堆积呢?发http 请求用线程池,难道是线程数不够?任务堆积?
于是将核心线程数调整到200. 最大线程数400. 验证依旧消费不过来堆积。
看来不是线程数的问题?那是什么问题呢?研究许久没有答案,果断求助小组大佬。

大佬果然是大佬,粗略看了下后就知道什么问题。巴拉巴拉~~

原来是consumer 采用来单条记录消费方式,每次拉取一条,消费后再拉。
这里应该改成批量拉取的方式即可解决。

在这里插入图片描述
fetch-min-bytes

改成批量后,消息可正常消费没有堆积。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐