共计 10043 个字符,预计需要花费 26 分钟才能阅读完成。
线程池
池化技术的益处
- 升高资源耗费:能够反复利用已创立的线程升高线程创立和销毁造成的耗费。
- 进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。
- 进步线程的可管理性:线程是稀缺资源,如果无限度地创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行统一分配、调优和监控。
线程池的利用场景
- 服务器承受到大量申请时, 应用线程池技术时十分适合的, 它能够大大减少线程的创立和销毁次数, 进步服务器的工作效率
- 实际上, 在开发中, 如果须要创立 5 个以上的线程, 那么就能够应用线程池来治理
线程池的类关系图
线程池的结构器参数
参数名 | 类型 | 含意 |
---|---|---|
corePoolSize | int | 外围线程数 |
maxPoolSize | int | 最大线程数 |
keepAliveTime | long | 放弃存活工夫 |
workQueue | BlockingQueue | 工作存储队列 |
threadFactory | ThreadFactory | 当线程池须要新的线程的时候, 会应用 threadFactory 来生成新的线程 |
Handler | RejectedExecutionHandler | 因为线程池无奈承受所提交的工作的回绝策略 |
corePoolSize 和 maxPoolSize
corePoolSize
指的是外围线程数: 线程池在实现初始化后, 默认状况下, 还没有创立任何线程, 线程池会期待有工作到来时, 再创立新线程去执行工作, 直到达到外围线程数, 之后外围线程会始终放弃这个数量; 当工作数量超过外围线程数, 将工作放在阻塞队列workQueue
中, 期待外围线程闲暇后处理- 如果外围线程全副在工作中, 而且队列也满了, 线程池就会在外围线程的根底上, 额定减少一些线程, 这些新减少的线程数最大下限就是
maxPoolSize
线程创立规定
- 如果线程数小于 corePoolSize, 即便其余线程处于闲暇状态, 也会创立一个新线程 (外围线程) 来运行新工作
- 如果线程数等于(或大于)corePoolSize 但少于 maxPoolSize, 则将工作放入队列
- 如果队列已满, 并且线程数小于 maxPoolSize, 则创立一个新线程来运行工作
- 如果队列已满, 并且线程数大于或等于 maxPoolSize 则回绝该工作
增减线程的特点
- 通过设置 corePoolSize 和 maxPoolSize 为雷同数量, 就能够创立固定大小的线程池, 即便队列满了也不会在拓展线程
- 线程池心愿放弃较少的线程数, 并且只有在负载变得很大时才减少它, 这就是队列的用意
- 通过设置 maxPoolSize 为很高的只, 例如 Integer.MAX_VALUE, 能够容许线程池包容任意数量的并发工作
- 是只有在队列填满时才创立多于 corePoolSize 的线程, 所以如果应用无界队列(例如 LinkedBlockingQueue), 那么线程数就不会超过 corePoolSize
keepAliveTime
闲暇的非核心线程的存活工夫, 用于回收线程
- 如果线程池以后的线程数多于 corePoolSize, 那么如果多余的线程闲暇工夫超过 keepAliveTime, 它们就会被终止
ThreadFactory
线程工厂, 用于创立线程
- 新的线程是由 ThreadFactory 创立的, 默认应用的线程工厂是
Executors.defaultThreadFactory()
, 创立进去的线程都在同一个线程组, 领有同样的NORM_PRIORITY
优先级并且都不是守护线程; 如果本人定义 ThreadFactory, 那么就能够扭转线程名, 线程组, 优先级, 是否是守护线程等 - 通常应用默认的就能够, 源码如下:
workQueue
有三种最常见的队列类型:
- 间接交接: SynchronousQueue 无容量
- 无界队列: LinkedBlockingQueue 有限容量, 有内存溢出的危险
- 有界队列: ArrayBlockingQueue 可设置容量
ThreadPoolExecutor 的启动
/** | |
* - 通过 new 创立线程池时, 除非调用 prestartAllCoreThreads / prestartCoreThread 办法启动外围线程, | |
* - 否则即便工作队列中存在工作, 同样也不会执行. | |
*/ | |
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 20, 3L, TimeUnit.SECONDS, linkedBlockingDeque); | |
/** | |
* Starts all core threads, causing them to idly wait for work. This | |
* overrides the default policy of starting core threads only when | |
* new tasks are executed. | |
* - 启动所有外围线程,让它们无闲暇的期待工作。这将笼罩仅在执行新工作时启动外围线程的默认策略。* - 手动启动线程池. | |
* @return the number of threads started | |
*/ | |
threadPoolExecutor.prestartAllCoreThreads(); |
JDK 内置线程池
线程池应该手动创立还是主动创立
手动创立, 能够让咱们更加明确线程池的容许规定, 防止资源耗尽的危险
主动创立, 也就是间接调用 JDK 封装号的构造函数, 可能会带来一些问题:
Executors.newFixedThreadPool(int nThreads)
数量固定的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { | |
return new ThreadPoolExecutor(nThreads, nThreads, | |
0L, TimeUnit.MILLISECONDS, | |
new LinkedBlockingQueue<Runnable>()); | |
} |
corePoolSize 和 maxPoolSize 被设置为雷同的 nThreads 参数, 并应用了无界队列 LinkedBlockingQueue, 不会拓展线程所以也没有存活工夫
当工作在队列中沉积过多, 可能就会造成 OOM
Executors.newSingleThreadExecutor()
只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() { | |
return new FinalizableDelegatedExecutorService | |
(new ThreadPoolExecutor(1, 1, | |
0L, TimeUnit.MILLISECONDS, | |
new LinkedBlockingQueue<Runnable>())); | |
} |
Executors.newCachedThreadPool()
可缓存线程
无界限程池, 具备主动回收多余线程的性能
public static ExecutorService newCachedThreadPool() { | |
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, | |
60L, TimeUnit.SECONDS, | |
new SynchronousQueue<Runnable>()); | |
} |
最大的线程数被设置为 Integer.MAX_VALUE, 线程闲暇 60 秒后回收, 不应用队列(SynchronousQueue)
Executors.newScheduledTreadPool()
反对定时及周期性工作执行的线程池, 应用提早队列(DelayedWorkQueue)
public static void main(String[] args) { | |
ScheduledExecutorService threadPool = | |
Executors.newScheduledThreadPool(10); | |
// 提早 5 秒执行工作 | |
threadPool.schedule(new EveryTaskOneThread.Task(),5, TimeUnit.SECONDS); | |
// 1 秒之后每个 3 秒执行一次工作 | |
threadPool.scheduleAtFixedRate(new EveryTaskOneThread.Task(), | |
1, 3,TimeUnit.SECONDS); | |
} |
所以, 还是更具业务的并发量手动创立线程池吧
JDK1.8 后退出workStealingPool
- 子工作
- 窃取
线程数量怎么设定?
- CPU 密集型(加密, 即便 hash 等) : 最佳线程数为 CPU 外围数的 1 - 2 倍左右
- 耗时 I / O 型(读写数据库, 文件, 网络传输等): 最佳线程数个别会大于 CPU 外围数很多倍, 以 JVM 线程监控显示忙碌状况为根据, 保障线程闲暇能够连接上, 参考 Brain Goetz 举荐的计算方法:
== 线程数 =CPU 外围数 * (1+ 均匀等待时间 / 均匀工作工夫))==
- 实际上最靠谱的还是通过压力测试得出适合的线程数
进行线程池的正确形式
- shutdown 执行该办法后, 线程池会将以后队列中的工作执行结束, 并且在次期间回绝新工作进入, 执行完后进行线程池
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10); | |
for (int i = 0; i < 1000; i++){executorService.execute(new ShutDownTask()); | |
} | |
System.out.println(executorService.isShutdown()); | |
Thread.sleep(1500); | |
executorService.shutdown(); | |
// 是否进入进行状态 | |
System.out.println(executorService.isShutdown()); | |
// 回绝新工作 | |
executorService.execute(new ShutDownTask()); | |
// 是否真正意义上的敞开 | |
System.out.println(executorService.isTerminated()); | |
} | |
static class ShutDownTask implements Runnable { | |
@Override | |
public void run() { | |
try {Thread.sleep(500); | |
System.out.println(Thread.currentThread().getName()); | |
} catch (InterruptedException e) {e.printStackTrace(); | |
} | |
} | |
} |
awaitTermination(timeout)
: 在一段时间内所有工作是否被执行结束
- shutdownNow 将所有线程中断, 并且队列中还未执行的工作作为一个列表返回
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10); | |
for (int i = 0; i < 1000; i++){executorService.execute(new ShutDownTask()); | |
} | |
System.out.println(executorService.isShutdown()); | |
Thread.sleep(1500); | |
// 发送中断信号, 并返回 runnableList | |
List<Runnable> runnableList = | |
executorService.shutdownNow();} | |
static class ShutDownTask implements Runnable { | |
@Override | |
public void run() { | |
try {Thread.sleep(500); | |
System.out.println(Thread.currentThread().getName()); | |
} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "被中断了"); | |
} | |
} | |
} |
工作回绝策略
-
回绝机会
- 当 Executor 敞开 (shutdown) 时, 提交新工作会被回绝
- 当 Executor 对最大线程和队列容量应用有限度并且曾经饱和时
4 种回绝策略
- AbortPolicy: 默认, 间接抛出 RejectedExecutionException 回绝异样
- DiscardPolicy: 默默的把被回绝的工作抛弃
- DiscardOldestPolicy: 当有新工作时, 会抛弃工作队列中存在最久的老工作, 以腾出地位给新工作
- CallerRunsPolicy: 将被线程池回绝的工作交给调用者 (caller) 主线程去执行
钩子办法
每个工作执行前后能够减少解决(日志, 统计)
/** | |
* 演示每个工作执行前后都能够放钩子函数 | |
*/ | |
public class PauseableTreadPool extends ThreadPoolExecutor {private final ReentrantLock lock = new ReentrantLock(); | |
private Condition unpaused = lock.newCondition(); | |
// 标记线程是否处于暂停状态 | |
private boolean isPaused; | |
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); | |
} | |
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); | |
} | |
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); | |
} | |
public PauseableTreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); | |
} | |
// 重写办法 before 钩子 | |
@Override | |
protected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r); | |
lock.lock(); | |
try {while(isPaused) {unpaused.await(); | |
} | |
} catch (InterruptedException e) {e.printStackTrace(); | |
}finally {lock.unlock(); | |
} | |
} | |
// 暂停办法 | |
private void pause() {lock.lock(); | |
try {isPaused = true;} finally {lock.unlock(); | |
} | |
} | |
// 复原办法 | |
private void resume() {lock.lock(); | |
try{ | |
isPaused = false; | |
// 唤醒全副 | |
unpaused.signalAll();} finally {lock.unlock(); | |
} | |
} | |
public static void main(String[] args) throws InterruptedException { | |
PauseableTreadPool pauseableTreadPool = | |
new PauseableTreadPool(10, 20, | |
10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); | |
Runnable runnable = new Runnable() { // 线程体 | |
@Override | |
public void run() {System.out.println("被执行"); | |
try {Thread.sleep(10); | |
} catch (InterruptedException e) {e.printStackTrace(); | |
} | |
} | |
}; | |
for (int i = 0; i < 10000; i++){pauseableTreadPool.execute(runnable); | |
} | |
Thread.sleep(1500); | |
pauseableTreadPool.pause(); | |
System.out.println("线程池被暂停"); | |
Thread.sleep(1500); | |
pauseableTreadPool.resume(); | |
System.out.println("线程池已复原"); | |
} | |
} |
原理 & 源码剖析
次要剖析 ThreadPoolExecutor
线程池的组成部分
- 线程池管理器 ExecutorService 控制线程池的启动和进行
- 工作线程 ThreadPoolExecutor 中的外部类 Worker
- 工作队列 线程平安的
BlockingQueue<Runnable> workQueue;
-
工作接口(Task)
/** * Creates with given first task and thread from ThreadFactory.
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker | |
this.firstTask = firstTask; | |
this.thread = getThreadFactory().newThread(this); |
}
** 线程池事项工作复用的原理 ** | |
- 用雷同的线程执行不同的工作 | |
**ThreadPoolExecutor 中的 execute 办法 ** |
/** | |
* 在未来的某个工夫执行给定的工作, 工作能够在新线程或池中现有的线程中执行 | |
* 如果无奈将工作提交执行, 起因之一是执行器已敞开或因为其容量已满, 该工作由以后的 RejectedExecutionHandler 解决 | |
* @param command 要执行的工作 | |
* | |
*/ | |
public void execute(Runnable command) {if (command == null) | |
throw new NullPointerException(); | |
/* | |
* Proceed in 3 steps: | |
* | |
* 1. 如果工作线程数量少于 corePoolSize, 尝试调用 addWorker 以给定的 command 启动一个新线程 | |
* | |
* 2. 如果一个工作能够胜利排队,那么咱们依然须要 | |
* 仔细检查咱们是否应该增加线程 | |
*(因为现有的自上次查看后死亡)或 | |
* 自从进入此办法以来,该池已敞开。所以咱们 | |
* 从新查看状态,并在必要时回退排队 | |
* 进行,如果没有,则启动一个新线程。* | |
* 3. 如果咱们无奈将工作排队,那么咱们尝试增加一个新的 | |
* 线程。如果失败,咱们晓得咱们曾经敞开或饱和 | |
* 并因而回绝工作。*/ | |
int c = ctl.get(); //ctl 记录了线程池状态和线程数 | |
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) | |
return; | |
c = ctl.get();} | |
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get(); | |
if (! isRunning(recheck) && remove(command)) | |
reject(command); | |
else if (workerCountOf(recheck) == 0) | |
addWorker(null, false); | |
} | |
else if (!addWorker(command, false)) | |
reject(command); | |
} |
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); | |
Runnable task = w.firstTask; | |
w.firstTask = null; | |
w.unlock(); // allow interrupts | |
boolean completedAbruptly = true; | |
try { // 循环获取工作执行 | |
while (task != null || (task = getTask()) != null) {w.lock(); | |
// If pool is stopping, ensure thread is interrupted; | |
// if not, ensure thread is not interrupted. This | |
// requires a recheck in second case to deal with | |
// shutdownNow race while clearing interrupt | |
if ((runStateAtLeast(ctl.get(), STOP) || | |
(Thread.interrupted() && | |
runStateAtLeast(ctl.get(), STOP))) && | |
!wt.isInterrupted()) | |
wt.interrupt(); | |
try {beforeExecute(wt, task); | |
Throwable thrown = null; | |
try {task.run(); | |
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x); | |
} finally {afterExecute(task, thrown); | |
} | |
} finally { | |
task = null; | |
w.completedTasks++; | |
w.unlock();} | |
} | |
completedAbruptly = false; | |
} finally {processWorkerExit(w, completedAbruptly); | |
} | |
} |
## 线程池的状态 | |
- RUNNING: 承受型工作并解决排队工作 | |
- SHUTDOWN: 不接受任务, 但解决排队工作 | |
- STOP: 不承受新工作, 也不解决排队工作, 并中断正在进行的工作 | |
- TIDYING(整洁): 所有工作都已终止, workerCount 为 0 时, 线程会转换到 TIDYING 状态, 并将运行 terminate() 钩子办法 | |
- TERMINATED: terminate() 运行实现 |
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; | |
private static final int SHUTDOWN = 0 << COUNT_BITS; | |
private static final int STOP = 1 << COUNT_BITS; | |
private static final int TIDYING = 2 << COUNT_BITS; | |
private static final int TERMINATED = 3 << COUNT_BITS; |
## 应用线程池的留神点 | |
- 防止工作的沉积 | |
FixedThreadPool SingleThreadExecutor | |
工作队列长度过大, 可能会沉积大量的申请, 从而导致 OOM. | |
- 防止线程数适度减少 | |
CachedThreadPool ScheduledThreadPool | |
容许的创立线程数量为 Integer.MAX_VALUE,可能会创立大量的线程,从而导致 OOM。- 排查线程透露 | |
线程曾经执行结束, 却没有正确的被回收, 往往是工作的逻辑问题 | |