共计 7467 个字符,预计需要花费 19 分钟才能阅读完成。
ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的扩大类,用来实现提早执行的工作、或者周期性执行的工作。
一般来讲,周期性工作或者定时工作蕴含两大组件:一个是执行工作的线程池,一个是存储工作的存储器。还记得 Quartz 吗?企业级定时工作框架,最重要的内容其实也是这两局部:SimpleThreadPool 和 JobStore。
ScheduledThreadPoolExecutor 也不例外,由线程池和工作队列组成。线程池继承自 ThreadPoolExecutor,工作队列 DelayedWorkQueue,理解了 ThreadPoolExecutor 和 DelayedWorkQueue,也就根本理解了 ScheduledThreadPoolExecutor。
此外,ScheduledThreadPoolExecutor 的非凡之处还在于他所执行的工作必须是 ScheduledFutureTask,ScheduledFutureTask 是“将来要执行的工作”,“将来”由 delay 指定。即便是通过 ScheduledThreadPoolExecutor 提交“立刻”而不是“将来”要执行的工作,也要通过指定 delay 时长为 0 的 ScheduledFutureTask 来提交。ScheduledFutureTask 工作提交之后退出阻塞队列 DelayedWorkQueue 期待调度。
ScheduledThreadPoolExecutor 的创立
提供了四个构造方法,都是通过调用父类 ThreadPoolExecutor 的构造方法实现 ScheduledThreadPoolExecutor 对象的创立:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
corePoolSize 通过构造方法的参数指定,maximumPoolSize 在构造方法中都固定设置为 Integer.MAX_VALUE,也就是不受限制。
keepAliveTime 设置为 0,前面咱们会晓得其实 ScheduledThreadPoolExecutor 的线程数不会超过 corePoolSize,而且如果 allowCoreThreadTimeOut 放弃默认的话(false),那其实这个 keepAliveTime 是没有意义的。
四个构造方法均设置阻塞队列为 new DelayedWorkQueue(),即仅反对 DelayedWorkQueue。
ScheduledThreadPoolExecutor 的线程池治理
ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的扩大,线程池治理局部没有做扩大,保留了 ThreadPoolExecutor 的原有性能。
每次工作退出队列后会调用 ensurePrestart(ThreadPoolExecutor 实现)办法创立并启动一个线程,ThreadPoolExecutor 的 ensurePrestart 办法:
void ensurePrestart() {int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
如果线程数小于 corePoolSize 则调用 addWorker(null, true) 创立外围线程线程,否则如果线程数为 0(设置 corePoolSize= 0 则可能走到这个分支)创立非核心线程。
corePoolSize 大于 0 的状况下,ScheduledThreadPoolExecutor 启动的线程数不会大于外围线程数。而且每一个线程创立的时候都不会有 firstTask,线程总是从阻塞队列里获取工作执行。
提交工作
ScheduledThreadPoolExecutor 提供 execute(Executor 接口的工作提交办法)、submit、schedule、scheduleAtFixedRate、scheduleWithFixedDelay 等办法提交工作。
尽管说 ScheduledThreadPoolExecutor 只承受 ScheduledFutureTask,但这并不是说应用层只能提交给他 ScheduledFutureTask 的工作,利用通过以上各办法提交工作的时候的 Task 是非常灵活的:能够是 Callable,也能够是 Runnable,ScheduledThreadPoolExecutor 外部再把它们包装为 ScheduledFutureTask — 对应用层来说是通明的。
提供了这么多提交工作的办法,无非是为了反对应用层以更加灵便的形式提交工作,其实底层执行逻辑大同小异。
咱们就以 scheduleAtFixedRate 为例来剖析工作提交过程:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleAtFixedRate 办法的目标是提交一个在提交后延时(参数 initialDelay 指定)启动、以固定工夫周期(参数 period 指定)反复执行的工作。
次要执行了以下动作:
- 首先创立了 ScheduledFutureTask 工作
- 之后将它包装成 RunnableScheduledFuture,其实 decorateTask 办法间接返回了创立好的 ScheduledFutureTask,没什么好剖析的
- 而后调用 delayedExecute 启动线程执行工作
ScheduledFutureTask 工作
这个也是 ScheduledThreadPoolExecutor 的重头戏!
ScheduledFutureTask 继承自 FutureTask,并实现了 RunnableScheduledFuture 接口,所以他是一个复合体:
- 能够异步执行带有返回的工作(Future 接口)
- 能够执行周期性工作(RunnableScheduledFuture 接口)
先意识几个重要属性:
time: 工作应该被执行的工夫(纳秒)
period:周期执行工作的间隔时间(纳秒),负数示意 fix-rate, 正数表述 fix-delay(fixed-rate 和 fixed-delay 后面 jdk timer 的文章讲过,含意一样)
outerTask:指向本人的 RunnableScheduledFuture 对象
compareTo 办法:进入 DelayedWorkQueue 队列时须要调用 CompareTo 办法比拟大小,以便把最近执行的工作放在堆头。compareTo 办法最终比拟的其实就是 time 属性。
run 办法:
public void run() {boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();
reExecutePeriodic(outerTask);
}
}
如果是一次性工作则间接调用 FutureTask 的 run 办法执行工作,否则,如果是周期性工作,首先调用 FutureTask 的 runAndReset 办法,调用胜利的话,设置下次执行工夫,而后通过调用 reExecutePeriodic 将当前任务再次退出队列。
runAndReset 办法首先执行当前任务,执行实现后从新设置当前任务状态为 NEW,筹备下次执行。
通过剖析 runAndReset 办法能够晓得,周期性工作执行后不再可能获取到返回(回顾一下 FutureTask 的代码逻辑,状态设置为 NEW 之后,就不再可能获取到返回了)。
delayedExecute 启动线程
办法代码很简略:
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())
reject(task);
else {super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();}
}
工作退出队列,之后调用 ensurePrestart 办法启动线程!
工作间接进入队列,之后启动线程,把工作的执行齐全交给 ThreadPoolExecutor 的工作执行线程 Worker 去调度了:Worker 线程调用 getTask 办法从队列获取并执行工作!
咱们当初能够大胆猜想一下了:周期性工作是有严格的执行工夫要求的,没到执行工夫的工作是不能执行的,因为 ThreadPoolExecutor 的工作执行线程的逻辑中并没有执行工夫的判断,那么,这个逻辑应该是在 getTask 办法向队列获取工作的时候、由队列的出队办法实现的。
当初轮到阻塞队列 DelayedWorkQueue 出场了,咱们带着这个疑难来钻研一下 DelayedWorkQueue 队列。
阻塞队列 DelayedWorkQueue
DelayedWorkQueue 底层是以数组实现的堆构造。堆构造是一个齐全二叉树,能够确保每一个节点都比他的叶子节点大(或者小),这样的话堆头节点(也就是数组的第一个元素)就肯定是最大(或最小的)。
DelayedWorkQueue 是存储 ScheduledFutureTask 的队列,最近执行的工作须要寄存在堆头,每一个节点都应该比他的叶子节点小。
每次工作退出队列、节点出队、删除节点等操作都须要依照工作执行工夫 time 从新调整队列。
初始化容量为 16,新节点退出队列时如果队列容量不够则扩容原来容量的 50%(新容量 = 1.5 * 旧容量)。
新节点入队的时候调用 siftUp 办法从新调整队列,以便新节点退出到堆的适合地位。
private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp 办法基于齐全二叉树的一个个性:齐全二叉树的第 k 个节点的父节点在数组中的地位为:(k-1)/ 2 取整。
节点退出队列的时候默认退出到尾部 k(以后数组的 size),获取到 k 的父节点、比拟以后节点和父节点,如果以后节点大于父节点(调用了 ScheduledFutureTask 的 compareTo 办法,比拟的是 time),阐明以后节点找到了正确的地位,否则以后节点与父节点替换地位,持续寻找父节点比拟、直到找到根节点。
这样,新退出的节点通过 siftUp 操作之后会依据工作触发工夫 time 进入到队列的适合地位。
节点须要从队列 remove 的时候也须要执行相似的操作(调用 siftUp 或 siftDown 办法)确保堆的正确程序。
这样一来,DelayedWorkQueue 队列能够始终保障堆头(也就是数组的第一个元素)就是最近须要执行的工作。工作执行线程在获取最近须要被执行工作的时候,不须要遍历整个队列、只须要获取堆头第一个节点执行即可。
堆构造非常适合周期性工作或定时工作这一利用场景:节点退出工作时的时效性要求不高(因为是须要延时执行的工作嘛,时效性要求必定就不高了),获取数据的时效性要求高(到了工作的执行工夫了,最好当然是能即时获取到、立刻执行),可能十分无效的进步工作执行效率。
理解了堆构造的个性,晓得了堆构造入队出队的排序逻辑,接下来还须要去验证咱们的猜想: 出队逻辑会判断以后是否曾经到了节点的执行工夫。
因为 ThreadPoolExecutor 的 getTask() 办法调用队列的 take 办法获取工作,所以,间接看 DelayedWorkQueue 的 take() 办法就能够了,
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {for (;;) {RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {Thread thisThread = Thread.currentThread();
leader = thisThread;
try {available.awaitNanos(delay);
} finally {if (leader == thisThread)
leader = null;
}
}
}
}
} finally {if (leader == null && queue[0] != null)
available.signal();
lock.unlock();}
}
逻辑很清晰:
- 队列上锁
- 获取堆头的工作,如果为空(空队列,没有工作),期待(留神期待是会开释锁资源的、期待被唤醒之后从新获取锁资源,无关 ReentrantLock 咱们前面会专门做详细分析)
- 否则, 判断堆头工作曾经到执行工夫了,堆头工作出队列并返回堆头工作。
- 否则,堆头工作执行工夫未到,采纳 Leader-Follower 模式期待
下面第 3 点验证了咱们的猜想!
Leader-Follower 模式是指线程池中多个线程在期待执行工作的时候,线程会竞争 Leader,只有一个线程会在竞争中获胜成为 Leader,其余线程就都是 Follower。Leader 获权仅期待指定工夫(以后距下次工作执行的时间差)、Follower 线程则须要无限期期待(被勾销或者被其余线程唤醒)。期待过程中如果有新的节点退出队列并成为堆头的话(新退出的工作变成了最近要被执行的工作),此时须要设置 leader 为空并唤醒期待线程从新竞争 leader。Leader-Follower 模式能够无效防止所有期待线程都进入无限期、被动期待其余线程唤醒的期待模式、在期待时长达到后被动唤醒执行工作。
小结
周期性工作线程池 ScheduledThreadPoolExecutor 扒完了,简略总结下:
- 周期性线程池能够解决立刻执行的工作、提早执行的一次性工作、提早执行的周期性工作(FixedRate 和 FixedDelay 两种模式)
- 创立的时候指定外围线程数量,线程池最终启动的线程数量不会超过外围线程数量,每提交一个工作的同时启动一个线程、直到线程池数量达到外围线程数
- 不论是立刻执行的工作、还是提早执行的工作,工作提交后间接退出阻塞队列,期待线程从队列中获取并执行
- 阻塞队列采纳 DelayedWorkQueue,堆构造,最近执行的工作始终放在堆头
- 线程池中的线程采纳 Leader-Follower 模式竞争工作,能够认为竞争成为 Leader 的线程取得了堆头工作的优先执行权
Thanks a lot!
上一篇 线程池 – ThreadPoolExecutor 源码剖析