在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常。当在本地使用JAVA程序发送消息时,一直出错。
抛出的错误为:
Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

问题追踪

最初怀疑是防火墙限制了端口,因此在本地使用telnet连接服务器端口,发现无法连接,因此关闭服务器防火墙。重试,还是抛同样的问题。
后来查看kafka日志,发现TCP连接可以正常关闭,而且IP也是客户端的IP,证明网络没问题,客户端确实可以连上kafka服务器。
那到底是什么问题呢?
通过查看kafka配置,发现有个属性:advertised.host.name。官方文档里的备注信息表明,该字段的值是生产者和消费者使用的。如果没有设置,则会取host.name的值,默认情况下,该值为localhost。思考一下,如果生产者拿到localhost这个值,只往本地发消息,必然会报错(因为本地没有kafka服务器)。

问题处理

将advertised.host.name设置为服务器IP地址,经测试,消息顺利发送。

总结

其实,在生产者的日志中,也看到先连接kafka服务器,然后关闭;然后又连接了本地,再关闭。
Connected to 192.168.56.101:9092 for producing
Disconnecting from 192.168.56.101:9092
Connected to localhost:9092 for producing
Disconnecting from localhost:9092
但是因为不了解kafka的配置信息,所以也没仔细分析。

另外,在配置生产者时,metadata.broker.list会设置成kafka服务器的IP和地址。但这个只是获取一些元信息,后续发送消息时会根据获取的元信息来发送,而获取得元信息中,由于advertised.host.name被默认为localhost,所以本地当然会把消息发到本地,结果导致问题出现。


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐