关于java:ThreadPoolExecutor源码分析面试问烂了的Java线程池执行流程如果要问你具体的执行细节你还会吗

6次阅读

共计 17998 个字符,预计需要花费 45 分钟才能阅读完成。

Java 版本:8u261。

对于 Java 中的线程池,面试问的最多的就是线程池中各个参数的含意,又或者是线程池执行的流程,彷佛这已成为了固定的模式与套路。然而如果我是面试官,当初我想问一些更粗疏的问题,你还能答得上来吗?比方:

  1. 线程池是如何实现线程复用的?
  2. 如果一个线程执行工作的时候抛出异样,那么这个工作是否会被抛弃?
  3. 以后线程池中有十个线程,其中一个线程正在执行工作,那么剩下的九个线程正在处于一种什么状态呢?

置信如果没有看过线程池的相干源码实现,这些问题是很难答复得完满的。同时这些问题往深了问还会引出 Java 中阻塞队列以及 AQS 的实现,你都能接得住吗?

1 简介

因为线程是稀缺资源,如果在高并发的状况下被无限度地创立和销毁,不仅会耗费系统资源,还会升高零碎的稳定性。所以线程池的呈现就是为了解决这些问题的。线程池通过重用曾经存在的线程资源,缩小线程创立和销毁的次数,进步了性能。同时还能够进行对立的调配、调优和监控。

在 Java 中,能够通过 Executors 类中的 newFixedThreadPool、newCachedThreadPool,newScheduledThreadPool 或者其余形式来创立各种线程池,它们都会间接或间接地通过 ThreadPoolExecutor 来进行构建,通过传入不同的参数来实现不同成果的线程池(newScheduledThreadPool 比拟非凡,它重写了局部 ThreadPoolExecutor 的逻辑,后续我会写一篇对 ScheduledThreadPoolExecutor 进行源码剖析的文章)。

1.1 线程池参数

在 ThreadPoolExecutor 中共有七个参数:

  • corePoolSize:外围线程数,外围线程会始终存活,即便没有工作须要执行(除非 allowCoreThreadTimeOut 参数设置为 true,这样的话即便是外围线程也会被超时销毁);
  • maximumPoolSize:线程池中容许的最大线程数;
  • keepAliveTime:保护工作线程所容许的闲暇工夫,如果工作线程期待的工夫超过了 keepAliveTime,则会被销毁;
  • unit:指定 keepAliveTime 的单位,如 TimeUnit.SECONDS;
  • workQueue:用来保留期待被执行工作的阻塞队列。罕用的有:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 和 PriorityBlockingQueue 等;
  • threadFactory:线程工厂,提供创立新线程的性能。默认的实现是 Executors.defaultThreadFactory(),即通过 new Thread 的形式;
  • handler:如果以后阻塞队列已满,并且以后的线程数量已超过了最大线程数,则会执行相应的回绝策略。具体有四种(也能够本人实现):
    • AbortPolicy:默认实现,会间接抛出 RejectedExecutionException;
    • CallerRunsPolicy:用调用者所在的线程来执行工作;
    • DiscardPolicy:间接摈弃,工作不执行;
    • DiscardOldestPolicy:抛弃阻塞队列中最靠前的工作,并执行当前任务。

这四种回绝策略的实现很简略,这里就不再过多展现阐明了,读者可自行查看。

1.2 运行过程

ThreadPoolExecutor 的大抵运行过程如下:

如果应用的是有界阻塞队列:

有新的工作须要执行,并且以后线程池的线程数小于外围线程数,则创立一个外围线程来执行。如果以后线程数大于外围线程数,则会将除了外围线程解决的工作之外剩下的工作退出到阻塞队列中期待执行。如果队列已满,则在以后线程数不大于最大线程数的前提下,创立新的非核心线程,处理完毕后等达到 keepAliveTime 闲暇工夫后会被间接销毁(留神,不肯定销毁的就是这些非核心线程,外围线程也可能被销毁,只有减到残余线程数达到外围线程数就行。外围线程和非核心线程的区别仅在于判断是否达到阈值时有区别:外围线程判断的是外围线程数,而非核心线程判断的是最大线程数。仅此一个区别。前面讲源码时会再强调这一点)。如果以后线程数大于最大线程数,则会执行相应的回绝策略。

如果应用的是无界阻塞队列:

与有界阻塞队列相比,除非系统资源耗尽,否则无界的阻塞队列不存在工作入队失败的状况。当有新工作到来,零碎的线程数小于外围线程数时,则创立一个外围线程来执行。当达到外围线程数后,就不会持续减少。若后续仍有新的工作退出,而没有闲暇的线程资源,则工作间接进入阻塞队列中进行期待。如果工作创立和解决工作的速度差别很大,无界阻塞队列会放弃快速增长,直到耗尽零碎内存。

1.3 线程池状态

在 ThreadPoolExecutor 中存在五种状态:

  • RUNNING:初始状态,在此状态下可能接管新工作,以及对曾经增加的工作进行解决;
  • SHUTDOWN:通过调用 shutdown 办法,线程池转成 SHUTDOWN 状态。此时不再接管新工作,然而能解决曾经增加的工作;
  • STOP:通过调用 shutdownNow 办法,线程池转成 STOP 状态。此时不再接管新工作,不解决曾经增加的工作,并且会中断正在解决的工作;
  • TIDYING:当线程池中所有的工作曾经终止了,工作数量为 0 并且阻塞队列为空的时候,会进入到 TIDYING 状态。此时会调用一个钩子办法 terminated,它是一个空的实现,能够供调用者覆写;
  • TREMINATED:线程池彻底终止的状态。当线程池处于 TIDYING 状态时,执行完 terminated 办法后,就会进入到该状态。

在 ThreadPoolExecutor 中状态是通过 ctl 属性中的高 3 位来示意的:

 1 //ctl 中蕴含两局部信息:高 3 位示意运行状态,低 29 位保留工作线程数量,初始状态是 RUNNING
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 //29
 4 private static final int COUNT_BITS = Integer.SIZE - 3;
 5 // 1 左移 29 位后 -1,也就是 29 个 1。用来示意工作线程数量的最大值
 6 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 7
 8 //ctl 高 3 位为 111(低 29 位都为 0)9 private static final int RUNNING = -1 << COUNT_BITS;
10 //ctl 高 3 位为 000(低 29 位都为 0)11 private static final int SHUTDOWN = 0 << COUNT_BITS;
12 //ctl 高 3 位为 001(低 29 位都为 0)13 private static final int STOP = 1 << COUNT_BITS;
14 //ctl 高 3 位为 010(低 29 位都为 0)15 private static final int TIDYING = 2 << COUNT_BITS;
16 //ctl 高 3 位为 011(低 29 位都为 0)17 private static final int TERMINATED = 3 << COUNT_BITS;
18
19 // 获取 ctl 的高 3 位(低 29 位都为 0)也就是获取运行状态
20 private static int runStateOf(int c) {
21     return c & ~CAPACITY;
22 }
23
24 // 获取 ctl 的低 29 位(高 3 位都为 0)也就是获取工作线程数量
25 private static int workerCountOf(int c) {
26     return c & CAPACITY;
27 }
28
29 // 用来获取运行状态和工作线程数量拼接起来的值
30 private static int ctlOf(int rs, int wc) {
31     return rs | wc;
32 }
33
34 // 判断 ctl 是否小于 s 所代表状态的值
35 private static boolean runStateLessThan(int c, int s) {
36     return c < s;
37 }
38
39 // 判断 ctl 是否大于等于 s 所代表状态的值
40 private static boolean runStateAtLeast(int c, int s) {
41     return c >= s;
42 }
43
44 // 判断 ctl 此时是否是 RUNNING 状态
45 private static boolean isRunning(int c) {
46     return c < SHUTDOWN;
47 }

1.4 Worker

Worker 是 ThreadPoolExecutor 中的一个外部类,用来封装工作线程:

 1 private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable {
 4     //...
 5
 6     // 正在运行 Worker 的线程
 7     final Thread thread;
 8     // 传入的工作
 9     Runnable firstTask;
10     // 本 Worker 已实现的工作数,用于后续的统计与监控
11     volatile long completedTasks;
12
13     Worker(Runnable firstTask) {
14         /*
15         这里设置 AQS 的 state 初始为 - 1 是为了将线程产生中断的动作提早到工作真正开始运行的时候,换句话说就是
16         禁止在执行工作前对线程进行中断。在调用一些像 shutdown 和 shutdownNow 等办法中会去中断线程,而在
17         中断前会调用 tryLock 办法尝试加锁。而这里设置为 - 1 后,tryLock 办法就会返回为 false,所以就不能中断了
18          */
19         setState(-1);
20         this.firstTask = firstTask;
21         this.thread = getThreadFactory().newThread(this);
22     }
23
24     // 因为 Worker 类实现了 Runnable 接口,所以当调用 thread.start 办法时最终会调用到此处运行
25     public void run() {26         runWorker(this);
27     }
28
29     //...
30 }

由上能够看到 Worker 继承了 AQS(我之前写过对 AQS、ReentrantLock 和阻塞队列进行源码剖析的文章,感兴趣的能够查看 AQS 源码深入分析之独占模式 -ReentrantLock 锁个性详解、AQS 源码深入分析之条件队列 - 你晓得 Java 中的阻塞队列是如何实现的吗?),并实现了 Runnable 接口。之后在剖析源码时将会看到:在运行 Worker 的时候之所以没有用 ReentrantLock 作为独占锁来应用是因为这里是要求不可重入的,而 ReentrantLock 是可重入锁。在像一些 setCorePoolSize 办法去手动更改外围线程数时,如果批改的值比本来的小,那么多余的线程会被中断、会中断正在运行着的的线程。所以应用本人实现的不可重入独占锁而不是应用 ReentrantLock 就是为了不想让像 setCorePoolSize 这样的办法来从新获取到锁资源,不想让正在运行的线程产生自我中断。其实下面所说的内容在 Worker 类的正文中都曾经解释了:

2 结构器

 1 /**
 2  * ThreadPoolExecutor:
 3  * 全参数结构器,其余结构器最终都会调用到这里
 4  */
 5 public ThreadPoolExecutor(int corePoolSize,
 6                           int maximumPoolSize,
 7                           long keepAliveTime,
 8                           TimeUnit unit,
 9                           BlockingQueue<Runnable> workQueue,
10                           ThreadFactory threadFactory,
11                           RejectedExecutionHandler handler) {
12     // 非法参数校验
13     if (corePoolSize < 0 ||
14             maximumPoolSize <= 0 ||
15             maximumPoolSize < corePoolSize ||
16             keepAliveTime < 0)
17         throw new IllegalArgumentException();
18     // 非空校验
19     if (workQueue == null || threadFactory == null || handler == null)
20         throw new NullPointerException();
21     // 如果平安管理器不为空,就进行权限拜访(本文不开展剖析)22     this.acc = System.getSecurityManager() == null ?
23             null :
24             AccessController.getContext();
25     this.corePoolSize = corePoolSize;
26     this.maximumPoolSize = maximumPoolSize;
27     this.workQueue = workQueue;
28     // 将 keepAliveTime 转换成纳秒
29     this.keepAliveTime = unit.toNanos(keepAliveTime);
30     this.threadFactory = threadFactory;
31     this.handler = handler;
32 }

3 execute 办法

 1 /**
 2  * ThreadPoolExecutor:
 3  */
 4 public void execute(Runnable command) {
 5     // 非空校验
 6     if (command == null)
 7         throw new NullPointerException();
 8     int c = ctl.get();
 9     // 如果以后线程数小于外围线程数的话,就间接创立一个外围线程
10     if (workerCountOf(c) < corePoolSize) {11         if (addWorker(command, true))
12             return;
13         /*
14         增加失败(可能是线程池状态是 SHUTDOWN 或以上的状态(SHUTDOWN 状态下不再接管
15         新工作),也可能是线程数超过阈值了),就从新获取一下 ctl 的值,走上面的逻辑
16          */
17         c = ctl.get();
18     }
19     /*
20     走到这里阐明以后线程数大于等于外围线程数,又或者是下面增加外围线程失败中解释的状况
21     此时就判断一下以后线程池是否是 RUNNING 状态,如果是的话就往阻塞队列入队
22     这里 offer 跟 put 的区别是如果队列已满,offer 不会被阻塞,而是立刻返回 false
23      */
24     if (isRunning(c) && workQueue.offer(command)) {25         int recheck = ctl.get();
26         /*
27         这里会再次查看一次以后线程池是否是 RUNNING 状态,可能此时线程池曾经 shutdown 了
28         如果不是 RUNNING 状态,就删除下面入队的工作,并执行相应的回绝策略
29          */
30         if (!isRunning(recheck) && remove(command))
31             reject(command);
32         /*
33         此时还会去判断一下是否以后的工作线程数曾经为 0 了(可能这些线程在上次 workerCountOf
34         查看后(第 10 行代码处)被销毁了(allowCoreThreadTimeOut 设置为 true)),如果是
35         的话就新创建一个空工作的非核心线程。留神,这里传进 addWorker 办法的是空工作,因为工作
36         曾经在阻塞队列中存在了,所以这个 Worker 执行的时候,会间接从阻塞队列中取出工作来执行
37         所以说这里的意义也就是要保障线程池在 RUNNING 状态下必须要有一个线程来执行工作
38          */
39         else if (workerCountOf(recheck) == 0)
40             addWorker(null, false);
41     } else if (!addWorker(command, false))
42         /*
43         走到这里阐明线程池不是 RUNNING 状态,或者阻塞队列已满,此时创立一个非核心线程去执行
44         如果创立失败,阐明线程池的状态曾经不是 RUNNING 了,又或者以后线程数曾经大于等于最大线程数了
45         那么就执行相应的回绝策略
46          */
47         reject(command);
48 }
49
50 /**
51  * 第 30 行代码处:52  */
53 public boolean remove(Runnable task) {
54     // 阻塞队列中删除这个工作
55     boolean removed = workQueue.remove(task);
56     // 依据线程池状态来判断是否应该完结线程池
57     tryTerminate(); 
58     return removed;
59 }
60
61 /**
62  * 第 31 行和第 47 行代码处:63  */
64 final void reject(Runnable command) {
65     // 依据是哪种回绝策略,来具体执行其中的逻辑(具体的四种回绝策略的代码这里就不再看了,都是很简略的)66     handler.rejectedExecution(command, this);
67 }

4 addWorker 办法

在下面增加工作时会调用到 addWorker 办法:

  1 /**
  2  * ThreadPoolExecutor:
  3  */
  4 private boolean addWorker(Runnable firstTask, boolean core) {
  5     retry:
  6     for (; ;) {7         int c = ctl.get();
  8         // 获取以后线程池的运行状态
  9         int rs = runStateOf(c);
 10 
 11         /*
 12         如果以后线程池状态大于 SHUTDOWN,就间接返回 false,示意不再增加新的 Worker
 13         如果以后线程池的状态是 SHUTDOWN(此时不再接管新的工作,然而还是会持续解决
 14         阻塞队列中的工作),然而 firstTask 不为 null(相当于新的工作)或者阻塞队列为空
 15(为空阐明也没有必要去创立 Worker 了)的话,也间接返回 false,不再增加新的 Worker
 16          */
 17         if (rs >= SHUTDOWN &&
 18                 !(rs == SHUTDOWN &&
 19                         firstTask == null &&
 20                         !workQueue.isEmpty()))
 21             return false;
 22 
 23         for (; ;) {
 24             // 从新获取以后线程池的工作线程数
 25             int wc = workerCountOf(c);
 26             /*
 27             <1> 如果以后线程数大于等于最大值;28             <2.1> 如果是外围线程,以后线程数大于等于外围线程数;29             <2.2> 如果是非核心线程,以后线程数大于等于最大线程数
 30             以上两个条件任意一个满足,就阐明以后线程数曾经达到阈值了,31             也间接返回 false,不再增加新的工作
 32              */
 33             if (wc >= CAPACITY ||
 34                     wc >= (core ? corePoolSize : maximumPoolSize))
 35                 return false;
 36             /*
 37             CAS 尝试对 ctl+1,也就是工作线程数量 +1。如果胜利了,就跳出死循环,38             从第 58 行代码处持续往下执行
 39              */
 40             if (compareAndIncrementWorkerCount(c))
 41                 break retry;
 42             // 如果 CAS+ 1 失败了,从新读此时 ctl 的最新值
 43             c = ctl.get();
 44             /*
 45             如果发现此时的运行状态和之前刚进入该办法时的运行状态不相等,46             阐明在此期间产生了状态的扭转,那么就从头开始重试
 47              */
 48             if (runStateOf(c) != rs)
 49                 continue retry;
 50             /*
 51             走到这里阐明状态没有产生扭转,然而之前 ctl+ 1 的 CAS 操作失败了,那么从新从第 25
 52             行代码处持续往下执行
 53              */
 54         }
 55     }
 56
 57     // 下面的死循环次要是为了对 ctl 做 + 1 的操作,而上面是为了创立 Worker
 58     boolean workerStarted = false;
 59     boolean workerAdded = false;
 60     Worker w = null;
 61     try {
 62         // 依据 firstTask 来创立一个 Worker(如下面所说,AQS 中的 state 初始值为 -1,避免被中断)63         w = new Worker(firstTask);
 64         // 每一个 Worker 都会创立一个 Thread
 65         final Thread t = w.thread;
 66         if (t != null) {
 67             final ReentrantLock mainLock = this.mainLock;
 68             // 上锁
 69             mainLock.lock();
 70             try {
 71                 // 从新获取以后线程池的运行状态
 72                 int rs = runStateOf(ctl.get());
 73 
 74                 /*
 75                 如果线程池以后状态是 RUNNING 状态,或者是 SHUTDOWN 状态并且 firstTask
 76                 为空(意味着不去解决新工作而是去解决阻塞队列中的工作),能力将创立的
 77                 新 Worker 增加到 workers 汇合中
 78                  */
 79                 if (rs < SHUTDOWN ||
 80                         (rs == SHUTDOWN && firstTask == null)) {
 81                     /*
 82                     此时线程还没有 start,然而 isAlive 办法返回 true,阐明这个线程是有问题的,83                     间接抛出异样
 84                      */
 85                     if (t.isAlive())
 86                         throw new IllegalThreadStateException();
 87                     // 在 workers 汇合(HashSet,因为曾经加锁了,所以 HashSet 就行)外面增加本 Worker
 88                     workers.add(w);
 89                     int s = workers.size();
 90                     /*
 91                     如果以后线程池中线程数量超过了 largestPoolSize,就更新一下 largestPoolSize 为
 92                     以后线程数量,即 largestPoolSize 中保留着线程池中呈现过的最大线程数,用于统计监控
 93                      */
 94                     if (s > largestPoolSize)
 95                         largestPoolSize = s;
 96                     // 创立 Worker 胜利
 97                     workerAdded = true;
 98                 }
 99             } finally {
100                 // 开释锁
101                 mainLock.unlock();
102             }
103             if (workerAdded) {
104                 // 如果下面 workers 汇合增加 Worker 胜利,就用 Worker 中的 thread 来启动线程
105                 t.start();
106                 workerStarted = true;
107             }
108         }
109     } finally {110         if (!workerStarted)
111             // 如果没增加胜利,就执行失败解决
112             addWorkerFailed(w);
113     }
114     return workerStarted;
115 }
116
117 private void addWorkerFailed(Worker w) {
118     final ReentrantLock mainLock = this.mainLock;
119     // 上锁
120     mainLock.lock();
121     try {
122         // 如果之前创立 Worker 胜利了,就从 workers 汇合中删除它
123         if (w != null)
124             workers.remove(w);
125         // 将 ctl-1,外面应用了死循环确保 CAS 操作肯定胜利
126         decrementWorkerCount();
127         // 依据线程池状态来判断是否应该完结线程池
128         tryTerminate();
129     } finally {
130         // 开释锁
131         mainLock.unlock();
132     }
133 }

5 runWorker 办法

因为 Worker 类实现了 Runnable 接口,所以当调用 thread.start 办法时最终会调用到 Worker 的 run 办法处:

  1 /**
  2  * ThreadPoolExecutor:
  3  * 当调用 t.start()办法时最终会调用到此处
  4  */
  5 public void run() {6     runWorker(this);
  7 }
  8
  9 final void runWorker(Worker w) {
 10     // 获取以后线程(以后线程也就是在 Worker 中的 thread)11     Thread wt = Thread.currentThread();
 12     Runnable task = w.firstTask;
 13     // 把 Worker 中的 firstTask 清空,因为上面要执行它了
 14     w.firstTask = null;
 15     /*
 16     因为之前创立 Worker 的时候将 AQS 的 state 初始为 -1,是为了避免线程被中断
 17     而这里 unlock 办法是把 state 重置为 0,意思就是曾经进入到 runWorker 办法
 18     中,能够容许中断了
 19      */
 20     w.unlock();
 21     boolean completedAbruptly = true;
 22     try {
 23         // 如果 task 不为空,或者从阻塞队列中拿取到工作了
 24         while (task != null || (task = getTask()) != null) {
 25             /*
 26             上锁(留神,这里是用 Worker 而不是 ReentrantLock 来加锁的,为了确保
 27             以下的代码不会被同一线程所重入,同时能够做到不同线程能够并发执行)28              */
 29             w.lock();
 30             /*
 31             如果以后线程池状态大于等于 STOP,确保以后线程也是须要中断的(因为这个时候要
 32             完结线程池了,不能再增加新的线程);否则如果在下面这个判断不满足之后调用了 shutdownNow
 33             办法的时候(留神,shutdownNow 办法是 ReentrantLock 上锁,而代码走到
 34             这里是以后 Worker 上锁,两者上的不是同一个锁,所以能够并发执行),35             之前的状态要么是 RUNNING 要么是 SHUTDOWN,在走完第一个 runStateAtLeast
 36             判断条件发现不满足后,当初执行了 shutdownNow 办法将状态改为了 STOP,37             同时设置 Worker 中断位。那么此时在该处的第二个判断 Thread.interrupted()返回 true,38             同时线程池的状态此时曾经改为了 STOP,那么也会去中断这个线程(留神,这里说的
 39             乃至整个 ThreadPoolExecutor 中我说的中断线程并不是会去真的中断,40             wt.interrupt()只是会设置一个中断标记位,须要使用者在 run 办法中首先
 41             通过 isInterrupted 办法去进行判断,是否应该执行接下来的业务代码)42              */
 43             if ((runStateAtLeast(ctl.get(), STOP) ||
 44                     (Thread.interrupted() &&
 45                             runStateAtLeast(ctl.get(), STOP))) &&
 46                     !wt.isInterrupted())
 47                 wt.interrupt();
 48             try {
 49                 // 钩子办法,空实现
 50                 beforeExecute(wt, task);
 51                 Throwable thrown = null;
 52                 try {
 53                     // 这里就是在具体执行线程的工作了(也就是使用者具体写的工作)54                     task.run();
 55                 } catch (RuntimeException x) {
 56                     thrown = x;
 57                     throw x;
 58                 } catch (Error x) {
 59                     thrown = x;
 60                     throw x;
 61                 } catch (Throwable x) {
 62                     thrown = x;
 63                     throw new Error(x);
 64                 } finally {
 65                     // 钩子办法,空实现
 66                     afterExecute(task, thrown);
 67                 }
 68             } finally {
 69                 // 这里将 task 置为 null,下次循环的时候就会在阻塞队列中拿取下一个工作了
 70                 task = null;
 71                 // 实现的工作数 +1
 72                 w.completedTasks++;
 73                 // 开释锁
 74                 w.unlock();
 75             }
 76         }
 77         // 循环执行下面的 while 循环来拿取工作,而走到这里阐明 Worker 和阻塞队列中都曾经没有了工作
 78         completedAbruptly = false;
 79     } finally {
 80         // 最初对 Worker 做收尾工作
 81         processWorkerExit(w, completedAbruptly);
 82     }
 83 }
 84
 85 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 86     /*
 87     completedAbruptly 为 true 示意在 runWorker 办法中的 while 循环中抛出了异样,那么此时
 88     工作线程是没有 - 1 的,须要 -1(失常状况下在 while 循环最初一次调用 getTask 办法中会 -1)89      */
 90     if (completedAbruptly)
 91         decrementWorkerCount();
 92
 93     final ReentrantLock mainLock = this.mainLock;
 94     // 上锁
 95     mainLock.lock();
 96     try {
 97         // 累加所有 Worker 曾经实现的工作数,用于统计监控
 98         completedTaskCount += w.completedTasks;
 99         /*
100         把以后 Worker(也就是以后线程)剔除出 workers 汇合中,期待 GC
101         留神,能走到这里,阐明在 getTask 办法中的 timed 标记位必定为 true(为 false 的话就会在 getTask 办法中的
102         take 办法中始终被阻塞,中断唤醒也不可能,因为这种状况下还是会持续在 getTask 办法中循环)。那么无外乎两种状况,103         要么是闲暇的外围线程超时须要被销毁,要么是闲暇的非核心线程超时须要被销毁。不论属于哪一种,以后线程都是
104         要被销毁的
105          */
106         workers.remove(w);
107     } finally {
108         // 开释锁
109         mainLock.unlock();
110     }
111
112     // 依据线程池状态来判断是否应该完结线程池
113     tryTerminate();
114 
115     int c = ctl.get();
116     // 如果以后线程池处在 RUNNING 或 SHUTDOWN 状态
117     if (runStateLessThan(c, STOP)) {
118         // 通过之前的剖析,如果 completedAbruptly 为 false,表明此时曾经没有工作能够执行了
119         if (!completedAbruptly) {
120             // 如果 allowCoreThreadTimeOut 为 true,min 就为 0,否则为外围线程数
121             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
122             /*
123             如果阻塞队列不为空(可能代码执行到这里阻塞队列中又有数据了),并且 allowCoreThreadTimeOut
124             为 true,就将 min 改为 1
125              */
126             if (min == 0 && !workQueue.isEmpty())
127                 min = 1;
128             /*
129             两种状况:130             <1> 如果阻塞队列不为空,并且 allowCoreThreadTimeOut 为 true,就判断一下当前工作线程数是否大于等于 1,131             如果是的话就间接返回,不是的话阐明以后没有工作线程了,就增加一个非核心线程去执行阻塞队列中的工作
132             <2> 如果 allowCoreThreadTimeOut 为 false,就判断一下下当前工作线程数是否大于等于外围线程数,如果是
133             的话就间接返回,不是的话阐明当前工作线程数小于外围线程数,那么也去增加一个非核心线程
134              */
135             if (workerCountOf(c) >= min)
136                 return;
137         }
138         /*
139         下面曾经剖析了在 completedAbruptly 为 false 时的两种状况,上面来剖析第三种状况,也就是 completedAbruptly 为
140         true 的时候。completedAbruptly 为 true 示意在 runWorker 办法中的 while 循环中抛出了异样,那么也去增加一个
141         非核心线程(尽管之前那个报错的工作是会在 finally 子句中被清空的,然而在这之前使用者能够覆写 afterExecute
142         钩子办法,在其中保留这个执行失败的工作,以此来进行后续的解决。从这个角度上来说,增加一个非核心线程还是
143         有意义的。另外,如之前的剖析,在 addWorker 办法中的第 34 行代码处,外围线程和非核心线程的区别仅在于阈值的判断上,144         其余都是一样的。所以这里增加一个非核心线程也是能够的,反正没达到阈值)145          */
146         addWorker(null, false);
147     }
148 }

6 getTask 办法

由上所示,在第 24 行代码处,当本 Worker 中的 task 工作为空时,就会从阻塞队列中拿取工作,也就是调用到 getTask 办法:

 1 /**
 2  * ThreadPoolExecutor:
 3  */
 4 private Runnable getTask() {
 5     //timedOut 标记位用来判断 poll 办法拿取工作是否超时了
 6     boolean timedOut = false;
 7 
 8     for (; ;) {9         int c = ctl.get();
10         // 从新获取以后线程池的运行状态
11         int rs = runStateOf(c);
12 
13         /*
14         如果以后线程池是 SHUTDOWN 状态,并且阻塞队列为空的时候;或者以后线程池的状态大于等于 STOP
15         以上两种状况都会将工作线程 -1,间接返回 null。因为这两种状况下不须要
16         获取工作了。工作线程 - 1 后,后续会在 processWorkerExit 办法中从 workers 汇合中剔除掉这个 Worker 期待 GC 的
17          */
18         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {19             decrementWorkerCount();
20             return null;
21         }
22
23         /*
24         走到这里阐明以后线程池要么是 RUNNING 状态,要么是 SHUTDOWN 状态然而阻塞队列不为空(SHUTDOWN 状态还是要
25         解决阻塞队列中的工作的)26 
27         从新获取以后线程池的工作线程数
28          */
29         int wc = workerCountOf(c);
30 
31         /*
32         timed 标记位示意工作线程是否须要超时销毁
33         如果 allowCoreThreadTimeOut 设置为 true(示意闲暇的外围线程也是要超时销毁的),或者以后线程数大于
34         外围线程数(这个条件代表的是闲暇的非核心线程是要被销毁的,如果 allowCoreThreadTimeOut 为 false,35         那么线程池中最多保留“传进线程池中的外围线程数”个线程),就将 timed 置为 true
36          */
37         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
38
39         /*
40         如果当前工作线程数大于最大线程数,可能是调用了 setMaximumPoolSize 办法,把最大线程数改小了(走到这里
41         阐明 addWorker 办法运行胜利,而在 addWorker 办法中的第 34 行代码处曾经判断了大于最大线程数的状况);42         timedOut 为 true 阐明以后曾经不是第一次循环了,在上次循环中曾经产生了 poll 的超时。所以总结来说这个 if 条件的意思是:43         <1.1> 如果当前工作线程数大于最大线程数
44         <1.2> 或者以后线程处于闲暇状态并且是须要被销毁的
45         <2.1> 并且当前工作线程要有多于一个
46         <2.2> 或者以后阻塞队列是空的
47         满足下面两个条件,就将工作线程 -1,去掉以后这个多余的线程,而后间接返回
48          */
49         if ((wc > maximumPoolSize || (timed && timedOut))
50                 && (wc > 1 || workQueue.isEmpty())) {
51             // 这里的办法和 decrementWorkerCount 办法的区别是不会死循环去始终 CAS 尝试,如果失败了就间接返回 false
52             if (compareAndDecrementWorkerCount(c))
53                 return null;
54             // 如果 CAS- 1 失败了,就进入到下次循环中持续判断即可
55             continue;
56         }
57
58         try {
59             /*
60             如果 timed 为 true,则通过 poll 办法进行限时拿取(超过 keepAliveTime 工夫没有拿取到,就间接返回 null),61             否则通过 take 办法进行拿取(如果阻塞队列为空,take 办法在此时就会被阻塞住,也就是本线程会被阻塞住,直到
62             阻塞队列中有数据了。也就是说如果 timed 为 false 的话,这些工作线程会始终被阻塞在这里)63              */
64             Runnable r = timed ?
65                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
66                     workQueue.take();
67             if (r != null)
68                 // 如果拿取到工作了,就间接返回给 Worker 解决
69                 return r;
70             /*
71             走到这里阐明产生了 poll 超时,那么将 timedOut 标记地位为 true,进入到下一次循环中重试
72(大概率会走到第 53 行代码处返回 null)73              */
74             timedOut = true;
75         } catch (InterruptedException retry) {
76             // 如果在阻塞的过程中产生了中断,那么将 timedOut 置为 false,也进入到下一次循环中重试
77             timedOut = false;
78         }
79     }
80     /*
81     以上的逻辑阐明了:外围线程和非核心线程的区别并不是在 Worker 中有个示意是否是外围线程的属性,Worker 是无状态的,82     每个 Worker 都是一样的。而辨别是通过判断当前工作线程数是否大于外围线程数来进行的(因为只有阻塞队列满了的时候
83     才会去创立新的非核心线程,也就会使工作线程数大于外围线程数)。如果大于,那么不论之前这个线程到底是外围线程
84     还是非核心线程,当初我就认定以后这个线程就是“非核心线程“,那么等这个“非核心线程”闲暇工夫超过 keepAliveTime 后,85     就会被销毁
86      */
87 }

7 shutdown 办法

敞开线程池时个别调用的是 shutdown 办法,而不是 shutdownNow 办法:

 1 /**
 2  * ThreadPoolExecutor:
 3  */
 4 public void shutdown() {
 5     final ReentrantLock mainLock = this.mainLock;
 6     // 上锁
 7     mainLock.lock();
 8     try {
 9         // 如果有平安管理器,确保调用者有权限敞开线程池(本文不开展剖析)10         checkShutdownAccess();
11         // 将线程池状态改为 SHUTDOWN,外面应用了死循环确保 CAS 操作肯定胜利
12         advanceRunState(SHUTDOWN);
13         interruptIdleWorkers();
14         // 钩子办法,空实现
15         onShutdown();
16     } finally {
17         // 开释锁
18         mainLock.unlock();
19     }
20     // 依据线程池状态来判断是否应该完结线程池
21     tryTerminate();
22 }
23
24 /**
25  * 第 13 行代码处:26  */
27 private void interruptIdleWorkers() {
28     // 中断所有的闲暇线程
29     interruptIdleWorkers(false);
30 }
31
32 private void interruptIdleWorkers(boolean onlyOne) {
33     final ReentrantLock mainLock = this.mainLock;
34     // 上锁
35     mainLock.lock();
36     try {37         for (Worker w : workers) {
38             Thread t = w.thread;
39             if (!t.isInterrupted() && w.tryLock()) {
40                 try {
41                     /*
42                     如果以后 Worker 中的线程没有被中断过,且尝试加锁胜利,就将
43                     中断标记位从新置为 true,意思就是说要中断这个闲暇的 Worker
44                      */
45                     t.interrupt();
46                 } catch (SecurityException ignore) {47} finally {
48                     // 将 AQS 中的 state 复位为 0,复原为 tryLock 之前的状态
49                     w.unlock();
50                 }
51             }
52             if (onlyOne)
53                 // 如果 onlyOne 为 true,就只尝试中断一次
54                 break;
55         }
56     } finally {
57         // 开释锁
58         mainLock.unlock();
59     }
60 }

8 tryTerminate 办法

在下面的实现中能够看到有多处调用到了 tryTerminate 办法,以此来判断以后线程池是否应该完结:

 1 /**
 2  * ThreadPoolExecutor:
 3  *(注:该办法放在最初再看比拟好)4  */
 5 final void tryTerminate() {6     for (; ;) {7         int c = ctl.get();
 8         /*
 9         <1> 如果以后线程池是 RUNNING 状态,就间接返回,因为这时候不须要完结线程池
10         <2> 如果以后线程池是 TIDYING 或 TERMINATED 状态,也间接返回,这时候就等着
11         批改状态的那个线程把 terminated 办法执行结束就行了
12         <3> 如果以后线程池是 SHUTDOWN 状态并且阻塞队列不为空,也间接返回,因为这时
13         候还是要去执行阻塞队列中的工作的,不能扭转线程池状态
14          */
15         if (isRunning(c) ||
16                 runStateAtLeast(c, TIDYING) ||
17                 (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
18             return;
19         /*
20         走到这里阐明有两种状况,要么以后线程池是 STOP 状态,要么以后线程池是 SHUTDOWN 状态并且阻塞队列为空
21         这个时候是否能够完结线程池还要查看一下以后的工作线程数,如果不为 0,阐明以后线程不是最初一个执行工作
22         的线程(因为如果以后要销毁的线程是闲暇状态,会最终在 getTask 办法中实现 - 1 的动作(执行时抛出异样会
23         在 processWorkerExit 办法中实现 -1),也就是说每个应该要销毁的闲暇线程在最初拿取不到工作时都会 - 1 的,24         所以如果发现当前工作线程数没有减到 0 的话,就阐明以后线程不是最初一个执行线程),那么就不会完结线程池
25(完结线程池的工作交给最初一个线程来做)。这里 ONLY_ONE 永远为 true,也就是说如果以后线程不是最初一个
26         执行工作的线程的话,那么就只是中断一个闲暇的线程而已(相当于中断本人),而后就间接返回就行了
27          */
28         if (workerCountOf(c) != 0) {29             interruptIdleWorkers(ONLY_ONE);
30             return;
31         }
32
33         /*
34         走到这里阐明当前工作线程数曾经为 0 了,也就是说以后线程是最初一个执行工作的线程,35         此时须要实现完结线程池的动作
36          */
37         final ReentrantLock mainLock = this.mainLock;
38         // 上锁
39         mainLock.lock();
40         try {
41             //CAS 将 ctl 状态改为 TIDYING
42             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
43                 try {
44                     // 钩子办法,空实现
45                     terminated();
46                 } finally {
47                     // 在执行完 terminated 办法后,将线程池状态置为 TERMINATED
48                     ctl.set(ctlOf(TERMINATED, 0));
49                     /*
50                     可能在此之前某线程调用了 awaitTermination 办法,始终处在阻塞中,51                     并且没有超时,也没有产生中断。那么在完结线程池的此时就须要唤醒这些线程了
52                      */
53                     termination.signalAll();
54                 }
55                 return;
56             }
57         } finally {
58             // 开释锁
59             mainLock.unlock();
60         }
61         // 走到这里阐明之前的 CAS 将状态改为 TIDYING 失败了,那么就从头开始重试
62     }
63 }

更多内容请关注微信公众号:奇客工夫

正文完
 0