ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩大类,用来实现提早执行的工作、或者周期性执行的工作。

一般来讲,周期性工作或者定时工作蕴含两大组件:一个是执行工作的线程池,一个是存储工作的存储器。还记得Quartz吗?企业级定时工作框架,最重要的内容其实也是这两局部:SimpleThreadPool和JobStore。

ScheduledThreadPoolExecutor也不例外,由线程池和工作队列组成。线程池继承自ThreadPoolExecutor,工作队列DelayedWorkQueue,理解了ThreadPoolExecutor和DelayedWorkQueue,也就根本理解了ScheduledThreadPoolExecutor。

此外,ScheduledThreadPoolExecutor的非凡之处还在于他所执行的工作必须是ScheduledFutureTask,ScheduledFutureTask是“将来要执行的工作”,“将来”由delay指定。即便是通过ScheduledThreadPoolExecutor提交“立刻”而不是“将来”要执行的工作,也要通过指定delay时长为0的ScheduledFutureTask来提交。ScheduledFutureTask工作提交之后退出阻塞队列DelayedWorkQueue期待调度。

ScheduledThreadPoolExecutor的创立

提供了四个构造方法,都是通过调用父类ThreadPoolExecutor的构造方法实现ScheduledThreadPoolExecutor对象的创立:

public ScheduledThreadPoolExecutor(int corePoolSize) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue());    }public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory);    }public ScheduledThreadPoolExecutor(int corePoolSize,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), handler);    }public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              new DelayedWorkQueue(), threadFactory, handler);    }

corePoolSize通过构造方法的参数指定,maximumPoolSize在构造方法中都固定设置为Integer.MAX_VALUE,也就是不受限制。

keepAliveTime设置为0,前面咱们会晓得其实ScheduledThreadPoolExecutor的线程数不会超过corePoolSize,而且如果allowCoreThreadTimeOut放弃默认的话(false),那其实这个keepAliveTime是没有意义的。

四个构造方法均设置阻塞队列为new DelayedWorkQueue(),即仅反对DelayedWorkQueue。

ScheduledThreadPoolExecutor的线程池治理

ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩大,线程池治理局部没有做扩大,保留了ThreadPoolExecutor的原有性能。

每次工作退出队列后会调用ensurePrestart(ThreadPoolExecutor实现)办法创立并启动一个线程,ThreadPoolExecutor的ensurePrestart办法:

 void ensurePrestart() {        int wc = workerCountOf(ctl.get());        if (wc < corePoolSize)            addWorker(null, true);        else if (wc == 0)            addWorker(null, false);    }

如果线程数小于corePoolSize则调用addWorker(null, true)创立外围线程线程,否则如果线程数为0(设置corePoolSize=0则可能走到这个分支)创立非核心线程。

corePoolSize大于0的状况下,ScheduledThreadPoolExecutor启动的线程数不会大于外围线程数。而且每一个线程创立的时候都不会有firstTask,线程总是从阻塞队列里获取工作执行。

提交工作

ScheduledThreadPoolExecutor提供execute(Executor接口的工作提交办法)、submit、schedule、scheduleAtFixedRate、scheduleWithFixedDelay等办法提交工作。

尽管说ScheduledThreadPoolExecutor只承受ScheduledFutureTask,但这并不是说应用层只能提交给他ScheduledFutureTask的工作,利用通过以上各办法提交工作的时候的Task是非常灵活的:能够是Callable,也能够是Runnable,ScheduledThreadPoolExecutor外部再把它们包装为ScheduledFutureTask --- 对应用层来说是通明的。

提供了这么多提交工作的办法,无非是为了反对应用层以更加灵便的形式提交工作,其实底层执行逻辑大同小异。

咱们就以scheduleAtFixedRate为例来剖析工作提交过程:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                  long initialDelay,                                                  long period,                                                  TimeUnit unit) {        if (command == null || unit == null)            throw new NullPointerException();        if (period <= 0)            throw new IllegalArgumentException();        ScheduledFutureTask<Void> sft =            new ScheduledFutureTask<Void>(command,                                          null,                                          triggerTime(initialDelay, unit),                                          unit.toNanos(period));        RunnableScheduledFuture<Void> t = decorateTask(command, sft);        sft.outerTask = t;        delayedExecute(t);        return t;    }

scheduleAtFixedRate办法的目标是提交一个在提交后延时(参数initialDelay指定)启动、以固定工夫周期(参数period指定)反复执行的工作。

次要执行了以下动作:

  1. 首先创立了ScheduledFutureTask工作
  2. 之后将它包装成RunnableScheduledFuture,其实decorateTask办法间接返回了创立好的ScheduledFutureTask,没什么好剖析的
  3. 而后调用delayedExecute启动线程执行工作

ScheduledFutureTask工作

这个也是ScheduledThreadPoolExecutor的重头戏!

ScheduledFutureTask继承自FutureTask,并实现了RunnableScheduledFuture接口,所以他是一个复合体:

  1. 能够异步执行带有返回的工作(Future接口)
  2. 能够执行周期性工作(RunnableScheduledFuture接口)

先意识几个重要属性:
time: 工作应该被执行的工夫(纳秒)
period: 周期执行工作的间隔时间(纳秒),负数示意fix-rate,正数表述fix-delay(fixed-rate和fixed-delay后面jdk timer的文章讲过,含意一样)
outerTask:指向本人的RunnableScheduledFuture对象

compareTo办法:进入DelayedWorkQueue队列时须要调用CompareTo办法比拟大小,以便把最近执行的工作放在堆头。compareTo办法最终比拟的其实就是time属性。

run办法:

public void run() {            boolean periodic = isPeriodic();            if (!canRunInCurrentRunState(periodic))                cancel(false);            else if (!periodic)                ScheduledFutureTask.super.run();            else if (ScheduledFutureTask.super.runAndReset()) {                setNextRunTime();                reExecutePeriodic(outerTask);            }        }

如果是一次性工作则间接调用FutureTask的run办法执行工作,否则,如果是周期性工作,首先调用FutureTask的runAndReset办法,调用胜利的话,设置下次执行工夫,而后通过调用reExecutePeriodic将当前任务再次退出队列。

runAndReset办法首先执行当前任务,执行实现后从新设置当前任务状态为NEW,筹备下次执行。

通过剖析runAndReset办法能够晓得,周期性工作执行后不再可能获取到返回(回顾一下FutureTask的代码逻辑,状态设置为NEW之后,就不再可能获取到返回了)。

delayedExecute启动线程

办法代码很简略:

private void delayedExecute(RunnableScheduledFuture<?> task) {        if (isShutdown())            reject(task);        else {            super.getQueue().add(task);            if (isShutdown() &&                !canRunInCurrentRunState(task.isPeriodic()) &&                remove(task))                task.cancel(false);            else                ensurePrestart();        }    }

工作退出队列,之后调用ensurePrestart办法启动线程!

工作间接进入队列,之后启动线程,把工作的执行齐全交给ThreadPoolExecutor的工作执行线程Worker去调度了:Worker线程调用getTask办法从队列获取并执行工作!

咱们当初能够大胆猜想一下了:周期性工作是有严格的执行工夫要求的,没到执行工夫的工作是不能执行的,因为ThreadPoolExecutor的工作执行线程的逻辑中并没有执行工夫的判断,那么,这个逻辑应该是在getTask办法向队列获取工作的时候、由队列的出队办法实现的。

当初轮到阻塞队列DelayedWorkQueue出场了,咱们带着这个疑难来钻研一下DelayedWorkQueue队列。

阻塞队列DelayedWorkQueue

DelayedWorkQueue底层是以数组实现的堆构造。堆构造是一个齐全二叉树,能够确保每一个节点都比他的叶子节点大(或者小),这样的话堆头节点(也就是数组的第一个元素)就肯定是最大(或最小的)。

DelayedWorkQueue是存储ScheduledFutureTask的队列,最近执行的工作须要寄存在堆头,每一个节点都应该比他的叶子节点小。

每次工作退出队列、节点出队、删除节点等操作都须要依照工作执行工夫time从新调整队列。

初始化容量为16,新节点退出队列时如果队列容量不够则扩容原来容量的50%(新容量 = 1.5 * 旧容量)。

新节点入队的时候调用siftUp办法从新调整队列,以便新节点退出到堆的适合地位。

private void siftUp(int k, RunnableScheduledFuture<?> key) {            while (k > 0) {                int parent = (k - 1) >>> 1;                RunnableScheduledFuture<?> e = queue[parent];                if (key.compareTo(e) >= 0)                    break;                queue[k] = e;                setIndex(e, k);                k = parent;            }            queue[k] = key;            setIndex(key, k);        }

siftUp办法基于齐全二叉树的一个个性:齐全二叉树的第k个节点的父节点在数组中的地位为:(k-1)/2取整。

节点退出队列的时候默认退出到尾部k(以后数组的size),获取到k的父节点、比拟以后节点和父节点,如果以后节点大于父节点(调用了ScheduledFutureTask的compareTo办法,比拟的是time),阐明以后节点找到了正确的地位,否则以后节点与父节点替换地位,持续寻找父节点比拟、直到找到根节点。

这样,新退出的节点通过siftUp操作之后会依据工作触发工夫time进入到队列的适合地位。

节点须要从队列remove的时候也须要执行相似的操作(调用siftUp或siftDown办法)确保堆的正确程序。

这样一来,DelayedWorkQueue队列能够始终保障堆头(也就是数组的第一个元素)就是最近须要执行的工作。工作执行线程在获取最近须要被执行工作的时候,不须要遍历整个队列、只须要获取堆头第一个节点执行即可。

堆构造非常适合周期性工作或定时工作这一利用场景:节点退出工作时的时效性要求不高(因为是须要延时执行的工作嘛,时效性要求必定就不高了),获取数据的时效性要求高(到了工作的执行工夫了,最好当然是能即时获取到、立刻执行),可能十分无效的进步工作执行效率。

理解了堆构造的个性,晓得了堆构造入队出队的排序逻辑,接下来还须要去验证咱们的猜想:出队逻辑会判断以后是否曾经到了节点的执行工夫。

因为ThreadPoolExecutor的getTask()办法调用队列的take办法获取工作,所以,间接看DelayedWorkQueue的take()办法就能够了,

public RunnableScheduledFuture<?> take() throws InterruptedException {            final ReentrantLock lock = this.lock;            lock.lockInterruptibly();            try {                for (;;) {                    RunnableScheduledFuture<?> first = queue[0];                    if (first == null)                        available.await();                    else {                        long delay = first.getDelay(NANOSECONDS);                        if (delay <= 0)                            return finishPoll(first);                        first = null; // don't retain ref while waiting                        if (leader != null)                            available.await();                        else {                            Thread thisThread = Thread.currentThread();                            leader = thisThread;                            try {                                available.awaitNanos(delay);                            } finally {                                if (leader == thisThread)                                    leader = null;                            }                        }                    }                }            } finally {                if (leader == null && queue[0] != null)                    available.signal();                lock.unlock();            }        }

逻辑很清晰:

  1. 队列上锁
  2. 获取堆头的工作,如果为空(空队列,没有工作),期待(留神期待是会开释锁资源的、期待被唤醒之后从新获取锁资源,无关ReentrantLock咱们前面会专门做详细分析)
  3. 否则,判断堆头工作曾经到执行工夫了,堆头工作出队列并返回堆头工作。
  4. 否则,堆头工作执行工夫未到,采纳Leader-Follower模式期待

下面第3点验证了咱们的猜想!

Leader-Follower模式是指线程池中多个线程在期待执行工作的时候,线程会竞争Leader,只有一个线程会在竞争中获胜成为Leader,其余线程就都是Follower。Leader获权仅期待指定工夫(以后距下次工作执行的时间差)、Follower线程则须要无限期期待(被勾销或者被其余线程唤醒)。期待过程中如果有新的节点退出队列并成为堆头的话(新退出的工作变成了最近要被执行的工作),此时须要设置leader为空并唤醒期待线程从新竞争leader。Leader-Follower模式能够无效防止所有期待线程都进入无限期、被动期待其余线程唤醒的期待模式、在期待时长达到后被动唤醒执行工作。

小结

周期性工作线程池ScheduledThreadPoolExecutor扒完了,简略总结下:

  1. 周期性线程池能够解决立刻执行的工作、提早执行的一次性工作、提早执行的周期性工作(FixedRate和FixedDelay两种模式)
  2. 创立的时候指定外围线程数量,线程池最终启动的线程数量不会超过外围线程数量,每提交一个工作的同时启动一个线程、直到线程池数量达到外围线程数
  3. 不论是立刻执行的工作、还是提早执行的工作,工作提交后间接退出阻塞队列,期待线程从队列中获取并执行
  4. 阻塞队列采纳DelayedWorkQueue,堆构造,最近执行的工作始终放在堆头
  5. 线程池中的线程采纳Leader-Follower模式竞争工作,能够认为竞争成为Leader的线程取得了堆头工作的优先执行权

Thanks a lot!

上一篇 线程池 - ThreadPoolExecutor源码剖析