乐趣区

关于java:动态配置的-Schedule-设计

1. 背景

定时工作是理论开发中常见的一类性能,例如每天早上凌晨对前一天的注册用户数量、渠道起源进行统计,并以邮件报表的形式发送给相干人员。置信这样的需要,每个开发搭档都解决过。

你能够应用 Linux 的 Crontab 启动应用程序进行解决,或者间接应用 Spring 的 Schedule 对工作进行调度,还能够应用散布式调度零碎,如果 xxl-job 等。置信你曾经驾轻就熟、司空见惯。直到有一天你接到了一个新需要:

  1. 新建一组工作,周期性的执行指定 SQL 并将后果以邮件的形式发送给特定人群;
  2. 比拟不便的对工作进行治理,比方 启动、进行,批改调度周期等;
  3. 动静增加、移除工作,不须要频繁的批改、公布程序;

进展几分钟,简略思考一下,有哪几种实现思路呢?

本篇文章将从一下几局部进行探讨:

  1. Spring Schedule 配置和应用。首先咱们将介绍 Demo 的骨架,并基于 Spring-Boot 实现 Schedule 的配置;
  2. 数据库定时轮询计划。应用 Spring Schedule 定时轮询 数据库,并执行相应工作。在执行工作策略中,咱们将尝试同步和异步执行两种计划,并对其优缺点进行剖析;
  3. 基于 TaskScheduler 动静配置计划。基于数据库 轮询 或 配置核心 两种计划动静的对 Spring TaskScheduler 进行配置,以实现动静治理工作的目标;
  4. 咱们进入分布式环境,利用多个冗余节点解决零碎高可用问题,同时应用分布式锁保障只会有一个工作同时执行;

2. Spring Schedule

Spring Boot 上的 Schedule 的应用非常简单,无需减少新的依赖,只需简略配置即可。

  1. 应用 @EnableScheduling 启用 Schedule;
  2. 在要调度的办法上减少 @Scheduled;

首先,咱们须要在启动类上增加 @EnableScheduling 注解,该注解将启用 SchedulingConfiguration 配置类帮咱们实现最根本的配置。

@SpringBootApplication
@EnableScheduling
public class ConfigurableScheduleDemoApplication {public static void main(String[] args) {SpringApplication.run(ConfigurableScheduleDemoApplication.class, args);
    }

}

启用 Schedule 配置之后,在须要被调度的办法上减少 @Scheduled 注解。

@Service
public class SpringScheduleService {
    @Autowired
    private TaskService taskService;

    @Scheduled(fixedDelay = 5 * 1000, initialDelay = 1000)
    public void runTask(){TaskConfig taskConfig = TaskConfig.builder()
                .name("Spring Default Schedule")
                .build();
        this.taskService.runTask(taskConfig);
    }
}

runTask 工作提早 1s 进行初始化,并以 5s 为距离进行调度。

Scheduled 注解类的具体配置如下:

配置 含意 样例
cron linux crontab 表达式 @Scheduled(cron=”/5 * MON-FRI”) 工作日,每 5 s 调度一次
fixedDelay 固定距离,上次运行完结,与下次启动运行,相隔固定时长 @Scheduled(fixedDelay=5000) 运行完结后,5S 后启动一次调度
fixedDelayString 与 fixedDelay 统一
fixedRate 固定周期,前后两次运行相隔固定的时长 @Scheduled(fixedRate=5000) 前后两个工作,距离 5 秒
fixedRateString 与 fixedRate 统一
initialDelay 第一次执行,间隔时间 @Scheduled(initialDelay=1000, fixedRate=5000) 第一次执行,延时 1 秒,当前以 5 秒为周期进行调度
initialDelayString 与 initialDelay 统一

环境搭建实现,让咱们开始第一个计划。

3. 数据库定时轮询

应用数据库来治理工作,通过轮询的计划,进行动静调度。首先,咱们看下最简略的计划:串行执行计划。

3.1. 串行执行计划

整体思路非常简单,流程如下:

次要分如下几步:

  1. 在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取待执行的工作(状态为可用,下一次执行工夫小于以后工夫);
  2. 依据数据库的工作配置信息,顺次遍历并执行工作;
  3. 工作执行实现后,通过计算取得下一次调度工夫,将其写回到数据库;
  4. 期待下一次任务调度。

外围代码如下:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTask(){Date now = new Date();
    // 加载须要运行的工作:// 1. 状态为 ENABLE
    // 2. 下一次运行工夫 小于 以后工夫
    List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);

    // 顺次遍历待运行工作,执行对于的工作
    for (TaskDefinitionV2 task : shouldRunTasks){
        // Double Check
        if (task.shouldRun(now)){
            // 执行工作
            runTask(task);

            // 更新工作的下一次运行工夫
            updateNextRunTime(task, now);
        }
    }
}

计划简略但十分无效,那该计划存在哪些问题呢?
最次要的问题就是:工作串行执行,会导致前面工作呈现延时运行;同时,下一轮查看也会被 delay。

例如,顺次加载了待执行工作 task1、task2、task3。其中 task1 耗时 5 秒,task2 耗时 5 秒,task3 耗时 1 秒,因为三个工作串行执行,task2 将延时 5 秒,task3 延时 10 秒;下一轮查看距上次启动相差 12 秒。

究其基本,外围问题是 调度线程 和 运行线程 是同一个线程,调度的运行 和 工作的运行相互影响。

让咱们看一个改良计划:并行执行计划。

3.2. 并行执行计划

整体执行流程如下:

相比之前的计划,新计划引入了线程池,每一个工作对应一个线程池,防止工作间的相互影响;工作在线程池中异步解决,防止了调度线程的延时。具体流程如下:

  1. 步骤一不变,在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取待执行的工作(状态为可用,下一次执行工夫小于以后工夫);
  2. 顺次遍历工作,将工作提交到专有线程池中异步执行,调度线程间接返回;
  3. 工作在线程池中运行,完结后更新下一次的运行工夫;
  4. 调度线程从新从数据库中获取待执行工作,在将工作提交至线程池中,如果有工作正在执行,应用线程池回绝策略,摈弃最老的工作;

外围代码如下:

Spring 调度工作,每 1 秒运行一次:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTask(){Date now = new Date();
    // 加载所有待运行的工作
    // 1. 状态为 ENABLE
    // 2. 下一次运行工夫小于 以后工夫
    List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
    // 遍历待运行工作
    for (TaskDefinitionV2 task : shouldRunTasks){
        // 1. 依据 Task Id 获取工作对应的线程池
        // 2. 将工作提交至线程池中
        this.executorServiceForTask(task.getId())
                .submit(new TaskRunner(task.getId()));
    }
}

自定义线程池,每个线程池最多只有一个线程,闲暇超过 10 秒后,线程主动回收,线程饱和时,间接抛弃最老的工作:

private ExecutorService executorServiceForTask(Long taskId){
    return this.executorServiceRegistry.computeIfAbsent(taskId, id->{BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
                // 指定线程池名称
                .namingPattern("Async-Task-"+ taskId +"-Thread-%d")
                // 设置线程为 后盾线程
                .daemon(true)
                .build();
        // 线程池外围配置:// 1. 每个线程池最多只有一个线程
        // 2. 线程闲暇超过 10 秒 进行主动回收
        // 3. 间接应用交互器,线程闲暇进行工作交互
        // 4. 应用指定的线程工厂,设置线性名称
        // 5. 线程池饱和,主动抛弃最老的工作
        return new ThreadPoolExecutor(0, 1,
                10L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                threadFactory,
                new ThreadPoolExecutor.DiscardOldestPolicy());
    });
}

最初,在线程池中运行的 Task 如下:

private class TaskRunner implements Runnable {private final Date now = new Date();
    private final Long taskId;
    public TaskRunner(Long taskId) {this.taskId = taskId;}

    @Override
    public void run() {
        // 从新加载工作,放弃最新的工作状态
        TaskDefinitionV2 task = definitionV2Repository.findById(this.taskId).orElse(null);
        if (task != null && task.shouldRun(now)){
            // 运行工作
            runTask(task);

            // 更新工作的下一次运行工夫
            updateNextRunTime(task, now);
        }
    }
}

4. TaskScheduler 配置计划

该计划的外围为:绕过 @Schedule 注解,间接对 Spring 底层外围 类 TaskScheduler 进行配置。

TaskScheduler 接口是 Spring 对调度工作的一个形象,更是 @Schedule 背地默默的支持者,首先咱们看下这个接口定义。

public interface TaskScheduler {ScheduledFuture schedule(Runnable task, Trigger trigger);

    ScheduledFuture schedule(Runnable task, Instant startTime);

    ScheduledFuture schedule(Runnable task, Date startTime);

    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);

    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);

    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);

    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);

    ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay);

    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);

    ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay);

    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}

满满的都是 schedule 接口,其余的比较简单就不过多叙述了,重点说下 Trigger 这个接口,首先看下这个接口的定义:

public interface Trigger {Date nextExecutionTime(TriggerContext triggerContext);
}

只有一个办法,获取下次执行的工夫。在工作执行实现后,会调用 Trigger 的 nextExecutionTime 获取下一次运行工夫,从而实现周期性调度。

CronTrigger 是 Trigger 的最常见实现,以 linux crontab 的形式配置调度工作,如:

scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));

根底局部简略介绍到这,让咱们看下数据库动静配置计划。

4.1 数据库动静配置计划

整体设计如下:

仍旧是轮询数据库形式,具体流程如下:

  1. 在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取所有工作;
  2. 顺次遍历工作,与内存中的 TaskEntry(工作与状态)进行比对,动静的向 TaskScheduler 中 增加 或 勾销 调度工作;
  3. 由 TaskScheduler 负责理论的任务调度;

外围代码如下:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndConfig(){
    // 加载所有的工作信息
    List<TaskDefinitionV3> tasks = repository.findAll();
    // 遍历工作进行工作查看
    for (TaskDefinitionV3 task : tasks){
        // 获取内存工作状态
        TaskEntry taskEntry = this.taskEntry.computeIfAbsent(task.getId(), TaskEntry::new);

        if (task.isEnable() && taskEntry.isStop()){
            // 工作为可用,运行状态为进行,则从新进行 schedule 注册
            ScheduledFuture<?> scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(new TaskRunner(task), task.getDelay() * 1000);
            taskEntry.setScheduledFuture(scheduledFuture);
            log.info("success to start schedule task for {}", task);

        }else if (task.isDisable() && taskEntry.isRunning()){
            // 工作为禁用,运行状态为运行中,进行正在运行在工作
            taskEntry.stop();
            log.info("success to stop schedule task for {}", task);
        }
    }
}

外围辅助类:

@Data
private class TaskEntry{
    private final Long taskId;
    private ScheduledFuture scheduledFuture;
    private TaskEntry(Long taskId) {this.taskId = taskId;}

    /**
     * 内存状态 scheduledFuture 为 null,则没有运行的工作
     * @return
     */
    public boolean isStop() {return scheduledFuture == null;}

    /**
     * 内存状态 scheduledFuture 不为 null,则存在运行的工作
     * @return
     */
    public boolean isRunning() {return scheduledFuture != null;}

    /**
     * 进行调度工作 <br />
     * 1. 内存状态设置为 null
     * 2. 调用 scheduledFuture#cancel 终止正在运行的调度工作
     */
    public void stop() {
        ScheduledFuture scheduledFuture = this.scheduledFuture;
        this.scheduledFuture = null;

        scheduledFuture.cancel(true);
    }
}

有没有发现,以上计划都有一个独特的缺点:基于数据库轮询获取工作,加大了数据库压力。实践上,只有在配置发生变化时才有必要对工作进行更新,接下来让咱们看下改良计划:基于配置核心的计划。

4.2 配置核心告诉计划

整体设计如下:

外围流程如下:

  1. 利用启动时,从配置核心中获取 调度的配置信息,并实现对 TaskScheduler 的配置;
  2. 当配置发送变动时,配置核心会被动将配置推送到 应用程序,应用程序在接管到变动告诉时,动静的减少或勾销调度工作;
  3. 工作的理论调度仍旧由 TaskScheduler 实现。

因为手底下没有配置核心,临时没有 coding,思路很简略,有条件的同学能够自行实现。

5. 分布式环境下利用

以上计划,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就进行了,为了防止这种状况的产生,须要晋升零碎的可用性,实现 冗余部署 和 自动化容灾。

以上计划,如果部署多个节点会产生什么?是的,会呈现工作被屡次调度的问题,为了保障在同一时刻只有一个工作在运行,须要为工作减少一个排他锁。同时,因为排他锁的存在,当一个节点处问题后,另一个节点在调度时会主动获取锁,从而解零碎的单点问题。

为了简略,咱们应用 Redis 的分布式锁。

5.1. 环境搭建

Redisson 是 Redis 的一个富客户端,提供了很多高级的数据结构。本次,咱们将应用 RLock 对利用进行爱护。

首先,在 pom 中引入 Redisson Starter。

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.2</version>
</dependency>

而后,在 application.properties 文件中减少 Redis 配置,具体如下:

spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0

5.2 引入分布式锁

最初,就能够间接应用 分布式锁 对工作执行进行爱护了,代码如下:

@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTaskWithLock(){Date now = new Date();
    // 加载须要运行的工作:// 1. 状态为 ENABLE
    // 2. 下一次运行工夫 小于 以后工夫
    List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
    // 顺次遍历待运行工作,执行对于的工作
    for (TaskDefinitionV2 task : shouldRunTasks){
        // Double Check
        if (task.shouldRun(now)){

            // 获取分布式锁,用于保障每个工作只能有一个正在运行
            RLock lock = this.redissonClient.getFairLock("LoadAndRunScheduleService-" + task.getId());
            if (lock.tryLock()) {
                // 胜利获取锁,运行工作
                try {log.info("Success to get Lock, begin to run task {}", task.getId());

                    // 执行工作
                    runTask(task);

                    // 更新工作的下一次运行工夫
                    updateNextRunTime(task, now);
                    log.info("Success to run task {}", task.getId());
                }finally {
                    // 工作运行解释,开释锁
                    lock.unlock();}
            }else {
                // 未获取锁,打印日志,不做额定解决
                log.info("Failed to get Lock!!!!");
            }
        }
    }
}

备注:

Redis 是典型的 AP 利用,而分布式锁严格意义上来说是 CP。所以基于 Redis 的分布式锁只能应用在非严格环境中,比方咱们的数据报表需要。如果设计金钱,须要应用 CP 实现,如 Zookeeper 或 etcd 等。

6. 小结

本文从 Spring 的 Schedule 登程,顺次对数据库轮询计划、TaskScheduler 配置计划 进行具体解说,以实现对调度工作的可配置化。最初,应用 Redis 分布式锁无效解决了分布式环境下工作反复调度和主动容灾问题。

仍旧是那句话,架构设计没有更好,只有最适宜。同学们能够依据本人的需要自取。

最初,附上源码 源码

退出移动版