基于spring多Kafka实例配置
1、多实例配置原理和单实例配置的区别项目中如果使用了不同实例的kafka就需要多配置,单个实例的话使用springboot的yaml配置自动装配即可。1.1 单实例spring:kafka:bootstrap-servers: serverconsumer:group-id: 消费者你的groupIdenable-auto-commit: falseauto-offset-reset: earli
·
1、多实例配置原理和单实例配置的区别
项目中如果使用了不同实例的kafka就需要多配置,单个实例的话使用springboot的yaml配置自动装配即可。
1.1 单实例
spring:
kafka:
bootstrap-servers: server
consumer:
group-id: 消费者你的groupId
enable-auto-commit: false
auto-offset-reset: earliest
#用于链接带密码的kafka 配置,如果kafka没有密码需要注释掉
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户" password="密码";
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#根据需要配置生产者还是消费者
#producer:
listener:
ack-mode: manual
concurrency: 1
1.2 多kafka实例
- yaml配置
spring:
kafka:
bootstrap-servers: server
consumer:
group-id: 消费者你的groupId
enable-auto-commit: false
auto-offset-reset: earliest
#用于链接带密码的kafka 配置,如果kafka没有密码需要注释掉
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="用户" password="密码";
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#根据需要配置生产者还是消费者
#producer:
listener:
ack-mode: manual
concurrency: 1
kafka1:
bootstrap-servers: server1
consumer:
group-id: 消费者你的groupId
enable-auto-commit: false
auto-offset-reset: earliest
#用于链接带密码的kafka 配置,如果kafka没有密码需要注释掉
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="server1的用户" password="密码";
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#根据需要配置生产者还是消费者
#producer:
listener:
ack-mode: manual
concurrency: 1
此处第一个kafka会被spring自动装配,所以只需要对kafka2进行配置即可。
- Java配置
注意最好手动指定数据源factory的bean名称,在消费者端会用到。
package com.cmbchina.archman.mprogramcore.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka2.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka2.consumer.group-id}")
private String groupId;
@Value("${spring.kafka2.consumer.properties.security.protocol}")
private String kafkaSecurityProtocol;
@Value("${spring.kafka2.consumer.properties.sasl.mechanism}")
private String kafkaSASLMechanism;
@Value("${spring.kafka2.consumer.properties.sasl.jaas.config}")
private String kafkaConsumerSASLJaasConfig;
//此处最好手动指定数据源factory的bean名称,在消费者端会用到
@Bean("myKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<?> batchFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setBatchListener(true); // 开启批量监听
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> config = new HashMap<>();
//kafka地址
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//组id
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if (!StringUtils.isEmpty(kafkaSecurityProtocol) && !StringUtils.isEmpty(kafkaSASLMechanism)
&& !StringUtils.isEmpty(kafkaConsumerSASLJaasConfig)) {
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
config.put(SaslConfigs.SASL_MECHANISM, kafkaSASLMechanism);
config.put("sasl.jaas.config", kafkaConsumerSASLJaasConfig);
}
return config;
}
}
- 消费者监听
使用@KafkaListener注解即可完成消费者监听,此处声明刚才定义的factory名称即可。
@KafkaListener(topics = {CREATE_MPRO_TOPIC}, containerFactory = "myKafkaListenerContainerFactory")
public void createMp(ConsumerRecord msg, Acknowledgment ack){
try{
Optional<?> value = Optional.ofNullable(msg.value());
log.info("接收消息:"+ msg);
if (value.isPresent()){
//业务代码
}
}catch (Exception e){
log.error("消息异常:" + e);
}
finally{
//签收消息
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)