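Contents

I. Preface

II. Test preparation

1. Kafka broker configuration

2. Spring Boot configuration

III. Consumer assignment test

1. Using the default PartitionAssignor -> RangeAssignor

2. Using PartitionAssignor -> RoundRobinAssignor

IV. Performance test

1. Scenario 1: single-threaded consumption

2. Scenario 2: concurrent consumption

V. Summary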


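I. Preface

While studying spring-kafka from the official documentation, I found MessageListenerContainer hard to understand. Practice brings real understanding, so I wrote code and ran tests to get a firmer grasp of it.

Official documentation: https://docs.spring.io/spring-kafka/docs/2.2.13.RELEASE/reference/html/#message-listener-container

You can receive messages by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation.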

MessageListenerContainer has two implementations:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

ConcurrentMessageListenerContainer constructor

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

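With this constructor, Kafka uses its group management capabilities to distribute the partitions across the consumers.

@KafkaListener can also be configured with explicit topics and partitions (do not configure partitions in this article's examples, because the goal is to test the partition assignment strategy):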

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

The official reference contains the following passage:

When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.

Note: this article tests the behavior described in this passage.

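II. Test preparation

1. Kafka broker configuration

Edit Kafka's configuration file server.properties (in D:\ProgramFiles\kafka_2.12\config) so that automatically created topics get five partitions by default: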

num.partitions=5

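Restart Kafka.

Note: if Kafka reports an error on startup, you can clear these 2 directories.

2. Spring Boot configuration

The code in this article builds on the previous post, "Getting Started with spring-kafka (3): Sending and Receiving Messages with Spring Boot".

Set the number of consumer threads to 15: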

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of consumer threads; e.g. setConcurrency(3) creates three KafkaMessageListenerContainer instances.
        factory.setConcurrency(15);
        return factory;
    }

Configure the listener:

    @KafkaListener(id = "Listener", topics = {"topic_1", "topic_2", "topic_3"}, groupId = "consumer_group_1")
    public void listen1(String foo) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        logger.info(sdf.format(new Date()) + " - Listener-接收消息:" + foo);
        Thread.sleep(1000 * 2);
    }

Message-sending endpoint:

    @RequestMapping(value = "test2")
    public String test2() {
        // Send 5 records to each topic, one record per partition (0..4)
        for (int i = 0; i < 5; i++) {
            // send(String topic, Integer partition, K key, @Nullable V data)
            kafkaTemplate.send("topic_1", i, 0, "topic_1_" + i);
            kafkaTemplate.send("topic_2", i, 0, "topic_2_" + i);
            kafkaTemplate.send("topic_3", i, 0, "topic_3_" + i);
            kafkaTemplate.flush();
        }
        return "test";
    }

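III. Consumer assignment test

1. Using the default PartitionAssignor -> RangeAssignor

Start the project. The relevant log lines are collected below.

Only five consumers are active, each assigned one partition from each topic, while the other 10 consumers are idle.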

2020-05-12 14:30:24.917  INFO 13152 --- [Listener-10-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-12, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-2, topic_2-2, topic_3-2]
2020-05-12 14:30:24.917  INFO 13152 --- [ Listener-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.917  INFO 13152 --- [ Listener-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.917  INFO 13152 --- [Listener-13-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-15, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918  INFO 13152 --- [ Listener-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918  INFO 13152 --- [ Listener-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918  INFO 13152 --- [ Listener-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-1, topic_2-1, topic_3-1]
2020-05-12 14:30:24.918  INFO 13152 --- [ Listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918  INFO 13152 --- [ Listener-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-0, topic_2-0, topic_3-0]
2020-05-12 14:30:24.918  INFO 13152 --- [Listener-12-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-14, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-4, topic_2-4, topic_3-4]
2020-05-12 14:30:24.918  INFO 13152 --- [ Listener-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.919  INFO 13152 --- [ Listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.919  INFO 13152 --- [Listener-11-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-13, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-3, topic_2-3, topic_3-3]
2020-05-12 14:30:24.920  INFO 13152 --- [Listener-14-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-16, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.920  INFO 13152 --- [ Listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=consumer_group_1] Setting newly assigned partitions []
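
The idle consumers follow from how a range-style assignor works: each topic is divided on its own across the sorted consumer list, so with 5 partitions and 15 consumers only the first 5 consumers in sort order ever receive anything. The following is a minimal sketch of that logic in plain Java, not Kafka's actual implementation. The class name RangeAssignmentSketch is hypothetical, and sorting by client id is an assumption standing in for Kafka's member ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RangeAssignmentSketch {

    // Range-style assignment: every topic is split independently over the sorted consumers.
    static Map<String, List<String>> assign(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        // TreeMap keeps consumers in lexicographic order (assumption: stand-in for member-id order).
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        List<String> sorted = new ArrayList<>(assignment.keySet());
        int perConsumer = partitionsPerTopic / sorted.size(); // 5 / 15 = 0
        int extra = partitionsPerTopic % sorted.size();       // 5 % 15 = 5
        for (String topic : topics) {
            for (int i = 0; i < sorted.size(); i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic + "-" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int id = 2; id <= 16; id++) {
            consumers.add("consumer-" + id); // client ids as they appear in the log above
        }
        Map<String, List<String>> result = assign(consumers, List.of("topic_1", "topic_2", "topic_3"), 5);
        result.forEach((consumer, partitions) -> System.out.println(consumer + " -> " + partitions));
    }
}
```

Under lexicographic ordering, consumer-10 through consumer-14 sort first, which is consistent with the five active client ids in the log.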

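2. Using PartitionAssignor -> RoundRobinAssignor

With the RoundRobinAssignor, each consumer is assigned one topic or partition. To change the PartitionAssignor, set the partition.assignment.strategy consumer property (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.

Kafka's default PartitionAssignor is the RangeAssignor, so it must be changed to the RoundRobinAssignor.

Only one place needs to change: the consumer configuration.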
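
The original post does not show the consumerFactory() bean, so the following is only a sketch of what the changed consumer properties might look like. It assumes the properties are built as a Map and then passed to new DefaultKafkaConsumerFactory<>(props). Plain string keys are used to keep the example free of dependencies; in a real project the ConsumerConfig constants and RoundRobinAssignor.class would be the more usual choice. The class name ConsumerPropsSketch and the broker address are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerPropsSketch {

    // Builds the consumer properties; keys are the standard Kafka consumer property names.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // assumed local broker address
        props.put("group.id", "consumer_group_1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The one change: replace the default RangeAssignor with the RoundRobinAssignor.
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        consumerConfigs().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```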

Restart the project. The relevant log lines are collected below.

As the log shows, no consumer is idle.

2020-05-12 15:02:57.306  INFO 13720 --- [ Listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-2]
2020-05-12 15:02:57.306  INFO 13720 --- [ Listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-4]
2020-05-12 15:02:57.306  INFO 13720 --- [ Listener-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-0]
2020-05-12 15:02:57.306  INFO 13720 --- [ Listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-3]
2020-05-12 15:02:57.306  INFO 13720 --- [ Listener-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-1]
2020-05-12 15:02:57.306  INFO 13720 --- [Listener-11-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-13, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-3]
2020-05-12 15:02:57.306  INFO 13720 --- [ Listener-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-3]
2020-05-12 15:02:57.306  INFO 13720 --- [Listener-14-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-16, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-1]
2020-05-12 15:02:57.307  INFO 13720 --- [ Listener-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-1]
2020-05-12 15:02:57.308  INFO 13720 --- [Listener-12-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-14, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-4]
2020-05-12 15:02:57.308  INFO 13720 --- [ Listener-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-0]
2020-05-12 15:02:57.310  INFO 13720 --- [Listener-10-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-12, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-2]
2020-05-12 15:02:57.312  INFO 13720 --- [ Listener-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-4]
2020-05-12 15:02:57.313  INFO 13720 --- [Listener-13-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-15, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-0]
2020-05-12 15:02:57.315  INFO 13720 --- [ Listener-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-2]
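
A round-robin style assignor lays out all partitions of all topics in one sorted list and hands them to the sorted consumers in circular order, which is why 15 partitions spread evenly over 15 consumers. The sketch below illustrates this in plain Java. It is not Kafka's implementation: the class name RoundRobinAssignmentSketch is hypothetical, it assumes every consumer subscribes to every topic, and it sorts by client id as a stand-in for member ids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RoundRobinAssignmentSketch {

    // Round-robin style assignment: all partitions of all topics are dealt out one by one.
    static Map<String, List<String>> assign(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        // TreeMap keeps consumers in lexicographic order (assumption: stand-in for member-id order).
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        List<String> sortedConsumers = new ArrayList<>(assignment.keySet());
        List<String> sortedTopics = new ArrayList<>(topics);
        Collections.sort(sortedTopics);
        int next = 0; // index of the consumer that receives the next partition
        for (String topic : sortedTopics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                String consumer = sortedConsumers.get(next % sortedConsumers.size());
                assignment.get(consumer).add(topic + "-" + p);
                next++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int id = 2; id <= 16; id++) {
            consumers.add("consumer-" + id); // client ids as they appear in the log above
        }
        Map<String, List<String>> result = assign(consumers, List.of("topic_1", "topic_2", "topic_3"), 5);
        result.forEach((consumer, partitions) -> System.out.println(consumer + " -> " + partitions));
    }
}
```

With these inputs every consumer ends up with exactly one partition, for example consumer-10 with topic_1-0 and consumer-2 with topic_2-2, which is consistent with the log.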

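IV. Performance test

Start the Spring Boot application and call the endpoint: http://127.0.0.1:8080/test2

The listener sleeps for 2 s to simulate business processing time, which makes the log easier to analyze.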

    @KafkaListener(id = "Listener", topics = {"topic_1", "topic_2", "topic_3"}, groupId = "consumer_group_1")
    public void listen1(String foo) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        logger.info(sdf.format(new Date()) + " - Listener-接收消息:" + foo);
        Thread.sleep(1000 * 2);
    }

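1. Scenario 1: single-threaded consumption

factory.setConcurrency(1);

Result: poor throughput. The single thread consumes one message every 2 seconds, so the 15 messages take about 30 seconds.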

2020-05-12 15:15:15.223  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:15 - Listener-接收消息:topic_1_0
2020-05-12 15:15:17.224  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:17 - Listener-接收消息:topic_1_4
2020-05-12 15:15:19.224  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:19 - Listener-接收消息:topic_1_3
2020-05-12 15:15:21.224  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:21 - Listener-接收消息:topic_2_4
2020-05-12 15:15:23.225  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:23 - Listener-接收消息:topic_1_2
2020-05-12 15:15:25.225  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:25 - Listener-接收消息:topic_2_3
2020-05-12 15:15:27.226  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:27 - Listener-接收消息:topic_3_4
2020-05-12 15:15:29.227  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:29 - Listener-接收消息:topic_1_1
2020-05-12 15:15:31.227  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:31 - Listener-接收消息:topic_2_2
2020-05-12 15:15:33.228  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:33 - Listener-接收消息:topic_3_3
2020-05-12 15:15:35.229  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:35 - Listener-接收消息:topic_2_1
2020-05-12 15:15:37.230  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:37 - Listener-接收消息:topic_3_2
2020-05-12 15:15:39.230  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:39 - Listener-接收消息:topic_2_0
2020-05-12 15:15:41.231  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:41 - Listener-接收消息:topic_3_1
2020-05-12 15:15:43.232  INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:15:43 - Listener-接收消息:topic_3_0

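2. Scenario 2: concurrent consumption

factory.setConcurrency(15);

Result: throughput is much better. All 15 messages are received within the same second (15:12:56), each on a different consumer thread.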

2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-0-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_2
2020-05-12 15:12:56.225  INFO 13720 --- [Listener-10-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_2
2020-05-12 15:12:56.225  INFO 13720 --- [Listener-12-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_4
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-2-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_4
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-8-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_0
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-5-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_2
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-6-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_3
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-4-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_1
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-7-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_4
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-1-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_3
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-9-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_1
2020-05-12 15:12:56.225  INFO 13720 --- [ Listener-3-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_0
2020-05-12 15:12:56.225  INFO 13720 --- [Listener-11-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_3
2020-05-12 15:12:56.225  INFO 13720 --- [Listener-14-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_1
2020-05-12 15:12:56.225  INFO 13720 --- [Listener-13-C-1] com.asyf.demo.config.Listener            : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_0
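
The difference between the two scenarios can be estimated with simple arithmetic: the total time is roughly the number of messages handled by the busiest consumer multiplied by the 2 s processing time. The sketch below is a back-of-the-envelope model only. It ignores poll and rebalance overhead, and the class name DrainTimeSketch is hypothetical. The third case (RangeAssignor with concurrency 15, so five consumers with three messages each) was not measured in this article and is derived purely from the assignment shown earlier.

```java
import java.util.Collections;
import java.util.List;

class DrainTimeSketch {

    // Estimated seconds until the busiest consumer has processed all of its messages.
    static int drainSeconds(List<Integer> messagesPerConsumer, int secondsPerMessage) {
        int busiest = 0;
        for (int count : messagesPerConsumer) {
            busiest = Math.max(busiest, count);
        }
        return busiest * secondsPerMessage;
    }

    public static void main(String[] args) {
        // Scenario 1: one consumer handles all 15 messages.
        System.out.println(drainSeconds(List.of(15), 2)); // prints 30
        // Scenario 2: 15 consumers, one message each (RoundRobinAssignor).
        System.out.println(drainSeconds(Collections.nCopies(15, 1), 2)); // prints 2
        // Not measured here: RangeAssignor with concurrency 15, 5 active consumers with 3 messages each.
        System.out.println(drainSeconds(List.of(3, 3, 3, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 2)); // prints 6
    }
}
```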

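V. Summary

Concurrent consumption can increase the message consumption rate, but keep the following in mind:

1. With a concurrent message listener container, a single listener instance is invoked on all consumer threads. The listener must therefore be thread-safe.

2. Messages in different partitions are not consumed in order relative to each other; ordering is guaranteed only within a single partition.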
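
To illustrate the thread-safety point, here is a minimal sketch in plain Java in which one listener object is shared by 15 threads, as it would be with setConcurrency(15). CountingListener is a hypothetical stand-in for a @KafkaListener bean, and the thread pool only simulates the container's consumer threads. The shared counter is an AtomicInteger because a plain int incremented with ++ could lose updates under concurrent calls.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadSafeListenerSketch {

    // Stand-in for a listener bean: one instance, called from every consumer thread.
    static class CountingListener {
        private final AtomicInteger received = new AtomicInteger(); // safe under concurrent updates

        void listen(String message) {
            received.incrementAndGet();
        }

        int received() {
            return received.get();
        }
    }

    // Calls the shared listener from several threads and returns the final count.
    static int run(int threads, int messagesPerThread) {
        CountingListener listener = new CountingListener();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    listener.listen("msg-" + id + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulated consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listener.received();
    }

    public static void main(String[] args) {
        System.out.println(run(15, 1000)); // prints 15000
    }
}
```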
