Java版本:8u261。
对于Java中的线程池,面试问的最多的就是线程池中各个参数的含意,又或者是线程池执行的流程,彷佛这已成为了固定的模式与套路。然而如果我是面试官,当初我想问一些更粗疏的问题,你还能答得上来吗?比方:
- 线程池是如何实现线程复用的?
- 如果一个线程执行工作的时候抛出异样,那么这个工作是否会被抛弃?
- 以后线程池中有十个线程,其中一个线程正在执行工作,那么剩下的九个线程正在处于一种什么状态呢?
置信如果没有看过线程池的相干源码实现,这些问题是很难答复得完满的。同时这些问题往深了问还会引出Java中阻塞队列以及AQS的实现,你都能接得住吗?
1 简介
因为线程是稀缺资源,如果在高并发的状况下被无限度地创立和销毁,不仅会耗费系统资源,还会升高零碎的稳定性。所以线程池的呈现就是为了解决这些问题的。线程池通过重用曾经存在的线程资源,缩小线程创立和销毁的次数,进步了性能。同时还能够进行对立的调配、调优和监控。
在Java中,能够通过Executors类中的newFixedThreadPool、newCachedThreadPool,newScheduledThreadPool或者其余形式来创立各种线程池,它们都会间接或间接地通过ThreadPoolExecutor来进行构建,通过传入不同的参数来实现不同成果的线程池(newScheduledThreadPool比拟非凡,它重写了局部ThreadPoolExecutor的逻辑,后续我会写一篇对ScheduledThreadPoolExecutor进行源码剖析的文章)。
1.1 线程池参数
在ThreadPoolExecutor中共有七个参数:
- corePoolSize:外围线程数,外围线程会始终存活,即便没有工作须要执行(除非allowCoreThreadTimeOut参数设置为true,这样的话即便是外围线程也会被超时销毁);
- maximumPoolSize:线程池中容许的最大线程数;
- keepAliveTime:保护工作线程所容许的闲暇工夫,如果工作线程期待的工夫超过了keepAliveTime,则会被销毁;
- unit:指定keepAliveTime的单位,如TimeUnit.SECONDS;
- workQueue:用来保留期待被执行工作的阻塞队列。罕用的有:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue等;
- threadFactory:线程工厂,提供创立新线程的性能。默认的实现是Executors.defaultThreadFactory(),即通过new Thread的形式;
- handler:如果以后阻塞队列已满,并且以后的线程数量已超过了最大线程数,则会执行相应的回绝策略。具体有四种(也能够本人实现):
-
- AbortPolicy:默认实现,会间接抛出RejectedExecutionException;
- CallerRunsPolicy:用调用者所在的线程来执行工作;
- DiscardPolicy:间接摈弃,工作不执行;
- DiscardOldestPolicy:抛弃阻塞队列中最靠前的工作,并执行当前任务。
这四种回绝策略的实现很简略,这里就不再过多展现阐明了,读者可自行查看。
1.2 运行过程
ThreadPoolExecutor的大抵运行过程如下:
如果应用的是有界阻塞队列:
有新的工作须要执行,并且以后线程池的线程数小于外围线程数,则创立一个外围线程来执行。如果以后线程数大于外围线程数,则会将除了外围线程解决的工作之外剩下的工作退出到阻塞队列中期待执行。如果队列已满,则在以后线程数不大于最大线程数的前提下,创立新的非核心线程,处理完毕后等达到keepAliveTime闲暇工夫后会被间接销毁(留神,不肯定销毁的就是这些非核心线程,外围线程也可能被销毁,只有减到残余线程数达到外围线程数就行。外围线程和非核心线程的区别仅在于判断是否达到阈值时有区别:外围线程判断的是外围线程数,而非核心线程判断的是最大线程数。仅此一个区别。前面讲源码时会再强调这一点)。如果以后线程数大于最大线程数,则会执行相应的回绝策略。
如果应用的是无界阻塞队列:
与有界阻塞队列相比,除非系统资源耗尽,否则无界的阻塞队列不存在工作入队失败的状况。当有新工作到来,零碎的线程数小于外围线程数时,则创立一个外围线程来执行。当达到外围线程数后,就不会持续减少。若后续仍有新的工作退出,而没有闲暇的线程资源,则工作间接进入阻塞队列中进行期待。如果工作创立和解决工作的速度差别很大,无界阻塞队列会放弃快速增长,直到耗尽零碎内存。
1.3 线程池状态
在ThreadPoolExecutor中存在五种状态:
- RUNNING:初始状态,在此状态下可能接管新工作,以及对曾经增加的工作进行解决;
- SHUTDOWN:通过调用shutdown办法,线程池转成SHUTDOWN状态。此时不再接管新工作,然而能解决曾经增加的工作;
- STOP:通过调用shutdownNow办法,线程池转成STOP状态。此时不再接管新工作,不解决曾经增加的工作,并且会中断正在解决的工作;
- TIDYING:当线程池中所有的工作曾经终止了,工作数量为0并且阻塞队列为空的时候,会进入到TIDYING状态。此时会调用一个钩子办法terminated,它是一个空的实现,能够供调用者覆写;
- TREMINATED:线程池彻底终止的状态。当线程池处于TIDYING状态时,执行完terminated办法后,就会进入到该状态。
在ThreadPoolExecutor中状态是通过ctl属性中的高3位来示意的:
1 //ctl中蕴含两局部信息:高3位示意运行状态,低29位保留工作线程数量,初始状态是RUNNING
2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
3 //29
4 private static final int COUNT_BITS = Integer.SIZE - 3;
5 //1左移29位后-1,也就是29个1。用来示意工作线程数量的最大值
6 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
7
8 //ctl高3位为111(低29位都为0)
9 private static final int RUNNING = -1 << COUNT_BITS;
10 //ctl高3位为000(低29位都为0)
11 private static final int SHUTDOWN = 0 << COUNT_BITS;
12 //ctl高3位为001(低29位都为0)
13 private static final int STOP = 1 << COUNT_BITS;
14 //ctl高3位为010(低29位都为0)
15 private static final int TIDYING = 2 << COUNT_BITS;
16 //ctl高3位为011(低29位都为0)
17 private static final int TERMINATED = 3 << COUNT_BITS;
18
19 //获取ctl的高3位(低29位都为0)也就是获取运行状态
20 private static int runStateOf(int c) {
21 return c & ~CAPACITY;
22 }
23
24 //获取ctl的低29位(高3位都为0)也就是获取工作线程数量
25 private static int workerCountOf(int c) {
26 return c & CAPACITY;
27 }
28
29 //用来获取运行状态和工作线程数量拼接起来的值
30 private static int ctlOf(int rs, int wc) {
31 return rs | wc;
32 }
33
34 //判断ctl是否小于s所代表状态的值
35 private static boolean runStateLessThan(int c, int s) {
36 return c < s;
37 }
38
39 //判断ctl是否大于等于s所代表状态的值
40 private static boolean runStateAtLeast(int c, int s) {
41 return c >= s;
42 }
43
44 //判断ctl此时是否是RUNNING状态
45 private static boolean isRunning(int c) {
46 return c < SHUTDOWN;
47 }
1.4 Worker
Worker是ThreadPoolExecutor中的一个外部类,用来封装工作线程:
1 private final class Worker
2 extends AbstractQueuedSynchronizer
3 implements Runnable {
4 //...
5
6 //正在运行Worker的线程
7 final Thread thread;
8 //传入的工作
9 Runnable firstTask;
10 //本Worker已实现的工作数,用于后续的统计与监控
11 volatile long completedTasks;
12
13 Worker(Runnable firstTask) {
14 /*
15 这里设置AQS的state初始为-1是为了将线程产生中断的动作提早到工作真正开始运行的时候,换句话说就是
16 禁止在执行工作前对线程进行中断。在调用一些像shutdown和shutdownNow等办法中会去中断线程,而在
17 中断前会调用tryLock办法尝试加锁。而这里设置为-1后,tryLock办法就会返回为false,所以就不能中断了
18 */
19 setState(-1);
20 this.firstTask = firstTask;
21 this.thread = getThreadFactory().newThread(this);
22 }
23
24 //因为Worker类实现了Runnable接口,所以当调用thread.start办法时最终会调用到此处运行
25 public void run() {
26 runWorker(this);
27 }
28
29 //...
30 }
由上能够看到Worker继承了AQS(我之前写过对AQS、ReentrantLock和阻塞队列进行源码剖析的文章,感兴趣的能够查看AQS源码深入分析之独占模式-ReentrantLock锁个性详解、AQS源码深入分析之条件队列-你晓得Java中的阻塞队列是如何实现的吗?),并实现了Runnable接口。之后在剖析源码时将会看到:在运行Worker的时候之所以没有用ReentrantLock作为独占锁来应用是因为这里是要求不可重入的,而ReentrantLock是可重入锁。在像一些setCorePoolSize办法去手动更改外围线程数时,如果批改的值比本来的小,那么多余的线程会被中断、会中断正在运行着的的线程。所以应用本人实现的不可重入独占锁而不是应用ReentrantLock就是为了不想让像setCorePoolSize这样的办法来从新获取到锁资源,不想让正在运行的线程产生自我中断。其实下面所说的内容在Worker类的正文中都曾经解释了:
2 结构器
1 /**
2 * ThreadPoolExecutor:
3 * 全参数结构器,其余结构器最终都会调用到这里
4 */
5 public ThreadPoolExecutor(int corePoolSize,
6 int maximumPoolSize,
7 long keepAliveTime,
8 TimeUnit unit,
9 BlockingQueue<Runnable> workQueue,
10 ThreadFactory threadFactory,
11 RejectedExecutionHandler handler) {
12 //非法参数校验
13 if (corePoolSize < 0 ||
14 maximumPoolSize <= 0 ||
15 maximumPoolSize < corePoolSize ||
16 keepAliveTime < 0)
17 throw new IllegalArgumentException();
18 //非空校验
19 if (workQueue == null || threadFactory == null || handler == null)
20 throw new NullPointerException();
21 //如果平安管理器不为空,就进行权限拜访(本文不开展剖析)
22 this.acc = System.getSecurityManager() == null ?
23 null :
24 AccessController.getContext();
25 this.corePoolSize = corePoolSize;
26 this.maximumPoolSize = maximumPoolSize;
27 this.workQueue = workQueue;
28 //将keepAliveTime转换成纳秒
29 this.keepAliveTime = unit.toNanos(keepAliveTime);
30 this.threadFactory = threadFactory;
31 this.handler = handler;
32 }
3 execute办法
1 /**
2 * ThreadPoolExecutor:
3 */
4 public void execute(Runnable command) {
5 //非空校验
6 if (command == null)
7 throw new NullPointerException();
8 int c = ctl.get();
9 //如果以后线程数小于外围线程数的话,就间接创立一个外围线程
10 if (workerCountOf(c) < corePoolSize) {
11 if (addWorker(command, true))
12 return;
13 /*
14 增加失败(可能是线程池状态是SHUTDOWN或以上的状态(SHUTDOWN状态下不再接管
15 新工作),也可能是线程数超过阈值了),就从新获取一下ctl的值,走上面的逻辑
16 */
17 c = ctl.get();
18 }
19 /*
20 走到这里阐明以后线程数大于等于外围线程数,又或者是下面增加外围线程失败中解释的状况
21 此时就判断一下以后线程池是否是RUNNING状态,如果是的话就往阻塞队列入队
22 这里offer跟put的区别是如果队列已满,offer不会被阻塞,而是立刻返回false
23 */
24 if (isRunning(c) && workQueue.offer(command)) {
25 int recheck = ctl.get();
26 /*
27 这里会再次查看一次以后线程池是否是RUNNING状态,可能此时线程池曾经shutdown了
28 如果不是RUNNING状态,就删除下面入队的工作,并执行相应的回绝策略
29 */
30 if (!isRunning(recheck) && remove(command))
31 reject(command);
32 /*
33 此时还会去判断一下是否以后的工作线程数曾经为0了(可能这些线程在上次workerCountOf
34 查看后(第10行代码处)被销毁了(allowCoreThreadTimeOut设置为true)),如果是
35 的话就新创建一个空工作的非核心线程。留神,这里传进addWorker办法的是空工作,因为工作
36 曾经在阻塞队列中存在了,所以这个Worker执行的时候,会间接从阻塞队列中取出工作来执行
37 所以说这里的意义也就是要保障线程池在RUNNING状态下必须要有一个线程来执行工作
38 */
39 else if (workerCountOf(recheck) == 0)
40 addWorker(null, false);
41 } else if (!addWorker(command, false))
42 /*
43 走到这里阐明线程池不是RUNNING状态,或者阻塞队列已满,此时创立一个非核心线程去执行
44 如果创立失败,阐明线程池的状态曾经不是RUNNING了,又或者以后线程数曾经大于等于最大线程数了
45 那么就执行相应的回绝策略
46 */
47 reject(command);
48 }
49
50 /**
51 * 第30行代码处:
52 */
53 public boolean remove(Runnable task) {
54 //阻塞队列中删除这个工作
55 boolean removed = workQueue.remove(task);
56 //依据线程池状态来判断是否应该完结线程池
57 tryTerminate();
58 return removed;
59 }
60
61 /**
62 * 第31行和第47行代码处:
63 */
64 final void reject(Runnable command) {
65 //依据是哪种回绝策略,来具体执行其中的逻辑(具体的四种回绝策略的代码这里就不再看了,都是很简略的)
66 handler.rejectedExecution(command, this);
67 }
4 addWorker办法
在下面增加工作时会调用到addWorker办法:
1 /**
2 * ThreadPoolExecutor:
3 */
4 private boolean addWorker(Runnable firstTask, boolean core) {
5 retry:
6 for (; ; ) {
7 int c = ctl.get();
8 //获取以后线程池的运行状态
9 int rs = runStateOf(c);
10
11 /*
12 如果以后线程池状态大于SHUTDOWN,就间接返回false,示意不再增加新的Worker
13 如果以后线程池的状态是SHUTDOWN(此时不再接管新的工作,然而还是会持续解决
14 阻塞队列中的工作),然而firstTask不为null(相当于新的工作)或者阻塞队列为空
15 (为空阐明也没有必要去创立Worker了)的话,也间接返回false,不再增加新的Worker
16 */
17 if (rs >= SHUTDOWN &&
18 !(rs == SHUTDOWN &&
19 firstTask == null &&
20 !workQueue.isEmpty()))
21 return false;
22
23 for (; ; ) {
24 //从新获取以后线程池的工作线程数
25 int wc = workerCountOf(c);
26 /*
27 <1>如果以后线程数大于等于最大值;
28 <2.1>如果是外围线程,以后线程数大于等于外围线程数;
29 <2.2>如果是非核心线程,以后线程数大于等于最大线程数
30 以上两个条件任意一个满足,就阐明以后线程数曾经达到阈值了,
31 也间接返回false,不再增加新的工作
32 */
33 if (wc >= CAPACITY ||
34 wc >= (core ? corePoolSize : maximumPoolSize))
35 return false;
36 /*
37 CAS尝试对ctl+1,也就是工作线程数量+1。如果胜利了,就跳出死循环,
38 从第58行代码处持续往下执行
39 */
40 if (compareAndIncrementWorkerCount(c))
41 break retry;
42 //如果CAS+1失败了,从新读此时ctl的最新值
43 c = ctl.get();
44 /*
45 如果发现此时的运行状态和之前刚进入该办法时的运行状态不相等,
46 阐明在此期间产生了状态的扭转,那么就从头开始重试
47 */
48 if (runStateOf(c) != rs)
49 continue retry;
50 /*
51 走到这里阐明状态没有产生扭转,然而之前ctl+1的CAS操作失败了,那么从新从第25
52 行代码处持续往下执行
53 */
54 }
55 }
56
57 //下面的死循环次要是为了对ctl做+1的操作,而上面是为了创立Worker
58 boolean workerStarted = false;
59 boolean workerAdded = false;
60 Worker w = null;
61 try {
62 //依据firstTask来创立一个Worker(如下面所说,AQS中的state初始值为-1,避免被中断)
63 w = new Worker(firstTask);
64 //每一个Worker都会创立一个Thread
65 final Thread t = w.thread;
66 if (t != null) {
67 final ReentrantLock mainLock = this.mainLock;
68 //上锁
69 mainLock.lock();
70 try {
71 //从新获取以后线程池的运行状态
72 int rs = runStateOf(ctl.get());
73
74 /*
75 如果线程池以后状态是RUNNING状态,或者是SHUTDOWN状态并且firstTask
76 为空(意味着不去解决新工作而是去解决阻塞队列中的工作),能力将创立的
77 新Worker增加到workers汇合中
78 */
79 if (rs < SHUTDOWN ||
80 (rs == SHUTDOWN && firstTask == null)) {
81 /*
82 此时线程还没有start,然而isAlive办法返回true,阐明这个线程是有问题的,
83 间接抛出异样
84 */
85 if (t.isAlive())
86 throw new IllegalThreadStateException();
87 //在workers汇合(HashSet,因为曾经加锁了,所以HashSet就行)外面增加本Worker
88 workers.add(w);
89 int s = workers.size();
90 /*
91 如果以后线程池中线程数量超过了largestPoolSize,就更新一下largestPoolSize为
92 以后线程数量,即largestPoolSize中保留着线程池中呈现过的最大线程数,用于统计监控
93 */
94 if (s > largestPoolSize)
95 largestPoolSize = s;
96 //创立Worker胜利
97 workerAdded = true;
98 }
99 } finally {
100 //开释锁
101 mainLock.unlock();
102 }
103 if (workerAdded) {
104 //如果下面workers汇合增加Worker胜利,就用Worker中的thread来启动线程
105 t.start();
106 workerStarted = true;
107 }
108 }
109 } finally {
110 if (!workerStarted)
111 //如果没增加胜利,就执行失败解决
112 addWorkerFailed(w);
113 }
114 return workerStarted;
115 }
116
117 private void addWorkerFailed(Worker w) {
118 final ReentrantLock mainLock = this.mainLock;
119 //上锁
120 mainLock.lock();
121 try {
122 //如果之前创立Worker胜利了,就从workers汇合中删除它
123 if (w != null)
124 workers.remove(w);
125 //将ctl-1,外面应用了死循环确保CAS操作肯定胜利
126 decrementWorkerCount();
127 //依据线程池状态来判断是否应该完结线程池
128 tryTerminate();
129 } finally {
130 //开释锁
131 mainLock.unlock();
132 }
133 }
5 runWorker办法
因为Worker类实现了Runnable接口,所以当调用thread.start办法时最终会调用到Worker的run办法处:
1 /**
2 * ThreadPoolExecutor:
3 * 当调用t.start()办法时最终会调用到此处
4 */
5 public void run() {
6 runWorker(this);
7 }
8
9 final void runWorker(Worker w) {
10 //获取以后线程(以后线程也就是在Worker中的thread)
11 Thread wt = Thread.currentThread();
12 Runnable task = w.firstTask;
13 //把Worker中的firstTask清空,因为上面要执行它了
14 w.firstTask = null;
15 /*
16 因为之前创立Worker的时候将AQS的state初始为-1,是为了避免线程被中断
17 而这里unlock办法是把state重置为0,意思就是曾经进入到runWorker办法
18 中,能够容许中断了
19 */
20 w.unlock();
21 boolean completedAbruptly = true;
22 try {
23 //如果task不为空,或者从阻塞队列中拿取到工作了
24 while (task != null || (task = getTask()) != null) {
25 /*
26 上锁(留神,这里是用Worker而不是ReentrantLock来加锁的,为了确保
27 以下的代码不会被同一线程所重入,同时能够做到不同线程能够并发执行)
28 */
29 w.lock();
30 /*
31 如果以后线程池状态大于等于STOP,确保以后线程也是须要中断的(因为这个时候要
32 完结线程池了,不能再增加新的线程);否则如果在下面这个判断不满足之后调用了shutdownNow
33 办法的时候(留神,shutdownNow办法是ReentrantLock上锁,而代码走到
34 这里是以后Worker上锁,两者上的不是同一个锁,所以能够并发执行),
35 之前的状态要么是RUNNING要么是SHUTDOWN,在走完第一个runStateAtLeast
36 判断条件发现不满足后,当初执行了shutdownNow办法将状态改为了STOP,
37 同时设置Worker中断位。那么此时在该处的第二个判断Thread.interrupted()返回true,
38 同时线程池的状态此时曾经改为了STOP,那么也会去中断这个线程(留神,这里说的
39 乃至整个ThreadPoolExecutor中我说的中断线程并不是会去真的中断,
40 wt.interrupt()只是会设置一个中断标记位,须要使用者在run办法中首先
41 通过isInterrupted办法去进行判断,是否应该执行接下来的业务代码)
42 */
43 if ((runStateAtLeast(ctl.get(), STOP) ||
44 (Thread.interrupted() &&
45 runStateAtLeast(ctl.get(), STOP))) &&
46 !wt.isInterrupted())
47 wt.interrupt();
48 try {
49 //钩子办法,空实现
50 beforeExecute(wt, task);
51 Throwable thrown = null;
52 try {
53 //这里就是在具体执行线程的工作了(也就是使用者具体写的工作)
54 task.run();
55 } catch (RuntimeException x) {
56 thrown = x;
57 throw x;
58 } catch (Error x) {
59 thrown = x;
60 throw x;
61 } catch (Throwable x) {
62 thrown = x;
63 throw new Error(x);
64 } finally {
65 //钩子办法,空实现
66 afterExecute(task, thrown);
67 }
68 } finally {
69 //这里将task置为null,下次循环的时候就会在阻塞队列中拿取下一个工作了
70 task = null;
71 //实现的工作数+1
72 w.completedTasks++;
73 //开释锁
74 w.unlock();
75 }
76 }
77 //循环执行下面的while循环来拿取工作,而走到这里阐明Worker和阻塞队列中都曾经没有了工作
78 completedAbruptly = false;
79 } finally {
80 //最初对Worker做收尾工作
81 processWorkerExit(w, completedAbruptly);
82 }
83 }
84
85 private void processWorkerExit(Worker w, boolean completedAbruptly) {
86 /*
87 completedAbruptly为true示意在runWorker办法中的while循环中抛出了异样,那么此时
88 工作线程是没有-1的,须要-1(失常状况下在while循环最初一次调用getTask办法中会-1)
89 */
90 if (completedAbruptly)
91 decrementWorkerCount();
92
93 final ReentrantLock mainLock = this.mainLock;
94 //上锁
95 mainLock.lock();
96 try {
97 //累加所有Worker曾经实现的工作数,用于统计监控
98 completedTaskCount += w.completedTasks;
99 /*
100 把以后Worker(也就是以后线程)剔除出workers汇合中,期待GC
101 留神,能走到这里,阐明在getTask办法中的timed标记位必定为true(为false的话就会在getTask办法中的
102 take办法中始终被阻塞,中断唤醒也不可能,因为这种状况下还是会持续在getTask办法中循环)。那么无外乎两种状况,
103 要么是闲暇的外围线程超时须要被销毁,要么是闲暇的非核心线程超时须要被销毁。不论属于哪一种,以后线程都是
104 要被销毁的
105 */
106 workers.remove(w);
107 } finally {
108 //开释锁
109 mainLock.unlock();
110 }
111
112 //依据线程池状态来判断是否应该完结线程池
113 tryTerminate();
114
115 int c = ctl.get();
116 //如果以后线程池处在RUNNING或SHUTDOWN状态
117 if (runStateLessThan(c, STOP)) {
118 //通过之前的剖析,如果completedAbruptly为false,表明此时曾经没有工作能够执行了
119 if (!completedAbruptly) {
120 //如果allowCoreThreadTimeOut为true,min就为0,否则为外围线程数
121 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
122 /*
123 如果阻塞队列不为空(可能代码执行到这里阻塞队列中又有数据了),并且allowCoreThreadTimeOut
124 为true,就将min改为1
125 */
126 if (min == 0 && !workQueue.isEmpty())
127 min = 1;
128 /*
129 两种状况:
130 <1>如果阻塞队列不为空,并且allowCoreThreadTimeOut为true,就判断一下当前工作线程数是否大于等于1,
131 如果是的话就间接返回,不是的话阐明以后没有工作线程了,就增加一个非核心线程去执行阻塞队列中的工作
132 <2>如果allowCoreThreadTimeOut为false,就判断一下下当前工作线程数是否大于等于外围线程数,如果是
133 的话就间接返回,不是的话阐明当前工作线程数小于外围线程数,那么也去增加一个非核心线程
134 */
135 if (workerCountOf(c) >= min)
136 return;
137 }
138 /*
139 下面曾经剖析了在completedAbruptly为false时的两种状况,上面来剖析第三种状况,也就是completedAbruptly为
140 true的时候。completedAbruptly为true示意在runWorker办法中的while循环中抛出了异样,那么也去增加一个
141 非核心线程(尽管之前那个报错的工作是会在finally子句中被清空的,然而在这之前使用者能够覆写afterExecute
142 钩子办法,在其中保留这个执行失败的工作,以此来进行后续的解决。从这个角度上来说,增加一个非核心线程还是
143 有意义的。另外,如之前的剖析,在addWorker办法中的第34行代码处,外围线程和非核心线程的区别仅在于阈值的判断上,
144 其余都是一样的。所以这里增加一个非核心线程也是能够的,反正没达到阈值)
145 */
146 addWorker(null, false);
147 }
148 }
6 getTask办法
由上所示,在第24行代码处,当本Worker中的task工作为空时,就会从阻塞队列中拿取工作,也就是调用到getTask办法:
1 /**
2 * ThreadPoolExecutor:
3 */
4 private Runnable getTask() {
5 //timedOut标记位用来判断poll办法拿取工作是否超时了
6 boolean timedOut = false;
7
8 for (; ; ) {
9 int c = ctl.get();
10 //从新获取以后线程池的运行状态
11 int rs = runStateOf(c);
12
13 /*
14 如果以后线程池是SHUTDOWN状态,并且阻塞队列为空的时候;或者以后线程池的状态大于等于STOP
15 以上两种状况都会将工作线程-1,间接返回null。因为这两种状况下不须要
16 获取工作了。工作线程-1后,后续会在processWorkerExit办法中从workers汇合中剔除掉这个Worker期待GC的
17 */
18 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
19 decrementWorkerCount();
20 return null;
21 }
22
23 /*
24 走到这里阐明以后线程池要么是RUNNING状态,要么是SHUTDOWN状态然而阻塞队列不为空(SHUTDOWN状态还是要
25 解决阻塞队列中的工作的)
26
27 从新获取以后线程池的工作线程数
28 */
29 int wc = workerCountOf(c);
30
31 /*
32 timed标记位示意工作线程是否须要超时销毁
33 如果allowCoreThreadTimeOut设置为true(示意闲暇的外围线程也是要超时销毁的),或者以后线程数大于
34 外围线程数(这个条件代表的是闲暇的非核心线程是要被销毁的,如果allowCoreThreadTimeOut为false,
35 那么线程池中最多保留“传进线程池中的外围线程数”个线程),就将timed置为true
36 */
37 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
38
39 /*
40 如果当前工作线程数大于最大线程数,可能是调用了setMaximumPoolSize办法,把最大线程数改小了(走到这里
41 阐明addWorker办法运行胜利,而在addWorker办法中的第34行代码处曾经判断了大于最大线程数的状况);
42 timedOut为true阐明以后曾经不是第一次循环了,在上次循环中曾经产生了poll的超时。所以总结来说这个if条件的意思是:
43 <1.1>如果当前工作线程数大于最大线程数
44 <1.2>或者以后线程处于闲暇状态并且是须要被销毁的
45 <2.1>并且当前工作线程要有多于一个
46 <2.2>或者以后阻塞队列是空的
47 满足下面两个条件,就将工作线程-1,去掉以后这个多余的线程,而后间接返回
48 */
49 if ((wc > maximumPoolSize || (timed && timedOut))
50 && (wc > 1 || workQueue.isEmpty())) {
51 //这里的办法和decrementWorkerCount办法的区别是不会死循环去始终CAS尝试,如果失败了就间接返回false
52 if (compareAndDecrementWorkerCount(c))
53 return null;
54 //如果CAS-1失败了,就进入到下次循环中持续判断即可
55 continue;
56 }
57
58 try {
59 /*
60 如果timed为true,则通过poll办法进行限时拿取(超过keepAliveTime工夫没有拿取到,就间接返回null),
61 否则通过take办法进行拿取(如果阻塞队列为空,take办法在此时就会被阻塞住,也就是本线程会被阻塞住,直到
62 阻塞队列中有数据了。也就是说如果timed为false的话,这些工作线程会始终被阻塞在这里)
63 */
64 Runnable r = timed ?
65 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
66 workQueue.take();
67 if (r != null)
68 //如果拿取到工作了,就间接返回给Worker解决
69 return r;
70 /*
71 走到这里阐明产生了poll超时,那么将timedOut标记地位为true,进入到下一次循环中重试
72 (大概率会走到第53行代码处返回null)
73 */
74 timedOut = true;
75 } catch (InterruptedException retry) {
76 //如果在阻塞的过程中产生了中断,那么将timedOut置为false,也进入到下一次循环中重试
77 timedOut = false;
78 }
79 }
80 /*
81 以上的逻辑阐明了:外围线程和非核心线程的区别并不是在Worker中有个示意是否是外围线程的属性,Worker是无状态的,
82 每个Worker都是一样的。而辨别是通过判断当前工作线程数是否大于外围线程数来进行的(因为只有阻塞队列满了的时候
83 才会去创立新的非核心线程,也就会使工作线程数大于外围线程数)。如果大于,那么不论之前这个线程到底是外围线程
84 还是非核心线程,当初我就认定以后这个线程就是“非核心线程“,那么等这个“非核心线程”闲暇工夫超过keepAliveTime后,
85 就会被销毁
86 */
87 }
7 shutdown办法
敞开线程池时个别调用的是shutdown办法,而不是shutdownNow办法:
1 /**
2 * ThreadPoolExecutor:
3 */
4 public void shutdown() {
5 final ReentrantLock mainLock = this.mainLock;
6 //上锁
7 mainLock.lock();
8 try {
9 //如果有平安管理器,确保调用者有权限敞开线程池(本文不开展剖析)
10 checkShutdownAccess();
11 //将线程池状态改为SHUTDOWN,外面应用了死循环确保CAS操作肯定胜利
12 advanceRunState(SHUTDOWN);
13 interruptIdleWorkers();
14 //钩子办法,空实现
15 onShutdown();
16 } finally {
17 //开释锁
18 mainLock.unlock();
19 }
20 //依据线程池状态来判断是否应该完结线程池
21 tryTerminate();
22 }
23
24 /**
25 * 第13行代码处:
26 */
27 private void interruptIdleWorkers() {
28 //中断所有的闲暇线程
29 interruptIdleWorkers(false);
30 }
31
32 private void interruptIdleWorkers(boolean onlyOne) {
33 final ReentrantLock mainLock = this.mainLock;
34 //上锁
35 mainLock.lock();
36 try {
37 for (Worker w : workers) {
38 Thread t = w.thread;
39 if (!t.isInterrupted() && w.tryLock()) {
40 try {
41 /*
42 如果以后Worker中的线程没有被中断过,且尝试加锁胜利,就将
43 中断标记位从新置为true,意思就是说要中断这个闲暇的Worker
44 */
45 t.interrupt();
46 } catch (SecurityException ignore) {
47 } finally {
48 //将AQS中的state复位为0,复原为tryLock之前的状态
49 w.unlock();
50 }
51 }
52 if (onlyOne)
53 //如果onlyOne为true,就只尝试中断一次
54 break;
55 }
56 } finally {
57 //开释锁
58 mainLock.unlock();
59 }
60 }
8 tryTerminate办法
在下面的实现中能够看到有多处调用到了tryTerminate办法,以此来判断以后线程池是否应该完结:
1 /**
2 * ThreadPoolExecutor:
3 * (注:该办法放在最初再看比拟好)
4 */
5 final void tryTerminate() {
6 for (; ; ) {
7 int c = ctl.get();
8 /*
9 <1>如果以后线程池是RUNNING状态,就间接返回,因为这时候不须要完结线程池
10 <2>如果以后线程池是TIDYING或TERMINATED状态,也间接返回,这时候就等着
11 批改状态的那个线程把terminated办法执行结束就行了
12 <3>如果以后线程池是SHUTDOWN状态并且阻塞队列不为空,也间接返回,因为这时
13 候还是要去执行阻塞队列中的工作的,不能扭转线程池状态
14 */
15 if (isRunning(c) ||
16 runStateAtLeast(c, TIDYING) ||
17 (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
18 return;
19 /*
20 走到这里阐明有两种状况,要么以后线程池是STOP状态,要么以后线程池是SHUTDOWN状态并且阻塞队列为空
21 这个时候是否能够完结线程池还要查看一下以后的工作线程数,如果不为0,阐明以后线程不是最初一个执行工作
22 的线程(因为如果以后要销毁的线程是闲暇状态,会最终在getTask办法中实现-1的动作(执行时抛出异样会
23 在processWorkerExit办法中实现-1),也就是说每个应该要销毁的闲暇线程在最初拿取不到工作时都会-1的,
24 所以如果发现当前工作线程数没有减到0的话,就阐明以后线程不是最初一个执行线程),那么就不会完结线程池
25 (完结线程池的工作交给最初一个线程来做)。这里ONLY_ONE永远为true,也就是说如果以后线程不是最初一个
26 执行工作的线程的话,那么就只是中断一个闲暇的线程而已(相当于中断本人),而后就间接返回就行了
27 */
28 if (workerCountOf(c) != 0) {
29 interruptIdleWorkers(ONLY_ONE);
30 return;
31 }
32
33 /*
34 走到这里阐明当前工作线程数曾经为0了,也就是说以后线程是最初一个执行工作的线程,
35 此时须要实现完结线程池的动作
36 */
37 final ReentrantLock mainLock = this.mainLock;
38 //上锁
39 mainLock.lock();
40 try {
41 //CAS将ctl状态改为TIDYING
42 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
43 try {
44 //钩子办法,空实现
45 terminated();
46 } finally {
47 //在执行完terminated办法后,将线程池状态置为TERMINATED
48 ctl.set(ctlOf(TERMINATED, 0));
49 /*
50 可能在此之前某线程调用了awaitTermination办法,始终处在阻塞中,
51 并且没有超时,也没有产生中断。那么在完结线程池的此时就须要唤醒这些线程了
52 */
53 termination.signalAll();
54 }
55 return;
56 }
57 } finally {
58 //开释锁
59 mainLock.unlock();
60 }
61 //走到这里阐明之前的CAS将状态改为TIDYING失败了,那么就从头开始重试
62 }
63 }
更多内容请关注微信公众号:奇客工夫
发表回复