热衷学习,热衷生存!
积淀、分享、成长,让本人和别人都能有所播种!
一、为什么要应用线程池?
线程池提供了一种限度和治理资源(线程、工作)的形式。
这里借用《Java 并发编程的艺术》提到的来说一下应用线程池的益处:
- 升高资源耗费:通过反复利用已创立的线程升高线程创立和销毁造成的耗费。
- 进步响应速度:当工作达到时,工作能够不须要期待创立线程就能立刻执行。
- 进步线程的可管理性:线程是稀缺资源,如果无线的创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。
二、ThreadPoolExecutor类剖析
Java
线程池次要由Executor
框架实现,Executor
框架不仅包含了线程池的治理,还提供了线程工厂、队列以及回绝策略等,Executor
框架让并发编程变得更加简略。
线程池实现类ThreadPoolExecutor
是Executor
框架最外围的类,咱们就从这个类的学习线程池的实现原理。
外围属性
public class ThreadPoolExecutor extends AbstractExecutorService { // 控制变量-寄存状态和线程数 32位, 高3位寄存状态, 低29位寄存线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 工作队列, 必须是阻塞队列 private final BlockingQueue<Runnable> workQueue; // 工作线程汇合,寄存线程池中所有的(沉闷的)工作线程,只有在持有全局锁mainLock的前提下能力拜访此汇合 private final HashSet<Worker> workers = new HashSet<>(); // 全局锁 private final ReentrantLock mainLock = new ReentrantLock(); // awaitTermination办法应用的期待条件变量 private final Condition termination = mainLock.newCondition(); // 记录峰值线程数 private int largestPoolSize; // 记录实现胜利执行的工作数 private long completedTaskCount; // 线程工厂, 用于创立新的线程实例 private volatile ThreadFactory threadFactory; // 拒绝执行处理器, 对应不同的回绝策略 private volatile RejectedExecutionHandler handler; // 闲暇线程期待工作的工夫周期, 单位是纳秒 private volatile long keepAliveTime; // 是否容许外围线程超时, 如果为true则keepAliveTime对外围线程也失效 private volatile boolean allowCoreThreadTimeOut; // 外围线程数 private volatile int corePoolSize; // 线程池容量 private volatile int maximumPoolSize; // 省略其余代码}
构造方法
参数最多的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if(corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if(workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
能够依据这个构造方法自定义线程数、线程池容量(最大线程数)、闲暇线程期待工作周期、工作队列、线程工厂、回绝策略。
在《阿里巴巴Java开发手册》“并发解决”这一章节,明确指出线程资源必须通过线程池创立,不容许在利用中自行显示创立线程,这是因为应用线程池创立线程能够缩小在创立和销毁线程上所耗费的工夫以及系统资源的开销。
《阿里巴巴Java开发手册》中还强制不能应用Executors
去创立线程池,而是通过下面的ThreadPoolExecutor
的结构形式创立,这样的解决形式能够让写的同学更加明确线程池的运行规定,防止资源耗尽的危险。
上面简略剖析一下每个参数的含意和作用:
corePoolSize
:外围线程数量。maximumPoolSize
:最大线程数量,也就是线程池的容量。keepAliveTime
:线程闲暇等待时间,也和工作线程的生命周期无关。unit
:线程闲暇工夫的单位,最终会转为成纳秒。workQueue
:期待队列或者叫工作队列。ThreadFactory
:创立线程的工厂,默认应用Executors.defaultThreadFactory()
作为线程池工厂实例。handler
:线程池的执行执行处理器,更多的时候成为回绝策略,回绝策略执行的机会是当阻塞队列已满、没有闲暇的线程(蕴含外围线程和非核心线程)并且持续提交工作。提供了4种回绝策略实现:AbortPolicy
:间接回绝策略,也就是不会执行工作,间接抛出RjectedExecutionExcetion
谬误,默认的回绝策略。DiscardPolicy
:摈弃策略,也就是间接疏忽提交的工作。DiscardOldestPolicy
:摈弃最老工作策略,也就是通过poll()
办法取出工作队列头的工作摈弃,而后执行以后提交的工作。CallerRunsPolicy
:调用者执行策略,也就是以后调用Executor#execute()
的线程间接调用工作Runnable#run()
,个别不心愿工作失落会选用这种策略,但从理论角度来看,原来的异步调用用意会进化成同步调用。
状态管制
状态管制次要围绕原子整数成员变量crl
:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// 通过ctl值获取运行状态private static int runStateOf(int c) { return c & ~COUNT_MASK; }// 通过ctl值获取工作线程数private static int workerCountOf(int c) { return c & COUNT_MASK; }// 通过运行状态和工作线程数计算ctl的值,或运算private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) { return c < s;}private static boolean runStateAtLeast(int c, int s) { return c >= s;}private static boolean isRunning(int c) { return c < SHUTDOWN;}// CAS操作线程数减少1private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1);}// CAS操作线程数缩小1private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1);}// 线程数间接缩小1private void decrementWorkerCount() { ctl.addAndGet(-1);}
接下来剖析一下线程池的状态变量,工作线程数量位的长度是COUNT_BITS
,它的值是Integer.SIZE - 3
,也就是正整数29:
咱们晓得,整数包装类型Integer实例的大小是4byte,一共是32位,也就是一共有32位用于寄存0和1。
在ThreadPoolExecutor实现中,应用32位的整数包装类型寄存工作线程数和线程状态,其中低29位用于寄存工作线程数,而高3位用于寄存线程池状态,所以线程池的状态最多只能有2^3种,工作线程下限数量为2^29 - 1,超过5亿,这个数量在短时间内不必思考会超限。
接着看工作线程上线数量掩码COUNT_MASK
,它的值是(1 < COUNT_BITS - 1
),也就是1左移29位,再减去1,如果补全32位,它的位示图如下:
而后就是线程池的状态常量,比方RUNNING
状态:
// -1的补码为:111-11111111111111111111111111111// 左移29位: 111-00000000000000000000000000000// 10进制为:-536870912 // 高3位111的值就是示意线程池正在处于运行状态private final static int RUNNING = -1 << COUNT_BITS;
线程池状态的运行状态常量:
状态名称 | 位图 | 十进制值 | 形容 |
---|---|---|---|
RUNNING | 111-0000... | -536870912 | 运行中状态,能够承受新的工作和执行工作队列中的工作。 |
SHUTDOWN | 000-0000... | 0 | 敞开状态,不再接管新的工作,然而会执行工作队列中的工作。 |
STOP | 001-0000... | 536870912 | 进行状态,不再承受新的工作,也不会执行工作队列中的工作,中断所有执行中的工作。 |
TIDYING | 010-0000... | 1073741824 | 整顿中状态,所有工作都曾经执行结束,工作线程数为0,过渡到此状态的工作线程会调用钩子办法terminated() |
TERMINATED | 011-0000... | 1610612736 | 终止状态,钩子办法terminated() 执行结束。 |
这里还有一个比拟非凡的技巧,因为运行状态值寄存在高3位,所以间接能够通过十进制来比拟判断线程池的状态:
RUNNING
<SHUTDOWN
<STOP
<TIDYING
<TERMINATED
上面的3个办法就是应用这种技巧:
// ctl和状态常量比拟,判断是否小于private static boolean runStateLessThan(int c, int s) { return c < s;}// ctl和状态常量比拟,判断是否小于或等于private static boolean runStateAtLeast(int c, int s) { return c >= s;}// ctl和状态常量SHUTDOWN比拟,判断是否处于RUNNING状态private static boolean isRunning(int c) { return c < SHUTDOWN;}
线程状态流转关系如下图:
execute办法源码剖析
线程异步执行工作的办法实现是ThreadPoolExecutor#execute()
办法,咱们从源码的实现来学习,源码如下:
public void execute(Runnable command) { // 判断工作对象非空 if (command == null) throw new NullPointerExcetion(); // 获取ctl值, 用于获取线程池状态、线程池线程数量 int c = ctl.get(); // 如果以后线程数小于外围线程数,则创立新的外围线程数并且执行传入的工作 if (workerCountOf(c) < corePoolSize) { if (addWoker(command, true)) // 如果创立新的外围线程胜利则间接返回 return; // 这里阐明创立新的外围线程失败,则更新ctl的长期变量c c = ctl.get(); } // 走到这里阐明创立外围线程失败,也就是当前工作的线程数大于等于外围线程数 // 判断线程是否处于运行中状态,如果是运行状态尝试应用非阻塞办法向工作队列放入工作 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 这里向工作队列投放胜利,对线程池的运行中状态做二次查看 // 如果线程池二次查看状态是非运行中状态,则从工作队列移除以后的工作,调用回绝策略解决 if (!isRunning() && remove(command)) // 调用回绝策略解决工作 reject(command); // 走到这里阐明有以下的前提: // 1、待执行的工作曾经胜利退出工作队列 // 2、线程池状态可能是RUNNING // 3、传入的工作可能从工作队列中移除失败(移除失败的惟一可能就是工作曾经被执行了) // 如果当前工作线程数量为0,则创立一个非核心线程并且传入的对象为null // 也就是创立非核心线程不会马上运行,而是期待获取工作队列的工作再执行 else if (workerCountOf(recheck) == 0) // 创立一个非核心线程并且传入的工作对象为null addWorker(null, false); } // 走到这里阐明以下: // 1、线程池的工作线程总数曾经大于等于外围线程数 // 2、线程池可能不是RUNNING状态 // 3、线程池可能是RUNNING状态同时工作队列曾经满了 // 如果工作队列投放工作失败,则会尝试创立非核心线程执行工作 else if (!addWorker(command, false)) // 如果创立非核心线程失败,执行回绝策略 reject(command);}
下面代码的流程如下:
- 如果当前工作线程总数小于外围线程数
corePoolSize
,则间接创立外围线程去执行工作(工作实例会传入间接用于结构工作线程实例)。 - 如果当前工作线程总是大于等于外围线程数
corePoolSize
,判断线程状态是否是运行中状态,如果是运行中状态则会尝试用非阻塞办法(offer()
)向工作队列投放工作,如果投放胜利会二次查看线程池运行状态,如果线程池是非运行中状态或者从工作队列移除以后的工作失败,则会调用回绝策略,如果当前工作线程数量为0,则创立一个非核心线程并且传入的工作对象为null
。 - 如果工作队列投放工作失败了(工作队列满了),则会创立创立非核心线程传入工作实例执行。
- 如果非核心线程创立失败,则会调用回绝策略。
这里有一个纳闷点:为什么要二次查看线程池的状态,当前工作线程数量为0,尝试创立一个非核心线程并且传入的工作对象为null
?这个能够看API
的解释:
如果一个工作胜利退出工作队列,咱们仍然须要二次查看是否须要增加一个工作线程,因为所有存活的工作线程有可能在最初一次查看之后就终结了或者执行当前任务的时候线程池是否曾经shutdown
了,所以咱们须要二次检车线程池的状态,必须时要把工作从工作队列中移除或者在没有可用的工作线程的前提的下创立一个工作线程。
execute()
办法执行流程图如下:
addWorker办法源码剖析
addWorker()
办法用于增加工作线程,源码如下:
pirvate boolean addWorker(Runnable firstTask, boolean core) { retry; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断边界情况表,当线程池的状态是shutdown状态下,不会再承受新的工作, // 在此前提下如果状态曾经到stop状态、或者传入工作不为空、或者工作队列为空 // 都不须要增加新的工作 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取工作线程总数 int wc = workCountOf(c); // 如果工作线程总数大于等于容量或者大于等于外围线程数/最大线程数, // 则不须要增加新的工作 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 胜利通过cas增加新的线程数wc,则break到最外层的循环 if (compareAndOIncreamentWorkerCount(c)) breack retry; // 走到这里阐明通过cas增加新的线程数wc失败,这个时候须要从新判断线程池的状态 // 是否由RUNNABLE曾经变成SHUTDOWN c = ctl.get(); // 如果线程池状态曾经由RUNNING曾经变为SHUTDOWN,则从新跳出到外层循环继续执行 if (runStateOf(c) != rs) continue retry; // 如果线程池状态仍然是RUNNING, CAS更新工作线程数wc失败阐明有可能是并发更新导致的失败 // 则在内层循环即可 } } // 标记工作线程是否胜利启动 boolean workerStarted = false; // 标记工作线程是否创立胜利 boolean workerAdded = false; Worker w = null; try { // 传入工作实例firstTask创立worker实例,Worker结构外面会通过线程工厂创立新的Thread对象 w = new Worker(firstTask); // 获取worker的线程 final Thread t = w.thread; if (t != null) { // 获取全局锁 final ReentrantLock mainLock = this.mainLock; // 全局锁加锁,因为会扭转一些指标值和非线程平安的汇合 mainLock.lock(); try { // 获取线程池状态 int rs = runStateOf(ctl.get()); // 如果线程池状态不是RUNNING或者是SHUTDOWN同时传入的工作实例firstTask为null, // 则判断线程是否存活,不存活抛异样 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // 把创立的工作线程实例增加到工作线程汇合 workers.add(w); // 获取工作线程数量 int s = workers.size(); // 尝试更新线程池峰值容量 if (s > largestPoolSize) largestPoolSize = s; // 标记工作线程增加胜利,前面才会调用Thread#start()办法启动实在的线程实例 workerAdded = true; } } finally { mainLock.unLock(); } // 如果胜利增加工作线程 if (workerAdded) { // 调用Worker外部的线程实例t的Thread#start()办法启动实在的线程实例 t.start(); // 标记线程启动胜利 workerStarted = true; } } } finally { // 线程启动失败,则须要从工作线程汇合移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted;}// 增加Worker失败private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock(); mainLock.lock(); try { if (w != null) // 从工作线程移除 workers.remove(); // 工作线程数-1 decrementWorkerCount(); // 基于状态尝试终止线程池 tryTerminate(); } finally { mainLock.unLock(); }}
addWorker()
办法是用来增加执行工作,这个流程能够分为两局部来看,第一局部是用于记录线程数量,第二局部是在独占锁里创立执行线程并启动。流程如下:
- 首先判断以后线程池的状态是否是
SHUTDOWN
、STOP
、TIDYING
、TERMINATED
中的一个。并且以后状态为SHUTDOWN
、且传入的工作为NULL
、同时工作队列不为空。那么就返回false
。 - 不满足上一点而后判断线程数是否超过外围线程数或者最大线程数(依据传入的
core
判断),如果超过则返回false
。 - 而后通过
CAS
操作减少线程池数量,胜利跳出循环体。 - 线程池数量记录胜利之后,创立工作实例,应用独占锁创立工作线程并退出到工作线程汇合,并记录增加状态,增加胜利则启动工作线程,记录启动状态,如果最初启动失败则调用
addWorkerFailed()
办法移除线程等操作。
流程图如下:
外部类Worker源码剖析
线程池中的每一个具体的工作线程被包装为外部类Worker
实例,Worker
继承与AQS
,实现了Runnable
接口,源码如下:
private final class Worker extends AbstractQueuedSynchronized implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 保留ThreadFactory创立的线程实例,如果创立失败为null final Thread thread; // 保留传入的Runnable实例 Runnable firstTask; // 记录线程实现的工作总数 volatile long completedTasks; // 惟一构造方法,传入工作实例firstTask, 能够为null Worker(Runnable firstTask) { // 禁止线程中断,直到runWorker办法执行 setState(-1); this.firstTask = firseTask; this.thread = getThreadFactory().newThread(this); } public void run() { // 调用内部的runWorker办法执行真正的工作 runWorker(this); } // 是否持有独占锁,state = 0示意没有获取锁,state > 0示意获取锁 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试获取独占锁 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 尝试开释独占锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 启动后进行线程中断 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}
Worker
的构造函数外面的逻辑非常重要,通过ThreadFactory
创立Thread
实例同时引入Worker
实例,因为Worker
自身实现了Runnable
,所以能够作为工作提交到线程中执行。只有Worker
持有的线程实例w
调用Thread#start()
办法就能执行Worker#run()
。简化一下逻辑如下:
// addWorker()办法中结构Worker worker = createWorker();// 通过线程池结构时候传入ThreadFactory threadFactory = getThreadFactory();// Worker构造函数中Thread thread = threadFactory.newThread(worker);// addWorker()办法中启动thread.start();
Worker
继承AQS
,这里应用了AQS
的独占锁模式,这里有个技巧是结构Worker
的时候,把AQS
资源状态通过setState(-1)
设置成-1,这是因为Wokrer
实例刚创立时AQS
中state
的默认值是0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()
办法。
runWorker办法源码剖析
final void runWorker(Worker w) { // 获取以后线程,实际上和Wokrer持有的线程实例是雷同的 Thread wt = Thread.currentThread(); // 获取worker中持有的初始化时传入的工作对象,这里寄存长期变量task Runnable task = w.firstTask; // 设置Worker中持有的初始化时传入的工作对象为null w.firstTask = null; // 结构形式的是state设置成-1,这里解锁state设成为0,容许线程中断 w.unlock(); // allow interrupts // 记录线程是否因为用户异样终结 boolean completedAbruptly = true; try { // 初始化工作对象不为空或者从工作队列获取到的工作对象不为空 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); // 如果线程池正在进行状态,线程须要终止 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 工作执行之前 beforeExecute(wt, task); Throwable thrown = null; try { // 执行工作 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 工作执行之后 afterExecute(task, thrown); } } finally { // 工作对象设为null task = null; // 实现工作数量+1 w.completedTasks++; // 解锁 w.unlock(); } } // 失常实现工作 completedAbruptly = false; } finally { // 解决线程退出,completedAbruptly为true阐明因为用户异样导致线程非正常退出 processWorkerExit(w, completedAbruptly); }}
runWorker
办法的外围流程如下:
Worker
先执行解锁操作,容许线程中断。- 通过
while
循环调用getTask()
办法获取工作对象,首轮循环可能是内部传入的收个工作对象。 - 如果线程池状态变为
STOP
状态,则须要确保工作线程是中断状态并且进行中断解决,否则要保障工作线程不是中断状态。 - 执行工作实例
Runnable#run()
办法,工作执行之前和之后别离会调用beforeExecute()
和afterExecute()
。 while
循环跳出阐明工作全副执行结束,而后会调用processWorkerExit()
办法解决工作线程退出后的工作。
一言半语不如一图,流程图如下:
getTask办法源码解析
getTask()
办法是工作线程在while
死循环中获取工作队列中的工作对象的办法,源码如下:
private Runnable getTask() { // 记录上一次从队列中获取的时候是否超时 boolean timeOut = false; // 循环 for(;;) { int c = ctl.get(); // 获取线程池状态 int rs = runStateOf(c); // 如果线程池状态至多为SHUTDOWN,如果线程池状态STOP或者工作队列为空 // 则工作线程数量wc减1,间接返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())){ decrementWorkerCount(); return null; } // 获取工作线程数 int wc = workerCountOf(c); // timed长期变量用于线程超时管制,决定是否须要通过poll()的非阻塞办法从工作队列获取工作 // allowCoreThreadTimeOut默认为false,如果设置成true,则容许外围线程也能通过poll()办法从工作队列中拉取工作 // 工作线程数大于外围线程数的时候,阐明线程池中创立了额定的非核心线程,这些非核心线程肯定是通过poll()办法从工作队列中拉取工作 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 1.工作线程数大于最大线程数或者timed && timedOut 阐明线程命中了超时管制并且上一轮循环通过poll()办法从工作队列获取工作为null // 并且工作线程总数大于1或者工作队列为空,则通过CAS把线程数减去1,同时返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果timed为true,通过poll()办法做超时拉取,keepAliveTime工夫内没有期待到无效的工作,则返回null // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个无效的工作时候再返回(个别不会是null) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedCount = true; } catch (InterruptedException retry) { timedOut = false; } }}
这个办法中,有两处十分复杂的if
逻辑,先来看第一处,对于第一处if
可能导致工作线程数量减去1间接返回null
的场景有:
- 线程池状态为
SHUTDOWN
,个别是调用了shutdown()
办法,并且工作队列为空。 - 线程池状态为
STOP
。
对于第二处if
逻辑有点简单,先拆解一下:
// 工作线程总数大于最大线程数boolean b1 = wc > maximumPoolSize;// 容许线程超时同时上一轮通过poll()办法从工作队列中获取工作为nullboolean b2 = timed && timedOut;// 作线程总数大于1boolean b3 = wc > 1;// 工作队列为空boolean b4 = workQueue.isEmpty();if (r) { if (compareAndDecrementWorkerCount(c)){ return null; }else{ continue; }}
这段逻辑大多数状况下是针对非核心线程的。在execute()
办法中,线程总数大于外围线程并且小于最大线程数时,会调用addWorker(task, false)
办法增加非核心线程,而这里的逻辑恰好是想法的操作,用于缩小非核心线程数,使得工作县城总数总是靠近于外围线程数。如果对于外围线程,上一轮循环获取对象为null
,这一轮循环很容易满足timed && timedOut
为true
,这个时候getTask()
返回null
导致runWorker()
办法跳出循环,最初执行processWorkerExit()
办法解决工作,而该非核心线程对应的Worker
则变成“游离对象”,期待被JVM回收。当allowCoreThreadTimeOut
设置为true
的时候,这里剖析的非核心线程的生命周期终结逻辑同时会实用于外围线程,那么能够总结出keepAliveTime
的意义:
- 当容许外围线程超时,也就是
allowCoreThreadTimeOut
设置为true的时候,此时keepAliveTime
示意闲暇的工作线程存活周期。 - 默认状况下不容许外围线程超时,此时
keepAliveTime
示意闲暇的非核心线程存活周期。
三、手写一个线程池
通过上面对ThreadPoolExecutor
的学习,咱们能够手写一个简略的线程池,蕴含了线程的外围逻辑,蕴含了提交工作,增加工作,获取工作,执行工作外围逻辑。
这个手写线程池的逻辑也非常简单,只体现外围流程,包含:
- 有n个始终执行的线程。
- 把线程提交给线程池运行。
- 如果线程池已满,则把线程放入队列中。
- 最初当有闲暇时,则获取队列中线程进行运行。
代码实现:
public class ThreadPoolTrader implements Executor { private final AtomicInteger ctl = new AtomicInteger(0); private volatile int corePoolSize; private volatile int maximumPoolSize; private final BlockingQueue<Runnable> workQueue; public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; } @Override public void execute(Runnable command) { int c = ctl.get(); if (c < corePoolSize) { if (!addWorker(command)) { reject(); } return; } if (!workQueue.offer(command)) { if (!addWorker(command)) { reject(); } } } private boolean addWorker(Runnable firstTask) { if (ctl.get() >= maximumPoolSize) return false; Worker worker = new Worker(firstTask); worker.thread.start(); ctl.incrementAndGet(); return true; } private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.thread = new Thread(this); this.firstTask = firstTask; } @Override public void run() { Runnable task = firstTask; try { while (task != null || (task = getTask()) != null) { System.out.println("以后执行工作的线程:" + Thread.currentThread().getName()); task.run(); if (ctl.get() > maximumPoolSize) { break; } task = null; } } finally { ctl.decrementAndGet(); } } } private Runnable getTask() { for (; ; ) { try { System.out.println("workQueue.size:" + workQueue.size()); return workQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void reject() { throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size()); } public static void main(String[] args) { ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10)); for (int i = 0; i < 10; i++) { int finalI = i; threadPoolTrader.execute(() ->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("工作编号:" + finalI); }); } }}
下面的代码测试如下:
以后执行工作的线程:Thread-1以后执行工作的线程:Thread-0工作编号:1工作编号:0workQueue.size:8workQueue.size:8以后执行工作的线程:Thread-0以后执行工作的线程:Thread-1工作编号:3工作编号:2workQueue.size:6以后执行工作的线程:Thread-1workQueue.size:6以后执行工作的线程:Thread-0工作编号:5workQueue.size:4以后执行工作的线程:Thread-0工作编号:4workQueue.size:3以后执行工作的线程:Thread-1工作编号:6workQueue.size:2以后执行工作的线程:Thread-0工作编号:7workQueue.size:1以后执行工作的线程:Thread-1工作编号:8工作编号:9workQueue.size:0workQueue.size:0
四、创立线程池的四种形式
Java 创立线程池的四种形式