共计 11604 个字符,预计需要花费 30 分钟才能阅读完成。
简介:《Java 开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用 ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成 OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨 ThreadPoolExecutor 的一些技术细节,并且给出几个罕用的最佳实际倡议。
作者 | 风楼
起源 | 阿里技术公众号
《Java 开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用 ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成 OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨 ThreadPoolExecutor 的一些技术细节,并且给出几个罕用的最佳实际倡议。
我在查找材料的过程中,发现有些问题存在争议。前面发现,一部分起因是因为不同 JDK 版本的事实是有差别的。因而,上面的剖析是基于当下最罕用的版本 JDK1.8,并且对于存在争议的问题,咱们剖析源码,源码才是最精确的。
1 corePoolSize= 0 会怎么样
这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样答复这个问题的:
- 提交工作后,先判断以后池中线程数是否小于 corePoolSize,如果小于,则创立新线程执行这个工作。
- 否者,判断期待队列是否已满,如果没有满,则增加到期待队列。
- 否者,判断以后池中线程数是否大于 maximumPoolSize,如果大于则回绝。
- 否者,创立一个新的线程执行这个工作。
依照下面的形容,如果 corePoolSize=0,则会判断期待队列的容量,如果还有容量,则排队,并且不会创立新的线程。
—— 但其实,这是老版本的实现形式,从 1.6 之后,实现形式就变了。咱们间接看 execute 的源码(submit 也依赖它),我备注出了要害一行:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 留神这一行代码,增加到期待队列胜利后,判断以后池内线程数是否为 0,如果是则创立一个 firstTask 为 null 的 worker,这个 worker 会从期待队列中获取工作并执行。else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
- 线程池提交工作后,首先判断以后池中线程数是否小于 corePoolSize。
- 如果小于则尝试创立新的线程执行该工作;否则尝试增加到期待队列。
- 如果增加队列胜利,判断以后池内线程数是否为 0,如果是则创立一个 firstTask 为 null 的 worker,这个 worker 会从期待队列中获取工作并执行。
- 如果增加到期待队列失败,个别是队列已满,才会再尝试创立新的线程。
- 但在创立之前须要与 maximumPoolSize 比拟,如果小于则创立胜利。
- 否则执行回绝策略。
答
上述问题需辨别 JDK 版本。在 1.6 版本之后,如果 corePoolSize=0,提交工作时如果线程池为空,则会立刻创立一个线程来执行工作(先排队再获取);如果提交工作的时候,线程池不为空,则先在期待队列中排队,只有队列满了才会创立新线程。
所以,优化在于,在队列没有满的这段时间内,会有一个线程在生产提交的工作;1.6 之前的实现是,必须等队列满了之后,才开始生产。
2 线程池创立之后,会立刻创立外围线程么
之前有人问过我这个问题,因为他发现利用中有些 Bean 创立了线程池,然而这个 Bean 个别状况下用不到,所以征询我是否须要把这个线程池正文掉,以缩小利用运行时的线程数(该利用运行时线程过多。)
答
不会。从下面的源码能够看出,在刚刚创立 ThreadPoolExecutor 的时候,线程并不会立刻启动,而是要等到有工作提交时才会启动,除非调用了 prestartCoreThread/prestartAllCoreThreads 当时启动外围线程。
- prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
- prestartAllCoreThreads:Starts all core threads.
3 外围线程永远不会销毁么
这个问题有点 tricky。首先咱们要明确一下概念,尽管在 JavaDoc 中也应用了“core/non-core threads”这样的形容,但其实这是一个动静的概念,JDK 并没有给一部分线程打上“core”的标记,做什么特殊化的解决。这个问题我认为想要探讨的是闲置线程终结策略的问题。
在 JDK1.6 之前,线程池会尽量放弃 corePoolSize 个外围线程,即便这些线程闲置了很长时间。这一点曾被开发者诟病,所以从 JDK1.6 开始,提供了办法 allowsCoreThreadTimeOut,如果传参为 true,则容许闲置的外围线程被终止。
请留神这种策略和 corePoolSize= 0 的区别。我总结的区别是:
- corePoolSize=0:在个别状况下只应用一个线程生产工作,只有当并发申请特地多、期待队列都满了之后,才开始用多线程。
- allowsCoreThreadTimeOut=true && corePoolSize>1:在个别状况下就开始应用多线程(corePoolSize 个),当并发申请特地多,期待队列都满了之后,持续加大线程数。然而当申请没有的时候,容许外围线程也终止。
所以 corePoolSize= 0 的成果,根本等同于 allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不同。
答
在 JDK1.6 之后,如果 allowsCoreThreadTimeOut=true,外围线程也能够被终止。
4 如何保障线程不被销毁
首先咱们要明确一下线程池模型。线程池有个外部类 Worker,它实现了 Runnable 接口,首先,它本人要 run 起来。而后它会在适合的时候获取咱们提交的 Runnable 工作,而后调用工作的 run()接口。一个 Worker 不终止的话能够一直执行工作。
咱们后面说的“线程池中的线程”,其实就是 Worker;期待队列中的元素,是咱们提交的 Runnable 工作。
每一个 Worker 在创立进去的时候,会调用它自身的 run()办法,实现是 runWorker(this),这个实现的外围是一个 while 循环,这个循环不完结,Worker 线程就不会终止,就是这个根本逻辑。
- 在这个 while 条件中,有个 getTask()办法是外围中的外围,它所做的事件就是从期待队列中取出工作来执行:
- 如果没有达到 corePoolSize,则创立的 Worker 在执行完它承接的工作后,会用 workQueue.take()取工作、留神,这个接口是阻塞接口,如果取不到工作,Worker 线程始终阻塞。
- 如果超过了 corePoolSize,或者 allowCoreThreadTimeOut,一个 Worker 在闲暇了之后,会用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取工作。留神,这个接口只阻塞期待 keepAliveTime 工夫,超过这个工夫返回 null,则 Worker 的 while 循环执行完结,则被终止了。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 看这里,外围逻辑在这里
while (task != null || (task = getTask()) != null) {w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {beforeExecute(wt, task);
Throwable thrown = null;
try {task.run();
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 留神,外围中的外围在这里
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {timedOut = false;}
}
}
答
实现形式十分奇妙,外围线程(Worker)即便始终闲暇也不终止,是通过 workQueue.take()实现的,它会始终阻塞到从期待队列中取到新的工作。非核心线程闲暇指定工夫后终止是通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个闲暇的 Worker 只期待 keepAliveTime,如果还没有取到工作则循环终止,线程也就运行完结了。
引申思考
Worker 自身就是个线程,它再调用咱们传入的 Runnable.run(),会启动一个子线程么?如果你还没有答案,再回忆一下 Runnable 和 Thread 的关系。
5 闲暇线程过多会有什么问题
抽象地答复是会占用内存,咱们剖析一下占用了哪些内存。首先,比拟一般的一部分,一个线程的内存模型:
- 虚拟机栈
- 本地办法栈
- 程序计数器
我想额定强调是上面这几个内存占用,须要小心:
- ThreadLocal:业务代码是否应用了 ThreadLocal?就算没有,Spring 框架中也大量应用了 ThreadLocal,你所在公司的框架可能也是一样。
- 局部变量:线程处于阻塞状态,必定还有栈帧没有出栈,栈帧中有局部变量表,但凡被局部变量表援用的内存都不能回收。所以如果这个线程创立了比拟大的局部变量,那么这一部分内存无奈 GC。
- TLAB 机制:如果你的利用线程数处于高位,那么新的线程初始化可能因为 Eden 没有足够的空间调配 TLAB 而触发 YoungGC。
答
线程池放弃闲暇的外围线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存个别不大。怕的就是业务代码中应用 ThreadLocal 缓存的数据过大又不清理。
如果你的利用线程数处于高位,那么须要察看一下 YoungGC 的状况,估算一下 Eden 大小是否足够。如果不够的话,可能要审慎地创立新线程,并且让闲暇的线程终止;必要的时候,可能须要对 JVM 进行调参。
6 keepAliveTime= 0 会怎么样
这也是个争议点。有的博文说等于 0 示意闲暇线程永远不会终止,有的说示意执行完立即终止。还有的说等于 - 1 示意闲暇线程永远不会终止。其实略微看一下源码晓得了,这里我间接抛出答案。
答
在 JDK1.8 中,keepAliveTime= 0 示意非核心线程执行完立即终止。
默认状况下,keepAliveTime 小于 0,初始化的时候才会报错;但如果 allowsCoreThreadTimeOut,keepAliveTime 必须大于 0,不然初始化报错。
7 怎么进行异样解决
很多代码的写法,咱们都习惯依照常见范式去编写,而没有去思考为什么。比方:
- 如果咱们应用 execute()提交工作,咱们个别要在 Runable 工作的代码加上 try-catch 进行异样解决。
- 如果咱们应用 submit()提交工作,咱们个别要在主线程中,对 Future.get()进行 try-catch 进行异样解决。
—— 然而在下面,我提到过,submit()底层实现依赖 execute(),两者应该对立呀,为什么有差别呢?上面再扒一扒 submit()的源码,它的实现蛮有意思。
首先,ThreadPoolExecutor 中没有 submit 的代码,而是在它的父类 AbstractExecutorService 中,有三个 submit 的重载办法,代码非常简单,要害代码就两行:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
正是因为这三个重载办法,都调用了 execute,所以我才说 submit 底层依赖 execute。通过查看这里 execute 的实现,咱们不难发现,它就是 ThreadPoolExecutor 中的实现,所以,造成 submit 和 execute 的差异化的代码,不在这。那么造成差别的肯定在 newTaskFor 办法中。这个办法也就 new 了一个 FutureTask 而已,FutureTask 实现 RunnableFuture 接口,RunnableFuture 接口继承 Runnable 接口和 Future 接口。而 Callable 只是 FutureTask 的一个成员变量。
所以讲到这里,就有另一个 Java 根底知识点:Callable 和 Future 的关系。咱们个别用 Callable 编写工作代码,Future 是异步返回对象,通过它的 get 办法,阻塞式地获取后果。FutureTask 的外围代码就是实现了 Future 接口,也就是 get 办法的实现:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 外围代码
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 死循环
for (;;) {if (Thread.interrupted()) {removeWaiter(q);
throw new InterruptedException();}
int s = state;
// 只有工作的状态是’已实现‘,才会跳出死循环
if (s > COMPLETING) {if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {nanos = deadline - System.nanoTime();
if (nanos <= 0L) {removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
get 的外围实现是有个 awaitDone 办法,这是一个死循环,只有工作的状态是“已实现”,才会跳出死循环;否则会依赖 UNSAFE 包下的 LockSupport.park 原语进行阻塞,期待 LockSupport.unpark 信号量。而这个信号量只有当运行完结取得后果、或者出现异常的状况下,才会收回来。别离对应办法 set 和 setException。这就是异步执行、阻塞获取的原理,扯得有点远了。
回到最后咱们的疑难,为什么 submit 之后,通过 get 办法能够获取到异样?起因是 FutureTask 有一个 Object 类型的 outcome 成员变量,用来记录执行后果。这个后果能够是传入的泛型,也能够是 Throwable 异样:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// get 办法中依赖的,报告执行后果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
FutureTask 的另一个奇妙的中央就是借用 RunnableAdapter 外部类,将 submit 的 Runnable 封装成 Callable。所以就算你 submit 的是 Runnable,一样能够用 get 获取到异样。
答
- 不论是用 execute 还是 submit,都能够本人在业务代码上加 try-catch 进行异样解决。我个别喜爱应用这种形式,因为我喜爱对不同业务场景的异样进行差异化解决,至多打不一样的日志吧。
- 如果是 execute,还能够自定义线程池,继承 ThreadPoolExecutor 并复写其 afterExecute(Runnable r, Throwable t)办法。
- 或者实现 Thread.UncaughtExceptionHandler 接口,实现 void uncaughtException(Thread t, Throwable e); 办法,并将该 handler 传递给线程池的 ThreadFactory。
- 然而留神,afterExecute 和 UncaughtExceptionHandler 都不实用 submit。因为通过下面的 FutureTask.run()不难发现,它本人对 Throwable 进行了 try-catch,封装到了 outcome 属性,所以底层办法 execute 的 Worker 是拿不到异样信息的。
8 线程池需不需要敞开
答
一般来讲,线程池的生命周期追随服务的生命周期。如果一个服务(Service)进行服务了,那么须要调用 shutdown 办法进行敞开。所以 ExecutorService.shutdown 在 Java 以及一些中间件的源码中,是封装在 Service 的 shutdown 办法内的。
如果是 Server 端不重启就不进行提供服务,我认为是不须要非凡解决的。
9 shutdown 和 shutdownNow 的区别
答
shutdown => 平缓敞开,期待所有已增加到线程池中的工作执行完再敞开。
shutdownNow => 立即敞开,进行正在执行的工作,并返回队列中未执行的工作。
原本想剖析一下两者的源码的,然而发现本文的篇幅曾经过长了,源码也贴了不少。感兴趣的敌人本人看一下即可。
10 Spring 中有哪些和 ThreadPoolExecutor 相似的工具
答
这里我想着重强调的就是 SimpleAsyncTaskExecutor,Spring 中应用的 @Async 注解,底层就是基于 SimpleAsyncTaskExecutor 去执行工作,只不过它不是线程池,而是每次都新开一个线程。
另外想要强调的是 Executor 接口。Java 初学者容易想当然的认为 Executor 结尾的类就是一个线程池,而下面的都是反例。咱们能够在 JDK 的 execute 办法上看到这个正文:
/**
- Executes the given command at some time in the future. The command
- may execute in a new thread, in a pooled thread, or in the calling
- thread, at the discretion of the {@code Executor} implementation.
*/
所以,它的职责并不是提供一个线程池的接口,而是提供一个“未来执行命令”的接口。真正能代表线程池意义的,是 ThreadPoolExecutor 类,而不是 Executor 接口。
最佳实际总结
【强制】应用 ThreadPoolExecutor 的构造函数申明线程池,防止应用 Executors 类的 newFixedThreadPool 和 newCachedThreadPool。
【强制】创立线程或线程池时请指定有意义的线程名称,不便出错时回溯。即 threadFactory 参数要结构好。
【倡议】倡议不同类别的业务用不同的线程池。
【倡议】CPU 密集型工作(N+1):这种工作耗费的次要是 CPU 资源,能够将线程数设置为 N(CPU 外围数)+1,比 CPU 外围数多进去的一个线程是为了避免线程偶发的缺页中断,或者其它起因导致的工作暂停而带来的影响。一旦工作暂停,CPU 就会处于闲暇状态,而在这种状况下多进去的一个线程就能够充分利用 CPU 的闲暇工夫。
【倡议】I/ O 密集型工作(2N):这种工作利用起来,零碎会用大部分的工夫来解决 I / O 交互,而线程在解决 I / O 的时间段内不会占用 CPU 来解决,这时就能够将 CPU 交出给其它线程应用。因而在 I / O 密集型工作的利用中,咱们能够多配置一些线程,具体的计算方法是 2N。
【倡议】workQueue 不要应用无界队列,尽量应用有界队列。防止大量工作期待,造成 OOM。
【倡议】如果是资源缓和的利用,应用 allowsCoreThreadTimeOut 能够进步资源利用率。
【倡议】尽管应用线程池有多种异样解决的形式,但在工作代码中,应用 try-catch 最通用,也能给不同工作的异样解决做精细化。
【倡议】对于资源缓和的利用,如果放心线程池资源使用不当,能够利用 ThreadPoolExecutor 的 API 实现简略的监控,而后进行剖析和优化。
线程池初始化示例:
private static final ThreadPoolExecutor pool;
static {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
threadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.allowCoreThreadTimeOut(true);
}
- threadFactory:给出带业务语义的线程命名。
- corePoolSize:疾速启动 4 个线程解决该业务,是足够的。
- maximumPoolSize:IO 密集型业务,我的服务器是 4C8G 的,所以 4 *2=8。
- keepAliveTime:服务器资源缓和,让闲暇的线程疾速开释。
- pool.allowCoreThreadTimeOut(true):也是为了在能够的时候,让线程开释,开释资源。
- workQueue:一个工作的执行时长在 100~300ms,业务高峰期 8 个线程,依照 10s 超时(曾经很高了)。10s 钟,8 个线程,能够解决 10 1000ms / 200ms 8 = 400 个工作左右,往上再取一点,512 曾经很多了。
- handler:极其状况下,一些工作只能抛弃,爱护服务端。
原文链接
本文为阿里云原创内容,未经容许不得转载。