简介: 《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。
作者 | 风楼
起源 | 阿里技术公众号
《Java开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨ThreadPoolExecutor的一些技术细节,并且给出几个罕用的最佳实际倡议。
我在查找材料的过程中,发现有些问题存在争议。前面发现,一部分起因是因为不同JDK版本的事实是有差别的。因而,上面的剖析是基于当下最罕用的版本JDK1.8,并且对于存在争议的问题,咱们剖析源码,源码才是最精确的。
1 corePoolSize=0会怎么样
这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样答复这个问题的:
- 提交工作后,先判断以后池中线程数是否小于corePoolSize,如果小于,则创立新线程执行这个工作。
- 否者,判断期待队列是否已满,如果没有满,则增加到期待队列。
- 否者,判断以后池中线程数是否大于maximumPoolSize,如果大于则回绝。
- 否者,创立一个新的线程执行这个工作。
依照下面的形容,如果corePoolSize=0,则会判断期待队列的容量,如果还有容量,则排队,并且不会创立新的线程。
—— 但其实,这是老版本的实现形式,从1.6之后,实现形式就变了。咱们间接看execute的源码(submit也依赖它),我备注出了要害一行:
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); // 留神这一行代码,增加到期待队列胜利后,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
- 线程池提交工作后,首先判断以后池中线程数是否小于corePoolSize。
- 如果小于则尝试创立新的线程执行该工作;否则尝试增加到期待队列。
- 如果增加队列胜利,判断以后池内线程数是否为0,如果是则创立一个firstTask为null的worker,这个worker会从期待队列中获取工作并执行。
- 如果增加到期待队列失败,个别是队列已满,才会再尝试创立新的线程。
- 但在创立之前须要与maximumPoolSize比拟,如果小于则创立胜利。
- 否则执行回绝策略。
答
上述问题需辨别JDK版本。在1.6版本之后,如果corePoolSize=0,提交工作时如果线程池为空,则会立刻创立一个线程来执行工作(先排队再获取);如果提交工作的时候,线程池不为空,则先在期待队列中排队,只有队列满了才会创立新线程。
所以,优化在于,在队列没有满的这段时间内,会有一个线程在生产提交的工作;1.6之前的实现是,必须等队列满了之后,才开始生产。
2 线程池创立之后,会立刻创立外围线程么
之前有人问过我这个问题,因为他发现利用中有些Bean创立了线程池,然而这个Bean个别状况下用不到,所以征询我是否须要把这个线程池正文掉,以缩小利用运行时的线程数(该利用运行时线程过多。)
答
不会。从下面的源码能够看出,在刚刚创立ThreadPoolExecutor的时候,线程并不会立刻启动,而是要等到有工作提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads当时启动外围线程。
- prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
- prestartAllCoreThreads:Starts all core threads.
3 外围线程永远不会销毁么
这个问题有点tricky。首先咱们要明确一下概念,尽管在JavaDoc中也应用了“core/non-core threads”这样的形容,但其实这是一个动静的概念,JDK并没有给一部分线程打上“core”的标记,做什么特殊化的解决。这个问题我认为想要探讨的是闲置线程终结策略的问题。
在JDK1.6之前,线程池会尽量放弃corePoolSize个外围线程,即便这些线程闲置了很长时间。这一点曾被开发者诟病,所以从JDK1.6开始,提供了办法allowsCoreThreadTimeOut,如果传参为true,则容许闲置的外围线程被终止。
请留神这种策略和corePoolSize=0的区别。我总结的区别是:
- corePoolSize=0:在个别状况下只应用一个线程生产工作,只有当并发申请特地多、期待队列都满了之后,才开始用多线程。
- allowsCoreThreadTimeOut=true && corePoolSize>1:在个别状况下就开始应用多线程(corePoolSize个),当并发申请特地多,期待队列都满了之后,持续加大线程数。然而当申请没有的时候,容许外围线程也终止。
所以corePoolSize=0的成果,根本等同于allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不同。
答
在JDK1.6之后,如果allowsCoreThreadTimeOut=true,外围线程也能够被终止。
4 如何保障线程不被销毁
首先咱们要明确一下线程池模型。线程池有个外部类Worker,它实现了Runnable接口,首先,它本人要run起来。而后它会在适合的时候获取咱们提交的Runnable工作,而后调用工作的run()接口。一个Worker不终止的话能够一直执行工作。
咱们后面说的“线程池中的线程”,其实就是Worker;期待队列中的元素,是咱们提交的Runnable工作。
每一个Worker在创立进去的时候,会调用它自身的run()办法,实现是runWorker(this),这个实现的外围是一个while循环,这个循环不完结,Worker线程就不会终止,就是这个根本逻辑。
- 在这个while条件中,有个getTask()办法是外围中的外围,它所做的事件就是从期待队列中取出工作来执行:
- 如果没有达到corePoolSize,则创立的Worker在执行完它承接的工作后,会用workQueue.take()取工作、留神,这个接口是阻塞接口,如果取不到工作,Worker线程始终阻塞。
- 如果超过了corePoolSize,或者allowCoreThreadTimeOut,一个Worker在闲暇了之后,会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取工作。留神,这个接口只阻塞期待keepAliveTime工夫,超过这个工夫返回null,则Worker的while循环执行完结,则被终止了。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 看这里,外围逻辑在这里 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 留神,外围中的外围在这里 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
答
实现形式十分奇妙,外围线程(Worker)即便始终闲暇也不终止,是通过workQueue.take()实现的,它会始终阻塞到从期待队列中取到新的工作。非核心线程闲暇指定工夫后终止是通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个闲暇的Worker只期待keepAliveTime,如果还没有取到工作则循环终止,线程也就运行完结了。
引申思考
Worker自身就是个线程,它再调用咱们传入的Runnable.run(),会启动一个子线程么?如果你还没有答案,再回忆一下Runnable和Thread的关系。
5 闲暇线程过多会有什么问题
抽象地答复是会占用内存,咱们剖析一下占用了哪些内存。首先,比拟一般的一部分,一个线程的内存模型:
- 虚拟机栈
- 本地办法栈
- 程序计数器
我想额定强调是上面这几个内存占用,须要小心:
- ThreadLocal:业务代码是否应用了ThreadLocal?就算没有,Spring框架中也大量应用了ThreadLocal,你所在公司的框架可能也是一样。
- 局部变量:线程处于阻塞状态,必定还有栈帧没有出栈,栈帧中有局部变量表,但凡被局部变量表援用的内存都不能回收。所以如果这个线程创立了比拟大的局部变量,那么这一部分内存无奈GC。
- TLAB机制:如果你的利用线程数处于高位,那么新的线程初始化可能因为Eden没有足够的空间调配TLAB而触发YoungGC。
答
线程池放弃闲暇的外围线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存个别不大。怕的就是业务代码中应用ThreadLocal缓存的数据过大又不清理。
如果你的利用线程数处于高位,那么须要察看一下YoungGC的状况,估算一下Eden大小是否足够。如果不够的话,可能要审慎地创立新线程,并且让闲暇的线程终止;必要的时候,可能须要对JVM进行调参。
6 keepAliveTime=0会怎么样
这也是个争议点。有的博文说等于0示意闲暇线程永远不会终止,有的说示意执行完立即终止。还有的说等于-1示意闲暇线程永远不会终止。其实略微看一下源码晓得了,这里我间接抛出答案。
答
在JDK1.8中,keepAliveTime=0示意非核心线程执行完立即终止。
默认状况下,keepAliveTime小于0,初始化的时候才会报错;但如果allowsCoreThreadTimeOut,keepAliveTime必须大于0,不然初始化报错。
7 怎么进行异样解决
很多代码的写法,咱们都习惯依照常见范式去编写,而没有去思考为什么。比方:
- 如果咱们应用execute()提交工作,咱们个别要在Runable工作的代码加上try-catch进行异样解决。
- 如果咱们应用submit()提交工作,咱们个别要在主线程中,对Future.get()进行try-catch进行异样解决。
—— 然而在下面,我提到过,submit()底层实现依赖execute(),两者应该对立呀,为什么有差别呢?上面再扒一扒submit()的源码,它的实现蛮有意思。
首先,ThreadPoolExecutor中没有submit的代码,而是在它的父类AbstractExecutorService中,有三个submit的重载办法,代码非常简单,要害代码就两行:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask;}public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}
正是因为这三个重载办法,都调用了execute,所以我才说submit底层依赖execute。通过查看这里execute的实现,咱们不难发现,它就是ThreadPoolExecutor中的实现,所以,造成submit和execute的差异化的代码,不在这。那么造成差别的肯定在newTaskFor办法中。这个办法也就new了一个FutureTask而已,FutureTask实现RunnableFuture接口,RunnableFuture接口继承Runnable接口和Future接口。而Callable只是FutureTask的一个成员变量。
所以讲到这里,就有另一个Java根底知识点:Callable和Future的关系。咱们个别用Callable编写工作代码,Future是异步返回对象,通过它的get办法,阻塞式地获取后果。FutureTask的外围代码就是实现了Future接口,也就是get办法的实现:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 外围代码 s = awaitDone(false, 0L); return report(s);}private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 死循环 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 只有工作的状态是’已实现‘,才会跳出死循环 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }}
get的外围实现是有个awaitDone办法,这是一个死循环,只有工作的状态是“已实现”,才会跳出死循环;否则会依赖UNSAFE包下的LockSupport.park原语进行阻塞,期待LockSupport.unpark信号量。而这个信号量只有当运行完结取得后果、或者出现异常的状况下,才会收回来。别离对应办法set和setException。这就是异步执行、阻塞获取的原理,扯得有点远了。
回到最后咱们的疑难,为什么submit之后,通过get办法能够获取到异样?起因是FutureTask有一个Object类型的outcome成员变量,用来记录执行后果。这个后果能够是传入的泛型,也能够是Throwable异样:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
// get办法中依赖的,报告执行后果
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x);}
FutureTask的另一个奇妙的中央就是借用RunnableAdapter外部类,将submit的Runnable封装成Callable。所以就算你submit的是Runnable,一样能够用get获取到异样。
答
- 不论是用execute还是submit,都能够本人在业务代码上加try-catch进行异样解决。我个别喜爱应用这种形式,因为我喜爱对不同业务场景的异样进行差异化解决,至多打不一样的日志吧。
- 如果是execute,还能够自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)办法。
- 或者实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);办法,并将该handler传递给线程池的ThreadFactory。
- 然而留神,afterExecute和UncaughtExceptionHandler都不实用submit。因为通过下面的FutureTask.run()不难发现,它本人对Throwable进行了try-catch,封装到了outcome属性,所以底层办法execute的Worker是拿不到异样信息的。
8 线程池需不需要敞开
答
一般来讲,线程池的生命周期追随服务的生命周期。如果一个服务(Service)进行服务了,那么须要调用shutdown办法进行敞开。所以ExecutorService.shutdown在Java以及一些中间件的源码中,是封装在Service的shutdown办法内的。
如果是Server端不重启就不进行提供服务,我认为是不须要非凡解决的。
9 shutdown和shutdownNow的区别
答
shutdown => 平缓敞开,期待所有已增加到线程池中的工作执行完再敞开。
shutdownNow => 立即敞开,进行正在执行的工作,并返回队列中未执行的工作。
原本想剖析一下两者的源码的,然而发现本文的篇幅曾经过长了,源码也贴了不少。感兴趣的敌人本人看一下即可。
10 Spring中有哪些和ThreadPoolExecutor相似的工具
答
这里我想着重强调的就是SimpleAsyncTaskExecutor,Spring中应用的@Async注解,底层就是基于SimpleAsyncTaskExecutor去执行工作,只不过它不是线程池,而是每次都新开一个线程。
另外想要强调的是Executor接口。Java初学者容易想当然的认为Executor结尾的类就是一个线程池,而下面的都是反例。咱们能够在JDK的execute办法上看到这个正文:
/**
- Executes the given command at some time in the future. The command
- may execute in a new thread, in a pooled thread, or in the calling
- thread, at the discretion of the {@code Executor} implementation.
*/
所以,它的职责并不是提供一个线程池的接口,而是提供一个“未来执行命令”的接口。真正能代表线程池意义的,是ThreadPoolExecutor类,而不是Executor接口。
最佳实际总结
【强制】应用ThreadPoolExecutor的构造函数申明线程池,防止应用Executors类的 newFixedThreadPool和newCachedThreadPool。
【强制】 创立线程或线程池时请指定有意义的线程名称,不便出错时回溯。即threadFactory参数要结构好。
【倡议】倡议不同类别的业务用不同的线程池。
【倡议】CPU密集型工作(N+1):这种工作耗费的次要是CPU资源,能够将线程数设置为N(CPU外围数)+1,比CPU外围数多进去的一个线程是为了避免线程偶发的缺页中断,或者其它起因导致的工作暂停而带来的影响。一旦工作暂停,CPU就会处于闲暇状态,而在这种状况下多进去的一个线程就能够充分利用CPU的闲暇工夫。
【倡议】I/O密集型工作(2N):这种工作利用起来,零碎会用大部分的工夫来解决I/O交互,而线程在解决I/O的时间段内不会占用CPU来解决,这时就能够将CPU交出给其它线程应用。因而在I/O密集型工作的利用中,咱们能够多配置一些线程,具体的计算方法是2N。
【倡议】workQueue不要应用无界队列,尽量应用有界队列。防止大量工作期待,造成OOM。
【倡议】如果是资源缓和的利用,应用allowsCoreThreadTimeOut能够进步资源利用率。
【倡议】尽管应用线程池有多种异样解决的形式,但在工作代码中,应用try-catch最通用,也能给不同工作的异样解决做精细化。
【倡议】对于资源缓和的利用,如果放心线程池资源使用不当,能够利用ThreadPoolExecutor的API实现简略的监控,而后进行剖析和优化。
线程池初始化示例:
private static final ThreadPoolExecutor pool;static { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build(); pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512), threadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.allowCoreThreadTimeOut(true);}
- threadFactory:给出带业务语义的线程命名。
- corePoolSize:疾速启动4个线程解决该业务,是足够的。
- maximumPoolSize:IO密集型业务,我的服务器是4C8G的,所以4*2=8。
- keepAliveTime:服务器资源缓和,让闲暇的线程疾速开释。
- pool.allowCoreThreadTimeOut(true):也是为了在能够的时候,让线程开释,开释资源。
- workQueue:一个工作的执行时长在100~300ms,业务高峰期8个线程,依照10s超时(曾经很高了)。10s钟,8个线程,能够解决10 1000ms / 200ms 8 = 400个工作左右,往上再取一点,512曾经很多了。
- handler:极其状况下,一些工作只能抛弃,爱护服务端。
原文链接
本文为阿里云原创内容,未经容许不得转载。