Java版本:8u261。
1 简介
ScheduledThreadPoolExecutor即定时线程池,是用来执行提早工作或周期性工作的。相比于Timer的单线程,定时线程池在遇到工作抛出异样的时候不会敞开整个线程池,更加强壮(须要提一下的是:ScheduledThreadPoolExecutor和ThreadPoolExecutor一样,如果执行工作的过程中抛异样的话,这个工作是会被抛弃的。所以在工作的执行过程中须要对异样做捕捉解决,有必要的话须要做弥补措施)。
传进来的工作会被包装为ScheduledFutureTask,其继承于FutureTask,提供异步执行的能力,并且能够返回执行后果。同时实现了Delayed接口,能够通过getDelay办法来获取延迟时间。
相比于ThreadPoolExecutor,ScheduledThreadPoolExecutor中应用的队列是DelayedWorkQueue,是一个无界的队列。所以在定时线程池中,最大线程数是没有意义的(最大线程数会固定为int的最大值,且不会作为定时线程池的参数)。在ThreadPoolExecutor中,如果以后线程数小于外围线程数就间接创立外围线程来执行工作,大于等于外围线程数的话才往阻塞队列中放入工作;而在ScheduledThreadPoolExecutor中却不是这种逻辑。ScheduledThreadPoolExecutor中上来就会把工作放进提早队列中,而后再去期待执行。
1.1 小顶堆
DelayedWorkQueue的实现有些非凡,是基于小顶堆构建的(与DelayQueue和PriorityQueue相似)。因为要保障每次从提早队列中拿取到的工作是距当初最近的一个,所以应用小顶堆构造来构建是再适宜不过了(堆构造也经常用来解决前N小和前N大的问题)。小顶堆保障每个节点的值不小于其父节点的值,而不大于其孩子节点的值,而对于同级节点来说则没有什么限度。这样在小顶堆中值最小的点永远保障是在根节点处。如果用数组来构建小顶堆的话,值最小的点就在数组中的第一个地位处。
图中红色的数字代表节点在数组中的索引地位,由此能够看出堆的另一条性质是:假如以后节点的索引是k,那么其父节点的索引是:(k-1)/2;左孩子节点的索引是:k2+1;而右孩子节点的索引是k2+2。
构建堆的两个外围办法是siftUp和siftDown,siftUp办法用于增加节点时的上溯过程;而siftDown办法用于删除节点时的下溯过程。具体的实现源码会在上面进行剖析,这里就画图来了解一下(上面只会剖析经典的小顶堆增加和删除节点的实现,而在源码中的实现略有不同,但外围都是一样的):
1.1.1 增加节点
如果在下面的siftUp过程中,发现某一次以后节点的值就曾经大于了父节点的值,siftUp过程也就会提前终止了。同时能够看出:在下面的siftUp以及上面将要讲的siftDown操作过程中,每次都只会比拟并替换以后节点和其父子节点的值,而不是整个堆都产生变动,升高了工夫复杂度。
1.1.2 删除节点
删除节点分为三种状况,首先来看一下删除根节点的状况:
而后是删除最初一个节点的状况。删除最初一个节点是最简略的,只须要进行删除就行了,因为这并不影响小顶堆的构造,不须要进行调整。这里就不再展现了(留神:删除除了最初一个节点的其余叶子节点并不属于以后这种状况,而是属于上面第三种状况。也就是说删除这些叶子节点并不能简略地删除它们就完了的,因为堆构造首先得保障是一颗齐全二叉树)。
最初是删除既不是根节点又不是最初一个节点的状况:
在删除既不是根节点又不是最初一个节点的时候,能够看到执行了一次siftDown并随同了一次siftUp的过程。然而这个siftUp过程并不是会肯定触发的,只有满足最初一个节点的值比要删除节点的父节点的值还要小的时候才会触发siftUp操作(这个很好推理:在小顶堆中如果最初一个节点值比要删除节点的父节点值要小的话,那么要删除节点的左右孩子节点值也必然是都大于最初一个节点值的(不思考值相等的状况),那么此时就不会产生siftDown操作;而如果产生了siftDown操作,就阐明最初一个节点值至多要比要删除节点的左右孩子节点中的一个要大(如果有左右孩子节点的话)。而孙子节点值是必定要大于爷爷节点值的(不思考值相等的状况),所以也就是说产生了siftDown操作的时候,最初一个节点值是比要删除节点的父节点值大的。这个时候孙子节点和最初一个节点siftDown替换后,仍然是满足小顶堆性质的,所以就不须要附加的siftUp操作;还有一种状况是最初一个节点值是介于要删除节点的父节点值和要删除节点的左右孩子节点值中的较小者,那么这个时候既不会产生siftDown,也不会产生siftUp)。
而源码中的实现和下面的经典实现最大的不同就是不会有节点彼此替换的操作。在siftUp和siftDown的经典实现中,如果须要变动节点时,都会来一次父子节点的互相交换操作(包含删除节点时首先做的要删除节点和最初一个节点之间的替换操作也是如此)。如果认真思考的话,就会发现这其实是多余的。在须要替换节点的时候,只须要siftUp操作时的父节点或siftDown时的孩子节点从新移到以后须要比拟的节点地位上,而比拟节点是不须要挪动到它们的地位上的。此时间接进入到下一次的判断中,反复siftUp或siftDown过程,直到最初找到了比拟节点的插入地位后,才会将其插入进去。这样做的益处是能够省去一半的节点赋值的操作,进步了执行的效率。同时这也就意味着,须要将要比拟的节点作为参数保存起来,而源码中也正是这么实现的。
1.2 Leader-Follower模式
ScheduledThreadPoolExecutor中应用了Leader-Follower模式。这是一种设计思维,如果说当初有一堆期待执行的工作(个别是寄存在一个队列中排好序),而所有的工作线程中只会有一个是leader线程,其余的线程都是follower线程。只有leader线程能执行工作,而剩下的follower线程则不会执行工作,它们会处在休眠中的状态。当leader线程拿到工作后执行工作前,本人会变成follower线程,同时会选出一个新的leader线程,而后才去执行工作。如果此时有下一个工作,就是这个新的leader线程来执行了,并以此往返这个过程。当之前那个执行工作的线程执行结束再回来时,会判断如果此时曾经没工作了,又或者有工作然而有其余的线程作为leader线程,那么本人就休眠了;如果此时有工作然而没有leader线程,那么本人就会从新成为leader线程来执行工作。
不像ThreadPoolExecutor是须要立刻执行工作的,ScheduledThreadPoolExecutor中的工作是提早执行的,而拿取工作也是提早拿取的。所以并不需要所有的线程都处于运行状态延时期待获取工作。而如果这么做的话,最初也只会有一个线程能执行当前任务,其余的线程还是会被再次休眠的(这里只是在说单任务多线程的状况,但对于多任务来说也是一样的,总结来说就是Leader-Follower模式只会唤醒真正须要“干事”的线程)。这是很没有必要的,而且浪费资源。所以应用Leader-Follower模式的益处是:防止没必要的唤醒和阻塞的操作,这样会更加无效,且节俭资源。
2 结构器
1 /**
2 * ScheduledThreadPoolExecutor:
3 */
4 public ScheduledThreadPoolExecutor(int corePoolSize) {
5 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
6 new DelayedWorkQueue());
7 }
8
9 /**
10 * ThreadPoolExecutor:
11 */
12 public ThreadPoolExecutor(int corePoolSize,
13 int maximumPoolSize,
14 long keepAliveTime,
15 TimeUnit unit,
16 BlockingQueue<Runnable> workQueue) {
17 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
18 Executors.defaultThreadFactory(), defaultHandler);
19 }
能够看到:ScheduledThreadPoolExecutor的结构器是调用了父类ThreadPoolExecutor的结构器来实现的,而父类的结构器以及之中的所有参数我在之前剖析ThreadPoolExecutor的源码文章中讲过,这里就不再赘述了。
3 schedule办法
execute办法和submit办法外部都是调用的schedule办法,所以来看一下其实现:
1 /**
2 * ScheduledThreadPoolExecutor:
3 */
4 public ScheduledFuture<?> schedule(Runnable command,
5 long delay,
6 TimeUnit unit) {
7 //非空校验
8 if (command == null || unit == null)
9 throw new NullPointerException();
10 //包装工作
11 RunnableScheduledFuture<?> t = decorateTask(command,
12 new ScheduledFutureTask<Void>(command, null,
13 triggerTime(delay, unit)));
14 //提早执行
15 delayedExecute(t);
16 return t;
17 }
18
19 /**
20 * 第13行代码处:
21 * 提早操作的触发工夫
22 */
23 private long triggerTime(long delay, TimeUnit unit) {
24 //delay非负解决
25 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
26 }
27
28 long triggerTime(long delay) {
29 /*
30 now办法外部就一句话:“System.nanoTime();”,也就是获取以后工夫。这里也就是获取
31 以后工夫加上延迟时间后的后果。如果延迟时间超过了下限,会在overflowFree办法中解决
32 */
33 return now() +
34 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
35 }
36
37 private long overflowFree(long delay) {
38 //获取队头节点(不移除)
39 Delayed head = (Delayed) super.getQueue().peek();
40 if (head != null) {
41 //获取队头的残余延迟时间
42 long headDelay = head.getDelay(NANOSECONDS);
43 /*
44 能走进本办法中,就阐明delay是一个靠近long最大值的数。此时判断如果headDelay小于0
45 就阐明延迟时间曾经到了或过期了然而还没有执行,并且delay和headDelay的差值小于0,阐明headDelay
46 和delay的差值曾经超过了long的范畴
47 */
48 if (headDelay < 0 && (delay - headDelay < 0))
49 //此时更新一下delay的值,确保其和headDelay的差值在long的范畴内,同时delay也会从新变成一个负数
50 delay = Long.MAX_VALUE + headDelay;
51 }
52 return delay;
53 }
54
55 /**
56 * 第39行代码处:
57 * 调用DelayedWorkQueue中覆写的peek办法来获取队头节点
58 */
59 public RunnableScheduledFuture<?> peek() {
60 final ReentrantLock lock = this.lock;
61 lock.lock();
62 try {
63 return queue[0];
64 } finally {
65 lock.unlock();
66 }
67 }
68
69 /**
70 * 第42行代码处:
71 * 能够看到本办法就是获取延迟时间和以后工夫的差值
72 */
73 public long getDelay(TimeUnit unit) {
74 return unit.convert(time - now(), NANOSECONDS);
75 }
4 包装工作
下面第11行和第12行代码处会进行工作的包装:
1 /**
2 * ScheduledThreadPoolExecutor:
3 */
4 ScheduledFutureTask(Runnable r, V result, long ns) {
5 //调用父类FutureTask的结构器
6 super(r, result);
7 //这里会将延迟时间赋值给this.time
8 this.time = ns;
9 //period用来示意工作的类型,为0示意提早工作,否则示意周期性工作
10 this.period = 0;
11 //这里会给每一个工作赋值一个惟一的序列号。当延迟时间雷同时,会以该序列号来进行判断。序列号小的会出队
12 this.sequenceNumber = sequencer.getAndIncrement();
13 }
14
15 /**
16 * schedule办法第11行代码处:
17 * 包装工作,这里只是返回task而已,子类能够覆写本办法中的逻辑
18 */
19 protected <V> RunnableScheduledFuture<V> decorateTask(
20 Runnable runnable, RunnableScheduledFuture<V> task) {
21 return task;
22 }
5 delayedExecute办法
在schedule办法的第15行代码处会执行提早工作,增加工作和补充工作线程:
1 /**
2 * ScheduledThreadPoolExecutor:
3 */
4 private void delayedExecute(RunnableScheduledFuture<?> task) {
5 if (isShutdown())
6 /*
7 这里会调用父类ThreadPoolExecutor的isShutdown办法来判断以后线程池是否处于敞开或正在敞开的状态,
8 如果是的话就执行具体的回绝策略
9 */
10 reject(task);
11 else {
12 //否则就往提早队列中增加当前任务
13 super.getQueue().add(task);
14 /*
15 增加后持续判断以后线程池是否处于敞开或正在敞开的状态,如果是的话就判断此时是否还能继续执行工作,
16 如果不能的话就删除下面增加的工作
17 */
18 if (isShutdown() &&
19 !canRunInCurrentRunState(task.isPeriodic()) &&
20 remove(task))
21 //同时会勾销此工作的执行
22 task.cancel(false);
23 else
24 //否则,阐明线程池是能够继续执行工作的,就去判断此时是否须要补充工作线程
25 ensurePrestart();
26 }
27 }
28
29 /**
30 * 第19行代码处:
31 * 传进来的periodic示意工作是否是周期性工作,如果是的话就是true(通过“period != 0”进行判断)
32 */
33 boolean canRunInCurrentRunState(boolean periodic) {
34 return isRunningOrShutdown(periodic ?
35 //敞开线程池时判断是否须要继续执行周期性工作
36 continueExistingPeriodicTasksAfterShutdown :
37 //敞开线程池时判断是否须要继续执行提早工作
38 executeExistingDelayedTasksAfterShutdown);
39 }
40
41 /**
42 * ThreadPoolExecutor:
43 */
44 final boolean isRunningOrShutdown(boolean shutdownOK) {
45 //获取以后线程池的运行状态
46 int rs = runStateOf(ctl.get());
47 //如果是RUNNING状态的,或者是SHUTDOWN状态并且是能继续执行工作的,就返回true
48 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
49 }
50
51 /**
52 * ScheduledThreadPoolExecutor:
53 * 下面第20行代码处的remove办法会调用ThreadPoolExecutor的remove办法,而该办法我在之前的
54 * ThreadPoolExecutor的源码剖析文章中曾经剖析过了。然而其中会调用提早队列覆写的remove逻辑,
55 * 也就是本办法(同时第130行代码处也会调用到这里)
56 */
57 public boolean remove(Object x) {
58 final ReentrantLock lock = this.lock;
59 //加锁
60 lock.lock();
61 try {
62 //获取以后节点的堆索引位
63 int i = indexOf(x);
64 if (i < 0)
65 //如果找不到的话,就间接返回false
66 return false;
67
68 //将以后节点的索引位设置为-1,因为上面要进行删除了
69 setIndex(queue[i], -1);
70 //size-1
71 int s = --size;
72 //获取小顶堆的最初一个节点,用于替换
73 RunnableScheduledFuture<?> replacement = queue[s];
74 //将最初一个节点置为null
75 queue[s] = null;
76 //如果要删除的节点自身就是最初一个节点的话,就能够间接返回true了,因为不影响小顶堆的构造
77 if (s != i) {
78 /*
79 否则执行一次siftDown下溯过程,将最初一个节点的值从新插入到小顶堆中
80 这其中会删除i地位处的节点(siftDown办法前面会再次调用,到时候再来详细分析该办法的实现)
81 */
82 siftDown(i, replacement);
83 /*
84 通过下面的siftDown的操作后,如果最初一个节点的延迟时间自身就比要删除的节点的小的话,
85 那么就会间接将最初一个节点放在要删除节点的地位上。此时从删除节点到其上面的节点都是满足
86 小顶堆构造的,然而不能保障replacement也就是以后删除后的替换节点和其父节点之间满足小顶堆
87 构造,也就是说可能呈现replacement节点的延迟时间比其父节点的还小的状况
88 */
89 if (queue[i] == replacement)
90 //那么此时就调用一次siftUp上溯操作,再次调整replacement节点其上的小顶堆的构造即可
91 siftUp(i, replacement);
92 }
93 return true;
94 } finally {
95 //开释锁
96 lock.unlock();
97 }
98 }
99
100 /**
101 * 第63行代码处:
102 */
103 private int indexOf(Object x) {
104 if (x != null) {
105 if (x instanceof ScheduledFutureTask) {
106 //如果以后节点是ScheduledFutureTask类型的,就获取它的堆索引位
107 int i = ((ScheduledFutureTask) x).heapIndex;
108 //大于等于0和小于size阐明以后节点还在小顶堆中,并且以后节点还在提早队列中的话,就间接返回该索引位
109 if (i >= 0 && i < size && queue[i] == x)
110 return i;
111 } else {
112 //否则就依照一般遍历的形式查找是否有相等的节点,如果有的话就返回索引位
113 for (int i = 0; i < size; i++)
114 if (x.equals(queue[i]))
115 return i;
116 }
117 }
118 //找不到的话就返回-1
119 return -1;
120 }
121
122 /**
123 * 第22行代码处:
124 */
125 public boolean cancel(boolean mayInterruptIfRunning) {
126 //调用FutureTask的cancel办法来尝试勾销此工作的执行
127 boolean cancelled = super.cancel(mayInterruptIfRunning);
128 //如果勾销胜利了,并且容许删除节点,并且以后节点存在于小顶堆中的话,就删除它
129 if (cancelled && removeOnCancel && heapIndex >= 0)
130 remove(this);
131 return cancelled;
132 }
133
134 /**
135 * ThreadPoolExecutor:
136 * 第25行代码处:
137 */
138 void ensurePrestart() {
139 //获取以后线程池的工作线程数
140 int wc = workerCountOf(ctl.get());
141 if (wc < corePoolSize)
142 /*
143 如果小于外围线程数,就增加一个外围线程,之前我在剖析ThreadPoolExecutor的源码文章中讲过,
144 addWorker办法的执行中会同时启动运行线程。这里传入的firstTask参数为null,因为不须要立刻执行工作,
145 而是从提早队列中拿取工作
146 */
147 addWorker(null, true);
148 else if (wc == 0)
149 //如果以后没有工作线程,就去增加一个非核心线程,而后运行它。保障至多要有一个线程
150 addWorker(null, false);
151 /*
152 从这里能够看出,如果以后的工作线程数曾经达到了外围线程数后,就不会再创立工作线程了
153 定时线程池最多只有“外围线程数”个线程,也就是通过结构器传进来的参数大小
154 */
155 }
6 增加工作
因为提早队列是用小顶堆构建的,所以增加的时候会波及到小顶堆的调整:
1 /**
2 * ScheduledThreadPoolExecutor:
3 * 这里会调用DelayedWorkQueue的add办法
4 */
5 public boolean add(Runnable e) {
6 return offer(e);
7 }
8
9 public boolean offer(Runnable x) {
10 //非空校验
11 if (x == null)
12 throw new NullPointerException();
13 //强转类型
14 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
15 final ReentrantLock lock = this.lock;
16 //加锁
17 lock.lock();
18 try {
19 //获取以后的工作数量
20 int i = size;
21 //判断是否须要扩容(初始容量为16)
22 if (i >= queue.length)
23 grow();
24 //size+1
25 size = i + 1;
26 if (i == 0) {
27 //如果以后是第一个工作的话,就间接放在小顶堆的根节点地位处就行了(队列第一个地位)
28 queue[0] = e;
29 //同时设置一下以后节点的堆索引位为0
30 setIndex(e, 0);
31 } else {
32 //否则就用siftUp的形式来插入到应该插入的地位
33 siftUp(i, e);
34 }
35 //通过下面的插入过程之后,如果小顶堆的根节点还是以后新增加节点的话,阐明新增加节点的延迟时间是最短的
36 if (queue[0] == e) {
37 //那么此时不论有没有leader线程,都得将其置为null
38 leader = null;
39 /*
40 并且从新将条件队列上的一个节点转移到CLH队列中(如果以后只有一个节点的时候也会进入到signal办法中
41 但不妨,因为此时条件队列中还没有节点,所以并不会做什么)须要提一点的是:如果真的看过signal办法外部实现
42 的话就会晓得,signal办法在惯例状况下并不是在做唤醒线程的工作,唤醒是在上面的unlock办法中实现的
43 */
44 available.signal();
45 }
46 } finally {
47 /*
48 开释锁(留神,这里只会唤醒CLH队列中的head节点的下一个节点,可能是下面被锁住的增加工作的其余线程、
49 也可能是上次执行完工作后筹备再次拿取工作的线程,还有可能是期待被唤醒的follower线程,又或者有其余的
50 状况。但不论是哪个,只有能保障唤醒动作是始终能被流传上来的就行。ReentrantLock和阻塞队列的执行细节
51 详见我之前对AQS源码进行剖析的文章)
52 */
53 lock.unlock();
54 }
55 return true;
56 }
57
58 /**
59 * 第23行代码处:
60 */
61 private void grow() {
62 int oldCapacity = queue.length;
63 //能够看到这里的扩容策略是*1.5的形式
64 int newCapacity = oldCapacity + (oldCapacity >> 1);
65 //如果扩容后的新容量溢出了,就将其复原为int的最大值
66 if (newCapacity < 0)
67 newCapacity = Integer.MAX_VALUE;
68 //应用Arrays.copyOf(System.arraycopy)的形式来进行数组的拷贝
69 queue = Arrays.copyOf(queue, newCapacity);
70 }
71
72 /**
73 * 第30行、第99行和第109行代码处:
74 * 设置f节点在小顶堆中的索引位为idx,这样在最初的删除节点时能够通过index是否大于0来判断以后节点是否仍在小顶堆中
75 */
76 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
77 if (f instanceof ScheduledFutureTask)
78 ((ScheduledFutureTask) f).heapIndex = idx;
79 }
80
81 /**
82 * 第33行代码处:
83 * 堆排序的精华就在于siftUp和siftDown办法,但本实现与惯例的实现略有不同,多了一个入参key
84 * key代表以后要插入节点中的工作
85 */
86 private void siftUp(int k, RunnableScheduledFuture<?> key) {
87 //当k<=0的时候阐明曾经上溯到根节点了
88 while (k > 0) {
89 //获取父节点的索引((以后节点索引位-1)/2的形式)
90 int parent = (k - 1) >>> 1;
91 //获取父节点的工作
92 RunnableScheduledFuture<?> e = queue[parent];
93 //如果以后要插入节点中的工作延迟时间大于父节点的延迟时间的话,就进行上溯过程,阐明找到了插入的地位
94 if (key.compareTo(e) >= 0)
95 break;
96 //否则就须要将父节点的内容赋值给以后节点
97 queue[k] = e;
98 //同时设置一下父节点的堆索引位为以后节点处
99 setIndex(e, k);
100 //而后将父节点赋值给以后节点,持续下一次的上溯过程
101 k = parent;
102 }
103 /*
104 走到这里阐明有两种状况:<1>曾经完结了上溯的过程,但最初一次的父节点还没有赋值,这里就是进行赋值的操作;
105 <2>如果本办法进来的时候要增加的最初一个节点自身就满足小顶堆条件的话,那么该处就是在给最初一个节点进行赋值
106 */
107 queue[k] = key;
108 //同时设置一下要插入节点的堆索引位
109 setIndex(key, k);
110 }
111
112 /**
113 * 第94行代码处:
114 */
115 public int compareTo(Delayed other) {
116 //如果比拟的就是以后对象,就间接返回0相等
117 if (other == this)
118 return 0;
119 if (other instanceof ScheduledFutureTask) {
120 //如果须要比拟的工作也是ScheduledFutureTask类型的话,就首先强转一下类型
121 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
122 //计算当前任务和须要比拟的工作之间的延迟时间差
123 long diff = time - x.time;
124 if (diff < 0)
125 //小于0阐明当前任务的延迟时间更短,就返回-1
126 return -1;
127 else if (diff > 0)
128 //大于0阐明须要比拟的工作的延迟时间更短,就返回1
129 return 1;
130 //如果两者相等的话,就比拟序列号,谁的序列号更小(序列号是惟一的),就应该先被执行
131 else if (sequenceNumber < x.sequenceNumber)
132 return -1;
133 else
134 return 1;
135 }
136 //如果须要比拟的工作不是ScheduledFutureTask类型的话,就通过getDelay的形式来进行比拟
137 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
138 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
139 }
7 拿取工作
在下面的ensurePrestart办法中会调用到addWorker办法,以此来补充工作线程。之前我对ThreadPoolExecutor源码进行剖析的文章中说到过,addWorker办法会调用到getTask办法来从队列中拿取工作:
1 /**
2 * ThreadPoolExecutor:
3 */
4 private Runnable getTask() {
5 //...
6 /*
7 这里的allowCoreThreadTimeOut默认为false(为true示意闲暇的外围线程也是要超时销毁的),
8 而下面说过定时线程池最多只有“外围线程数”个线程,所以timed为false
9 */
10 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
11 //...
12 //因为timed为false,所以这里会走take办法中的逻辑
13 Runnable r = timed ?
14 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
15 workQueue.take();
16 //...
17 }
18
19 /**
20 * ScheduledThreadPoolExecutor:
21 * 第15行代码处:
22 * 下面的take办法会调用到DelayedWorkQueue的take办法,而该办法也就是用来实现提早拿取工作的
23 */
24 public RunnableScheduledFuture<?> take() throws InterruptedException {
25 final ReentrantLock lock = this.lock;
26 //加锁(响应中断模式)
27 lock.lockInterruptibly();
28 try {
29 for (; ; ) {
30 //获取队头节点
31 RunnableScheduledFuture<?> first = queue[0];
32 if (first == null)
33 /*
34 如果以后提早队列中没有提早工作,就在这里阻塞以后线程(通过AQS中条件队列的形式),期待有工作时被唤醒
35 另外,当线程执行完工作后也会再次走到getTask办法中的本办法中。如果此时没工作了,就会在此被阻塞休眠住
36 (我在之前AQS源码剖析的文章中说过:await办法中会开释掉所有的ReentrantLock锁资源,而后才会被阻塞住)
37 */
38 available.await();
39 else {
40 //否则就获取队头的残余延迟时间
41 long delay = first.getDelay(NANOSECONDS);
42 //如果延迟时间曾经到了的话,就删除并返回队头,示意拿取到了工作
43 if (delay <= 0)
44 return finishPoll(first);
45 /*
46 这里将队头节点的援用置为null,如果不置为null的话,可能有多个期待着的线程同时持有着队头节点的
47 first援用,这样如果要删除队头节点的话,因为其还有其余线程的援用,所以不能被及时回收,造成内存透露
48 */
49 first = null;
50 /*
51 如果leader不为null,阐明有其余的线程曾经成为了leader线程,正在提早期待着
52 同时此时没有新的延迟时间最短的节点进入到提早队列中
53 */
54 if (leader != null)
55 /*
56 那么以后线程就变成了follower线程,须要被阻塞住,期待被唤醒(同上,其中会开释掉所有的锁资源)
57 线程执行完工作后也会再次走到本办法中拿取工作,如果走到这里发现曾经有别的leader线程了,
58 那么以后线程也会被阻塞休眠住;否则就会在上面的else分支中再次成为leader线程
59 */
60 available.await();
61 else {
62 /*
63 leader为null,可能是上一个leader线程拿取到工作后唤醒的下一个线程,也有可能
64 是一个新的延迟时间最短的节点进入到提早队列中,从而将leader置为null
65
66 此时获取以后线程
67 */
68 Thread thisThread = Thread.currentThread();
69 //并将leader置为以后线程,也就是以后线程成为了leader线程
70 leader = thisThread;
71 try {
72 /*
73 这里也就是在做具体的延时期待delay纳秒的操作了,具体波及到AQS中条件队列的相干操作
74 如果被唤醒的话可能是因为达到了延迟时间从而醒来;也有可能是被别的线程signal唤醒了;
75 还有可能是中断被唤醒。失常状况下是等达到了延迟时间后,这里会醒来并进入到下一次循环中的
76 finishPoll办法中,剔除队头节点并最终返回(awaitNanos办法和await办法相似,其中会开释掉
77 所有的锁资源;不一样的是在被唤醒时会把以后节点从条件队列中“转移”到CLH队列中。这里能够认为
78 是转移,因为在条件队列中的该节点状态曾经改为了0,相当于是个垃圾节点,后续会进行删除)
79 */
80 available.awaitNanos(delay);
81 } finally {
82 /*
83 不论awaitNanos是如何被唤醒的,此时会判断以后的leader线程是否还是以后线程
84 如果是的话就将leader置为null,也就是以后线程不再是leader线程了
85 */
86 if (leader == thisThread)
87 leader = null;
88 }
89 }
90 }
91 }
92 } finally {
93 //在退出本办法之前,判断如果leader线程为null并且删除队头后的提早队列依然不为空的话(阐明此时有其余的提早工作)
94 if (leader == null && queue[0] != null)
95 //就将条件队列上的一个节点转移到CLH队列中(同时会剔除下面的垃圾条件节点)
96 available.signal();
97 /*
98 开释锁(同offer办法中的逻辑,这里只会唤醒CLH队列中的head节点的下一个节点。这里就体现了
99 Leader-Follower模式:当leader线程拿取到工作后筹备要执行时,会首先唤醒剩下线程中的一个,
100 它将会成为新的leader线程,并以此往返。保障在任何工夫都只有一个leader线程,防止不必要的唤醒与睡眠)
101 */
102 lock.unlock();
103 }
104 }
105
106 /**
107 * 第44行代码处:
108 */
109 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
110 //size-1
111 int s = --size;
112 //获取队列中的最初一个节点
113 RunnableScheduledFuture<?> x = queue[s];
114 //并置空它,便于GC,这里也就是在删除最初一个节点
115 queue[s] = null;
116 //如果删除前提早队列中有不止一个节点的话,就进入到siftDown办法中,将小顶堆中的根节点删除,并且从新保护小顶堆
117 if (s != 0)
118 siftDown(0, x);
119 //同时设置一下删除前的根节点的堆索引位为-1,示意其不存在于小顶堆中了
120 setIndex(f, -1);
121 //最初将其返回进来
122 return f;
123 }
124
125 /**
126 * 第118行代码处:
127 * 办法参数中的key代表删除的最初一个节点中的工作
128 */
129 private void siftDown(int k, RunnableScheduledFuture<?> key) {
130 /*
131 这里会取数组长度的一半half(留神这里的size是曾经删除最初一个节点后的size),
132 而half也就是在指向最初一个非叶子节点的下一个节点
133 */
134 int half = size >>> 1;
135 //从这里能够看出下溯的终止条件是k大于等于half,也就是此时遍历到曾经没有了非叶子节点,天然不须要进行调整
136 while (k < half) {
137 //获取左孩子节点的索引位
138 int child = (k << 1) + 1;
139 //获取左孩子节点的工作
140 RunnableScheduledFuture<?> c = queue[child];
141 //获取右孩子节点的索引位
142 int right = child + 1;
143 //如果右孩子节点的索引位小于size,也就是在说以后节点含有右子树。并且左孩子节点的工作延迟时间大于右孩子节点的话
144 if (right < size && c.compareTo(queue[right]) > 0)
145 //就将c从新指向为右孩子节点
146 c = queue[child = right];
147 /*
148 走到这里阐明c指向的是左右子节点中、工作延迟时间较小的那个节点。此时判断如果最初一个节点的
149 工作延迟时间小于等于这个较小节点的话,就能够进行下溯了,阐明找到了插入的地位
150 */
151 if (key.compareTo(c) <= 0)
152 break;
153 //否则就把较小的那个节点赋值给以后节点处
154 queue[k] = c;
155 //同时设置一下延迟时间较小的那个节点的堆索引位为以后节点处
156 setIndex(c, k);
157 //而后将以后节点指向那个较小的节点,持续下一次循环
158 k = child;
159 }
160 /*
161 同siftUp办法一样,走到这里阐明有两种状况:<1>曾经完结了下溯的过程,但最初一次的子节点还没有赋值,
162 这里会把其赋值为之前删除的最初一个节点;
163 <2>如果根节点的左右子节点中、工作延迟时间较小的那个节点自身的延迟时间就比之前删除节点大的话,
164 就会把根节点替换为之前删除的最初一个节点
165 所以本办法加上finishPoll办法,实际上并没有将最初一个节点删除,最初一个节点中的工作始终都是保留着的
166 (也就是key),而是变相地将堆的根节点删除了(在第一种状况中根节点在第一次赋值为左右子节点中、
167 工作延迟时间较小的那个节点时,就曾经被笼罩了)
168 */
169 queue[k] = key;
170 //同时设置一下最初一个节点当初新的堆索引位
171 setIndex(key, k);
172 }
8 执行提早工作
拿取到工作之后,就是具体的执行工作了。addWorker办法具体的执行逻辑我在之前ThreadPoolExecutor的源码剖析文章中曾经讲过了,其中执行工作的时候会调用task的run办法,也就是这里包装为ScheduledFutureTask的run办法:
1 /**
2 * ScheduledThreadPoolExecutor:
3 */
4 public void run() {
5 //判断是否是周期性工作
6 boolean periodic = isPeriodic();
7 if (!canRunInCurrentRunState(periodic)) {
8 //如果此时不能继续执行工作的话,就尝试勾销此工作的执行
9 cancel(false);
10 } else if (!periodic)
11 /*
12 如果是提早工作,就调用ScheduledFutureTask父类FutureTask的run办法,
13 其中会通过call办法来最终调用到使用者具体写的工作
14 */
15 ScheduledFutureTask.super.run();
16 else if (ScheduledFutureTask.super.runAndReset()) {
17 //周期性工作的执行放在下一节中进行剖析
18 setNextRunTime();
19 reExecutePeriodic(outerTask);
20 }
21 }
9 scheduleAtFixedRate & scheduleWithFixedDelay办法
scheduleAtFixedRate办法是以上次的延迟时间点开始,提早指定工夫后再次执行当前任务;而scheduleWithFixedDelay办法是以上个周期工作执行结束后的工夫点开始,提早指定工夫后再次执行当前任务。因为这两个办法的实现绝大部分都是一样的,所以合在一起来进行剖析:
1 /**
2 * ScheduledThreadPoolExecutor:
3 * scheduleAtFixedRate办法
4 */
5 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
6 long initialDelay,
7 long period,
8 TimeUnit unit) {
9 //非空校验
10 if (command == null || unit == null)
11 throw new NullPointerException();
12 //非负校验
13 if (period <= 0)
14 throw new IllegalArgumentException();
15 //包装工作
16 ScheduledFutureTask<Void> sft =
17 new ScheduledFutureTask<Void>(command,
18 null,
19 triggerTime(initialDelay, unit),
20 unit.toNanos(period));
21 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
22 //把工作赋值给ScheduledFutureTask的outerTask属性
23 sft.outerTask = t;
24 //提早执行
25 delayedExecute(t);
26 return t;
27 }
28
29 /**
30 * scheduleWithFixedDelay办法
31 */
32 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
33 long initialDelay,
34 long delay,
35 TimeUnit unit) {
36 //非空校验
37 if (command == null || unit == null)
38 throw new NullPointerException();
39 //非负校验
40 if (delay <= 0)
41 throw new IllegalArgumentException();
42 //包装工作
43 ScheduledFutureTask<Void> sft =
44 new ScheduledFutureTask<Void>(command,
45 null,
46 triggerTime(initialDelay, unit),
47 unit.toNanos(-delay));
48 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
49 //把工作赋值给ScheduledFutureTask的outerTask属性
50 sft.outerTask = t;
51 //提早执行
52 delayedExecute(t);
53 return t;
54 }
55
56 /**
57 * 第17行和第44行代码处:
58 */
59 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
60 super(r, result);
61 this.time = ns;
62 /*
63 能够看到这里与schedule办法中调用ScheduledFutureTask结构器的区别是多了一个period入参
64 在schedule办法中this.period赋值为0,而这里会赋值为周期时间。其余的代码都是一样的
65 如果仔细的话能够看出:在下面scheduleAtFixedRate办法传入的period是一个大于0的数,而
66 scheduleWithFixedDelay办法传入的period是一个小于0的数,以此来进行辨别
67 */
68 this.period = period;
69 this.sequenceNumber = sequencer.getAndIncrement();
70 }
10 执行周期性工作
周期性工作和提早工作的拿取工作逻辑都是一样的,而在上面具体运行工作时有所不同,上面就来看一下其实现的差别:
1 /**
2 * ScheduledThreadPoolExecutor:
3 */
4 public void run() {
5 boolean periodic = isPeriodic();
6 if (!canRunInCurrentRunState(periodic))
7 cancel(false);
8 else if (!periodic)
9 ScheduledFutureTask.super.run();
10 /*
11 后面都是之前剖析过的,而周期性工作会走上面的分支中
12
13 FutureTask的runAndReset办法相比于run办法来说,区别在于能够反复计算(run办法不能复用)
14 因为runAndReset办法在计算实现后不会批改状态,状态始终都是NEW
15 */
16 else if (ScheduledFutureTask.super.runAndReset()) {
17 //设置下次的运行工夫点
18 setNextRunTime();
19 //从新增加工作
20 reExecutePeriodic(outerTask);
21 }
22 }
23
24 /**
25 * 第18行代码处:
26 */
27 private void setNextRunTime() {
28 /*
29 这里会获取period,也就是之前设置的周期时间。下面说过,通过period的正负就能够辨别出到底调用的是
30 scheduleAtFixedRate办法还是scheduleWithFixedDelay办法
31 */
32 long p = period;
33 if (p > 0)
34 /*
35 如果调用的是scheduleAtFixedRate办法,下一次的周期工作工夫点就是起始的延迟时间加上周期时间,须要留神的是:
36 如果工作执行的工夫大于周期时间period的话,那么定时线程池就不会依照原先设计的延迟时间进行执行,而是会依照近似于
37 工作执行的工夫来作为提早的距离(不论外围线程有多少个都是如此,因为工作是放在提早队列中的、是线性执行的)
38 */
39 time += p;
40 else
41 /*
42 triggerTime办法之前剖析过是获取以后工夫+延迟时间后的后果,而此时是在执行完工作后,也就是说:
43 如果调用的是scheduleWithFixedDelay办法,下一次的周期工作工夫点就是执行完上次工作后的工夫点加上周期时间
44 由此能够看出,scheduleAtFixedRate办法和scheduleWithFixedDelay办法的区别就在于下一次time设置的不同而已
45 */
46 time = triggerTime(-p);
47 //time属性会记录到节点中,在小顶堆中通过compareTo办法来进行排序
48 }
49
50 /**
51 * 第20行代码处:
52 */
53 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
54 //判断此时是否还能继续执行工作
55 if (canRunInCurrentRunState(true)) {
56 /*
57 这里也就是从新往提早队列中增加工作,以此达到周期执行的成果。增加之后在getTask办法中的take办法中
58 就又能够拿到这个工作。设置下次的执行工夫,而后再增加工作...周而复始
59 */
60 super.getQueue().add(task);
61 //增加后持续判断此时是否还能继续执行工作,如果不能的话就删除下面增加的工作
62 if (!canRunInCurrentRunState(true) && remove(task))
63 //同时会勾销此工作的执行
64 task.cancel(false);
65 else
66 //否则,阐明线程池是能够继续执行工作的,就去判断此时是否须要补充工作线程
67 ensurePrestart();
68 }
69 }
留神:网上的一种说法是:scheduleAtFixedRate办法是以上一个工作开始的工夫计时,period工夫过来后,检测上一个工作是否执行结束。如果上一个工作执行结束,则当前任务立刻执行;如果上一个工作没有执行结束,则须要等上一个工作执行结束后立刻执行。实际上这种说法是谬误的,只管它的表象是对的。正确的说法是:如果工作的执行工夫小于周期时间的话,则会以上次工作执行开始工夫加上周期时间后,再去执行下一次工作;而如果工作的执行工夫大于周期时间的话,则会等到上次工作执行结束后立刻(近似于)执行下次工作。这两种说法的区别就在于工作的执行工夫大于周期时间的时候,检测上一个工作是否结束的机会不同。实际上在period工夫过来后,基本不会有任何的检测机制。因为只有等上次工作执行结束后才会往提早队列中增加下一次工作,从而触发各种后续的动作。所以在period工夫点时,以后线程还在执行工作中,而其余的线程因为提早队列中为空会处于休眠的状态(如果就只有一个周期工作的话)。所以基本不会有所谓的“检测”的说法,这种说法也只能说是想当然了。还是那句话:“Talk is cheap. Show me the code.”
既然都说到这里了,那么当初就想来尝试剖析一下如果工作的执行工夫大于周期时间的话,具体是怎么的一个执行流程?
为了便于剖析,假如当初是只有一个周期工作的场景,那么提早队列中的工作数量最多就只会有1个:拿取到工作,提早队列中就变为空。执行完工作的时候,就又会往队列中放一个工作。这样其余抢不到工作的线程就会被休眠住。而增加工作的时候因为每次从新增加的工作都是小顶堆的根节点(从无到有),即增加的这个工作就是此时延迟时间最短的工作,所以同时会触发尝试唤醒线程的动作。
同时在增加下一个工作前会批改下一次的工夫点。在setNextRunTime办法中,scheduleAtFixedRate办法是以上一次的延迟时间点加上周期时间来作为下一次的延迟时间点的,并不是scheduleWithFixedDelay办法获取以后工夫加上周期时间的形式。在以后这种状况下周期时间是要小于工作的执行工夫的,也就是说会造成下一次的延迟时间点会赋值为一个曾经过期的工夫。且随着周期的减少,下一次的延迟时间点会离以后工夫点越来越远。既然下一次的延迟时间点曾经过期了,那么就会去立马执行工作。
所以总结一下:须要被唤醒的线程和上次执行完工作的线程就会去争抢锁资源(唤醒线程会把以后节点放进CLH队列中,上次执行完工作的线程也会再次走到lockInterruptibly办法中(在它从新放工作的时候也会经验一次lock),同时因为是ReentrantLock非偏心锁,这样在调用unlock解锁时就会呈现在CLH队列上的抢资源景象了),抢到的就会立马去执行下一次的周期工作,而不会有任何的延时,造成的表象就是会以一个近似于工作执行工夫为距离的周期来执行工作。
11 shutdown办法
1 /**
2 * ScheduledThreadPoolExecutor:
3 * 能够看到,定时线程池的shutdown办法是应用的父类ThreadPoolExecutor的shutdown办法,
4 * 而该办法我在之前的ThreadPoolExecutor的源码剖析文章中曾经剖析过了。然而其中会调用
5 * onShutdown的钩子办法,也就是在ScheduledThreadPoolExecutor中的实现
6 */
7 public void shutdown() {
8 super.shutdown();
9 }
10
11 @Override
12 void onShutdown() {
13 //获取提早队列
14 BlockingQueue<Runnable> q = super.getQueue();
15 //敞开线程池时判断是否须要继续执行提早工作
16 boolean keepDelayed =
17 getExecuteExistingDelayedTasksAfterShutdownPolicy();
18 //敞开线程池时判断是否须要继续执行周期性工作
19 boolean keepPeriodic =
20 getContinueExistingPeriodicTasksAfterShutdownPolicy();
21 if (!keepDelayed && !keepPeriodic) {
22 //如果都不需要的话,就将提早队列中的工作一一勾销(并删除)
23 for (Object e : q.toArray())
24 if (e instanceof RunnableScheduledFuture<?>)
25 ((RunnableScheduledFuture<?>) e).cancel(false);
26 //最初做清理工作
27 q.clear();
28 } else {
29 for (Object e : q.toArray()) {
30 if (e instanceof RunnableScheduledFuture) {
31 //否则就判断如果工作是RunnableScheduledFuture类型的,就强转一下类型
32 RunnableScheduledFuture<?> t =
33 (RunnableScheduledFuture<?>) e;
34 //如果敞开线程池时不须要继续执行工作,又或者须要继续执行然而工作曾经勾销了
35 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
36 t.isCancelled()) {
37 //就删除以后节点
38 if (q.remove(t))
39 //同时勾销工作
40 t.cancel(false);
41 }
42 }
43 }
44 }
45 //依据线程池状态来判断是否应该完结线程池
46 tryTerminate();
47 }
48
49 /**
50 * 第27行代码处:
51 */
52 public void clear() {
53 final ReentrantLock lock = this.lock;
54 //加锁
55 lock.lock();
56 try {
57 for (int i = 0; i < size; i++) {
58 //遍历取得提早队列中的每一个节点
59 RunnableScheduledFuture<?> t = queue[i];
60 if (t != null) {
61 //将节点置为null
62 queue[i] = null;
63 //同时将索引地位为-1(recheck)
64 setIndex(t, -1);
65 }
66 }
67 //size赋为初始值0
68 size = 0;
69 } finally {
70 //开释锁
71 lock.unlock();
72 }
73 }
更多内容请关注微信公众号:奇客工夫
发表回复