配置文件

# Custom Kafka section bound to ReplenishKafkaProperties (prefix "replenish.kafka").
replenish:
  kafka:
    # Kafka broker address (placeholder value).
    bootstrapServers: 服务器地址
    consumer:
      # String deserializers for both record key and value.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Offsets are committed manually (see listener.ack-mode below).
      enable-auto-commit: false
      auto-offset-reset: latest
      # Max delay between polls before the consumer is considered failed (ms).
      max-poll-interval: 1200000
      max-poll-records: 2 # fetch several records per poll; requires listener.type: batch
      # Only read records from committed transactions.
      isolation-level: read_committed
    listener:
      ack-mode: manual
      type: batch

Properties 类

package com.geely.algo.conf.kafka;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;


/**
 * Type-safe holder for the custom "replenish.kafka" configuration section.
 * Registered as a bean via {@code @Component}, so Spring Boot binds the YAML
 * values through relaxed binding (e.g. {@code bootstrapServers}).
 */
@Component
@ConfigurationProperties(prefix = "replenish.kafka")
@Getter
@Setter
public class ReplenishKafkaProperties {
    // Kafka broker address; presumably "host:port[,host:port]" — TODO confirm.
    private String bootstrapServers;

    // Listener-container settings ("replenish.kafka.listener.*").
    private Listener listener;

    // Consumer settings ("replenish.kafka.consumer.*").
    private Consumer consumer;
}

/**
 * Binds "replenish.kafka.listener.*". Package-private companion of
 * {@link ReplenishKafkaProperties}; reuses Spring's own enum types so YAML
 * values such as "batch" and "manual" parse directly into enum constants.
 */
@Data
class Listener {
    /**
     * Listener type. BATCH is checked by KafkaListenerConfig to enable
     * batch dispatch on the container factory.
     */
    private KafkaProperties.Listener.Type type;

    /**
     * Listener AckMode. See the spring-kafka documentation.
     */
    private ContainerProperties.AckMode ackMode;
}

/**
 * Binds "replenish.kafka.consumer.*". Values are forwarded as-is into the
 * Kafka consumer config map by KafkaListenerConfig.
 */
@Data
class Consumer{
    // Maps to enable.auto.commit; false in config because offsets are acked manually.
    private Boolean enableAutoCommit;

    // Maps to auto.offset.reset (e.g. "latest").
    private String autoOffsetReset;

    // Maps to max.poll.records. NOTE(review): numeric config held as String —
    // Integer would be the safer type; confirm Kafka-side parsing is intended.
    private String maxPollRecords;

    // Maps to max.poll.interval.ms (String for the same reason as above).
    private String maxPollInterval;

    // Maps to isolation.level (e.g. "read_committed").
    private String isolationLevel;

    // Deserializer class names. NOTE(review): currently unused —
    // KafkaListenerConfig hard-codes StringDeserializer instead; confirm intent.
    private String keyDeserializer;

    private String valueDeserializer;
}

自定义 Config 类

package com.geely.algo.conf.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Builds the Kafka listener container factory for the "replenish" consumers,
 * wired from {@link ReplenishKafkaProperties} instead of Spring Boot's
 * default {@code spring.kafka.*} auto-configuration.
 */
@Component
public class KafkaListenerConfig {

    private final ReplenishKafkaProperties replenishKafkaProperties;

    // Constructor injection instead of field injection: the dependency is
    // final and the class is testable without a Spring context.
    @Autowired
    public KafkaListenerConfig(ReplenishKafkaProperties replenishKafkaProperties) {
        this.replenishKafkaProperties = replenishKafkaProperties;
    }

    /**
     * Container factory referenced by name from {@code @KafkaListener}
     * ({@code containerFactory = "replenishListenerContainerFactory"}).
     *
     * @return factory configured with the configured AckMode and, when the
     *         listener type is BATCH, batch dispatch of records
     */
    @Bean("replenishListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> replenishListenerContainerFactory() {
        // Parameterized types instead of raw ConcurrentKafkaListenerContainerFactory.
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Manual commit: the listener must call Acknowledgment#acknowledge().
        factory.getContainerProperties().setAckMode(replenishKafkaProperties.getListener().getAckMode());
        // BATCH listeners receive a List of records per invocation.
        if (Objects.equals(replenishKafkaProperties.getListener().getType(), KafkaProperties.Listener.Type.BATCH)) {
            factory.setBatchListener(true);
        }
        return factory;
    }

    /**
     * Builds the {@link ConsumerFactory} from the property-derived config map.
     */
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Translates {@link ReplenishKafkaProperties} into raw Kafka
     * {@link ConsumerConfig} entries.
     *
     * @return consumer configuration map
     */
    private Map<String, Object> consumerConfigs() {
        Consumer consumer = replenishKafkaProperties.getConsumer();
        // 16, not 8: a HashMap with default load factor 0.75 would resize at
        // 6 entries, and 8 entries are stored here.
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, replenishKafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.getAutoOffsetReset());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, consumer.getIsolationLevel());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, consumer.getMaxPollInterval());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
        // Honor the configured deserializers; fall back to StringDeserializer
        // when absent so the previous hard-coded behavior is preserved.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                consumer.getKeyDeserializer() != null
                        ? consumer.getKeyDeserializer() : StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                consumer.getValueDeserializer() != null
                        ? consumer.getValueDeserializer() : StringDeserializer.class.getName());
        return props;
    }
}

引用类（消费者示例）

package com.geely.algo.article.kafka.consumer;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Batch consumer bound to the "replenishListenerContainerFactory" container:
 * records arrive as a List and offsets are committed manually.
 */
@Component
@Slf4j
public class ReplenishBusinessConsumer {

    /**
     * Handles one polled batch and acknowledges it as a whole.
     *
     * @param records batch handed over by the BATCH-type container
     * @param ack     manual-commit handle (ack-mode: manual)
     */
    @KafkaListener(
            // NOTE(review): these keys ("kafka.replenish.*") are not under the
            // "replenish.kafka" prefix bound elsewhere — confirm they exist.
            topics = {"#{'${kafka.replenish.consumer.topic}'}"},
            groupId = "#{'${kafka.replenish.consumer.group}'}",
            containerFactory = "replenishListenerContainerFactory")
    public void onMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment ack){
        // The class declares @Slf4j: use the logger instead of System.out so
        // output goes through the application's logging configuration.
        log.info("received {} record(s): {}", records.size(), JSON.toJSONString(records));
        ack.acknowledge();
    }
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐