首先通过这篇博客我们可以在CDH中安装好Kafka:CDH添加Kafka服务
上面安装好Kafka已经可以创建Topic,并且Topic信息所搭建的集群均可以同步。后面因为还需要测试一下消息传输,发现开启了生产者与消费者后,在生产者发送消息到Topic时报超时,并且消费者也一直没有接受到数据。然后后面一直在解决这个问题到后面一直报异常输出 WARN clients.NetworkClient ,还重新装kafka了三次。中间因为数据删除不干净连kafka都启动不了了,弄来弄去了三四天,今天终于解决了。总结一下,若也遇到这样的问题后来者提供一些解决思路。

首先先看一下kafka的 发布订阅系统的代理结构:

一、遇到问题

首先打开生产者:

kafka-console-producer --broker-list cluster2-1:2181,cluster2-2:2181,cluster2-3:2181,cluster2-4:2181 --topic test

现在再看这个指令挺可笑的。。。 kafka的端口号应该是9092,我这里跟zookeeper的端口号没理解一直弄混了,后面有提到。不过还是要把之前的问题还原一下,有端口问题的同学可以尽快改过来再试试可以成功了吗。

打开消费者:

kafka-console-consumer --bootstrap-server cluster2-1:2181,cluster2-2:2181,cluster2-3:2181,cluster2-4:2181 --topic test

然后我在生产者中输入消息:hahaha,结果报超时:

19/12/04 03:13:54 INFO utils.AppInfoParser: Kafka version: 2.2.1-kafka-4.1.0
19/12/04 03:13:54 INFO utils.AppInfoParser: Kafka commitId: unknown
>hahaha
19/12/04 03:15:13 ERROR internals.ErrorLoggingCallback: Error when sending message to topic test with key: null, value: 6 bytes with error:
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
>

消费者中没有收到任何数据:

二、开始百度爬坑

这些很多都是错误的解决办法,看着网上这样解决自己也就试试。

a.在CDH中添加角色实例,添加 Kafka 源集群和目标集群配置

然后进入下一步:

Destination Broker List :目标集群 Broker列表,需在目标集群先创建需要同步Topic

Source Broker List :源集群 Broker 列表

Topic Whitelist :需要同步的 Topics,支持正则。

在下面的Topic Whitelist属性里填写:^.*

然后在clouderaMANAGER中选择kafka的配置,输入properties搜索有关的配置,在 Gateway日志记录高级配置代码段 与 Kafka Broker高级配置代码段 中输入zookeeper信息:
zookeeper=cluster2-1:2181,cluster2-2:2181,cluster2-3:2181,cluster2-4:2181
这个信息我是直接在Xftp中的 /etc/kafka/conf.cloudera.kafka 路径下看 kafka-client.conf 文件:

然后又运行了一次:

还是超时不行。

b.然后看了一篇博客,发现了一个重要的问题,自己的端口号有问题,kafka与zookeeper的端口弄混,一直使用2181的端口去访问,找到 connect-standalone.properties 这个文件路径:

查看这个配置信息,看到这里的端口号是 9092

然后我又试了一下,这回指令是正确了。

开始传递消息,启动生产者,在里面输入要传送给消费者的数据:

kafka-console-producer --broker-list cluster2-4:9092 --topic test

启动消费者接收数据:

kafka-console-consumer --bootstrap-server cluster2-3:9092 --topic test --from-beginning

结果不会报超时,但是一直在报警告,消息还是没有发送成功。网上也说kafka配置还是没配好。
报错信息:

>19/12/05 06:10:38 INFO clients.Metadata: Cluster ID: 08y0mdmIRquCVHLwoLlKLw
haha
19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
>19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] Error while fetching metadata with correlation id 8 : {test=LEADER_NOT_AVAILABLE}
19/12/05 06:10:45 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
······

还是不行。。。。继续爬,谁叫自己学的不精呢。

c.还有说需要在server.properties中添加监听,在里面添加:

advertised.listeners=PLAINTEXT://cluster2-3:9092

d.c的方法结果还是不可以,所以我也考虑了是不是kafka与CDH的版本兼容问题。

e.之前弄得太乱了,配置改了很多,最后一种办法重新安装配置。

后面重新安装了结果连kafka都不能启动了,后面知道要重装kafka需要把之前的数据删干净。

/var/local/kafka/data中的数据与zookeeper中的 brokers/topic 数据

重装kafka清空之前的数据博客可以看这篇:

CDH中重装kafka——清空之前产生的数据(data文件数据与Zookeeper中的数据)

三、重新安装配置

在删干净之前的数据后,重新安装配置kafka。

首先记得在CDH中的kafka配置中将zook.chroot改为 /kafka:

然后创建topic,注意CDH的使用kafka不需要通过sh来调用,并且路径要到/kafka中:

kafka-topics --zookeeper cluster2-4:2181/kafka --create -replication-factor 1 --partitions 3 --topic test

可以查看/kafka下有无这个主题:

kafka-topics --list --zookeeper cluster2-4:2181/kafka

然后启动生产者:

kafka-console-producer --broker-list cluster2-4:9092 --topic test

然后在生产者中写入消息:

然后我在cluster2-3这个服务器中启动消费者:

kafka-console-consumer --bootstrap-server cluster2-3:9092 --topic test --from-beginning

可以看到消费者接收到了数据:

终于解决这个问题了!!

总结遇到问题还是不要网上看到一种方法就去试,还是多思考一下。启动不成功也许是因为文中提到的几个问题,这里汇总一下:

a.CDH中添加角色实例,添加 Kafka 源集群和目标集群配置;

b.看看是不是自己的端口号有问题,端口要和zookeeper的端口区别开;

c.还有也有可能需要在 server.properties 中添加监听;

d.考虑是不是因为 kafka 与 CDH 的版本兼容问题;

e.这些都还不能成功的话,还是跟我一样重新安装配置。

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐