基于kafka的延时队列实现
基于kafka的延时队列实现这里写目录标题基于kafka的延时队列实现延时队列什么是延时队列使用场景相关消息中间件的延时队列实现rabbitmq基于kafka的简单延时队列消费实现为什么要基于kafka来做延时队列demo效果延时队列延时队列顾名思义就是一个可以延时(消费)的队列。生产者和消费者约定消息延迟的时间,投递到延时队列的消息则会在约定的时间被消费。什么是延时队列使用场景相关消息中间件的延
基于kafka的延时队列实现
延时队列简介
延时队列顾名思义就是一个可以延时(消费)的队列。生产者和消费者约定消息延迟的时间,投递到延时队列的消息则会在约定的时间被消费。那么为什么要这样使用消息队列呢?我们来看一下如下的几个业务场景:
- 用户创建订单、锁定库存,30分钟内未支付则取消订单,释放库存。
- 用户提交作业后一天之内未批改,通知老师进行批改。
- 直播结束30分钟后开始直播效果统计计算。
- ········
以上几个业务场景如果使用定时轮询的方式执行,那肯定会存在较大误差并且增加了大量无效数据扫描,效率低。为了高效精确制导就需要我们的延时队列了。
相关延时队列的实现
这里简单介绍下几个常用实现。
Java DelayQueue
Java util包提供的延迟队列,是一个优先级阻塞队列。业务线程通过take方法获取可用消息,如果队列无消息则无限期阻塞当前线程,直到其他线程往队列中添加元素时,唤醒当前阻塞线程。take时当队列中有消息且已到过期时间时则直接返回对应消息,如果消息未到过期时间则需要判断是否有leader线程已经阻塞,如果存在则自己无限期休眠。没有leader则阻塞自己到队首消息的过期时间。
该实现将消息放入内存,触发基本无延迟,但是需要进行持久化与分布式环境应用的相关拓展。
rabbitmq
rabbitmq的延迟队列实现是通过消息过期时间是死信队列路由实现,rabbitmq的消息以及队列都可以设置,如果同时设置则是会取小的那个。死信路由则可以对应多个队列,当一个消息满足以下三个条件时就可以被死信路由投递:
- 消息过期。
- 消息被拒收且不重新入队。
- 消息队列被塞满,前面的消息会被死信路由处理。
所以用rabbitmq来实现死信队列需要使用两个队列。
redis 过期回调
redis 内数据可以设置过期时间,业务上则可以接受消息过期后的回调。但是由于redis的过期策略是惰性的,所以消息的通知也会不及时。只有在真正删除时才会通知到客户端。由于业务上也有使用过redis消息,经常出现个别客户端收不到消息的情况,还没深究到底是使用方式问题还是redis pub、sub稳定性问题。
时间轮
时间轮是一种环形的数据格式,相当于时钟一样,只是时间间隔可能不是一秒一个格。每个格上放着延时任务的队列。当时间走到特定的格子后遍历链表上的任务是否到可执行时间点。这里就不再详述了,相关实现可以自行百度。
基于kafka的简单延时队列消费实现
为什么要基于kafka来做延时队列
首先我们的业务场景是一个非常密集的用户打点行为,例如每个用户五秒一个请求,统计后转消息队列,两分钟后消费。基于这个场景我们选用kafka作为延时队列有过几点考虑。首先当前技术栈中使用的消息中间件有rabbitmq和kafka,并且有成熟的运维。限于业务进度原因、并且不是很有必要引入新的技术栈来增加运维难度。所以自然而然地就二选一了。而当前这个场景每个用户两分钟会堆积24个消息,2w用户同时学就是堆积48w消息,如果有集中的学习,例如统一考试、竞赛等。对于rabbitmq是个不小的挑战。因此选择了更能堆的kafka。
实现
使用实例
使用kafka延迟消费功能非常便捷,开箱即用。只需要引入对应jar包然后在启动类上加上@EnableKafkaDelay注解。最后在具体的消费逻辑方法上加上注解@KafkaDelayListener即可体验延迟消费的快乐。就像@kafkaListener一样方便高效。对于延迟消息的消费方法,提供了两个参数,分别是ConsumerRecord与KafkaDelayCommitter,KafkaDelayCommitter用于业务需要手动提交的场景,注意需要配合consumer的autocommit=false来使用。
配置示例
kafka:
delay:
bootstrap-servers: 10.177.47.5:9092
consumer:
groupId: testDelay
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
使用示例
//pom配置
<dependency>
<groupId>com.netease.edu.tiku</groupId>
<artifactId>tiku-common-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
//启动类配置
@SpringBootApplication
@EnableKafkaDelay
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}
//示例方法1
@KafkaDelayListener(topic = "test-delay",delayTimeSec = 2 )
public void onMessage(ConsumerRecord<String,String> record, KafkaDelayCommitter committer){
System.out.println(record.value());
committer.commitSync();
}
//示例方法2
@KafkaDelayListener(topic = "test")
public void onMessage1(ConsumerRecord<String,String> record){
System.out.println(record.value());
}
具体实现
文件结构如下,一共是是十一个文件,具体的实现参考了spring autoconfig的大致逻辑以及spring data kafka的相关实现。但是复杂度与完善度肯定还是不如的。相信通过下文你也能对spring的自动配置逻辑有一定的理解。
EnableKafkaDelay
@EnableKafkaDelay注解用于加载kafkaDelay相关组件。通过import来加载KafkaDelayConsumerBootstrap到spring 容器中,注意这里的import是只能加在类上的,所以整个注解限制的Target是ElementType.TYPE。
import org.springframework.context.annotation.Import;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author dewey
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(KafkaDelayConsumerBootstrap.class)
public @interface EnableKafkaDelay {
}
KafkaDelayListener
KafkaDelayListener注解用于启动消费者线程来延迟消费kafka消息,方法内只需要关心消息的处理逻辑即可,如果关闭了自动提交则还需要在方法中控制消费位移的提交。该注解提供了三个可配置参数,topic,delayTimeSec和pauseTimeSec。topic表示消费者需要订阅队列。delayTimeSec则是从消息产生到消费之间需要等待的时间,即延时消息的延迟时间。这里可以看到延迟时间是配置到消费者端的,所以该延迟消息投递后消费者是一直可见的,这与rabbitmq的实现有着很大的不同。pauseTimeSec则是kafka消费者发现当前无延迟消息到期时主动paus的时间。(tips:kafkaConsumer pause后只是不拉取消息,心跳线程不会受到影响,不会引发分区重分区)
import org.springframework.context.annotation.Import;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author dewey
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaDelayListener {
String topic();
/**
* 延迟消费的时间,单位秒
* @return
*/
int delayTimeSec() default 120;
/**
* 消费者pause时间,单位秒
* @return
*/
int pauseTimeSec() default 2;
}
KafkaDelayConsumerBootstrap
负责启动kafka延迟消费的managerbean以及KafkaDelay的健康检查bean。这里需要注意的是两个bean的启动顺序。这里KafkaDelayHealthIndicator依赖了kafkaDelayConsumerManager来获取kafkadelayconsumer的整体状态,所以需要在kafkaDelayConsumerManager之后启动。
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
/**
* @author dewey
*/
@Configuration
@Slf4j
@EnableConfigurationProperties(KafkaDelayConfig.class)
public class KafkaDelayConsumerBootstrap {
@Bean("kafkaDelayConsumerManager")
public KafkaDelayConsumerManager kafkaDelayConsumerManager(){
log.info("=====init kafkaDelayConsumerManager");
return new KafkaDelayConsumerManager();
}
@Bean("kafkaDelayHealthIndicator")
@DependsOn("kafkaDelayConsumerManager")
public KafkaDelayHealthIndicator kafkaDelayHealthIndicator(){
log.info("=====init KafkaDelayHealthIndicator");
return new KafkaDelayHealthIndicator();
}
}
KafkaDelayConfig
kafka延迟消费者相关配置,注意仅用于消费者,毕竟我们是基于消费者来实现。KafkaDelayConfig引入了spring kafka的配置类,这样如果项目中引用了spring-data-kafka则会默认使用spring-kafka配置。避免了重复配置,只需关注延迟消费的topic以及延迟时间等相关配置即可。但是如果同时配置了kafka-delay相关的配置则在delayConsumer中会覆盖spring-data-kafka的配置。
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.unit.DataSize;
import java.time.Duration;
import java.util.*;
/**
* 自定义kafka配置
* 仅用作consumer
* @author dewey
*/
@ConfigurationProperties(prefix = "kafka.delay")
@Configuration
public class KafkaDelayConfig {
@Autowired(required = false)
private KafkaProperties kafkaProperties;
private Consumer consumer = new Consumer();
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
private String clientId;
/**
* 获取消费端配置
*/
public Map<String,Object> getConsumerProperties(){
Map<String,Object> properties = this.getSpringKafkaConfig();
properties.putAll(this.buildCommonProperties());
properties.putAll(this.consumer.buildProperties());
return properties;
}
/**
* 集成spring-kafka工程的先获取默认配置
* @return
*/
public Map<String, Object> getSpringKafkaConfig(){
Map<String,Object> properties = new HashMap<>();
if(kafkaProperties != null){
return kafkaProperties.buildConsumerProperties();
} else{
return properties;
}
}
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.bootstrapServers != null) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
}
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
return properties;
}
public static class Consumer {
/**
* Frequency with which the consumer offsets are auto-committed to Kafka if
* 'enable.auto.commit' is set to true.
*/
private Duration autoCommitInterval;
/**
* What to do when there is no initial offset in Kafka or if the current offset no
* longer exists on the server.
*/
private String autoOffsetReset;
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connections to the Kafka cluster. Overrides the global property, for consumers.
*/
private List<String> bootstrapServers;
/**
* ID to pass to the server when making requests. Used for server-side logging.
*/
private String clientId;
/**
* Whether the consumer's offset is periodically committed in the background.
*/
private Boolean enableAutoCommit;
/**
* Maximum amount of time the server blocks before answering the fetch request if
* there isn't sufficient data to immediately satisfy the requirement given by
* "fetch-min-size".
*/
private Duration fetchMaxWait;
/**
* Minimum amount of data the server should return for a fetch request.
*/
private DataSize fetchMinSize;
/**
* Unique string that identifies the consumer group to which this consumer
* belongs.
*/
private String groupId;
/**
* Expected time between heartbeats to the consumer coordinator.
*/
private Duration heartbeatInterval;
/**
* Isolation level for reading messages that have been written transactionally.
*/
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
/**
* Deserializer class for keys.
*/
private Class<?> keyDeserializer = StringDeserializer.class;
/**
* Deserializer class for values.
*/
private Class<?> valueDeserializer = StringDeserializer.class;
/**
* Maximum number of records returned in a single call to poll().
*/
private Integer maxPollRecords;
/**
* Additional consumer-specific properties used to configure the client.
*/
private final Map<String, String> properties = new HashMap<>();
public Duration getAutoCommitInterval() {
return this.autoCommitInterval;
}
public void setAutoCommitInterval(Duration autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}
public String getAutoOffsetReset() {
return this.autoOffsetReset;
}
public void setAutoOffsetReset(String autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}
public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public Boolean getEnableAutoCommit() {
return this.enableAutoCommit;
}
public void setEnableAutoCommit(Boolean enableAutoCommit) {
this.enableAutoCommit = enableAutoCommit;
}
public Duration getFetchMaxWait() {
return this.fetchMaxWait;
}
public void setFetchMaxWait(Duration fetchMaxWait) {
this.fetchMaxWait = fetchMaxWait;
}
public DataSize getFetchMinSize() {
return this.fetchMinSize;
}
public void setFetchMinSize(DataSize fetchMinSize) {
this.fetchMinSize = fetchMinSize;
}
public String getGroupId() {
return this.groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public Duration getHeartbeatInterval() {
return this.heartbeatInterval;
}
public void setHeartbeatInterval(Duration heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
public IsolationLevel getIsolationLevel() {
return this.isolationLevel;
}
public void setIsolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
}
public Class<?> getKeyDeserializer() {
return this.keyDeserializer;
}
public void setKeyDeserializer(Class<?> keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
public Class<?> getValueDeserializer() {
return this.valueDeserializer;
}
public void setValueDeserializer(Class<?> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
public Integer getMaxPollRecords() {
return this.maxPollRecords;
}
public void setMaxPollRecords(Integer maxPollRecords) {
this.maxPollRecords = maxPollRecords;
}
public Map<String, String> getProperties() {
return this.properties;
}
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getAutoCommitInterval).asInt(Duration::toMillis)
.to(v->properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,v));
map.from(this::getAutoOffsetReset).to(v->properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,v));
map.from(this::getBootstrapServers).to(v->properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,v));
map.from(this::getClientId).to(v->properties.put(ConsumerConfig.CLIENT_ID_CONFIG,v));
map.from(this::getEnableAutoCommit).to(v->properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,v));
map.from(this::getFetchMaxWait).asInt(Duration::toMillis)
.to(v->properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,v));
map.from(this::getFetchMinSize).asInt(DataSize::toBytes)
.to(v->properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,v));
map.from(this::getGroupId).to(v->properties.put(ConsumerConfig.GROUP_ID_CONFIG,v));
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
.to(v->properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,v));
map.from(() -> getIsolationLevel().name().toLowerCase(Locale.ROOT))
.to(v->properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,v));
map.from(this::getKeyDeserializer).to(v->properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,v));
map.from(this::getValueDeserializer).to(v->properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,v));
map.from(this::getMaxPollRecords).to(v->properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,v));
return properties;
}
}
public List<String> getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(List<String> bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public KafkaProperties getKafkaProperties() {
return kafkaProperties;
}
public void setKafkaProperties(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
}
KafkaDelayListenerType
延迟消费者类型,用于区分是否需要手动提交消费位移。
/**
* @author dewey
*/
public enum KafkaDelayListenerType {
/**
* 只接受消息模式,autocommit必须置为true
*/
SIMPLE,
/**
* 业务控制commit,autocommit必须为false
*/
ACKNOWLEDGING
}
KafkaDelayCommitter
kafka延迟消费者消息提交逻辑的封装,在延迟消费消息的方法中可以接受该参数来进行位移的手动提交。该接口的方法实现是直接调用的kafkaconsumer对应的位移提交方法。这样封装主要是为了避免直接暴露consumer来给使用方。
/**
* @author dewey
*/
public interface KafkaDelayCommitter {
/**
* 封装kafka consumer的commitSync
*/
void commitSync();
/**
* 封装kafka consumer的commitAsync
*/
void commitAsync();
}
KafkaDelayConsumerContainer
kafka延迟消费着相关组件封装,包含使用注解的方法、方法对应的bean、注解详情、消费者、消费者提交等必要组件。同时还提供了一个KafkaDelayCommitter接口的默认实现。该类的构造方法中会根据带有@KafkaDelayListener的方法的参数来动态生成KafkaDelayListenerType,d参数类型则只能使用org.apache.kafka.clients.consumer.ConsumerRecord与com.netease.com.tiku.kafka.delay.KafkaDelayCommitter。如果是ACKNOWLEDGING类型则会初始化consumerCommitter。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.lang.reflect.Method;
/**
* @author dewey
*/
public class KafkaDelayConsumerContainer {
/**
* 消费逻辑方法所属bean
*/
private Object bean;
/**
* 业务消费方法
*/
private Method method;
/**
* 注解详情
*/
private KafkaDelayListener kafkaDelayListener;
/**
* 监听方式,手动commit与自动commit
*/
private KafkaDelayListenerType listenerType;
/**
* 消费者
*/
private KafkaConsumer<String,String> consumer;
/**
* 提交位移逻辑
*/
private ConsumerCommitter consumerCommitter;
private final String paramTypeRecord = "org.apache.kafka.clients.consumer.ConsumerRecord";
private final String paramTypeCommitter = "com.netease.com.tiku.kafka.delay.KafkaDelayCommitter";
public KafkaDelayConsumerContainer(Object bean, Method method, KafkaDelayListener kafkaDelayListener) {
this.bean = bean;
this.method = method;
this.kafkaDelayListener = kafkaDelayListener;
Class<?>[] parameterTypes = method.getParameterTypes();
if(parameterTypes.length <= 0 || parameterTypes.length >= 3){
throw new RuntimeException(String.format("KafkaDelayConsumerContainer:Method[%s] parameter number error", method.getName()));
}
if(!paramTypeRecord.equals(parameterTypes[0].getName())){
throw new RuntimeException(String.format("KafkaDelayConsumerContainer:Method[%s] parameter type error,need %s but find %s",
method.getName(),paramTypeRecord,parameterTypes[0].getName()));
}
if(parameterTypes.length > 1 && !paramTypeCommitter.equals(parameterTypes[1].getName())){
throw new RuntimeException(String.format("KafkaDelayConsumerContainer:Method[%s] parameter type error,need %s but find %s",
method.getName(),paramTypeCommitter,parameterTypes[1].getName()));
}
if(parameterTypes.length >= 2){
this.consumerCommitter = new ConsumerCommitter();
listenerType = KafkaDelayListenerType.ACKNOWLEDGING;
}else{
listenerType = KafkaDelayListenerType.SIMPLE;
}
}
/**
* 封装提交逻辑
*/
private final class ConsumerCommitter implements KafkaDelayCommitter {
@Override
public void commitSync() {
processCommitSync();
}
@Override
public void commitAsync() {
processCommitAsync();
}
}
private void processCommitSync(){
consumer.commitSync();
}
private void processCommitAsync(){
consumer.commitAsync();
}
public KafkaDelayListener getKafkaDelayListener() {
return kafkaDelayListener;
}
public void setKafkaDelayListener(KafkaDelayListener kafkaDelayListener) {
this.kafkaDelayListener = kafkaDelayListener;
}
public Object getBean() {
return bean;
}
public void setBean(Object bean) {
this.bean = bean;
}
public Method getMethod() {
return method;
}
public void setMethod(Method method) {
this.method = method;
}
public KafkaDelayListenerType getListenerType() {
return listenerType;
}
public void setListenerType(KafkaDelayListenerType listenerType) {
this.listenerType = listenerType;
}
public KafkaConsumer<String, String> getConsumer() {
return consumer;
}
public void setConsumer(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
public ConsumerCommitter getConsumerCommitter() {
return consumerCommitter;
}
public void setConsumerCommitter(ConsumerCommitter consumerCommitter) {
this.consumerCommitter = consumerCommitter;
}
}
KafkaDelayConsumerThread
kafka延迟消息的消费者线程,kafka延迟消费的逻辑实现。初始化时会根据消费者类型来动态的初始化consumerCommiter。线程启动时订阅指定topic,然后开始消费消息,当kafka队列中无可消费的延迟消息时,消费者进入pause状态,不再拉取指定队列的消息。等到pauseTime之后再回复消费。消息的延迟时间以及pause时间则是从注解参数中获取。由于pause时间没有根据对首消息的到期时间来动态设定,所以消息的实际消费时间与预设时间会存在一个pauseTime时间内的误差。由于对于当前业务不敏感,也就无所谓了。
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;
/**
* @author dewey
*/
@Slf4j
public class KafkaDelayConsumerThread extends Thread {
private KafkaConsumer<String, String> consumer;
private Method method;
private Object bean;
private String topic;
private KafkaDelayCommitter committer;
private long pauseTime;
private long delayTime;
private boolean autoCommit;
/**
* 用户Damon判断消费者是否正常
* todo 后面优化kafkaconsumer的状态判断
*/
private String consumerStatus;
private static String CONSUMER_STATUS_RUNNING = "normal";
private static String CONSUMER_STATUS_PAUSE = "pause";
public KafkaDelayConsumerThread(KafkaDelayConsumerContainer consumerContainer) {
KafkaDelayListener kafkaDelayListener = consumerContainer.getKafkaDelayListener();
this.consumer = consumerContainer.getConsumer();
this.method = consumerContainer.getMethod();
this.bean = consumerContainer.getBean();
this.topic = kafkaDelayListener.topic();
if (consumerContainer.getListenerType().equals(KafkaDelayListenerType.ACKNOWLEDGING)) {
this.autoCommit = false;
this.committer = consumerContainer.getConsumerCommitter();
} else {
autoCommit = true;
}
//转换为毫秒
this.pauseTime = kafkaDelayListener.pauseTimeSec() * 1000;
this.delayTime = kafkaDelayListener.delayTimeSec() * 1000;
}
@Override
public void run() {
//订阅
consumer.subscribe(Collections.singleton(topic));
boolean paused = false;
while (true) {
if (paused) {
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (log.isDebugEnabled()) {
log.debug("KafkaDelayConsumerManager:topic{},消费重启", topic);
}
this.consumerStatus = "pause";
consumer.resume(consumer.paused());
paused = false;
} else {
this.consumerStatus = "run";
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
for (ConsumerRecord<String, String> consumerRecord : records) {
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
if (consumerRecord.timestamp() > System.currentTimeMillis() - delayTime) {
if (log.isDebugEnabled()) {
log.debug("KafkaDelayConsumerManager:topic{},消费暂停", topic);
}
paused = true;
consumer.pause(Collections.singleton(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
break;
} else {
try {
if (autoCommit) {
method.invoke(bean, consumerRecord);
} else {
method.invoke(bean, consumerRecord, committer);
}
} catch (Exception e) {
log.error("KafkaDelayConsumerManager:consume fail,topic:{},message{}", topic, consumerRecord.value());
}
}
}
}
}
}
public String getConsumerStatus() {
return consumerStatus;
}
}
KafkaDelayConsumerDamonThread
kafka延迟消费线程的守护线程,用于监控消费线程状态,在消费者状态不对时负责重启对应消费线程(待实现)。同时向外暴露健康检查方法,用户外接获取当前延迟消费逻辑的状态。
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* @author dewey
*/
@Slf4j
public class KafkaDelayConsumerDamonThread extends Thread {
private Map<String,KafkaDelayConsumerThread> consumerThreadsMap = new HashMap<>();
private String consumerDamonStatus ;
/**
* 服务正常
*/
public static final String STATUS_UP = "UP";
/**
* 部分消费线程不可用
*/
public static final String STATUS_OUT_OF_SERVICE = "OUT_OF_SERVICE";
public KafkaDelayConsumerDamonThread(Map<String,KafkaDelayConsumerThread> consumerThreadsMap) {
this.consumerThreadsMap = consumerThreadsMap;
this.consumerDamonStatus = STATUS_UP;
}
public String getConsumersStatus() {
return consumerDamonStatus;
}
@Override
public void run() {
boolean up = true;
while (true) {
try {
//check interval
Thread.sleep(10000);
if (log.isDebugEnabled()) {
log.debug("KafkaDelayConsumerManager:check consumer threads status");
}
for(Map.Entry<String,KafkaDelayConsumerThread> entry : consumerThreadsMap.entrySet()){
String topic = entry.getKey();
KafkaDelayConsumerThread consumerThread = entry.getValue();
if(consumerThread == null){
up = false;
log.error("KafkaDelayConsumerDamonThread:topic{},consumer thread exit",topic);
break;
}
if(consumerThread.getState() == State.TERMINATED){
up = false;
log.error("KafkaDelayConsumerDamonThread:topic{},consumer thread terminated",topic);
//consumerThread.start();
}
}
if(!up){
this.consumerDamonStatus = STATUS_OUT_OF_SERVICE;
}
if (log.isDebugEnabled()) {
log.debug("KafkaDelayConsumerManager:check consumer threads success");
}
} catch (Exception e) {
log.error("KafkaDelayConsumerDamonThread error in check", e);
}
}
}
}
KafkaDelayConsumerManager
主要负责初始化kafka消费线程和监控线程。在创建消费线程的kafkaConsumer时会判断自动提交的配置和当前消费类型是否匹配,例如手动提交则要配置Enable.auto.commit为false,不匹配则直接报错,启动失败。KafkaDelayConsumerManager实现了BeanPostProcessor接口,在spring 容器初始化bean时来收集配置了KafkaDelayListener的bean以及方法。从而完成KafkaDelayConsumerContainer的初始化。这里并没有直接启动线程进行消费主要是由于当前阶段所有的bean都还是不可用的,直接启动消费线程消费可能会出现意想不到的问题。所以这个类同时实现了SmartInitializingSingleton接口,在postProcessAfterInitialization方法中初始化相关消费者线程与守护线程。
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.actuate.health.Status;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Method;
import java.util.*;
/**
* @author dewey
*/
@Slf4j
public class KafkaDelayConsumerManager implements BeanPostProcessor,SmartInitializingSingleton {
/**
* 延迟消费实例集合
*/
private List<KafkaDelayConsumerContainer> consumerContainers = new ArrayList<>();
private KafkaDelayConsumerDamonThread damonThread;
@Autowired
private KafkaDelayConfig kafkaDelayConfig;
/**
* 健康检查
* @return
*/
public Status health(){
Thread.State state = damonThread.getState();
if(state == Thread.State.TERMINATED){
return Status.DOWN;
}
if(damonThread.getConsumersStatus().equals(KafkaDelayConsumerDamonThread.STATUS_UP)){
return Status.UP;
}
return Status.OUT_OF_SERVICE;
}
@Override
public void afterSingletonsInstantiated() {
//init consumer threads
if(!CollectionUtils.isEmpty(consumerContainers)) {
Map<String,KafkaDelayConsumerThread> consumerThreadsMap = new HashMap<>();
for (KafkaDelayConsumerContainer consumerContainer : consumerContainers) {
KafkaConsumer<String, String> consumer = createConsumer(consumerContainer);
consumerContainer.setConsumer(consumer);
String topic = consumerContainer.getKafkaDelayListener().topic();
KafkaDelayConsumerThread consumerThread = new KafkaDelayConsumerThread(consumerContainer);
consumerThread.start();
consumerThreadsMap.put(topic,consumerThread);
log.info("KafkaDelayConsumerManager:start consume kafka topic {}", topic);
}
if (!CollectionUtils.isEmpty(consumerContainers)) {
KafkaDelayConsumerDamonThread damon = new KafkaDelayConsumerDamonThread(consumerThreadsMap);
damon.setDaemon(true);
damon.start();
log.info("KafkaDelayConsumerManager:start kafka delay daemon");
damonThread = damon;
}
}
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName){
Class<?> targetClass = AopUtils.getTargetClass(bean);
Map<Method, KafkaDelayListener> temp = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<KafkaDelayListener>() {
@Override
public KafkaDelayListener inspect(Method method) {
KafkaDelayListener ann = AnnotationUtils.findAnnotation(method, KafkaDelayListener.class);
return ann;
}
});
if(!CollectionUtils.isEmpty(temp)){
for (Map.Entry<Method,KafkaDelayListener> entry:temp.entrySet()){
consumerContainers.add(new KafkaDelayConsumerContainer(bean,entry.getKey(),entry.getValue()));
}
}
return bean;
}
/**
* @param consumerContainer
* @return
*/
private KafkaConsumer<String,String> createConsumer(KafkaDelayConsumerContainer consumerContainer){
Map<String,Object> map = kafkaDelayConfig.getConsumerProperties();
Boolean autoCommit = (Boolean) map.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
if(consumerContainer.getListenerType().equals(KafkaDelayListenerType.SIMPLE)){
if(autoCommit != null && !autoCommit){
throw new RuntimeException(String.format("KafkaDelayConsumerManager:should manual submission in method %s",
consumerContainer.getMethod().getName()));
}
}else if(autoCommit == null || autoCommit){
throw new RuntimeException("KafkaDelayConsumerManager:autocommit should be false ");
}
return new KafkaConsumer<String, String>(map);
}
}
KafkaDelayHealthIndicator
基于spring-actuator拓展的健康检查指标,KafkaDelayHealthIndicator通过kafkaDelayConsumerManager的health方法来获取延迟消费的状态。这样只要应用配置了/actuator/health的健康检查都会检查我们延迟消费的状态。在状态不对时可以报警或者直接重启应用。如果业务中没有使用spring-actuator则可考虑自己暴露相关接口来进行健康检查。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
/**
* @author dewey
*/
public class KafkaDelayHealthIndicator implements HealthIndicator {
@Autowired
private KafkaDelayConsumerManager kafkaDelayConsumerManager;
@Override
public Health health() {
Status status = kafkaDelayConsumerManager.health();
if(Status.UP == status ){
return Health.up().build();
}else if(Status.OUT_OF_SERVICE == status){
return Health.outOfService().build();
}else {
return Health.down().build();
}
}
}
总结与展望
由于该功能是迭代产物,所以不是很细致,这里想到的还有几个拓展点,欢迎感兴趣的朋友补充交流。
- kafkadelaylistener提供auto-pauseTime的配置,消费者pause时间根据队首消息过期时间来动态变化,避免消息触发得不及时。
- 消费者线程自身状态的准确暴露,消费者线程在自身拉取消息失败、消费位移提交失败等情况时,根据既定标准(自定义消费者可用的标准)来设置当前状态。
- 守护线程能在消费者线程意外退出时自动重启对应线程。
- ··········
相关源码:https://github.com/Dewey-Ding/spring-learn
更多推荐
所有评论(0)