本人菜鸡一只,该文章会比较短,而且没有比较详细的报错和图片,但是我想解决问题的思路还是可以分享下的!

公司有一个kafka集群,我接手做了些文字匹配的东西之后,好久都没人用过了。

然后最近公司想做统一日志的收集,打算开启一个消费者来处理多个项目的日志,结果就发生了如下的问题!

 

19/06/12 10:25:37 WARN clients.NetworkClient: 
[Producer clientId=console-producer] Error while fetching metadata with correlation id 62 : {my_test_topic=LEADER_NOT_AVAILABLE}

我发现有人的报错和我一样,报错就是类似这个:https://www.orchome.com/1108

 

情况大概是这样的,需要检查几个地方:

1、kafka服务是否正常?

-1.我们kafka服务是通过CM启动的,可以在CM上面查看运行情况(我这个报错,CM上看是正常的)

-2.可以去到具体启动kafka的那几台机器上通过:jps -ml来查看(jps -ml可以查看该broker启动的时候我读取哪个配置文件),这个很关键!因为如果服务用A配置文件启动,结果你改的是B配置文件,就算你把B配置文件改上天了,都不会生效的!!

 

2、是否是自己的代码或者其他东西写错了?

当你确认服务正常的时候,那么可以使用自带的生产者和消费者来测试,因为这个是可以确保生产者和消费者没问题的一个点(因为你手写的生产者或者消费者代码有可能会有问题啊!!!)

测试命令如下:

1、创建topic
kafka-topics.sh --create --zookeeper zk1:2181 --replication-factor 1 --partitions 1 --topic mytest_topic


2、开启生产者:
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092... --topic mytest_topic 
 

3-1、消费者操作:
kafka-console-consumer.sh --broker-list kafka1:9092,kafka2:9092...  --from-beginning --topic mytest_topic
3-2、也可以这么开启消费者:
bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic topic mytest_topic

结果我就是在这一步开启生产者要发数据的时候报错了!!!

 

3、如何解决该报错?

当我通过前两步确认了你的环境没什么大问题,但是却发送不了数据,我开始静下心来看报错!

1、Error while fetching metadata (获取元数据异常)

2、LEADER_NOT_AVAILABLE (无法获取该topic的leader)

也就是说,很可能是注册在zookeeper上的信息有问题!通过新创建topic,也不能发送消息的情况,我确认了,这个异常跟topic没有什么关系,所以肯定还是broker的问题!

因此我回想起了之前的事情:

【kafka】报错:advertised.listeners参数的重要性(外部访问局域网kafka):https://blog.csdn.net/lsr40/article/details/84135959

这个集群的advertised.listeners参数,还是外网的ip和端口,是不是有可能这个外网ip和端口被运维关掉了的原因,导致节点之间通讯异常,topic选不出leader呢(因为topic是通过leader来与生产者和消费者交互的,这部分知识大家可以自行百度,或者查阅我的kafka的相关文章)

果然经过我的确认,确实那个ip和外网端口已经关闭,所以我将kafka的配置文件进行修改:

advertised.listeners改成内网ip(默认当前机器的ip)

advertised.port改成内网端口(默认9092)

重启kafka服务,就修好了该kafka环境

 

总结:当遇到一些报错的时候,还是要一点一点的有目的的找,不要盲目的乱试(当然这需要对该框架有一定了解的情况下),大部分报错是环境问题,其次才是代码问题!

 

好了,本文还满短的,主要是为了把这个解决问题的思路和方法记录下来,菜鸡一只,下次再见拜拜~

(题外话:最近工作,天天跑数据跑的头都疼了!!!!嘎嘎嘎嘎)

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐