基于kafka的延时队列实现

延时队列简介

延时队列顾名思义就是一个可以延时(消费)的队列。生产者和消费者约定消息延迟的时间,投递到延时队列的消息则会在约定的时间被消费。那么为什么要这样使用消息队列呢?我们来看一下如下的几个业务场景:

  1. 用户创建订单、锁定库存,30分钟内未支付则取消订单,释放库存。
  2. 用户提交作业后一天之内未批改,通知老师进行批改。
  3. 直播结束30分钟后开始直播效果统计计算。
  4. ········

以上几个业务场景如果使用定时轮询的方式执行,那肯定会存在较大误差并且增加了大量无效数据扫描,效率低。为了高效精确制导就需要我们的延时队列了。

相关延时队列的实现

这里简单介绍下几个常用实现。

Java DelayQueue

Java util包提供的延迟队列,是一个优先级阻塞队列。业务线程通过take方法获取可用消息,如果队列无消息则无限期阻塞当前线程,直到其他线程往队列中添加元素时,唤醒当前阻塞线程。take时当队列中有消息且已到过期时间时则直接返回对应消息,如果消息未到过期时间则需要判断是否有leader线程已经阻塞,如果存在则自己无限期休眠。没有leader则阻塞自己到队首消息的过期时间。

该实现将消息放入内存,触发基本无延迟,但是需要进行持久化与分布式环境应用的相关拓展。

rabbitmq

rabbitmq的延迟队列实现是通过消息过期时间是死信队列路由实现,rabbitmq的消息以及队列都可以设置,如果同时设置则是会取小的那个。死信路由则可以对应多个队列,当一个消息满足以下三个条件时就可以被死信路由投递:

  1. 消息过期。
  2. 消息被拒收且不重新入队。
  3. 消息队列被塞满,前面的消息会被死信路由处理。

所以用rabbitmq来实现死信队列需要使用两个队列。

redis 过期回调

redis 内数据可以设置过期时间,业务上则可以接受消息过期后的回调。但是由于redis的过期策略是惰性的,所以消息的通知也会不及时。只有在真正删除时才会通知到客户端。由于业务上也有使用过redis消息,经常出现个别客户端收不到消息的情况,还没深究到底是使用方式问题还是redis pub、sub稳定性问题。

时间轮

时间轮是一种环形的数据格式,相当于时钟一样,只是时间间隔可能不是一秒一个格。每个格上放着延时任务的队列。当时间走到特定的格子后遍历链表上的任务是否到可执行时间点。这里就不再详述了,相关实现可以自行百度。

基于kafka的简单延时队列消费实现

为什么要基于kafka来做延时队列

首先我们的业务场景是一个非常密集的用户打点行为,例如每个用户五秒一个请求,统计后转消息队列,两分钟后消费。基于这个场景我们选用kafka作为延时队列有过几点考虑。首先当前技术栈中使用的消息中间件有rabbitmq和kafka,并且有成熟的运维。限于业务进度原因、并且不是很有必要引入新的技术栈来增加运维难度。所以自然而然地就二选一了。而当前这个场景每个用户两分钟会堆积24个消息,2w用户同时学就是堆积48w消息,如果有集中的学习,例如统一考试、竞赛等。对于rabbitmq是个不小的挑战。因此选择了更能堆的kafka。

实现
使用实例

使用kafka延迟消费功能非常便捷,开箱即用。只需要引入对应jar包然后在启动类上加上@EnableKafkaDelay注解。最后在具体的消费逻辑方法上加上注解@KafkaDelayListener即可体验延迟消费的快乐。就像@kafkaListener一样方便高效。对于延迟消息的消费方法,提供了两个参数,分别是ConsumerRecord与KafkaDelayCommitter,KafkaDelayCommitter用于业务需要手动提交的场景,注意需要配合consumer的autocommit=false来使用。
配置示例

kafka:
  delay:
    bootstrap-servers: 10.177.47.5:9092
    consumer:
      groupId: testDelay
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

使用示例

//pom配置
   <dependency>
            <groupId>com.netease.edu.tiku</groupId>
            <artifactId>tiku-common-kafka</artifactId>
            <version>1.0-SNAPSHOT</version>
    </dependency>

//启动类配置
@SpringBootApplication
@EnableKafkaDelay
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

}
//示例方法1
@KafkaDelayListener(topic = "test-delay",delayTimeSec = 2 )
    public void onMessage(ConsumerRecord<String,String> record, KafkaDelayCommitter committer){
        System.out.println(record.value());
        committer.commitSync();
    }
 //示例方法2 
@KafkaDelayListener(topic = "test")
    public void onMessage1(ConsumerRecord<String,String> record){
        System.out.println(record.value());
    }
具体实现

文件结构如下,一共是是十一个文件,具体的实现参考了spring autoconfig的大致逻辑以及spring data kafka的相关实现。但是复杂度与完善度肯定还是不如的。相信通过下文你也能对spring的自动配置逻辑有一定的理解。
在这里插入图片描述

EnableKafkaDelay

@EnableKafkaDelay注解用于加载kafkaDelay相关组件。通过import来加载KafkaDelayConsumerBootstrap到spring 容器中,注意这里的import是只能加在类上的,所以整个注解限制的Target是ElementType.TYPE。

import org.springframework.context.annotation.Import;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author dewey
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(KafkaDelayConsumerBootstrap.class)
public @interface EnableKafkaDelay {
}
KafkaDelayListener

KafkaDelayListener注解用于启动消费者线程来延迟消费kafka消息,方法内只需要关心消息的处理逻辑即可,如果关闭了自动提交则还需要在方法中控制消费位移的提交。该注解提供了三个可配置参数,topic,delayTimeSec和pauseTimeSec。topic表示消费者需要订阅队列。delayTimeSec则是从消息产生到消费之间需要等待的时间,即延时消息的延迟时间。这里可以看到延迟时间是配置到消费者端的,所以该延迟消息投递后消费者是一直可见的,这与rabbitmq的实现有着很大的不同。pauseTimeSec则是kafka消费者发现当前无延迟消息到期时主动paus的时间。(tips:kafkaConsumer pause后只是不拉取消息,心跳线程不会受到影响,不会引发分区重分区)


import org.springframework.context.annotation.Import;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author dewey
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaDelayListener {
    String topic();

    /**
     * 延迟消费的时间,单位秒
     * @return
     */
    int delayTimeSec() default 120;

    /**
     * 消费者pause时间,单位秒
     * @return
     */
    int pauseTimeSec() default 2;
}

KafkaDelayConsumerBootstrap

负责启动kafka延迟消费的managerbean以及KafkaDelay的健康检查bean。这里需要注意的是两个bean的启动顺序。这里KafkaDelayHealthIndicator依赖了kafkaDelayConsumerManager来获取kafkadelayconsumer的整体状态,所以需要在kafkaDelayConsumerManager之后启动。

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

/**
 * @author dewey
 */
@Configuration
@Slf4j
@EnableConfigurationProperties(KafkaDelayConfig.class)
public class KafkaDelayConsumerBootstrap {

    @Bean("kafkaDelayConsumerManager")
    public KafkaDelayConsumerManager kafkaDelayConsumerManager(){
        log.info("=====init kafkaDelayConsumerManager");
        return new KafkaDelayConsumerManager();
    }

    @Bean("kafkaDelayHealthIndicator")
    @DependsOn("kafkaDelayConsumerManager")
    public KafkaDelayHealthIndicator kafkaDelayHealthIndicator(){
        log.info("=====init KafkaDelayHealthIndicator");
        return new KafkaDelayHealthIndicator();
    }
}
KafkaDelayConfig

kafka延迟消费者相关配置,注意仅用于消费者,毕竟我们是基于消费者来实现。KafkaDelayConfig引入了spring kafka的配置类,这样如果项目中引用了spring-data-kafka则会默认使用spring-kafka配置。避免了重复配置,只需关注延迟消费的topic以及延迟时间等相关配置即可。但是如果同时配置了kafka-delay相关的配置则在delayConsumer中会覆盖spring-data-kafka的配置。

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.unit.DataSize;

import java.time.Duration;
import java.util.*;

/**
 * 自定义kafka配置
 * 仅用作consumer
 * @author dewey
 */
@ConfigurationProperties(prefix = "kafka.delay")
@Configuration
public class KafkaDelayConfig {

    @Autowired(required = false)
    private KafkaProperties kafkaProperties;

    private Consumer consumer = new Consumer();

    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

    private String clientId;



    /**
     * 获取消费端配置
     */
    public Map<String,Object> getConsumerProperties(){
        Map<String,Object> properties = this.getSpringKafkaConfig();
        properties.putAll(this.buildCommonProperties());
        properties.putAll(this.consumer.buildProperties());
        return properties;
    }

    /**
     * 集成spring-kafka工程的先获取默认配置
     * @return
     */
    public Map<String, Object> getSpringKafkaConfig(){
        Map<String,Object> properties = new HashMap<>();
        if(kafkaProperties != null){
            return kafkaProperties.buildConsumerProperties();
        } else{
            return properties;
        }
    }

    private Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap<>();
        if (this.bootstrapServers != null) {
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        }
        if (this.clientId != null) {
            properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
        }
        return properties;
    }

    public static class Consumer {

        /**
         * Frequency with which the consumer offsets are auto-committed to Kafka if
         * 'enable.auto.commit' is set to true.
         */
        private Duration autoCommitInterval;

        /**
         * What to do when there is no initial offset in Kafka or if the current offset no
         * longer exists on the server.
         */
        private String autoOffsetReset;

        /**
         * Comma-delimited list of host:port pairs to use for establishing the initial
         * connections to the Kafka cluster. Overrides the global property, for consumers.
         */
        private List<String> bootstrapServers;

        /**
         * ID to pass to the server when making requests. Used for server-side logging.
         */
        private String clientId;

        /**
         * Whether the consumer's offset is periodically committed in the background.
         */
        private Boolean enableAutoCommit;

        /**
         * Maximum amount of time the server blocks before answering the fetch request if
         * there isn't sufficient data to immediately satisfy the requirement given by
         * "fetch-min-size".
         */
        private Duration fetchMaxWait;

        /**
         * Minimum amount of data the server should return for a fetch request.
         */
        private DataSize fetchMinSize;

        /**
         * Unique string that identifies the consumer group to which this consumer
         * belongs.
         */
        private String groupId;

        /**
         * Expected time between heartbeats to the consumer coordinator.
         */
        private Duration heartbeatInterval;

        /**
         * Isolation level for reading messages that have been written transactionally.
         */
        private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;

        /**
         * Deserializer class for keys.
         */
        private Class<?> keyDeserializer = StringDeserializer.class;

        /**
         * Deserializer class for values.
         */
        private Class<?> valueDeserializer = StringDeserializer.class;

        /**
         * Maximum number of records returned in a single call to poll().
         */
        private Integer maxPollRecords;

        /**
         * Additional consumer-specific properties used to configure the client.
         */
        private final Map<String, String> properties = new HashMap<>();

        public Duration getAutoCommitInterval() {
            return this.autoCommitInterval;
        }

        public void setAutoCommitInterval(Duration autoCommitInterval) {
            this.autoCommitInterval = autoCommitInterval;
        }

        public String getAutoOffsetReset() {
            return this.autoOffsetReset;
        }

        public void setAutoOffsetReset(String autoOffsetReset) {
            this.autoOffsetReset = autoOffsetReset;
        }

        public List<String> getBootstrapServers() {
            return this.bootstrapServers;
        }

        public void setBootstrapServers(List<String> bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }

        public String getClientId() {
            return this.clientId;
        }

        public void setClientId(String clientId) {
            this.clientId = clientId;
        }

        public Boolean getEnableAutoCommit() {
            return this.enableAutoCommit;
        }

        public void setEnableAutoCommit(Boolean enableAutoCommit) {
            this.enableAutoCommit = enableAutoCommit;
        }

        public Duration getFetchMaxWait() {
            return this.fetchMaxWait;
        }

        public void setFetchMaxWait(Duration fetchMaxWait) {
            this.fetchMaxWait = fetchMaxWait;
        }

        public DataSize getFetchMinSize() {
            return this.fetchMinSize;
        }

        public void setFetchMinSize(DataSize fetchMinSize) {
            this.fetchMinSize = fetchMinSize;
        }

        public String getGroupId() {
            return this.groupId;
        }

        public void setGroupId(String groupId) {
            this.groupId = groupId;
        }

        public Duration getHeartbeatInterval() {
            return this.heartbeatInterval;
        }

        public void setHeartbeatInterval(Duration heartbeatInterval) {
            this.heartbeatInterval = heartbeatInterval;
        }

        public IsolationLevel getIsolationLevel() {
            return this.isolationLevel;
        }

        public void setIsolationLevel(IsolationLevel isolationLevel) {
            this.isolationLevel = isolationLevel;
        }

        public Class<?> getKeyDeserializer() {
            return this.keyDeserializer;
        }

        public void setKeyDeserializer(Class<?> keyDeserializer) {
            this.keyDeserializer = keyDeserializer;
        }

        public Class<?> getValueDeserializer() {
            return this.valueDeserializer;
        }

        public void setValueDeserializer(Class<?> valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
        }

        public Integer getMaxPollRecords() {
            return this.maxPollRecords;
        }

        public void setMaxPollRecords(Integer maxPollRecords) {
            this.maxPollRecords = maxPollRecords;
        }

        public Map<String, String> getProperties() {
            return this.properties;
        }

        public Map<String, Object> buildProperties() {
            Map<String, Object> properties = new HashMap<>();
            PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
            map.from(this::getAutoCommitInterval).asInt(Duration::toMillis)
                    .to(v->properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,v));
            map.from(this::getAutoOffsetReset).to(v->properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,v));
            map.from(this::getBootstrapServers).to(v->properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,v));
            map.from(this::getClientId).to(v->properties.put(ConsumerConfig.CLIENT_ID_CONFIG,v));
            map.from(this::getEnableAutoCommit).to(v->properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,v));
            map.from(this::getFetchMaxWait).asInt(Duration::toMillis)
                    .to(v->properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,v));
            map.from(this::getFetchMinSize).asInt(DataSize::toBytes)
                    .to(v->properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,v));
            map.from(this::getGroupId).to(v->properties.put(ConsumerConfig.GROUP_ID_CONFIG,v));
            map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
                    .to(v->properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,v));
            map.from(() -> getIsolationLevel().name().toLowerCase(Locale.ROOT))
                    .to(v->properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,v));
            map.from(this::getKeyDeserializer).to(v->properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,v));
            map.from(this::getValueDeserializer).to(v->properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,v));
            map.from(this::getMaxPollRecords).to(v->properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,v));
            return properties;
        }
    }

    public List<String> getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(List<String> bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public KafkaProperties getKafkaProperties() {
        return kafkaProperties;
    }

    public void setKafkaProperties(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }
}

KafkaDelayListenerType

延迟消费者类型,用于区分是否需要手动提交消费位移。

/**
 * @author dewey
 */
public enum KafkaDelayListenerType {

    /**
     * 只接受消息模式,autocommit必须置为true
     */
    SIMPLE,

    /**
     * 业务控制commit,autocommit必须为false
     */
    ACKNOWLEDGING

}
KafkaDelayCommitter

kafka延迟消费者消息提交逻辑的封装,在延迟消费消息的方法中可以接受该参数来进行位移的手动提交。该接口的方法实现是直接调用的kafkaconsumer对应的位移提交方法。这样封装主要是为了避免直接暴露consumer来给使用方。

/**
 * @author dewey
 */
public interface KafkaDelayCommitter {
    /**
     * 封装kafka consumer的commitSync
     */
    void commitSync();

    /**
     * 封装kafka consumer的commitAsync
     */
    void commitAsync();
}

KafkaDelayConsumerContainer

kafka延迟消费着相关组件封装,包含使用注解的方法、方法对应的bean、注解详情、消费者、消费者提交等必要组件。同时还提供了一个KafkaDelayCommitter接口的默认实现。该类的构造方法中会根据带有@KafkaDelayListener的方法的参数来动态生成KafkaDelayListenerType,d参数类型则只能使用org.apache.kafka.clients.consumer.ConsumerRecord与com.netease.com.tiku.kafka.delay.KafkaDelayCommitter。如果是ACKNOWLEDGING类型则会初始化consumerCommitter。


import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.lang.reflect.Method;

/**
 * @author dewey
 */
public class KafkaDelayConsumerContainer {

    /**
     * 消费逻辑方法所属bean
     */
    private Object bean;

    /**
     * 业务消费方法
     */
    private Method method;

    /**
     * 注解详情
     */
    private KafkaDelayListener kafkaDelayListener;

    /**
     * 监听方式,手动commit与自动commit
     */
    private KafkaDelayListenerType listenerType;

    /**
     * 消费者
     */
    private KafkaConsumer<String,String> consumer;

    /**
     * 提交位移逻辑
     */
    private ConsumerCommitter consumerCommitter;

    private final String paramTypeRecord = "org.apache.kafka.clients.consumer.ConsumerRecord";
    private final String paramTypeCommitter = "com.netease.com.tiku.kafka.delay.KafkaDelayCommitter";

    public KafkaDelayConsumerContainer(Object bean, Method method, KafkaDelayListener kafkaDelayListener) {
        this.bean = bean;
        this.method = method;
        this.kafkaDelayListener = kafkaDelayListener;

        Class<?>[] parameterTypes = method.getParameterTypes();
        if(parameterTypes.length <= 0 || parameterTypes.length >= 3){
            throw new RuntimeException(String.format("KafkaDelayConsumerContainer:Method[%s] parameter number error", method.getName()));
        }
        if(!paramTypeRecord.equals(parameterTypes[0].getName())){
            throw new RuntimeException(String.format("KafkaDelayConsumerContainer:Method[%s] parameter type error,need %s but find %s",
                    method.getName(),paramTypeRecord,parameterTypes[0].getName()));
        }
        if(parameterTypes.length > 1 && !paramTypeCommitter.equals(parameterTypes[1].getName())){
            throw new RuntimeException(String.format("KafkaDelayConsumerContainer:Method[%s] parameter type error,need %s but find %s",
                    method.getName(),paramTypeCommitter,parameterTypes[1].getName()));
        }

        if(parameterTypes.length >= 2){
            this.consumerCommitter = new ConsumerCommitter();
            listenerType = KafkaDelayListenerType.ACKNOWLEDGING;
        }else{
            listenerType = KafkaDelayListenerType.SIMPLE;
        }

    }

    /**
     * 封装提交逻辑
     */
    private final class ConsumerCommitter implements KafkaDelayCommitter {

        @Override
        public void commitSync() {
            processCommitSync();
        }

        @Override
        public void commitAsync() {
            processCommitAsync();
        }
    }

    private void processCommitSync(){
        consumer.commitSync();
    }

    private void processCommitAsync(){
        consumer.commitAsync();
    }

    public KafkaDelayListener getKafkaDelayListener() {
        return kafkaDelayListener;
    }

    public void setKafkaDelayListener(KafkaDelayListener kafkaDelayListener) {
        this.kafkaDelayListener = kafkaDelayListener;
    }

    public Object getBean() {
        return bean;
    }

    public void setBean(Object bean) {
        this.bean = bean;
    }

    public Method getMethod() {
        return method;
    }

    public void setMethod(Method method) {
        this.method = method;
    }

    public KafkaDelayListenerType getListenerType() {
        return listenerType;
    }

    public void setListenerType(KafkaDelayListenerType listenerType) {
        this.listenerType = listenerType;
    }

    public KafkaConsumer<String, String> getConsumer() {
        return consumer;
    }

    public void setConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public ConsumerCommitter getConsumerCommitter() {
        return consumerCommitter;
    }

    public void setConsumerCommitter(ConsumerCommitter consumerCommitter) {
        this.consumerCommitter = consumerCommitter;
    }
}
  

KafkaDelayConsumerThread

kafka延迟消息的消费者线程,kafka延迟消费的逻辑实现。初始化时会根据消费者类型来动态的初始化consumerCommiter。线程启动时订阅指定topic,然后开始消费消息,当kafka队列中无可消费的延迟消息时,消费者进入pause状态,不再拉取指定队列的消息。等到pauseTime之后再回复消费。消息的延迟时间以及pause时间则是从注解参数中获取。由于pause时间没有根据对首消息的到期时间来动态设定,所以消息的实际消费时间与预设时间会存在一个pauseTime时间内的误差。由于对于当前业务不敏感,也就无所谓了。

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;

/**
 * @author dewey
 */
@Slf4j
public class KafkaDelayConsumerThread extends Thread {

    private KafkaConsumer<String, String> consumer;

    private Method method;

    private Object bean;

    private String topic;

    private KafkaDelayCommitter committer;

    private long pauseTime;

    private long delayTime;

    private boolean autoCommit;

    /**
     * 用户Damon判断消费者是否正常
     * todo 后面优化kafkaconsumer的状态判断
     */
    private String consumerStatus;

    private static String CONSUMER_STATUS_RUNNING = "normal";

    private static String CONSUMER_STATUS_PAUSE = "pause";

    public KafkaDelayConsumerThread(KafkaDelayConsumerContainer consumerContainer) {
        KafkaDelayListener kafkaDelayListener = consumerContainer.getKafkaDelayListener();

        this.consumer = consumerContainer.getConsumer();
        this.method = consumerContainer.getMethod();
        this.bean = consumerContainer.getBean();
        this.topic = kafkaDelayListener.topic();

        if (consumerContainer.getListenerType().equals(KafkaDelayListenerType.ACKNOWLEDGING)) {
            this.autoCommit = false;
            this.committer = consumerContainer.getConsumerCommitter();
        } else {
            autoCommit = true;
        }
        //转换为毫秒
        this.pauseTime = kafkaDelayListener.pauseTimeSec() * 1000;
        this.delayTime = kafkaDelayListener.delayTimeSec() * 1000;
    }


    @Override
    public void run() {
        //订阅
        consumer.subscribe(Collections.singleton(topic));
        boolean paused = false;
        while (true) {
            if (paused) {
                try {
                    Thread.sleep(pauseTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (log.isDebugEnabled()) {
                    log.debug("KafkaDelayConsumerManager:topic{},消费重启", topic);
                }
                this.consumerStatus = "pause";
                consumer.resume(consumer.paused());
                paused = false;
            } else {
                this.consumerStatus = "run";
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    if (consumerRecord.timestamp() > System.currentTimeMillis() - delayTime) {
                        if (log.isDebugEnabled()) {
                            log.debug("KafkaDelayConsumerManager:topic{},消费暂停", topic);
                        }
                        paused = true;

                        consumer.pause(Collections.singleton(topicPartition));
                        consumer.seek(topicPartition, consumerRecord.offset());
                        break;
                    } else {
                        try {
                            if (autoCommit) {
                                method.invoke(bean, consumerRecord);
                            } else {
                                method.invoke(bean, consumerRecord, committer);
                            }
                        } catch (Exception e) {
                            log.error("KafkaDelayConsumerManager:consume fail,topic:{},message{}", topic, consumerRecord.value());
                        }
                    }
                }
            }
        }
    }


    public String getConsumerStatus() {
        return consumerStatus;
    }
}
KafkaDelayConsumerDamonThread

kafka延迟消费线程的守护线程,用于监控消费线程状态,在消费者状态不对时负责重启对应消费线程(待实现)。同时向外暴露健康检查方法,用户外接获取当前延迟消费逻辑的状态。


import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;

/**
 * @author dewey
 */
@Slf4j
public class KafkaDelayConsumerDamonThread extends Thread {

    private Map<String,KafkaDelayConsumerThread> consumerThreadsMap = new HashMap<>();

    private String consumerDamonStatus ;

    /**
     * 服务正常
     */
    public static final String STATUS_UP = "UP";

    /**
     * 部分消费线程不可用
     */
    public static final String STATUS_OUT_OF_SERVICE = "OUT_OF_SERVICE";


    public KafkaDelayConsumerDamonThread(Map<String,KafkaDelayConsumerThread> consumerThreadsMap) {
        this.consumerThreadsMap = consumerThreadsMap;
        this.consumerDamonStatus = STATUS_UP;
    }

    public String getConsumersStatus() {
        return consumerDamonStatus;
    }

    @Override
    public void run() {
        boolean up = true;

        while (true) {
            try {
                //check interval
                Thread.sleep(10000);

                if (log.isDebugEnabled()) {
                    log.debug("KafkaDelayConsumerManager:check consumer threads status");
                }

                for(Map.Entry<String,KafkaDelayConsumerThread> entry : consumerThreadsMap.entrySet()){
                    String topic = entry.getKey();
                    KafkaDelayConsumerThread consumerThread = entry.getValue();
                    if(consumerThread == null){
                        up = false;
                        log.error("KafkaDelayConsumerDamonThread:topic{},consumer thread exit",topic);
                        break;
                    }
                    if(consumerThread.getState() == State.TERMINATED){
                        up = false;
                        log.error("KafkaDelayConsumerDamonThread:topic{},consumer thread terminated",topic);
                        //consumerThread.start();
                    }
                }

                if(!up){
                    this.consumerDamonStatus = STATUS_OUT_OF_SERVICE;
                }

                if (log.isDebugEnabled()) {
                    log.debug("KafkaDelayConsumerManager:check consumer threads success");
                }

            } catch (Exception e) {
                log.error("KafkaDelayConsumerDamonThread error in check", e);
            }
        }
    }
}

KafkaDelayConsumerManager

主要负责初始化kafka消费线程和监控线程。在创建消费线程的kafkaConsumer时会判断自动提交的配置和当前消费类型是否匹配,例如手动提交则要配置Enable.auto.commit为false,不匹配则直接报错,启动失败。KafkaDelayConsumerManager实现了BeanPostProcessor接口,在spring 容器初始化bean时来收集配置了KafkaDelayListener的bean以及方法。从而完成KafkaDelayConsumerContainer的初始化。这里并没有直接启动线程进行消费主要是由于当前阶段所有的bean都还是不可用的,直接启动消费线程消费可能会出现意想不到的问题。所以这个类同时实现了SmartInitializingSingleton接口,在postProcessAfterInitialization方法中初始化相关消费者线程与守护线程。

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.actuate.health.Status;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Method;
import java.util.*;

/**
 * @author dewey
 */
@Slf4j
public class KafkaDelayConsumerManager implements BeanPostProcessor,SmartInitializingSingleton {
    /**
     * 延迟消费实例集合
     */
    private List<KafkaDelayConsumerContainer> consumerContainers = new ArrayList<>();

    private KafkaDelayConsumerDamonThread damonThread;

    @Autowired
    private KafkaDelayConfig kafkaDelayConfig;

    /**
     * 健康检查
     * @return
     */
    public Status health(){
        Thread.State state = damonThread.getState();
        if(state == Thread.State.TERMINATED){
            return Status.DOWN;
        }
        if(damonThread.getConsumersStatus().equals(KafkaDelayConsumerDamonThread.STATUS_UP)){
            return Status.UP;
        }
        return Status.OUT_OF_SERVICE;
    }

    @Override
    public void afterSingletonsInstantiated() {
        //init consumer threads
        if(!CollectionUtils.isEmpty(consumerContainers)) {
            Map<String,KafkaDelayConsumerThread> consumerThreadsMap = new HashMap<>();

            for (KafkaDelayConsumerContainer consumerContainer : consumerContainers) {
                KafkaConsumer<String, String> consumer = createConsumer(consumerContainer);
                consumerContainer.setConsumer(consumer);

                String topic = consumerContainer.getKafkaDelayListener().topic();
                KafkaDelayConsumerThread consumerThread = new KafkaDelayConsumerThread(consumerContainer);
                consumerThread.start();

                consumerThreadsMap.put(topic,consumerThread);
                log.info("KafkaDelayConsumerManager:start consume kafka topic {}", topic);
            }

            if (!CollectionUtils.isEmpty(consumerContainers)) {
                KafkaDelayConsumerDamonThread damon = new KafkaDelayConsumerDamonThread(consumerThreadsMap);
                damon.setDaemon(true);
                damon.start();
                log.info("KafkaDelayConsumerManager:start kafka delay daemon");

                damonThread = damon;
            }
        }
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName){
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Map<Method, KafkaDelayListener> temp = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<KafkaDelayListener>() {
                    @Override
                    public KafkaDelayListener inspect(Method method) {
                        KafkaDelayListener ann = AnnotationUtils.findAnnotation(method, KafkaDelayListener.class);
                        return ann;
                    }

                });
        if(!CollectionUtils.isEmpty(temp)){
            for (Map.Entry<Method,KafkaDelayListener> entry:temp.entrySet()){
                consumerContainers.add(new KafkaDelayConsumerContainer(bean,entry.getKey(),entry.getValue()));
            }

        }
        return bean;
    }

    /**
     * @param consumerContainer
     * @return
     */
    private KafkaConsumer<String,String> createConsumer(KafkaDelayConsumerContainer consumerContainer){
        Map<String,Object> map = kafkaDelayConfig.getConsumerProperties();
        Boolean autoCommit = (Boolean) map.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
        if(consumerContainer.getListenerType().equals(KafkaDelayListenerType.SIMPLE)){
            if(autoCommit != null && !autoCommit){
                throw new RuntimeException(String.format("KafkaDelayConsumerManager:should manual submission in method %s",
                        consumerContainer.getMethod().getName()));
            }
        }else if(autoCommit == null || autoCommit){
            throw new RuntimeException("KafkaDelayConsumerManager:autocommit should be false ");
        }
        return new KafkaConsumer<String, String>(map);
    }


}
KafkaDelayHealthIndicator

基于spring-actuator拓展的健康检查指标,KafkaDelayHealthIndicator通过kafkaDelayConsumerManager的health方法来获取延迟消费的状态。这样只要应用配置了/actuator/health的健康检查都会检查我们延迟消费的状态。在状态不对时可以报警或者直接重启应用。如果业务中没有使用spring-actuator则可考虑自己暴露相关接口来进行健康检查。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;

/**
 * @author dewey
 */
public class KafkaDelayHealthIndicator implements HealthIndicator {

    @Autowired
    private KafkaDelayConsumerManager kafkaDelayConsumerManager;

    @Override
    public Health health() {
        Status status = kafkaDelayConsumerManager.health();
        if(Status.UP == status ){
            return Health.up().build();
        }else if(Status.OUT_OF_SERVICE == status){
            return Health.outOfService().build();
        }else {
            return Health.down().build();
        }

    }
}


总结与展望

由于该功能是迭代产物,所以不是很细致,这里想到的还有几个拓展点,欢迎感兴趣的朋友补充交流。

  1. kafkadelaylistener提供auto-pauseTime的配置,消费者pause时间根据队首消息过期时间来动态变化,避免消息触发得不及时。
  2. 消费者线程自身状态的准确暴露,消费者线程在自身拉取消息失败、消费位移提交失败等情况时,根据既定标准(自定义消费者可用的标准)来设置当前状态。
  3. 守护线程能在消费者线程意外退出时自动重启对应线程。
  4. ··········

相关源码:https://github.com/Dewey-Ding/spring-learn

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐