乐趣区

关于java:ScheduledThreadPoolExecutor源码分析你知道定时线程池是如何实现延迟执行和周期执行的吗

Java 版本:8u261。

1 简介

ScheduledThreadPoolExecutor 即定时线程池,是用来执行提早工作或周期性工作的。相比于 Timer 的单线程,定时线程池在遇到工作抛出异样的时候不会敞开整个线程池,更加强壮(须要提一下的是:ScheduledThreadPoolExecutor 和 ThreadPoolExecutor 一样,如果执行工作的过程中抛异样的话,这个工作是会被抛弃的。所以在工作的执行过程中须要对异样做捕捉解决,有必要的话须要做弥补措施)。

传进来的工作会被包装为 ScheduledFutureTask,其继承于 FutureTask,提供异步执行的能力,并且能够返回执行后果。同时实现了 Delayed 接口,能够通过 getDelay 办法来获取延迟时间。

相比于 ThreadPoolExecutor,ScheduledThreadPoolExecutor 中应用的队列是 DelayedWorkQueue,是一个无界的队列。所以在定时线程池中,最大线程数是没有意义的(最大线程数会固定为 int 的最大值,且不会作为定时线程池的参数)。在 ThreadPoolExecutor 中,如果以后线程数小于外围线程数就间接创立外围线程来执行工作,大于等于外围线程数的话才往阻塞队列中放入工作;而在 ScheduledThreadPoolExecutor 中却不是这种逻辑。ScheduledThreadPoolExecutor 中上来就会把工作放进提早队列中,而后再去期待执行。

1.1 小顶堆

DelayedWorkQueue 的实现有些非凡,是基于小顶堆构建的(与 DelayQueue 和 PriorityQueue 相似)。因为要保障每次从提早队列中拿取到的工作是距当初最近的一个,所以应用小顶堆构造来构建是再适宜不过了(堆构造也经常用来解决前 N 小和前 N 大的问题)。小顶堆保障每个节点的值不小于其父节点的值,而不大于其孩子节点的值,而对于同级节点来说则没有什么限度。这样在小顶堆中值最小的点永远保障是在根节点处。如果用数组来构建小顶堆的话,值最小的点就在数组中的第一个地位处。

图中红色的数字代表节点在数组中的索引地位,由此能够看出堆的另一条性质是:假如以后节点的索引是 k,那么其父节点的索引是:(k-1)/2;左孩子节点的索引是:k2+1;而右孩子节点的索引是 k 2+2。

构建堆的两个外围办法是 siftUpsiftDown,siftUp 办法用于增加节点时的上溯过程;而 siftDown 办法用于删除节点时的下溯过程。具体的实现源码会在上面进行剖析,这里就画图来了解一下(上面只会剖析经典的小顶堆增加和删除节点的实现,而在源码中的实现略有不同,但外围都是一样的):

1.1.1 增加节点

如果在下面的 siftUp 过程中,发现某一次以后节点的值就曾经大于了父节点的值,siftUp 过程也就会提前终止了。同时能够看出:在下面的 siftUp 以及上面将要讲的 siftDown 操作过程中,每次都只会比拟并替换以后节点和其父子节点的值,而不是整个堆都产生变动,升高了工夫复杂度。

1.1.2 删除节点

删除节点分为三种状况,首先来看一下 删除根节点的状况

而后是 删除最初一个节点的状况。删除最初一个节点是最简略的,只须要进行删除就行了,因为这并不影响小顶堆的构造,不须要进行调整。这里就不再展现了(留神:删除除了最初一个节点的其余叶子节点并不属于以后这种状况,而是属于上面第三种状况。也就是说删除这些叶子节点并不能简略地删除它们就完了的,因为堆构造首先得保障是一颗齐全二叉树)。

最初是 删除既不是根节点又不是最初一个节点的状况

在删除既不是根节点又不是最初一个节点的时候,能够看到执行了一次 siftDown 并随同了一次 siftUp 的过程。然而这个 siftUp 过程并不是会肯定触发的,只有满足最初一个节点的值比要删除节点的父节点的值还要小的时候才会触发 siftUp 操作(这个很好推理:在小顶堆中如果最初一个节点值比要删除节点的父节点值要小的话,那么要删除节点的左右孩子节点值也必然是都大于最初一个节点值的(不思考值相等的状况),那么此时就不会产生 siftDown 操作;而如果产生了 siftDown 操作,就阐明最初一个节点值至多要比要删除节点的左右孩子节点中的一个要大(如果有左右孩子节点的话)。而孙子节点值是必定要大于爷爷节点值的(不思考值相等的状况),所以也就是说产生了 siftDown 操作的时候,最初一个节点值是比要删除节点的父节点值大的。这个时候孙子节点和最初一个节点 siftDown 替换后,仍然是满足小顶堆性质的,所以就不须要附加的 siftUp 操作;还有一种状况是最初一个节点值是介于要删除节点的父节点值和要删除节点的左右孩子节点值中的较小者,那么这个时候既不会产生 siftDown,也不会产生 siftUp)。

而源码中的实现和下面的经典实现最大的不同就是不会有节点彼此替换的操作。在 siftUp 和 siftDown 的经典实现中,如果须要变动节点时,都会来一次父子节点的互相交换操作(包含删除节点时首先做的要删除节点和最初一个节点之间的替换操作也是如此)。如果认真思考的话,就会发现这其实是多余的。在须要替换节点的时候,只须要 siftUp 操作时的父节点或 siftDown 时的孩子节点从新移到以后须要比拟的节点地位上,而比拟节点是不须要挪动到它们的地位上的。此时间接进入到下一次的判断中,反复 siftUp 或 siftDown 过程,直到最初找到了比拟节点的插入地位后,才会将其插入进去。这样做的益处是能够省去一半的节点赋值的操作,进步了执行的效率。同时这也就意味着,须要将要比拟的节点作为参数保存起来,而源码中也正是这么实现的。

1.2 Leader-Follower 模式

ScheduledThreadPoolExecutor 中应用了 Leader-Follower 模式。这是一种设计思维,如果说当初有一堆期待执行的工作(个别是寄存在一个队列中排好序),而所有的工作线程中只会有一个是 leader 线程,其余的线程都是 follower 线程。只有 leader 线程能执行工作,而剩下的 follower 线程则不会执行工作,它们会处在休眠中的状态。当 leader 线程拿到工作后执行工作前,本人会变成 follower 线程,同时会选出一个新的 leader 线程,而后才去执行工作。如果此时有下一个工作,就是这个新的 leader 线程来执行了,并以此往返这个过程。当之前那个执行工作的线程执行结束再回来时,会判断如果此时曾经没工作了,又或者有工作然而有其余的线程作为 leader 线程,那么本人就休眠了;如果此时有工作然而没有 leader 线程,那么本人就会从新成为 leader 线程来执行工作。

不像 ThreadPoolExecutor 是须要立刻执行工作的,ScheduledThreadPoolExecutor 中的工作是提早执行的,而拿取工作也是提早拿取的。所以并不需要所有的线程都处于运行状态延时期待获取工作。而如果这么做的话,最初也只会有一个线程能执行当前任务,其余的线程还是会被再次休眠的(这里只是在说单任务多线程的状况,但对于多任务来说也是一样的,总结来说就是 Leader-Follower 模式只会唤醒真正须要“干事”的线程)。这是很没有必要的,而且浪费资源。所以应用 Leader-Follower 模式的益处是:防止没必要的唤醒和阻塞的操作,这样会更加无效,且节俭资源。

2 结构器

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledThreadPoolExecutor(int corePoolSize) {
 5     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 6             new DelayedWorkQueue());
 7 }
 8
 9 /**
10  * ThreadPoolExecutor:
11  */
12 public ThreadPoolExecutor(int corePoolSize,
13                           int maximumPoolSize,
14                           long keepAliveTime,
15                           TimeUnit unit,
16                           BlockingQueue<Runnable> workQueue) {
17     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
18             Executors.defaultThreadFactory(), defaultHandler);
19 }

能够看到:ScheduledThreadPoolExecutor 的结构器是调用了父类 ThreadPoolExecutor 的结构器来实现的,而父类的结构器以及之中的所有参数我在之前剖析 ThreadPoolExecutor 的源码文章中讲过,这里就不再赘述了。

3 schedule 办法

execute 办法和 submit 办法外部都是调用的 schedule 办法,所以来看一下其实现:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledFuture<?> schedule(Runnable command,
 5                                    long delay,
 6                                    TimeUnit unit) {
 7     // 非空校验
 8     if (command == null || unit == null)
 9         throw new NullPointerException();
10     // 包装工作
11     RunnableScheduledFuture<?> t = decorateTask(command,
12             new ScheduledFutureTask<Void>(command, null,
13                     triggerTime(delay, unit)));
14     // 提早执行
15     delayedExecute(t);
16     return t;
17 }
18
19 /**
20  * 第 13 行代码处:21  * 提早操作的触发工夫
22  */
23 private long triggerTime(long delay, TimeUnit unit) {
24     //delay 非负解决
25     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
26 }
27
28 long triggerTime(long delay) {
29     /*
30     now 办法外部就一句话:“System.nanoTime();”,也就是获取以后工夫。这里也就是获取
31     以后工夫加上延迟时间后的后果。如果延迟时间超过了下限,会在 overflowFree 办法中解决
32      */
33     return now() +
34             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
35 }
36
37 private long overflowFree(long delay) {
38     // 获取队头节点(不移除)39     Delayed head = (Delayed) super.getQueue().peek();
40     if (head != null) {
41         // 获取队头的残余延迟时间
42         long headDelay = head.getDelay(NANOSECONDS);
43         /*
44         能走进本办法中,就阐明 delay 是一个靠近 long 最大值的数。此时判断如果 headDelay 小于 0
45         就阐明延迟时间曾经到了或过期了然而还没有执行,并且 delay 和 headDelay 的差值小于 0,阐明 headDelay
46         和 delay 的差值曾经超过了 long 的范畴
47          */
48         if (headDelay < 0 && (delay - headDelay < 0))
49             // 此时更新一下 delay 的值,确保其和 headDelay 的差值在 long 的范畴内,同时 delay 也会从新变成一个负数
50             delay = Long.MAX_VALUE + headDelay;
51     }
52     return delay;
53 }
54
55 /**
56  * 第 39 行代码处:57  * 调用 DelayedWorkQueue 中覆写的 peek 办法来获取队头节点
58  */
59 public RunnableScheduledFuture<?> peek() {
60     final ReentrantLock lock = this.lock;
61     lock.lock();
62     try {63         return queue[0];
64     } finally {65         lock.unlock();
66     }
67 }
68
69 /**
70  * 第 42 行代码处:71  * 能够看到本办法就是获取延迟时间和以后工夫的差值
72  */
73 public long getDelay(TimeUnit unit) {74     return unit.convert(time - now(), NANOSECONDS);
75 }

4 包装工作

下面第 11 行和第 12 行代码处会进行工作的包装:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 ScheduledFutureTask(Runnable r, V result, long ns) {
 5     // 调用父类 FutureTask 的结构器
 6     super(r, result);
 7     // 这里会将延迟时间赋值给 this.time
 8     this.time = ns;
 9     //period 用来示意工作的类型,为 0 示意提早工作,否则示意周期性工作
10     this.period = 0;
11     // 这里会给每一个工作赋值一个惟一的序列号。当延迟时间雷同时,会以该序列号来进行判断。序列号小的会出队
12     this.sequenceNumber = sequencer.getAndIncrement();
13 }
14
15 /**
16  * schedule 办法第 11 行代码处:17  * 包装工作,这里只是返回 task 而已,子类能够覆写本办法中的逻辑
18  */
19 protected <V> RunnableScheduledFuture<V> decorateTask(20         Runnable runnable, RunnableScheduledFuture<V> task) {
21     return task;
22 }

5 delayedExecute 办法

在 schedule 办法的第 15 行代码处会执行提早工作,增加工作和补充工作线程:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  */
  4 private void delayedExecute(RunnableScheduledFuture<?> task) {5     if (isShutdown())
  6         /*
  7         这里会调用父类 ThreadPoolExecutor 的 isShutdown 办法来判断以后线程池是否处于敞开或正在敞开的状态,8         如果是的话就执行具体的回绝策略
  9          */
 10         reject(task);
 11     else {
 12         // 否则就往提早队列中增加当前任务
 13         super.getQueue().add(task);
 14         /*
 15         增加后持续判断以后线程池是否处于敞开或正在敞开的状态,如果是的话就判断此时是否还能继续执行工作,16         如果不能的话就删除下面增加的工作
 17          */
 18         if (isShutdown() &&
 19                 !canRunInCurrentRunState(task.isPeriodic()) &&
 20                 remove(task))
 21             // 同时会勾销此工作的执行
 22             task.cancel(false);
 23         else
 24             // 否则,阐明线程池是能够继续执行工作的,就去判断此时是否须要补充工作线程
 25             ensurePrestart();
 26     }
 27 }
 28
 29 /**
 30  * 第 19 行代码处:31  * 传进来的 periodic 示意工作是否是周期性工作,如果是的话就是 true(通过“period != 0”进行判断)32  */
 33 boolean canRunInCurrentRunState(boolean periodic) {
 34     return isRunningOrShutdown(periodic ?
 35             // 敞开线程池时判断是否须要继续执行周期性工作
 36             continueExistingPeriodicTasksAfterShutdown :
 37             // 敞开线程池时判断是否须要继续执行提早工作
 38             executeExistingDelayedTasksAfterShutdown);
 39 }
 40
 41 /**
 42  * ThreadPoolExecutor:
 43  */
 44 final boolean isRunningOrShutdown(boolean shutdownOK) {
 45     // 获取以后线程池的运行状态
 46     int rs = runStateOf(ctl.get());
 47     // 如果是 RUNNING 状态的,或者是 SHUTDOWN 状态并且是能继续执行工作的,就返回 true
 48     return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
 49 }
 50
 51 /**
 52  * ScheduledThreadPoolExecutor:
 53  * 下面第 20 行代码处的 remove 办法会调用 ThreadPoolExecutor 的 remove 办法,而该办法我在之前的
 54  * ThreadPoolExecutor 的源码剖析文章中曾经剖析过了。然而其中会调用提早队列覆写的 remove 逻辑,55  * 也就是本办法(同时第 130 行代码处也会调用到这里)56  */
 57 public boolean remove(Object x) {
 58     final ReentrantLock lock = this.lock;
 59     // 加锁
 60     lock.lock();
 61     try {
 62         // 获取以后节点的堆索引位
 63         int i = indexOf(x);
 64         if (i < 0)
 65             // 如果找不到的话,就间接返回 false
 66             return false;
 67
 68         // 将以后节点的索引位设置为 -1,因为上面要进行删除了
 69         setIndex(queue[i], -1);
 70         //size-1
 71         int s = --size;
 72         // 获取小顶堆的最初一个节点,用于替换
 73         RunnableScheduledFuture<?> replacement = queue[s];
 74         // 将最初一个节点置为 null
 75         queue[s] = null;
 76         // 如果要删除的节点自身就是最初一个节点的话,就能够间接返回 true 了,因为不影响小顶堆的构造
 77         if (s != i) {
 78             /*
 79             否则执行一次 siftDown 下溯过程,将最初一个节点的值从新插入到小顶堆中
 80             这其中会删除 i 地位处的节点(siftDown 办法前面会再次调用,到时候再来详细分析该办法的实现)81              */
 82             siftDown(i, replacement);
 83             /*
 84             通过下面的 siftDown 的操作后,如果最初一个节点的延迟时间自身就比要删除的节点的小的话,85             那么就会间接将最初一个节点放在要删除节点的地位上。此时从删除节点到其上面的节点都是满足
 86             小顶堆构造的,然而不能保障 replacement 也就是以后删除后的替换节点和其父节点之间满足小顶堆
 87             构造,也就是说可能呈现 replacement 节点的延迟时间比其父节点的还小的状况
 88              */
 89             if (queue[i] == replacement)
 90                 // 那么此时就调用一次 siftUp 上溯操作,再次调整 replacement 节点其上的小顶堆的构造即可
 91                 siftUp(i, replacement);
 92         }
 93         return true;
 94     } finally {
 95         // 开释锁
 96         lock.unlock();
 97     }
 98 }
 99
100 /**
101  * 第 63 行代码处:102  */
103 private int indexOf(Object x) {104     if (x != null) {105         if (x instanceof ScheduledFutureTask) {
106             // 如果以后节点是 ScheduledFutureTask 类型的,就获取它的堆索引位
107             int i = ((ScheduledFutureTask) x).heapIndex;
108             // 大于等于 0 和小于 size 阐明以后节点还在小顶堆中,并且以后节点还在提早队列中的话,就间接返回该索引位
109             if (i >= 0 && i < size && queue[i] == x)
110                 return i;
111         } else {
112             // 否则就依照一般遍历的形式查找是否有相等的节点,如果有的话就返回索引位
113             for (int i = 0; i < size; i++)
114                 if (x.equals(queue[i]))
115                     return i;
116         }
117     }
118     // 找不到的话就返回 -1
119     return -1;
120 }
121
122 /**
123  * 第 22 行代码处:124  */
125 public boolean cancel(boolean mayInterruptIfRunning) {
126     // 调用 FutureTask 的 cancel 办法来尝试勾销此工作的执行
127     boolean cancelled = super.cancel(mayInterruptIfRunning);
128     // 如果勾销胜利了,并且容许删除节点,并且以后节点存在于小顶堆中的话,就删除它
129     if (cancelled && removeOnCancel && heapIndex >= 0)
130         remove(this);
131     return cancelled;
132 }
133
134 /**
135  * ThreadPoolExecutor:
136  * 第 25 行代码处:137  */
138 void ensurePrestart() {
139     // 获取以后线程池的工作线程数
140     int wc = workerCountOf(ctl.get());
141     if (wc < corePoolSize)
142         /*
143         如果小于外围线程数,就增加一个外围线程,之前我在剖析 ThreadPoolExecutor 的源码文章中讲过,144         addWorker 办法的执行中会同时启动运行线程。这里传入的 firstTask 参数为 null,因为不须要立刻执行工作,145         而是从提早队列中拿取工作
146          */
147         addWorker(null, true);
148     else if (wc == 0)
149         // 如果以后没有工作线程,就去增加一个非核心线程,而后运行它。保障至多要有一个线程
150         addWorker(null, false);
151     /*
152     从这里能够看出,如果以后的工作线程数曾经达到了外围线程数后,就不会再创立工作线程了
153     定时线程池最多只有“外围线程数”个线程,也就是通过结构器传进来的参数大小
154      */
155 }

6 增加工作

因为提早队列是用小顶堆构建的,所以增加的时候会波及到小顶堆的调整:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * 这里会调用 DelayedWorkQueue 的 add 办法
  4  */
  5 public boolean add(Runnable e) {6     return offer(e);
  7 }
  8
  9 public boolean offer(Runnable x) {
 10     // 非空校验
 11     if (x == null)
 12         throw new NullPointerException();
 13     // 强转类型
 14     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
 15     final ReentrantLock lock = this.lock;
 16     // 加锁
 17     lock.lock();
 18     try {
 19         // 获取以后的工作数量
 20         int i = size;
 21         // 判断是否须要扩容(初始容量为 16)22         if (i >= queue.length)
 23             grow();
 24         //size+1
 25         size = i + 1;
 26         if (i == 0) {
 27             // 如果以后是第一个工作的话,就间接放在小顶堆的根节点地位处就行了(队列第一个地位)28             queue[0] = e;
 29             // 同时设置一下以后节点的堆索引位为 0
 30             setIndex(e, 0);
 31         } else {
 32             // 否则就用 siftUp 的形式来插入到应该插入的地位
 33             siftUp(i, e);
 34         }
 35         // 通过下面的插入过程之后,如果小顶堆的根节点还是以后新增加节点的话,阐明新增加节点的延迟时间是最短的
 36         if (queue[0] == e) {
 37             // 那么此时不论有没有 leader 线程,都得将其置为 null
 38             leader = null;
 39             /*
 40             并且从新将条件队列上的一个节点转移到 CLH 队列中(如果以后只有一个节点的时候也会进入到 signal 办法中
 41             但不妨,因为此时条件队列中还没有节点,所以并不会做什么)须要提一点的是:如果真的看过 signal 办法外部实现
 42             的话就会晓得,signal 办法在惯例状况下并不是在做唤醒线程的工作,唤醒是在上面的 unlock 办法中实现的
 43              */
 44             available.signal();
 45         }
 46     } finally {
 47         /*
 48         开释锁(留神,这里只会唤醒 CLH 队列中的 head 节点的下一个节点,可能是下面被锁住的增加工作的其余线程、49         也可能是上次执行完工作后筹备再次拿取工作的线程,还有可能是期待被唤醒的 follower 线程,又或者有其余的
 50         状况。但不论是哪个,只有能保障唤醒动作是始终能被流传上来的就行。ReentrantLock 和阻塞队列的执行细节
 51         详见我之前对 AQS 源码进行剖析的文章)52          */
 53         lock.unlock();
 54     }
 55     return true;
 56 }
 57
 58 /**
 59  * 第 23 行代码处:60  */
 61 private void grow() {
 62     int oldCapacity = queue.length;
 63     // 能够看到这里的扩容策略是 *1.5 的形式
 64     int newCapacity = oldCapacity + (oldCapacity >> 1);
 65     // 如果扩容后的新容量溢出了,就将其复原为 int 的最大值
 66     if (newCapacity < 0)
 67         newCapacity = Integer.MAX_VALUE;
 68     // 应用 Arrays.copyOf(System.arraycopy)的形式来进行数组的拷贝
 69     queue = Arrays.copyOf(queue, newCapacity);
 70 }
 71
 72 /**
 73  * 第 30 行、第 99 行和第 109 行代码处:74  * 设置 f 节点在小顶堆中的索引位为 idx,这样在最初的删除节点时能够通过 index 是否大于 0 来判断以后节点是否仍在小顶堆中
 75  */
 76 private void setIndex(RunnableScheduledFuture<?> f, int idx) {77     if (f instanceof ScheduledFutureTask)
 78         ((ScheduledFutureTask) f).heapIndex = idx;
 79 }
 80
 81 /**
 82  * 第 33 行代码处:83  * 堆排序的精华就在于 siftUp 和 siftDown 办法,但本实现与惯例的实现略有不同,多了一个入参 key
 84  * key 代表以后要插入节点中的工作
 85  */
 86 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 87     // 当 k <= 0 的时候阐明曾经上溯到根节点了
 88     while (k > 0) {89         // 获取父节点的索引((以后节点索引位 -1)/ 2 的形式)90         int parent = (k - 1) >>> 1;
 91         // 获取父节点的工作
 92         RunnableScheduledFuture<?> e = queue[parent];
 93         // 如果以后要插入节点中的工作延迟时间大于父节点的延迟时间的话,就进行上溯过程,阐明找到了插入的地位
 94         if (key.compareTo(e) >= 0)
 95             break;
 96         // 否则就须要将父节点的内容赋值给以后节点
 97         queue[k] = e;
 98         // 同时设置一下父节点的堆索引位为以后节点处
 99         setIndex(e, k);
100         // 而后将父节点赋值给以后节点,持续下一次的上溯过程
101         k = parent;
102     }
103     /*
104     走到这里阐明有两种状况:<1> 曾经完结了上溯的过程,但最初一次的父节点还没有赋值,这里就是进行赋值的操作;105     <2> 如果本办法进来的时候要增加的最初一个节点自身就满足小顶堆条件的话,那么该处就是在给最初一个节点进行赋值
106      */
107     queue[k] = key;
108     // 同时设置一下要插入节点的堆索引位
109     setIndex(key, k);
110 }
111
112 /**
113  * 第 94 行代码处:114  */
115 public int compareTo(Delayed other) {
116     // 如果比拟的就是以后对象,就间接返回 0 相等
117     if (other == this)
118         return 0;
119     if (other instanceof ScheduledFutureTask) {
120         // 如果须要比拟的工作也是 ScheduledFutureTask 类型的话,就首先强转一下类型
121         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
122         // 计算当前任务和须要比拟的工作之间的延迟时间差
123         long diff = time - x.time;
124         if (diff < 0)
125             // 小于 0 阐明当前任务的延迟时间更短,就返回 -1
126             return -1;
127         else if (diff > 0)
128             // 大于 0 阐明须要比拟的工作的延迟时间更短,就返回 1
129             return 1;
130         // 如果两者相等的话,就比拟序列号,谁的序列号更小(序列号是惟一的),就应该先被执行
131         else if (sequenceNumber < x.sequenceNumber)
132             return -1;
133         else
134             return 1;
135     }
136     // 如果须要比拟的工作不是 ScheduledFutureTask 类型的话,就通过 getDelay 的形式来进行比拟
137     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
138     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
139 }

7 拿取工作

在下面的 ensurePrestart 办法中会调用到 addWorker 办法,以此来补充工作线程。之前我对 ThreadPoolExecutor 源码进行剖析的文章中说到过,addWorker 办法会调用到 getTask 办法来从队列中拿取工作:

  1 /**
  2  * ThreadPoolExecutor:
  3  */
  4 private Runnable getTask() {
  5     //...
  6     /*
  7     这里的 allowCoreThreadTimeOut 默认为 false(为 true 示意闲暇的外围线程也是要超时销毁的),8     而下面说过定时线程池最多只有“外围线程数”个线程,所以 timed 为 false
  9      */
 10     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 11     //...
 12     // 因为 timed 为 false,所以这里会走 take 办法中的逻辑
 13     Runnable r = timed ?
 14             workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 15             workQueue.take();
 16     //...
 17 }
 18
 19 /**
 20  * ScheduledThreadPoolExecutor:
 21  * 第 15 行代码处:22  * 下面的 take 办法会调用到 DelayedWorkQueue 的 take 办法,而该办法也就是用来实现提早拿取工作的
 23  */
 24 public RunnableScheduledFuture<?> take() throws InterruptedException {
 25     final ReentrantLock lock = this.lock;
 26     // 加锁(响应中断模式)27     lock.lockInterruptibly();
 28     try {29         for (; ;) {
 30             // 获取队头节点
 31             RunnableScheduledFuture<?> first = queue[0];
 32             if (first == null)
 33                 /*
 34                 如果以后提早队列中没有提早工作,就在这里阻塞以后线程(通过 AQS 中条件队列的形式),期待有工作时被唤醒
 35                 另外,当线程执行完工作后也会再次走到 getTask 办法中的本办法中。如果此时没工作了,就会在此被阻塞休眠住
 36(我在之前 AQS 源码剖析的文章中说过:await 办法中会开释掉所有的 ReentrantLock 锁资源,而后才会被阻塞住)37                  */
 38                 available.await();
 39             else {
 40                 // 否则就获取队头的残余延迟时间
 41                 long delay = first.getDelay(NANOSECONDS);
 42                 // 如果延迟时间曾经到了的话,就删除并返回队头,示意拿取到了工作
 43                 if (delay <= 0)
 44                     return finishPoll(first);
 45                 /*
 46                 这里将队头节点的援用置为 null,如果不置为 null 的话,可能有多个期待着的线程同时持有着队头节点的
 47                 first 援用,这样如果要删除队头节点的话,因为其还有其余线程的援用,所以不能被及时回收,造成内存透露
 48                  */
 49                 first = null;
 50                 /*
 51                 如果 leader 不为 null,阐明有其余的线程曾经成为了 leader 线程,正在提早期待着
 52                 同时此时没有新的延迟时间最短的节点进入到提早队列中
 53                  */
 54                 if (leader != null)
 55                     /*
 56                     那么以后线程就变成了 follower 线程,须要被阻塞住,期待被唤醒(同上,其中会开释掉所有的锁资源)57                     线程执行完工作后也会再次走到本办法中拿取工作,如果走到这里发现曾经有别的 leader 线程了,58                     那么以后线程也会被阻塞休眠住;否则就会在上面的 else 分支中再次成为 leader 线程
 59                      */
 60                     available.await();
 61                 else {
 62                     /*
 63                     leader 为 null,可能是上一个 leader 线程拿取到工作后唤醒的下一个线程,也有可能
 64                     是一个新的延迟时间最短的节点进入到提早队列中,从而将 leader 置为 null
 65 
 66                     此时获取以后线程
 67                      */
 68                     Thread thisThread = Thread.currentThread();
 69                     // 并将 leader 置为以后线程,也就是以后线程成为了 leader 线程
 70                     leader = thisThread;
 71                     try {
 72                         /*
 73                         这里也就是在做具体的延时期待 delay 纳秒的操作了,具体波及到 AQS 中条件队列的相干操作
 74                         如果被唤醒的话可能是因为达到了延迟时间从而醒来;也有可能是被别的线程 signal 唤醒了;75                         还有可能是中断被唤醒。失常状况下是等达到了延迟时间后,这里会醒来并进入到下一次循环中的
 76                         finishPoll 办法中,剔除队头节点并最终返回(awaitNanos 办法和 await 办法相似,其中会开释掉
 77                         所有的锁资源;不一样的是在被唤醒时会把以后节点从条件队列中“转移”到 CLH 队列中。这里能够认为
 78                         是转移,因为在条件队列中的该节点状态曾经改为了 0,相当于是个垃圾节点,后续会进行删除)79                          */
 80                         available.awaitNanos(delay);
 81                     } finally {
 82                         /*
 83                         不论 awaitNanos 是如何被唤醒的,此时会判断以后的 leader 线程是否还是以后线程
 84                         如果是的话就将 leader 置为 null,也就是以后线程不再是 leader 线程了
 85                          */
 86                         if (leader == thisThread)
 87                             leader = null;
 88                     }
 89                 }
 90             }
 91         }
 92     } finally {
 93         // 在退出本办法之前,判断如果 leader 线程为 null 并且删除队头后的提早队列依然不为空的话(阐明此时有其余的提早工作)94         if (leader == null && queue[0] != null)
 95             // 就将条件队列上的一个节点转移到 CLH 队列中(同时会剔除下面的垃圾条件节点)96             available.signal();
 97         /*
 98         开释锁(同 offer 办法中的逻辑,这里只会唤醒 CLH 队列中的 head 节点的下一个节点。这里就体现了
 99         Leader-Follower 模式:当 leader 线程拿取到工作后筹备要执行时,会首先唤醒剩下线程中的一个,100         它将会成为新的 leader 线程,并以此往返。保障在任何工夫都只有一个 leader 线程,防止不必要的唤醒与睡眠)101          */
102         lock.unlock();
103     }
104 }
105
106 /**
107  * 第 44 行代码处:108  */
109 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
110     //size-1
111     int s = --size;
112     // 获取队列中的最初一个节点
113     RunnableScheduledFuture<?> x = queue[s];
114     // 并置空它,便于 GC,这里也就是在删除最初一个节点
115     queue[s] = null;
116     // 如果删除前提早队列中有不止一个节点的话,就进入到 siftDown 办法中,将小顶堆中的根节点删除,并且从新保护小顶堆
117     if (s != 0)
118         siftDown(0, x);
119     // 同时设置一下删除前的根节点的堆索引位为 -1,示意其不存在于小顶堆中了
120     setIndex(f, -1);
121     // 最初将其返回进来
122     return f;
123 }
124
125 /**
126  * 第 118 行代码处:127  * 办法参数中的 key 代表删除的最初一个节点中的工作
128  */
129 private void siftDown(int k, RunnableScheduledFuture<?> key) {
130     /*
131     这里会取数组长度的一半 half(留神这里的 size 是曾经删除最初一个节点后的 size),132     而 half 也就是在指向最初一个非叶子节点的下一个节点
133      */
134     int half = size >>> 1;
135     // 从这里能够看出下溯的终止条件是 k 大于等于 half,也就是此时遍历到曾经没有了非叶子节点,天然不须要进行调整
136     while (k < half) {
137         // 获取左孩子节点的索引位
138         int child = (k << 1) + 1;
139         // 获取左孩子节点的工作
140         RunnableScheduledFuture<?> c = queue[child];
141         // 获取右孩子节点的索引位
142         int right = child + 1;
143         // 如果右孩子节点的索引位小于 size,也就是在说以后节点含有右子树。并且左孩子节点的工作延迟时间大于右孩子节点的话
144         if (right < size && c.compareTo(queue[right]) > 0)
145             // 就将 c 从新指向为右孩子节点
146             c = queue[child = right];
147         /*
148         走到这里阐明 c 指向的是左右子节点中、工作延迟时间较小的那个节点。此时判断如果最初一个节点的
149         工作延迟时间小于等于这个较小节点的话,就能够进行下溯了,阐明找到了插入的地位
150          */
151         if (key.compareTo(c) <= 0)
152             break;
153         // 否则就把较小的那个节点赋值给以后节点处
154         queue[k] = c;
155         // 同时设置一下延迟时间较小的那个节点的堆索引位为以后节点处
156         setIndex(c, k);
157         // 而后将以后节点指向那个较小的节点,持续下一次循环
158         k = child;
159     }
160     /*
161     同 siftUp 办法一样,走到这里阐明有两种状况:<1> 曾经完结了下溯的过程,但最初一次的子节点还没有赋值,162     这里会把其赋值为之前删除的最初一个节点;163     <2> 如果根节点的左右子节点中、工作延迟时间较小的那个节点自身的延迟时间就比之前删除节点大的话,164     就会把根节点替换为之前删除的最初一个节点
165     所以本办法加上 finishPoll 办法,实际上并没有将最初一个节点删除,最初一个节点中的工作始终都是保留着的
166(也就是 key),而是变相地将堆的根节点删除了(在第一种状况中根节点在第一次赋值为左右子节点中、167     工作延迟时间较小的那个节点时,就曾经被笼罩了)168      */
169     queue[k] = key;
170     // 同时设置一下最初一个节点当初新的堆索引位
171     setIndex(key, k);
172 }

8 执行提早工作

拿取到工作之后,就是具体的执行工作了。addWorker 办法具体的执行逻辑我在之前 ThreadPoolExecutor 的源码剖析文章中曾经讲过了,其中执行工作的时候会调用 task 的 run 办法,也就是这里包装为 ScheduledFutureTask 的 run 办法:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public void run() {
 5     // 判断是否是周期性工作
 6     boolean periodic = isPeriodic();
 7     if (!canRunInCurrentRunState(periodic)) {
 8         // 如果此时不能继续执行工作的话,就尝试勾销此工作的执行
 9         cancel(false);
10     } else if (!periodic)
11         /*
12         如果是提早工作,就调用 ScheduledFutureTask 父类 FutureTask 的 run 办法,13         其中会通过 call 办法来最终调用到使用者具体写的工作
14          */
15         ScheduledFutureTask.super.run();
16     else if (ScheduledFutureTask.super.runAndReset()) {
17         // 周期性工作的执行放在下一节中进行剖析
18         setNextRunTime();
19         reExecutePeriodic(outerTask);
20     }
21 }

9 scheduleAtFixedRate & scheduleWithFixedDelay 办法

scheduleAtFixedRate 办法是以上次的延迟时间点开始,提早指定工夫后再次执行当前任务;而 scheduleWithFixedDelay 办法是以上个周期工作执行结束后的工夫点开始,提早指定工夫后再次执行当前任务。因为这两个办法的实现绝大部分都是一样的,所以合在一起来进行剖析:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * scheduleAtFixedRate 办法
  4  */
  5 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  6                                               long initialDelay,
  7                                               long period,
  8                                               TimeUnit unit) {
  9     // 非空校验
 10     if (command == null || unit == null)
 11         throw new NullPointerException();
 12     // 非负校验
 13     if (period <= 0)
 14         throw new IllegalArgumentException();
 15     // 包装工作
 16     ScheduledFutureTask<Void> sft =
 17             new ScheduledFutureTask<Void>(command,
 18                     null,
 19                     triggerTime(initialDelay, unit),
 20                     unit.toNanos(period));
 21     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 22     // 把工作赋值给 ScheduledFutureTask 的 outerTask 属性
 23     sft.outerTask = t;
 24     // 提早执行
 25     delayedExecute(t);
 26     return t;
 27 }
 28
 29 /**
 30  * scheduleWithFixedDelay 办法
 31  */
 32 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 33                                                  long initialDelay,
 34                                                  long delay,
 35                                                  TimeUnit unit) {
 36     // 非空校验
 37     if (command == null || unit == null)
 38         throw new NullPointerException();
 39     // 非负校验
 40     if (delay <= 0)
 41         throw new IllegalArgumentException();
 42     // 包装工作
 43     ScheduledFutureTask<Void> sft =
 44             new ScheduledFutureTask<Void>(command,
 45                     null,
 46                     triggerTime(initialDelay, unit),
 47                     unit.toNanos(-delay));
 48     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 49     // 把工作赋值给 ScheduledFutureTask 的 outerTask 属性
 50     sft.outerTask = t;
 51     // 提早执行
 52     delayedExecute(t);
 53     return t;
 54 }
 55
 56 /**
 57  * 第 17 行和第 44 行代码处:58  */
 59 ScheduledFutureTask(Runnable r, V result, long ns, long period) {60     super(r, result);
 61     this.time = ns;
 62     /*
 63     能够看到这里与 schedule 办法中调用 ScheduledFutureTask 结构器的区别是多了一个 period 入参
 64     在 schedule 办法中 this.period 赋值为 0,而这里会赋值为周期时间。其余的代码都是一样的
 65     如果仔细的话能够看出:在下面 scheduleAtFixedRate 办法传入的 period 是一个大于 0 的数,而
 66     scheduleWithFixedDelay 办法传入的 period 是一个小于 0 的数,以此来进行辨别
 67      */
 68     this.period = period;
 69     this.sequenceNumber = sequencer.getAndIncrement();
 70 }

10 执行周期性工作

周期性工作和提早工作的拿取工作逻辑都是一样的,而在上面具体运行工作时有所不同,上面就来看一下其实现的差别:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public void run() {5     boolean periodic = isPeriodic();
 6     if (!canRunInCurrentRunState(periodic))
 7         cancel(false);
 8     else if (!periodic)
 9         ScheduledFutureTask.super.run();
10     /*
11     后面都是之前剖析过的,而周期性工作会走上面的分支中
12
13     FutureTask 的 runAndReset 办法相比于 run 办法来说,区别在于能够反复计算(run 办法不能复用)14     因为 runAndReset 办法在计算实现后不会批改状态,状态始终都是 NEW
15      */
16     else if (ScheduledFutureTask.super.runAndReset()) {
17         // 设置下次的运行工夫点
18         setNextRunTime();
19         // 从新增加工作
20         reExecutePeriodic(outerTask);
21     }
22 }
23
24 /**
25  * 第 18 行代码处:26  */
27 private void setNextRunTime() {
28     /*
29     这里会获取 period,也就是之前设置的周期时间。下面说过,通过 period 的正负就能够辨别出到底调用的是
30     scheduleAtFixedRate 办法还是 scheduleWithFixedDelay 办法
31      */
32     long p = period;
33     if (p > 0)
34         /*
35         如果调用的是 scheduleAtFixedRate 办法,下一次的周期工作工夫点就是起始的延迟时间加上周期时间,须要留神的是:36         如果工作执行的工夫大于周期时间 period 的话,那么定时线程池就不会依照原先设计的延迟时间进行执行,而是会依照近似于
37         工作执行的工夫来作为提早的距离(不论外围线程有多少个都是如此,因为工作是放在提早队列中的、是线性执行的)38          */
39         time += p;
40     else
41         /*
42         triggerTime 办法之前剖析过是获取以后工夫 + 延迟时间后的后果,而此时是在执行完工作后,也就是说:43         如果调用的是 scheduleWithFixedDelay 办法,下一次的周期工作工夫点就是执行完上次工作后的工夫点加上周期时间
44         由此能够看出,scheduleAtFixedRate 办法和 scheduleWithFixedDelay 办法的区别就在于下一次 time 设置的不同而已
45          */
46         time = triggerTime(-p);
47     //time 属性会记录到节点中,在小顶堆中通过 compareTo 办法来进行排序
48 }
49
50 /**
51  * 第 20 行代码处:52  */
53 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
54     // 判断此时是否还能继续执行工作
55     if (canRunInCurrentRunState(true)) {
56         /*
57         这里也就是从新往提早队列中增加工作,以此达到周期执行的成果。增加之后在 getTask 办法中的 take 办法中
58         就又能够拿到这个工作。设置下次的执行工夫,而后再增加工作... 周而复始
59          */
60         super.getQueue().add(task);
61         // 增加后持续判断此时是否还能继续执行工作,如果不能的话就删除下面增加的工作
62         if (!canRunInCurrentRunState(true) && remove(task))
63             // 同时会勾销此工作的执行
64             task.cancel(false);
65         else
66             // 否则,阐明线程池是能够继续执行工作的,就去判断此时是否须要补充工作线程
67             ensurePrestart();
68     }
69 }

留神:网上的一种说法是:scheduleAtFixedRate 办法是以上一个工作开始的工夫计时,period 工夫过来后,检测上一个工作是否执行结束。如果上一个工作执行结束,则当前任务立刻执行;如果上一个工作没有执行结束,则须要等上一个工作执行结束后立刻执行。实际上这种说法是谬误的,只管它的表象是对的。正确的说法是:如果工作的执行工夫小于周期时间的话,则会以上次工作执行开始工夫加上周期时间后,再去执行下一次工作;而如果工作的执行工夫大于周期时间的话,则会等到上次工作执行结束后立刻(近似于)执行下次工作。这两种说法的区别就在于工作的执行工夫大于周期时间的时候,检测上一个工作是否结束的机会不同。实际上在 period 工夫过来后,基本不会有任何的检测机制。因为只有等上次工作执行结束后才会往提早队列中增加下一次工作,从而触发各种后续的动作。所以在 period 工夫点时,以后线程还在执行工作中,而其余的线程因为提早队列中为空会处于休眠的状态(如果就只有一个周期工作的话)。所以基本不会有所谓的“检测”的说法,这种说法也只能说是想当然了。还是那句话:“Talk is cheap. Show me the code.”

既然都说到这里了,那么当初就想来尝试剖析一下如果工作的执行工夫大于周期时间的话,具体是怎么的一个执行流程?

为了便于剖析,假如当初是只有一个周期工作的场景,那么提早队列中的工作数量最多就只会有 1 个:拿取到工作,提早队列中就变为空。执行完工作的时候,就又会往队列中放一个工作。这样其余抢不到工作的线程就会被休眠住。而增加工作的时候因为每次从新增加的工作都是小顶堆的根节点(从无到有),即增加的这个工作就是此时延迟时间最短的工作,所以同时会触发尝试唤醒线程的动作。

同时在增加下一个工作前会批改下一次的工夫点。在 setNextRunTime 办法中,scheduleAtFixedRate 办法是以上一次的延迟时间点加上周期时间来作为下一次的延迟时间点的,并不是 scheduleWithFixedDelay 办法获取以后工夫加上周期时间的形式。在以后这种状况下周期时间是要小于工作的执行工夫的,也就是说会造成下一次的延迟时间点会赋值为一个曾经过期的工夫。且随着周期的减少,下一次的延迟时间点会离以后工夫点越来越远。既然下一次的延迟时间点曾经过期了,那么就会去立马执行工作。

所以总结一下:须要被唤醒的线程和上次执行完工作的线程就会去争抢锁资源(唤醒线程会把以后节点放进 CLH 队列中,上次执行完工作的线程也会再次走到 lockInterruptibly 办法中(在它从新放工作的时候也会经验一次 lock),同时因为是 ReentrantLock 非偏心锁,这样在调用 unlock 解锁时就会呈现在 CLH 队列上的抢资源景象了),抢到的就会立马去执行下一次的周期工作,而不会有任何的延时,造成的表象就是会以一个近似于工作执行工夫为距离的周期来执行工作。

11 shutdown 办法

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  * 能够看到,定时线程池的 shutdown 办法是应用的父类 ThreadPoolExecutor 的 shutdown 办法,4  * 而该办法我在之前的 ThreadPoolExecutor 的源码剖析文章中曾经剖析过了。然而其中会调用
 5  * onShutdown 的钩子办法,也就是在 ScheduledThreadPoolExecutor 中的实现
 6  */
 7 public void shutdown() {8     super.shutdown();
 9 }
10
11 @Override
12 void onShutdown() {
13     // 获取提早队列
14     BlockingQueue<Runnable> q = super.getQueue();
15     // 敞开线程池时判断是否须要继续执行提早工作
16     boolean keepDelayed =
17             getExecuteExistingDelayedTasksAfterShutdownPolicy();
18     // 敞开线程池时判断是否须要继续执行周期性工作
19     boolean keepPeriodic =
20             getContinueExistingPeriodicTasksAfterShutdownPolicy();
21     if (!keepDelayed && !keepPeriodic) {
22         // 如果都不需要的话,就将提早队列中的工作一一勾销(并删除)23         for (Object e : q.toArray())
24             if (e instanceof RunnableScheduledFuture<?>)
25                 ((RunnableScheduledFuture<?>) e).cancel(false);
26         // 最初做清理工作
27         q.clear();
28     } else {29         for (Object e : q.toArray()) {30             if (e instanceof RunnableScheduledFuture) {
31                 // 否则就判断如果工作是 RunnableScheduledFuture 类型的,就强转一下类型
32                 RunnableScheduledFuture<?> t =
33                         (RunnableScheduledFuture<?>) e;
34                 // 如果敞开线程池时不须要继续执行工作,又或者须要继续执行然而工作曾经勾销了
35                 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
36                         t.isCancelled()) {
37                     // 就删除以后节点
38                     if (q.remove(t))
39                         // 同时勾销工作
40                         t.cancel(false);
41                 }
42             }
43         }
44     }
45     // 依据线程池状态来判断是否应该完结线程池
46     tryTerminate();
47 }
48
49 /**
50  * 第 27 行代码处:51  */
52 public void clear() {
53     final ReentrantLock lock = this.lock;
54     // 加锁
55     lock.lock();
56     try {57         for (int i = 0; i < size; i++) {
58             // 遍历取得提早队列中的每一个节点
59             RunnableScheduledFuture<?> t = queue[i];
60             if (t != null) {
61                 // 将节点置为 null
62                 queue[i] = null;
63                 // 同时将索引地位为 -1(recheck)64                 setIndex(t, -1);
65             }
66         }
67         //size 赋为初始值 0
68         size = 0;
69     } finally {
70         // 开释锁
71         lock.unlock();
72     }
73 }

更多内容请关注微信公众号:奇客工夫

退出移动版