简介: 《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。
作者 | 风楼
起源 | 阿里技术公众号
《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。
我在查找材料的过程中,发现有些问题存在争议。前面发现,一部分起因是因为不同JDK版本的事实是有差别的。因而,上面的剖析是基于当下最罕用的版本JDK1.8,并且对于存在争议的问题,咱们剖析源码,源码才是最精确的。
1 corePoolSize=0会怎么样
这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样答复这个问题的:
- 提交工作后,先判断以后池中线程数是否小于corePoolSize,如果小于,则创立新线程执行这个工作。
- 否者,判断期待队列是否已满,如果没有满,则增加到期待队列。
- 否者,判断以后池中线程数是否大于maximumPoolSize,如果大于则回绝。
- 否者,创立一个新的线程执行这个工作。
依照下面的形容,如果corePoolSize=0,则会判断期待队列的容量,如果还有容量,则排队,并且不会创立新的线程。
—— 但其实,这是老版本的实现形式,从1.6之后,实现形式就变了。咱们间接看execute的源码(submit也依赖它),我备注出了要害一行:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 留神这一行代码,增加到期待队列胜利后,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
- 线程池提交工作后,首先判断以后池中线程数是否小于corePoolSize。
- 如果小于则尝试创立新的线程执行该工作;否则尝试增加到期待队列。
- 如果增加队列胜利,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。
- 如果增加到期待队列失败,个别是队列已满,才会再尝试创立新的线程。
- 但在创立之前须要与maximumPoolSize比拟,如果小于则创立胜利。
- 否则执行回绝策略。
答
上述问题需辨别JDK版本。在1.6版本之后,如果corePoolSize=0,提交工作时如果线程池为空,则会立刻创立一个线程来执行工作(先排队再获取);如果提交工作的时候,线程池不为空,则先在期待队列中排队,只有队列满了才会创立新线程。
所以,优化在于,在队列没有满的这段时间内,会有一个线程在生产提交的工作;1.6之前的实现是,必须等队列满了之后,才开始生产。
2 线程池创立之后,会立刻创立外围线程么
之前有人问过我这个问题,因为他发现利用中有些Bean创立了线程池,然而这个Bean个别状况下用不到,所以征询我是否须要把这个线程池正文掉,以缩小利用运行时的线程数(该利用运行时线程过多。)
答
不会。从下面的源码能够看出,在刚刚创立ThreadPoolExecutor的时候,线程并不会立刻启动,而是要等到有工作提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads当时启动外围线程。
- prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
- prestartAllCoreThreads:Starts all core threads.
3 外围线程永远不会销毁么
这个问题有点tricky。首先咱们要明确一下概念,尽管在JavaDoc中也应用了“core/non-core threads”这样的形容,但其实这是一个动静的概念,JDK并没有给一部分线程打上“core”的标记,做什么特殊化的解决。这个问题我认为想要探讨的是闲置线程终结策略的问题。
在JDK1.6之前,线程池会尽量放弃corePoolSize个外围线程,即便这些线程闲置了很长时间。这一点曾被开发者诟病,所以从JDK1.6开始,提供了办法allowsCoreThreadTimeOut,如果传参为true,则容许闲置的外围线程被终止。
请留神这种策略和corePoolSize=0的区别。我总结的区别是:
- corePoolSize=0:在个别状况下只应用一个线程生产工作,只有当并发申请特地多、期待队列都满了之后,才开始用多线程。
- allowsCoreThreadTimeOut=true && corePoolSize>1:在个别状况下就开始应用多线程(corePoolSize个),当并发申请特地多,期待队列都满了之后,持续加大线程数。然而当申请没有的时候,容许外围线程也终止。
所以corePoolSize=0的成果,根本等同于allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不同。
答
在JDK1.6之后,如果allowsCoreThreadTimeOut=true,外围线程也能够被终止。
4 如何保障线程不被销毁
首先咱们要明确一下线程池模型。线程池有个外部类Worker,它实现了Runnable接口,首先,它本人要run起来。而后它会在适合的时候获取咱们提交的Runnable工作,而后调用工作的run()接口。一个Worker不终止的话能够一直执行工作。
咱们后面说的“线程池中的线程”,其实就是Worker;期待队列中的元素,是咱们提交的Runnable工作。
每一个Worker在创立进去的时候,会调用它自身的run()办法,实现是runWorker(this),这个实现的外围是一个while循环,这个循环不完结,Worker线程就不会终止,就是这个根本逻辑。
- 在这个while条件中,有个getTask()办法是外围中的外围,它所做的事件就是从期待队列中取出工作来执行:
- 如果没有达到corePoolSize,则创立的Worker在执行完它承接的工作后,会用workQueue.take()取工作、留神,这个接口是阻塞接口,如果取不到工作,Worker线程始终阻塞。
- 如果超过了corePoolSize,或者allowCoreThreadTimeOut,一个Worker在闲暇了之后,会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取工作。留神,这个接口只阻塞期待keepAliveTime工夫,超过这个工夫返回null,则Worker的while循环执行完结,则被终止了。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 看这里,外围逻辑在这里
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 留神,外围中的外围在这里
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
答
实现形式十分奇妙,外围线程(Worker)即便始终闲暇也不终止,是通过workQueue.take()实现的,它会始终阻塞到从期待队列中取到新的工作。非核心线程闲暇指定工夫后终止是通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个闲暇的Worker只期待keepAliveTime,如果还没有取到工作则循环终止,线程也就运行完结了。
引申思考
Worker自身就是个线程,它再调用咱们传入的Runnable.run(),会启动一个子线程么?如果你还没有答案,再回忆一下Runnable和Thread的关系。
5 闲暇线程过多会有什么问题
抽象地答复是会占用内存,咱们剖析一下占用了哪些内存。首先,比拟一般的一部分,一个线程的内存模型:
- 虚拟机栈
- 本地办法栈
- 程序计数器
我想额定强调是上面这几个内存占用,须要小心:
- ThreadLocal:业务代码是否应用了ThreadLocal?就算没有,Spring框架中也大量应用了ThreadLocal,你所在公司的框架可能也是一样。
- 局部变量:线程处于阻塞状态,必定还有栈帧没有出栈,栈帧中有局部变量表,但凡被局部变量表援用的内存都不能回收。所以如果这个线程创立了比拟大的局部变量,那么这一部分内存无奈GC。
- TLAB机制:如果你的利用线程数处于高位,那么新的线程初始化可能因为Eden没有足够的空间调配TLAB而触发YoungGC。
答
线程池放弃闲暇的外围线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存个别不大。怕的就是业务代码中应用ThreadLocal缓存的数据过大又不清理。
如果你的利用线程数处于高位,那么须要察看一下YoungGC的状况,估算一下Eden大小是否足够。如果不够的话,可能要审慎地创立新线程,并且让闲暇的线程终止;必要的时候,可能须要对JVM进行调参。
6 keepAliveTime=0会怎么样
这也是个争议点。有的博文说等于0示意闲暇线程永远不会终止,有的说示意执行完立即终止。还有的说等于-1示意闲暇线程永远不会终止。其实略微看一下源码晓得了,这里我间接抛出答案。
答
在JDK1.8中,keepAliveTime=0示意非核心线程执行完立即终止。
默认状况下,keepAliveTime小于0,初始化的时候才会报错;但如果allowsCoreThreadTimeOut,keepAliveTime必须大于0,不然初始化报错。
7 怎么进行异样解决
很多代码的写法,咱们都习惯依照常见范式去编写,而没有去思考为什么。比方:
- 如果咱们应用execute()提交工作,咱们个别要在Runable工作的代码加上try-catch进行异样解决。
- 如果咱们应用submit()提交工作,咱们个别要在主线程中,对Future.get()进行try-catch进行异样解决。
—— 然而在下面,我提到过,submit()底层实现依赖execute(),两者应该对立呀,为什么有差别呢?上面再扒一扒submit()的源码,它的实现蛮有意思。
首先,ThreadPoolExecutor中没有submit的代码,而是在它的父类AbstractExecutorService中,有三个submit的重载办法,代码非常简单,要害代码就两行:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
正是因为这三个重载办法,都调用了execute,所以我才说submit底层依赖execute。通过查看这里execute的实现,咱们不难发现,它就是ThreadPoolExecutor中的实现,所以,造成submit和execute的差异化的代码,不在这。那么造成差别的肯定在newTaskFor办法中。这个办法也就new了一个FutureTask而已,FutureTask实现RunnableFuture接口,RunnableFuture接口继承Runnable接口和Future接口。而Callable只是FutureTask的一个成员变量。
所以讲到这里,就有另一个Java根底知识点:Callable和Future的关系。咱们个别用Callable编写工作代码,Future是异步返回对象,通过它的get办法,阻塞式地获取后果。FutureTask的外围代码就是实现了Future接口,也就是get办法的实现:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 外围代码
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 死循环
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 只有工作的状态是’已实现‘,才会跳出死循环
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
get的外围实现是有个awaitDone办法,这是一个死循环,只有工作的状态是“已实现”,才会跳出死循环;否则会依赖UNSAFE包下的LockSupport.park原语进行阻塞,期待LockSupport.unpark信号量。而这个信号量只有当运行完结取得后果、或者出现异常的状况下,才会收回来。别离对应办法set和setException。这就是异步执行、阻塞获取的原理,扯得有点远了。
回到最后咱们的疑难,为什么submit之后,通过get办法能够获取到异样?起因是FutureTask有一个Object类型的outcome成员变量,用来记录执行后果。这个后果能够是传入的泛型,也能够是Throwable异样:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// get办法中依赖的,报告执行后果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
FutureTask的另一个奇妙的中央就是借用RunnableAdapter外部类,将submit的Runnable封装成Callable。所以就算你submit的是Runnable,一样能够用get获取到异样。
答
- 不论是用execute还是submit,都能够本人在业务代码上加try-catch进行异样解决。我个别喜爱应用这种形式,因为我喜爱对不同业务场景的异样进行差异化解决,至多打不一样的日志吧。
- 如果是execute,还能够自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)办法。
- 或者实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);办法,并将该handler传递给线程池的ThreadFactory。
- 然而留神,afterExecute和UncaughtExceptionHandler都不实用submit。因为通过下面的FutureTask.run()不难发现,它本人对Throwable进行了try-catch,封装到了outcome属性,所以底层办法execute的Worker是拿不到异样信息的。
8 线程池需不需要敞开
答
一般来讲,线程池的生命周期追随服务的生命周期。如果一个服务(Service)进行服务了,那么须要调用shutdown办法进行敞开。所以ExecutorService.shutdown在Java以及一些中间件的源码中,是封装在Service的shutdown办法内的。
如果是Server端不重启就不进行提供服务,我认为是不须要非凡解决的。
9 shutdown和shutdownNow的区别
答
shutdown => 平缓敞开,期待所有已增加到线程池中的工作执行完再敞开。
shutdownNow => 立即敞开,进行正在执行的工作,并返回队列中未执行的工作。
原本想剖析一下两者的源码的,然而发现本文的篇幅曾经过长了,源码也贴了不少。感兴趣的敌人本人看一下即可。
10 Spring中有哪些和ThreadPoolExecutor相似的工具
答
这里我想着重强调的就是SimpleAsyncTaskExecutor,Spring中应用的@Async注解,底层就是基于SimpleAsyncTaskExecutor去执行工作,只不过它不是线程池,而是每次都新开一个线程。
另外想要强调的是Executor接口。Java初学者容易想当然的认为Executor结尾的类就是一个线程池,而下面的都是反例。咱们能够在JDK的execute办法上看到这个正文:
/**
- Executes the given command at some time in the future. The command
- may execute in a new thread, in a pooled thread, or in the calling
- thread, at the discretion of the {@code Executor} implementation.
*/
所以,它的职责并不是提供一个线程池的接口,而是提供一个“未来执行命令”的接口。真正能代表线程池意义的,是ThreadPoolExecutor类,而不是Executor接口。
最佳实际总结
【强制】应用ThreadPoolExecutor的构造函数申明线程池,防止应用Executors类的 newFixedThreadPool和newCachedThreadPool。
【强制】 创立线程或线程池时请指定有意义的线程名称,不便出错时回溯。即threadFactory参数要结构好。
【倡议】倡议不同类别的业务用不同的线程池。
【倡议】CPU密集型工作(N+1):这种工作耗费的次要是CPU资源,能够将线程数设置为N(CPU外围数)+1,比CPU外围数多进去的一个线程是为了避免线程偶发的缺页中断,或者其它起因导致的工作暂停而带来的影响。一旦工作暂停,CPU就会处于闲暇状态,而在这种状况下多进去的一个线程就能够充分利用CPU的闲暇工夫。
【倡议】I/O密集型工作(2N):这种工作利用起来,零碎会用大部分的工夫来解决I/O交互,而线程在解决I/O的时间段内不会占用CPU来解决,这时就能够将CPU交出给其它线程应用。因而在I/O密集型工作的利用中,咱们能够多配置一些线程,具体的计算方法是2N。
【倡议】workQueue不要应用无界队列,尽量应用有界队列。防止大量工作期待,造成OOM。
【倡议】如果是资源缓和的利用,应用allowsCoreThreadTimeOut能够进步资源利用率。
【倡议】尽管应用线程池有多种异样解决的形式,但在工作代码中,应用try-catch最通用,也能给不同工作的异样解决做精细化。
【倡议】对于资源缓和的利用,如果放心线程池资源使用不当,能够利用ThreadPoolExecutor的API实现简略的监控,而后进行剖析和优化。
线程池初始化示例:
private static final ThreadPoolExecutor pool;
static {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
threadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.allowCoreThreadTimeOut(true);
}
- threadFactory:给出带业务语义的线程命名。
- corePoolSize:疾速启动4个线程解决该业务,是足够的。
- maximumPoolSize:IO密集型业务,我的服务器是4C8G的,所以4*2=8。
- keepAliveTime:服务器资源缓和,让闲暇的线程疾速开释。
- pool.allowCoreThreadTimeOut(true):也是为了在能够的时候,让线程开释,开释资源。
- workQueue:一个工作的执行时长在100~300ms,业务高峰期8个线程,依照10s超时(曾经很高了)。10s钟,8个线程,能够解决10 1000ms / 200ms 8 = 400个工作左右,往上再取一点,512曾经很多了。
- handler:极其状况下,一些工作只能抛弃,爱护服务端。
原文链接
本文为阿里云原创内容,未经容许不得转载。
发表回复