前言
ScheduledThreadPoolExecutor是一种带有提早,定时执行工作的线程池。它在很多延时工作、定时工作的场景中有丰盛的利用场景。明天就来剖析下它的实现原理吧。
实现原理
先看看类图构造:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
它继承了ThreadPoolExecutor类并实现了ScheduledExecutorService接口。其中ScheduledExecutorService继承了ExecutorService,并提供了schedule相干的办法:
// 通过delay延迟时间后,执行command工作public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);// 通过delay延迟时间后,执行callable工作public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);// 通过initialDelay提早后,以period的周期时长周期性的执行command工作。// (来自官网正文)留神:// 1.如果某次工作抛出了异样,后续的周期工作不会被执行。// 2.如果某次工作执行时长超过了period周期,那么下一个周期到来时,不会执行新的一轮工作,而是往后推延,// 等到当前任务执行完后再执行,以此来保障屡次工作不会并发执行。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 通过initialDelay提早后,执行工作command,而后每次等工作执行结束后,提早delay时长再执行新的一轮工作。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
其中scheduleAtFixedRate和scheduleWithFixedDelay容易混同,这里再阐明下它们的区别:
fixed-rate执行机会:initialDelay + n * period (n >= 0的整数)
fixed-delay执行机会:initialDelay, (task1 end time + delay), (task2 end time + delay), ... (taskN end time + delay)
由ScheduledExecutorService接口还能够看到,这几个办法的返回值都为ScheduledFuture。它的构造如下:
public interface ScheduledFuture<V> extends Delayed, Future<V> {}
它继承了Delayed和Future接口。Delayed中有个getDelay()办法获取残余延时。Future大家应该都比拟相熟,它用来示意工作的异步执行后果,能够通过Future.get()或Future.isDone()系列办法判断工作的执行状况和后果。
FutureTask是Future的一个常见实现类,它外部有个state变量代表工作的执行状态。
private volatile int state;private static final int NEW = 0; // 初始状态private static final int COMPLETING = 1; // 执行中private static final int NORMAL = 2; // 失常运行完结private static final int EXCEPTIONAL = 3; // 运行中产生异样private static final int CANCELLED = 4; // 工作被勾销private static final int INTERRUPTING = 5; // 工作正在被中断private static final int INTERRUPTED = 6; // 工作曾经被中断
依据官网正文,可能的状态转换如下:
// Possible state transitions:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
ScheduledFutureTask继承了FutureTask,也是ScheduledFuture接口的实现类,它外部还用了period来示意周期类型。
依据官网正文,period的值代表含意如下。
period > 0,示意fixed-rate类型的周期工作。
period < 0,示意fixed-delay类型的周期工作。
period = 0,示意非周期性工作,即一次性工作。
(ps:这里个人感觉period设计的扩展性不够好,如果后续JDK版本想再加一种新的周期类型,period的值该如何示意呢?
我感觉能够换成枚举值的模式)
有了上述背景常识,再来看要害办法,就会轻松许多了。
1.构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}
能够看到,它应用的阻塞队列为DelayedWorkQueue,它是ScheduledThreadPoolExecutor的外部类,性能和DelayedQueue相似,也实现了BlockingQueue接口,因而能够用于线程池中。
2.schedule()办法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 工作不能为空 if (command == null || unit == null) throw new NullPointerException(); // 工作装璜和转换 RunnableScheduledFuture<?> t = decorateTask(command, // triggerTime是获取工作触发(执行)工夫戳 new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 增加工作到执行队列去执行 delayedExecute(t); return t;}
其中 new ScheduledFutureTask(...)代码如下:
ScheduledFutureTask(Runnable r, V result, long ns) { // 调用父类FutureTask构造函数初始化state=NEW super(r, result); // 初始化工作触发工夫 this.time = ns; // 一次性工作 this.period = 0; this.sequenceNumber = sequencer.getAndIncrement();}
其中decorateTask目前在ScheduledThreadPoolExecutor类实现中只是简略返回了task自身,但它是protected润饰的,容许咱们自定义的子类去覆写这个办法,实现工作的装璜和批改逻辑。
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task;}
再来看delayedExecute()办法代码:
private void delayedExecute(RunnableScheduledFuture<?> task) { // 线程池一敞开,执行回绝策略 if (isShutdown()) reject(task); else { // 将工作增加到阻塞队列中,DelayedWorkQueue,队首元素是延迟时间最短的元素 super.getQueue().add(task); // 再次查看线程池状态,如果不能运行,从队列中移除工作,并勾销工作 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 确保线程池中至多有一个线程在解决工作 else ensurePrestart(); }}
其中ensurePrestart代码如下:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 减少外围线程数 if (wc < corePoolSize) addWorker(null, true); // 以后线程数为0,也增加一个线程 else if (wc == 0) addWorker(null, false);}
下面剖析了工作如何被放入阻塞队列中,接下来剖析下工作线程如何从队列中获取工作并执行。由ThreadPoolExecutor能够晓得,Worker负责从工作队列中循环获取工作,并调用它的run()办法,这里的工作Runnable被包装成了ScheduledFutureTask,它重写了run(),所以须要看它的逻辑:
public void run() { // 是否周期性工作 boolean periodic = isPeriodic(); // 以后线程池状态不能运行工作,则勾销工作 if (!canRunInCurrentRunState(periodic)) cancel(false); // 如果是一次性工作(调用schedule()会走这个分支),调用父类FutureTask的run() else if (!periodic) ScheduledFutureTask.super.run(); // 如果是周期性工作(调用scheduleAtFixedRate或scheduleWithFixedDelay会走这个分支) else if (ScheduledFutureTask.super.runAndReset()) { // 设置下一次执行机会 setNextRunTime(); // 重新加入队列中,周期执行 reExecutePeriodic(outerTask); }}
其中,FutureTask的run()代码如下:
public void run() { // 如果工作状态不为NEW或者CAS设置执行线程为以后线程失败,则返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 再次查看工作状态是否为NEW if (c != null && state == NEW) { V result; boolean ran; try { // 调用指标工作Callable,取得返回值result result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 工作执行过程中产生异样,设置异样后果 setException(ex); } // 工作失常运行完结,设置失常后果 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
其中set(result)和setException(ex)代码如下:
protected void set(V v) { // CAS设置工作状态从NEW -> COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 设置后果 outcome = v; // 设置工作状态为失常完结,这里应用putOrderedInt效率会比putIntVolatile高些 // 且这里不要求设置的NORMAL状态对其它线程立刻可见。 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}
// 和下面set()相似,不做赘述protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}
3.scheduleWithFixedDelay()
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;}
大体框架和schedule相似,但须要留神的是new ScheduledFutureTask(...)的区别,这里多传了个unit.toNanos(-delay)参数。它的period值为-delay,后面提到period < 0时,代表fixed-delay类型的周期工作。
ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement();}
这时回到咱们下面剖析的ScheduledFutureTask的run(),会走这个if分支。
// 运行并重置 else if (ScheduledFutureTask.super.runAndReset()) { // 设置下一次执行机会 setNextRunTime(); // 重新加入队列中,周期执行 reExecutePeriodic(outerTask); }
其中runAndReset()代码如下:
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 工作是否失常运行且状态为NEW return ran && s == NEW;}
它的逻辑和run()相似,只不过它没有获取c.call()运行后果,也没有设置工作状态为NORMAL失常完结,目标是使得工作成为可反复执行的。
再来看setNextRunTime()办法:
private void setNextRunTime() { long p = period; // fixed-rate类型 if (p > 0) time += p; // fixed-delay类型(留神能进入setNextRunTime()的前提条件是period != 0) // 这里因为p为正数,所以须要提早-p工夫执行(time = now + delay) else time = triggerTime(-p);}
其中reExecutePeriodic()如下
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 从新将工作退出队列中 super.getQueue().add(task); // 查看线程池状态 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); // 确保至多一个线程在执行 else ensurePrestart(); }}
4.scheduleAtFixedRate()
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;}
能够看到,它和scheduleWithFixedDelay()类似,不同点在于传递的period参数是unit.toNanos(period),而不是unit.toNanos(-delay),因为fixed-rate类型的period > 0.
所以,在执行setNextRunTime()办法时,会执行time += p,而不是time = triggerTime(-p).
总结
本文讲述了ScheduledThreadPoolExecutor的原理,外部应用DelayedWorkQueue寄存工作,fixed-delay类型保障工作屡次执行之间距离固定工夫,fixed-rate类型保障依照固定频率执行工作。