共计 16553 个字符,预计需要花费 42 分钟才能阅读完成。
热衷学习,热衷生存!😄
积淀、分享、成长,让本人和别人都能有所播种!😄
一、为什么要应用线程池?
线程池 提供了一种限度和治理资源(线程、工作)的形式。
这里借用《Java 并发编程的艺术》提到的来说一下 应用线程池的益处:
- 升高资源耗费:通过反复利用已创立的线程升高线程创立和销毁造成的耗费。
- 进步响应速度:当工作达到时,工作能够不须要期待创立线程就能立刻执行。
- 进步线程的可管理性:线程是稀缺资源,如果无线的创立,不仅会耗费系统资源,还会升高零碎的稳定性,应用线程池能够进行对立的调配,调优和监控。
二、ThreadPoolExecutor 类剖析
Java
线程池次要由 Executor
框架实现,Executor
框架不仅包含了线程池的治理,还提供了线程工厂、队列以及回绝策略等,Executor
框架让并发编程变得更加简略。
线程池实现类 ThreadPoolExecutor
是Executor
框架最外围的类,咱们就从这个类的学习线程池的实现原理。
外围属性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制变量 - 寄存状态和线程数 32 位, 高 3 位寄存状态, 低 29 位寄存线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 工作队列,必须是阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 工作线程汇合,寄存线程池中所有的 (沉闷的) 工作线程,只有在持有全局锁 mainLock 的前提下能力拜访此汇合
private final HashSet<Worker> workers = new HashSet<>();
// 全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// awaitTermination 办法应用的期待条件变量
private final Condition termination = mainLock.newCondition();
// 记录峰值线程数
private int largestPoolSize;
// 记录实现胜利执行的工作数
private long completedTaskCount;
// 线程工厂, 用于创立新的线程实例
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器, 对应不同的回绝策略
private volatile RejectedExecutionHandler handler;
// 闲暇线程期待工作的工夫周期,单位是纳秒
private volatile long keepAliveTime;
// 是否容许外围线程超时, 如果为 true 则 keepAliveTime 对外围线程也失效
private volatile boolean allowCoreThreadTimeOut;
// 外围线程数
private volatile int corePoolSize;
// 线程池容量
private volatile int maximumPoolSize;
// 省略其余代码
}
构造方法
参数最多的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if(corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if(workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
能够依据这个构造方法自定义线程数、线程池容量(最大线程数)、闲暇线程期待工作周期、工作队列、线程工厂、回绝策略。
在《阿里巴巴 Java 开发手册》“并发解决”这一章节,明确指出线程资源必须通过线程池创立,不容许在利用中自行显示创立线程,这是因为 应用线程池创立线程能够缩小在创立和销毁线程上所耗费的工夫以及系统资源的开销。
《阿里巴巴 Java 开发手册》中还强制不能应用 Executors
去创立线程池,而是通过下面的 ThreadPoolExecutor
的结构形式创立,这样的解决形式能够让写的同学更加明确线程池的运行规定,防止资源耗尽的危险。
上面简略剖析一下每个参数的含意和作用:
corePoolSize
:外围线程数量。maximumPoolSize
:最大线程数量,也就是线程池的容量。keepAliveTime
:线程闲暇等待时间,也和工作线程的生命周期无关。unit
:线程闲暇工夫的单位,最终会转为成纳秒。workQueue
:期待队列或者叫工作队列。ThreadFactory
:创立线程的工厂,默认应用Executors.defaultThreadFactory()
作为线程池工厂实例。-
handler
:线程池的执行执行处理器,更多的时候成为回绝策略,回绝策略执行的机会是当阻塞队列已满、没有闲暇的线程(蕴含外围线程和非核心线程)并且持续提交工作。提供了 4 种回绝策略实现:AbortPolicy
:间接回绝策略,也就是不会执行工作,间接抛出RjectedExecutionExcetion
谬误,默认的回绝策略。DiscardPolicy
:摈弃策略,也就是间接疏忽提交的工作。DiscardOldestPolicy
:摈弃最老工作策略,也就是通过poll()
办法取出工作队列头的工作摈弃,而后执行以后提交的工作。CallerRunsPolicy
:调用者执行策略,也就是以后调用Executor#execute()
的线程间接调用工作Runnable#run()
, 个别不心愿工作失落会选用这种策略,但从理论角度来看,原来的异步调用用意会进化成同步调用。
状态管制
状态管制次要围绕原子整数成员变量crl
:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 通过 ctl 值获取运行状态
private static int runStateOf(int c) {return c & ~COUNT_MASK;}
// 通过 ctl 值获取工作线程数
private static int workerCountOf(int c) {return c & COUNT_MASK;}
// 通过运行状态和工作线程数计算 ctl 的值,或运算
private static int ctlOf(int rs, int wc) {return rs | wc;}
private static boolean runStateLessThan(int c, int s) {return c < s;}
private static boolean runStateAtLeast(int c, int s) {return c >= s;}
private static boolean isRunning(int c) {return c < SHUTDOWN;}
// CAS 操作线程数减少 1
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}
// CAS 操作线程数缩小 1
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
// 线程数间接缩小 1
private void decrementWorkerCount() {ctl.addAndGet(-1);
}
接下来剖析一下线程池的状态变量,工作线程数量位的长度是COUNT_BITS
,它的值是Integer.SIZE - 3
,也就是正整数 29:
咱们晓得,整数包装类型 Integer 实例的大小是 4byte,一共是 32 位,也就是一共有 32 位用于寄存 0 和 1。
在 ThreadPoolExecutor 实现中,应用 32 位的整数包装类型寄存工作线程数和线程状态,其中低 29 位用于寄存工作线程数,而高 3 位用于寄存线程池状态,所以线程池的状态最多只能有 2^3 种,工作线程下限数量为 2^29 – 1,超过 5 亿,这个数量在短时间内不必思考会超限。
接着看工作线程上线数量掩码COUNT_MASK
,它的值是(1 < COUNT_BITS - 1
),也就是 1 左移 29 位,再减去 1,如果补全 32 位,它的位示图如下:
而后就是线程池的状态常量,比方 RUNNING
状态:
// - 1 的补码为:111-11111111111111111111111111111
// 左移 29 位:111-00000000000000000000000000000
// 10 进制为:-536870912
// 高 3 位 111 的值就是示意线程池正在处于运行状态
private final static int RUNNING = -1 << COUNT_BITS;
线程池状态的运行状态常量:
状态名称 | 位图 | 十进制值 | 形容 |
---|---|---|---|
RUNNING |
111-0000... |
-536870912 | 运行中状态,能够承受新的工作和执行工作队列中的工作。 |
SHUTDOWN |
000-0000... |
0 | 敞开状态,不再接管新的工作,然而会执行工作队列中的工作。 |
STOP |
001-0000... |
536870912 | 进行状态,不再承受新的工作,也不会执行工作队列中的工作,中断所有执行中的工作。 |
TIDYING |
010-0000... |
1073741824 | 整顿中状态,所有工作都曾经执行结束,工作线程数为 0,过渡到此状态的工作线程会调用钩子办法terminated() |
TERMINATED |
011-0000... |
1610612736 | 终止状态,钩子办法 terminated() 执行结束。 |
这里还有一个比拟非凡的技巧,因为运行状态值寄存在高 3 位,所以间接能够通过十进制来比拟判断线程池的状态:
RUNNING
<SHUTDOWN
<STOP
<TIDYING
<TERMINATED
上面的 3 个办法就是应用这种技巧:
// ctl 和状态常量比拟,判断是否小于
private static boolean runStateLessThan(int c, int s) {return c < s;}
// ctl 和状态常量比拟,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) {return c >= s;}
// ctl 和状态常量 SHUTDOWN 比拟,判断是否处于 RUNNING 状态
private static boolean isRunning(int c) {return c < SHUTDOWN;}
线程状态流转关系如下图:
execute 办法源码剖析
线程异步执行工作的办法实现是 ThreadPoolExecutor#execute()
办法,咱们从源码的实现来学习,源码如下:
public void execute(Runnable command) {
// 判断工作对象非空
if (command == null)
throw new NullPointerExcetion();
// 获取 ctl 值, 用于获取线程池状态、线程池线程数量
int c = ctl.get();
// 如果以后线程数小于外围线程数,则创立新的外围线程数并且执行传入的工作
if (workerCountOf(c) < corePoolSize) {if (addWoker(command, true))
// 如果创立新的外围线程胜利则间接返回
return;
// 这里阐明创立新的外围线程失败,则更新 ctl 的长期变量 c
c = ctl.get();}
// 走到这里阐明创立外围线程失败,也就是当前工作的线程数大于等于外围线程数
// 判断线程是否处于运行中状态,如果是运行状态尝试应用非阻塞办法向工作队列放入工作
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
// 这里向工作队列投放胜利,对线程池的运行中状态做二次查看
// 如果线程池二次查看状态是非运行中状态,则从工作队列移除以后的工作,调用回绝策略解决
if (!isRunning() && remove(command))
// 调用回绝策略解决工作
reject(command);
// 走到这里阐明有以下的前提:// 1、待执行的工作曾经胜利退出工作队列
// 2、线程池状态可能是 RUNNING
// 3、传入的工作可能从工作队列中移除失败(移除失败的惟一可能就是工作曾经被执行了)
// 如果当前工作线程数量为 0,则创立一个非核心线程并且传入的对象为 null
// 也就是创立非核心线程不会马上运行,而是期待获取工作队列的工作再执行
else if (workerCountOf(recheck) == 0)
// 创立一个非核心线程并且传入的工作对象为 null
addWorker(null, false);
}
// 走到这里阐明以下:// 1、线程池的工作线程总数曾经大于等于外围线程数
// 2、线程池可能不是 RUNNING 状态
// 3、线程池可能是 RUNNING 状态同时工作队列曾经满了
// 如果工作队列投放工作失败,则会尝试创立非核心线程执行工作
else if (!addWorker(command, false))
// 如果创立非核心线程失败,执行回绝策略
reject(command);
}
下面代码的流程如下:
- 如果当前工作线程总数小于外围线程数
corePoolSize
,则间接创立外围线程去执行工作(工作实例会传入间接用于结构工作线程实例)。 - 如果当前工作线程总是大于等于外围线程数
corePoolSize
,判断线程状态是否是运行中状态,如果是运行中状态则会尝试用非阻塞办法(offer()
)向工作队列投放工作,如果投放胜利会二次查看线程池运行状态,如果线程池是非运行中状态或者从工作队列移除以后的工作失败,则会调用回绝策略,如果当前工作线程数量为 0,则创立一个非核心线程并且传入的工作对象为null
。 - 如果工作队列投放工作失败了(工作队列满了),则会创立创立非核心线程传入工作实例执行。
- 如果非核心线程创立失败,则会调用回绝策略。
这里有一个纳闷点 :为什么要二次查看线程池的状态,当前工作线程数量为 0,尝试创立一个非核心线程并且传入的工作对象为null
?这个能够看API
的解释:
如果一个工作胜利退出工作队列,咱们仍然须要二次查看是否须要增加一个工作线程,因为所有存活的工作线程有可能在最初一次查看之后就终结了或者执行当前任务的时候线程池是否曾经
shutdown
了,所以咱们须要二次检车线程池的状态,必须时要把工作从工作队列中移除或者在没有可用的工作线程的前提的下创立一个工作线程。
execute()
办法执行流程图如下:
addWorker 办法源码剖析
addWorker()
办法用于增加工作线程,源码如下:
pirvate boolean addWorker(Runnable firstTask, boolean core) {
retry;
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// 判断边界情况表,当线程池的状态是 shutdown 状态下,不会再承受新的工作,// 在此前提下如果状态曾经到 stop 状态、或者传入工作不为空、或者工作队列为空
// 都不须要增加新的工作
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取工作线程总数
int wc = workCountOf(c);
// 如果工作线程总数大于等于容量或者大于等于外围线程数 / 最大线程数,
// 则不须要增加新的工作
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 胜利通过 cas 增加新的线程数 wc,则 break 到最外层的循环
if (compareAndOIncreamentWorkerCount(c))
breack retry;
// 走到这里阐明通过 cas 增加新的线程数 wc 失败,这个时候须要从新判断线程池的状态
// 是否由 RUNNABLE 曾经变成 SHUTDOWN
c = ctl.get();
// 如果线程池状态曾经由 RUNNING 曾经变为 SHUTDOWN,则从新跳出到外层循环继续执行
if (runStateOf(c) != rs)
continue retry;
// 如果线程池状态仍然是 RUNNING, CAS 更新工作线程数 wc 失败阐明有可能是并发更新导致的失败
// 则在内层循环即可
}
}
// 标记工作线程是否胜利启动
boolean workerStarted = false;
// 标记工作线程是否创立胜利
boolean workerAdded = false;
Worker w = null;
try {
// 传入工作实例 firstTask 创立 worker 实例,Worker 结构外面会通过线程工厂创立新的 Thread 对象
w = new Worker(firstTask);
// 获取 worker 的线程
final Thread t = w.thread;
if (t != null) {
// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
// 全局锁加锁,因为会扭转一些指标值和非线程平安的汇合
mainLock.lock();
try {
// 获取线程池状态
int rs = runStateOf(ctl.get());
// 如果线程池状态不是 RUNNING 或者是 SHUTDOWN 同时传入的工作实例 firstTask 为 null,// 则判断线程是否存活,不存活抛异样
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())
throw new IllegalThreadStateException();
// 把创立的工作线程实例增加到工作线程汇合
workers.add(w);
// 获取工作线程数量
int s = workers.size();
// 尝试更新线程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
// 标记工作线程增加胜利,前面才会调用 Thread#start()办法启动实在的线程实例
workerAdded = true;
}
} finally {mainLock.unLock();
}
// 如果胜利增加工作线程
if (workerAdded) {// 调用 Worker 外部的线程实例 t 的 Thread#start()办法启动实在的线程实例
t.start();
// 标记线程启动胜利
workerStarted = true;
}
}
} finally {
// 线程启动失败,则须要从工作线程汇合移除对应的 Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 增加 Worker 失败
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock();
mainLock.lock();
try {if (w != null)
// 从工作线程移除
workers.remove();
// 工作线程数 -1
decrementWorkerCount();
// 基于状态尝试终止线程池
tryTerminate();} finally {mainLock.unLock();
}
}
addWorker()
办法是用来增加执行工作,这个流程能够分为两局部来看,第一局部是用于记录线程数量,第二局部是在独占锁里创立执行线程并启动。流程如下:
- 首先判断以后线程池的状态是否是
SHUTDOWN
、STOP
、TIDYING
、TERMINATED
中的一个。并且以后状态为SHUTDOWN
、且传入的工作为NULL
、同时工作队列不为空。那么就返回false
。 - 不满足上一点而后判断线程数是否超过外围线程数或者最大线程数(依据传入的
core
判断),如果超过则返回false
。 - 而后通过
CAS
操作减少线程池数量,胜利跳出循环体。 - 线程池数量记录胜利之后,创立工作实例,应用独占锁创立工作线程并退出到工作线程汇合,并记录增加状态,增加胜利则启动工作线程,记录启动状态,如果最初启动失败则调用
addWorkerFailed()
办法移除线程等操作。
流程图如下:
外部类 Worker 源码剖析
线程池中的每一个具体的工作线程被包装为外部类 Worker
实例,Worker
继承与 AQS
,实现了Runnable
接口,源码如下:
private final class Worker extends AbstractQueuedSynchronized implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
// 保留 ThreadFactory 创立的线程实例,如果创立失败为 null
final Thread thread;
// 保留传入的 Runnable 实例
Runnable firstTask;
// 记录线程实现的工作总数
volatile long completedTasks;
// 惟一构造方法,传入工作实例 firstTask, 能够为 null
Worker(Runnable firstTask) {
// 禁止线程中断,直到 runWorker 办法执行
setState(-1);
this.firstTask = firseTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 调用内部的 runWorker 办法执行真正的工作
runWorker(this);
}
// 是否持有独占锁,state = 0 示意没有获取锁,state > 0 示意获取锁
protected boolean isHeldExclusively() {return getState() != 0;
}
// 尝试获取独占锁
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试开释独占锁
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 启动后进行线程中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {t.interrupt();
} catch (SecurityException ignore) {}}
}
}
Worker
的构造函数外面的逻辑非常重要,通过 ThreadFactory
创立 Thread
实例同时引入 Worker
实例,因为 Worker
自身实现了 Runnable
,所以能够作为工作提交到线程中执行。只有Worker
持有的线程实例 w
调用 Thread#start()
办法就能执行Worker#run()
。简化一下逻辑如下:
// addWorker()办法中结构
Worker worker = createWorker();
// 通过线程池结构时候传入
ThreadFactory threadFactory = getThreadFactory();
// Worker 构造函数中
Thread thread = threadFactory.newThread(worker);
// addWorker()办法中启动
thread.start();
Worker
继承 AQS
,这里应用了AQS
的独占锁模式,这里有个技巧是结构 Worker
的时候,把 AQS
资源状态通过 setState(-1)
设置成 -1,这是因为 Wokrer
实例刚创立时 AQS
中state
的默认值是 0,此时线程尚未启动,不能在这个时候进行线程中断,见 Worker#interruptIfStarted()
办法。
runWorker 办法源码剖析
final void runWorker(Worker w) {
// 获取以后线程,实际上和 Wokrer 持有的线程实例是雷同的
Thread wt = Thread.currentThread();
// 获取 worker 中持有的初始化时传入的工作对象,这里寄存长期变量 task
Runnable task = w.firstTask;
// 设置 Worker 中持有的初始化时传入的工作对象为 null
w.firstTask = null;
// 结构形式的是 state 设置成 -1,这里解锁 state 设成为 0,容许线程中断
w.unlock(); // allow interrupts
// 记录线程是否因为用户异样终结
boolean completedAbruptly = true;
try {
// 初始化工作对象不为空或者从工作队列获取到的工作对象不为空
while (task != null || (task = getTask()) != null) {
// 加锁
w.lock();
// 如果线程池正在进行状态,线程须要终止
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 工作执行之前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行工作
task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 工作执行之后
afterExecute(task, thrown);
}
} finally {
// 工作对象设为 null
task = null;
// 实现工作数量 +1
w.completedTasks++;
// 解锁
w.unlock();}
}
// 失常实现工作
completedAbruptly = false;
} finally {
// 解决线程退出,completedAbruptly 为 true 阐明因为用户异样导致线程非正常退出
processWorkerExit(w, completedAbruptly);
}
}
runWorker
办法的外围流程如下:
Worker
先执行解锁操作,容许线程中断。- 通过
while
循环调用getTask()
办法获取工作对象,首轮循环可能是内部传入的收个工作对象。 - 如果线程池状态变为
STOP
状态,则须要确保工作线程是中断状态并且进行中断解决,否则要保障工作线程不是中断状态。 - 执行工作实例
Runnable#run()
办法,工作执行之前和之后别离会调用beforeExecute()
和afterExecute()
。 while
循环跳出阐明工作全副执行结束,而后会调用processWorkerExit()
办法解决工作线程退出后的工作。
一言半语不如一图,流程图如下:
getTask 办法源码解析
getTask()
办法是工作线程在 while
死循环中获取工作队列中的工作对象的办法,源码如下:
private Runnable getTask() {
// 记录上一次从队列中获取的时候是否超时
boolean timeOut = false;
// 循环
for(;;) {int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 如果线程池状态至多为 SHUTDOWN,如果线程池状态 STOP 或者工作队列为空
// 则工作线程数量 wc 减 1,间接返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())){decrementWorkerCount();
return null;
}
// 获取工作线程数
int wc = workerCountOf(c);
// timed 长期变量用于线程超时管制,决定是否须要通过 poll()的非阻塞办法从工作队列获取工作
// allowCoreThreadTimeOut 默认为 false,如果设置成 true, 则容许外围线程也能通过 poll()办法从工作队列中拉取工作
// 工作线程数大于外围线程数的时候,阐明线程池中创立了额定的非核心线程,这些非核心线程肯定是通过 poll()办法从工作队列中拉取工作
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1. 工作线程数大于最大线程数或者 timed && timedOut 阐明线程命中了超时管制并且上一轮循环通过 poll()办法从工作队列获取工作为 null
// 并且工作线程总数大于 1 或者工作队列为空,则通过 CAS 把线程数减去 1,同时返回 null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {// 如果 timed 为 true,通过 poll()办法做超时拉取,keepAliveTime 工夫内没有期待到无效的工作,则返回 null
// 如果 timed 为 false,通过 take()做阻塞拉取,会阻塞到有下一个无效的工作时候再返回(个别不会是 null)Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedCount = true;
} catch (InterruptedException retry) {timedOut = false;}
}
}
这个办法中,有两处十分复杂的 if
逻辑,先来看第一处,对于第一处 if
可能导致工作线程数量减去 1 间接返回 null
的场景有:
- 线程池状态为
SHUTDOWN
,个别是调用了shutdown()
办法,并且工作队列为空。 - 线程池状态为
STOP
。
对于第二处 if
逻辑有点简单,先拆解一下:
// 工作线程总数大于最大线程数
boolean b1 = wc > maximumPoolSize;
// 容许线程超时同时上一轮通过 poll()办法从工作队列中获取工作为 null
boolean b2 = timed && timedOut;
// 作线程总数大于 1
boolean b3 = wc > 1;
// 工作队列为空
boolean b4 = workQueue.isEmpty();
if (r) {if (compareAndDecrementWorkerCount(c)){return null;}else{continue;}
}
这段逻辑大多数状况下是针对非核心线程的。在 execute()
办法中,线程总数大于外围线程并且小于最大线程数时,会调用 addWorker(task, false)
办法增加非核心线程,而这里的逻辑恰好是想法的操作,用于缩小非核心线程数,使得工作县城总数总是靠近于外围线程数。如果对于外围线程,上一轮循环获取对象为 null
,这一轮循环很容易满足timed && timedOut
为true
,这个时候 getTask()
返回 null
导致 runWorker()
办法跳出循环,最初执行 processWorkerExit()
办法解决工作,而该非核心线程对应的 Worker
则变成“游离对象”,期待被 JVM 回收。当 allowCoreThreadTimeOut
设置为 true
的时候,这里剖析的非核心线程的生命周期终结逻辑同时会实用于外围线程,那么能够总结出 keepAliveTime
的意义:
- 当容许外围线程超时,也就是
allowCoreThreadTimeOut
设置为 true 的时候,此时keepAliveTime
示意闲暇的工作线程存活周期。 - 默认状况下不容许外围线程超时,此时
keepAliveTime
示意闲暇的非核心线程存活周期。
三、手写一个线程池
通过上面对 ThreadPoolExecutor
的学习,咱们能够手写一个简略的线程池,蕴含了线程的外围逻辑,蕴含了提交工作,增加工作,获取工作,执行工作外围逻辑。
这个手写线程池的逻辑也非常简单,只体现外围流程,包含:
- 有 n 个始终执行的线程。
- 把线程提交给线程池运行。
- 如果线程池已满,则把线程放入队列中。
- 最初当有闲暇时,则获取队列中线程进行运行。
代码实现:
public class ThreadPoolTrader implements Executor {private final AtomicInteger ctl = new AtomicInteger(0);
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
public ThreadPoolTrader(int corePoolSize, int maximumPoolSize,
BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}
@Override
public void execute(Runnable command) {int c = ctl.get();
if (c < corePoolSize) {if (!addWorker(command)) {reject();
}
return;
}
if (!workQueue.offer(command)) {if (!addWorker(command)) {reject();
}
}
}
private boolean addWorker(Runnable firstTask) {if (ctl.get() >= maximumPoolSize) return false;
Worker worker = new Worker(firstTask);
worker.thread.start();
ctl.incrementAndGet();
return true;
}
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {this.thread = new Thread(this);
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
try {while (task != null || (task = getTask()) != null) {System.out.println("以后执行工作的线程:" + Thread.currentThread().getName());
task.run();
if (ctl.get() > maximumPoolSize) {break;}
task = null;
}
} finally {ctl.decrementAndGet();
}
}
}
private Runnable getTask() {for (; ;) {
try {System.out.println("workQueue.size:" + workQueue.size());
return workQueue.take();} catch (InterruptedException e) {e.printStackTrace();
}
}
}
private void reject() {throw new RuntimeException("Error!ctl.count:" + ctl.get() + "workQueue.size:" + workQueue.size());
}
public static void main(String[] args) {
ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolTrader.execute(() ->{
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("工作编号:" + finalI);
});
}
}
}
下面的代码测试如下:
以后执行工作的线程:Thread-1
以后执行工作的线程:Thread-0
工作编号:1
工作编号:0
workQueue.size:8
workQueue.size:8
以后执行工作的线程:Thread-0
以后执行工作的线程:Thread-1
工作编号:3
工作编号:2
workQueue.size:6
以后执行工作的线程:Thread-1
workQueue.size:6
以后执行工作的线程:Thread-0
工作编号:5
workQueue.size:4
以后执行工作的线程:Thread-0
工作编号:4
workQueue.size:3
以后执行工作的线程:Thread-1
工作编号:6
workQueue.size:2
以后执行工作的线程:Thread-0
工作编号:7
workQueue.size:1
以后执行工作的线程:Thread-1
工作编号:8
工作编号:9
workQueue.size:0
workQueue.size:0
四、创立线程池的四种形式
Java 创立线程池的四种形式