百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

SpringBoot整合ElasticJob实现分布式任务调度

ccwgpt 2025-06-15 14:53 4 浏览 0 评论

介绍

ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。它的各个产品使用统一的作业 API,开发者仅需一次开发,即可随意部署

ElasticJob 已于 2020 年 5 月 28 日成为 Apache ShardingSphere 的子项目

中文官网文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/

工作原理图

ElasticJob-Lite: 定位为轻量级无中心化解决方案,使用 jar 的形式提供分布式任务的协调服务

ElasticJob-Cloud: 采用自研 Mesos Framework 的解决方案,额外提供资源治理、应用分发以及进程隔离等功能


ElasticJob-LiteElasticJob-Cloud无中心化是否资源分配不支持支持作业模式常驻常驻+瞬时部署依赖ZooKeeperZooKeeper + Mesos

正文

注意:

本次演示,是用
org.apache.shardingsphere.elasticjob的依赖

查看网上大多还是用com.dangdang

前者实现了自动装配

需要提前安装zookeepr:docker run --name zook3.6.0 -p 2181:2181 -d zookeeper:3.6.0

演示案例未持久化入库,实际使用中,可以将任务更新到数据库

ElasticJob 目前提供 Simple、Dataflow 这两种基于 class 的作业类型,并提供 Script、HTTP 这两种基于 type 的作业类型,用户可通过实现 SPI 接口自行扩展作业类型

普通静态任务

1.引入依赖

  <dependency>
      <groupId>org.apache.shardingsphere.elasticjob</groupId>
      <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
      <version>3.0.1</version>
  </dependency>
 <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.6</version>
</dependency>

2.application.yml增加定时任务信息

注意:标红的配置,尽量调大一些,不然可能会出现连接EK异常

elasticjob:
  regCenter:
    #ZK地址
    serverLists: 192.168.5.128:2181
    #ZK的命名空间
    namespace: elasticjob-lite-springboot
    #连接超时时间,单位:毫秒
    connection-timeout-milliseconds: 500000
    #会话超时时间,单位:毫秒
    session-timeout-milliseconds: 500000
    #等待重试的间隔时间的初始值,单位:毫秒
    base-sleep-time-milliseconds: 500000
    #等待重试的间隔时间的最大值,单位:毫秒
    max-sleep-time-milliseconds: 500000
  #静态任务配置
  jobs:
    #任务名称,唯一
    simpleJob:
      #作业是否禁止启动,可用于部署作业时,先禁止启动,部署结束后统一启动
      disabled: true
      #作业实现类
      elasticJobClass: com.test.job.SimpleJobDemo
      #cron表达式,用于控制作业触发时间
      cron: 0/5 * * * * ?
      #作业分片总数
      shardingTotalCount: 1
      #分片序列号和参数用等号分隔,多个键值对用逗号分隔;分片序列号从0开始,不可大于或等于作业分片总数
      shardingItemParameters: 0=Beijing
      #作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
      jobParameter: "{0:'Beijing'}"
      #本地配置是否可覆盖注册中心配置,如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true
    dataflowJob:
      disabled: true
      elasticJobClass: com.test.job.DataflowJobDemo
      cron: 0/5 * * * * ?
      shardingTotalCount: 2
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      overwrite: true
    scriptJob:
      disabled: true
      elasticJobType: SCRIPT
      cron: 0/10 * * * * ?
      shardingTotalCount: 2
      props:
        script.command.line: "echo SCRIPT Job: "
      overwrite: true

3.编写SimpleJobDemo和DataflowJobDemo定时任务

SimpleJobDemo:

@Slf4j
@Component
public class SimpleJobDemo implements SimpleJob {
    public void execute(ShardingContext shardingContext) {
        Gson gson = new Gson();
        Map<String,String> map = gson.fromJson(shardingContext.getJobParameter(), Map.class);
        switch (shardingContext.getShardingItem()) {
            case 0:
                log.info("分片1:执行任务");
                System.out.printf("当前任务名称{%s},当前参数{%s},当前任务参数{%s}",shardingContext.getJobName(),
                        shardingContext.getShardingParameter(),map.get(shardingContext.getShardingItem()+""));
                System.out.println();
                log.info("分片1:任务结束");
                break;
            case 1:
                log.info("分片2:执行任务");
                System.out.printf("当前任务名称{%s},当前参数{%s},当前任务参数{%s}",shardingContext.getJobName(),
                        shardingContext.getShardingParameter(),map.get(shardingContext.getShardingItem()+""));
                System.out.println();
                log.info("分片2:任务结束");
                break;
        }
    }
}

DataflowJobDemo:

@Slf4j
@Component
public class DataflowJobDemo implements DataflowJob<Demo> {

    @Override
    public List<Demo> fetchData(final ShardingContext shardingContext) {
        log.info("Item: {} | Time: {} | Thread: {} | {},detail:{}",
                shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH", shardingContext);
        //模拟查询数据库
        List<Demo> list = new ArrayList<>();
        Demo demo = new Demo();
        demo.setName("一安未来").setAddr("北京");
        list.add(demo);
        return list;
    }

    @Override
    public void processData(final ShardingContext shardingContext, final List<Demo> data) {
        log.info("Item: {} | Time: {} | Thread: {} | {},detail:{} | data:{}",
                shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS", shardingContext,data);

    }

}

4.启动验证

2022-09-07 15:09:35.093  INFO 20952 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:09:35.096  INFO 20952 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:09:35.100  INFO 20952 --- [b-dataflowJob-1] com.test.job.DataflowJobDemo             : Item: 0 | Time: 15:09:35 | Thread: 103 | DATAFLOW FETCH,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=0, shardingParameter=Beijing)
2022-09-07 15:09:35.100  INFO 20952 --- [b-dataflowJob-2] com.test.job.DataflowJobDemo             : Item: 1 | Time: 15:09:35 | Thread: 104 | DATAFLOW FETCH,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=1, shardingParameter=Shanghai)
2022-09-07 15:09:35.101  INFO 20952 --- [b-dataflowJob-2] com.test.job.DataflowJobDemo             : Item: 1 | Time: 15:09:35 | Thread: 104 | DATAFLOW PROCESS,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=1, shardingParameter=Shanghai) | data:[Demo(name=一安未来, addr=北京)]
2022-09-07 15:09:35.101  INFO 20952 --- [b-dataflowJob-1] com.test.job.DataflowJobDemo             : Item: 0 | Time: 15:09:35 | Thread: 103 | DATAFLOW PROCESS,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=0, shardingParameter=Beijing) | data:[Demo(name=一安未来, addr=北京)]
2022-09-07 15:09:40.018  INFO 20952 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:09:40.018  INFO 20952 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:09:40.020  INFO 20952 --- [b-dataflowJob-3] com.test.job.DataflowJobDemo             : Item: 0 | Time: 15:09:40 | Thread: 105 | DATAFLOW FETCH,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=0, shardingParameter=Beijing)
2022-09-07 15:09:40.020  INFO 20952 --- [b-dataflowJob-3] com.test.job.DataflowJobDemo             : Item: 0 | Time: 15:09:40 | Thread: 105 | DATAFLOW PROCESS,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=0, shardingParameter=Beijing) | data:[Demo(name=一安未来, addr=北京)]
2022-09-07 15:09:40.020  INFO 20952 --- [b-dataflowJob-4] com.test.job.DataflowJobDemo             : Item: 1 | Time: 15:09:40 | Thread: 106 | DATAFLOW FETCH,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=1, shardingParameter=Shanghai)
2022-09-07 15:09:40.021  INFO 20952 --- [b-dataflowJob-4] com.test.job.DataflowJobDemo             : Item: 1 | Time: 15:09:40 | Thread: 106 | DATAFLOW PROCESS,detail:ShardingContext(jobName=dataflowJob, taskId=dataflowJob@-@0,1@-@READY@-@192.168.5.1@-@20952, shardingTotalCount=2, jobParameter=, shardingItem=1, shardingParameter=Shanghai) | data:[Demo(name=一安未来, addr=北京)]
SCRIPT Job: {jobName:scriptJob,taskId:scriptJob@-@0,1@-@READY@-@192.168.5.1@-@20952,shardingTotalCount:2,jobParameter:,shardingItem:0}
SCRIPT Job: {jobName:scriptJob,taskId:scriptJob@-@0,1@-@READY@-@192.168.5.1@-@20952,shardingTotalCount:2,jobParameter:,shardingItem:1}

5.启动多个程序端口验证

这里修改了simpleJob任务为两个分片,其他任务暂时停止了,修改disabled: true即可停止任务

    simpleJob:
      #作业是否禁止启动,可用于部署作业时,先禁止启动,部署结束后统一启动
      disabled: false
      #作业实现类
      elasticJobClass: com.test.job.SimpleJobDemo
      #cron表达式,用于控制作业触发时间
      cron: 0/5 * * * * ?
      #作业分片总数
      shardingTotalCount: 2
      #分片序列号和参数用等号分隔,多个键值对用逗号分隔;分片序列号从0开始,不可大于或等于作业分片总数
      shardingItemParameters: 0=Beijing,1=Shanghai
      #作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
      jobParameter: "{0:'Beijing',1:'Shanghai'}"
      #本地配置是否可覆盖注册中心配置,如果可覆盖,每次启动作业都以本地配置为准
      overwrite: true

启动8082端口,可以看到分片1,2信息:

2022-09-07 15:18:20.015  INFO 2164 --- [job-simpleJob-7] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:18:20.015  INFO 2164 --- [job-simpleJob-8] com.test.job.SimpleJobDemo               : 分片2:执行任务
2022-09-07 15:18:20.015  INFO 2164 --- [job-simpleJob-7] com.test.job.SimpleJobDemo               : 分片1:任务结束
当前任务名称{simpleJob},当前参数{Shanghai},当前任务参数{null}
2022-09-07 15:18:20.015  INFO 2164 --- [job-simpleJob-8] com.test.job.SimpleJobDemo               : 分片2:任务结束

再启动8083端口,可以看到分片1信息:

2022-09-07 15:19:15.024  INFO 32764 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:19:15.024  INFO 32764 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:19:20.021  INFO 32764 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:19:20.021  INFO 32764 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束

这个时候再看8082端口日志,只剩下分片2信息:

2022-09-07 15:19:20.020  INFO 2164 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片2:执行任务
当前任务名称{simpleJob},当前参数{Shanghai},当前任务参数{null}
2022-09-07 15:19:20.021  INFO 2164 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片2:任务结束
2022-09-07 15:19:25.010  INFO 2164 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片2:执行任务
当前任务名称{simpleJob},当前参数{Shanghai},当前任务参数{null}
2022-09-07 15:19:25.011  INFO 2164 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片2:任务结束
2022-09-07 15:19:30.016  INFO 2164 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片2:执行任务
当前任务名称{simpleJob},当前参数{Shanghai},当前任务参数{null}
2022-09-07 15:19:30.016  INFO 2164 --- [pleJob_Worker-1] com.test.job.SimpleJobDemo               : 分片2:任务结束

停止8083端口,再查看8082端口,又恢复分片1,2信息:

当前任务名称{simpleJob},当前参数{Shanghai},当前任务参数{null}
2022-09-07 15:21:15.012  INFO 2164 --- [job-simpleJob-6] com.test.job.SimpleJobDemo               : 分片2:任务结束
2022-09-07 15:21:15.011  INFO 2164 --- [job-simpleJob-5] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:21:15.012  INFO 2164 --- [job-simpleJob-5] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:21:20.012  INFO 2164 --- [job-simpleJob-7] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{simpleJob},当前参数{Beijing},当前任务参数{Beijing}
2022-09-07 15:21:20.012  INFO 2164 --- [job-simpleJob-8] com.test.job.SimpleJobDemo               : 分片2:执行任务
当前任务名称{simpleJob},当前参数{Shanghai},当前任务参数{null}
2022-09-07 15:21:20.013  INFO 2164 --- [job-simpleJob-8] com.test.job.SimpleJobDemo               : 分片2:任务结束
2022-09-07 15:21:20.013  INFO 2164 --- [job-simpleJob-7] com.test.job.SimpleJobDemo               : 分片1:任务结束

动态定时任务

上面实现了静态任务的编写,但实际开发中经常会遇到动态任务,即动态创建,修改,暂停定时任务

1.编写动态任务类

@Component
public class DynamicTask {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    /***
     * 动态创建定时任务
     * @param jobName:定时任务名称
     * @param cron:表达式
     * @param shardingTotalCount:分片数量
     * @param instance:定时任务实例
     * @param shardingItemParameters:分片参数
     * @param parameters:参数
     */
    public void create(String jobName, String cron, int shardingTotalCount, SimpleJob instance, String shardingItemParameters,String parameters){
        JobConfiguration coreConfig = JobConfiguration.newBuilder(jobName, shardingTotalCount).cron(cron)
                .shardingItemParameters(shardingItemParameters).jobParameter(parameters).overwrite(true).build();
        new ScheduleJobBootstrap(zookeeperRegistryCenter, instance, coreConfig).schedule();
    }

    /**
     * 更新定时任务
     * @param jobName
     * @param cron
     */public void updateJob(String jobName, String cron) {
        JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(cron,"");
    }

    /**
     * 停止定时任务
     * @param jobName
     */
    public void shutdownJob(String jobName){
        JobRegistry.getInstance().getJobScheduleController(jobName).shutdown();
    }
}

2.发送请求类

@RestController
public class TestJobController {
    @Autowired
    private DynamicTask dynamicTaskAdd;

    @GetMapping("/create")
    public void create(){
        String cron = "0/5 * * * * ?";
        dynamicTaskAdd.create("job001", cron, 1, new SimpleJobDemo(),"0=test","{0:'Beijing'}");
    }
    @GetMapping("/update")
    public void update(){
        String cron = "0/10 * * * * ?";
        dynamicTaskAdd.updateJob("job001",cron);
    }
    @GetMapping("/stop")
    public void stop(){
        dynamicTaskAdd.shutdownJob("job001");
    }
}

3.依次请求create--update--stop,注意观察间隔时间

2022-09-07 15:36:55.019  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{job001},当前参数{test},当前任务参数{Beijing}
2022-09-07 15:36:55.020  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:37:00.037  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{job001},当前参数{test},当前任务参数{Beijing}
2022-09-07 15:37:00.037  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:37:05.070  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{job001},当前参数{test},当前任务参数{Beijing}
2022-09-07 15:37:05.072  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束

2022-09-07 15:37:20.019  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{job001},当前参数{test},当前任务参数{Beijing}
2022-09-07 15:37:20.021  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:37:30.013  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:执行任务
当前任务名称{job001},当前参数{test},当前任务参数{Beijing}
2022-09-07 15:37:30.013  INFO 7320 --- [job001_Worker-1] com.test.job.SimpleJobDemo               : 分片1:任务结束
2022-09-07 15:37:39.085  INFO 7320 --- [nio-8083-exec-5] org.quartz.core.QuartzScheduler          : Scheduler job001_$_NON_CLUSTERED shutting down.
2022-09-07 15:37:39.085  INFO 7320 --- [nio-8083-exec-5] org.quartz.core.QuartzScheduler          : Scheduler job001_$_NON_CLUSTERED paused.
2022-09-07 15:37:39.093  INFO 7320 --- [nio-8083-exec-5] org.quartz.core.QuartzScheduler          : Scheduler job001_$_NON_CLUSTERED shutdown complete.

图形化界面

elastic-job支持图形化界面,进入后首先添加自己的ZK地址和命名空间,然后可在界面查看任务

  1. 如果你使用的是org.apache.shardingsphere.elasticjob版本,下载elasticjob-ui一定要跟自己的依赖版本一致,不然会出现各种异常,启动默认端口8088,用户名密码root/root

官网下载地址:https://dlcdn.apache.org/shardingsphere

  1. 如果你使用的是com.dangdang,你需要下载elastic-job-lite-console,启动默认端口8899,用户名密码root

目前下载都是从github上直接拉取

顺便介绍一下如何使用com.dangdang

1.引入依赖

   <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.5</version>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.5</version>
    </dependency>

2.编写注册中心类

@Configuration
public class ElasticRegCenterConfig {
    @Value("${elasticjob.zk.serverLists}")
    private String serverList;

    @Value("${elasticjob.zk.namespace}")
    private String namespace;

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
        zookeeperConfiguration.setMaxRetries(3); //设置重试次数,可设置其他属性
        zookeeperConfiguration.setSessionTimeoutMilliseconds(500000); //设置会话超时时间,尽量大一点,否则项目无法正常启动
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}

3.编写定时任务配置类

@Configuration
public class ElasticJobConfig {
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Autowired
    SimpleJobDemo simpleJob;

    @Value("${elasticjob.job.cron}")
    private String cron;

    @Value("${elasticjob.job.shardingTotalCount}")
    private int shardingTotalCount;

    @Value("${elasticjob.job.shardingItemParameters}")
    private String shardingItemParameters;

    @Value("${elasticjob.job.jobParameter:}")
    private String jobParameter;


    //加入bean注解就可以拿到MyJob
    //静态任务默认初始化
    @Bean(initMethod = "init")
    public SpringJobScheduler initJobConfiguration() {
        return new SpringJobScheduler(simpleJob, regCenter, createJobConfiguration(simpleJob.getClass(),cron,shardingTotalCount,shardingItemParameters));
    }

    /**
     * 
     * @param clazz 任务的字节码
     * @param cron 表达式
     * @param shrdingCount 分片个数
     * @param shardingParamter 分片参数
     * @return
     */
    private LiteJobConfiguration createJobConfiguration(Class<? extends ElasticJob> clazz, String cron, int shrdingCount, String shardingParamter) {

        JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shrdingCount);
        if (StringUtils.isEmpty(shardingParamter)) {
            builder.shardingItemParameters(shardingParamter);
        }
        //创建作业配置
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(builder.build(), clazz.getCanonicalName());
        //覆盖zookeeper
        return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
    }
    
       /***
     * 动态创建定时任务
     * @param jobName:定时任务名称
     * @param cron:表达式
     * @param shardingTotalCount:分片数量
     * @param instance:定时任务实例
     * @param parameters:参数
     * @param description:作业描述
     */
    public void addJob(String jobName, String cron, int shardingTotalCount, SimpleJob instance, String parameters, String description){
        LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(
                        jobName,
                        cron,
                        shardingTotalCount
                ).jobParameter(parameters).description(description).build(),
                instance.getClass().getName()
        )).overwrite(true);
        LiteJobConfiguration liteJobConfiguration = builder.build();

        new SpringJobScheduler(instance,regCenter,liteJobConfiguration).init();
    }

    /**
     * 更新定时任务
     * @param jobName
     * @param cron
     */public void updateJob(String jobName, String cron) {
        JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(cron,"");
    }


    /**
     * 停止定时任务
     * @param jobName
     */
    public void shutdownJob(String jobName){
        JobRegistry.getInstance().getJobScheduleController(jobName).shutdown();
    }
}

4.编写SimpleJobDemo和DataflowJobDemo定时任务(参照apache以上即可)

补充说明

ElasticJob支持错误处理策略:

  • 记录日志策略
  • 抛出异常策略
  • 忽略异常策略
  • 邮件通知策略
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-email</artifactId>
    <version>${latest.release.version}</version>
</dependency>
  • 企业微信通知策略
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-wechat</artifactId>
    <version>${latest.release.version}</version>
</dependency>
  • 钉钉通知策略
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-dingtalk</artifactId>
    <version>${latest.release.version}</version>
</dependency>

相关推荐

十分钟让你学会LNMP架构负载均衡(impala负载均衡)

业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...

AGV仓储机器人调度系统架构(agv物流机器人)

系统架构层次划分采用分层模块化设计,分为以下五层:1.1用户接口层功能:提供人机交互界面(Web/桌面端),支持任务下发、实时监控、数据可视化和报警管理。模块:任务管理面板:接收订单(如拣货、...

远程热部署在美团的落地实践(远程热点是什么意思)

Sonic是美团内部研发设计的一款用于热部署的IDEA插件,本文其实现原理及落地的一些技术细节。在阅读本文之前,建议大家先熟悉一下Spring源码、SpringMVC源码、SpringBoot...

springboot搭建xxl-job(分布式任务调度系统)

一、部署xxl-job服务端下载xxl-job源码:https://gitee.com/xuxueli0323/xxl-job二、导入项目、创建xxl_job数据库、修改配置文件为自己的数据库三、启动...

大模型:使用vLLM和Ray分布式部署推理应用

一、vLLM:面向大模型的高效推理框架1.核心特点专为推理优化:专注于大模型(如GPT-3、LLaMA)的高吞吐量、低延迟推理。关键技术:PagedAttention:类似操作系统内存分页管理,将K...

国产开源之光【分布式工作流调度系统】:DolphinScheduler

DolphinScheduler是一个开源的分布式工作流调度系统,旨在帮助用户以可靠、高效和可扩展的方式管理和调度大规模的数据处理工作流。它支持以图形化方式定义和管理工作流,提供了丰富的调度功能和监控...

简单可靠高效的分布式任务队列系统

#记录我的2024#大家好,又见面了,我是GitHub精选君!背景介绍在系统访问量逐渐增大,高并发、分布式系统成为了企业技术架构升级的必由之路。在这样的背景下,异步任务队列扮演着至关重要的角色,...

虚拟服务器之间如何分布式运行?(虚拟服务器部署)

  在云计算和虚拟化技术快速发展的今天,传统“单机单任务”的服务器架构早已难以满足现代业务对高并发、高可用、弹性伸缩和容错容灾的严苛要求。分布式系统应运而生,并成为支撑各类互联网平台、企业信息系统和A...

一文掌握 XXL-Job 的 6 大核心组件

XXL-Job是一个分布式任务调度平台,其核心组件主要包括以下部分,各组件相互协作实现高效的任务调度与管理:1.调度注册中心(RegistryCenter)作用:负责管理调度器(Schedule...

京东大佬问我,SpringBoot中如何做延迟队列?单机与分布式如何做?

京东大佬问我,SpringBoot中如何做延迟队列?单机如何做?分布式如何做呢?并给出案例与代码分析。嗯,用户问的是在SpringBoot中如何实现延迟队列,单机和分布式环境下分别怎么做。这个问题其实...

企业级项目组件选型(一)分布式任务调度平台

官网地址:https://www.xuxueli.com/xxl-job/能力介绍架构图安全性为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;调度中心和执...

python多进程的分布式任务调度应用场景及示例

多进程的分布式任务调度可以应用于以下场景:分布式爬虫:importmultiprocessingimportrequestsdefcrawl(url):response=re...

SpringBoot整合ElasticJob实现分布式任务调度

介绍ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob-Lite和ElasticJob-Cloud组成。它通过弹性调度、资源管控、...

分布式可视化 DAG 任务调度系统 Taier 的整体流程分析

Taier作为袋鼠云的开源项目之一,是一个分布式可视化的DAG任务调度系统。旨在降低ETL开发成本,提高大数据平台稳定性,让大数据开发人员可以在Taier直接进行业务逻辑的开发,而不用关...

SpringBoot任务调度:@Scheduled与TaskExecutor全面解析

一、任务调度基础概念1.1什么是任务调度任务调度是指按照预定的时间计划或特定条件自动执行任务的过程。在现代应用开发中,任务调度扮演着至关重要的角色,它使得开发者能够自动化处理周期性任务、定时任务和异...

取消回复欢迎 发表评论: