共计 30296 个字符,预计需要花费 76 分钟才能阅读完成。
上一篇文章介绍了 Thread 类,可知线程随着工作的执行完结而被销毁。然而,因为线程的创立与销毁操作波及到零碎调用,开销较大,因而须要将线程的生命周期与工作进行解耦。应用线程池来治理线程,能够无效地反复利用线程来执行工作。本文将介绍线程池最根底的实现类 ThreadPoolExecutor。
本文基于 jdk1.8.0_91
1. 线程池体系
类型 名称 形容
接口 Executor 最上层的接口,提供了工作提交的根底办法
接口 ExecutorService 继承了 Executor 接口,扩大了提交工作、获取异步工作执行后果、线程池销毁等办法
接口 ScheduledExecutorService 继承了 ExecutorService 接口,减少了提早执行工作、定时执行工作的办法
抽象类 AbstractExecutorService 提供了 ExecutorService 接口的默认实现,提供 newTaskFor 办法将工作转换为 RunnableFuture,以便提交给 Executor 执行
实现类 ThreadPoolExecutor 根底、规范的线程池实现
实现类 ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService 中相干提早工作、定时工作的办法
实现类 ForkJoinPool JDK7 退出的线程池,是 Fork/Join 框架的外围实现,只容许执行 ForkJoinTask 工作
一般类 Executors 创立各种线程池的工具类
线程池能够解决两个问题:
- 缩小零碎因为频繁创立和销毁线程所带来的开销。
- 主动治理线程和分配资源,应用方只需提交工作即可。
2. 构造方法
JDK 中倡议应用较为不便的 Executors 工厂办法,它们均为大多数应用场景预约义了设置:
- Executors.newCachedThreadPool():无界限程池,能够进行自动线程回收
- Executors.newFixedThreadPool(int):固定大小的线程池
- Executors.newSingleThreadExecutor():单个后盾线程的线程池
- Executors.newScheduledThreadPool():执行定时工作的线程池
- Executors.newWorkStealingPool(int):反对并行执行的线程池
阿里 Java 开发手册 对线程池的应用进行了限度,可作参考:
【强制】线程资源必须通过线程池提供,不容许在利用中自行显式创立线程。
阐明:应用线程池的益处是缩小在创立和销毁线程上所花的工夫以及系统资源的开销,解决资源有余的问题。如果不应用线程池,有可能造成零碎创立大量同类线程而导致耗费完内存或者“适度切换”的问题。【强制】线程池不容许应用 Executors 去创立,而是通过 ThreadPoolExecutor 的形式,这样的解决形式让写的同学更加明确线程池的运行规定,躲避资源耗尽的危险。
阐明:Executors 返回的线程池对象的弊病如下:
1)FixedThreadPool 和 SingleThreadPool: 容许的申请队列长度为 Integer.MAX_VALUE,可能会沉积大量的申请,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。
2.1 源码剖析
Executors 外部都是调用 ThreadPoolExecutor 的构造方法来实现的:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize 外围线程数量
* @param maximumPoolSize 总的线程数量
* @param keepAliveTime 闲暇线程的存活工夫
* @param unit keepAliveTime 的单位
* @param workQueue 工作队列, 保留曾经提交但尚未被执行的线程
* @param threadFactory 线程工厂(用于指定如何创立一个线程)* @param handler 回绝策略 (当工作太多导致工作队列满时的解决策略)
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.2 应用阐明
Java 官网对 ThreadPoolExecutor 的应用阐明:
2.2.1 Core and maximum pool sizes
ThreadPoolExecutor 将依据 corePoolSize 和 maximumPoolSize 主动调整线程池中的线程数量,以及判断工作是否进行排队。
当提交新工作时:
- 如果运行的线程少于 corePoolSize,则创立新线程来解决申请,即便存在工作线程是闲暇的,也不会进行排队。
-
如果运行的线程多于 corePoolSize 而少于 maximumPoolSize:
- 如果队列未满,则会把新提交的工作退出队列,不创立新的线程。
- 如果队列已满,并且运行线程数小于 maximumPoolSize,也会创立新的线程来执行工作。
- 如果设置的 corePoolSize 和 maximumPoolSize 雷同,则创立了固定大小的线程池。
-
如果线程数大于 maximumPoolSize,新提交的工作将会依据回绝策略来解决。
- 如果将 maximumPoolSize 设置为根本的无界值(如 Integer.MAX_VALUE),则容许池适应任意数量的并发工作。
在大多数状况下,corePoolSize 和 maximumPoolSize 是通过构造函数来设置的,不过也能够应用 ThreadPoolExecutor 的 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动静更改。留神,外围线程和非核心线程只是一种逻辑上的辨别,线程池中只有一种类型的线程,称为工作线程(Worker)。
2.2.2 On-demand construction(按需结构)
默认状况下,外围线程只是在新工作达到时才创立和启动的。
能够应用办法 prestartCoreThread() 或 prestartAllCoreThreads() 对其进行动静重写。
个别在结构带有非空队列的线程池时,会心愿先启动线程。
2.2.3 Creating new threads
应用 ThreadFactory 来创立新线程。
在 Executors 中默认应用 Executors.defaultThreadFactory() 创立线程,并且这些线程具备雷同的分组和优先级,且都是非守护线程。
如果 ThreadFactory 创立新线程失败,此时线程池能够持续运行,但不能执行任何工作。
2.2.4 Keep-alive times
默认状况下,放弃流动策略只利用在非核心线程上,即当线程的闲暇工夫超过 keepAliveTime 时将会终止。
若应用 allowCoreThreadTimeOut(boolean) 办法则会把保活策略也利用于外围线程。
当线程池处于非沉闷状态时,能够缩小资源耗费。如果线程从新变得沉闷,则会创立新的线程。
能够应用办法 setKeepAliveTime(time, timeUnit) 动静地更改此参数,若入参应用 Long.MAX_VALUE 和 TimeUnit.NANOSECONDS 则不会终止闲暇线程,除非线程池敞开。
2.2.5 Queuing
所有 BlockingQueue 都可用于传输和放弃提交的工作。当线程池中的线程数量大于 corePoolSize 而少于 maximumPoolSize 时,新工作会退出同步队列。
排队有三种通用策略:
-
间接提交(Direct handoffs)
- 默认应用传递队列 SynchronousQueue,该队列会将工作间接传递给线程,而不会保留工作。
- 如果以后没有可用线程,则会创立新的线程。通常须要无界的最大线程数(maximumPoolSize)以防止回绝新提交的工作。
- 极其状况下会因为创立过多线程而耗尽 CPU 和内存资源。
-
无界队列(Unbounded queues)
- 应用无界队列(如 LinkedBlockingQueue)作为期待队列,当所有的外围线程都在解决工作时,新提交的工作都会进入队列期待。
- 线程数量不会超过 corePoolSize,也就是 maximumPoolSize 的值有效。
- 当每个工作齐全独立于其余工作,即工作执行互不影响时,适宜于应用无界队列。
- 极其状况下会因为存储过多任务而耗尽内存资源。
-
有界队列(Bounded queues)
- 应用有界队列(如 ArrayBlockingQueue)作为期待队列,有助于避免资源耗尽,然而可能较难调整和管制队列大小和线程池大小(maximumPoolSize)。
- 应用大的队列和小的线程数能够缩小 CPU 使用率、系统资源和上下文切换的开销,然而会导致吞吐量变低。
- 应用小的队列通常须要更多的线程数,这样能够最大化 CPU 使用率,但可能会须要更大的调度开销,从而升高吞吐量。
2.2.6 Rejected tasks(回绝策略)
当线程池已敞开,或者线程池中的线程数量和队列容量已饱和时,持续提交新工作会被回绝,会触发 RejectedExecutionHandler#rejectedExecution 办法。
ThreadPoolExecutor 定义了四种回绝策略:
- AbortPolicy:默认策略,在须要回绝工作时抛出 RejectedExecutionException;
- CallerRunsPolicy:由提交工作的线程自行执行工作,以此减缓新工作的提交速度。如果线程池曾经敞开,工作将被抛弃;
- DiscardPolicy:间接抛弃工作;
- DiscardOldestPolicy:抛弃在队列中等待时间最长的工作,并执行以后提交的工作,如果线程池曾经敞开,工作将被抛弃。
能够自定义 RejectedExecutionHandler 类来实现回绝策略。须要留神的是,回绝策略的运行须要指定线程池和队列的容量。
2.2.7 Hook methods(钩子办法)
ThreadPoolExecutor 中提供 beforeExecute 和 afterExecute 办法,在执行每个工作之前和之后调用。提供 terminated 办法,在线程池敞开之前做一些收尾工作。
如果钩子办法抛出异样,则外部工作线程将顺次失败并终止。
2.2.8 Queue maintenance(队列保护)
办法 getQueue() 容许出于监控和调试目标而拜访工作队列。强烈拥护出于其余任何目标而应用此办法。
remove() 和 purge() 这两种办法可用于在勾销大量已排队工作时帮忙进行存储回收。
2.2.9 Finalization(终止)
当线程池的援用变为不可达,并且线程池中没有遗留的线程(通过设置 allowCoreThreadTimeOut 把非流动的外围线程销毁),此时线程池会主动 shutdown。
3. 属性
3.1 线程池状态
ThreadPoolExecutor 中应用一个 AtomicInteger 类型的变量 ctl 来治理线程池。
其中,低 29 位保留线程数,高 3 位保留线程池状态。
线程池中最大的线程数为 2^29-1。
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads // 工作线程数量
* runState, indicating whether running, shutting down etc // 线程池运行状态
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29
// 最大线程数: 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111 1111
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 高三位 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位 000
private static final int STOP = 1 << COUNT_BITS; // 高三位 001
private static final int TIDYING = 2 << COUNT_BITS; // 高三位 010
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位 011
// Packing and unpacking ctl
private static int runStateOf(int c) {return c & ~CAPACITY;} // 运行状态
private static int workerCountOf(int c) {return c & CAPACITY;} // 运行的工作线程数
private static int ctlOf(int rs, int wc) {return rs | wc;} // 封装运行状态和工作线程
ThreadPoolExecutor 一共定义了 5 种线程池状态:
- RUNNING:能够接管新的工作和队列工作
- SHUTDOWN:不接管新的工作,然而会解决队列里的工作
- STOP:不接管新工作,也不会解决队列里的工作,并且中断正在运行的工作
- TIDYING:所有工作都曾经终止,workerCount 为 0,当池状态为 TIDYING 时将会调用 terminated() 办法解决收尾工作。
- TERMINATED:阐明 terminated() 办法实现执行。
线程池状态的转移:
- RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize() - (RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow() - SHUTDOWN -> TIDYING
When both queue and pool are empty - STOP -> TIDYING
When pool is empty - TIDYING -> TERMINATED
When the terminated() hook method has completed
3.2 工作线程 Worker
// 工作线程汇合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 操作 workers 汇合应用到的锁
private final ReentrantLock mainLock = new ReentrantLock();
在线程池中具备一个 Worker 汇合,一个 Worker 对应一个工作线程。当线程池启动时,对应的 Worker 会执行池中的工作,执行结束后从阻塞队列里获取一个新的工作继续执行。
持有 mainLock 时能力操作 Worker 汇合,Java 官网对应用 mainLock 而不是并发汇合的阐明:
- 在线程池敞开期间,可能串行地查看并中断 Worker 汇合中的线程,防止中断风暴(interrupt storms)。
- 可能简化一些统计操作,如 largestPoolSize。
Worker 是 ThreadPoolExecutor 的外部类,其继承体系如下:
- 继承了 AQS 抽象类,实现了不可重入的互斥锁。工作线程在执行工作时需持有 Worker 锁,每个工作线程之间持有的锁对象不同。
- 实现了 Runnable 接口,也就是说 Worker 自身也作为一个线程工作执行。
Worker 应用 AQS 中的 state 属性示意是否持有锁:
- -1: 初始状态
- 0: 无锁状态
- 1: 加锁状态
Worker 开始工作时,会先执行 unlock() 办法设置 state 为 0,后续再应用 CAS 对 state 来加锁。具体见 ThreadPoolExecutor#runWorker。
留神,线程池中的工作线程在逻辑上分为外围线程和非核心线程,然而在 Worker 类中并没有相干属性标记以后线程是否是外围线程!
而是在运行期间动静指定的:
- ThreadPoolExecutor#execute 提交工作时,调用 addWorker 新增工作线程,若入参 core 传入 true 只是用于校验外围线程数量 corePoolSize 是否非法,并不代表该线程始终都是外围线程。
- ThreadPoolExecutor#getTask 获取工作时,如果 workerCount > corePoolSize 成立,则阐明以后线程要以非核心线程的规定来从队列中拉取工作(keepAliveTime 工夫内拉取不到工作,线程会被销毁),不论该线程在 addWorker 创立时是否指定 core 为 true.
这样设计的目标,只是为了动静维持线程池中的外围线程数量不超过 corePoolSize,是一种涣散的管制。
java.util.concurrent.ThreadPoolExecutor.Worker
private final class Worker // 不可重入的互斥锁
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread; // 工作线程
/** Initial task to run. Possibly null. */
Runnable firstTask; // 初始运行工作
/** Per-thread task counter */
volatile long completedTasks; // 工作实现计数
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {return getState() != 0;
}
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {t.interrupt();
} catch (SecurityException ignore) {}}
}
}
3.3 线程工厂 ThreadFactory
ThreadPoolExecutor 具备属性 threadFactory 示意线程工厂。
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
Worker 中蕴含属性 thread 示意工作线程,在 Worker 构造函数中通过线程工厂来创立线程(即 Thread#new,留神不能启动线程!)。
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
在 Executors 工具类中定义了外部类 DefaultThreadFactory 作为默认的线程工厂,用于对立设置线程信息:
- 若存在 SecurityManager,则线程分组为 System#getSecurityManager,否则与调用 defaultThreadFactory 办法的线程所属分组雷同;
- 线程优先级均为 Thread.NORM_PRIORITY(处于最小和最大之间);
- 线程命名对立为 pool-N-thread-M 格局,其中 N 是此工厂的序列号,M 是此工厂所创立线程的序列号。
java.util.concurrent.Executors#defaultThreadFactory
/**
* Returns a default thread factory used to create new threads.
* This factory creates all new threads used by an Executor in the
* same {@link ThreadGroup}. If there is a {@link
* java.lang.SecurityManager}, it uses the group of {@link
* System#getSecurityManager}, else the group of the thread
* invoking this {@code defaultThreadFactory} method. Each new
* thread is created as a non-daemon thread with priority set to
* the smaller of {@code Thread.NORM_PRIORITY} and the maximum
* priority permitted in the thread group. New threads have names
* accessible via {@link Thread#getName} of
* <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
* number of this factory, and <em>M</em> is the sequence number
* of the thread created by this factory.
* @return a thread factory
*/
public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();
}
java.util.concurrent.Executors.DefaultThreadFactory
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
4. 线程池办法
4.1 提交工作 execute
线程池的顶层接口 Executor 中只有一个 execute 办法,用于把工作提交到线程池,ThreadPoolExecutor 对它进行了实现。
办法阐明:
- 提交一个工作到线程池,工作不肯定会立刻执行。
- 提交的工作可能在一个新的线程中执行,也可能在曾经存在的闲暇线程中执行。
- 如果因为池敞开或者池容量曾经饱和导致工作无奈提交,那么就依据回绝策略 RejectedExecutionHandler 解决工作。
代码流程:
- 如果工作为空,抛出 NullPointerException。
- 如果 worderCount < corePoolSize,则通过 addWorker 增加新的外围线程,并把当前任务作为它的 firstTask 去执行。
- 如果 worderCount >= corePoolSize,阐明无奈增加新的外围线程,则须要把工作退出同步队列。分为两种状况:
- 如果入队胜利,须要做双重查看:
4.1 入队过程中线程池已敞开,则要回退入队操作,并执行回绝策略。
4.2 入队过程中工作线程已沦亡,则当工作线程数量为 0 时,须要初始化非核心线程,用于拉取队列中的工作去解决。 - 如果入队失败,则尝试创立非核心线程用于解决该工作。若非外围线程创立失败,则执行回绝策略。
java.util.concurrent.ThreadPoolExecutor#execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 增加新的外围线程,并把当前任务交给它执行
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) { // 线程池未敞开,且非阻塞入队胜利,则进入下一步
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 线程池已敞开,不接管新工作,须要出队并回绝工作
reject(command);
else if (workerCountOf(recheck) == 0) // 入队胜利但工作线程为空,则增加非核心线程且不指定工作
addWorker(null, false); // 没有工作的工作线程会从同步队列中拉取工作去执行
}
else if (!addWorker(command, false)) // 队列已满,且创立非核心线程失败,则回绝工作
reject(command);
}
4.2 增加工作线程 addWorker
在工作提交(execute 办法)、更新外围线程数(setCorePoolSize 办法)、预启动线程(prestartCoreThread 办法)中都会调用 addWorker 办法增加新的工作线程。
addWorker 入参指定该工作线程须要执行的工作,以及该工作线程是否外围线程。
代码次要流程:
- 通过查看线程池状态、线程数量限度,判断是否增加工作线程。
- 创立工作线程(Worker#new),启动工作线程(Thread#start)。
查看线程池状态(注:SHUTDOWN 不接管新的工作,然而会解决队列里的工作):
- 线程池状态为 STOP 或 TIDYING 或 TERMINATED: 都不会执行任何工作,无奈创立新线程;
- 线程池状态为 SHUTDOWN 且 firstTask != null: 因为不再承受新工作的提交,无奈创立新线程;
- 线程池状态为 SHUTDOWN 且 队列为空: 因为队列中曾经没有工作了, 所以也就不须要执行任何工作了,无奈创立新线程。
查看线程数量限度(注:工作线程在逻辑上分为外围线程、非核心线程):
- 如果工作线程数量超过 CAPACITY(即 2^29-1),则无奈创立新【工作线程】;
- 如果工作线程数量超过 corePoolSize,则无奈创立新的【外围线程】;
- 如果工作线程数量超过 maximumPoolSize,则无奈创立新的【非核心线程】;
java.util.concurrent.ThreadPoolExecutor#addWorker
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary. // 查看线程池状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {int wc = workerCountOf(c); // 查看线程数量限度
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // workerCount 自增,完结自旋
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 线程池状态发生变化,重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {w = new Worker(firstTask); // 创立工作线程,指定初始工作(Executors.DefaultThreadFactory 中会执行 Thread#new,然而不会调用 Thread#start)final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁用于操作 workers 汇合
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException(); // 如果线程工厂曾经提前启动线程了(Thread#start),则报错
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新最大池容量
workerAdded = true;
}
} finally {mainLock.unlock();
}
if (workerAdded) {t.start(); // 启动线程,由 JVM 在操作系统层面创立线程并执行 Thread#run
workerStarted = true;
}
}
} finally {if (! workerStarted)
addWorkerFailed(w); // 线程增加失败,回滚操作
}
return workerStarted;
}
留神,创立新线程的过程中,须要辨别 new Thread()
和 new Thread().start
的不同:
- Thread#new:创立 Thread 对象,并不会映射到操作系统上的线程,此时 Thread#isAlive 为 false。留神线程池中的线程工厂只能创立 Thread 对象,不可启动线程。
- Thread#start:启动线程,由 JVM 在操作系统层面创立线程,并绑定到 Thread 对象中,此时 Thread#isAlive 为 true。
另外,在 ThreadPoolExecutor#addWorker 办法中执行 Thread t = w.thread; t.start()
会触发执行 ThreadPoolExecutor#runWorker,该过程简化如下:
private final class Worker implements Runnable {
final Thread thread; // 工作线程
public Worker() {thread = new Thread(this);
System.out.println("addWorker!");
}
@Override
public void run() {System.out.println("runWorker!");
}
}
/**
* 测试在 addWorker 中触发 runWorker
*/
@Test
public void test() throws InterruptedException {Worker worker = new Worker();
worker.thread.start();
worker.thread.join();}
4.3 执行工作 runWorker
在 ThreadPoolExecutor#addWorker 中增加工作线程之后,会启动工作线程(Thread#start),触发工作线程执行工作(Thread#run)。
runWorker 代码流程:
- 获取工作,该工作可能是 firstTask,也可能是从队列中拉取的工作。
- 获取 worker 上的互斥锁,确保除非线程池敞开,否则没有其余线程可能中断当前任务。
- 查看线程池状态,如果是 STOP 或 TIDYING 或 TERMINATED,阐明不再须要执行工作了,中断以后线程。
- 执行前置工作 beforeExecute,这是一个钩子办法。
- 执行工作 Runnable#run。
- 执行后置工作 afterExecute,这也是一个钩子办法。
对于 Worker#lock,官网的阐明:
Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless pool is stopping, this thread does not have its interrupt set.
java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts // 初始化 state 为 0
boolean completedAbruptly = true;
try {while (task != null || (task = getTask()) != null) { // firstTask 不为空,或者从队列拉取到工作不为空
w.lock(); // 加锁,确保除非线程池敞开,否则没有其余线程可能中断当前任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) && // 如果线程池状态 >= STOP,则中断以后线程,不须要执行新工作
!wt.isInterrupted()) // 这里可能会两次执行 isInterrupted,是为了防止 shutdownNow 过程中革除了线程中断状态
wt.interrupt();
try {beforeExecute(wt, task); // 前置工作,预留
Throwable thrown = null;
try {task.run(); // 执行工作
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {afterExecute(task, thrown); // 后置工作,预留
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false; // 走到这里阐明线程没有工作可执行
} finally {processWorkerExit(w, completedAbruptly); // 工作线程退出
}
}
4.4 获取工作 getTask
在 ThreadPoolExecutor#runWorker 中,工作线程执行工作之前,如果 firstTask 为空,则调用 getTask() 从队列中获取工作。
工作线程从队列中拉取工作之前,须要进行校验,如果呈现以下任意一种状况会间接退出:
- 工作线程数量大于 maximumPoolSize;
- 线程池已进行(STOP);
- 线程池已敞开(SHUTDOWN)且队列为空;
- 工作线程期待工作超时(keepAliveTime)。
java.util.concurrent.ThreadPoolExecutor#getTask
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 校验线程池状态:1. 线程池状态为 SHUTDOWN 且队列为空;2. 线程池状态 >= STOP
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 以后线程是否容许超时,true 示意具备超时工夫(keepAliveTime)if ((wc > maximumPoolSize || (timed && timedOut)) // 校验工作线程状态:1. 工作线程数超过 maximumPoolSize;2. 当前工作线程已超时;3. 队列为空
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 阻塞直到拉取胜利或超时
workQueue.take(); // 阻塞直到拉取胜利
if (r != null)
return r;
timedOut = true; // 示意直到超时,都没有获取工作
} catch (InterruptedException retry) { // 拉取时被中断唤醒,持续自旋
timedOut = false;
}
}
}
工作线程从线程池中拉取工作,具备两种形式:
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
:阻塞直到拉取工作胜利或超时。workQueue.take()
:阻塞直到拉取工作胜利。
代码中通过 allowCoreThreadTimeOut || wc > corePoolSize
这个表达式来管制应用哪种拉取工作形式。
该表达式为 true 时,如果线程在 keepAliveTime 工夫内没有拉取到工作,则会被销毁,体现为“非核心线程”。
然而,因为工作线程数量 wc 是会实时发生变化的,因而同一个线程在运行期间可能会先后应用不同的形式拉取工作。
也就是说,工作线程在运行期间可能会在“外围线程”和“非核心线程”两种状态之间切换。
- 外围 -> 非核心:初始 addWorker() 时以后线程为外围线程。当队列满了后,池中新增了非核心线程,此时以后线程执行 getTask() 满足 wc > corePoolSize,变为非核心线程。
- 非核心 -> 外围:初始 addWorker() 时以后线程为非核心线程。当局部外围线程因执行工作产生异样而终结,此时以后线程执行 getTask() 不满足 wc > corePoolSize,变为外围线程。
而实际上 ThreadPoolExecutor 辨别“外围线程”和“非核心线程”只是为了利用 corePoolSize 来管制沉闷线程数量以及工作是否进入队列中排队期待,并不关怀 Worker 到底是不是“外围线程”。
4.5 工作线程退出 processWorkerExit
在 runWorker() 中,如果通过 getTask() 辨认到闲暇线程(timedOut = true),或者工作线程在执行工作过程中出现异常,会调用 processWorkerExit() 退出工作线程。
代码流程:
- 如果以后线程是因为工作执行异样而终止的,须要扣减 workerCount。
- 获取 mainLock,统计工作数,从 workers set 中移除以后 worker。
- 尝试终止线程池。
- 如果线程池未终止,须要判断是否补上新的非核心线程。
留神,以后线程在执行完 processWorkerExit 办法之后会主动完结运行,Thread#isAlive 返回 false。
因而在以后线程终止之前,如果满足以下条件之一,则会创立新的非核心线程来替换以后线程:
- 用户工作执行异样导致线程退出。
- 工作线程数少于 corePoolSize。
- 期待队列不为空但没有工作线程。
Java 官网的阐明:
replaces the worker if either it exited due to user task exception or if fewer than corePoolSize workers are running or queue is non-empty but there are no workers.
java.util.concurrent.ThreadPoolExecutor#processWorkerExit
/**
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 以后线程执行工作时出现异常,须要扣减 workerCount
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 统计所有线程实现的工作数
workers.remove(w); // 移除以后线程的 worker
} finally {mainLock.unlock();
}
tryTerminate(); // 尝试终止线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // RUNNING、SHUTDOWN,即线程池尚未进行
if (!completedAbruptly) {
// 没有出现异常,阐明以后线程是非沉闷线程:// 1. allowCoreThreadTimeOut 为 false,则 min 为 corePoolSize。若 workerCountOf(c) >= min 阐明以后终止的是非核心线程,无需补充新线程
// 2. allowCoreThreadTimeOut 为 true,且队列为空,则 min 为 0。若 workerCountOf(c) >= min 阐明以后没有工作须要解决,无需补充新线程
// 3. allowCoreThreadTimeOut 为 true,且队列非空,则 min 为 1。若 workerCountOf(c) >= min 阐明具备沉闷的线程解决工作,无需补充新线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false); // 创立新的线程替换以后线程
}
}
4.6 尝试敞开线程池 tryTerminate
tryTerminate 用于尝试终止线程池,在 shutdow()、shutdownNow()、remove() 中均是通过此办法来终止线程池。
此办法必须在任何可能导致线程终止的行为之后被调用,例如缩小工作线程数,移除队列中的工作,或者是在工作线程运行结束后处理工作线程退出逻辑的办法(processWorkerExit)。
代码流程:
- 校验线程池状态。当线程池状态为 STOP,或者状态为 SHUTDOWN 且队列为空,阐明线程池是可终止的,此时才可进入下一步。
- 校验线程数量。如果线程池中工作线程数量不为 0,则中断其中一个线程(interruptIdleWorkers)并完结 tryTerminate 办法,后续由该线程来传递线程池敞开音讯(runWorker -> getTask -> processWorkerExit -> tryTerminate)。
- 当线程池中没有工作线程,且队列中没有工作后,开始敞开线程池:
3.1 批改线程池状态:(STOP or SHUTDOWN) -> TIDYING
3.2 调用钩子办法 terminated()
3.3 批改线程池状态:TIDYING -> TERMINATED
java.util.concurrent.ThreadPoolExecutor#tryTerminate
final void tryTerminate() {for (;;) {int c = ctl.get();
// 校验线程池状态,只有状态为 STOP,或者(状态为 SHUTDOWN 且队列为空)的状况下,才能够往下执行,否则间接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) || // TIDYING、TERMINATED
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); // 仅中断一个工作线程,由它来传递线程池敞开音讯
return;
}
final ReentrantLock mainLock = this.mainLock; // 来到这里,阐明线程池中没有工作线程了
mainLock.lock();
try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {// (STOP or SHUTDOWN) -> TIDYING
try {terminated(); // 钩子办法
} finally {ctl.set(ctlOf(TERMINATED, 0)); // TIDYING -> TERMINATED
termination.signalAll();}
return;
}
} finally {mainLock.unlock();
}
// else retry on failed CAS
}
}
4.7 敞开线程池
了解了 tryTerminate() 如何尝试敞开线程池后,再来看一下发动线程池敞开的办法:shutdown()、shutdownNow()。
4.7.1 shutdown
敞开线程池,不接管新的工作,然而会解决队列里的工作。
java.util.concurrent.ThreadPoolExecutor#shutdown
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {checkShutdownAccess(); // 查看敞开权限
advanceRunState(SHUTDOWN); // 批改线程池状态
interruptIdleWorkers(); // 顺次中断所有闲暇线程
onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
}
tryTerminate(); // 尝试敞开线程池}
了解 shutdown() 如何做到在敞开线程池之前,不承受新工作,并且持续解决已有工作,关键在于两个操作:
advanceRunState(SHUTDOWN)
设置线程池状态为 SHUTDOWN 之后:
- 在 ThreadPoolExecutor#execute 中,线程池状态为 SHUTDOWN 不会接管新的工作。
- 在 ThreadPoolExecutor#getTask 中,线程池状态为 SHUTDOWN 然而队列中仍有未解决的工作,会持续拉取工作来解决。
- 在 ThreadPoolExecutor#runWorker 中,线程池状态为 SHUTDOWN 能够持续解决工作。
interruptIdleWorkers
java.util.concurrent.ThreadPoolExecutor#interruptIdleWorkers()
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // 可能获取锁,阐明以后线程没有在执行工作,是“闲暇”的
try {t.interrupt();
} catch (SecurityException ignore) { } finally {w.unlock();
}
}
if (onlyOne)
break;
}
} finally {mainLock.unlock();
}
}
interruptIdleWorkers() 在中断线程之前,应用 tryLock() 尝试一次性获取锁,再中断工作。
- 如果指标线程在 ThreadPoolExecutor#getTask 中拉取工作,因为拉取工作不必持有锁,因而该指标线程在期待工作过程中会被中断唤醒,从新自旋校验再拉取工作。
- 如果指标线程在 ThreadPoolExecutor#runWorker 中执行工作,因为执行工作须要持有锁,因而其余线程 tryLock() 失败,以后指标线程能够平安执行完工作。
4.7.2 shutdownNow
敞开线程池,不接管新工作,也不会解决队列里的工作,并且中断正在运行的工作。
java.util.concurrent.ThreadPoolExecutor#shutdownNow
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); // 移除期待队列中的所有工作} finally {mainLock.unlock();
}
tryTerminate();
return tasks;
}
了解 shutdownNow() 如何做到在敞开线程池时,不承受新工作,也不会解决队列里的工作,并且中断正在运行的工作。关键在于三个操作:
advanceRunState(STOP)
设置线程池状态为 STOP 之后:
- 在 ThreadPoolExecutor#execute 中,线程池状态为 STOP 不会接管新的工作。
- 在 ThreadPoolExecutor#getTask 中,线程池状态为 STOP,不论队列中是否有未解决工作,均不再拉取。
- 在 ThreadPoolExecutor#runWorker 中,线程池状态为 STOP 会执行 interrupt() 设置中断状态,工作会不会继续执行取决于该工作中有没有查看中断状态。
interruptWorkers
java.util.concurrent.ThreadPoolExecutor#interruptWorkers
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {for (Worker w : workers)
w.interruptIfStarted();} finally {mainLock.unlock();
}
}
java.util.concurrent.ThreadPoolExecutor.Worker#interruptIfStarted
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 留神这里没有获取锁!try {t.interrupt();
} catch (SecurityException ignore) {}}
}
与 interruptIdleWorkers() 相比,interruptWorkers() 在中断线程之前,只需校验 getState() >= 0,无需获取锁即可强行中断运行中的线程。
- 如果指标线程在 ThreadPoolExecutor#getTask 中拉取工作,因为拉取工作不必持有锁,因而该指标线程在期待工作过程中会被中断唤醒,从新自旋校验后不再拉取工作。
- 如果指标线程在 ThreadPoolExecutor#runWorker 中执行工作,会被强行设置中断状态,然而工作会不会继续执行取决于该工作中有没有查看中断状态。
drainQueue
java.util.concurrent.ThreadPoolExecutor#drainQueue
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList); // 批量将队列中的工作转移到 taskList
if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
将队列 workQueue 中未解决的工作全副拉取到 taskList 中,不再解决工作。
5. 线程数配置
正当地配置线程池:
- CPU 密集型工作:配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池,升高调度开销。
- IO 密集型工作:因为线程并不是始终在执行工作,应配置尽可能多的线程,如 2*Ncpu。
- 混合型的工作:只有这两个工作执行的工夫相差不是太大,将其拆分成一个 CPU 密集型工作 和一个 IO 密集型工作。
《Java 并发编程实战》提出了一个线程数计算公式。
定义:
$$
N_{cpu} = CPU 外围数
$$
$$
U_{cpu} = 指标 CPU 利用率,0 \leqslant U_{cpu} \leqslant 1
$$
$$
\frac{W}{C} = 等待时间和计算工夫的比例
$$
要使得处理器达到冀望的使用率,线程数的最优大小等于:
$$
N_{threads} = N_{cpu} * U_{cpu} * (1 + \frac{ W}{C} )
$$
能够通过 Runtime 来获取 CPU 外围数:
int N_CPU = Runtime.getRuntime().availableProcessors();
6. 总结
回顾一下 ThreadPoolExecutor 的内部结构:
- 如果以后运行的线程少于 corePoolSize,即便有闲暇线程也会创立新线程来执行工作。
- 如果运行的线程等于或多于 corePoolSize,则将工作退出 BlockingQueue。
- 如果无奈将工作退出 BlockingQueue(队列已满),则创立新的线程来解决工作。
- 如果创立新线程将使以后运行的线程超出 maximumPoolSize,工作将被回绝。
作者:Sumkor
链接:https://segmentfault.com/a/11…