Java提供了几种便捷的办法创立线程池,通过这些内置的api就可能很轻松的创立线程池。在java.util.concurrent
包中的Executors
类,其中的静态方法就是用来创立线程池的:
- newFixedThreadPool():创立一个固定线程数量的线程池,而且线程池中的工作全副执行实现后,闲暇的线程也不会被敞开。
- newSingleThreadExecutor():创立一个只有一个线程的线程池,闲暇时也不会被敞开。
- newCachedThreadPool():创立一个可缓存的线程池,线程的数量为
Integer.MAX_VALUE
,闲暇线程会长期缓存下来,线程会期待60s
还是没有工作退出的话就会被敞开。
Executors
类中还有一些创立线程池的办法(jdk8新加的),然而当初这个触极到我的常识盲区了~~
下面那几个办法,其实都是创立了一个ThreadPoolExecutor
对象作为返回值,要搞清楚线程池的原理次要还是要剖析ThreadPoolExecutor
这个类。
ThreadPoolExecutor
的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
ThreadPoolExecutor
的构造方法蕴含以下几个参数:
- corePoolSize: 外围线程数量,常驻线程池中的线程,即时线程池中没有工作可执行,也不会被敞开。
- maximumPoolSize:最大线程数量
- keepAliveTime:闲暇线程存活工夫
- unit: 闲暇线程存活工夫的单位
- workQueue:工作队列,线程池一下忙不过来,那新来的工作就须要排队,排除中的工作就会放在workQueue中
- threadFactory:线程工厂,创立线程用的
- handler:
RejectedExecutionHandler
实例用于在线程池中没有闲暇线程可能执行工作,并且workQueue
中也容不下工作时回绝工作时的策略。
ThreadPoolExecutor
中的线程统称为工作线程,但有一个小概念是外围线程
,外围线程由参数corePoolSize
指定,如corePoolSize
设置5,那线程池中就会有5条线程常驻线程池中,不会被回收掉,然而也会有例外,如果allowCoreThreadTimeOut
为true
闲暇一段时间后,也会被敞开。
线程的状态和工作线程数量
线程中的状态和工作线程和数量都是由ctl
示意,是一个AtomicInteger
类型的属性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl的高四位为线程的状态,其余位数为工作线程的数量,所以线程中最大的工作线程数量为(2^29)-1
。
线程池中的状态有五种:
- RUNNING:接管新的工作和解决队列中的工作
- SHUTDOWN:不能新增工作,然而会持续解决曾经增加的工作
- STOP:不能新增工作,不会持续解决曾经增加工作
- TIDYING:所有的工作曾经被终止,工作线程为0
- TERMINATED:terminated()办法执行实现
状态码的定义如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
创立线程池
如果有面试官问:如何正确的创立线程池?千万不要说应用Executors
创立线程,尽管Executors
能很不便的创立线程池,然而他提供的动态创立办法会有一些坑。
次要的起因是:maximumPoolSize
和workQueue
这两个参数
Executors
静态方法在创立线程池时,如果maximumPoolSize
设置为Integer.MAX_VALUE
,这样会导致线程池能够始终要以接管运行工作,可能导致cpu负载过高。
workQueue
是一个阻塞队列的实例,用于搁置正在期待执行的工作。如果在创立线程种时workQueue
实例没有指定工作的容量,那么期待队列中能够始终增加工作,极有可能导致oom
。
所以创立线程,最好是依据线程池的用处,而后本人创立线程。
增加工作
调用线程池的execute
并不是立刻执行工作,线程池外部用通过一顿操作,如:判断外围线程数、是否须要增加到期待队列中。
下来的代码是execute
的源码,代码很简洁只有2个if
语句:
public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}
- 第一个if,如果以后线程池中的工作线程数量小于
corePoolSize
,间接创立一个工作线程执行工作 - 第二个if,当线程池处于运行状态,调用
workQueue.offer(command)
办法将工作增加到workQueue
,否则调用addWorker(command, false)
尝试去增加一个工作线程。
整顿了一张图,把线程池分为三局部Core Worker
、Worker
、workQueue
:
换一种说法,在调用execute
办法时,工作首先会放在Core Worker
内,而后才是workQueue
,最初才会思考Worker
。
这样做的起因能够保障Core Worker
中的工作执行实现后,能立刻从workQueue
获取下一个工作,而不须要启动别的工作线程,用起码的工作线程办更多的事。
创立工作线程
在execute
办法中,有三个中央调用了addWorker
。addWorker
办法能够分为二局部:
- 减少工作线程数量
- 启动工作线程
addWorker
的办法签名如下:
private boolean addWorker(Runnable firstTask, boolean core)
- firstTask:第一个运行的工作,能够为空。如果为空工作会从
workQueue
中获取。 - core: 是否是外围工作线程
减少工作线程数量
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); .... for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
下面代码省略了一部分代码,次要代码都在for
循环中,利用CAS
锁,平安的实现线程池状态的查看与减少工作线程的数量。其中的compareAndIncrementWorkerCount(c)
调用就是将工作线程数量+1。
启动工作线程
减少工作线程的数量后,紧接着就会启动工作线程:
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } }} finally { if (! workerStarted) addWorkerFailed(w);}
启动工作线程的流程:
- 创立一个
Worker
实例,Worker
构造方法会应用ThreadFactory
创立一个线程
w = new Worker(firstTask);final Thread t = w.thread;
就不说Worker
类的实现了,间接给出构造方法来细品:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);}
- 如果线程池状态是在运行中,或者曾经敞开,但工作线程要从
workQueue
中获取工作,能力增加工作线程
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; }
留神::当线程池处于SHUTDOWN
状态时,它不能接管新的工作,然而能够继续执行未实现的工作。工作是否从workQueue
中获取,是依据firstTask
判断,每个Worker
实例都有一个firstTask
属性,如果这个值为null
,工作线程启动的时候就会从workQueue
中获取工作,否则会执行firstTask
。
- 启动线程
调用线程的start
办法,启动线程。
if (workerAdded) { t.start(); workerStarted = true;}
执行工作
回过头来看一个Worker
类的定义:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } ...}
Worker
类实现了Runnable
接口,同时在构造方法中会将this
传递给线程,到这里你就晓得了Worker
实例中有run
办法,它会在线程启动后执行:
public void run() { runWorker(this);}
run
办法外部接着调用runWorker
办法运行工作,在这里才是真正的开始运行工作了:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
- 获取工作
首先将firstTask
传递给task
长期变量:
Runnable task = w.firstTask;
而后循环查看task
或者从workQueue
中获取工作:
while (task != null || (task = getTask()) != null) { ...}
getTask()
稍后再做剖析。
- 运行工作
去掉一些状态查看、异样捕捉、和勾子办法调用后,保留最重要的调用task.run()
:
while (task != null || (task = getTask()) != null) { ... task.run(); ... }
task
其实就是通过调用execute
办法传递进来的Runnable
实例,也就是你的工作。只不过它可能保留在Worker.firstTask
中,或者在workQueue
中,保留在哪里在后面的工作增加程序
中曾经阐明。
从workQueue中获取工作
试想一下如果每个工作执行实现,就敞开掉一个线程那有多浪费资源,这样应用线程池也没有多大的意义。所以线程的次要的性能就是线程复用,一旦工作执行实现间接去获取下一个工作,或者挂起线程期待下一个提交的工作,而后期待一段时间后还是没有工作提交,而后才思考是否敞开局部闲暇的线程。
runWorker
中会循环的获取工作:
while (task != null || (task = getTask()) != null) { ... task.run(); ... }
下面的代码getTask()
就是从workQueue
中获取工作:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { ... int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
获取工作的时候会有两种形式:
- 超时期待获取工作
- 始终期待工作,直到有新工作
如果allowCoreThreadTimeOut
为true
,corePoolSize
指定的外围线程数量会被疏忽,间接应用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
获取工作,否则的话会依据当前工作线程的数量,如果wc > corePoolSize
为false
则以后会被认为是外围线程,调用workQueue.take()
始终期待工作。
工作线程的敞开
还是在runWorker
办法中:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
- completedAbruptly变量:标记当前工作线程是失常执行实现,还是异样实现的。completedAbruptly为
false
能够确定线程池中没有可执行的工作了。
下面代码是简洁后的代码,一个while
循环保障不间断的获取工作,没有工作能够执行(task为null)退出循环,最初再才会调用processWorkerExit
办法:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
processWorkerExit
接管一个Worker
实例与completedAbruptly
变量。processWorkerExit的大抵工作流程:
- 判断当前工作线程是否异样实现,如果是间接缩小工作线程的数量,简略的说就是校对一下工作线程的数量。
- 减少实现的工作数量,将
Worker
从workers
中移除 - tryTerminate() 查看线程池状态,因为线程池能够提早敞开,如果你调用
shutdown
办法后不会立刻敞开,要期待所有的工作执行实现,所以这里调用tryTerminate()办法,尝试去调用terminated
办法。
工作线程实现策略
如果某个工作线程实现,线程池外部会判断是否须要重新启动一个:
//判断线程池状态if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { //获取最小工作线程数量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果最小工作线程数量为0,然而workQueue中还有工作,那重置最小工作线程数量1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果当前工作线程数数量大于或等于最小工作线程数量,则不须要启动新的工作线程 if (workerCountOf(c) >= min) return; // replacement not needed } //启动一个新的工作线程 addWorker(null, false);}
工作线程实现后有两种解决策略:
- 对于异样实现的工作线程,间接启动一个新的替换
- 对于失常实现的工作线程,判断当前工作线程是否足够,如果足够则不须要新启动工作线程
留神:这里的实现,示意工作线程的工作执行实现,workQueue
中也没有工作能够获取了。
线程池的敞开
敞开线程池有能够通过shutdown
办法:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate();}
shutdown
办法,第一步就是先扭转线程池的状态,调用advanceRunState(SHUTDOWN)
办法,将线程池以后状态更改为SHUTDOWN
,advanceRunState代码如下:
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
而后立刻调用interruptIdleWorkers()
办法,interruptIdleWorkers()
外部会调用它的重载办法interruptIdleWorkers(boolean onlyOne)
同时onlyOne参数传递的false
来敞开闲暇的线程:
private void interruptIdleWorkers() { interruptIdleWorkers(false);} private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
以上代码会遍历workers
中的Worker
实例,而后调用线程的interrupt()
办法。
什么样的线程才是闲暇工作线程?
后面提到过在getTask()
中,线程从workQueue
中获取工作时会阻塞,被阻塞的线程就是闲暇的。
再次回到getTask()
的代码中:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } ... int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
再次剖析getTask()
中的代码中有一段捕捉InterruptedException
的代码块,interruptIdleWorkers办法中断线程后,getTask()
会捕捉中断异样,因为里面是一个for
循环,随后代码走到判断线程池状态的中央:
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null;}
下面的代码的会判断以后线程池状态,如果状态大于STOP
或者状态等于SHUTDOWN
并且workQueue
为空时则返回null
,getTask()
返回空那么在runWorker
中循环就会退出,当前工作线程的工作就实现了,能够退出了:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
shutdownNow
除了shutdown办法能敞开线程池,还有shutdownNow
也能够敞开线程池。它两的区别在于:
shutdownNow
会清空workQueue
中的工作shutdownNow
还会停止以后正在运行的工作shutdownNow
会使线程进入STOP
状态,而shutdown()
是SHUTDOWN
状态
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
下面代码根本流程:
- advanceRunState(STOP): 使线程池进行
STOP
状态,与shutdown()
中的统一 ,只是应用的状态码是STOP
- interruptWorkers(): 与
shutdown()
中的统一 - drainQueue(): 清空队列
工作是中止执行还是继续执行?
调用shutdownNow()后线程池处于STOP
状态,紧接着所有的工作线程都会被调用interrupt
办法,如果此时runWorker
还在运行会产生什么?
在runWorker
有一段代码,就是工作线程停止的重要代码:
final void runWorker(Worker w) { ... while (task != null || (task = getTask()) != null) { if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); task.run(); } ...}
重点关注:
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();
这个if看起来有点难了解,了解下来大抵意思是:如果线程池状态大于等于STOP
,立刻中断线程,否则革除线程的中断标记,也就是说当线程池状态为RUNNING
和SHUTDOWN
时,线程的中断标记会被革除(线程的中断代码在interruptWorkers
办法中),能够继续执行工作。
以上代码执行实现后,紧接着就会调用task.run()
办法,这外面咱们本人就能够依据线程的中断标记来判断工作是否被中断。
总结
集体程度无限,文中如有谬误,谢谢大家斧正。
本文从线程池的源码动手,剖析线程池的创立、增加工作、运行工作等流程,整个剖析下来基本上大多数公司对于线程池面试的问题都能够答复得上来,当然还有一些小细节如:Worker
类是继承AQS
的,为什么这么做其实源码中都有一些苗头,Worker
在运行时会锁住运行的代码块,而shutdown
在敞开闲暇的Worker
时,首先就要去获取Worker
的同步锁能力持续操作,这样能力平安的敞开工作线程。
欢送关注我的公众号:架构文摘,取得独家整顿120G的收费学习资源助力你的架构师学习之路!公众号后盾回复
arch028
获取材料: