之前学习 ThreadPool 的使用以及源码剖析,并且从面试的角度去介绍知识点的解答。今天给大家介绍下周期性线程池的使用和重点源码剖析。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor:用来处理延时任务或定时任务
定时线程池类的类结构图
ScheduledThreadPoolExecutor 接收 ScheduleFutureTask 类型的任务,是线程池调度任务的最小单位。
它采用 DelayQueue 存储等待的任务:
1、DelayQueue 内部封装成一个 PriorityQueue,它会根据 time 的先后时间顺序,如果 time 相同则根绝 sequenceNumber 排序;
2、DelayQueue 是无界队列;
ScheduleFutureTask
接收的参数:
private final long sequenceNumber;// 任务的序号
private long time;// 任务开始的时间
private final long period;// 任务执行的时间间隔
工作线程的的执行过程:
工作线程会从 DelayQueue 取出已经到期的任务去执行;
执行结束后重新设置任务的到期时间,再次放回 DelayQueue;
ScheduledThreadPoolExecutor 会把待执行的任务放到工作队列 DelayQueue 中,DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的 ScheduledFutureTask 进行排序,具体的排序算法实现如下:
public int compareTo(Delayed other) {if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
// 首先按照 time 排序,time 小的排到前面,time 大的排到后面
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
//time 相同,按照 sequenceNumber 排序;
//sequenceNumber 小的排在前面,sequenceNumber 大的排在后面
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
接下来看看 ScheduledFutureTask 的 run 方法实现,run 方法是调度 task 的核心,task 的执行实际上是 run 方法的执行。
public void run() {
// 是否是周期性的
boolean periodic = isPeriodic();
// 线程池是 shundown 状态不支持处理新任务,直接取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不需要执行执行周期性任务,直接执行 run 方法结束
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果需要周期性执行,则在执行任务完成后,设置下一次执行时间
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下一次执行该任务的时间
setNextRunTime();
// 重复执行该任务
reExecutePeriodic(outerTask);
}
}
run 方法的执行步骤:
- 1、如果线程池是 shundown 状态不支持处理新任务,直接取消任务,否则步骤 2;
- 2、如果不是周期性任务,直接调用 ScheduledFutureTask 的 run 方法执行,会设置执行结果,然后直接返回,否则步骤 3;
- 3、如果是周期性任务,调用 ScheduledFutureTask 的 runAndset 方法执行,不会设置执行结果,然后直接返回,否则执行步骤 4 和步骤 5;
- 4、计算下一次执行该任务的时间;
- 5、重复执行该任务;
接下来看下 reExecutePeriodic 方法的执行步骤:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();}
}
由于已经执行过一次周期性任务,所以不会 reject 当前任务,同时传入的任务一定是周期性任务。
周期性线程池任务的提交方式
周期性有三种提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面从使用和源码两个方面进行说明,首先是如果提交任务:
pool.schedule(new Runnable() {
@Override
public void run() {System.out.println("延迟执行");
}
},1, TimeUnit.SECONDS);
/**
* 这个执行周期是固定,不管任务执行多长时间,每过 3 秒中就会产生一个新的任务
*/
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 这个业务逻辑需要很长的时间,超过了 3 秒
System.out.println("重复执行");
}
},1,3,TimeUnit.SECONDS);
pool.shutdown();
/**
* 假如 run 方法 30min 后执行完成,然后间隔 3 秒,再周期性执行下一个任务
*/
pool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
//30min
System.out.println("重复执行");
}
},1,3,TimeUnit.SECONDS);
知道了如何提交周期性任务,接下来源码是如何执行的,首先是 schedule 方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {if (command == null || unit == null)
throw new NullPointerException();
// 把任务封装成 ScheduledFutureTask,之后调用 decorateTask 进行包装;//decorateTask 方法是空方法,留给用户去实现的;RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 包装好任务之后,进行任务的提交
delayedExecute(t);
return t;
}
任务提交方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池不是 RUNNING 状态,则使用拒绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
// 与 ThreadPoolExecutor 不同,这里直接把任务加入延迟队列
super.getQueue().add(task);
// 如果当前状态无法执行任务,则取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 和 ThreadPoolExecutor 不一样,corePoolSize 没有达到会增加 Worker;
// 增加 Worker,确保提交的任务能够被执行
ensurePrestart();}
}
还没关注我的公众号?
- 扫文末二维码关注公众号【小强的进阶之路】可领取如下:
- 学习资料:1T 视频教程:涵盖 Javaweb 前后端教学视频、机器学习 / 人工智能教学视频、Linux 系统教程视频、雅思考试视频教程;
- 100 多本书:包含 C /C++、Java、Python 三门编程语言的经典必看图书、LeetCode 题解大全;
- 软件工具:几乎包括你在编程道路上的可能会用到的大部分软件;
- 项目源码:20 个 JavaWeb 项目源码。