引言

以此记录一下在使用 Kafka 的过程中遇到的一些古怪问题
因为是初学,所以遇到的问题还不算多,之后会慢慢更新(大概 👀)


生产者相关

记录一下有关 Producer 的一些问题

Topic 不存在

问题描述:org.apache.kafka.common.errors.TimeoutException: Topic xxx not present in metadata after xxx ms.

很显然,意思是 Topic 不存在于 metadata 导致了 Producer 发送超时
这个问题倒是很常见,但是它就怪在:导致这个问题出现的原因千奇百怪。比如:

  • Kafka 中没有这个 Topic,并且设置了不自动创建 Topic
  • Topic 存在,但是要发送的消息所指定的 Partition 信息有错误
  • 甚至于程序中某某依赖包发生了冲突也可能导致了这个错误(StackOverflow 上看到了一个贴子,但是还没验证过)

总之原因就是很奇葩,以上所说的这些问题根源都可以通过在网上搜索一些博客、贴子来解决

问题出现的原因

在这里要说的是另一个导致了这个问题的原因:未开放 Kafka 的相关端口
当时发现是这个原因时,我也是哭笑不得,因为这个原因和这个问题的描述(Topic xxx not presen…)没有半毛钱关系,最开始遇到这个问题的时候也没有往这方面想,导致卡了很久
这里稍微吐槽一下 Kafka 的 Java 客户端,异常描述太笼统了,导致 debug 的时候没有什么头绪

问题的解决方案

既然找到了是未开放端口引起的问题,那么解决起来也很简单:开放对应的端口呗。两种方案:

  • 暂时性开放:
# Kafka 的默认端口是 9092,可以选择暂时性开放
firewall-cmd --zone=public --add-port=9092/tcp
  • 永久性开放
# Kafka 的默认端口是 9092,可以添加 --permanent 选项来永久性开放
firewall-cmd --zone=public --add-port=9092/tcp --permanent
# 重启防火墙
systemctl restart firewalld.service
# 重载配置文件
firewall-cmd --reload

开放端口之后,问题就解决了

Batch 中的消息过期

问题描述:Expiring 1 record(s) for xxx-0:120000 ms has passed since batch creation

问题描述中的 xxx 指对应的 Topic。很明显,是说自批次创建以来,某某主题中的 1 条记录过期了
要理解这个问题,得先了解 Kafka 的生产者发送消息的原理:

“为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。”
													  ———— 来自《Kafka权威指南》

所以,Producer 调用了 send 方法后,消息并不会被立即发送,而是会待在某个批次中之后再一起发送。批次中的消息过期了,说明该批次并没有被发送给 Kafka,消息发送出现了问题

问题出现的原因

针对这个问题,我也在网上了搜了一些博看来看,意识到有可能是 Kafka 的一些配置出现了问题。之前在查看修改 Kafka 的配置文件(server.properties)的时候,留了一个心眼,有意的记住了 listenersadvertised.listeners 这两项配置。然后我设置修改了一下这两项配置,果然问题就解决了

问题的解决方案

一开始我是没有对 listenersadvertised.listeners 这两项配置做修改的,维持了默认的样子,如下:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://your.host.name:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

可以看到这两项配置默认是不启用的,被注释掉了
后来我开放了这两项配置:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xx.xx.xx.xx:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092

上面的 xx.xx.xx.xx 都是指你的 Kafka 服务器的 IP 地址,也就是说 listenersadvertised.listeners 的值是设置成一样的
然后再用 Producer 发送消息,就 ok 了


最后

文章有什么错误之处欢迎(麻烦)各位指出 🙆‍♂️

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐