Kafka multithreaded concurrent consumption and batch consumption
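Note: the previous post covered how to set up a Kafka cluster and how to integrate Kafka with Spring Boot to consume messages one at a time. When a scenario calls for batch consumption, the consumer needs extra configuration. The same configuration can also enable concurrent consumption, in which several KafkaListener consumer threads consume messages at the same time. Together, batching and concurrency raise throughput.
1. Consumer configuration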
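import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String service;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupid;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String autoCommit;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String interval;
    // session.timeout.ms; it must be greater than heartbeat.interval.ms (3000 ms by default)
    @Value("10000")
    private String timeout;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;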
    /**
     * Builds the shared Kafka consumer configuration.
     * @return configuration map
     */
    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, service);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, interval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        return props;
    }
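    /**
     * Builds the default consumer factory.
     * @return Kafka consumer factory
     */
    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = consumerConfig();
        // Log filtering and storage handles up to 1500 messages per batch
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1500); // max records per poll, used for batch consumption
        // Pass props, not a fresh consumerConfig(); otherwise max.poll.records set above is lost
        return new DefaultKafkaConsumerFactory<>(props);
    }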
    /**
     * Consumer factory used for real-time push.
     * @return Kafka consumer factory
     */
    private ConsumerFactory<String, String> infoPushConsumerFactory() {
        Map<String, Object> props = consumerConfig();
        // Real-time push pulls at most 150 records per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    /**
     * Listener container factory for single-record consumption.
     * @return listener container factory
     */
    @Bean(name = "kafkaListenerContainerFactory1")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1); // number of consumer threads in the container
        factory.getContainerProperties().setPollTimeout(4000); // poll timeout in ms
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // offsets are committed when the listener calls acknowledge()
        System.out.println("Using the custom consumer container factory");
        return factory;
    }
    /**
     * Listener container factory for batch consumption.
     * @return listener container factory
     */
    @Bean(name = "kafkaListenerContainerFactory2")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2); // number of consumer threads in the container
        factory.setBatchListener(true); // enable batch consumption
        factory.getContainerProperties().setPollTimeout(4000); // poll timeout in ms
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // offsets are committed when the listener calls acknowledge()
        return factory;
    }
    /**
     * Listener container factory for real-time push, using batch consumption.
     * @return listener container factory
     */
    @Bean(name = "infoPushKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> infoPushKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(infoPushConsumerFactory());
        factory.setConcurrency(10); // number of consumer threads in the container
        factory.setBatchListener(true); // enable batch consumption
        factory.getContainerProperties().setPollTimeout(4000); // poll timeout in ms
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // commit once all records returned by a poll have been processed
        // factory.getConsumerFactory().getConfigurationProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); // max records per poll
        return factory;
    }
}
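The two consumer factories above differ only in max.poll.records (1500 for the log pipeline, 150 for real-time push). As a rough illustration of what that cap means for a batch listener, the plain-Java sketch below splits a backlog into batches of at most maxPollRecords records. The class and method names are hypothetical, and a real poll() can return fewer records than the cap, so read the sizes as upper bounds rather than guarantees.

```java
import java.util.ArrayList;
import java.util.List;

class PollBatchSketch {

    // Splits a backlog of records into batch sizes of at most maxPollRecords each.
    // This is an idealized model: it assumes every poll fills up to the cap.
    static List<Integer> batchSizes(int backlog, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = backlog;
        while (remaining > 0) {
            int size = Math.min(remaining, maxPollRecords);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A backlog of 3200 records with the two caps used in the configuration.
        System.out.println(batchSizes(3200, 1500));       // [1500, 1500, 200]
        System.out.println(batchSizes(3200, 150).size()); // 22
    }
}
```

A smaller cap means more listener invocations with less work each, which is presumably why the real-time push factory uses 150.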
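The configuration above defines several listener container factories; to use one, specify its bean name in the containerFactory attribute of @KafkaListener.
factory.setBatchListener(true) enables batch consumption: the listener receives the whole list of records returned by one poll instead of a single record.
factory.setConcurrency(2) sets how many consumer threads listen at the same time, which is what makes consumption multithreaded. Threads beyond the topic's partition count receive no partitions and stay idle.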
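Within a consumer group each partition is read by at most one consumer, so concurrency only helps up to the number of partitions. The sketch below is a simplified, hypothetical model that spreads partitions round-robin over consumer threads. The real assignment is made by the configured partition assignor and may differ in detail; the point here is only the capping effect.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencySketch {

    // Returns, for each consumer thread, the partitions it would own under a
    // simple round-robin spread (an illustrative model, not Kafka's assignor).
    static List<List<Integer>> assign(int partitions, int concurrency) {
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % concurrency).add(p);
        }
        return owned;
    }

    // Counts the threads that receive at least one partition.
    static long activeThreads(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream()
                .filter(list -> !list.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 2));         // [[0], [1]]
        System.out.println(assign(6, 2));         // [[0, 2, 4], [1, 3, 5]]
        System.out.println(activeThreads(2, 10)); // 2
    }
}
```

Under this model, a factory with concurrency 10 on a two-partition topic would keep eight threads idle.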
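How do you manage offsets manually?
First, set auto-commit to false:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); // autoCommit is a variable; I set it to false
Then set the ack mode on the listener container factory:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // offsets are committed when the listener calls acknowledge()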
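The AckMode values are:
RECORD: commit the offset after the listener returns from processing each record
BATCH: commit the offset once all records returned by a poll() have been processed
TIME: commit the offset once all records returned by a poll() have been processed and more than ackTime has passed since the last commit
COUNT: commit the offset once all records returned by a poll() have been processed and at least ackCount records have been received since the last commit
COUNT_TIME: commit the offset when either the TIME or the COUNT condition is met
MANUAL: the listener calls Acknowledgment.acknowledge(); the acknowledged offsets are then committed with the same timing as BATCH
MANUAL_IMMEDIATE: commit the offset immediately when the listener calls Acknowledgment.acknowledge()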
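To make the difference between the counting and timing modes concrete, here is a small stand-alone model of the commit decision. It is not Spring's code: the Mode enum and the shouldCommit method are hypothetical names, and the parameters ackCount and ackTimeMs merely stand in for the container properties of the same purpose.

```java
class AckModeSketch {

    // Only the three threshold-based modes are modeled here.
    enum Mode { COUNT, TIME, COUNT_TIME }

    // Decides whether a commit is due, given how many records were received and
    // how much time has passed since the last commit.
    static boolean shouldCommit(Mode mode, int recordsSinceCommit, long millisSinceCommit,
                                int ackCount, long ackTimeMs) {
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeElapsed = millisSinceCommit > ackTimeMs;
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeElapsed;
            case COUNT_TIME:
                // Either condition is enough; they do not both have to hold.
                return countReached || timeElapsed;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Thresholds: 500 records or 5000 ms.
        System.out.println(shouldCommit(Mode.COUNT_TIME, 100, 2000, 500, 5000)); // false
        System.out.println(shouldCommit(Mode.COUNT_TIME, 100, 6000, 500, 5000)); // true
        System.out.println(shouldCommit(Mode.COUNT, 100, 6000, 500, 5000));      // false
    }
}
```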
With that in place, the listener method can take an Acknowledgment parameter and commit the offsets manually:
// Batch consumption
@KafkaListener(topics = {"topicone"}, containerFactory = "kafkaListenerContainerFactory2")
public void batchConsumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    System.out.println("This thread consumed " + records.size() + " messages ---- thread name: " + Thread.currentThread().getName());
    records.forEach(record -> System.out.println("topic: " + record.topic() + "\n" + "partition: " + record.partition() + "\n" + "key: " + record.key() + "\n" + "offset: " + record.offset() + "\n" + "message: " + record.value()));
    // Acknowledge only after the whole batch has been processed
    ack.acknowledge();
}
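What an acknowledged batch ends up committing is, per partition, the offset of its last record plus one, that is, the position of the next record to read. The helper below is a hypothetical plain-Java illustration of that arithmetic. It uses a minimal Rec stand-in instead of real ConsumerRecord objects so that it runs without Kafka on the classpath.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CommitOffsetSketch {

    // Minimal stand-in for the partition/offset part of a ConsumerRecord.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition in the batch, returns the highest offset plus one,
    // which is the position a commit would record for that partition.
    static Map<Integer, Long> offsetsToCommit(List<Rec> batch) {
        Map<Integer, Long> next = new TreeMap<>();
        for (Rec r : batch) {
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        // A batch of four records from partition 1 with offsets 45 to 48.
        List<Rec> batch = List.of(new Rec(1, 45), new Rec(1, 46), new Rec(1, 47), new Rec(1, 48));
        System.out.println(offsetsToCommit(batch)); // {1=49}
    }
}
```

If the listener fails before calling ack.acknowledge(), nothing is committed for that batch, so its records can be delivered again later; processing therefore needs to tolerate duplicates.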
The result:
topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key2, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@45]
topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key3, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@46]
topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key4, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@47]
topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key5, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@48]
topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key0, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@37]
topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key1, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@38]
This thread consumed 4 messages ---- thread name: org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1
This thread consumed 2 messages ---- thread name: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
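The two thread lines line up with the producer log. The recordMetadata field has the form topic-partition@offset: four sends landed in partition 1 (offsets 45 to 48) and two in partition 0 (offsets 37 and 38). Presumably each partition was drained by its own consumer thread, which would explain the batch sizes of 4 and 2. The small sketch below tallies the metadata strings copied from the log; the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionTallySketch {

    // Extracts the partition number from a "topic-partition@offset" string.
    static int partitionOf(String recordMetadata) {
        int at = recordMetadata.lastIndexOf('@');
        int dash = recordMetadata.lastIndexOf('-', at);
        return Integer.parseInt(recordMetadata.substring(dash + 1, at));
    }

    // Counts how many records were written to each partition.
    static Map<Integer, Integer> countByPartition(List<String> metadata) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String m : metadata) {
            counts.merge(partitionOf(m), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> metadata = List.of(
                "topicone-1@45", "topicone-1@46", "topicone-1@47", "topicone-1@48",
                "topicone-0@37", "topicone-0@38");
        System.out.println(countByPartition(metadata)); // {0=2, 1=4}
    }
}
```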