1. 背景

定时工作是理论开发中常见的一类性能,例如每天早上凌晨对前一天的注册用户数量、渠道起源进行统计,并以邮件报表的形式发送给相干人员。置信这样的需要,每个开发搭档都解决过。

你能够应用 Linux 的 Crontab 启动应用程序进行解决,或者间接应用 Spring 的 Schedule 对工作进行调度,还能够应用散布式调度零碎,如果 xxl-job 等。置信你曾经驾轻就熟、司空见惯。直到有一天你接到了一个新需要:

  1. 新建一组工作,周期性的执行指定 SQL 并将后果以邮件的形式发送给特定人群;
  2. 比拟不便的对工作进行治理,比方 启动、进行,批改调度周期等;
  3. 动静增加、移除工作,不须要频繁的批改、公布程序;
进展几分钟,简略思考一下,有哪几种实现思路呢?

本篇文章将从一下几局部进行探讨:

  1. Spring Schedule 配置和应用。首先咱们将介绍 Demo 的骨架,并基于 Spring-Boot 实现 Schedule 的配置;
  2. 数据库定时轮询计划。应用 Spring Schedule 定时轮询 数据库,并执行相应工作。在执行工作策略中,咱们将尝试同步和异步执行两种计划,并对其优缺点进行剖析;
  3. 基于 TaskScheduler 动静配置计划。基于数据库 轮询 或 配置核心 两种计划动静的对 Spring TaskScheduler 进行配置,以实现动静治理工作的目标;
  4. 咱们进入分布式环境,利用多个冗余节点解决零碎高可用问题,同时应用分布式锁保障只会有一个工作同时执行;

2. Spring Schedule

Spring Boot 上的 Schedule 的应用非常简单,无需减少新的依赖,只需简略配置即可。
  1. 应用 @EnableScheduling 启用 Schedule;
  2. 在要调度的办法上减少 @Scheduled;

首先,咱们须要在启动类上增加 @EnableScheduling 注解,该注解将启用 SchedulingConfiguration 配置类帮咱们实现最根本的配置。

@SpringBootApplication@EnableSchedulingpublic class ConfigurableScheduleDemoApplication {    public static void main(String[] args) {        SpringApplication.run(ConfigurableScheduleDemoApplication.class, args);    }}

启用Schedule配置之后,在须要被调度的办法上减少 @Scheduled 注解。

@Servicepublic class SpringScheduleService {    @Autowired    private TaskService taskService;    @Scheduled(fixedDelay = 5 * 1000, initialDelay = 1000)    public void runTask(){        TaskConfig taskConfig = TaskConfig.builder()                .name("Spring Default Schedule")                .build();        this.taskService.runTask(taskConfig);    }}

runTask 工作提早 1s 进行初始化,并以 5s 为距离进行调度。

Scheduled 注解类的具体配置如下:

配置含意样例
cronlinux crontab 表达式@Scheduled(cron="/5 * MON-FRI") 工作日,每 5 s 调度一次
fixedDelay固定距离,上次运行完结,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=5000) 运行完结后,5S 后启动一次调度
fixedDelayString与 fixedDelay 统一
fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=5000) 前后两个工作,距离 5 秒
fixedRateString与 fixedRate 统一
initialDelay第一次执行,间隔时间@Scheduled(initialDelay=1000, fixedRate=5000) 第一次执行,延时 1 秒,当前以 5 秒为周期进行调度
initialDelayString与 initialDelay 统一

环境搭建实现,让咱们开始第一个计划。

3. 数据库定时轮询

应用数据库来治理工作,通过轮询的计划,进行动静调度。首先,咱们看下最简略的计划:串行执行计划。

3.1. 串行执行计划

整体思路非常简单,流程如下:

次要分如下几步:

  1. 在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取待执行的工作(状态为可用,下一次执行工夫小于以后工夫);
  2. 依据数据库的工作配置信息,顺次遍历并执行工作;
  3. 工作执行实现后,通过计算取得下一次调度工夫,将其写回到数据库;
  4. 期待下一次任务调度。

外围代码如下:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)public void loadAndRunTask(){    Date now = new Date();    // 加载须要运行的工作:    // 1. 状态为 ENABLE    // 2. 下一次运行工夫 小于 以后工夫    List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);    // 顺次遍历待运行工作,执行对于的工作    for (TaskDefinitionV2 task : shouldRunTasks){        // Double Check        if (task.shouldRun(now)){            // 执行工作            runTask(task);            // 更新工作的下一次运行工夫            updateNextRunTime(task, now);        }    }}

计划简略但十分无效,那该计划存在哪些问题呢?
最次要的问题就是:工作串行执行,会导致前面工作呈现延时运行;同时,下一轮查看也会被 delay。

例如,顺次加载了待执行工作 task1、task2、task3。其中 task1 耗时 5 秒,task2 耗时 5 秒,task3 耗时 1 秒,因为三个工作串行执行,task2 将延时 5 秒,task3 延时 10秒;下一轮查看距上次启动相差 12 秒。

究其基本,外围问题是 调度线程 和 运行线程 是同一个线程,调度的运行 和 工作的运行相互影响。

让咱们看一个改良计划:并行执行计划。

3.2. 并行执行计划

整体执行流程如下:

相比之前的计划,新计划引入了线程池,每一个工作对应一个线程池,防止工作间的相互影响;工作在线程池中异步解决,防止了调度线程的延时。具体流程如下:

  1. 步骤一不变,在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取待执行的工作(状态为可用,下一次执行工夫小于以后工夫);
  2. 顺次遍历工作,将工作提交到专有线程池中异步执行,调度线程间接返回;
  3. 工作在线程池中运行,完结后更新下一次的运行工夫;
  4. 调度线程从新从数据库中获取待执行工作,在将工作提交至线程池中,如果有工作正在执行,应用线程池回绝策略,摈弃最老的工作;

外围代码如下:

Spring 调度工作,每 1 秒运行一次:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)public void loadAndRunTask(){    Date now = new Date();    // 加载所有待运行的工作    // 1. 状态为 ENABLE    // 2. 下一次运行工夫小于 以后工夫    List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);    // 遍历待运行工作    for (TaskDefinitionV2 task : shouldRunTasks){        // 1. 依据 Task Id 获取工作对应的线程池        // 2. 将工作提交至线程池中        this.executorServiceForTask(task.getId())                .submit(new TaskRunner(task.getId()));    }}

自定义线程池,每个线程池最多只有一个线程,闲暇超过 10 秒后,线程主动回收,线程饱和时,间接抛弃最老的工作:

private ExecutorService executorServiceForTask(Long taskId){    return this.executorServiceRegistry.computeIfAbsent(taskId, id->{        BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()                // 指定线程池名称                .namingPattern("Async-Task-"+ taskId +"-Thread-%d")                // 设置线程为 后盾线程                .daemon(true)                .build();        // 线程池外围配置:        // 1. 每个线程池最多只有一个线程        // 2. 线程闲暇超过 10秒 进行主动回收        // 3. 间接应用交互器,线程闲暇进行工作交互        // 4. 应用指定的线程工厂,设置线性名称        // 5. 线程池饱和,主动抛弃最老的工作        return new ThreadPoolExecutor(0, 1,                10L, TimeUnit.SECONDS,                new SynchronousQueue<>(),                threadFactory,                new ThreadPoolExecutor.DiscardOldestPolicy()        );    });}

最初,在线程池中运行的 Task 如下:

private class TaskRunner implements Runnable {    private final Date now = new Date();    private final Long taskId;    public TaskRunner(Long taskId) {        this.taskId = taskId;    }    @Override    public void run() {        // 从新加载工作,放弃最新的工作状态        TaskDefinitionV2 task = definitionV2Repository.findById(this.taskId).orElse(null);        if (task != null && task.shouldRun(now)){            // 运行工作            runTask(task);            // 更新工作的下一次运行工夫            updateNextRunTime(task, now);        }    }}

4. TaskScheduler 配置计划

该计划的外围为:绕过 @Schedule 注解,间接对 Spring 底层外围 类 TaskScheduler 进行配置。

TaskScheduler 接口是 Spring 对调度工作的一个形象,更是 @Schedule 背地默默的支持者,首先咱们看下这个接口定义。

public interface TaskScheduler {    ScheduledFuture schedule(Runnable task, Trigger trigger);    ScheduledFuture schedule(Runnable task, Instant startTime);    ScheduledFuture schedule(Runnable task, Date startTime);    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);    ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay);    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);    ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay);    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);}

满满的都是 schedule 接口,其余的比较简单就不过多叙述了,重点说下 Trigger 这个接口,首先看下这个接口的定义:

public interface Trigger {    Date nextExecutionTime(TriggerContext triggerContext);}

只有一个办法,获取下次执行的工夫。在工作执行实现后,会调用 Trigger 的 nextExecutionTime 获取下一次运行工夫,从而实现周期性调度。

CronTrigger 是 Trigger 的最常见实现,以 linux crontab 的形式配置调度工作,如:

scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));

根底局部简略介绍到这,让咱们看下数据库动静配置计划。

4.1 数据库动静配置计划

整体设计如下:

仍旧是轮询数据库形式,具体流程如下:

  1. 在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取所有工作;
  2. 顺次遍历工作,与内存中的 TaskEntry(工作与状态) 进行比对,动静的向 TaskScheduler 中 增加 或 勾销 调度工作;
  3. 由 TaskScheduler 负责理论的任务调度;

外围代码如下:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)public void loadAndConfig(){    // 加载所有的工作信息    List<TaskDefinitionV3> tasks = repository.findAll();    // 遍历工作进行工作查看    for (TaskDefinitionV3 task : tasks){        // 获取内存工作状态        TaskEntry taskEntry = this.taskEntry.computeIfAbsent(task.getId(), TaskEntry::new);        if (task.isEnable() && taskEntry.isStop()){            // 工作为可用,运行状态为进行,则从新进行 schedule 注册            ScheduledFuture<?> scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(new TaskRunner(task), task.getDelay() * 1000);            taskEntry.setScheduledFuture(scheduledFuture);            log.info("success to start schedule task for {}", task);        }else if (task.isDisable() && taskEntry.isRunning()){            // 工作为禁用,运行状态为运行中,进行正在运行在工作            taskEntry.stop();            log.info("success to stop schedule task for {}", task);        }    }}

外围辅助类:

@Dataprivate class TaskEntry{    private final Long taskId;    private ScheduledFuture scheduledFuture;    private TaskEntry(Long taskId) {        this.taskId = taskId;    }    /**     * 内存状态 scheduledFuture 为 null,则没有运行的工作     * @return     */    public boolean isStop() {        return scheduledFuture == null;    }    /**     * 内存状态 scheduledFuture 不为 null,则存在运行的工作     * @return     */    public boolean isRunning() {        return scheduledFuture != null;    }    /**     * 进行调度工作 <br />     * 1. 内存状态设置为 null     * 2. 调用 scheduledFuture#cancel 终止正在运行的调度工作     */    public void stop() {        ScheduledFuture scheduledFuture = this.scheduledFuture;        this.scheduledFuture = null;        scheduledFuture.cancel(true);    }}

有没有发现,以上计划都有一个独特的缺点:基于数据库轮询获取工作,加大了数据库压力。实践上,只有在配置发生变化时才有必要对工作进行更新,接下来让咱们看下改良计划:基于配置核心的计划。

4.2 配置核心告诉计划

整体设计如下:

外围流程如下:

  1. 利用启动时,从配置核心中获取 调度的配置信息,并实现对 TaskScheduler 的配置;
  2. 当配置发送变动时,配置核心会被动将配置推送到 应用程序,应用程序在接管到变动告诉时,动静的减少或勾销调度工作;
  3. 工作的理论调度仍旧由 TaskScheduler 实现。

因为手底下没有配置核心,临时没有 coding,思路很简略,有条件的同学能够自行实现。

5. 分布式环境下利用

以上计划,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就进行了,为了防止这种状况的产生,须要晋升零碎的可用性,实现 冗余部署 和 自动化容灾。

以上计划,如果部署多个节点会产生什么?是的,会呈现工作被屡次调度的问题,为了保障在同一时刻只有一个工作在运行,须要为工作减少一个排他锁。同时,因为排他锁的存在,当一个节点处问题后,另一个节点在调度时会主动获取锁,从而解零碎的单点问题。

为了简略,咱们应用 Redis 的分布式锁。

5.1. 环境搭建

Redisson 是 Redis 的一个富客户端,提供了很多高级的数据结构。本次,咱们将应用 RLock 对利用进行爱护。

首先,在 pom 中引入 Redisson Starter。

<dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson-spring-boot-starter</artifactId>    <version>3.16.2</version></dependency>

而后,在 application.properties 文件中减少 Redis 配置,具体如下:

spring.redis.host=127.0.0.1spring.redis.port=6379spring.redis.database=0

5.2 引入分布式锁

最初,就能够间接应用 分布式锁 对工作执行进行爱护了,代码如下:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)public void loadAndRunTaskWithLock(){    Date now = new Date();    // 加载须要运行的工作:    // 1. 状态为 ENABLE    // 2. 下一次运行工夫 小于 以后工夫    List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);    // 顺次遍历待运行工作,执行对于的工作    for (TaskDefinitionV2 task : shouldRunTasks){        // Double Check        if (task.shouldRun(now)){            // 获取分布式锁,用于保障每个工作只能有一个正在运行            RLock lock = this.redissonClient.getFairLock("LoadAndRunScheduleService-" + task.getId());            if (lock.tryLock()) {                // 胜利获取锁,运行工作                try {                    log.info("Success to get Lock, begin to run task {}", task.getId());                    // 执行工作                    runTask(task);                    // 更新工作的下一次运行工夫                    updateNextRunTime(task, now);                    log.info("Success to run task {}", task.getId());                }finally {                    // 工作运行解释,开释锁                    lock.unlock();                }            }else {                // 未获取锁,打印日志,不做额定解决                log.info("Failed to get Lock!!!!");            }        }    }}

备注:

Redis 是典型的 AP 利用,而分布式锁严格意义上来说是 CP。所以基于 Redis 的分布式锁只能应用在非严格环境中,比方咱们的数据报表需要。如果设计金钱,须要应用 CP 实现,如 Zookeeper 或 etcd 等。

6. 小结

本文从 Spring 的 Schedule 登程,顺次对数据库轮询计划、TaskScheduler 配置计划 进行具体解说,以实现对调度工作的可配置化。最初,应用 Redis 分布式锁无效解决了分布式环境下工作反复调度和主动容灾问题。

仍旧是那句话,架构设计没有更好,只有最适宜。同学们能够依据本人的需要自取。

最初,附上源码 源码