While working on a feature at my company I hit a requirement I had not seen before: consuming from multiple Kafka topics spread across different clusters. Until then I had only ever configured a single consumer in the yml file, at most pointing it at different topics on the same cluster, so different clusters stumped me at first. After some research and asking colleagues, the fix turned out to be simple: define a separate Kafka consumer configuration for each cluster.
The configuration class is as follows (different consumers and producers for different clusters):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Configures multiple Kafka consumers and producers across different clusters
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.auditText.bootstrap-servers}")
    private String auditTextSend;

    @Value("${spring.kafka.auditTextResult.bootstrap-servers}")
    private String auditTextResult;

    @Value("${spring.kafka.authorStatus.bootstrap-servers}")
    private String authorStatus;

    @Value("${spring.kafka.articleInfo.bootstrap-servers}")
    private String articleInfo;

    @Value("${spring.kafka.consumer.concurrency}")
    private Integer concurrency;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPoll;


    private Map<String, Object> producerConfigs(String servers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private DefaultKafkaConsumerFactory<Object, Object> createConsumerFactory(String servers) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoReset);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    private ConcurrentKafkaListenerContainerFactory<String, String> createListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }


    private ProducerFactory<String, String> producerAuditTextSendFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(auditTextSend));
    }

    @Bean(name = "auditTextSendKafkaTemplate")
    public KafkaTemplate<String, String> auditTextSendKafkaTemplate() {
        return new KafkaTemplate<>(producerAuditTextSendFactory());
    }

    private ConsumerFactory<Object, Object> auditResultConsumerFactory() {
        return createConsumerFactory(auditTextResult);
    }

    private ConsumerFactory<Object, Object> articleInfoConsumerFactory() {
        return createConsumerFactory(articleInfo);
    }

    private ConsumerFactory<Object, Object> authorStatusConsumerFactory() {
        return createConsumerFactory(authorStatus);
    }

    @Bean(name = "auditResultConsumer")
    public ConcurrentKafkaListenerContainerFactory<String, String> auditResultContainerFactory() {
        return createListenerContainerFactory(auditResultConsumerFactory());
    }

    @Bean(name = "articleInfoConsumer")
    public ConcurrentKafkaListenerContainerFactory<String, String> articleInfoContainerFactory() {
        return createListenerContainerFactory(articleInfoConsumerFactory());
    }

    @Bean(name = "authorStatusConsumer")
    public ConcurrentKafkaListenerContainerFactory<String, String> authorStatusContainerFactory() {
        return createListenerContainerFactory(authorStatusConsumerFactory());
    }
}
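
The @Value placeholders above assume property keys like the following in application.yml, including the topic and group id used by the listener shown further down. This is a minimal sketch: the broker addresses, topic name, group id, and tuning values are placeholders, not settings from a real deployment.

spring:
  kafka:
    auditText:
      bootstrap-servers: cluster-a-broker1:9092,cluster-a-broker2:9092
    auditTextResult:
      bootstrap-servers: cluster-a-broker1:9092,cluster-a-broker2:9092
    authorStatus:
      bootstrap-servers: cluster-b-broker1:9092
    articleInfo:
      bootstrap-servers: cluster-c-broker1:9092
    articleInfoTopic: article-info-topic
    consumer:
      group-id: my-consumer-group
      concurrency: 3
      auto-offset-reset: latest
      enable-auto-commit: false   # offsets are committed manually (AckMode.MANUAL_IMMEDIATE)
      max-poll-records: 100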

Consumption is handled with the @KafkaListener annotation, whose parameters carry the per-listener settings, as in the line below. With multiple topics I write a separate listener class per topic; one class per monitored topic keeps the code easy to follow.

    @KafkaListener(containerFactory = "articleInfoConsumer", topics = {"${spring.kafka.articleInfoTopic}"}, groupId = "${spring.kafka.consumer.group-id}")
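
A full listener class might look like the following sketch; the class name and processing logic are illustrative, while the batch List parameter and the manual acknowledge match the factory settings above (setBatchListener(true) and AckMode.MANUAL_IMMEDIATE):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class ArticleInfoListener {

    @KafkaListener(containerFactory = "articleInfoConsumer",
            topics = {"${spring.kafka.articleInfoTopic}"},
            groupId = "${spring.kafka.consumer.group-id}")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            // handle record.value() here, e.g. parse the JSON payload
        }
        // manual-immediate ack mode: commit offsets once the whole batch is processed
        ack.acknowledge();
    }
}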

Producers are even simpler: just expose a separate KafkaTemplate bean for each cluster. The configuration class below sends messages to topics on several clusters; every bean in it targets its own cluster, and you call its send method with the topic and the JSON message. If this class coexists with the consumer configuration above, give it a distinct name (KafkaProducerConfig here) so the two @Configuration classes don't clash.

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.rewardOrderProducer.bootstrap-servers}")
    private String rewardOrderProducerForBeiYanServers;

    @Value("${spring.kafka.asyncDataProducer.bootstrap-servers}")
    private String asyncDataProducerServer;

    @Value("${spring.kafka.asyncDataProducerForBeiyan.bootstrap-servers}")
    private String asyncDataProducerForBeiYanServers;


    private Map<String, Object> producerConfigs(String servers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private ProducerFactory<String, String> producerAsyncDataProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(asyncDataProducerServer));
    }

    @Bean(name = "asyncDataProducerKafkaTemplate")
    public KafkaTemplate<String, String> asyncDataProducerKafkaTemplate() {
        return new KafkaTemplate<>(producerAsyncDataProducerFactory());
    }

    private ProducerFactory<String, String> producerBeiYanFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(asyncDataProducerForBeiYanServers));
    }

    @Bean(name = "BeiYanKafkaTemplate")
    public KafkaTemplate<String, String> mpKafkaTemplate() {
        return new KafkaTemplate<>(producerBeiYanFactory());
    }

    private ProducerFactory<String, String> producerOrderFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(rewardOrderProducerForBeiYanServers));
    }

    @Bean(name = "rewardOrderKafkaTemplate")
    public KafkaTemplate<String, String> rewardOrderKafkaTemplate() {
        return new KafkaTemplate<>(producerOrderFactory());
    }

}
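
Using one of these templates is then just a matter of injecting it by bean name and calling send. A minimal sketch, with an assumed service class; the topic and payload are whatever your caller supplies:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class AsyncDataSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // constructor injection picks the cluster by bean name
    public AsyncDataSender(@Qualifier("asyncDataProducerKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String json) {
        // send(topic, message): publishes the JSON string to the topic on this template's cluster
        kafkaTemplate.send(topic, json);
    }
}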