之所以存在线程池是基于以下两个起因:
- 线程的创立和销毁是须要有资源耗费的,多线程环境下频繁创立、销毁线程会影响零碎性能
- 对于一个须要频繁创立工作、线程的利用来说,创立的工作数、线程数须要受到管制或治理
有了线程池,尤其是相似 ThreadPoolExecutor 这种能够通过参数调整其行为的线程池,能够近乎完满的解决上述两个问题。
线程池工作原理
简略来说线程池的工作原理就是:提前或者在执行工作的时候创立线程,执行完工作之后不销毁线程而是将线程偿还到线程池中,后续有工作提交上来之后就能够不再创立线程、而是由线程池中闲暇的线程执行工作。
这样一来就能够防止频繁创立和销毁线程,并且也能够控制线程池中线程的数量,同时如果提交工作的速度太快、线程池中的线程来不及执行工作的话,能够将工作放在队列中期待,等后面的工作执行实现、线程偿还到线程池中之后,再从队列中获取工作继续执行。
其实以上就是 ThreadPoolExecutor 性能的简略形容。
当然 ThreadPoolExecutor 的性能要比这个形容弱小的多也简单的多。咱们就从以下几个方面来详细分析一下 ThreadPoolExecutor 的性能和底层原理:
- 根本属性
- 工作队列
- 创立线程池
- 提交工作
- 执行工作
- 回绝工作
- 钩子函数
根本属性
corePoolSize&maximumPoolSize:ThreadPoolExecutor 有外围线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新工作提交后,如果以后线程数小于 corePoolSize,即便线程池中有闲暇线程,ThreadPoolExecutor 也会立刻创立一个线程去执行工作。如果以后线程数大于 corePoolSize 且小于 maximumPoolSize,则只有队列满的状况下才会创立线程、否则工作入队列排队。
ctl:绑定了状态 runState 和线程数 workerCount 两个属性的 AtomicInteger 变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
了解 ctl 的工作原理是读懂 ThreadPoolExecutor 源码的必要前提。
先看两个辅助变量:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
COUNT_BITS 是 Integer.size-3=31-3=29,CAPACITY 是 1 向左位移 29 位后减 1,用二进制示意就是:
0001 1111 1111 1111 1111 1111 1111 1111
~CAPACITY 用二进制示意就是:
1110 0000 0000 0000 0000 0000 0000 0000
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) {return c & ~CAPACITY;}
private static int workerCountOf(int c) {return c & CAPACITY;}
private static int ctlOf(int rs, int wc) {return rs | wc;}
runState 有 RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED 等 5 个状态,压入 ctl 示意的时候全副要左移 29 位。意思是:ctl 是依照 2 进制位来表白含意的,高位的 3 位用来示意状态 runstate,低位的 29 位用来示意线程数 workerCount。
ctl 通过 ctlOf 函数(runState 的理论状态值左移 29 位,挪动到高 3 位后,和 workerCount 按位或操作)失去。
runStateOf 函数:ctl 和~CAPACITY 进行按位与,~CAPACITY 的二级制示意为:
1110 0000 0000 0000 0000 0000 0000 0000
按位与操作失去的就是 ctl 的高 3 位,对应的就是 runState。
workerCountOf 函数:ctl 和 CAPACITY 进行按位与,CAPACITY 的二进制示意为:
0001 1111 1111 1111 1111 1111 1111 1111
按位与失去的就是低 29 位,对应的就是 workerCount。
Keep-alive:如果以后线程数超过了 corePoolSize,那么超出的线程如果闲暇工夫超过了 keep-alive 会被回收(terminate)。外围线程是没有超时概念也不会被回收的,然而能够通过设置 allowCoreThreadTimeOut 为 true,使得外围线程也受到参数 Keep-alive 管制从而被回收。
工作队列
创立线程池的时候,通过 ThreadPoolExutor 构造方法指定工作队列,能够反对任何 BlockingQueue。通过 Executors 工具创立 ThreadPoolExutor 的话,反对 SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue 三种阻塞队列。
工作队列是 ThreadPoolExutor 的重要参数,与 corePoolSize 和 MaxPoolSize 配合应用会创立出体现齐全不同的线程池:有界还是无界队列会影响到线程池接管工作的能力或体现,FIFO 还是 LIFO 会影响到工作执行程序,等等。
创立线程池
ThreadPoolExecutor 提供了 4 个构造方法,然而如果你不打算替换默认的 ThreadFactory 和 RejectedExecutionHandler 的话,最罕用的构造方法其实只有一个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
调用到另外一个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
从构造方法中咱们能够看到,ThreadPoolExecutor 创立后除了设置重要属性之外啥也没干,外围线程也并没有启动!
提交工作
默认状况下,ThreadPoolExecutor 提交工作的过程也同时是创立线程的过程,因为缺省状况下 ThreadPoolExecutor 创立的时候并不创立线程。
ThreadPoolExecutor 实现了 Executor 接口,Executor 通过其惟一办法 execute 来提交工作。
ThreadPoolExecutor 的 execute 办法接管一个 Runable 参数作为工作,依照如下逻辑实现工作的提交:
- 如果线程池的线程数量小于 corePoolSize,则通过 addWorker(command,true)创立并启动一个新线程来执行工作
- 否则,尝试将工作退出队列,如果胜利,再次查看线程池状态,如果线程池曾经进行运行则工作出队,回绝工作。再次查看如果以后线程数为 0 则调用 addWorker(null,false)创立新线程
- 否则,超出外围线程数且队列满,如果尚未超出最大线程数则通过 addWorker(command,false)创立新线程,否则超出最大线程数,回绝工作
源码比较简单,须要留神的是退出队列局部:
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
调用阻塞队列的非阻塞办法 offer 入队,想想为啥不必阻塞办法?
线程启动机会
默认状况下 ThreadPoolExecutor 创立之后不会启动任何线程,包含外围线程。所有线程都是在工作提交后启动。
能够通过调用 prestartCoreThread()或 prestartAllCoreThreads()随时启动一个或所有外围线程。
启动线程 #addWorker
线程通过 addWorker(Runnable firstTask, boolean core)办法启动。firstTask 是该线程的第一个工作,firstTask=null 示意只启动线程、无工作。core 示意要启动的是外围线程、还是一般线程,用来判断线程数是否已达下限。
addWorker 办法首先做必要的合法性判断:以后线程池状态,线程数是否已达下限等,满足启动条件则更新以后线程数 WorkerCount。
而后创立线程对象 Worker,取得锁,加锁操作:Worker 退出 workers 缓存(worker 存储以后线程池的所有尚未执行工作的线程)…… 操作实现之后,解锁, 并 启动线程。
Worker 对象
Worker 是实现了 Runnable 接口的外部类,次要属性:
- thread:线程池的线程本尊
- firstTask:线程创立时绑定的工作,该线程如果是工作提交的时候创立的,firstTask 就是被提交的工作,如果线程创立胜利,则 firstTask 具备优先执行权
- completedTasks:以后线程实现执行的工作数
初始化办法通过线程工厂创立一个线程:
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
this 作为 Runnable 参数传递给 Thread 的构造方法,线程启动的时候就回调 this 的 run 办法,所以 Worker 的 run 办法就是线程池中的线程执行工作的入口办法。
执行工作 Worker.run
run 办法调用了 runWorker 办法:
public void run() {runWorker(this);
}
持续跟踪 runWorker 办法:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {// 无限执行 firstTask,firstTask 为空的话通过 getTask()从队列中获取 task
while (task != null || (task = getTask()) != null) {w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 调用工作执行前钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行工作
task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 调用工作执行后钩子函数
afterExecute(task, thrown);
}
} finally {
// 开释工作
task = null;
// 以后线程执行工作数更新 +1
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {
// 线程池相干数据更新
processWorkerExit(w, completedAbruptly);
}
}
从源码能够看到,Worker 的 firstTask 会失去以后线程的优先执行,因为代码中获取并执行工作的循环条件中的 task 的初始值就是 Worker 的 firstTask:
while (task != null || (task = getTask()) != null) {
firstTask 执行实现之后,开释工作(task=null)。线程持续运行,下次循环时会通过 getTask 办法从队列获取工作。这个动作相当于:线程执行完一个工作之后并没有完结或销毁,而是交还给线程池,通过 getTask 持续从队列领工作,领到工作后继续执行。
getTask 办法
getTask 办法从队列获取排队期待执行的工作。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果以后线程池已进行,或者处于 SHUTDOWN 状态且队列为空则返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
// 线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否须要限定工夫
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果超过最大线程数且队列空,或期待超时且(线程数 >1 或者队列空),返回 null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果须要限时,则通过限时参数从阻塞队列获取工作,否则不须要限时的话,阻塞直到获取到工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 从队列拿到工作就返回该工作
if (r != null)
return r;
// 否则没有拿到工作,设置超时
timedOut = true;
} catch (InterruptedException retry) {timedOut = false;}
}
}
咱们发现线程池的 keepAliveTime 参数就是在这个 getTask 办法中失效的。
如果线程数小于外围线程数,并且 allowCoreThreadTimeOut 设置为 false 的话,线程不受期待工作时长的限度,则采纳阻塞队列的 take 办法、无限期期待直到能够从队列中获取工作。
如果线程数大于外围线程数,或者参数设置外围线程也须要受到超时管制,就会设置获取工作的限时时长为 keepAliveTime,如果在 keepAliveTime 工夫范畴内依然没有从阻塞队列中拿到工作,则返回 null。
超过 keepAliveTime 时长没拿到工作将导致在 runWorker 办法的 while 循环满足完结条件而退出循环:
while (task != null || (task = getTask()) != null) {// 获取工作...} finally {processWorkerExit(w, completedAbruptly);
}
退出循环后,调用 processWorkerExit 办法完结线程、退出线程池。
keepAliveTime=0L 的状况
如果设置 keepAliveTime=0L,并且线程数超出外围线程数,会是什么状况?
上面这一行代码交代的清清楚楚了:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
调用 poll 办法、等待时间为 0 去阻塞队列获取工作,能够去看一下阻塞队列的限定期待时长的 poll 办法的源码,期待时长为 0 则成果等同于非阻塞办法:获取不到数据立即返回 null。
所以 keepAliveTime=0L 示意:超出外围线程数后,在执行完工作之后容许闲暇工夫为 0!即:如果没有新的工作提交上来的话,只保留 corePoolSize 个线程持续留在线程池期待工作,其余线程立刻销毁、退出线程池。
回绝工作
采纳有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的工作会被回绝,回绝后的解决形式由 ThreadPoolExecutor 的回绝策略 RejectedExecutionHandler 决定。回绝策略在 ThreadPoolExecutor 创立时指定。次要包含:
- AbortPolicy:间接抛异样 RejectedExecutionException
- CallerRunsPolicy:调用方解决,即交给提交 execute 办法的线程本人执行工作
- DiscardPolicy:间接扔掉该工作
- DiscardOldestPolicy:扔掉队列中等待时间最久的工作,执行当前任务
- 自定义:实现 RejectedExecutionHandler 接口,自定义回绝策略
钩子函数
工作执行前和工作执行后别离调用 beforeExecute/afterExecute 办法,这两个办法在 ThreadPoolExecutor 中默认都是哑实现,什么都没做。如果你既想要采纳 ThreadPoolExecutor 作为线程池、又想在工作执行前后做额定的动作,能够继承 ThreadPoolExecutor 并笼罩他的 beforeExecute/afterExecute 办法。
小结
线程池 ThreadPoolExecutor 源码剖析实现,其实从利用层面来讲,绝大部分线程池需要都能够通过 ThreadPoolExecutor 失去满足,而且能够利用 Executors 工具类创立满足各种不同场景的线程池。
Thanks a lot!
上一篇 BlockingQueue – ArrayBlockingQueue