【Kafka】@Spring-kafka 消费功能使用总结
Kafka 的消费者模式,通过都会默认开启自动提交偏移量,但是在一些服务的消费业务中,可能由于业务本身的复杂性或者网络的不稳定,导致在设定的自动提交周期内监听器并没有及时提交偏移量,此时 kafka 服务没能接收到心跳而挂掉,就会触发 rebalance 重新分配分组给客户端,进而在消费完成后,提交偏移量失败。通过提供上述的接口,运维人员可以在服务发布上线之后,根据业务需求动态地管理 kafka
@KafkaListener 注解
在一般的消息消费场景中,开发者们常常使用 @KafkaListener
注解标注接收消息的处理方法。通过查看其注解的源码定义,可以得知它可用的功能点包括:
- 设置监听的多个 topic 和 消费分区
- 设置 topic 和消费分区的偏移量
- 设置消费线程并发数
- 设置消费异常处理器
- 自动启动、id等属性配置
这些功能在使用该注解时,可以配置上去:
@KafkaListener(id = "demo_group", concurrentcy = "3", errorHandler = "kafkaErrorHandler", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0","1"}),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "10"))
public String consume(String record) {
logger.info("Consume message: " + record);
return "Done!"
}
})
上面代码中指定的消费异常处理器 kafkaErrorHandle
是一个实现了 KafkaListenerErrorHandler
接口的实例,指定的处理器名称是对应的实例在上下文中的名称。
指定监听器的生命周期
@KafkaListener
注解的参数 autoStartup 在没有指定的情况下默认为 true,即自动启动消费功能。在程序运行过程中,如果我们想动态地控制监听器的生命周期,可以通过调用 KafkaListenerEndpointRegistry
提供的方法来控制其启动(start)、停止(pause)、恢复(resume)。将上述方法封装成接口对外提供,一个简单的实现如下:
@RestController
public class KafkaController {
@Autowired
private KafkaListenerEndpointRegistry registry;
@GetMapping("/kafka/start")
public void start(String listenerId) {
registry.getListenerContainer(listenerId).start();
}
@GetMapping("/kafka/pause")
public void pause(String listenerId) {
registry.getListenerContainer(listenerId).pause();
}
@GetMapping("/kafka/resume")
public void resume(String listenerId) {
registry.getListenerContainer(listenerId).resume();
}
}
在监听器的 autoStartup 字段为 false,或者相关的 listenerId 处于停止(pause)状态时,向该 id 发送消息,发送动作能执行成功,但是消息并不会被监听器接收到。当启动(start)监听器,或者恢复(resume)监听后,就可以看到之前发送的消息被接收了。
通过提供上述的接口,运维人员可以在服务发布上线之后,根据业务需求动态地管理 kafka 消息监听的生命周期,而不需要经过一系列的代码修改、发布等流程对监听器的状态进行修改,大大提升了业务的效率。
手动 Ack
Kafka 的消费者模式,通过都会默认开启自动提交偏移量,但是在一些服务的消费业务中,可能由于业务本身的复杂性或者网络的不稳定,导致在设定的自动提交周期内监听器并没有及时提交偏移量,此时 kafka 服务没能接收到心跳而挂掉,就会触发 rebalance 重新分配分组给客户端,进而在消费完成后,提交偏移量失败。由于上一次提交偏移量失败,所以等下一次触发消费时,又会从同一个位置开始消费,然而依旧会出现消费超时的情况。
要启用手动 Ack,相应的需要关闭自动提交功能,可以通过以下配置进行设置:
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
完成配置后,只要在 @KafkaListener
监听方法加上 Acknowledgment 入参,便可以调用其方法手动提交偏移量了。
@KafkaListener(id = "group-id", topics = "demo-topic")
public String consume(String msg, Acknowledgment ack) {
logger.info("Consume message: {}", msg);
ack.acknowledge();
return "Done!";
}
SendTo 消息转发
在使用 ReplyingKafkaTemplate
实现获取发送消息响应结果时,消费方法通过注解 @SentTo
可以将方法返回结果作为相应消费回复给生产者。在实际应用中,注解 @SentTo
还可以携带一个参数,用以指定消息转发接收的 Topic 队列。这里的业务场景就类似于一条生产线,或者是设计模式中的产业链模式,一个消息需要经过多次消费,多重加工,因此可以通过配置不同的 Topic 队列以及部署不同的消费方法来实现。
@KafkaListener(id = "group-id", topics = "demo-topic")
@SendTo("demo-topic2")
public String consume(String msg) {
logger.info("Forward message: {}", msg);
return "Forward message: " + msg;
}
@KafkaListener(id = "group-id2", topics = "demo-topic2")
public String consume2(String msg) {
logger.info("Consume message: {}", msg);
return "Done!";
}
消息重试处理
Kafka 内部封装了一套可重复消费信息的机制,当消费者在消费数据出现异常时,自动触发重新消费数据的动作。并且可以配置失败重试次数,超过这个次数后,将数据转移至预设好的队列(这里称之为 死信队列 ),统一进行日志等记录。
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//最大重试三次
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
return factory;
}
@KafkaListener(id = "group-id", topics = "demo-topic")
public String consume(String msg) {
logger.info("Forward message: {}", msg);
throw new RuntimeException("dlt msg!");
}
@KafkaListener(id = "group-dlt", topics = "demo-topic.DLT")
public void dltConsume(String msg) {
logger.info("Failed message from DLT: {}", msg);
}
在上面的示例代码中,demo-topic 监听到消息后,会抛出超时异常,然后监听器会依次进行 3 次重试,超过 3 次后,该消息就会被转移至死信队列中去。由上面的例子可以看出,死信队列的 topic,就是在原来队列 topic 的后面加上”.DLT“。
更多推荐
所有评论(0)