CDH中添加的Kafka消息不能同步[爬坑过程+总结解决]
首先通过这篇博客我们可以在CDH中安装好Kafka:CDH添加Kafka服务上面安装好Kafka已经可以创建Topic,并且Topic信息所搭建的集群均可以同步。后面因为还需要测试一下消息传输,发现开启了生产者与消费者后,在生产者发送消息到Topic时报超时,并且消费者也一直没有接受到数据。然后后面一直在解决这个问题到后面一直报异常输出 WARN clients.NetworkClient ,还.
首先通过这篇博客我们可以在CDH中安装好Kafka:CDH添加Kafka服务
上面安装好Kafka已经可以创建Topic,并且Topic信息所搭建的集群均可以同步。后面因为还需要测试一下消息传输,发现开启了生产者与消费者后,在生产者发送消息到Topic时报超时,并且消费者也一直没有接受到数据。然后后面一直在解决这个问题到后面一直报异常输出 WARN clients.NetworkClient ,还重新装kafka了三次。中间因为数据删除不干净连kafka都启动不了了,弄来弄去了三四天,今天终于解决了。总结一下,若也遇到这样的问题后来者提供一些解决思路。
首先先看一下kafka的 发布订阅系统的代理结构:
一、遇到问题
首先打开生产者:
kafka-console-producer --broker-list cluster2-1:2181,cluster2-2:2181,cluster2-3:2181,cluster2-4:2181 --topic test
现在再看这个指令挺可笑的。。。 kafka的端口号应该是9092,我这里跟zookeeper的端口号没理解一直弄混了,后面有提到。不过还是要把之前的问题还原一下,有端口问题的同学可以尽快改过来再试试可以成功了吗。
打开消费者:
kafka-console-consumer --bootstrap-server cluster2-1:2181,cluster2-2:2181,cluster2-3:2181,cluster2-4:2181 --topic test
然后我在生产者中输入消息:hahaha,结果报超时:
19/12/04 03:13:54 INFO utils.AppInfoParser: Kafka version: 2.2.1-kafka-4.1.0
19/12/04 03:13:54 INFO utils.AppInfoParser: Kafka commitId: unknown
>hahaha
19/12/04 03:15:13 ERROR internals.ErrorLoggingCallback: Error when sending message to topic test with key: null, value: 6 bytes with error:
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
>
消费者中没有收到任何数据:
二、开始百度爬坑
这些很多都是错误的解决办法,看着网上这样解决自己也就试试。
a.在CDH中添加角色实例,添加 Kafka 源集群和目标集群配置
然后进入下一步:
Destination Broker List :目标集群 Broker列表,需在目标集群先创建需要同步Topic
Source Broker List :源集群 Broker 列表
Topic Whitelist :需要同步的 Topics,支持正则。
在下面的Topic Whitelist属性里填写:^.*
然后在clouderaMANAGER中选择kafka的配置,输入properties搜索有关的配置,在 Gateway日志记录高级配置代码段 与 Kafka Broker高级配置代码段 中输入zookeeper信息:
zookeeper=cluster2-1:2181,cluster2-2:2181,cluster2-3:2181,cluster2-4:2181
这个信息我是直接在Xftp中的 /etc/kafka/conf.cloudera.kafka 路径下看 kafka-client.conf 文件:
然后又运行了一次:
还是超时不行。
b.然后看了一篇博客,发现了一个重要的问题,自己的端口号有问题,kafka与zookeeper的端口弄混,一直使用2181的端口去访问,找到 connect-standalone.properties 这个文件路径:
查看这个配置信息,看到这里的端口号是 9092:
然后我又试了一下,这回指令是正确了。
开始传递消息,启动生产者,在里面输入要传送给消费者的数据:
kafka-console-producer --broker-list cluster2-4:9092 --topic test
启动消费者接收数据:
kafka-console-consumer --bootstrap-server cluster2-3:9092 --topic test --from-beginning
结果不会报超时,但是一直在报警告,消息还是没有发送成功。网上也说kafka配置还是没配好。
报错信息:
>19/12/05 06:10:38 INFO clients.Metadata: Cluster ID: 08y0mdmIRquCVHLwoLlKLw
haha
19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
>19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
19/12/05 06:10:44 WARN clients.NetworkClient: [Producer clientId=console-producer] Error while fetching metadata with correlation id 8 : {test=LEADER_NOT_AVAILABLE}
19/12/05 06:10:45 WARN clients.NetworkClient: [Producer clientId=console-producer] 3 partitions have leader brokers without a matching listener, including [test-0, test-2, test-1]
······
还是不行。。。。继续爬,谁叫自己学的不精呢。
c.还有说需要在server.properties中添加监听,在里面添加:
advertised.listeners=PLAINTEXT://cluster2-3:9092
d.c的方法结果还是不可以,所以我也考虑了是不是kafka与CDH的版本兼容问题。
e.之前弄得太乱了,配置改了很多,最后一种办法重新安装配置。
后面重新安装了结果连kafka都不能启动了,后面知道要重装kafka需要把之前的数据删干净。
/var/local/kafka/data中的数据与zookeeper中的 brokers/topic 数据。
重装kafka清空之前的数据博客可以看这篇:
CDH中重装kafka——清空之前产生的数据(data文件数据与Zookeeper中的数据)
三、重新安装配置
在删干净之前的数据后,重新安装配置kafka。
首先记得在CDH中的kafka配置中将zook.chroot改为 /kafka:
然后创建topic,注意CDH的使用kafka不需要通过sh来调用,并且路径要到/kafka中:
kafka-topics --zookeeper cluster2-4:2181/kafka --create -replication-factor 1 --partitions 3 --topic test
可以查看/kafka下有无这个主题:
kafka-topics --list --zookeeper cluster2-4:2181/kafka
然后启动生产者:
kafka-console-producer --broker-list cluster2-4:9092 --topic test
然后在生产者中写入消息:
然后我在cluster2-3这个服务器中启动消费者:
kafka-console-consumer --bootstrap-server cluster2-3:9092 --topic test --from-beginning
可以看到消费者接收到了数据:
终于解决这个问题了!!
总结遇到问题还是不要网上看到一种方法就去试,还是多思考一下。启动不成功也许是因为文中提到的几个问题,这里汇总一下:
a.CDH中添加角色实例,添加 Kafka 源集群和目标集群配置;
b.看看是不是自己的端口号有问题,端口要和zookeeper的端口区别开;
c.还有也有可能需要在 server.properties 中添加监听;
d.考虑是不是因为 kafka 与 CDH 的版本兼容问题;
e.这些都还不能成功的话,还是跟我一样重新安装配置。
更多推荐
所有评论(0)