共计 8671 个字符,预计需要花费 22 分钟才能阅读完成。
1. 背景
定时工作是理论开发中常见的一类性能,例如每天早上凌晨对前一天的注册用户数量、渠道起源进行统计,并以邮件报表的形式发送给相干人员。置信这样的需要,每个开发搭档都解决过。
你能够应用 Linux 的 Crontab 启动应用程序进行解决,或者间接应用 Spring 的 Schedule 对工作进行调度,还能够应用散布式调度零碎,如果 xxl-job 等。置信你曾经驾轻就熟、司空见惯。直到有一天你接到了一个新需要:
- 新建一组工作,周期性的执行指定 SQL 并将后果以邮件的形式发送给特定人群;
- 比拟不便的对工作进行治理,比方 启动、进行,批改调度周期等;
- 动静增加、移除工作,不须要频繁的批改、公布程序;
进展几分钟,简略思考一下,有哪几种实现思路呢?
本篇文章将从一下几局部进行探讨:
- Spring Schedule 配置和应用。首先咱们将介绍 Demo 的骨架,并基于 Spring-Boot 实现 Schedule 的配置;
- 数据库定时轮询计划。应用 Spring Schedule 定时轮询 数据库,并执行相应工作。在执行工作策略中,咱们将尝试同步和异步执行两种计划,并对其优缺点进行剖析;
- 基于 TaskScheduler 动静配置计划。基于数据库 轮询 或 配置核心 两种计划动静的对 Spring TaskScheduler 进行配置,以实现动静治理工作的目标;
- 咱们进入分布式环境,利用多个冗余节点解决零碎高可用问题,同时应用分布式锁保障只会有一个工作同时执行;
2. Spring Schedule
Spring Boot 上的 Schedule 的应用非常简单,无需减少新的依赖,只需简略配置即可。
- 应用 @EnableScheduling 启用 Schedule;
- 在要调度的办法上减少 @Scheduled;
首先,咱们须要在启动类上增加 @EnableScheduling 注解,该注解将启用 SchedulingConfiguration 配置类帮咱们实现最根本的配置。
@SpringBootApplication
@EnableScheduling
public class ConfigurableScheduleDemoApplication {public static void main(String[] args) {SpringApplication.run(ConfigurableScheduleDemoApplication.class, args);
}
}
启用 Schedule 配置之后,在须要被调度的办法上减少 @Scheduled 注解。
@Service
public class SpringScheduleService {
@Autowired
private TaskService taskService;
@Scheduled(fixedDelay = 5 * 1000, initialDelay = 1000)
public void runTask(){TaskConfig taskConfig = TaskConfig.builder()
.name("Spring Default Schedule")
.build();
this.taskService.runTask(taskConfig);
}
}
runTask 工作提早 1s 进行初始化,并以 5s 为距离进行调度。
Scheduled 注解类的具体配置如下:
配置 | 含意 | 样例 |
---|---|---|
cron | linux crontab 表达式 | @Scheduled(cron=”/5 * MON-FRI”) 工作日,每 5 s 调度一次 |
fixedDelay | 固定距离,上次运行完结,与下次启动运行,相隔固定时长 | @Scheduled(fixedDelay=5000) 运行完结后,5S 后启动一次调度 |
fixedDelayString | 与 fixedDelay 统一 | |
fixedRate | 固定周期,前后两次运行相隔固定的时长 | @Scheduled(fixedRate=5000) 前后两个工作,距离 5 秒 |
fixedRateString | 与 fixedRate 统一 | |
initialDelay | 第一次执行,间隔时间 | @Scheduled(initialDelay=1000, fixedRate=5000) 第一次执行,延时 1 秒,当前以 5 秒为周期进行调度 |
initialDelayString | 与 initialDelay 统一 |
环境搭建实现,让咱们开始第一个计划。
3. 数据库定时轮询
应用数据库来治理工作,通过轮询的计划,进行动静调度。首先,咱们看下最简略的计划:串行执行计划。
3.1. 串行执行计划
整体思路非常简单,流程如下:
次要分如下几步:
- 在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取待执行的工作(状态为可用,下一次执行工夫小于以后工夫);
- 依据数据库的工作配置信息,顺次遍历并执行工作;
- 工作执行实现后,通过计算取得下一次调度工夫,将其写回到数据库;
- 期待下一次任务调度。
外围代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTask(){Date now = new Date();
// 加载须要运行的工作:// 1. 状态为 ENABLE
// 2. 下一次运行工夫 小于 以后工夫
List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
// 顺次遍历待运行工作,执行对于的工作
for (TaskDefinitionV2 task : shouldRunTasks){
// Double Check
if (task.shouldRun(now)){
// 执行工作
runTask(task);
// 更新工作的下一次运行工夫
updateNextRunTime(task, now);
}
}
}
计划简略但十分无效,那该计划存在哪些问题呢?
最次要的问题就是:工作串行执行,会导致前面工作呈现延时运行;同时,下一轮查看也会被 delay。
例如,顺次加载了待执行工作 task1、task2、task3。其中 task1 耗时 5 秒,task2 耗时 5 秒,task3 耗时 1 秒,因为三个工作串行执行,task2 将延时 5 秒,task3 延时 10 秒;下一轮查看距上次启动相差 12 秒。
究其基本,外围问题是 调度线程 和 运行线程 是同一个线程,调度的运行 和 工作的运行相互影响。
让咱们看一个改良计划:并行执行计划。
3.2. 并行执行计划
整体执行流程如下:
相比之前的计划,新计划引入了线程池,每一个工作对应一个线程池,防止工作间的相互影响;工作在线程池中异步解决,防止了调度线程的延时。具体流程如下:
- 步骤一不变,在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取待执行的工作(状态为可用,下一次执行工夫小于以后工夫);
- 顺次遍历工作,将工作提交到专有线程池中异步执行,调度线程间接返回;
- 工作在线程池中运行,完结后更新下一次的运行工夫;
- 调度线程从新从数据库中获取待执行工作,在将工作提交至线程池中,如果有工作正在执行,应用线程池回绝策略,摈弃最老的工作;
外围代码如下:
Spring 调度工作,每 1 秒运行一次:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTask(){Date now = new Date();
// 加载所有待运行的工作
// 1. 状态为 ENABLE
// 2. 下一次运行工夫小于 以后工夫
List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
// 遍历待运行工作
for (TaskDefinitionV2 task : shouldRunTasks){
// 1. 依据 Task Id 获取工作对应的线程池
// 2. 将工作提交至线程池中
this.executorServiceForTask(task.getId())
.submit(new TaskRunner(task.getId()));
}
}
自定义线程池,每个线程池最多只有一个线程,闲暇超过 10 秒后,线程主动回收,线程饱和时,间接抛弃最老的工作:
private ExecutorService executorServiceForTask(Long taskId){
return this.executorServiceRegistry.computeIfAbsent(taskId, id->{BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
// 指定线程池名称
.namingPattern("Async-Task-"+ taskId +"-Thread-%d")
// 设置线程为 后盾线程
.daemon(true)
.build();
// 线程池外围配置:// 1. 每个线程池最多只有一个线程
// 2. 线程闲暇超过 10 秒 进行主动回收
// 3. 间接应用交互器,线程闲暇进行工作交互
// 4. 应用指定的线程工厂,设置线性名称
// 5. 线程池饱和,主动抛弃最老的工作
return new ThreadPoolExecutor(0, 1,
10L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.DiscardOldestPolicy());
});
}
最初,在线程池中运行的 Task 如下:
private class TaskRunner implements Runnable {private final Date now = new Date();
private final Long taskId;
public TaskRunner(Long taskId) {this.taskId = taskId;}
@Override
public void run() {
// 从新加载工作,放弃最新的工作状态
TaskDefinitionV2 task = definitionV2Repository.findById(this.taskId).orElse(null);
if (task != null && task.shouldRun(now)){
// 运行工作
runTask(task);
// 更新工作的下一次运行工夫
updateNextRunTime(task, now);
}
}
}
4. TaskScheduler 配置计划
该计划的外围为:绕过 @Schedule 注解,间接对 Spring 底层外围 类 TaskScheduler 进行配置。
TaskScheduler 接口是 Spring 对调度工作的一个形象,更是 @Schedule 背地默默的支持者,首先咱们看下这个接口定义。
public interface TaskScheduler {ScheduledFuture schedule(Runnable task, Trigger trigger);
ScheduledFuture schedule(Runnable task, Instant startTime);
ScheduledFuture schedule(Runnable task, Date startTime);
ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay);
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay);
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
满满的都是 schedule 接口,其余的比较简单就不过多叙述了,重点说下 Trigger 这个接口,首先看下这个接口的定义:
public interface Trigger {Date nextExecutionTime(TriggerContext triggerContext);
}
只有一个办法,获取下次执行的工夫。在工作执行实现后,会调用 Trigger 的 nextExecutionTime 获取下一次运行工夫,从而实现周期性调度。
CronTrigger 是 Trigger 的最常见实现,以 linux crontab 的形式配置调度工作,如:
scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));
根底局部简略介绍到这,让咱们看下数据库动静配置计划。
4.1 数据库动静配置计划
整体设计如下:
仍旧是轮询数据库形式,具体流程如下:
- 在利用中启动一个 Schedule 工作(每 1 秒调度一次),定时从 数据库 中获取所有工作;
- 顺次遍历工作,与内存中的 TaskEntry(工作与状态)进行比对,动静的向 TaskScheduler 中 增加 或 勾销 调度工作;
- 由 TaskScheduler 负责理论的任务调度;
外围代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndConfig(){
// 加载所有的工作信息
List<TaskDefinitionV3> tasks = repository.findAll();
// 遍历工作进行工作查看
for (TaskDefinitionV3 task : tasks){
// 获取内存工作状态
TaskEntry taskEntry = this.taskEntry.computeIfAbsent(task.getId(), TaskEntry::new);
if (task.isEnable() && taskEntry.isStop()){
// 工作为可用,运行状态为进行,则从新进行 schedule 注册
ScheduledFuture<?> scheduledFuture = this.taskScheduler.scheduleWithFixedDelay(new TaskRunner(task), task.getDelay() * 1000);
taskEntry.setScheduledFuture(scheduledFuture);
log.info("success to start schedule task for {}", task);
}else if (task.isDisable() && taskEntry.isRunning()){
// 工作为禁用,运行状态为运行中,进行正在运行在工作
taskEntry.stop();
log.info("success to stop schedule task for {}", task);
}
}
}
外围辅助类:
@Data
private class TaskEntry{
private final Long taskId;
private ScheduledFuture scheduledFuture;
private TaskEntry(Long taskId) {this.taskId = taskId;}
/**
* 内存状态 scheduledFuture 为 null,则没有运行的工作
* @return
*/
public boolean isStop() {return scheduledFuture == null;}
/**
* 内存状态 scheduledFuture 不为 null,则存在运行的工作
* @return
*/
public boolean isRunning() {return scheduledFuture != null;}
/**
* 进行调度工作 <br />
* 1. 内存状态设置为 null
* 2. 调用 scheduledFuture#cancel 终止正在运行的调度工作
*/
public void stop() {
ScheduledFuture scheduledFuture = this.scheduledFuture;
this.scheduledFuture = null;
scheduledFuture.cancel(true);
}
}
有没有发现,以上计划都有一个独特的缺点:基于数据库轮询获取工作,加大了数据库压力。实践上,只有在配置发生变化时才有必要对工作进行更新,接下来让咱们看下改良计划:基于配置核心的计划。
4.2 配置核心告诉计划
整体设计如下:
外围流程如下:
- 利用启动时,从配置核心中获取 调度的配置信息,并实现对 TaskScheduler 的配置;
- 当配置发送变动时,配置核心会被动将配置推送到 应用程序,应用程序在接管到变动告诉时,动静的减少或勾销调度工作;
- 工作的理论调度仍旧由 TaskScheduler 实现。
因为手底下没有配置核心,临时没有 coding,思路很简略,有条件的同学能够自行实现。
5. 分布式环境下利用
以上计划,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就进行了,为了防止这种状况的产生,须要晋升零碎的可用性,实现 冗余部署 和 自动化容灾。
以上计划,如果部署多个节点会产生什么?是的,会呈现工作被屡次调度的问题,为了保障在同一时刻只有一个工作在运行,须要为工作减少一个排他锁。同时,因为排他锁的存在,当一个节点处问题后,另一个节点在调度时会主动获取锁,从而解零碎的单点问题。
为了简略,咱们应用 Redis 的分布式锁。
5.1. 环境搭建
Redisson 是 Redis 的一个富客户端,提供了很多高级的数据结构。本次,咱们将应用 RLock 对利用进行爱护。
首先,在 pom 中引入 Redisson Starter。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.2</version>
</dependency>
而后,在 application.properties 文件中减少 Redis 配置,具体如下:
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
5.2 引入分布式锁
最初,就能够间接应用 分布式锁 对工作执行进行爱护了,代码如下:
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void loadAndRunTaskWithLock(){Date now = new Date();
// 加载须要运行的工作:// 1. 状态为 ENABLE
// 2. 下一次运行工夫 小于 以后工夫
List<TaskDefinitionV2> shouldRunTasks = loadShouldRunTasks(now);
// 顺次遍历待运行工作,执行对于的工作
for (TaskDefinitionV2 task : shouldRunTasks){
// Double Check
if (task.shouldRun(now)){
// 获取分布式锁,用于保障每个工作只能有一个正在运行
RLock lock = this.redissonClient.getFairLock("LoadAndRunScheduleService-" + task.getId());
if (lock.tryLock()) {
// 胜利获取锁,运行工作
try {log.info("Success to get Lock, begin to run task {}", task.getId());
// 执行工作
runTask(task);
// 更新工作的下一次运行工夫
updateNextRunTime(task, now);
log.info("Success to run task {}", task.getId());
}finally {
// 工作运行解释,开释锁
lock.unlock();}
}else {
// 未获取锁,打印日志,不做额定解决
log.info("Failed to get Lock!!!!");
}
}
}
}
备注:
Redis 是典型的 AP 利用,而分布式锁严格意义上来说是 CP。所以基于 Redis 的分布式锁只能应用在非严格环境中,比方咱们的数据报表需要。如果设计金钱,须要应用 CP 实现,如 Zookeeper 或 etcd 等。
6. 小结
本文从 Spring 的 Schedule 登程,顺次对数据库轮询计划、TaskScheduler 配置计划 进行具体解说,以实现对调度工作的可配置化。最初,应用 Redis 分布式锁无效解决了分布式环境下工作反复调度和主动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适宜。同学们能够依据本人的需要自取。
最初,附上源码 源码