场景:

kafka消费端应用部署在两台机器上,其中一台能消费到生产端发出的kafka消息,另一台服务器接收不到任何消息。

解决过程:

一、从消费端启动日志中找出所有消费端线程

2019-04-23 20:04:44,726 [xx_xxapp03-1556011171628-976bc2af_watcher_executor] INFO  kafka.consumer.RangeAssignor -
                [Ip:|User:]Consumer xx_xxapp03-1556011171628-976bc2af rebalancing the following partitions: ArrayBuffer(0, 1) for topic xx_import_data with consumers: List(xx_xxapp03-1556011171628-976bc2af-0, xx_xxapp03-1556021084370-e7a7a47d-0, xx_xxapp04-1556011055427-2927d364-0)
2019-04-23 20:04:44,726 [xx_xxapp03-1556011171628-976bc2af_watcher_executor] INFO  kafka.consumer.RangeAssignor -
                [Ip:|User:]xx_xxapp03-1556011171628-976bc2af-0 attempting to claim partition 0

二、分析启动日志

从以上日志中第二行可以看出有三个消费端线程xx_xxapp03-1556011171628-976bc2af-0, xx_xxapp03-1556021084370-e7a7a47d-0,xx_xxapp04-1556011055427-2927d364-0消费两个分区partitions: ArrayBuffer(0, 1)。

我们知道kafka的一个partition 只能让一个消费者线程消费,那么排在最后的消费端线程肯定消费不到消息,因为只有2个partition已经分别被前两个消费线程消费了,如果前两个消费线程和最后一个消费线程分别位于两台不同的机器上,则有一台机器消费不到kafka消息,问题就是出自这里。

三、修改kafka消费线程生产相关的代码

public void startConsumer() {
        consumerConnector = consumerConnectorFactory.createConsumerConnector(zookeeperConnect, groupId);
        LOGGER.info("CSIRAS 启动 Kafka topic xx_import_data Kafka Consumer连接");
        // topic信息容器
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        //cosumer线程数
        int consumerThreadNum = partitionNum / serverNum != 0 ? partitionNum / serverNum : 1;
        topicCountMap.put(topic, consumerThreadNum);
        // 获取消息流
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        // 获取消息列表
        List<KafkaStream<byte[], byte[]>> kafkaStreamList = consumerMap.get(topic);

        ......
}

int consumerThreadNum = partitionNum / serverNum != 0 ? partitionNum / serverNum : 1;

通过上面的代码控制每个服务器应用上的消费线程数,保证每个应用不会生产过多无效的消费线程。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐