百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

分布式定时任务xxJob的常用姿势都集齐了,So Easy

ccwgpt 2024-09-18 12:27 37 浏览 0 评论

前言

任务调度是java项目中常用的一种组件,可以指定任务在何时进行触发,最熟悉的是spring框架里面的quartz,较流行的有一些分布式调度组件,比如elastic-job/azkaban,都是基于quartz二次开发的,今天介绍一款分布式的任务调度器xxl-job。

项目介绍

xxl-job是一款极容易学习上手的轻量级开源分布式调度框架,分为管理端和执行端两块,管理端负责配置任务信息以及查看任务执行日志,执行端只需要配置与管理端的连接信息就可以进行具体的任务逻辑开发了,目前版本还在持续迭代中,使用简单,功能强大,具体功能特性可以看下官方介绍。废话不多说,直接进入实战吧。

实战

1.服务端部署

从https://github.com/xuxueli/xxl-job下载项目,用mysql客户端工具Navicat执行项目根目录下doc/db/table_xxl_job.sql文件,库名自己可以自行修改,一共8张表,如下:

创建一个新的spring boot项目,将下载的xxl-job-admin目录下的文件以及pom.xml文件都拷贝到新建的项目中(如果不想新建项目可以直接用下载下来的项目进行修改部署),修改application.properties中的数据库连接信息。

image-20200603145447639

小编是自己创建的新项目,需要手动改了pom.xml依赖xxl-job-core的版本为2.2.0

image-20200603150900339

修改logback.xml中的日志输出路径。

image-20200603145607284

好了,以上3步曲就搞定整个服务端配置了,启动项目,并访问http://localhost:8080/xxl-job-admin/ ,默认管理员账号admin/123456进行登录。

image-20200603150148363

这交互,可以啊,是不是很带感。

2.执行端配置

创建一个新的module,跟服务端一样,也需要修改下logback.xml以及在pom.xml添加xxl-job-core的依赖。

为了模拟分布式效果,小编创建了2个配置文件来区分2个执行服务。

application-9998.properties

# web port
server.port=8081
# no web
#spring.main.web-environment=false
# log config
logging.config=classpath:logback.xml

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
xxl.job.executor.appname=my-job-executor
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

application-9999.properties

# web port
server.port=8082
# no web
#spring.main.web-environment=false

# log config
logging.config=classpath:logback.xml

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
xxl.job.executor.appname=my-job-executor
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

细心的童鞋会发现只有server.port和xxl.job.executor.port不同,执行器服务跟spring boot一样,自带内嵌tomcat,也会暴露一个端口注册到服务端,进行高可用负载。

创建一个java config类,定义一个使用配置的XxlJobSpringExecutor执行类,如下

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

配置2个启动配置,分别启动,效果如下:

image-20200603153527093

image-20200603153548043

完美启动2个服务,看下服务端平台是不是有这两台执行服务的注册信息。

image-20200603153906206

注意:为了演示,事先创建了一个执行器,AppName一定要与配置文件中xxl.job.executor.appname一致。

3.任务开发

3.1 基于方法注解任务

话不多说,直接上代码吧,毕竟代码是程序员最好的交流方式。

    /**
     * 1、注解任务
     */
    @XxlJob(value = "myJobAnnotationHandler",init = "init", destroy = "destroy")
    public ReturnT<String> myJobAnnotationHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB-ANNOTATION, myJobAnnotationHandler.");

        log.info("my first annotation job run, param: {},port:{}",param,port);

        return ReturnT.SUCCESS;
    }
    
    public void init(){
        log.info("my annotation job  init");
    }

    public void destroy(){
        log.info("my job annotation job destory");
    }

3.2 基于api任务

@Slf4j
public class ApiJob extends IJobHandler {

    @Override
    public ReturnT<String> execute(String param) throws Exception {

        XxlJobLogger.log("XXL-JOB-API, Hello World.");

        log.info("my job api run, param: {}",param);

        return ReturnT.SUCCESS;
    }
}
 @PostConstruct
    public void registerHandler(){
   XxlJobExecutor.registJobHandler("myJobApiHandler",new ApiJob());
    }

3.3 分片广播任务

 /**
     * 2、分片任务
     */
    @XxlJob("myShardJobAnnotationHandler")
    public ReturnT<String> myShardJobAnnotationHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB-ANNOTATION, myShardJobAnnotationHandler.");

        log.info("my shard job run, param: {}",param);
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());

        // 业务逻辑
        for (int i = 0; i < shardingVO.getTotal(); i++) {
            if (i == shardingVO.getIndex()) {
                log.info("第 {} 片, 命中分片开始处理", i);
            } else {
                log.info("第 {} 片, 忽略", i);
            }
        }
        return ReturnT.SUCCESS;
    }

上面是整理的比较实用的任务创建方式,个人偏好于注解形式,方法上加一个注解就完事了。

4.任务执行

剩下的就是傻白甜的界面操作了,走起。

4.1 单任务执行

创建一个路由策略为轮询的任务,指定corn表达式,并填入JobHandler为myJobAnnotationHandler,myJobAnnotationHandler其实就是spring IOC容器中管理bean的名称,有兴趣的童鞋可以看下源码。

为了演示效果,点击执行一次并进行任务参数输入。

image-20200603161704586

image-20200603161309965

轮询调用执行器服务效果如下:

image-20200603161526728

image-20200603161610450

4.2 子任务执行

更新任务,并指定子任务id为5,多个子任务的需要以逗号隔开

image

image-20200603162128189

执行任务结果如下

image-20200603162031710

4.3 分片广播任务执行

分片任务其实就是广播功能,每次触发,每个执行服务的业务执行类都会被调用,类似于kafka里面的不同消费组都要对同一个topic进行消费一样。

执行后的效果如下

image-20200603162803111

image-20200603162814417

太强势了,需要定时刷新项目中的配置信息,用这个方式很完美。

5.任务日志

任务日志其实是很重要的一块,方便回溯任务历史执行情况, 以便跟踪问题并矫正丢失的业务数据

image-20200603163503041

查看调度备注,父子任务调度信息非常详细,子任务可以通过执行备注查看执行情况

image-20200603163909026

image-20200603164510795

查看控制台输出,里面的日志是执行器中XxlJobLogger类打印出来的

image-20200603164115849

代码获取

1.访问地址:https://github.com/pengziliu/GitHub-code-practice/

2.点击左下方阅读原文

总结

xxl-job还有很多特性,个人觉得实用的有任务超时控制/任务失败重试/任务失败告警 /邮件报警,这些都是与业务紧密相关的功能,能有效的规避生产事故而达到止损目的。

对比下之前使用过的任务调度组件,xxl-job将任务调度和执行进行解耦,大大提高了可用性和扩展性,代码的侵入性几乎没有,与spring boot无配置化,开箱即用的理念非常契合,任务调度平台可视化操作的确是爽爆了,感觉小白都可以用。

相关推荐

十分钟让你学会LNMP架构负载均衡(impala负载均衡)

业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...

AGV仓储机器人调度系统架构(agv物流机器人)

系统架构层次划分采用分层模块化设计,分为以下五层:1.1用户接口层功能:提供人机交互界面(Web/桌面端),支持任务下发、实时监控、数据可视化和报警管理。模块:任务管理面板:接收订单(如拣货、...

远程热部署在美团的落地实践(远程热点是什么意思)

Sonic是美团内部研发设计的一款用于热部署的IDEA插件,本文其实现原理及落地的一些技术细节。在阅读本文之前,建议大家先熟悉一下Spring源码、SpringMVC源码、SpringBoot...

springboot搭建xxl-job(分布式任务调度系统)

一、部署xxl-job服务端下载xxl-job源码:https://gitee.com/xuxueli0323/xxl-job二、导入项目、创建xxl_job数据库、修改配置文件为自己的数据库三、启动...

大模型:使用vLLM和Ray分布式部署推理应用

一、vLLM:面向大模型的高效推理框架1.核心特点专为推理优化:专注于大模型(如GPT-3、LLaMA)的高吞吐量、低延迟推理。关键技术:PagedAttention:类似操作系统内存分页管理,将K...

国产开源之光【分布式工作流调度系统】:DolphinScheduler

DolphinScheduler是一个开源的分布式工作流调度系统,旨在帮助用户以可靠、高效和可扩展的方式管理和调度大规模的数据处理工作流。它支持以图形化方式定义和管理工作流,提供了丰富的调度功能和监控...

简单可靠高效的分布式任务队列系统

#记录我的2024#大家好,又见面了,我是GitHub精选君!背景介绍在系统访问量逐渐增大,高并发、分布式系统成为了企业技术架构升级的必由之路。在这样的背景下,异步任务队列扮演着至关重要的角色,...

虚拟服务器之间如何分布式运行?(虚拟服务器部署)

  在云计算和虚拟化技术快速发展的今天,传统“单机单任务”的服务器架构早已难以满足现代业务对高并发、高可用、弹性伸缩和容错容灾的严苛要求。分布式系统应运而生,并成为支撑各类互联网平台、企业信息系统和A...

一文掌握 XXL-Job 的 6 大核心组件

XXL-Job是一个分布式任务调度平台,其核心组件主要包括以下部分,各组件相互协作实现高效的任务调度与管理:1.调度注册中心(RegistryCenter)作用:负责管理调度器(Schedule...

京东大佬问我,SpringBoot中如何做延迟队列?单机与分布式如何做?

京东大佬问我,SpringBoot中如何做延迟队列?单机如何做?分布式如何做呢?并给出案例与代码分析。嗯,用户问的是在SpringBoot中如何实现延迟队列,单机和分布式环境下分别怎么做。这个问题其实...

企业级项目组件选型(一)分布式任务调度平台

官网地址:https://www.xuxueli.com/xxl-job/能力介绍架构图安全性为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;调度中心和执...

python多进程的分布式任务调度应用场景及示例

多进程的分布式任务调度可以应用于以下场景:分布式爬虫:importmultiprocessingimportrequestsdefcrawl(url):response=re...

SpringBoot整合ElasticJob实现分布式任务调度

介绍ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob-Lite和ElasticJob-Cloud组成。它通过弹性调度、资源管控、...

分布式可视化 DAG 任务调度系统 Taier 的整体流程分析

Taier作为袋鼠云的开源项目之一,是一个分布式可视化的DAG任务调度系统。旨在降低ETL开发成本,提高大数据平台稳定性,让大数据开发人员可以在Taier直接进行业务逻辑的开发,而不用关...

SpringBoot任务调度:@Scheduled与TaskExecutor全面解析

一、任务调度基础概念1.1什么是任务调度任务调度是指按照预定的时间计划或特定条件自动执行任务的过程。在现代应用开发中,任务调度扮演着至关重要的角色,它使得开发者能够自动化处理周期性任务、定时任务和异...

取消回复欢迎 发表评论: