分布式任务调度管理 Distribution task center. 支持Rabbit与kafka两种消息队列,实现立即执行与根据CronExpress表达式的执行及更加复杂的复合执行策略。在任务执行过程中可完成回滚操作。

 

在微服务中我们经常会采用cron或spring scheduler来实现调度任务。但是在分布式多节点架构中,我们不能让每个节点都去执行同一个任务。并且在任务执行过程中一旦遇到失败,必须具备一种回滚机制来保证分布式事务调度的一致性与完整性。本项目采用了rabbitmq与kafka两种消息队列,将任务调度从原始的业务微服务中有效分离,降低业务代码的耦合度。所有调度任务均由调度中心微服务发起。使原有业务真正实现分布式多节点运行。

项目介绍

taskcenter-api - 任务中心api
taskcenter-biz - 任务中心微服务
taskcenter-test - 调度任务实现示例

所有的API均可在taskcenter-test中查看到具体的调用及实现。

运行示例

Java端引入

可参考taskcenter-test

1. pom.xml

<dependency>
    <groupId>huxg</groupId>
    <artifactId>taskcenter-api</artifactId>
    <version>1.0</version>
</dependency>

2.application.yml

spring:
    # 使用rabbitmq 作为消息中间件时的配置
    #rabbitmq: 
    #    host: ${RABBIT_MQ_HOST:172.16.70.3}
    #    port:  ${RABBIT_MQ_PORT:6672}
    #    username: user
    #    password: mqpassWord
    
    # 使用kafka作为消息中间件时的配置
    #kafka:
    #    bootstrap-servers: 172.16.70.3:9092
    #    immediate-job-topic: base
    #    cron-job-topic: cron
    #    producer:
    #        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #        key-serializer: org.apache.kafka.common.serialization.StringSerializer
    #        buffer-memory: 1351322
    #        batch-size: 16384

3. @SpringBootApplication 添加包扫描

@SpringBootApplication(scanBasePackages = "com.xxxxx")

4. API说明

    4.1 简单的调度任务定义(基于注解定义分布式调度任务)

// 每分钟执行一次的调度任务
@DistributedTask(jobGroupId = "任务组Id", jobId = "任务Id", jobName = "任务名称", requestUrl = "http://taskcenterTest/api/executeEveryMinute", cornExpress = "0 0/1 * * * ?")
	
// 每天15:00:00 执行的调度任务
@DistributedTask(jobGroupId = "任务组Id", jobId = "任务Id", jobName = "任务名称", requestUrl = "http://taskcenterTest/api/executeEveryMinute", executeTime = "15:00:00")

    4.2 执行一个立即执行的任务

BaseMessage job = new BaseMessage();
job.setJobGroupId("任务组名");
job.setJobId("任务ID");
job.setRequestUrl("http://taskcenterTest/api/immediateTask");
taskcenter.executeJob(job);

    4.3 执行定时任务

ScheduledJob job = new ScheduledJob();
job.setJobId(UIController.class.getName());
job.setJobGroupId("任务组名");
job.setJobName("任务名称");
job.setCornExpress(cronExpression);
job.setRequestUrl("http://taskcenterTest/api/executeCronTask2");
job.setStatus(TaskCenterConstants.TASK_STATUS_WAITING);
job.setJobType(TaskCenterConstants.JOB_TYPE_CRON);
taskcenter.executeJob(job);

    4.4 执行复合调度任务

ScheduledJob job = new ScheduledJob();
job.setJobId(UIController.class.getName());
job.setJobGroupId("任务组Id");
job.setJobName("复杂调度任务");
job.setRequestUrl("http://taskcenterTest/api/executeComplexTask");
job.setJobType(TaskCenterConstants.JOB_TYPE_COMPLEX); // 注意,需要设置为复合执行模式
job.setStatus(TaskCenterConstants.TASK_STATUS_WAITING);
job.setRepeatCount(5);
job.setInterval(5000);
job.param("sendData", data);
taskcenter.executeJob(job);

    4.5 Job简介

     jobId  # 任务Id,停止任务时需要该Id

     jobName # 任务名称

     requestUrl # 调度任务执行时会调用这里给定的地址

      rollbackUrl # 回滚请求

      param # 调度任务其他需携带的参数

      startTime # 调度任务开始执行时间

      endTime # 调度任务终止执行的时间。

       repeatCount # 重复执行次数,可理解为震荡次数

       jobType # 任务的类型

源代码下载

本项目持续更新,git clone下载最新版本。

源代码下载及Github更新地址

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐