ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩大类,用来实现提早执行的工作、或者周期性执行的工作。
一般来讲,周期性工作或者定时工作蕴含两大组件:一个是执行工作的线程池,一个是存储工作的存储器。还记得Quartz吗?企业级定时工作框架,最重要的内容其实也是这两局部:SimpleThreadPool和JobStore。
ScheduledThreadPoolExecutor也不例外,由线程池和工作队列组成。线程池继承自ThreadPoolExecutor,工作队列DelayedWorkQueue,理解了ThreadPoolExecutor和DelayedWorkQueue,也就根本理解了ScheduledThreadPoolExecutor。
此外,ScheduledThreadPoolExecutor的非凡之处还在于他所执行的工作必须是ScheduledFutureTask,ScheduledFutureTask是“将来要执行的工作”,“将来”由delay指定。即便是通过ScheduledThreadPoolExecutor提交“立刻”而不是“将来”要执行的工作,也要通过指定delay时长为0的ScheduledFutureTask来提交。ScheduledFutureTask工作提交之后退出阻塞队列DelayedWorkQueue期待调度。
ScheduledThreadPoolExecutor的创立
提供了四个构造方法,都是通过调用父类ThreadPoolExecutor的构造方法实现ScheduledThreadPoolExecutor对象的创立:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
corePoolSize通过构造方法的参数指定,maximumPoolSize在构造方法中都固定设置为Integer.MAX_VALUE,也就是不受限制。
keepAliveTime设置为0,前面咱们会晓得其实ScheduledThreadPoolExecutor的线程数不会超过corePoolSize,而且如果allowCoreThreadTimeOut放弃默认的话(false),那其实这个keepAliveTime是没有意义的。
四个构造方法均设置阻塞队列为new DelayedWorkQueue(),即仅反对DelayedWorkQueue。
ScheduledThreadPoolExecutor的线程池治理
ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩大,线程池治理局部没有做扩大,保留了ThreadPoolExecutor的原有性能。
每次工作退出队列后会调用ensurePrestart(ThreadPoolExecutor实现)办法创立并启动一个线程,ThreadPoolExecutor的ensurePrestart办法:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
如果线程数小于corePoolSize则调用addWorker(null, true)创立外围线程线程,否则如果线程数为0(设置corePoolSize=0则可能走到这个分支)创立非核心线程。
corePoolSize大于0的状况下,ScheduledThreadPoolExecutor启动的线程数不会大于外围线程数。而且每一个线程创立的时候都不会有firstTask,线程总是从阻塞队列里获取工作执行。
提交工作
ScheduledThreadPoolExecutor提供execute(Executor接口的工作提交办法)、submit、schedule、scheduleAtFixedRate、scheduleWithFixedDelay等办法提交工作。
尽管说ScheduledThreadPoolExecutor只承受ScheduledFutureTask,但这并不是说应用层只能提交给他ScheduledFutureTask的工作,利用通过以上各办法提交工作的时候的Task是非常灵活的:能够是Callable,也能够是Runnable,ScheduledThreadPoolExecutor外部再把它们包装为ScheduledFutureTask --- 对应用层来说是通明的。
提供了这么多提交工作的办法,无非是为了反对应用层以更加灵便的形式提交工作,其实底层执行逻辑大同小异。
咱们就以scheduleAtFixedRate为例来剖析工作提交过程:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
scheduleAtFixedRate办法的目标是提交一个在提交后延时(参数initialDelay指定)启动、以固定工夫周期(参数period指定)反复执行的工作。
次要执行了以下动作:
- 首先创立了ScheduledFutureTask工作
- 之后将它包装成RunnableScheduledFuture,其实decorateTask办法间接返回了创立好的ScheduledFutureTask,没什么好剖析的
- 而后调用delayedExecute启动线程执行工作
ScheduledFutureTask工作
这个也是ScheduledThreadPoolExecutor的重头戏!
ScheduledFutureTask继承自FutureTask,并实现了RunnableScheduledFuture接口,所以他是一个复合体:
- 能够异步执行带有返回的工作(Future接口)
- 能够执行周期性工作(RunnableScheduledFuture接口)
先意识几个重要属性:
time: 工作应该被执行的工夫(纳秒)
period: 周期执行工作的间隔时间(纳秒),负数示意fix-rate,正数表述fix-delay(fixed-rate和fixed-delay后面jdk timer的文章讲过,含意一样)
outerTask:指向本人的RunnableScheduledFuture对象
compareTo办法:进入DelayedWorkQueue队列时须要调用CompareTo办法比拟大小,以便把最近执行的工作放在堆头。compareTo办法最终比拟的其实就是time属性。
run办法:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
如果是一次性工作则间接调用FutureTask的run办法执行工作,否则,如果是周期性工作,首先调用FutureTask的runAndReset办法,调用胜利的话,设置下次执行工夫,而后通过调用reExecutePeriodic将当前任务再次退出队列。
runAndReset办法首先执行当前任务,执行实现后从新设置当前任务状态为NEW,筹备下次执行。
通过剖析runAndReset办法能够晓得,周期性工作执行后不再可能获取到返回(回顾一下FutureTask的代码逻辑,状态设置为NEW之后,就不再可能获取到返回了)。
delayedExecute启动线程
办法代码很简略:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
工作退出队列,之后调用ensurePrestart办法启动线程!
工作间接进入队列,之后启动线程,把工作的执行齐全交给ThreadPoolExecutor的工作执行线程Worker去调度了:Worker线程调用getTask办法从队列获取并执行工作!
咱们当初能够大胆猜想一下了:周期性工作是有严格的执行工夫要求的,没到执行工夫的工作是不能执行的,因为ThreadPoolExecutor的工作执行线程的逻辑中并没有执行工夫的判断,那么,这个逻辑应该是在getTask办法向队列获取工作的时候、由队列的出队办法实现的。
当初轮到阻塞队列DelayedWorkQueue出场了,咱们带着这个疑难来钻研一下DelayedWorkQueue队列。
阻塞队列DelayedWorkQueue
DelayedWorkQueue底层是以数组实现的堆构造。堆构造是一个齐全二叉树,能够确保每一个节点都比他的叶子节点大(或者小),这样的话堆头节点(也就是数组的第一个元素)就肯定是最大(或最小的)。
DelayedWorkQueue是存储ScheduledFutureTask的队列,最近执行的工作须要寄存在堆头,每一个节点都应该比他的叶子节点小。
每次工作退出队列、节点出队、删除节点等操作都须要依照工作执行工夫time从新调整队列。
初始化容量为16,新节点退出队列时如果队列容量不够则扩容原来容量的50%(新容量 = 1.5 * 旧容量)。
新节点入队的时候调用siftUp办法从新调整队列,以便新节点退出到堆的适合地位。
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
siftUp办法基于齐全二叉树的一个个性:齐全二叉树的第k个节点的父节点在数组中的地位为:(k-1)/2取整。
节点退出队列的时候默认退出到尾部k(以后数组的size),获取到k的父节点、比拟以后节点和父节点,如果以后节点大于父节点(调用了ScheduledFutureTask的compareTo办法,比拟的是time),阐明以后节点找到了正确的地位,否则以后节点与父节点替换地位,持续寻找父节点比拟、直到找到根节点。
这样,新退出的节点通过siftUp操作之后会依据工作触发工夫time进入到队列的适合地位。
节点须要从队列remove的时候也须要执行相似的操作(调用siftUp或siftDown办法)确保堆的正确程序。
这样一来,DelayedWorkQueue队列能够始终保障堆头(也就是数组的第一个元素)就是最近须要执行的工作。工作执行线程在获取最近须要被执行工作的时候,不须要遍历整个队列、只须要获取堆头第一个节点执行即可。
堆构造非常适合周期性工作或定时工作这一利用场景:节点退出工作时的时效性要求不高(因为是须要延时执行的工作嘛,时效性要求必定就不高了),获取数据的时效性要求高(到了工作的执行工夫了,最好当然是能即时获取到、立刻执行),可能十分无效的进步工作执行效率。
理解了堆构造的个性,晓得了堆构造入队出队的排序逻辑,接下来还须要去验证咱们的猜想:出队逻辑会判断以后是否曾经到了节点的执行工夫。
因为ThreadPoolExecutor的getTask()办法调用队列的take办法获取工作,所以,间接看DelayedWorkQueue的take()办法就能够了,
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
逻辑很清晰:
- 队列上锁
- 获取堆头的工作,如果为空(空队列,没有工作),期待(留神期待是会开释锁资源的、期待被唤醒之后从新获取锁资源,无关ReentrantLock咱们前面会专门做详细分析)
- 否则,判断堆头工作曾经到执行工夫了,堆头工作出队列并返回堆头工作。
- 否则,堆头工作执行工夫未到,采纳Leader-Follower模式期待
下面第3点验证了咱们的猜想!
Leader-Follower模式是指线程池中多个线程在期待执行工作的时候,线程会竞争Leader,只有一个线程会在竞争中获胜成为Leader,其余线程就都是Follower。Leader获权仅期待指定工夫(以后距下次工作执行的时间差)、Follower线程则须要无限期期待(被勾销或者被其余线程唤醒)。期待过程中如果有新的节点退出队列并成为堆头的话(新退出的工作变成了最近要被执行的工作),此时须要设置leader为空并唤醒期待线程从新竞争leader。Leader-Follower模式能够无效防止所有期待线程都进入无限期、被动期待其余线程唤醒的期待模式、在期待时长达到后被动唤醒执行工作。
小结
周期性工作线程池ScheduledThreadPoolExecutor扒完了,简略总结下:
- 周期性线程池能够解决立刻执行的工作、提早执行的一次性工作、提早执行的周期性工作(FixedRate和FixedDelay两种模式)
- 创立的时候指定外围线程数量,线程池最终启动的线程数量不会超过外围线程数量,每提交一个工作的同时启动一个线程、直到线程池数量达到外围线程数
- 不论是立刻执行的工作、还是提早执行的工作,工作提交后间接退出阻塞队列,期待线程从队列中获取并执行
- 阻塞队列采纳DelayedWorkQueue,堆构造,最近执行的工作始终放在堆头
- 线程池中的线程采纳Leader-Follower模式竞争工作,能够认为竞争成为Leader的线程取得了堆头工作的优先执行权
Thanks a lot!
上一篇 线程池 - ThreadPoolExecutor源码剖析