在Java代码中咱们经常会开启异步线程去执行一些网络申请,或是开启子线程去读写文件,这些线程的开启与执行在并发量较小的场景下能够失常运行,如果波及并发量比拟大、线程数量无限、响应速度要快的业务场景下,此时就不容许独自创立线程去执行工作,而是基于线程池治理、散发线程机制去执行线程工作,从而升高资源耗费、进步响应速度,对立治理线程资源
线程池的创立与分类
Exectors类是concurrent包下的用于疾速创立线程的工具类,该类中定义了一系列创立不同线程类型的静态方法,理论还是调用ThreadPoolExecutor类的有参函数,上面看下对应的办法源码
--- newFixedThreadPoolpublic static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}--- 调用有参函数public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}
newFixedThreadPool : 固定数量的线程池,可用于限度特定线程启用数量的场景,调用ThreadPoolExecutor构造函数中的参数定义如下
- corePoolSize : 外围线程数量
- maximumPoolSize : 最大线程数量
- keepAliveTime : 当线程的数量大于外围线程时,闲暇线程在终止之前期待新工作的最大工夫
- unit : 参数keepAliveTime的工夫单位
workQueue : 寄存期待执行工作的阻塞队列,罕用的组赛队列如下
- ArrayBlockingQueue : 基于数组的有界阻塞队列,遵循FIFO(先进先出)准则,构造函数提供设置队列大小参数,采纳ReentrantLock(基于AQS实现)获取重入锁,如果向已满的队列插入则以后线程阻塞
- LinkedBlockingQueue : 基于链表的无界阻塞队列,默认大小为Integer.MAX_VALUE,向该队列插入数据时会封装到Node<>节点所对应的链表中,队列外部应用了putLock和takeLock标识增加、删除锁,二者可并发执行
- SynchronousQueue : 单向链表同步队列,具体需查看源码(常识盲区,未钻研到该队列)
- PriorityBlockingQueue : 具备优先级排序的无界阻塞队列,默认以天然排序形式或者通过传入可比拟的Comparator比拟器进行排序
- threadFactory : 默认线程创立工厂
defaultHandler : 回绝策略,默认应用ThreadPoolExecutor.AbortPolicy,示意当队列满了并且工作线程大于线程池最大线程数量,此时间接抛出异样,
- CallerRunsPolicy : 用于被回绝工作的处理程序,它间接在 execute 办法的调用线程中运行被回绝的工作;如果执行程序已敞开,则会抛弃该工作
- DiscardOldestPolicy : 抛弃最老的一个申请,也就是行将被执行的一个工作,并尝试再次提交当前任务
- DiscardPolicy : 默认抛弃被回绝的工作
个别场景下默认应用ThreadPoolExecutor.AbortPolicy回绝策略
<br/>
newSingleThreadExecutor : 单线程的线程池,只有一个外围线程在执行,可用于须要依照特定程序执行的场景
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));}
通过入参能够看到只应用一个线程,采纳LinkedBlockingQueue无界队列,keepAliveTime是0s,阐明线程创立了不会超时终止,该线程程序执行所有工作
newCachedThreadPool : 外围线程为0,非核心线程数为int的最大值
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
keepAliveTime是60s,采纳SynchronousQueue同步阻塞队列,当有新的工作进来此时如果有闲暇的线程则重复使用,否则就从新创立一个新的线程,线程闲暇60s后会被回收,对于同步阻塞队列能够看这篇文章SynchronousQueue
newScheduledThreadPool : 外围线程为传入的固定数值,非核心线程数为int的最大值,可用于延时或定时工作的执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());}
办法参数中的DelayedWorkQueue底层也是基于数组实现的最小堆的具备优先级的队列,队列中的工作依照执行工夫升序排列,执行工夫越凑近以后工夫的工作排在最后面,此时也会最先执行该工作
<br/>
线程池外部执行机制
外部执行机制根本依照ThreadPoolExecutor构造函数传入的参数来解决提交进来的工作
线程池ThreadPoolExecutor中的execute执行办法
先看下execute办法源码
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
办法中的ctl是ThreadPoolExecutor中申明的提供原子操作的Integer对象,用于获取以后线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- 判断以后线程数量是否小于外围线程数量corePoolSize,如果满足条件调用addWorker()办法创立一个新的外围线程
- 如果大于corePoolSize,接着判断以后线程池是否是运行状态并且通过workQueue.offer()写入阻塞队列
- 此时再次查看线程池状态是否正在运行,否则从队列中移除工作并执行回绝策略;如果是运行状态,调用workerCountOf()判断以后线程池线程数,数量为0就新创建一个新的线程
- 如果首次判断线程池状态非运行状态,调用addWorker()创立心线程如果失败,执行回绝策略
<br/>
ThreadPoolExecutor中提供了shutdown()、shutdownNow()办法用于敞开线程池,调用shutdown时不再承受新的工作,之前提交的工作等执行完结再敞开线程池,调用shutdownNow时会尝试进行线程池中的工作而后再敞开,并且返回未解决完的List<> tasks工作列表
ForkJoinPool初探
ThreadPoolExecutor阻塞队列中的工作都是单个线程去执行,如果此时须要进行密集型计算工作(比方大数组排序、遍历零碎文件夹并计算文件数量),就可能呈现线程池中一个线程忙碌而其余线程闲暇,导致CPU负载不平衡系统资源节约,ForkJoinPool就是用于将单个密集型计算工作拆分成多个小工作,通过fork让线程池其它线程来执行小工作,通过join合并线程执行工作后果,采纳并行执行机制,进步CPU的使用率
ForkJoinPool构造函数
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false);}public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);} public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) { this(checkParallelism(parallelism),checkFactory(factory),handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission();}
结构函数参数定义如下
- parallelism : 并行线程数,默认为Runtime.getRuntime().availableProcessors(),最小为1
- factory : 线程创立工厂,对象类型为ForkJoinWorkerThread
- handler : 线程执行时异样解决
- asyncMode : 为true示意解决工作的线程以队列模式依照先进先出(FIFO)程序,此时不反对工作合并,false则是依照栈的模式后进先出(LIFO)程序,默认是false反对工作合并(join)
获取ForkJoinPool对象能够间接应用commonPool()办法,
val pool = ForkJoinPool.commonPool()public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common;}
而common对应的初始化放在动态代码块中,且最终调用了ForkJoinPool的构造函数
static{ //... common = java.security.AccessController.doPrivileged (new java.security.PrivilegedAction<ForkJoinPool>() { public ForkJoinPool run() { return makeCommonPool(); }}); //...}private static ForkJoinPool makeCommonPool() { //... return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-");}
ForkJoinTask工作
ForkJoinTask抽象类实现了Future接口,同时提供了RecursiveAction和RecursiveTask两个实现类,该实现类提供泛型类型申明,源码如下
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { V result; //返回后果 protected abstract V compute(); //执行工作 public final V getRawResult() { //获取result return result; } protected final boolean exec() { //ForkJoinTask中的形象办法 result = compute(); return true; }}
RecursiveAction绝对应RecursiveTask返回值为void,上面看下ForkJoinTask的三个外围办法
- fork : 在工作执行过程中将大工作拆分为多个小的子工作
- join : 调用子工作的join()办法期待工作返回后果,如果子工作执行异样,join()会抛出异样,quietlyJoin()办法不会抛出异样,须要调用getException()或getRawResult()手动解决异样和后果
- invoke : 在以后线程同步执行该工作,同join一样,如果子工作执行异样,invoke()会抛出异样,quietlyInvoke()办法不会抛出异样,须要调用getException()或getRawResult()手动解决异样和后果
执行ForkJoinTask工作
应用ForkJoinPool时,能够通过以下三个办法执行ForkJoinTask工作,
public <T> T invoke(ForkJoinTask<T> task)public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)public void execute(ForkJoinTask<?> task)
- invoke : 执行有返回值的工作,同步阻塞办法直到工作执行结束
- submit : 执行没有返回值的工作
- execute : 执行带有ForkJoinTask对象返回的工作,非阻塞办法,调用后ForkJoinPool会立刻执行并返回以后执行的task对象
invoke()、submit()是对ExecutorService接口的办法实现,同时ForkJoinPool 也定义了用来执行ForkJoinTask的execute办法
work-stealing模式
对于work-stealing模式形容可参见上面这篇博文,外围就是每个工作线程都有本人的工作队列,当某个线程实现工作后会去"拿"别的队列里的工作去执行,work-stealing模式
ForkJoinPool 与 ThreadPoolExecutor 比拟
ThreadPoolExecutor 与 ForkJoinPool 都实现了ExecutorService,不同之处在于前者只能执行 Runnable 和 Callable 工作,执行程序是依照其在阻塞队列中的程序来执行;后者除了能执行前者的工作类型外还扩大处ForkJoinTask类型工作,从而满足work-stealing这种算法模式,ForkJoinPool波及的技术点还有很多,须要持续深刻摸索,例如ForkJoinPool中的线程状态、fork()、compute()、join()的调用程序、子工作拆分粒度等细节内容...