Kafka实现动态订阅topic
2、原理为spring定时轮询topic列表,符合条件的重新订阅,轮询时间配置项为ConsumerConfig.METADATA_MAX_AGE_CONFIG。1、topicPattern后面为正则表达式,凡是匹配该正则的都可以消费。采用spring-kafka包。
·
1、组件和依赖
采用spring-kafka包
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.3.RELEASE</version>
</dependency>
2、配置类
@Bean("aiKafkaListenerFactory")
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
Map<String, Object> props = new HashMap<>(5);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
//轮询时间配置
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 12000);
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new StringDeserializer()));
return factory;
}
3、消费类
@KafkaListener(topicPattern = "{topic前缀}.*", containerFactory = "aiKafkaListenerFactory")
public void onMessage(ConsumerRecord<String, String> record) {
log.info("kafka消费内容:topic=" + record.topic() + ";content=" + record.value());
try {
//处理业务逻辑
} catch (Exception e) {
e.printStackTrace();
log.error("kafka消费失败:" + record.value());
}
}
注意:
1、topicPattern后面为正则表达式,凡是匹配该正则的都可以消费
2、原理为spring定时轮询topic列表,符合条件的重新订阅,轮询时间配置项为ConsumerConfig.METADATA_MAX_AGE_CONFIG
更多推荐
已为社区贡献1条内容
所有评论(0)