不同kafka集群下的多个消费者监听及多个生产者
在写公司需求的时候碰到一个需求,需要对不同集群下的多个kafka消费者topic进行消费,以前都是在yml文件中配置一个消费者。最多也就是同一集群不同topic,没想到这次是不同集群,一下子犯了难,查资料,请教同事后算是找到了解决方法,其实也很简单,配置多个kafka消费者配置即可。配置文件如下:(不同集群的不同消费者和不同生产者)
·
在写公司需求的时候碰到一个需求:需要消费不同集群下的多个kafka topic。以前都是在yml文件中配置一个消费者,最多也就是同一集群下的不同topic,没想到这次是不同集群,一下子犯了难。查资料、请教同事后算是找到了解决方法,其实也很简单:为每个集群分别配置一套kafka消费者配置即可。
配置文件如下:(不同集群的不同消费者和不同生产者)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
/**
* 配置不同集群下的多个kafka消费者和生产者
*/
@Configuration
@EnableConfigurationProperties({})
public class KafkaConfig {
@Value("${spring.kafka.auditText.bootstrap-servers}")
private String auditTextSend;
@Value("${spring.kafka.auditTextResult.bootstrap-servers}")
private String auditTextResult;
@Value("${spring.kafka.authorStatus.bootstrap-servers}")
private String authorStatus;
@Value("${spring.kafka.articleInfo.bootstrap-servers}")
private String articleInfo;
@Value("${spring.kafka.consumer.concurrency}")
private Integer concurrency;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPoll;
private Map<String, Object> producerConfigs(String servers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private DefaultKafkaConsumerFactory<Object, Object> createConsumerFactory(String servers) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoReset);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll);
return new DefaultKafkaConsumerFactory<>(configs);
}
private ConcurrentKafkaListenerContainerFactory<String, String> createListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.setConcurrency(concurrency);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private ProducerFactory<String, String> producerAuditTextSendFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(auditTextSend));
}
@Bean(name = "auditTextSendKafkaTemplate")
public KafkaTemplate<String, String> AuditTextSendKafkaTemplate() {
return new KafkaTemplate<>(producerAuditTextSendFactory());
}
private ConsumerFactory<Object, Object> auditResultConsumerFactory() {
return createConsumerFactory(auditTextResult);
}
private ConsumerFactory<Object, Object> articleInfoConsumerFactory() {
return createConsumerFactory(articleInfo);
}
private ConsumerFactory<Object, Object> authorStatusConsumerFactory() {
return createConsumerFactory(authorStatus);
}
@Bean(name = "auditResultConsumer")
public ConcurrentKafkaListenerContainerFactory<String, String> auditResultContainerFactory() {
return createListenerContainerFactory(auditResultConsumerFactory());
}
@Bean(name = "articleInfoConsumer")
public ConcurrentKafkaListenerContainerFactory<String, String> articleInfoContainerFactory() {
return createListenerContainerFactory(articleInfoConsumerFactory());
}
@Bean(name = "authorStatusConsumer")
public ConcurrentKafkaListenerContainerFactory<String, String> authorStatusContainerFactory() {
return createListenerContainerFactory(authorStatusConsumerFactory());
}
}
消费监听用的是@KafkaListener注解,通过注解的containerFactory参数指定上面配置的某个容器工厂bean(即对应的集群),再配置topics和groupId,例如下边这种。多个topic建议分别写在多个class中,每个class对应一个监听的topic,这样自己看着也清晰。
@KafkaListener(containerFactory = "articleInfoConsumer", topics = {"${spring.kafka.articleInfoTopic}"}, groupId = "${spring.kafka.consumer.group-id}")
生产者更简单,直接实例化不同的Bean即可。向多个集群的topic发送消息的配置类如下,里边的每一个bean都对应一个单独的集群,注入对应的bean后可以直接调用send方法,参数依次是topic和json消息。
/**
 * Producer-only Kafka configuration: one {@link KafkaTemplate} bean per target cluster.
 *
 * <p>Each template is backed by its own producer factory, built from the bootstrap
 * servers of a single cluster and using string serializers for key and value.
 */
@Configuration
public class KafkaConfig {

    /** Bootstrap servers backing the {@code rewardOrderKafkaTemplate} bean. */
    @Value("${spring.kafka.rewardOrderProducer.bootstrap-servers}")
    private String rewardOrderServers;

    /** Bootstrap servers backing the {@code asyncDataProducerKafkaTemplate} bean. */
    @Value("${spring.kafka.asyncDataProducer.bootstrap-servers}")
    private String asyncDataServers;

    /** Bootstrap servers backing the {@code BeiYanKafkaTemplate} bean. */
    @Value("${spring.kafka.asyncDataProducerForBeiyan.bootstrap-servers}")
    private String beiYanServers;

    /**
     * Creates a string-keyed, string-valued template for a single cluster.
     *
     * @param bootstrapServers bootstrap servers of the cluster to publish to
     * @return a template wrapping a freshly created producer factory
     */
    private KafkaTemplate<String, String> templateFor(String bootstrapServers) {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(settings);
        return new KafkaTemplate<>(producerFactory);
    }

    /** Template publishing to the cluster set by {@code spring.kafka.asyncDataProducer}. */
    @Bean(name = "asyncDataProducerKafkaTemplate")
    public KafkaTemplate<String, String> asyncDataProducerKafkaTemplate() {
        return templateFor(asyncDataServers);
    }

    /** Template publishing to the cluster set by {@code spring.kafka.asyncDataProducerForBeiyan}. */
    @Bean(name = "BeiYanKafkaTemplate")
    public KafkaTemplate<String, String> mpKafkaTemplate() {
        return templateFor(beiYanServers);
    }

    /** Template publishing to the cluster set by {@code spring.kafka.rewardOrderProducer}. */
    @Bean(name = "rewardOrderKafkaTemplate")
    public KafkaTemplate<String, String> rewardOrderKafkaTemplate() {
        return templateFor(rewardOrderServers);
    }
}
更多推荐
所有评论(0)