关于java:项目终于用上了xxljob真香

30次阅读

共计 9615 个字符,预计需要花费 25 分钟才能阅读完成。

大家好,我是不才陈某~

本篇文章次要记录我的项目中遇到的 xxl-job 的实战,心愿能通过这篇文章通知读者们什么是 xxl-job 以及怎么应用 xxl-job 并分享一个实战案例。

文章首发公众号:码猿技术专栏

关注公众号:码猿技术专栏,回复关键词:1111 获取阿里外部 java 性能调优手册!

那么上面先阐明什么是 xxl-job 以及为什么要应用它。

xxl-job 是什么?

XXL-JOB 是一个 分布式任务调度平台,其外围设计指标是开发迅速、学习简略、轻量级、易扩大。

设计思维 是将调度行为形象造成 调度核心 平台,平台自身不承当业务逻辑,而是负责发动 调度申请 后,由 执行器 接管调度申请并执行 工作 ,这里的 工作 形象为 扩散的 JobHandler。通过这种形式即可实现 调度 工作 互相解耦,从而进步零碎整体的稳定性和拓展性。

为了更好了解,这里放一张官网的架构图:

任务调度是什么?

在开发我的项目时大家是否也遇到过相似的场景问题:

  • 零碎须要定时在每天 0 点进行数据备份。
  • 零碎须要在流动开始前几小时预热执行一些前置业务。
  • 零碎须要定时对 MQ 音讯表的发送装填,对发送失败的 MQ 音讯进行弥补从新发送。

这些场景问题都能够通过 任务调度 来解决,任务调度指的是零碎在约定的指定工夫主动去执行指定的工作的过程。

单体零碎 中有许多实现 任务调度 的形式,如多线程形式、Timer 类、Spring Tasks 等等。这里比拟罕用的是 Spring Tasks(通过 @EnableScheduling + @Scheduled 的注解能够自定义定时工作,有趣味的能够去理解一下)

为什么须要分布式任务调度平台?

分布式下,每个服务都能够搭建为集群,这样的益处是能够将工作切片分给每一个服务从而实现并行执行,进步任务调度的解决效率。那么为什么 分布式系统 不能应用 单体零碎 的任务调度实现形式呢。

在集群服务下,如果还是应用每台机器依照单体零碎的任务调度实现形式实现的话,会呈现上面这四个问题:

  1. 怎么做到对工作的管制(如何防止工作反复执行)。
  2. 如果某台机器宕机了,会不会存在工作失落。
  3. 如果要减少服务实例,怎么做到弹性扩容。
  4. 如何做到对任务调度的执行状况对立监测。

通过下面的问题能够理解到分布式系统下须要一个满足高可用、容错治理、负载平衡等性能的任务调度平台来实现任务调度。分布式系统下,也有许多能够实现任务调度的第三方的分布式任务调度零碎,如 xxl-job、Quartz、elastic-job 等等罕用的分布式任务调度零碎。

如何应用 xxl-job

作为开源软件的 xxl-job,能够在 github 或 gitee 上查看和下载 xxl-job 的源码。

上面将介绍我应用 xxl-job 的流程(如果有操作不当的,能够查看官网的中文文档:https://www.xuxueli.com/xxl-job)

dokcer 下装置 xxl-job

1、docker 下拉取 xxl-job 的镜像(这里应用 2.3.1 版本)

docker pull xuxueli/xxl-job-admin:2.3.1

2、创立映射容器的文件目录

mkdir -p -m 777 /mydata/xxl-job/data/applogs

3、在 /mydata/xxl-job 的目录下创立 application.properties 文件

因为 application.properties 的代码过长,这里就不展现了,须要的能够去 gitee 上获取,具体门路如图:

这里须要留神数据库地位的填写:

如果还须要更改端口的能够更改这里:

这里还须要留神告警邮箱和拜访口令(后续 Spring Boot 配置用到):

4、将 tables_xxl-job.sql 文件导入下面步骤 3 指定的数据库(本人填写的那个数据库)

同样因为文件代码过长,这里展现 gitee 上获取的门路图:

5、执行 docker 命令

留神这里的 -p 8088:8088 是因为我更改了后面 application.porperties 文件的端口号为 8088,所以这里我执行的 docker 命令为 -p 8088:8088,如果没有更改的这里肯定要改为 -p 8080:8080

docker run  -p 8088:8088 \
-d --name=xxl-job-admin --restart=always \
-v /mydata/xxl-job/application.properties:/application.properties \
-v /mydata/xxl-job/data/applogs:/data/applogs \
-e PARAMS='--spring.config.location=/application.properties' xuxueli/xxl-job-admin:2.3.1

执行后通过 docker ps 查看是否胜利运行,如果失败能够通过 docker logs xxl-job-admin 查看具体谬误日志。

6、通过 http://192.168.101.25:8088/xxl-job-admin/ 拜访(这里 ip 和端口是本人的)

账号:admin 明码:123456

到这里就算是实现了 xxl-job 在 docker 的搭建。

Spring Boot 我的项目集成 xxl-job

xxl-job 由 调度核心 执行器 组成,下面曾经实现了在 docker 上部署调度核心了,接下来介绍怎么 配置部署执行器我的项目

1、在 Spring Boot 我的项目中导入 maven 依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.1</version>
</dependency>

这里须要留神版本号与 xxl-job 版本须要统一,这里我配置的都是 2.3.1 版本。

2、在 Spring Boot 我的项目中配置 application.yml 文件

xxl:
  job:
    admin:
      addresses: http://192.168.101.25:8088/xxl-job-admin
    executor:
      appname: media-process-service
      address:
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: default_token
  • 这里的 xxl.job.admin.addresses 用于指定调度核心的地址。
  • 这里的 xxl.job.accessToken 用于指定拜访口令(也就是后面搭建 xxl-job 中步骤 3 指定的)。
  • 这里的 xxl.job.executor.appname 用于指定执行器的名称(须要与后续配置执行器的名称统一)。
  • 这里的 xxl.job.executor.port 用于指定执行器的端口(执行器实际上是一个内嵌的 Server,默认端口为 9999,配置多个同一服务实例时须要指定不同的执行器端口,否则会端口抵触)。
  • 其余属性只须要照着配置即可(想要理解属性的具体含意能够查看中文文档中的 2.4 配置部署执行器我的项目章节)。

3、编写配置类

/**
 * XXL-JOB 配置类
 *
 * @author 公众号:码猿技术专栏
 */
@Slf4j
@Configuration
public class XxlJobConfig {@Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

4、调度核心中新增执行器

执行器的配置属性:

  • AppName: 每个执行器集群的惟一标示 AppName,执行器会周期性以 AppName 为对象进行主动注册。可通过该配置主动发现注册胜利的执行器,供任务调度时应用。
  • 名称: 执行器的名称(能够应用中文更好地体现该执行器是用来干嘛的)。
  • 注册形式:调度核心获取执行器地址的形式(个别为了不便能够选用主动注册即可)。

    • 主动注册:执行器主动进行执行器注册,调度核心通过底层注册表能够动静发现执行器机器地址。
    • 手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度核心应用。
  • 机器地址:” 注册形式 ” 为 ” 手动录入 ” 时无效,反对人工保护执行器的地址信息。

5、配置自定义工作

配置自定义工作有许多种模式,如 Bean 模式(基于办法)、Bean 模式(基于类)、GLUE 模式等等。这里介绍通过 Bean 模式(基于办法)是如何自定义工作的(对于其余的模式能够参考官网文档)。

Bean 模式(基于办法)也就是每个工作对应一个办法,通过增加 @XxLJob(value="自定义 JobHandler 名称", init = "JobHandler 初始化办法", destroy = "JobHandler 销毁办法") 注解即可实现定义。

/**
 * 工作解决类
 *
 * @author 公众号:码猿技术专栏
 */
@Component
public class TestJob {
    /**
     * 测试工作
     */
    @XxlJob("testHandler")
    public void testHandler() {XxlJobHelper.handleSuccess("本次测试任务调度胜利");
    }
}
  • 通过注解也能够指定 初始化办法和销毁办法 ,如果不填写能够间接写一个 自定义的 JobHandler 名称 用于前面在调度核心中配置工作时对应工作的 JobHandler 属性值。
  • 能够通过 XxlJobHelper.log 来打印日志,通过调度核心能够查看执行日志的状况。
  • 能够通过 XxlJobHelper.handleFailXxlJobHelper.handleSuccess 手动设置任务调度的后果(不设置时默认后果为胜利状态,除非工作执行时出现异常)。

6、调度核心中新增工作

这里次要留神 Cron 表达式的工夫配置以及 JobHandler 的值须要与自定义工作办法的注解上的 value 属性值统一即可。

对于高级配置这里放一张中文文档的具体阐明(也能够间接去看文档):

须要搭建集群或过期策略等高级玩法时能够进行配置。

到这里就实现了 SpringBoot 集成 xxl-job 实现分布式任务调度的全过程了,接下来会通过一个实战案例来具体看看 xxl-job 的用途。

xxl-job 实战

上面通过一个最近本人在跟着做的学习我的项目中应用到 xxl-job 的场景案例来具体理解一下如何利用 xxl-job 来实现任务调度。

实战背景

以后我的项目须要对上传到分布式文件系统 minio 中的视频文件进行对立格局的视频转码操作,因为自身视频转码操作会带了很大的工夫耗费以及 CPU 的开销,所以思考集群服务下应用 xxl-job 的形式以任务调度的形式定时解决视频转码操作。

这样能够带来两个益处:① 以任务调度的形式,能够使得视频转码操作不会阻塞主线程,防止影响次要业务的吞吐量;② 以集群服务分片接管工作的形式,能够将工作均分给每个机器使得任务调度能够并行执行,进步总任务解决工夫以及升高单台机器 CPU 的开销;

xxl-job 执行流程图

怎么将工作均分给每台服务器?

因为工作执行工夫过长,须要搭建集群服务来做到并行任务调度,从而减小 CPU 的开销,那么怎么均分工作呢?

利用 xxl-job 在集群部署时,配置路由策略中抉择 分片播送 的形式,能够使一次任务调度会播送触发集群中所有的执行器执行一次工作,并且能够向零碎传递分片参数。

利用这一个性能够依据 以后执行器的分片序号和分片总数 来获取对应的工作记录。

先来看看 Bean 模式下怎么获取分片序号和分片总数:

// 分片序号(以后执行器序号)int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数(执行器总数)int shardTotal = XxlJobHelper.getShardTotal();

有了这两个属性,当执行器扫描数据库获取记录时,能够依据 取模 的形式获取属于以后执行器的工作,能够这样编写 sql 获取工作记录:

select * from media_process m
where m.id % #{shareTotal} = #{shareIndex}  
  and (m.status = '1' or m.status = '3')
  and m.fail_count &lt; 3
limit #{count}

扫描工作表,依据工作 id 对分片总数 取模 来实现对所有分片的均分工作,通过判断是否是以后分片序号,并且当前任务状态为 1(未解决)或 3(解决失败)并且当前任务失败次数小于 3 次时能够获得当前任务。每次扫描只取出 count 个工作数(批量解决)。

因而通过 xxl-job 的分片播送 + 取模 的形式即可实现对集群服务均分工作的操作。

怎么确保工作不会被反复生产?

因为视频转码自身解决工夫就会比拟长,所以更不容许服务反复执行,尽管下面通过分片播送+取模的形式进步了工作不会被反复执行的机率,然而仍旧存在如下状况:

如下图,有三台集群机器和六个工作,刚开始调配好了每台机器两个工作,执行器 0 正筹备执行工作 3 时,刚好执行器 2 宕机了,此时执行器 1 刚好执行一次工作,因为分片总数减小,导致 执行器 1 重新分配到须要执行的工作正好也是工作 3 ,那么此时就会呈现 执行器 0 和执行器 1 都在执行工作 3 的状况。

那么这种状况就须要实现幂等性了,幂等性有很多种实现办法,有趣味理解的能够参考:接口幂等性的实现计划

这里应用乐观锁的形式实现幂等性,具体 sql 如下:

update media_process m
set m.status = '2'
where (m.status = '1' or m.status = '3')
  and m.fail_count &lt; 3
  and m.id = #{id}

这里只须要依附工作的状态即可实现(未解决 1;解决中2;解决失败3;解决胜利4),能够看到这里相似于 CAS 的形式 通过比拟和设置的形式只有在状态为未解决或解决失败时能力设置为解决中。这样在并发场景下,即便多个执行器同时解决该工作,也只有一个工作能够设置胜利进入解决工作阶段。

为了真正达到幂等性,还须要设置一下 xxl-job 的调度过期策略和阻塞解决策略来保障真正的幂等性。别离设置为 疏忽 (调度过期后,疏忽过期的工作,从以后工夫开始从新计算下次触发工夫)和 抛弃后续调度(调度申请进入单机执行器后,发现执行器存在运行的调度工作,本次申请将会被抛弃并标记为失败)。


编写实现该性能所需的所有工作

1、分片视频转码解决

代码(这里的代码只展现局部外围步骤代码):

/**
 * 视频转码解决工作
 * 公众号:码猿技术专栏
 */
@XxlJob("videoTranscodingHandler")
public void videoTranscodingHandler() throws InterruptedException {
    // 1. 分片获取以后执行器须要执行的所有工作
    List<MediaProcess> mediaProcessList = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, count);
    // 通过 JUC 工具类阻塞直到所有工作执行完
    CountDownLatch countDownLatch = new CountDownLatch(mediaProcessList.size());
    // 遍历所有工作
    mediaProcessList.forEach(mediaProcess -> {
        // 以多线程的形式执行所有工作
        executor.execute(() -> {
            try {
                // 2. 尝试抢占工作(通过乐观锁实现)boolean res = mediaProcessService.startTask(id);
                if (!res) {XxlJobHelper.log("工作抢占失败,工作 id{}", id);
                    return;
                }
                
                // 3. 从 minio 中下载视频到本地
                File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
                // 下载失败
                if (file == null) {XxlJobHelper.log("下载视频出错, 工作 id:{},bucket:{},objectName:{}", id, bucket, objectName);
                    // 出现异常重置工作状态为解决失败期待下一次解决
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "下载视频到本地失败");
                    return;
                }
                
                // 4. 视频转码
                String result = videoUtil.generateMp4();
                if (!result.equals("success")) {XxlJobHelper.log("视频转码失败, 起因:{},bucket:{},objectName:{},", result, bucket, objectName);
                    // 出现异常重置工作状态为解决失败期待下一次解决
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "视频转码失败");
                    return;
                }
                
                // 5. 上传转码后的文件
                boolean b1 = mediaFileService.addMediaFilesToMinIO(new_File.getAbsolutePath(), "video/mp4", bucket, objectNameMp4);
                if (!b1) {XxlJobHelper.log("上传 mp4 到 minio 失败, 工作 id:{}", id);
                    // 出现异常重置工作状态为解决失败期待下一次解决
                    mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.FAIL.getValue(), fileId, null, "上传 mp4 文件到 minio 失败");
                    return;
                }

                // 6. 更新工作状态为胜利
                mediaProcessService.saveProcessFinishStatus(id, Constants.MediaProcessCode.SUCCESS.getValue(), fileId, url, "创立临时文件异样");
                
            } finally {countDownLatch.countDown();
            }
        });
    });
    // 阻塞直到所有办法执行实现(30min 后不再期待)countDownLatch.await(30, TimeUnit.MINUTES);
}

外围工作 – 分片获取工作后执行视频转码工作,步骤如下:

  • 通过 分片播送拿到的参数以取模的形式 获取以后执行器所属的工作记录汇合
  • 遍历汇合,以 多线程的形式 并发地执行工作
  • 每次执行工作前须要先通过 数据库乐观锁的形式 抢占当前任务,抢占到能力执行
  • 执行工作过程分为 分布式文件系统下载须要转码的视频文件 -> 视频转码 -> 上传转码后的视频 -> 更新工作状态(解决胜利)
  • 应用 JUC 工具类 CountDownLatch 实现所有工作执行完后才退出办法
  • 两头应用 xxl-job 的日志记录错误信息和执行后果

2、清理工作表中转码胜利的工作的记录并将其插入工作历史表

因为工作表处理完工作后只是更新工作状态,这样随着工作增多会导致检索起来工夫耗费过大,所以应用任务调度的形式定期扫描工作表,将工作状态为解决胜利的工作删除并从新插入工作历史表中留存(因为代码过于简略,这里就不做展现了)。

次要实现两个性能:① 清理工作表中已胜利解决的工作;② 将解决胜利的工作记录插入历史表中;

3、视频弥补机制

因为应用乐观锁会将工作状态更新为解决中,如果此时执行工作的执行器(服务)宕机了,会导致该工作记录始终存在,因为乐观锁的起因别的执行器也无奈获取,这个时候同样须要应用任务调度的形式,定期扫描工作表,判断工作是否处于解决中状态并且工作创立工夫远大于 30 分钟,则阐明工作超时了 ,则是应用任务调度的形式从新更新工作的状态为未解决,期待下一次视频转码工作的调度解决。此外视频弥补机制任务调度还须要 查看是否存在工作最大次数曾经大于 3 次的,如果存在则交付给人工解决(因为代码过于简略,这里就不做展现了)。

次要实现两个性能:① 解决工作超时状况下的工作,做出弥补;② 解决失败次数大于 3 次的工作,做出弥补;

测试并查看日志

筹备好的工作表记录:

启动三台媒资服务器,并开启工作:

能够独自查看每个工作的日志:

通过日志中的执行日志查看具体日志信息:

能够看到间接为了测试改错的门路导致下载视频出错:

查看数据库表的变动:

到这里能够看到外围的视频转码工作执行胜利,并且逻辑正确,可能起到分布式任务调度的作用。

总结

这就是本次 xxl-job 实战的全部内容了,写这篇文章次要是为了记录一下我的项目中是如何应用 xxl-job 的,并且提供一种分片播送均分工作的思路以及幂等性问题如何解决,具体应用 xxl-job 还需依据本人我的项目的需要,遇到问题能够参考官网

最初说一句(别白嫖,求关注)

陈某每一篇文章都是精心输入,如果这篇文章对你有所帮忙,或者有所启发的话,帮忙 点赞 在看 转发 珍藏,你的反对就是我坚持下去的最大能源!

关注公众号:【码猿技术专栏】,公众号内有超赞的粉丝福利,回复:加群,能够退出技术探讨群,和大家一起探讨技术,吹牛逼!

正文完
 0