1、在使用Java API访问之前先在安装有kafka,且在各个节点启动服务

bin/kafka-server-start.sh config/server.properties &

2、报如下错误

props.put("metadata.broker.list", "storm1:9092,storm2:9092,storm3:9092"); 
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
props.put("zookeeper.connect", "storm1:2181,storm2:2181,storm3:2181/kafka")

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)

分析:报以上错误,我最初写的时候是写ip地址的,按理说如果写ip地址的话,是不需要在hosts文件里配置的,网上查了查,人家说需要配置hosts,所以配置了hosts之后就解决了。

3、查看某一topic下有哪些数据

bin/kafka-console-consumer.sh --zookeeper localhost:2181--topic test002 --from-beginning

4、报错

Exception in thread "main" kafka.common.ConsumerRebalanceFailedException: flume-kafka-group-id_panguoyuan-1426066079371-91d6eea6 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
    at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
    at com.panguoyuan.kafa.consumer.KafkaConsumer.consume(KafkaConsumer.java:52)
    at com.panguoyuan.kafa.consumer.KafkaConsumer.main(KafkaConsumer.java:62)

解决办法:把zookeeper.sync.time.ms的值调大一点

5.每个topic下有多个partition,不同的partition有各自的leader,每台broker都可能是某个partition的leader。kafka只有leader才能接受读写请求,多个partition可以让多个consumer并发操作





Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐