在 Java 代码中咱们经常会开启异步线程去执行一些网络申请, 或是开启子线程去读写文件, 这些线程的开启与执行在并发量较小的场景下能够失常运行, 如果波及并发量比拟大、线程数量无限、响应速度要快的业务场景下, 此时就不容许独自创立线程去执行工作, 而是基于线程池治理、散发线程机制去执行线程工作, 从而升高资源耗费、进步响应速度,对立治理线程资源
线程池的创立与分类
Exectors 类是 concurrent 包下的用于疾速创立线程的工具类, 该类中定义了一系列创立不同线程类型的静态方法, 理论还是调用 ThreadPoolExecutor 类的有参函数, 上面看下对应的办法源码
--- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
--- 调用有参函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
-
newFixedThreadPool : 固定数量的线程池, 可用于限度特定线程启用数量的场景, 调用 ThreadPoolExecutor 构造函数中的参数定义如下
- corePoolSize : 外围线程数量
- maximumPoolSize : 最大线程数量
- keepAliveTime : 当线程的数量大于外围线程时, 闲暇线程在终止之前期待新工作的最大工夫
- unit : 参数 keepAliveTime 的工夫单位
-
workQueue : 寄存期待执行工作的阻塞队列, 罕用的组赛队列如下
- ArrayBlockingQueue : 基于数组的有界阻塞队列, 遵循 FIFO(先进先出)准则, 构造函数提供设置队列大小参数, 采纳 ReentrantLock(基于 AQS 实现)获取重入锁, 如果向已满的队列插入则以后线程阻塞
- LinkedBlockingQueue : 基于链表的无界阻塞队列, 默认大小为 Integer.MAX_VALUE, 向该队列插入数据时会封装到 Node<> 节点所对应的链表中, 队列外部应用了 putLock 和 takeLock 标识增加、删除锁,二者可并发执行
- SynchronousQueue : 单向链表同步队列, 具体需查看源码(常识盲区, 未钻研到该队列)
- PriorityBlockingQueue : 具备优先级排序的无界阻塞队列, 默认以天然排序形式或者通过传入可比拟的 Comparator 比拟器进行排序
- threadFactory : 默认线程创立工厂
-
defaultHandler : 回绝策略, 默认应用 ThreadPoolExecutor.AbortPolicy, 示意当队列满了并且工作线程大于线程池最大线程数量, 此时间接抛出异样,
- CallerRunsPolicy : 用于被回绝工作的处理程序,它间接在 execute 办法的调用线程中运行被回绝的工作;如果执行程序已敞开,则会抛弃该工作
- DiscardOldestPolicy : 抛弃最老的一个申请,也就是行将被执行的一个工作,并尝试再次提交当前任务
- DiscardPolicy : 默认抛弃被回绝的工作
个别场景下默认应用 ThreadPoolExecutor.AbortPolicy 回绝策略
<br/>
-
newSingleThreadExecutor : 单线程的线程池, 只有一个外围线程在执行, 可用于须要依照特定程序执行的场景
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
通过入参能够看到只应用一个线程, 采纳 LinkedBlockingQueue 无界队列,keepAliveTime 是 0s, 阐明线程创立了不会超时终止, 该线程程序执行所有工作
-
newCachedThreadPool : 外围线程为 0, 非核心线程数为 int 的最大值
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
keepAliveTime 是 60s, 采纳 SynchronousQueue 同步阻塞队列, 当有新的工作进来此时如果有闲暇的线程则重复使用, 否则就从新创立一个新的线程, 线程闲暇 60s 后会被回收, 对于同步阻塞队列能够看这篇文章 SynchronousQueue
-
newScheduledThreadPool : 外围线程为传入的固定数值, 非核心线程数为 int 的最大值, 可用于延时或定时工作的执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
办法参数中的 DelayedWorkQueue 底层也是基于数组实现的最小堆的具备优先级的队列, 队列中的工作依照执行工夫升序排列, 执行工夫越凑近以后工夫的工作排在最后面, 此时也会最先执行该工作
<br/>
线程池外部执行机制
外部执行机制根本依照 ThreadPoolExecutor 构造函数传入的参数来解决提交进来的工作
线程池 ThreadPoolExecutor 中的 execute 执行办法
先看下 execute 办法源码
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
办法中的 ctl 是 ThreadPoolExecutor 中申明的提供原子操作的 Integer 对象, 用于获取以后线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- 判断以后线程数量是否小于外围线程数量 corePoolSize, 如果满足条件调用 addWorker()办法创立一个新的外围线程
- 如果大于 corePoolSize, 接着判断以后线程池是否是运行状态并且通过 workQueue.offer()写入阻塞队列
- 此时再次查看线程池状态是否正在运行, 否则从队列中移除工作并执行回绝策略; 如果是运行状态, 调用 workerCountOf()判断以后线程池线程数, 数量为 0 就新创建一个新的线程
- 如果首次判断线程池状态非运行状态, 调用 addWorker()创立心线程如果失败, 执行回绝策略
<br/>
ThreadPoolExecutor 中提供了 shutdown()、shutdownNow()办法用于敞开线程池, 调用 shutdown 时不再承受新的工作, 之前提交的工作等执行完结再敞开线程池, 调用 shutdownNow 时会尝试进行线程池中的工作而后再敞开, 并且返回未解决完的 List<> tasks 工作列表
ForkJoinPool 初探
ThreadPoolExecutor 阻塞队列中的工作都是单个线程去执行, 如果此时须要进行密集型计算工作(比方大数组排序、遍历零碎文件夹并计算文件数量), 就可能呈现线程池中一个线程忙碌而其余线程闲暇, 导致 CPU 负载不平衡系统资源节约,ForkJoinPool 就是用于将单个密集型计算工作拆分成多个小工作, 通过 fork 让线程池其它线程来执行小工作, 通过 join 合并线程执行工作后果, 采纳并行执行机制, 进步 CPU 的使用率
ForkJoinPool 构造函数
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();}
结构函数参数定义如下
- parallelism : 并行线程数, 默认为 Runtime.getRuntime().availableProcessors(), 最小为 1
- factory : 线程创立工厂, 对象类型为 ForkJoinWorkerThread
- handler : 线程执行时异样解决
- asyncMode : 为 true 示意解决工作的线程以队列模式依照先进先出 (FIFO) 程序, 此时不反对工作合并,false 则是依照栈的模式后进先出 (LIFO) 程序, 默认是 false 反对工作合并(join)
获取 ForkJoinPool 对象能够间接应用 commonPool()办法,
val pool = ForkJoinPool.commonPool()
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
而 common 对应的初始化放在动态代码块中,且最终调用了 ForkJoinPool 的构造函数
static{
//...
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() {return makeCommonPool(); }});
//...
}
private static ForkJoinPool makeCommonPool() {
//...
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
ForkJoinTask 工作
ForkJoinTask 抽象类实现了 Future 接口, 同时提供了 RecursiveAction 和 RecursiveTask 两个实现类, 该实现类提供泛型类型申明, 源码如下
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
V result; // 返回后果
protected abstract V compute(); // 执行工作
public final V getRawResult() { // 获取 result
return result;
}
protected final boolean exec() { //ForkJoinTask 中的形象办法
result = compute();
return true;
}
}
RecursiveAction 绝对应 RecursiveTask 返回值为 void, 上面看下 ForkJoinTask 的三个外围办法
- fork : 在工作执行过程中将大工作拆分为多个小的子工作
- join : 调用子工作的 join()办法期待工作返回后果, 如果子工作执行异样,join()会抛出异样,quietlyJoin()办法不会抛出异样, 须要调用 getException()或 getRawResult()手动解决异样和后果
- invoke : 在以后线程同步执行该工作, 同 join 一样,如果子工作执行异样,invoke()会抛出异样,quietlyInvoke()办法不会抛出异样, 须要调用 getException()或 getRawResult()手动解决异样和后果
执行 ForkJoinTask 工作
应用 ForkJoinPool 时, 能够通过以下三个办法执行 ForkJoinTask 工作,
public <T> T invoke(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public void execute(ForkJoinTask<?> task)
- invoke : 执行有返回值的工作, 同步阻塞办法直到工作执行结束
- submit : 执行没有返回值的工作
- execute : 执行带有 ForkJoinTask 对象返回的工作, 非阻塞办法, 调用后 ForkJoinPool 会立刻执行并返回以后执行的 task 对象
invoke()、submit()是对 ExecutorService 接口的办法实现, 同时 ForkJoinPool 也定义了用来执行 ForkJoinTask 的 execute 办法
work-stealing 模式
对于 work-stealing 模式形容可参见上面这篇博文,外围就是每个工作线程都有本人的工作队列, 当某个线程实现工作后会去 ” 拿 ” 别的队列里的工作去执行,work-stealing 模式
ForkJoinPool 与 ThreadPoolExecutor 比拟
ThreadPoolExecutor 与 ForkJoinPool 都实现了 ExecutorService, 不同之处在于前者只能执行 Runnable 和 Callable 工作, 执行程序是依照其在阻塞队列中的程序来执行; 后者除了能执行前者的工作类型外还扩大处 ForkJoinTask 类型工作, 从而满足 work-stealing 这种算法模式,ForkJoinPool 波及的技术点还有很多, 须要持续深刻摸索, 例如 ForkJoinPool 中的线程状态、fork()、compute()、join()的调用程序、子工作拆分粒度等细节内容 …