最近一个项目中使用到kafka作为分布式消息系统,于是网上找了些资料学习下kafka,在学习过程中遇到了如题的异常,google了下,找到了解决方法

实验环境

Tips:本机Mac系统

服务

  • zookeeper :单节点 进程ID:27318
  • kafka服务:集群 进程ID:27329、27345、27342
  • 消息发布者(控制台模式):进程ID:27373
  • 消息订阅者(控制台模式):进程ID:27376

如:

kuangaiyongs-MacBook-Pro:bin kay$ jps -l
27376 kafka.tools.ConsoleConsumer
27329 kafka.Kafka
27345 kafka.Kafka
27397 sun.tools.jps.Jps
27318 org.apache.zookeeper.server.quorum.QuorumPeerMain
27373 kafka.tools.ConsoleProducer
25294
27342 kafka.Kafka

topic信息

kuangaiyongs-MacBook-Pro:bin kay$ ./kafka-topics.sh --zookeeper localhost:2181 --describe
Topic:Multibrokerapplication    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: Multibrokerapplication   Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 0,1,2
Topic:hello-kafka   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: hello-kafka  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,0,2 Isr: 0,1,2
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

启动发布者控制台

kuangaiyongs-MacBook-Pro:bin kay$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

my first message

在控制台输入my first message,抛出如下

[2018-06-25 09:52:05,348] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2018-06-25 09:52:05,349] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2018-06-25 09:52:05,349] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2018-06-25 09:52:05,349] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

启动订阅者控制台

kuangaiyongs-MacBook-Pro:bin kay$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --from-beginning --topic test

确实也不能收到发布者发出的消息,并有如下异常抛出:

[2018-06-25 09:52:29,920] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,10.200.20.95,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

原因排查

检查服务启动情况

正常

检查端口

正常

检查配置

正常

检查输入是否有误

正常

以上检查都OK后,最后只能google了,最终在这个地方找到了解决方法,链接:https://community.cloudera.com/t5/Data-Ingestion-Integration/Error-when-sending-message-to-topic-in-Kafka/td-p/41440

topic

按照这个解决方法,检查了三台kafka服务的配置,配置原本是这样的:

listeners=PLAINTEXT://:9092
# The port the socket server listens on
#port=9092
listeners=PLAINTEXT://:9093
# The port the socket server listens on
port=9093
listeners=PLAINTEXT://:9094
# The port the socket server listens on
port=9094

按照解决方案修改配置后,重启各服务,验证发现异常没有了,发布者发布的消息,订阅者可以正常收到

附上修改后的部分配置内容:

listeners=PLAINTEXT://0.0.0.0:9092
# The port the socket server listens on
port=9092
listeners=PLAINTEXT://0.0.0.0:9093
# The port the socket server listens on
port=9093
listeners=PLAINTEXT://0.0.0.0:9094
# The port the socket server listens on
port=9094

原因分析

暂未分析为什么如此配置就OK了

总结

为了避免不必要的报错,如下两项配置内容建议按照这个来:

  • 监听配置加上对应的IP地址

  • port端口配置项前的#号去掉

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐