spring-kafka Getting Started (4): Test Examples for the Message Listener Container ConcurrentMessageListenerContainer
Contents
I. Preface
II. Test Preparation
1. Kafka broker configuration
2. Spring Boot configuration
III. Consumer Assignment Tests
1. Using the default PartitionAssignor -> RangeAssignor
2. Using the PartitionAssignor -> RoundRobinAssignor
IV. Performance Tests
1. Scenario 1: single-threaded consumption
2. Scenario 2: concurrent consumption
V. Summary
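I. Preface
While studying spring-kafka from the official documentation, I found MessageListenerContainer hard to understand. Practice is the best teacher, so I wrote code and ran tests to deepen my understanding.
Official documentation: https://docs.spring.io/spring-kafka/docs/2.2.13.RELEASE/reference/html/#message-listener-container
Messages are received either by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation.
Two MessageListenerContainer implementations are provided: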
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
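The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
ConcurrentMessageListenerContainer constructor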
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
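With this constructor, Kafka uses its group management capabilities to distribute partitions across the consumers.
@KafkaListener can also be configured with explicit topics and partitions, as shown below. (Do not configure partitions in the examples in this article, because the goal is to test the partition assignment strategy.)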
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
The official reference contains the following passage:
When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.
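Note: this article tests the behavior described in this passage.
II. Test Preparation
1. Kafka broker configuration
Edit Kafka's configuration file server.properties (in the directory D:\ProgramFiles\kafka_2.12\config) and set the default number of partitions per topic to five: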
num.partitions=5
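Restart Kafka.
Note: if Kafka reports an error on startup, you can clear these 2 directories.
2. Spring Boot configuration
The code in this article builds on the previous post, "spring-kafka Getting Started (3): Sending and Receiving Messages with Spring Boot".
Set the number of threads (the concurrency) to 15: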
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
    factory.setConcurrency(15);
    return factory;
}
Configure the listener:
@KafkaListener(id = "Listener", topics = {"topic_1", "topic_2", "topic_3"}, groupId = "consumer_group_1")
public void listen1(String foo) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
logger.info(sdf.format(new Date()) + " - Listener-接收消息:" + foo);
Thread.sleep(1000 * 2);
}
Message-sending endpoint:
@RequestMapping(value = "test2")
public String test2() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Send 5 records to each topic, one record per partition
    for (int i = 0; i < 5; i++) {
        // Parameters of send: (String topic, Integer partition, K key, @Nullable V data)
        kafkaTemplate.send("topic_1", i, 0, "topic_1_" + i);
        kafkaTemplate.send("topic_2", i, 0, "topic_2_" + i);
        kafkaTemplate.send("topic_3", i, 0, "topic_3_" + i);
        kafkaTemplate.flush();
    }
    return "test";
}
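III. Consumer Assignment Tests
1. Using the default PartitionAssignor -> RangeAssignor
Start the project and collect the following log output.
Only five consumers are active, each assigned one partition from each topic, while the other 10 consumers are idle.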
2020-05-12 14:30:24.917 INFO 13152 --- [Listener-10-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-12, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-2, topic_2-2, topic_3-2]
2020-05-12 14:30:24.917 INFO 13152 --- [ Listener-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.917 INFO 13152 --- [ Listener-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.917 INFO 13152 --- [Listener-13-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-15, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918 INFO 13152 --- [ Listener-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918 INFO 13152 --- [ Listener-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918 INFO 13152 --- [ Listener-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-1, topic_2-1, topic_3-1]
2020-05-12 14:30:24.918 INFO 13152 --- [ Listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.918 INFO 13152 --- [ Listener-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-0, topic_2-0, topic_3-0]
2020-05-12 14:30:24.918 INFO 13152 --- [Listener-12-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-14, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-4, topic_2-4, topic_3-4]
2020-05-12 14:30:24.918 INFO 13152 --- [ Listener-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.919 INFO 13152 --- [ Listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.919 INFO 13152 --- [Listener-11-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-13, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-3, topic_2-3, topic_3-3]
2020-05-12 14:30:24.920 INFO 13152 --- [Listener-14-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-16, groupId=consumer_group_1] Setting newly assigned partitions []
2020-05-12 14:30:24.920 INFO 13152 --- [ Listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=consumer_group_1] Setting newly assigned partitions []
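2. Using the PartitionAssignor -> RoundRobinAssignor
RoundRobinAssignor distributes the partitions across all of the consumers, so in this test each consumer is assigned one partition. To change the PartitionAssignor, set the partition.assignment.strategy consumer property (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.
Kafka's default PartitionAssignor is RangeAssignor, so it must be set to RoundRobinAssignor.
Only one place needs to be changed: the consumer properties passed to the DefaultKafkaConsumerFactory.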
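The original change is not reproduced in the text, so the following is only a minimal sketch of what it might look like. It uses plain string keys so that it runs without the Kafka client on the classpath; the class name ConsumerProps and the bootstrap address 127.0.0.1:9092 are assumptions, and the other consumer properties from the previous post are left out.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerProps {

    // Builds the property map that would be handed to DefaultKafkaConsumerFactory.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with the address of your own broker.
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // The key is the value of ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG.
        // Kafka accepts the fully qualified class name of the assignor as a string.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigs().get("partition.assignment.strategy"));
    }
}
```

In the real project this entry would be added to the existing consumer property map rather than to a new one, so the deserializer and group settings stay unchanged.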
Restart the project and collect the following log output.
As the log shows, there are no idle consumers.
2020-05-12 15:02:57.306 INFO 13720 --- [ Listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-2]
2020-05-12 15:02:57.306 INFO 13720 --- [ Listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-4]
2020-05-12 15:02:57.306 INFO 13720 --- [ Listener-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-0]
2020-05-12 15:02:57.306 INFO 13720 --- [ Listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-3]
2020-05-12 15:02:57.306 INFO 13720 --- [ Listener-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-1]
2020-05-12 15:02:57.306 INFO 13720 --- [Listener-11-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-13, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-3]
2020-05-12 15:02:57.306 INFO 13720 --- [ Listener-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-3]
2020-05-12 15:02:57.306 INFO 13720 --- [Listener-14-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-16, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-1]
2020-05-12 15:02:57.307 INFO 13720 --- [ Listener-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-1]
2020-05-12 15:02:57.308 INFO 13720 --- [Listener-12-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-14, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-4]
2020-05-12 15:02:57.308 INFO 13720 --- [ Listener-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-0]
2020-05-12 15:02:57.310 INFO 13720 --- [Listener-10-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-12, groupId=consumer_group_1] Setting newly assigned partitions [topic_1-2]
2020-05-12 15:02:57.312 INFO 13720 --- [ Listener-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-4]
2020-05-12 15:02:57.313 INFO 13720 --- [Listener-13-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-15, groupId=consumer_group_1] Setting newly assigned partitions [topic_2-0]
2020-05-12 15:02:57.315 INFO 13720 --- [ Listener-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=consumer_group_1] Setting newly assigned partitions [topic_3-2]
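The two assignment results can be reproduced with a simplified model of the strategies. The sketch below is not Kafka's implementation: it assumes every consumer subscribes to every topic, and the names AssignorSketch and consumer-0 to consumer-14 are hypothetical. It only illustrates why the range strategy leaves 10 of 15 consumers idle for three topics with five partitions each, while round-robin keeps all 15 busy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignorSketch {

    // Hypothetical consumer names, in the order the strategies iterate over them.
    static List<String> consumers(int n) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            names.add("consumer-" + i);
        }
        return names;
    }

    static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    // Range: every topic is divided on its own, always starting from the first consumer.
    static Map<String, List<String>> range(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int perConsumer = partitionsPerTopic / consumers.size();
        int withExtra = partitionsPerTopic % consumers.size();
        for (String topic : topics) {
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(consumers.get(i)).add(topic + "-" + p);
                }
            }
        }
        return result;
    }

    // Round-robin: all partitions of all topics form one list that is dealt out in turn.
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int next = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(consumers.get(next % consumers.size())).add(topic + "-" + p);
                next++;
            }
        }
        return result;
    }

    // Number of consumers that received at least one partition.
    static long activeCount(Map<String, List<String>> assignment) {
        return assignment.values().stream().filter(list -> !list.isEmpty()).count();
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("topic_1", "topic_2", "topic_3");
        System.out.println("range active consumers: " + activeCount(range(consumers(15), topics, 5)));
        System.out.println("round-robin active consumers: " + activeCount(roundRobin(consumers(15), topics, 5)));
    }
}
```

With these inputs the model reports 5 active consumers for range and 15 for round-robin, matching the counts in the two logs. Which concrete consumer receives which partition depends on the member ids Kafka generates, so the pairing in the real logs will differ.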
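IV. Performance Tests
Start the Spring Boot application and call the endpoint: http://127.0.0.1:8080/test2
The listener sleeps for 2 seconds to simulate business processing time, which makes the logs easier to analyze.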
@KafkaListener(id = "Listener", topics = {"topic_1", "topic_2", "topic_3"}, groupId = "consumer_group_1")
public void listen1(String foo) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
logger.info(sdf.format(new Date()) + " - Listener-接收消息:" + foo);
Thread.sleep(1000 * 2);
}
1. Scenario 1: single-threaded consumption
factory.setConcurrency(1);
Result: consumption performance is poor (one message is consumed every 2 seconds).
2020-05-12 15:15:15.223 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:15 - Listener-接收消息:topic_1_0
2020-05-12 15:15:17.224 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:17 - Listener-接收消息:topic_1_4
2020-05-12 15:15:19.224 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:19 - Listener-接收消息:topic_1_3
2020-05-12 15:15:21.224 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:21 - Listener-接收消息:topic_2_4
2020-05-12 15:15:23.225 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:23 - Listener-接收消息:topic_1_2
2020-05-12 15:15:25.225 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:25 - Listener-接收消息:topic_2_3
2020-05-12 15:15:27.226 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:27 - Listener-接收消息:topic_3_4
2020-05-12 15:15:29.227 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:29 - Listener-接收消息:topic_1_1
2020-05-12 15:15:31.227 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:31 - Listener-接收消息:topic_2_2
2020-05-12 15:15:33.228 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:33 - Listener-接收消息:topic_3_3
2020-05-12 15:15:35.229 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:35 - Listener-接收消息:topic_2_1
2020-05-12 15:15:37.230 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:37 - Listener-接收消息:topic_3_2
2020-05-12 15:15:39.230 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:39 - Listener-接收消息:topic_2_0
2020-05-12 15:15:41.231 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:41 - Listener-接收消息:topic_3_1
2020-05-12 15:15:43.232 INFO 17188 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:15:43 - Listener-接收消息:topic_3_0
2. Scenario 2: concurrent consumption
factory.setConcurrency(15);
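Result: consumption performance is very good (all 15 messages are received at the same moment, each on its own thread).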
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-0-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_2
2020-05-12 15:12:56.225 INFO 13720 --- [Listener-10-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_2
2020-05-12 15:12:56.225 INFO 13720 --- [Listener-12-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_4
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-2-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_4
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-8-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_0
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-5-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_2
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-6-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_3
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-4-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_1
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-7-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_4
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-1-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_3
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-9-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_1
2020-05-12 15:12:56.225 INFO 13720 --- [ Listener-3-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_3_0
2020-05-12 15:12:56.225 INFO 13720 --- [Listener-11-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_1_3
2020-05-12 15:12:56.225 INFO 13720 --- [Listener-14-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_1
2020-05-12 15:12:56.225 INFO 13720 --- [Listener-13-C-1] com.asyf.demo.config.Listener : 2020-05-12 15:12:56 - Listener-接收消息:topic_2_0
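The difference between the two scenarios can be estimated with simple arithmetic. The sketch below is a rough model, not a benchmark: it assumes the messages are spread evenly over the consumer threads and that handling one message takes exactly the 2-second sleep; the class and method names are hypothetical.

```java
class ConsumeTimeSketch {

    // Estimated wall-clock seconds to handle all messages when they are spread
    // evenly over the worker threads and each message takes secondsPerMessage.
    static long estimatedSeconds(int messages, int workers, int secondsPerMessage) {
        int rounds = (messages + workers - 1) / workers; // ceiling division
        return (long) rounds * secondsPerMessage;
    }

    public static void main(String[] args) {
        System.out.println(estimatedSeconds(15, 1, 2));  // prints 30
        System.out.println(estimatedSeconds(15, 15, 2)); // prints 2
    }
}
```

The single-threaded estimate of 30 seconds agrees with the first log, where the 15 messages start 2 seconds apart between 15:15:15 and 15:15:43. The concurrent estimate of 2 seconds only holds when every thread actually owns a partition, which is the case with the RoundRobinAssignor setup used here.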
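V. Summary
Concurrent consumption increases the rate at which messages are consumed, but keep the following in mind:
1. With a concurrent message listener container, a single listener instance is invoked on all consumer threads, so the listener must be thread-safe.
2. Messages that are not in the same partition cannot be read in a guaranteed order.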
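The first point can be illustrated without Kafka. The sketch below is a hypothetical listener, not code from this project: one instance is called from 15 threads, and its shared counter is an AtomicInteger so that no increments are lost. A plain int field incremented with ++ would not give that guarantee.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadSafeListenerSketch {

    // Hypothetical listener: the only shared state is an atomic counter.
    static class CountingListener {
        private final AtomicInteger received = new AtomicInteger();

        void listen(String message) {
            received.incrementAndGet(); // atomic, safe under concurrent calls
        }

        int received() {
            return received.get();
        }
    }

    // Calls one listener instance from several threads and returns the final count.
    static int run(int threads, int messagesPerThread) {
        CountingListener listener = new CountingListener();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    listener.listen("msg-" + id + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listener.received();
    }

    public static void main(String[] args) {
        System.out.println(run(15, 1000)); // prints 15000
    }
}
```

The same idea applies to a real @KafkaListener method: keep the listener stateless where possible, and use atomic or otherwise synchronized types for any state that all consumer threads share.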