spring-kafka的官方文档介绍,可以知道自1.1版本之后,

@KafkaListener开始支持批量消费,只需要设置batchListener参数为true

把application.yml中的enable-auto-commit设置为false,设置为不自动提交

 

复制代码

@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
    ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(1500);
    factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
  return factory; 
}

复制代码

复制代码

   //批量消息
    @KafkaListener(topics = {"first_top"},containerFactory="batchFactory")
    public void consumerBatch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){
        log.info("接收到消息数量:{}",record.size());
       //手动提交
      ack.acknowledge();
}

复制代码

这里containerFactory = “batchFactory”要指定为批量消费

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐