1、组件和依赖

采用spring-kafka包

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.3.RELEASE</version>
        </dependency>

2、配置类

@Bean("aiKafkaListenerFactory")
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>(5);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //轮询时间配置
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 12000);
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new StringDeserializer()));
        return factory;
    }

 

3、消费类

@KafkaListener(topicPattern = "{topic前缀}.*", containerFactory = "aiKafkaListenerFactory")
    public void onMessage(ConsumerRecord<String, String> record) {
        log.info("kafka消费内容:topic=" + record.topic() + ";content=" + record.value());
        try {
            //处理业务逻辑
        } catch (Exception e) {
            e.printStackTrace();
            log.error("kafka消费失败:" + record.value());
        }
    }

 

注意:

1、topicPattern后面为正则表达式,凡是匹配该正则的都可以消费

2、原理为spring定时轮询topic列表,符合条件的重新订阅,轮询时间配置项为ConsumerConfig.METADATA_MAX_AGE_CONFIG

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐