前言
后面一遍文章 咱们看了下FutureTask的源码,晓得了怎么样去获取一个工作的返回值,明天咱们看下ThreadPoolExecutor。
ThreadPoolExecutor 看名词 咱们就能够 看做是ThreadPool 和Executor的联合,大略意思咱们也能晓得就是线程池执行器,哈哈这翻译 真棒。这篇博文 会从源码的角度去剖析下 一个线程工作 退出的线程池当前 是怎么被执行的~
线程池
下面 说线程的时候 咱们也说过 线程是零碎中极其宝贵的资源,那咱们要正当的应用他,所以有了线程池的呈现,那线程池能带来哪些益处呢
- 升高资源的耗费:通过反复利用曾经创立的线程来升高线程创立和销毁带来的耗费
- 提供响应速度:当咱们创立人物达到的时候,工作能够不须要期待线程的创立就能立刻执行
- 进步线程可管理性:线程是稀缺资源,不能有限创立,所以要应用线程池对线程进行同一的治理和调配,调优和监控等等。
源码剖析
继承构造
首先 咱们看下ThreadPoolExecutor 的继承关系
public class ThreadPoolExecutor extends AbstractExecutorService{}public abstract class AbstractExecutorService implements ExecutorService{}public interface ExecutorService extends Executor { <!--进行线程池,状态设置为SHUTDOWN,并且不在承受新的工作,曾经提交的工作会继续执行--> void shutdown(); <!--进行线程池,状态设置为STOP,不在承受先工作,尝试中断正在执行的工作,返回还未执行的工作--> List<Runnable> shutdownNow(); <!--是否是SHUTDOWN状态--> boolean isShutdown(); <!--是否所有工作都曾经终止--> boolean isTerminated(); <!--超时工夫内,去期待工作执行工作--> boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <!--Callable 去提交工作--> <T> Future<T> submit(Callable<T> task); <!--Runnable 去提交工作--> <T> Future<T> submit(Runnable task, T result); <!--Runnable 去提交工作--> Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}public interface Executor { void execute(Runnable command);}
咱们先从最上面的接口Executor 来看,这个接口就是一个实现,就是执行execute办法,这个接口就是线程执行的入口
ExecutorService接口继承了Executor接口,外面的的办法比拟多,咱们常见的shutdownNow,shutdown 就是在这个接口外面的,还有就是咱们常见往线程池外面提交工作的时候submit办法。ExecutorService丰盛了对工作执行和治理的性能
AbstractExecutorService是一个抽象类,实现了ExecutorService接口,这边顺带说下,为什么java 源码外面存在大量 抽象类实现接口,而后类再继承抽象类,为什么类不间接实现接口呢?还要套一层呢,之前我也不明确,起初我才分明,抽象类去实现接口,就是去实现一些公共的接口办法,这样类再次去实现接口的时候,只有关怀我不同的实现就好了,因为 咱们晓得接口的实现类不止一个,抽象类就是把这些要实现接口的类的公共的实现再次抽取进去,防止了大量的反复实现,尤其List,Set 接口 你看下 简直都有响应的抽象类实现!
次要的变量
<!--ctl 存储了线程池状态和线程的数量--> private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//2的29次方-1 // runState is stored in the high-order bits <!--示意线程池正在运行,能够接受任务 解决线程池中工作--> private static final int RUNNING = -1 << COUNT_BITS; <!--不承受新的工作,然而任然会解决队列中的工作--> private static final int SHUTDOWN = 0 << COUNT_BITS; <!--不承受新的工作,不会解决队里中工作,对正在执行的工作进行中断--> private static final int STOP = 1 << COUNT_BITS; <!--工作被中断,正在解决整顿状态--> private static final int TIDYING = 2 << COUNT_BITS; <!--示意终结状态--> private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl <!--获取以后线程池的运行状态--> private static int runStateOf(int c) { return c & ~CAPACITY; } <!--获取以后线程池中工作线程的数量-> private static int workerCountOf(int c) { return c & CAPACITY; } <!--获取ctl的值 -> private static int ctlOf(int rs, int wc) { return rs | wc; }
对于Ctl是怎么解决线程状态和线程数的数量的,能够看下我的另外一篇博文:https://blog.csdn.net/zxlp520...
构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
这个构造函数 是所有构造函数最终调用的办法,那咱们就说下 这些具体的参数
- int corePoolSize 外围的线程数量
- int maximumPoolSize 最大的线程数量
- long keepAliveTime 线程存活的最大工夫设置
- TimeUnit unit 设置工夫的单位 和keepAliveTime是对应的
- BlockingQueue<Runnable> workQueue 阻塞队里,存储要执行的工作
- ThreadFactory threadFactory 创立执行线程的工厂 默认值:Executors.defaultThreadFactory()
RejectedExecutionHandler handler 工作的回绝Hander办法
- 默认的是AbortPolicy就是抛出异样,
- 还有三种策略是DiscardPolicy抛弃策略,DiscardOldestPolicy抛弃队列中等待时间最长的工作策略,CallerRunsPolicy这个是让调用的线程去解决的策略
Worker
为什么要先讲worker呢?因为咱们提交的工作Runnabale是以Worker这个对象去包装后运行的,这个前面我我讲addWorker办法的时候在细聊
先看下Worker的代码:
/** Worker 继承了AQS 和实现了Runnable接口 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** worker 运行的主体线程 就是在哪个线程外面运行工作的 */ final Thread thread; /** 须要运行的工作 */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);//这边的this 就是以后的Worker 对象 } /** 运行 以后的工作 runWorker是ThreadPoolExecutor外面的办法 */ public void run() { runWorker(this); } // Lock methods // 0 示意 没有锁住状态 // 1 示意 锁住状态 protected boolean isHeldExclusively() { return getState() != 0; } <!--这个办法咱们应该很相熟 我在将AQS的时候聊过这个办法,这边做的就是尝试批改state的状态,这样就是示意加锁的意思,示意这个worker 是锁住状态,别的线程不能执行,--> protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {//CAS 去批改State的值,1示意 曾经被上锁 setExclusiveOwnerThread(Thread.currentThread());设置以后锁的占用者线程是以后线程 return true; } return false; } <!--开释锁,也就是批改State的值 为0 unused这个字段命名也挺有意思,意思是说 没用的意思--> protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null);//设置以后锁的占用者线程是null setState(0); return true; } <!--给以后的Worker加锁,如果获取不到 就退出期待队里中,阻塞以后执行线程--> public void lock() { acquire(1); } <!--这边相当于一个非偏心锁的实现 去尝试下加锁--> public boolean tryLock() { return tryAcquire(1); } <!--开释锁--> public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } <!--尝试去中断运行的线程工作,就是咱们调用shutdownNow 的时候--> void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
首先看下 这个Worker的继承构造,首先是实现了Runnable,又了这样的关系,Worker就能够被Thread去执行了,另外一个还有一个继承了一个抽象类AbstractQueuedSynchronizer,简称AQS,这个类 哈哈 真的是很久不见了,我之前花了5篇文章解释了这个AQS,可想而知其重要性,JUC 中很多实现都是 基于这个去做的,还是不分明的小伙伴能够去到我的博客外面去找下。
这边又一行代码 咱们须要注意下,挺有意思的,this.thread = getThreadFactory().newThread(this);这边 的this 就是咱们构建的Worker,thread 就是用ThreadFactory去创立的一个线程并且执行的工作就是Worker,也就是调用thread.start()就能够执行Worker了
execute
execute是实现Executor接口的办法,就是执行的工作的入口办法,咱们看下一个工作的提交进来是怎么做的
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get();//获取以后的ctl 值 /* * workerCountOf办法我下面也讲过,就是获取以后的工作线程数 * 如果以后的工作线程数小于设置的外围线程数量,就调用addWorker去新增一个工作线程,ture是示意要增加外围工作线程 * addWorker 如果增加胜利就间接返回,如果增加失败就持续后去下ctl,这边重写获取是为了 避免在addWorker过程中 ctl产生了扭转 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } /* * 走到这步 阐明以后的工作线程数大于外围线程数或者是addWorker产生了失败 * 首先去判断了下 以后的线程状态是否是Running 而后把当前任务退出到阻塞队列workQueue中 * 如果都胜利了 那就再次获取下ctl,因为咱们在offer Runnable的时候可能ctl也会发生变化 *这边的多重验证 思考到高并发的状况,代码逻辑十分的谨严 * 持续走上来的逻辑是 再次判断下线程池状态 如果是非Running,那就移除以后的工作,最初执行reject办法 依据不同的回绝策略,做不同的行为 * 最初走到 判断以后线程数量如果是0,还是回去调用addWorker办法,传入一个空的Runnalbe,false 是示意创立一个非核心的工作线程 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 走到这个判断 阐明以后线程池状态是非Running或者入队工作失败,队列可能是满了 * 这边是去创立非核心线程去解决工作,如果创立失败 就执行回绝策略 */ else if (!addWorker(command, false)) reject(command); }
这边的英文正文 我没舍得删除,读者能够去本人翻译下 形容的可能比我精确,我置信 大家能看的懂,而后再比照下 我上面的中文正文,我置信能分明 一个工作新增进来 是怎么个解决流程!
看完本人再回忆下,什么时候去创立外围线程?什么时候去创立非核心线程?什么时候工作会退出的阻塞队列中?最初执行回绝策略 有那几种状况?晓得这些答案 那么execute办法你应该了然于心了!
addWorker
上面咱们看下一个重点的办法,这个办法 调用的频次很高,咱们进入去看下
private boolean addWorker(Runnable firstTask, boolean core) { retry: //这个是一个自旋 套了一个自旋 其目标就是CAS 新增线程池的数量 for (;;) { int c = ctl.get();//获取ctl的值 int rs = runStateOf(c);//获取以后的线程状态 // 这边这个条件看上去很绕头,然而认真看看就能晓得 // 第一个条件rs >= SHUTDOWN 阐明线程池状态不失常 // 前面有一个非的判断 其实就是括号外面的条件有一个不成立 整个条件就是false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 上面是获取线程外面的工作线程 如果大于最大值或者设置的阈值,就返回间接返回false 办法完结 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //这个的意思是 如果CAS批改workerCount胜利 整个最外层的自旋就完结 if (compareAndIncrementWorkerCount(c)) break retry; // 这边为什么要用2个自旋 次要是这边又判断了下 以后这个自旋CAS批改WorkerCount失败后,ctl会发生变化 //如果和外层的不相等,就要返回外层的自旋 去重写做 这边就是为什么用的是 continue retry c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false;//worker是否开始执行了 boolean workerAdded = false;//worker 是否增加胜利 Worker w = null; try { w = new Worker(firstTask);//将Runnable 传入到worker的构造函数中,下面也讲过,其实就是用firstTask去结构了先的Thread final Thread t = w.thread;//以后的t就是执行Runnable的线程,在worker中创立 if (t != null) { final ReentrantLock mainLock = this.mainLock;//重入锁 mainLock.lock();//保障增加workder时候的线程平安 try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//增加worker到一个工作worker汇合中HashSet存储的 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock();//开释锁 } if (workerAdded) {//如果增加胜利 t.start();//这个是真正执行Worker的中央 就是这儿 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//如果最终Worker没有运行,那就清理掉他 批改对应的WorkerCount } return workerStarted; }
办法最开始的中央 用了2个自旋去解决并发状况下的CAS批改workerCount失败的状况,这边每个细节,每种状况都思考的很到位,状态判断的特地的谨严,真正看明确,感觉多线程状况下的编程是如许的麻烦,辛亏帮咱们做了封装!
咱们看下 t.Start() 这边办法,咱们晓得t就是Worker外面创立线程主体,是以本人为工作传入到Thread中的,咱们晓得start是开始运行线程,最终是会调用到run办法的,那么就是说会调到Worker外面的run 办法,咱们在回看下Worker外面的run办法
public void run() { runWorker(this);//ThreadPoolExecutor外面的办法}
runWorker
下面我也说了 线程start后会调用run办法,那么也就是调用 runWorker办法,咱们在看下这个外面写的时候什么
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;//获取Worker外面的工作 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //始终while循环 while (task != null || (task = getTask()) != null) { w.lock();//锁住Worker //判断如果以后的线程池状态是stop 并且检测以后线程的中断状态如果是false 就帮忙以后线程执行中断调用interrupt() if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//执行工作的前置Action Throwable thrown = null; try { task.run();//执行最终的Runnable工作 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);///执行工作的后置Action } } finally { task = null; w.completedTasks++;//Worker实现的工作+1 w.unlock();//开释锁 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);//Worker执行完结后退出 } }
RunWorker办法是整个线程池运行工作的外围办法,线程会应用While循环 一直的从阻塞队里外面去获取工作,而后去执行工作,如果阻塞队列外面没有工作,这个时候
getTask() 办法就会阻塞线程,直至新工作的到来,所以咱们在做单元测试的时候,用到线程池,如果你不调用Shutdown 办法 ,你的debug 小红点就始终在运行,就是这个起因!
getTask
这个办法就是从阻塞队列中取获取工作
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //判断线程池的状态如果是SHUTDOWN并且队列为空 或者间接状态就是null 就不会从阻塞队列中 取出工作 间接返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //timed 就是用来管制 获取阻塞队列中的工作 是否有等待时间,咱们设置的keepAliveTime值就会在这边用到,如果一个工作线程在期待工作超过了设置的值就会退出期待,回收线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c))//工作线程数减1 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();// 获取工作 if (r != null) return r; timedOut = true;//设置期待超时标记 应该在自旋中,下次判断会用到此值 } catch (InterruptedException retry) { timedOut = false; } } }
咱们都晓得 当咱们调用shutdown的时候 线程池状态是ShUTDOWN,调用shutdownnow的时候线程状态是Stop,那么这2种状态是怎么解决阻塞队列外面的工作的呢,看了上文咱们应该能找到答案,当状态是stop的时候,咱们获取队列中的工作是间接返回的null的也就是说队列中的工作不会在执行了,然而当状态是shutdown的时候 只有 队列为空的时候 才会返回null,也就是队列不空 还是能够获取队列中的工作的,这种问题 在面试题中经常出现,如果要正在晓得答案,还是要通过从源码中去真正了解,光是被答案我置信你很快还是会遗记的!
submit
把握了execute办法 在看submit办法 其实就很简略了,submit个别是用于增加 带返回值的工作,咱们看下 代码
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null);//将Runnable 包装成FutureTask工作 去让线程执行 execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result);//将Runnable 包装成FutureTask工作 去让线程执行 execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
看到这边的代码,应该有点儿相熟的滋味,应该上篇文章聊FutureTask的时候 很多曾经将过了,包含Runnable和Callable怎么转换的,Future是怎么获取返回值的?
不分明的小伙伴 能够去看下我之前的文章!https://blog.csdn.net/zxlp520...
下面三个构造函数,就是对应着FutureTask的构造函数,说白了就是咱们应用execute的时候都是用FutureTask去传入的,因为FutureTask也是实现了Runable接口的
执行流程图
最初 用一张流程图,来形容下一个工作从增加到运行完结,经验了哪些办法!
总结
ThreadPoolExecutor 尽管外面执行办法很多,然而你如果把握了常见的逻辑运算符,AQS,线程,FutureTask 等相干常识的根底前提下 去看源码,也不会那么的累。最初我画的流程图,就是一个工作在新增到线程池中执行的整个流程!
最初分享下最近看到的一段话:
什么是危机?
真正的危机,来源于在正确的工夫做不正确的事。没有在正确的工夫,为下一步做出积攒,这才是危机的本源。
如果你正在这条成长路上的敌人,晚醒不如早醒,这就是我想说的。千万别等到中年才发现自己没有建设好本人的护城河,这个时候才晓得致力。在本人致力的阶段,不仅不致力反了抉择了放纵本人,这才是危机的本源。
心愿大家会有所播种,不负时光,不负卿!