共计 8343 个字符,预计需要花费 21 分钟才能阅读完成。
前言
ScheduledThreadPoolExecutor 是一种带有提早,定时执行工作的线程池。它在很多延时工作、定时工作的场景中有丰盛的利用场景。明天就来剖析下它的实现原理吧。
实现原理
先看看类图构造:
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {}
它继承了 ThreadPoolExecutor 类并实现了 ScheduledExecutorService 接口。其中 ScheduledExecutorService 继承了 ExecutorService,并提供了 schedule 相干的办法:
// 通过 delay 延迟时间后,执行 command 工作
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
// 通过 delay 延迟时间后,执行 callable 工作
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
// 通过 initialDelay 提早后,以 period 的周期时长周期性的执行 command 工作。//(来自官网正文)留神:// 1. 如果某次工作抛出了异样,后续的周期工作不会被执行。// 2. 如果某次工作执行时长超过了 period 周期,那么下一个周期到来时,不会执行新的一轮工作,而是往后推延,// 等到当前任务执行完后再执行,以此来保障屡次工作不会并发执行。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit);
// 通过 initialDelay 提早后,执行工作 command,而后每次等工作执行结束后,提早 delay 时长再执行新的一轮工作。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit);
其中 scheduleAtFixedRate 和 scheduleWithFixedDelay 容易混同,这里再阐明下它们的区别:
fixed-rate 执行机会:initialDelay + n * period (n >= 0 的整数)
fixed-delay 执行机会:initialDelay, (task1 end time + delay), (task2 end time + delay), … (taskN end time + delay)
由 ScheduledExecutorService 接口还能够看到,这几个办法的返回值都为 ScheduledFuture。它的构造如下:
public interface ScheduledFuture<V> extends Delayed, Future<V> {}
它继承了 Delayed 和 Future 接口。Delayed 中有个 getDelay()办法获取残余延时。Future 大家应该都比拟相熟,它用来示意工作的异步执行后果,能够通过 Future.get()或 Future.isDone()系列办法判断工作的执行状况和后果。
FutureTask 是 Future 的一个常见实现类,它外部有个 state 变量代表工作的执行状态。
private volatile int state;
private static final int NEW = 0; // 初始状态
private static final int COMPLETING = 1; // 执行中
private static final int NORMAL = 2; // 失常运行完结
private static final int EXCEPTIONAL = 3; // 运行中产生异样
private static final int CANCELLED = 4; // 工作被勾销
private static final int INTERRUPTING = 5; // 工作正在被中断
private static final int INTERRUPTED = 6; // 工作曾经被中断
依据官网正文,可能的状态转换如下:
// Possible state transitions:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
ScheduledFutureTask 继承了 FutureTask,也是 ScheduledFuture 接口的实现类,它外部还用了 period 来示意周期类型。
依据官网正文,period 的值代表含意如下。
period > 0,示意 fixed-rate 类型的周期工作。
period < 0,示意 fixed-delay 类型的周期工作。
period = 0,示意非周期性工作,即一次性工作。
(ps: 这里个人感觉 period 设计的扩展性不够好,如果后续 JDK 版本想再加一种新的周期类型,period 的值该如何示意呢?
我感觉能够换成枚举值的模式)
有了上述背景常识,再来看要害办法,就会轻松许多了。
1. 构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
能够看到,它应用的阻塞队列为 DelayedWorkQueue,它是 ScheduledThreadPoolExecutor 的外部类,性能和 DelayedQueue 相似,也实现了 BlockingQueue 接口,因而能够用于线程池中。
2.schedule()办法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 工作不能为空
if (command == null || unit == null)
throw new NullPointerException();
// 工作装璜和转换
RunnableScheduledFuture<?> t = decorateTask(command,
// triggerTime 是获取工作触发 (执行) 工夫戳
new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 增加工作到执行队列去执行
delayedExecute(t);
return t;
}
其中 new ScheduledFutureTask(…)代码如下:
ScheduledFutureTask(Runnable r, V result, long ns) {
// 调用父类 FutureTask 构造函数初始化 state=NEW
super(r, result);
// 初始化工作触发工夫
this.time = ns;
// 一次性工作
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();}
其中 decorateTask 目前在 ScheduledThreadPoolExecutor 类实现中只是简略返回了 task 自身,但它是 protected 润饰的,容许咱们自定义的子类去覆写这个办法,实现工作的装璜和批改逻辑。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}
再来看 delayedExecute()办法代码:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池一敞开,执行回绝策略
if (isShutdown())
reject(task);
else {
// 将工作增加到阻塞队列中,DelayedWorkQueue,队首元素是延迟时间最短的元素
super.getQueue().add(task);
// 再次查看线程池状态,如果不能运行,从队列中移除工作,并勾销工作
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
// 确保线程池中至多有一个线程在解决工作
else ensurePrestart();}
}
其中 ensurePrestart 代码如下:
void ensurePrestart() {int wc = workerCountOf(ctl.get());
// 减少外围线程数
if (wc < corePoolSize)
addWorker(null, true);
// 以后线程数为 0,也增加一个线程
else if (wc == 0)
addWorker(null, false);
}
下面剖析了工作如何被放入阻塞队列中,接下来剖析下工作线程如何从队列中获取工作并执行。由 ThreadPoolExecutor 能够晓得,Worker 负责从工作队列中循环获取工作,并调用它的 run()办法,这里的工作 Runnable 被包装成了 ScheduledFutureTask,它重写了 run(),所以须要看它的逻辑:
public void run() {
// 是否周期性工作
boolean periodic = isPeriodic();
// 以后线程池状态不能运行工作,则勾销工作
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果是一次性工作 (调用 schedule() 会走这个分支),调用父类 FutureTask 的 run()
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性工作(调用 scheduleAtFixedRate 或 scheduleWithFixedDelay 会走这个分支)
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下一次执行机会
setNextRunTime();
// 重新加入队列中,周期执行
reExecutePeriodic(outerTask);
}
}
其中,FutureTask 的 run()代码如下:
public void run() {
// 如果工作状态不为 NEW 或者 CAS 设置执行线程为以后线程失败,则返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 再次查看工作状态是否为 NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用指标工作 Callable,取得返回值 result
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 工作执行过程中产生异样,设置异样后果
setException(ex);
}
// 工作失常运行完结,设置失常后果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run() runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其中 set(result)和 setException(ex)代码如下:
protected void set(V v) {
// CAS 设置工作状态从 NEW -> COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 设置后果
outcome = v;
// 设置工作状态为失常完结,这里应用 putOrderedInt 效率会比 putIntVolatile 高些
// 且这里不要求设置的 NORMAL 状态对其它线程立刻可见。UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();}
}
// 和下面 set()相似,不做赘述
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();}
}
3.scheduleWithFixedDelay()
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit) {if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
大体框架和 schedule 相似,但须要留神的是 new ScheduledFutureTask(…)的区别,这里多传了个 unit.toNanos(-delay)参数。它的 period 值为 -delay,后面提到 period < 0 时,代表 fixed-delay 类型的周期工作。
ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();}
这时回到咱们下面剖析的 ScheduledFutureTask 的 run(),会走这个 if 分支。
// 运行并重置
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下一次执行机会
setNextRunTime();
// 重新加入队列中,周期执行
reExecutePeriodic(outerTask);
}
其中 runAndReset()代码如下:
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {c.call(); // don't set result
ran = true;
} catch (Throwable ex) {setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run() runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 工作是否失常运行且状态为 NEW
return ran && s == NEW;
}
它的逻辑和 run()相似,只不过它没有获取 c.call()运行后果,也没有设置工作状态为 NORMAL 失常完结,目标是使得工作成为可反复执行的。
再来看 setNextRunTime()办法:
private void setNextRunTime() {
long p = period;
// fixed-rate 类型
if (p > 0)
time += p;
// fixed-delay 类型(留神能进入 setNextRunTime()的前提条件是 period != 0)// 这里因为 p 为正数,所以须要提早 - p 工夫执行(time = now + delay)else time = triggerTime(-p);
}
其中 reExecutePeriodic()如下
void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {
// 从新将工作退出队列中
super.getQueue().add(task);
// 查看线程池状态
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
// 确保至多一个线程在执行
else ensurePrestart();}
}
4.scheduleAtFixedRate()
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
能够看到,它和 scheduleWithFixedDelay()类似,不同点在于传递的 period 参数是 unit.toNanos(period),而不是 unit.toNanos(-delay), 因为 fixed-rate 类型的 period > 0.
所以,在执行 setNextRunTime()办法时,会执行 time += p,而不是 time = triggerTime(-p).
总结
本文讲述了 ScheduledThreadPoolExecutor 的原理,外部应用 DelayedWorkQueue 寄存工作,fixed-delay 类型保障工作屡次执行之间距离固定工夫,fixed-rate 类型保障依照固定频率执行工作。