springboot kafka 实现延时队列_kafka延迟队列(1)
3.1 优点:①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。③支持消息持久化和分布式。④支持优先级队列和死信队列。⑤提供了丰富的插件和工具。3.2 缺点:①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。②性能较低,不适合高并发场景。③实现延迟队列需要额外的配置,但是配置就很简单了。3
3.1 优点:
①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。
②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③支持消息持久化和分布式。
④支持优先级队列和死信队列。
⑤提供了丰富的插件和工具。
3.2 缺点:
①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。
②性能较低,不适合高并发场景。
③实现延迟队列需要额外的配置,但是配置就很简单了。
3.3应用场景:
适用于中小型的任务调度和消息通知,对可靠性要求高的场景。
4. RocketMQ
4.1 优点:
①RocketMQ的延迟队列是RocketMQ原生支持的,易于使用和部署。
②RocketMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③高性能和高吞吐量,支持分布式和消息持久化。
④RocketMQ使用简单,性能好,并且支持延迟队列功能。
4.2 缺点:
①RocketMQ的延迟队列不支持动态添加或删除队列。
②RocketMQ的延迟队列需要保证消息的顺序,可能会导致消息延迟。
③在节点崩溃后,RocketMQ有可能发生消息丢失。
4.3 应用场景:
①适用于大规模的数据处理,对性能和吞吐量要求较高的场景。
②适合于任务量较大、需要延迟消息和定时消息的场景。例如电商平台、社交软件等。
③适用于分布式任务调度和高可靠性消息通知场景。
四、Kafka延时队列背景
- 基于以上四种实现延时队列的分析来,选择对应的技术方案的基础上呢,不同公司的mq的基础设施不同,如果只有Kafka,也没必要引入RabbitMQ和RocketMq来实现,引入新的组件也会顺便带来新的问题。
- 网上搜Kafka实现延时队列有很多文章,很多文章说使用Kafka内部的时间轮,支持延时操作,但这是Kafka自己内部使用的,时间轮只是一个工具类,用户无法将其作为延迟队列来使用。
- Kafka延时队列的最佳实践,使用Kafka消费者的暂停和恢复机制来实现。
五、Kafka延时队列实现思路
- 解决一个问题前首先要明确问题,如何让Kafka有延时队列的功能呢?
- 就是在Kafka消费者消费的时候延时消费,不久搞定了嘛
- 那如何延时消费呢,网上有些文章使用Thread.sleep进行延时消费这是不靠谱的(亲身实践),sleep的时间超过了Kafka配置的max.poll.records时间,消费者无法及时提交offset,kafka就会认为这个消费者已经挂了,会进行rebalance也就是重新分配分区给消费者,以保证每个分区只被一个消费者消费
- 也有同学说了,为了不发生rebalance,那可以增加max.poll.records时间啊,但是这样的话,如果要sleep几天的时间,难道max.poll.records要写几天的时间嘛,有违Kafka的设计原理了,那怎么办呢?
- 这时候Kafka的pause暂停消费和resume恢复消费就登场了,pause暂停某个分区之后消费者不会再poll拉取该分区的消息,直到resume恢复该分区之后才会重新poll消息。
- 我已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现延时队列了,代码核心使用Kafka消费者的pause函数(暂停)和resume函数(恢复)+线程池+定时任务+事件监听机制+工厂模式
六、Kafka延时队列架构图
七、kafka延时任务代码实现
以下代码只列出了核心实现,完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码,在自己项目中引入即用!
源码目录:
1. KafkaDelayQueue:Kafka延迟队列
定义一个Kafka延期队列,包含的内容:KafkaDelayQueue,其中有延迟队列配置,主题,消费组,延迟时间,目标主题,KafkaSyncConsumer,ApplicationContext,poll线程池,delay线程池等等
package com.wdyin.kafka.delay;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
/**
* kafka延时队列
*
* @Author WDYin
* @Date 2022/7/2
**/
@Slf4j
@Getter
@Setter
class KafkaDelayQueue<K, V> {
private String topic;
private String group;
private Integer delayTime;
private String targetTopic;
private KafkaDelayConfig kafkaDelayConfig;
private KafkaSyncConsumer<K, V> kafkaSyncConsumer;
private ApplicationContext applicationContext;
private ThreadPoolTaskScheduler threadPoolPollTaskScheduler;
private ThreadPoolTaskScheduler threadPoolDelayTaskScheduler;
…
}
2. KafkaDelayQueueFactory:Kafka延迟队列工厂
Kafka延期队列的工厂,用于及其管理延迟队列
package com.wdyin.kafka.delay;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Properties;
/**
* 延时队列工厂
* @author WDYin
* @date 2023/4/17
**/
@Data
public class KafkaDelayQueueFactory {
private KafkaDelayConfig kafkaDelayConfig;
private Properties properties;
private ApplicationContext applicationContext;
private Integer concurrency;
public KafkaDelayQueueFactory(Properties properties, KafkaDelayConfig kafkaDelayConfig) {
Assert.notNull(properties, “properties cannot null”);
Assert.notNull(kafkaDelayConfig.getDelayThreadPool(), “delayThreadPool cannot null”);
Assert.notNull(kafkaDelayConfig.getPollThreadPool(), “pollThreadPool cannot null”);
Assert.notNull(kafkaDelayConfig.getPollInterval(), “pollInterval cannot null”);
Assert.notNull(kafkaDelayConfig.getPollTimeout(), “timeout cannot null”);
this.properties = properties;
this.kafkaDelayConfig = kafkaDelayConfig;
}
public void listener(String topic, String group, Integer delayTime, String targetTopic) {
if (StringUtils.isEmpty(topic)) {
throw new RuntimeException(“topic cannot empty”);
}
if (StringUtils.isEmpty(group)) {
throw new RuntimeException(“group cannot empty”);
}
if (StringUtils.isEmpty(delayTime)) {
throw new RuntimeException(“delayTime cannot empty”);
}
if (StringUtils.isEmpty(targetTopic)) {
throw new RuntimeException(“targetTopic cannot empty”);
}
KafkaSyncConsumer<String, String> kafkaSyncConsumer = createKafkaSyncConsumer(group);
KafkaDelayQueue<String, String> kafkaDelayQueue = createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer);
kafkaDelayQueue.send();
}
private KafkaDelayQueue<String, String> createKafkaDelayQueue(String topic, String group, Integer delayTime, String targetTopic, KafkaSyncConsumer<String, String> kafkaSyncConsumer) {
KafkaDelayQueue<String, String> kafkaDelayQueue = new KafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig);
Assert.notNull(applicationContext, “kafkaDelayQueue need applicationContext”);
kafkaDelayQueue.setApplicationContext(applicationContext);
kafkaDelayQueue.setDelayTime(delayTime);
kafkaDelayQueue.setTopic(topic);
kafkaDelayQueue.setGroup(group);
kafkaDelayQueue.setTargetTopic(targetTopic);
return kafkaDelayQueue;
}
private KafkaSyncConsumer<String, String> createKafkaSyncConsumer(String group) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
return new KafkaSyncConsumer<>(properties);
}
}
3. KafkaPollListener:Kafka延迟队列事件监听
package com.wdyin.kafka.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
/**
* 延时队列监听
* @Author : WDYin
* @Date : 2021/5/7
* @Desc :
*/
@Slf4j
public class KafkaPollListener<K, V> implements ApplicationListener<KafkaPollEvent<K, V>> {
private KafkaTemplate kafkaTemplate;
public KafkaPollListener(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void onApplicationEvent(KafkaPollEvent<K, V> event) {
ConsumerRecords<K, V> records = (ConsumerRecords<K, V>) event.getSource();
Integer delayTime = event.getDelayTime();
KafkaDelayQueue<K, V> kafkaDelayQueue = event.getKafkaDelayQueue();
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
TGwQPZpo-1712534725363)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-hrf347Jd-1712534725364)]
更多推荐
所有评论(0)