共计 23225 个字符,预计需要花费 59 分钟才能阅读完成。
什么是线程池?
为了避免频繁重复的创建和销毁线程,我们可以让这些线程进行复用,在线程池中,总会有活跃的线程在占用,但是线程池中也会存在没有占用的线程,这些线程处于空闲状态,当有任务的时候会从池子里面拿去一个线程来进行使用,当完成工作后,并没有销毁线程,而是将将线程放回到池子中去。
线程池主要解决两个问题:
一是当执行大量异步任务时线程池能够提供很好的性能。
二是线程池提供了一种资源限制和管理的手段,比如可以限制现成的个数,动态新增线程等。
-《Java 并发编程之美》
上面内容出自《Java 并发编程之美》这本书,第一个问题上面已经提到过,线程的频繁创建和销毁是很损耗性能的,但是线程池中的线程是可以复用的,可以较好的提升性能问题,线程池内部是采用了阻塞队列来维护 Runnable 对象。
原理分析
JDK 为我们封装了一套操作多线程的框架 Executors,帮助我们可以更好的控制线程池,Executors 下提供了一些线程池的工厂方法:
- newFixedThreadPool:返回固定长度的线程池,线程池中的线程数量是固定的。
- newCacheThreadPool:该方法返回一个根据实际情况来进行调整线程数量的线程池,空余线程存活时间是 60s
- newSingleThreadExecutor:该方法返回一个只有一个线程的线程池。
- newSingleThreadScheduledExecutor:该方法返回一个
SchemeExecutorService
对象,线程池大小为 1,SchemeExecutorService
接口在ThreadPoolExecutor
类和ExecutorService
接口之上的扩展,在给定时间执行某任务。 - newSchemeThreadPool:该方法返回一个
SchemeExecutorService
对象,可指定线程池线程数量。
对于核心的线程池来说,它内部都是使用了 ThreadPoolExecutor
对象来实现的,只不过内部参数信息不一样,我们先来看两个例子:nexFixedThreadPool
和 newSingleThreadExecutor
如下所示:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
由上面的线程池的创建过程可以看到它们都是 ThreadPoolExecutor
的封装,接下来我们来看一下 ThreadPoolExecutor
的参数说明:
参数名称 | 参数描述 |
---|---|
corePoolSize | 指定线程池线程的数量 |
maximumPoolSize | 指定线程池中线程的最大数量 |
keepAliveTime | 当线程池线程的数量超过 corePoolSize 的时候,多余的空闲线程存活的时间,如果超过了 corePoolSize,在 keepAliveTime 的时间之后,销毁线程 |
unit | keepAliveTime 的单位 |
workQueue | 工作队列,将被提交但尚未执行的任务缓存起来 |
threadFactory | 线程工厂,用于创建线程,不指定为默认线程工厂 DefaultThreadFactory |
handler | 拒绝策略 |
其中 workQueue 代表的是提交但未执行的队列,它是 BlockingQueue 接口的对象,用于存放 Runable 对象,主要分为以下几种类型:
- 直接提交的队列:
SynchronousQueue
队列,它是一个没有容量的队列,前面我有对其进行讲解,当线程池进行入队 offer 操作的时候,本身是无容量的,所以直接返回 false,并没有保存下来,而是直接提交给线程来进行执行,如果没有空余的线程则执行拒绝策略。 - 有界的任务队列:可以使用
ArrayBlockingQueue
队列,因为它内部是基于数组来进行实现的,初始化时必须指定容量参数,当使用有界任务队列时,当有任务进行提交时,线程池的线程数量小于 corePoolSize 则创建新的线程来执行任务,当线程池的线程数量大于 corePoolSize 的时候,则将提交的任务放入到队列中,当提交的任务塞满队列后,如果线程池的线程数量没有超过 maximumPoolSize,则创建新的线程执行任务,如果超过了 maximumPoolSize 则执行拒绝策略。 - 无界的任务队列:可以使用
LinkedBlockingQueue
队列,它内部是基于链表的形式,默认队列的长度是Integer.MAX_VALUE
,也可以指定队列的长度,当队列满时进行阻塞操作,当然线程池中采用的是offer
方法并不会阻塞线程,当队列满时则返回 false,入队成功则则返回 true,当使用LinkedBlockingQueue
队列时,有任务提交到线程池时,如果线程池的数量小于 corePoolSize,线程池会产生新的线程来执行任务,当线程池的线程数量大于 corePoolSize 时,则将提交的任务放入到队列中,等待执行任务的线程执行完之后进行消费队列中的任务,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。 - 优先任务队列:t 有限任务队列是带有执行优先级的队列,他可以使用
PriorityBlockingQueue
队列,可以控制任务的执行先后顺序,它是一个无界队列,该队列可以根据任务自身的优先级顺序先后执行,在确保性能的同时,也能有很好的质量保证。
上面讲解了关于线程池内部都是通过 ThreadPoolExecutor
来进行实现的,那么下面我以一个例子来进行源码分析:
public class ThreadPoolDemo1 {public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5), new CustomThreadFactory());
for (int i = 0; i < 15; i++) {executorService.execute(() -> {
try {Thread.sleep(50000);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("由线程:" + Thread.currentThread().getName() + "执行任务完成");
});
}
}
}
上面定义了一个线程池,线程池初始化的 corePoolSize 为 5,也就是线程池中线程的数量为 5,最大线程 maximumThreadPoolSize 为 10,空余的线程存活的时间是 60s,使用 LinkedBlockingQueue 来作为阻塞队列,这里还发现我自定义了 ThreadFactory
线程池工厂,这里我真是针对线程创建的时候输出线程池的名称,源码如下所示:
/**
* 自定义的线程池构造工厂
*/
public class CustomThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory() {SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {String name = namePrefix + threadNumber.getAndIncrement();
Thread t = new Thread(group, r,
name,
0);
System.out.println("线程池创建,线程名称为:" + name);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
代码和 DefaultThreadFactory
一样,只是在 newThread
新建线程的动作的时候输出了线程池的名称,方便查看线程创建的时机,上面 main
方法中提交了 15 个任务,调用了 execute
方法来进行提交任务,在分析 execute
方法之前我们先了解一下线程的状态:
// 假设 Integer 类型是 32 位的二进制表示。// 高 3 位代表线程池的状态,低 29 位代表的是线程池的数量
// 默认是 RUNNING 状态,线程池的数量为 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程个数位数,表示的 Integer 中除去最高的 3 位之后剩下的位数表示线程池的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的线程的最大数量
// 这里举例是 32 为机器,表示为 00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的状态
// runState is stored in the high-order bits
//11100000000000000000000000000000
// 接受新任务并且处理阻塞队列里面任务
private static final int RUNNING = -1 << COUNT_BITS;
//00000000000000000000000000000000
// 拒绝新任务但是处理阻塞队列的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//00100000000000000000000000000000
// 拒接新任务并且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
// 所有任务都执行完 (包括阻塞队列中的任务) 后当线程池活动线程数为 0,将要调用 terminated 方法。private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
// 终止状态,terminated 方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
通过上面内容可以看到 ctl 其实存放的是线程池的状态和线程数量的变量,默认是RUNNING
,也就是11100000000000000000000000000000
,这里我们来假设运行的机器上的 Integer 的是 32 位的,因为有些机器上可能 Integer 并不是 32 位,下面 COUNT_BITS 来控制位数,也就是先获取 Integer 在该平台上的位数,比如说是 32 位,然后 32 位 - 3 位 =29 位,也就是低 29 位代表的是现成的数量,高 3 位代表线程的状态,可以清晰看到下面的线程池的状态都是通过低位来进行向左位移的操作的,除了上面的变量,还提供了操作线程池状态的方法:
// 操作 ctl 变量,主要是进行分解或组合线程数量和线程池状态。// 获取高 3 位,获取线程池状态
private static int runStateOf(int c) {return c & ~CAPACITY;}
// 获取低 29 位,获取线程池中线程的数量
private static int workerCountOf(int c) {return c & CAPACITY;}
// 组合 ctl 变量,rs=runStatue 代表的是线程池的状态,wc=workCount 代表的是线程池线程的数量
private static int ctlOf(int rs, int wc) {return rs | wc;}
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
// 指定的线程池状态 c 小于状态 s
private static boolean runStateLessThan(int c, int s) {return c < s;}
// 指定的线程池状态 c 至少是状态 s
private static boolean runStateAtLeast(int c, int s) {return c >= s;}
// 判断线程池是否运行状态
private static boolean isRunning(int c) {return c < SHUTDOWN;}
/**
* CAS 增加线程池线程数量.
*/
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}
/**
* CAS 减少线程池线程数量
*/
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
/**
* 将线程池的线程数量进行较少操作,如果竞争失败直到竞争成功为止。*/
private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
下来我们看一下 ThreadPoolExecutor
对象下的 execute
方法:
public void execute(Runnable command) {
// 判断提交的任务是不是为空,如果为空则抛出 NullPointException 异常
if (command == null)
throw new NullPointerException();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的数量小于 corePoolSize,则进行添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
// 添加线程修改线程数量并且将 command 作为第一个任务进行处理
if (addWorker(command, true))
return;
// 获取最新的状态
c = ctl.get();}
// 如果线程池的状态是 RUNNING,将命令添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
// 二次检查线程池状态和线程数量
int recheck = ctl.get();
// 线程不是 RUNNING 状态,从队列中移除当前任务,并且执行拒绝策略。// 这里说明一点,只有 RUNNING 状态的线程池才会接受新的任务,其余状态全部拒绝。if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池的线程数量为空时,代表线程池是空的,添加一个新的线程。else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列是满的,或者是 SynchronousQueue 队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
// 可能线程池的线程数量大于 maximumPoolSize 则采取拒绝策略。else if (!addWorker(command, false))
reject(command);
}
通过分析 execute 方法总结以下几点:
- 当线程池中线程的数量小于
corePoolSize
时,直接添加线程到线程池并且将当前任务做为第一个任务执行。 - 如果线程池的状态的是
RUNNING
,则可以接受任务,将任务放入到阻塞队列中,内部进行二次检查,有可能在运行下面内容时线程池状态已经发生了变化,在这个时候如果线程池状态变成不是RUNNING
,则将当前任务从队列中移除,并且进行拒绝策略。 - 如果阻塞队列已经满了或者
SynchronousQueue
这种特殊队列无空间的时候,直接添加新的线程执行任务,当线程池的线程数量大于maximumPoolSize
时相应拒绝策略。 - 入队操作用的是
offer
方法,该方法不会阻塞队列,如果队列已经满时或超时导致入队失败,返回 false,如果入队成功返回 true。
针对上面例子源码我们来做一下分析,我们源码中阻塞队列采用的是 ArrayBlockingQueue
队列,并且指定队列的长度是 5,我们看下面提交的线程池的任务是 15 个,而且 corePoolSize 设置的是 5 个核心线程,最大线程数(maximumPoolSzie)是 10 个(包括核心线程数),假设所有任务都同时提交到了线程池中,其中有 5 个任务会被提交到线程中作为第一个任务进行执行,会有 5 个任务被添加到阻塞队列中,还有 5 个任务提交到到线程池中的时候发现阻塞队列已经满了,这时候会直接提交任务,发现当前线程数是 5 小于最大线程数,可以进行新建线程来执行任务。
1.png 部提交,因为我们在任务中添加了 Thread.sleep 睡眠一会,在 for 循环结束提交任务之后可能才会结束掉任务的睡眠执行任务后面内容,所以可以看做是全部提交任务,但是没有任务完成,如果有任务完成的话,可能就不会是触发最大的线程数,有可能就是一个任务完成后从队列取出来,然后另一个任务来的时候可以添加到队列中,上图中可以看到,有 5 个核心 core 线程在执行任务,任务队列中有 5 个任务在等待空余线程执行,而还有 5 个正在执行的线程,核心线程是指在 corePoolSize 范围的线程,而非核心线程指的是大于 corePoolSize 但是小于等于 MaximumPoolSize 的线程,就是这些非核心线程并不是一直存活的线程,它会跟随线程池指定的参数来进行销毁,我们这里指定了 60s 后如果没有任务提交,则会进行销毁操作,当然工作线程并不指定那些线程必须回收那些线程就必须保留,是根据从队列中获取任务来决定,如果线程获取任务时发现线程池中的线程数量大于 corePoolSize,并且阻塞队列中为空时,则阻塞队列会阻塞 60s 后如果还有没有任务就返回 false,这时候会释放线程,调用 processWorkerExit
来处理线程的退出,接下来我们来分析下 addWorker
都做了什么内容:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取线程池的状态和线程池线程的数量
int c = ctl.get();
// 单独获取线程池的状态
int rs = runStateOf(c);
// 检查队列是否只在必要时为空
if (rs >= SHUTDOWN && // 线程池的状态是 SHUTDOWN、STOP、TIDYING、TERMINATED
! (rs == SHUTDOWN && // 可以看做是 rs!=SHUTDOWN, 线程池状态为 STOP、TIDYING、TERMINATED
firstTask == null && // 可以看做 firstTask!=null,并且 rs=SHUTDOWN
! workQueue.isEmpty())) // 可以看做 rs=SHUTDOWN,并且 workQueue.isEmpty()队列为空
return false;
// 循环 CAS 增加线程池中线程的个数
for (;;) {
// 获取线程池中线程个数
int wc = workerCountOf(c);
// 如果线程池线程数量超过最大线程池数量,则直接返回
if (wc >= CAPACITY ||
// 如果指定使用 corePoolSize 作为限制则使用 corePoolSize,反之使用 maximumPoolSize,最为工作线程最大线程线程数量,如果工作线程大于相应的线程数量则直接返回。wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 增加线程池中线程的数量
if (compareAndIncrementWorkerCount(c))
// 跳出增加线程池数量。break retry;
// 如果修改失败,则重新获取线程池的状态和线程数量
c = ctl.get(); // Re-read ctl
// 如果最新的线程池状态和原有县城出状态不一样时,则跳转到外层 retry 中,否则在内层循环重新进行 CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作线程是否开始启动标志
boolean workerStarted = false;
// 工作线程添加到线程池成功与否标志
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个 Worker 对象
w = new Worker(firstTask);
// 获取 worker 中的线程,这里线程是通过 ThreadFactory 线程工厂创建出来的,详细看下面源码信息。final Thread t = w.thread;
// 判断线程是否为空
if (t != null) {
// 添加独占锁,为添加 worker 进行同步操作,防止其他线程同时进行 execute 方法。final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取线程池的状态
int rs = runStateOf(ctl.get());
// 如果线程池状态为 RUNNING 或者是线程池状态为 SHUTDOWN 并且第一个任务为空时,当线程池状态为 SHUTDOWN 时,是不允许添加新任务的,所以他会从队列中获取任务。if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加 worker 到集合中
workers.add(w);
int s = workers.size();
// 跟踪最大的线程池数量
if (s > largestPoolSize)
largestPoolSize = s;
// 添加 worker 成功
workerAdded = true;
}
} finally {mainLock.unlock();
}
// 如果添加 worker 成功就启动任务
if (workerAdded) {t.start();
workerStarted = true;
}
}
} finally {
// 如果没有启动,w 不为空就已出 worker,并且线程池数量进行减少。if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
通过上面 addWorker
方法可以分为两个部分来进行讲解,第一部分是对线程池中线程数量的通过 CAS 的方式进行增加,其中第一部分中上面有个 if 语句,这个地方着重分析下:
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
可以看成下面的样子,将 !
放到括号里面,变成下面的样子:
if (rs >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty()))
return false;
-
线程池的状态是 SHUTDOWN、STOP、TIDYING、TERMINATED
- 当线程池状态是 STOP、TIDYING、TERMINATED 时,这些状态的时候不需要进行线程的添加和启动操作,因为如果是上面的状态,其实线程池的线程正在进行销毁操作,意味着线程调用了 shutdownNow 等方法。
- 如果线程池状态为 SHUTDOWN 并且第一个任务不为空时,不接受新的任务,直接返回 false,也就是说 SHUTDOWN 的状态,不会接受新任务,只会针对队列中未完成的任务进行操作。
- 当线线程池状态为 SHUTDOWN 并且队列为空时,直接返回不进行任务添加。
上半部分分为内外两个循环,外循环对线程池状态的判断,用于判断是否需要添加工作任务线程,通过上面讲的内容进行判断,后面内循环则是通过 CAS 操作增加线程数,如果指定了 core
参数为 true,代表线程池中线程的数量没有超过 corePoolSize
,当指定为 false 时,代表线程池中线程数量达到了corePoolSize
,并且队列已经满了,或者是SynchronousQueue
这种无空间的队列,但是还没有达到最大的线程池 maximumPoolSize
,所以它内部会根据指定的core
参数来判断是否已经超过了最大的限制,如果超过了就不能进行添加线程了,并且进行拒绝策略,如果没有超过就增加线程数量。
第二部分主要是把任务添加到 worker 中,并启动线程,这里我们先来看一下 Worker 对象。
// 这里发现它是实现了 AQS,是一个不可重入的独占锁模式
// 并且它还集成了 Runable 接口,实现了 run 方法。private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 执行任务的线程,通过 ThreadFactory 创建 */
final Thread thread;
/** 初始化第一个任务 */
Runnable firstTask;
/** 每个线程完成任务的数量 */
volatile long completedTasks;
/**
* 首先现将 state 值设置为 -1,因为在 AQS 中 state= 0 代表的是锁没有被占用,而且在线程池中 shutdown 方法会判断能否争抢到锁,如果可以获得锁则对线程进行中断操作,如果调用了 shutdownNow 它会判断 state>= 0 会被中断。* firstTask 第一个任务,如果为空则会从队列中获取任务,后面 runWorker 中。*/
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委托调用外部的 runWorker 方法 */
public void run() {runWorker(this);
}
// 是否独占锁
protected boolean isHeldExclusively() {return getState() != 0;
}
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 这里就是上面 shutdownNow 中调用的线程中断的方法,getState()>=0
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {t.interrupt();
} catch (SecurityException ignore) {}}
}
}
可以看到 Worker 是一个实现了 AQS 的锁,它是一个不可重入的独占锁,并且他也实现了 Runnable
接口,实现了 run
方法,在构造函数中将 AQS 的 state
设置为 -1
,为了避免线程还没有进入runWorker
方法前,就调用了 shutdown
或shutdownNow
方法,会被中断,设置为 - 1 则不会被中断。后面我们看到 run
方法,它调用的是 ThreadPoolExecutor
的runWorker
方法,我们这里回想一下,在 addWorker
方法中,添加 worker
到HashSet<Worker>
中后,他会将 workerAdded
设置为 true,代表添加 worker
成功,后面有调用了下面代码:
if (workerAdded) {t.start();
workerStarted = true;
}
这个 t 代表的就是在 Worker 构造函数中的使用 ThreadFactory
创建的线程,并且将自己(Worker 自己)传递了当前线程,创建的线程就是任务线程,任务线程启动的时候会调用 Worker
下的 run
方法,run
方法内部又委托给外部方法 runWorker
来进行操作,它的参数传递的是调用者自己,Worker
中的 run
方法如下所示:
public void run() {runWorker(this); //this 指 Worker 对象本身
}
这里简单画一张图来表示下调用的逻辑。
2.png
整体的逻辑是先进行创建线程,线程将 Worker
设置为执行程序,并将线程塞到 Worker
中,然后再 addWorker 中将 Worker 中的线程取出来,进行启动操作,启动后他会调用 Worker 中的 run 方法,然后 run 方法中将调用 ThreadPoolExecutor 的 runWorker,然后 runWorker 又会调用 Worker 中的任务 firstTask,这个 fistTask 是要真正执行的任务,也是用户自己实现的代码逻辑。
接下来我们就要看一下 runWorker 方法里面具体内容:
final void runWorker(Worker w) {
// 调用者也就是 Worker 中的线程
Thread wt = Thread.currentThread();
// 获取 Worker 中的第一个任务
Runnable task = w.firstTask;
// 将 Worker 中的任务清除代表执行了第一个任务了,后面如果再有任务就从队列中获取。w.firstTask = null;
// 这里还记的我们在 new Worker 的时候将 AQS 的 state 状态设置为 -1,这里先进行解锁操作,将 state 设置为 0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环进行获取任务,如果第一个任务不为空,或者是如果第一个任务为空,从任务队列中获取任务,如果有任务则返回获取的任务信息,如果没有任务可以获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列一定时间之后还是没有任务就直接返回 null。while (task != null || (task = getTask()) != null) {
// 先获取 worker 的独占锁,防止其他线程调用了 shutdown 方法。w.lock();
// 如果线程池正在停止,确保线程是被中断的,如果没有则确保线程不被中断操作。if ((runStateAtLeast(ctl.get(), STOP) || // 如果线程池状态为 STOP、TIDYING、TERMINATED 直接拒绝任务中断当前线程
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前做一些操作,可进行自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 运行任务在这里喽。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 执行任务之后做一些操作,可进行自定义
afterExecute(task, thrown);
}
} finally {
// 将任务清空为了下次任务获取
task = null;
// 统计当前 Worker 完成了多少任务
w.completedTasks++;
// 独占锁释放
w.unlock();}
}
completedAbruptly = false;
} finally {
// 处理 Worker 的退出操作,执行清理工作。processWorkerExit(w, completedAbruptly);
}
}
我们看到如果 Worker 是第一次被启动,它会从 Worker 中获取 firstTask 任务来执行,然后执行成功后,它会 getTask()来从队列中获取任务,这个地方比较有意思,它是分情况进行获取任务的,我们都直到 BlockingQueue 中提供了几种从队列中获取的方法,这个 getTask 中使用了两种方式,第一种是使用 poll 进行获取队列中的信息,它采用的是过一点时间如果队列中仍没有任务时直接返回 null,然后还有一个就是 take 方法,take 方法是如果队列中没有任务则将当前线程进行阻塞,等待队列中有任务后,会通知等待的队列线程进行消费任务,让我们看一下 getTask 方法:
private Runnable getTask() {
boolean timedOut = false; //poll 获取超时
for (;;) {
// 获取线程池的状态和线程数量
int c = ctl.get();
// 获取线程池的状态
int rs = runStateOf(c);
// 线程池状态大于等于 SHUTDOWN
//1. 线程池如果是大于 STOP 的话减少工作线程池数量
//2. 如果线程池状态为 SHUTDOW 并且队列为空时,代表队列任务已经执行完,返回 null,线程数量减少 1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
// 获取线程池数量。int wc = workerCountOf(c);
// 如果 allowCoreThreadTimeOut 为 true,则空闲线程在一定时间未获得任务会清除
// 或者如果线程数量大于 corePoolSize 的时候会进行清除空闲线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1. 如果线程池数量大于最大的线程池数量或者对(空余线程进行清除操作并且 poll 超时了,意思是队列中没有内容了,导致 poll 间隔一段时间后没有获取内容超时了。//2. 如果线程池的数量大于 1 或者是队列已经是空的
// 总之意思就是当线程池的线程池数量大于 corePoolSize,或指定了 allowCoreThreadTimeOut 为 true,当队列中没有数据或者线程池数量大于 1 的情况下,尝试对线程池的数量进行减少操作,然后返回 null,用于上一个方法进行清除操作。if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果 timed 代表的是清除空闲线程的意思
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 等待一段时间如果没有获取到返回 null。workQueue.take(); // 阻塞当前线程
// 如果队列中获取到内容则返回
if (r != null)
return r;
// 如果没有获取到超时了则设置 timeOut 状态
timedOut = true;
} catch (InterruptedException retry) {timedOut = false;}
}
}
- 工作线程调用 getTask 从队列中进行获取任务。
- 如果指定了 allowCoreThreadTimeOut 或线程池线程数量大于 corePoolSize 则进行清除空闲多余的线程,调用阻塞队列的 poll 方法,在指定时间内如果没有获取到任务直接返回 false。
- 如果线程池中线程池数量小于 corePoolSize 或者 allowCoreThreadTimeOut 为 false 默认值,则进行阻塞线程从队列中获取任务,直到队列有任务唤醒线程。
我们还记得第一张图中有标记出来是 core 线程和普通线程,其实这样标记不是很准确,准确的意思是如果线程池的数量超过了 corePoolSize 并且没有特别指定 allowCoreThreadTimeOut 的情况下,它会清除掉大于 corePoolSize 并且小于等于 maximumPoolSize 的一些线程,标记出 core 线程的意思是有 corePoolSize 不会被清除,但是会清除大于 corePoolSize 的线程,也就是线程池中的线程对获取任务的时候进行判断,也就是 getTask 中进行判断,如果当前线程池的线程数量大于 corePoolSize 就使用 poll 方式获取队列中的任务,当过一段时间还没有任务就会返回 null,返回 null 之后设置 timeOut=true,并且获取 getTask 也会返回 null,到此会跳到调用者 runWorker 方法中,一直在 while (task != null || (task = getTask()) != null)
此时的 getTask 返回 null 跳出 while 循环语句,设置 completedAbruptly = false,表示不是突然完成的而是正常完成,退出后它会执行 finally 的processWorkerExit(w, completedAbruptly)
,执行清理工作。我们来看下源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果突然完成则调整线程数量
decrementWorkerCount(); // 减少线程数量 1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 获取锁, 同时只有一个线程获得锁
try {
completedTaskCount += w.completedTasks; // 统计整个线程池完成的数量
workers.remove(w); // 将完成任务的 worker 从 HashSet 中移除
} finally {mainLock.unlock(); // 释放锁
}
// 尝试设置线程池状态为 TERMINATED
//1. 如果线程池状态为 SHUTDOWN 并且线程池线程数量与工作队列为空时,修改状态。//2. 如果线程池状态为 STOP 并且线程池线程数量为空时,修改状态。tryTerminate();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的状态小于 STOP,也就是 SHUTDOWN 或 RUNNING 状态
if (runStateLessThan(c, STOP)) {
// 如果不是突然完成,也就是正常结束
if (!completedAbruptly) {// 如果指定 allowCoreThreadTimeOut=true(默认 false)则代表线程池中有空余线程时需要进行清理操作,否则线程池中的线程应该保持 corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 这里判断如果线程池中队列为空并且线程数量最小为 0 时,将最小值调整为 1,因为队列中还有任务没有完成需要增加队列,所以这里增加了一个线程。if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果当前线程数效益核心个数,就增加一个 Worker
addWorker(null, false);
}
通过上面的源码可以得出,如果线程数超过核心线程数后,在 runWorker
中就不会等待队列中的消息,而是会进行清除操作,上面的清除代码首先是先对线程池的数量进行较少操作,其次是统计整个线程池中完成任务的数量,然后就是尝试修改线程池的状态由 SHUTDOWN->TIDYING->TERMINATED
或者是由STOP->TIDYING->TERMINATED
,修改线程池状态为TERMINATED
,需要有两个条件:
- 当线程池线程数量和工作队列为空,并且线程池的状态为
SHUTDOWN
时,才会将状态进行修改,修改的过程是SHUTDOWN->TIDYING->TERMINATED
- 当线程池的状态为
STOP
并且线程池数量为空时,才会尝试修改状态,修改过程是STOP->TIDYING->TERMINATED
如果设置为 TERMINATED
状态,还需要调用条件变量 termination
的signalAll()
方法来唤醒所有因为调用 awaitTermination
方法而被阻塞的线程,换句话说当调用 awaitTermination
后,只有线程池状态变成 TERMINATED 才会被唤醒。
接下来我们就来分析一下这个 tryTerminate
方法,看一下他到底符不符合我们上述说的内容:
final void tryTerminate() {for (;;) {
// 获取线程池的状态和线程池的数量组合状态
int c = ctl.get();
// 这里单独下面进行分析,这里说明两个问题,需要反向来想这个问题。//1. 如果线程池状态 STOP 则不进入 if 语句
//2. 如果线程池状态为 SHUTDOWN 并且工作队列为空时,不进入 if 语句
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果线程池数量不为空时,进行中断操作。if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 修改状态为 TIDYING,并且将线程池的数量进行清空
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 执行一些逻辑,默认是空的
terminated();} finally {
// 修改状态为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒调用 awaitTermination 方法的线程
termination.signalAll();}
return;
}
} finally {mainLock.unlock();
}
// else retry on failed CAS
}
我们单独将上面的 if 语句摘出来进行分析,将上面的第一个 if 判断进行修改如下,可以看到 return 在 else 里面,这时候内部 if 判断进行转换,转换成如下所示:
if (!isRunning(c) &&
!runStateAtLeast(c, TIDYING) && // 只能是 SHUTDOWN 和 STOP
(runStateOf(c) != SHUTDOWN || workQueue.isEmpty())){// 这里执行逻辑}else {return;}
逐一分析分析内容如下:
-
!isRunning(c)
代表不是 RUNNING,则可能的是SHUTDOWN
,STOP
,TIDYING
,TERMINATED
这四种状态 - 中间的连接符是并且的意思,跟着
runStateAtLeast(c, TIDYING)
这句话的意思是至少是TIDYING
,TERMINATED
这两个,反过来就是可能是RUNNING
,SHUTDOWN
,STOP
,但是前面已经判断了不能是RUNINNG
状态,所以前面两个连在一起就是只能是状态为SHUTDOWN
,STOP
-
runStateOf(c) != SHUTDOWN || workQueue.isEmpty()
当前面的状态是SHUTDOWN
时,则会出发workQueue.isEmpty()
, 连在一起就是状态是SHUTDOWN
并工作队列为空,当线程池状态为STOP
时,则会进入到runStateOf(c) != SHUTDOWN
,直接返回 true,就代表线程池状态为STOP
后面还有一个语句一个 if 语句将其转换一下逻辑就是下面的内容:
if (workerCountOf(c) == 0) {// 执行下面的逻辑}else{interruptIdleWorkers(ONLY_ONE);
return;
}
这里我们也进行转换下,就可以看出来当线程池的数量为空时,才会进行下面的逻辑,下面的逻辑就是修改线程池状态为 TERMINATED
,两个连在一起就是上面分析的修改状态为TERMINATED
的条件,这里画一张图来表示线程池状态的信息:
3.png
其实上面图中我们介绍了关于从 SHUTDOWN
或STOP
到 TERMINATED
的变化,没有讲解关于如何从 RUNNING
状态转变成 SHUTDOWN
或STOP
状态,其实是调用了 shutdown()
或shutdownNow
方法对其进行状态的变换,下面来看一下 shutdown
方法源码:
public void shutdown() {
// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 设置线程池状态为 SHUTDOWN,如果状态已经是大于等于 SHUTDOWN 则直接返回
advanceRunState(SHUTDOWN);
// 如果线程没有设置中断标识并且线程没有运行则设置中断标识
interruptIdleWorkers();
// 空的可以实现的内容
onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
}
// 尝试修改线程池状态为 TERMINATED
tryTerminate();}
- 首先对当前线程进行权限检测,查看是否设置了安全管理器,如果设置了则要看当前调用 shutdown 的线程有没有权限都关闭线程的权限,如果有权限还要看是否有中断工作现成的权限,如果没有权限则抛出
SecurityException
或NullPointException
异常。 - 设置线程池状态为 SHUTDOWN,如果状态已经是大于等于 SHUTDOWN 则直接返回
- 如果线程没有设置中断标识并且线程没有运行则设置中断标识
- 尝试修改线程池状态为 TERMINATED
接下来我们来看一下 advanceRunState
内容如下所示:
private void advanceRunState(int targetState) {for (;;) {
// 获取线程池状态和线程池的线程数量
int c = ctl.get();
if (runStateAtLeast(c, targetState) || // 如果线程池的状态 >=SHUTDOWN
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) // 设置线程池状态为 SHUTDOWN
// 返回
break;
}
}
- 当线程池的状态 >=SHUTDOWN,直接返回
- 如果线程池状态为 RUNNING,设置线程池状态为 SHUTDOWN,设置成功则返回
interruptIdleWorkers
代码如下所示:
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
// 获取全局锁,同时只能有一个线程能够调用 shutdown 方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历工作线程
for (Worker w : workers) {
Thread t = w.thread;
// 如果当前线程没有设置中断标志并且可以获取 Worker 自己的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
// 设置中断标志
t.interrupt();} catch (SecurityException ignore) { } finally {w.unlock();
}
}
// 执行一次,清理空闲线程。if (onlyOne)
break;
}
} finally {mainLock.unlock();
}
}
我们看到当我们调用 shutdown 方法的时候,只是将空闲的线程给设置了中断标识,也就是活跃正在执行任务的线程并没有设置中断标识,直到将任务全部执行完后才会逐步清理线程操作,我们还记的在 getTask 中的方法里面有这样一段代码:
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
判断是否是状态 >=SHUTDOWN,并且队列为空时,将线程池数量进行减少操作,内部进行 CAS 操作,直到 CAS 操作成功为止,并且返回 null,返回 null 后,会调用 processWorkerExit(w, false);
清理 Workers 线程信息,并且尝试将线程设置为 TERMINATED
状态,上面是对所有 shutdown
方法的分析,下面来看一下 shutdownNow
方法并且比较两个之间的区别:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 设置线程池状态为 STOP,如果状态已经是大于等于 STOP 则直接返回
advanceRunState(STOP);
// 这里是和 SHUTDOWN 区别的地方,这里是强制进行中断操作
interruptWorkers();
// 将为完成任务复制到 list 集合中
tasks = drainQueue();} finally {mainLock.unlock();
}
// 尝试修改线程池状态为 TERMINATED
tryTerminate();
return tasks;
}
shutdownNow
方法返回了未完成的任务信息列表 tasks = drainQueue();
,其实该方法和shutdown
方法主要的区别在于一下几点内容:
-
shutdownNow
方法将线程池状态设置为STOP
,而shutdown
则将状态修改为SHUTDOWN
-
shutdownNow
方法将工作任务进行中断操作,也就是说如果工作线程在工作也会被中断,而shutdown
则是先尝试获取锁如果获得锁成功则进行中断标志设置,也就是中断操作,如果没有获取到锁则等待进行完成后自动退出。 -
shutdownNow
方法返回未完成的任务列表。
下面代码是 shutDownNow
的interruptWorkers
方法:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {for (Worker w : workers)
// 直接进行中断操作。w.interruptIfStarted();} finally {mainLock.unlock();
}
}
内部调用了 Worker
的interruptIfStarted
方法,方法内部是针对线程进行中断操作,但是中断的前提条件是 AQS 的 state 状态必须大于等于 0,如果状态为 - 1 的则不会被中断,但是如果任务运行起来的时候在 runWorker
中则不会执行任务,因为线程池状态为STOP
,如果线程池状态为 STOP 则会中断线程,下面代码是 Worker 中的interruptIfStarted
:
void interruptIfStarted() {
Thread t;
// 当前 Worker 锁状态大于等于 0 并且线程没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {t.interrupt();
} catch (SecurityException ignore) {}}
}
拒绝策略
JDK 内置的拒绝策略如下:
- AbortPolicy 策略:该策略会直接抛出异常,阻止系统正常工作
- CallerRunsPolicy 策略:只要线程池没有关闭线程池状态是 RUNNING 状态,该略略直接调用线程中运行当前被丢弃的任务
- DiscardOledestPolicy 策略:该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务
- DiscardPolicy 策略:该策略默默丢弃无法处理的任务,不予任何处理。
5.png
总结
首先先上一张图,针对这张图来进行总结:
4.png
- 主线程进行线程池的调用,线程池执行 execute 方法
- 线程池通过
addWorker
进行创建线程,并将线程放入到线程池中,这里我们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据 corePoolSize 和 maximumPoolSize 设置的大小来进行区分,因为超过 corePoolSize 的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于 corePoolSize,或者指定了allowCoreThreadTimeOut
为 true,则他等待一定时间后会返回,不会一直等待 - 当线程池的数量达到 corePoolSize 时,线程池首先会将任务添加到队列中
- 当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了 corePoolSize,但是没有达到 maximumPoolSize 最大值。
- 当线程池的线程数量达到了 maximumPoolSize,则会相应拒绝策略。