共计 4195 个字符,预计需要花费 11 分钟才能阅读完成。
1、为什么应用线程池
- 升高资源耗费
通过反复利用已创立的线程升高线程创立和销毁造成的耗费 - 进步响应速度
当工作达到时, 工作能够不须要等到线程创立就能立刻执行 - 进步线程的可管理性
应用线程池能够对线程进行统一分配、治理和监控
3、创立线程池
应用 ThreadPoolExecutor
类创立线程池,它的构造方法一共有 7 个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 外围线程数,线程池的在闲暇状况下所可能放弃的线程数。
当提交一个工作到线程池时, 线程池会创立一个线程来执行工作,即便其余闲暇的根本线程可能执行新工作也会创立线程,等到须要执行的工作数大于线程池根本大小时就不再创立。如果调用了 prestartAllCoreThreads()
办法,线程池会提前创立并启动所有根本线程。
maximumPoolSize: 线程池所能保留的最大线程数量
如果阻塞队列满了并且线程池已创立的线程数小于最大线程数, 则线程池会再创立新的线程执行工作。如果应用了无界的工作队列,则该参数有效
keepAliveTime: 线程的生存工夫
当线程数大于 corePoolSize
时,多余的闲暇线程具备的生存工夫,当线程在肯定工夫内处于闲暇状态(没有执行工作),线程池会主动销毁该线程,但线程池不会销毁所有线程,而是保留最多 corePoolSize
个线程。
TimeUnit: 线程生存工夫的单位
BlockingQueue: 工作队列
用于保留期待执行的工作的阻塞队列。可选队列类型如下:
- ArrayBlockingQueue:基于数组的有界阻塞队列,此队列按 FIFO(先进先出)原
则对元素进行排序。 - LinkedBlockingQueue: 一个基于链表的阻塞队列,如果创立时设置了
size
参数就是有界队列,否则是无界队列。此队列按 FIFO 排序元素, 吞吐量通常要高于数组队列。动态工厂办法Executors.newFixedThreadPool()
应用了这个队列。 - SynchronousQueue:存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作, 否则插入操作始终处于阻塞状态, 吞吐量通常要高于 Linked- BlockingQueue, 动态工厂办法
Executors.newCachedThreadPool()
应用了这个队列。 - PriorityBlockingQueue: 具备优先级的有限阻塞队列。
ThreadFactory: 线程工厂
用于创立线程的工厂,能够在创立办法能够自定义线程的根本属性(如:名称、优先级、设置守护线程)
// 应用开源框架 guava 提供的 ThreadFactoryBuilder 创立线程工厂
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build() ;
RejectedExecutionHandler: 饱和策略
当阻塞队列和线程池都满了, 阐明线程池处于饱和状态,那么必须采取一种策略解决提交的新工作。在 JDK 中提供了以下 4 种策略:
- AbortPolicy:间接抛出异样(默认)
- DiscardPolicy:间接抛弃当前任务
- DiscardOldestPolicy:抛弃队列里最近的工作,并执行当前任务。
- CallerRunsPolicy:只用调用者所在线程来运行工作。
3、线程池工作流程
0)使用者向线程池提交了一个工作
1)如果以后运行的线程数少于corePoolSize
,则创立新线程来执行工作(创立线程须要获取全局锁)。
2)如果运行的线程数大于等于corePoolSize
,则将工作退出阻塞队列
3)如果阻塞队列已满,则创立新的线程来解决工作
4)如果创立新线程将使以后运行的线程超出最大线程数,则采取饱和策略解决
线程池采纳上述步骤的总体设计思路,是为了在执行 execute()办法时,尽可能地防止获取全局锁。在线程池实现预热之后 (以后运行的线程数大于等于 corePoolSize),简直所有的 execute() 办法调用都是执行步骤 2, 而步骤 2 不须要获取全局锁。
4、线程池的生命周期
调用线程池的 shutdown()
或shutdownNow()
办法敞开线程池。
它们的原理是:遍历线程池中的工作线程, 而后一一调用线程的 interrupt 办法来中断线程,所以不能终止无奈响应中断的工作。
shutdownNow:首先将线程池的状态设置成 STOP,而后尝试进行所有的正在执行或暂停工作的线程, 并返回期待执行工作的列表
shutdown:只是将线程池的状态设置成 SHUTDOWN 状态,而后中断所有没有正在执行工作的线程。
5、线程池配置
要想正当地配置线程池,就必须首先剖析工作个性,性质不同的工作用不同规模的线程池离开解决。能够从以下几个角度来剖析:
- 工作的性质:CPU 密集型工作、I0 密集型工作和混合型工作
- 工作的优先级: 高、中和低
- 工作的执行工夫: 长、中和短
- 工作的依赖性: 是否依赖其余系统资源, 如数据库连贯
通过 Runtime.getRuntime().availableProcessors()
办法取得以后设施的 CPU 个数
CPU 密集型工作应配置尽可能小的线程,如配置 N + 1 个线程的线程池
IO 密集型工作的线程不是始终在执行工作(被 IO 阻塞),应配置尽可能多的线程,如:2*Ncpu
优先级不同的工作能够应用优先级队列 PriorityBlockingQueue 来解决。它能够让优先级高的工作先执行。留神如果始终有优先级高的工作提交到队列里, 那么优先级低的工作可能永远不能执行。
执行工夫不同的工作能够交给不同规模的线程池来解决,或者能够应用优先级队列,让执行工夫短的工作先执行。
依赖数据库连接池的工作, 因为线程提交 SQL 后须要期待数据库返回后果,期待的工夫越长,则 CPU 闲暇工夫就越长,那么线程数应该设置得越大, 这样能力更好地利用 CPU。
6、线程复用原理
线程池有一个外部类叫 Worker,它实现了 Runnable 接口,它有 2 个重要的外部变量 thread (创立的线程) 和 firstTask (线程的首工作)
Worker 的构造方法接管一个 Runnable 参数,将其赋值给 firstTask,还会应用线程池的线程工厂的 newThread(Runnable task)
办法创立线程,将本身作为参数传入。这样一来,当创立的线程执行时,就会执行 Worker 对象 的 run()
办法,理论就是执行 runWorker()
办法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// 创立的线程
final Thread thread;
// 线程执行的第一个工作
Runnable firstTask;
Worker(Runnable firstTask) {setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {runWorker(this);
}
}
在 runWorker()
中,当 Worker 对象的 firstTask 没有执行时,会执行它的 firstTask;若已执行,则会从阻塞队列中获取工作执行,这个过程会继续循环直到线程进行为止。至此就实现了线程的复用。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
try {while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
} finally {processWorkerExit(w, completedAbruptly);
}
}
线程池调用 execute
办法提交工作,通过 addWorker()
创立 Worker 对象,并启动新建的线程
public void execute(Runnable command) {
// 如果以后线程数小于外围线程数,则创立新线程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
// 将工作存入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果阻塞队列已满,则创立新过程解决
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
.......
Worker w = null;
try {
// 创立 Worker
w = new Worker(firstTask);
// 获取 Worker 中的线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
........
// 运行线程
if (workerAdded) {t.start();
workerStarted = true;
}
}
} finally {if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这是线程池 execute()
办法的时序图,通过时序图便于了解下面的过程