1、多实例配置原理和单实例配置的区别

项目中如果使用了多个不同实例的kafka,就需要手动编写额外配置;单个实例的话使用springboot的yaml配置自动装配即可。

1.1 单实例

spring:
  kafka:
    bootstrap-servers: server
    consumer:
      group-id: 消费者你的groupId
      enable-auto-commit: false
      auto-offset-reset: earliest
      #用于连接带密码的kafka配置,如果kafka没有密码需要注释掉
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户" password="密码";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #根据需要配置生产者还是消费者
      #producer:
    listener:
      ack-mode: manual
      concurrency: 1

1.2 多kafka实例

  • yaml配置
spring:
  kafka:
    bootstrap-servers: server
    consumer:
      group-id: 消费者你的groupId
      enable-auto-commit: false
      auto-offset-reset: earliest
      #用于连接带密码的kafka配置,如果kafka没有密码需要注释掉
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户" password="密码";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #根据需要配置生产者还是消费者
      #producer:
    listener:
      ack-mode: manual
      concurrency: 1

  kafka2:
    bootstrap-servers: server1
    consumer:
      group-id: 消费者你的groupId
      enable-auto-commit: false
      auto-offset-reset: earliest
      #用于连接带密码的kafka配置,如果kafka没有密码需要注释掉
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="server1的用户" password="密码";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #根据需要配置生产者还是消费者
      #producer:
    listener:
      ack-mode: manual
      concurrency: 1

此处第一个kafka会被spring自动装配,所以只需要对kafka2进行配置即可。

  • Java配置

注意最好手动指定ListenerContainerFactory的bean名称,在消费者端的@KafkaListener注解中会用到。

package com.cmbchina.archman.mprogramcore.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

    // Connection settings for the SECOND Kafka instance. The first instance
    // (spring.kafka.*) is auto-configured by Spring Boot; this class wires
    // the spring.kafka2.* properties manually.
    @Value("${spring.kafka2.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka2.consumer.group-id}")
    private String groupId;

    // SASL settings; all three must be present for authentication to be applied.
    @Value("${spring.kafka2.consumer.properties.security.protocol}")
    private String kafkaSecurityProtocol;
    @Value("${spring.kafka2.consumer.properties.sasl.mechanism}")
    private String kafkaSaslMechanism;
    @Value("${spring.kafka2.consumer.properties.sasl.jaas.config}")
    private String kafkaConsumerSaslJaasConfig;

    /**
     * Listener container factory for the second Kafka instance.
     *
     * <p>The bean name is set explicitly so that consumers can reference it via
     * {@code @KafkaListener(containerFactory = "myKafkaListenerContainerFactory")}.
     *
     * @return a batch-enabled listener container factory bound to {@link #consumerConfigs()}
     */
    @Bean("myKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Deliver records to the listener in batches rather than one at a time.
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * Raw consumer properties for the second Kafka instance.
     *
     * <p>SASL properties are only added when all three SASL values are configured,
     * so the same code works against an unauthenticated broker.
     *
     * @return the consumer configuration map passed to {@link DefaultKafkaConsumerFactory}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        // Broker address(es) of the second instance.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Consumer group id.
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Offsets are committed manually by the listener (ack-mode: manual).
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // NOTE(review): this interval is ignored while auto-commit is disabled;
        // kept for parity with the original configuration.
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Only enable SASL when the broker requires authentication.
        if (StringUtils.hasText(kafkaSecurityProtocol) && StringUtils.hasText(kafkaSaslMechanism)
                && StringUtils.hasText(kafkaConsumerSaslJaasConfig)) {
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
            config.put(SaslConfigs.SASL_MECHANISM, kafkaSaslMechanism);
            // Use the library constant instead of the magic string "sasl.jaas.config".
            config.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaConsumerSaslJaasConfig);
        }
        return config;
    }

}

  • 消费者监听

使用@KafkaListener注解即可完成消费者监听,此处声明刚才定义的factory名称即可。

/**
 * Consumes messages from CREATE_MPRO_TOPIC on the container factory defined
 * in KafkaConfiguration and acknowledges each record manually.
 *
 * @param msg the received Kafka record (key/value types depend on the configured deserializers)
 * @param ack manual acknowledgment handle (requires listener ack-mode: manual)
 */
@KafkaListener(topics = {CREATE_MPRO_TOPIC}, containerFactory = "myKafkaListenerContainerFactory")
public void createMp(ConsumerRecord msg, Acknowledgment ack){
    try{
        Optional<?> value = Optional.ofNullable(msg.value());
        log.info("接收消息:"+ msg);
        if (value.isPresent()){
           //业务代码
        }
    }catch (Exception e){
        // Pass the throwable as a separate argument so the stack trace is logged.
        log.error("消息异常:", e);
    }
    finally{
        // BUG FIX: the original finally block had only the comment and never
        // called acknowledge(); with ack-mode: manual the offset was never
        // committed, so the same messages were redelivered forever.
        ack.acknowledge();
    }
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐