@KafkaListener 注解

在一般的消息消费场景中,开发者们常常使用 @KafkaListener 注解标注接收消息的处理方法。通过查看其注解的源码定义,可以得知它可用的功能点包括:

  • 设置监听的多个 topic 和 消费分区
  • 设置 topic 和消费分区的偏移量
  • 设置消费线程并发数
  • 设置消费异常处理器
  • 自动启动、id等属性配置

这些功能在使用该注解时,可以配置上去:

@KafkaListener(id = "demo_group", concurrentcy = "3", errorHandler = "kafkaErrorHandler", topicPartitions = {
    @TopicPartition(topic = "topic1", partitions = {"0","1"}),
    @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "10"))
    public String consume(String record) {
        logger.info("Consume message: " + record);
        return "Done!"
    }
})

上面代码中指定的消费异常处理器 kafkaErrorHandle 是一个实现了 KafkaListenerErrorHandler 接口的实例,指定的处理器名称是对应的实例在上下文中的名称。

指定监听器的生命周期

@KafkaListener 注解的参数 autoStartup 在没有指定的情况下默认为 true,即自动启动消费功能。在程序运行过程中,如果我们想动态地控制监听器的生命周期,可以通过调用 KafkaListenerEndpointRegistry 提供的方法来控制其启动(start)、停止(pause)、恢复(resume)。将上述方法封装成接口对外提供,一个简单的实现如下:

@RestController
public class KafkaController {
    @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    @GetMapping("/kafka/start")
    public void start(String listenerId) {
        registry.getListenerContainer(listenerId).start();
    }
        
    @GetMapping("/kafka/pause")
    public void pause(String listenerId) {
        registry.getListenerContainer(listenerId).pause();
    }    
    
    @GetMapping("/kafka/resume")
    public void resume(String listenerId) {
        registry.getListenerContainer(listenerId).resume();
    }
}

在监听器的 autoStartup 字段为 false,或者相关的 listenerId 处于停止(pause)状态时,向该 id 发送消息,发送动作能执行成功,但是消息并不会被监听器接收到。当启动(start)监听器,或者恢复(resume)监听后,就可以看到之前发送的消息被接收了。

通过提供上述的接口,运维人员可以在服务发布上线之后,根据业务需求动态地管理 kafka 消息监听的生命周期,而不需要经过一系列的代码修改、发布等流程对监听器的状态进行修改,大大提升了业务的效率。

手动 Ack

Kafka 的消费者模式,通过都会默认开启自动提交偏移量,但是在一些服务的消费业务中,可能由于业务本身的复杂性或者网络的不稳定,导致在设定的自动提交周期内监听器并没有及时提交偏移量,此时 kafka 服务没能接收到心跳而挂掉,就会触发 rebalance 重新分配分组给客户端,进而在消费完成后,提交偏移量失败。由于上一次提交偏移量失败,所以等下一次触发消费时,又会从同一个位置开始消费,然而依旧会出现消费超时的情况。

要启用手动 Ack,相应的需要关闭自动提交功能,可以通过以下配置进行设置:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

完成配置后,只要在 @KafkaListener 监听方法加上 Acknowledgment 入参,便可以调用其方法手动提交偏移量了。

@KafkaListener(id = "group-id", topics = "demo-topic")
public String consume(String msg, Acknowledgment ack) {
    logger.info("Consume message: {}", msg);
    ack.acknowledge();
    return "Done!";
}

SendTo 消息转发

在使用 ReplyingKafkaTemplate 实现获取发送消息响应结果时,消费方法通过注解 @SentTo 可以将方法返回结果作为相应消费回复给生产者。在实际应用中,注解 @SentTo 还可以携带一个参数,用以指定消息转发接收的 Topic 队列。这里的业务场景就类似于一条生产线,或者是设计模式中的产业链模式,一个消息需要经过多次消费,多重加工,因此可以通过配置不同的 Topic 队列以及部署不同的消费方法来实现。

@KafkaListener(id = "group-id", topics = "demo-topic")
@SendTo("demo-topic2")
public String consume(String msg) {
    logger.info("Forward message: {}", msg);
    return "Forward message: " + msg;
}

@KafkaListener(id = "group-id2", topics = "demo-topic2")
public String consume2(String msg) {
    logger.info("Consume message: {}", msg);
    return "Done!";
}

消息重试处理

Kafka 内部封装了一套可重复消费信息的机制,当消费者在消费数据出现异常时,自动触发重新消费数据的动作。并且可以配置失败重试次数,超过这个次数后,将数据转移至预设好的队列(这里称之为 死信队列 ),统一进行日志等记录。

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
		ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
		ConsumerFactory<Object, Object> kafkaConsumerFactory,
		KafkaTemplate<Object, Object> template) {
	ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
	configurer.configure(factory, kafkaConsumerFactory);
	//最大重试三次
	factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
	return factory;
}

@KafkaListener(id = "group-id", topics = "demo-topic")
public String consume(String msg) {
    logger.info("Forward message: {}", msg);
    throw new RuntimeException("dlt msg!");
}

@KafkaListener(id = "group-dlt", topics = "demo-topic.DLT")
public void dltConsume(String msg) {
    logger.info("Failed message from DLT: {}", msg);
}

在上面的示例代码中,demo-topic 监听到消息后,会抛出超时异常,然后监听器会依次进行 3 次重试,超过 3 次后,该消息就会被转移至死信队列中去。由上面的例子可以看出,死信队列的 topic,就是在原来队列 topic 的后面加上”.DLT“。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐