你一定遇到过这种情况,接收到消息时并不符合马上处理的条件(例如频率限制),但是又不能丢掉,于是先存起来,过一阵子再来处理。系统应该怎么设计呢?可能你会想到数据库,用一个字段来标记执行的状态,或者设置一个等待的时间戳,不管是哪种都需要反复地从数据库存取,还要考虑出异常情况状态的维护。

    作为一款优秀的消息处理服务,kafka 具有完善的事务管理,状态管理和灾难恢复功能。只要我们稍加变通一下,kafka 也能作为延迟消息处理的解决方案,而且实现上比用数据库简单得多。

    以下代码均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中测试通过。建议事先阅读文档 https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#receiving-messages 以便能很好地理解以下内容。

设计思路

    设计 2 个队列(topic),一个收到消息马上执行,另一个用来接收需延迟处理的消息。话句话说,接收延迟消息的队列直到消息可执行之前一直在 block 状态,所以有局限性,定时不能非常精确,并且任务执行次序与加进来的次序是一致的。
 

spring-boot 的配置

application.yml
————————————————————

spring:
  ## kafka
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: myGroup
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        max:
          poll:
            interval:
              # 设置时间必须比延迟处理的时间大,不然会报错
              ms: 1200000
    listener:
      # 把提交模式改为手动
      ack-mode: MANUAL
 
 
kafka 默认的消费模式是自动提交,意思是,当 MessageListener 收到消息,执行处理方法后自动提交已完成状态,该消息就从队列里移除了。配置 ack-mode: MANUAL 改为手动提交后,我们就可以根据需要保留数据在消息队列,以便以后再处理。
max.poll.interval.ms 设小了可能会收到下面的错误:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
 
请务必设置一个比等待执行时间更长的时间。
 

发送消息

@Autowired
private KafkaTemplate kafkaTemplate;

public void myAction(){
    // 定义 data
    // 任务推送到 Kafka
    kafkaTemplate.send(“myJob", data.toString());
}

该部分没有特别的地方,跟普通的消息消息发送一样。

接收消息

定义两个 topic:myJob 和 myJob-delay
@SpringBootApplication
@ServletComponentScan
public class Application {
    
    @KafkaListener(topics = “myJob”)
    @SendTo(“myJob-delay")
    public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
        String json = (String) cr.value();
        JSONObject data = JSON.parseObject(json);

        if (/* 需要延迟处理 */){
            // 提交
            ack.acknowledge();
            // 发送到 @SendTo
           data.put("until", System.currentTimeMillis() + msToDelay);
           return data.toString();
        }

        // 正常处理
        // do real work

        // 提交
        ack.acknowledge();
        return null;
    }

    @KafkaListener(topics = “myJob-delay")
    @SendTo(“myJob")
    public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){
        
        String json = (String) cr.value();
        JSONObject data = JSON.parseObject(json);
        Long until = data.getLong("until");
        // 阻塞直到 until
        while (System.currentTimeMillis() < until){
           Thread.sleep( Math.max(0, until - System.currentTimeMillis()) );
        }

        // 提交
        ack.acknowledge();
        // 转移到 @SendTo
        return json;

    }
}

代码很简单,不用解释也能看明白。稍微提一下几个重要的地方。

@KafkaListener 的方法参数里有 Acknowledgment ack,这是AckMode.MANUAL 模式下必须要添加的参数。

ack.acknowledge() 用来标记一条消息已经消费完成,即将从消息队列里移除。执行之前消息会一直保留在队列中,即时宕机重启后也能恢复。

@SendTo 用来在队列(topic)间转移消息,只要 return 非 null 的数据。以上代码中,当需要延迟处理时,消息从 myJob 转移到 myJob-delay;而当条件满足时,消息又从 myJob-delay 转移到了 myJob。

自从 spring-kafka 2.2.4 版本之后,可以在方法上定义 max.poll.interval.ms ,更加灵活了。例如

@KafkaListener(topics = "myTopic", groupId = "group", properties = { 
    "max.poll.interval.ms:60000”, 
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”}
)

    以上是延迟消息处理的简单实现,适合延时要求不那么高的场合。朋友们想一下,假如延时比较复杂,执行的次序也不一定跟消息到达的次序一致,系统又该怎样设计呢?

假如这篇文章对你有所帮助, 请关注我公众号, 发现更多有用的文章

 

 
 
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐