自动提交offset

第一步 配置文件修改


#–> 设置自动提交offset 设置为false
enable-auto-commit: false

spring:
  profiles:
    active: local
  kafka:
    consumer:
      # 指定kafka server的地址,集群配多个,中间,逗号隔开
      bootstrap-servers: 175.24.42.150:9092
      #如果'enable.auto.commit'true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 100
      # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
      auto-offset-reset: earliest
      #--> 设置自动提交offset
      enable-auto-commit: false
      # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
      group-id: nashConsumerGroup
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
第二步 写手动方法
    @Bean
    /**
    * 写在接收kafka消息的 同一个类中就行了
    * 原先拉去数据是单个json 现在会是多个 list 我设置factory.setConcurrency(1)为一跳,
    * 但返回数据还是list。这里的 1条是你提交1次的数据。你一次提交了20条也是一个json里的所有 
    * 算做一条数据。
    */
    public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // 消费几条数据
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(1500);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.setBatchListener(true);
        //设置手动提交ackMode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
第三步 手动提交
    /**
     *  消费 HIS 送检、送手术、标本数据
     * @param record
     * containerFactory="batchFactory" 关联到刚刚那个方法
     * ack.acknowledge(); 告知kafka消费成功 
     */
    @KafkaListener(topics = "his_hospital_1", containerFactory="batchFactory")
    public void consumerHisData(List<ConsumerRecord<?, ?>> record, Acknowledgment ack){
        try{
            logConsumerOffsetService.logOffset(record.get(0));
            // 获取接收的数据
            orderService.save(record.get(0).value().toString());
            // 告知kafka消费成功
            ack.acknowledge();
        }catch (Exception e){
            String logInfo = String.format("\"consumerHisData error,data:%s. /n", record.get(0).value());
            sysErrorLogService.saveExceptionLog(e,logInfo);
        }
    }

kafka发送数据
kafka接收数据

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐