SpringBoot分布式任务调度,可支持rabbitmq与kafka两种消息中间件的可回滚微服务实现。
分布式任务调度管理 Distribution task center. 支持Rabbit与kafka两种消息队列,实现立即执行与根据CronExpress表达式的执行及更加复杂的复合执行策略。在任务执行过程中可完成回滚操作。
分布式任务调度管理 Distribution task center. 支持Rabbit与kafka两种消息队列,实现立即执行与根据CronExpress表达式的执行及更加复杂的复合执行策略。在任务执行过程中可完成回滚操作。
在微服务中我们经常会采用cron或spring scheduler来实现调度任务。但是在分布式多节点架构中,我们不能让每个节点都去执行同一个任务。并且在任务执行过程中一旦遇到失败,必须具备一种回滚机制来保证分布式事务调度的一致性与完整性。本项目采用了rabbitmq与kafka两种消息队列,将任务调度从原始的业务微服务中有效分离,降低业务代码的耦合度。所有调度任务均由调度中心微服务发起。使原有业务真正实现分布式多节点运行。
项目介绍
taskcenter-api - 任务中心api
taskcenter-biz - 任务中心微服务
taskcenter-test - 调度任务实现示例
所有的API均可在taskcenter-test中查看到具体的调用及实现。
运行示例
Java端引入
可参考taskcenter-test
1. pom.xml
<dependency>
<groupId>huxg</groupId>
<artifactId>taskcenter-api</artifactId>
<version>1.0</version>
</dependency>
2.application.yml
spring:
# 使用rabbitmq 作为消息中间件时的配置
#rabbitmq:
# host: ${RABBIT_MQ_HOST:172.16.70.3}
# port: ${RABBIT_MQ_PORT:6672}
# username: user
# password: mqpassWord
# 使用kafka作为消息中间件时的配置
#kafka:
# bootstrap-servers: 172.16.70.3:9092
# immediate-job-topic: base
# cron-job-topic: cron
# producer:
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# buffer-memory: 1351322
# batch-size: 16384
3. @SpringBootApplication 添加包扫描
@SpringBootApplication(scanBasePackages = "com.xxxxx")
4. API说明
4.1 简单的调度任务定义(基于注解定义分布式调度任务)
// 每分钟执行一次的调度任务
@DistributedTask(jobGroupId = "任务组Id", jobId = "任务Id", jobName = "任务名称", requestUrl = "http://taskcenterTest/api/executeEveryMinute", cornExpress = "0 0/1 * * * ?")
// 每天15:00:00 执行的调度任务
@DistributedTask(jobGroupId = "任务组Id", jobId = "任务Id", jobName = "任务名称", requestUrl = "http://taskcenterTest/api/executeEveryMinute", executeTime = "15:00:00")
4.2 执行一个立即执行的任务
BaseMessage job = new BaseMessage();
job.setJobGroupId("任务组名");
job.setJobId("任务ID");
job.setRequestUrl("http://taskcenterTest/api/immediateTask");
taskcenter.executeJob(job);
4.3 执行定时任务
ScheduledJob job = new ScheduledJob();
job.setJobId(UIController.class.getName());
job.setJobGroupId("任务组名");
job.setJobName("任务名称");
job.setCornExpress(cronExpression);
job.setRequestUrl("http://taskcenterTest/api/executeCronTask2");
job.setStatus(TaskCenterConstants.TASK_STATUS_WAITING);
job.setJobType(TaskCenterConstants.JOB_TYPE_CRON);
taskcenter.executeJob(job);
4.4 执行复合调度任务
ScheduledJob job = new ScheduledJob();
job.setJobId(UIController.class.getName());
job.setJobGroupId("任务组Id");
job.setJobName("复杂调度任务");
job.setRequestUrl("http://taskcenterTest/api/executeComplexTask");
job.setJobType(TaskCenterConstants.JOB_TYPE_COMPLEX); // 注意,需要设置为复合执行模式
job.setStatus(TaskCenterConstants.TASK_STATUS_WAITING);
job.setRepeatCount(5);
job.setInterval(5000);
job.param("sendData", data);
taskcenter.executeJob(job);
4.5 Job简介
jobId # 任务Id,停止任务时需要该Id
jobName # 任务名称
requestUrl # 调度任务执行时会调用这里给定的地址
rollbackUrl # 回滚请求
param # 调度任务其他需携带的参数
startTime # 调度任务开始执行时间
endTime # 调度任务终止执行的时间。
repeatCount # 重复执行次数,可理解为震荡次数
jobType # 任务的类型
源代码下载
本项目持续更新,git clone下载最新版本。
更多推荐
所有评论(0)