在Java代码中咱们经常会开启异步线程去执行一些网络申请,或是开启子线程去读写文件,这些线程的开启与执行在并发量较小的场景下能够失常运行,如果波及并发量比拟大、线程数量无限、响应速度要快的业务场景下,此时就不容许独自创立线程去执行工作,而是基于线程池治理、散发线程机制去执行线程工作,从而升高资源耗费、进步响应速度,对立治理线程资源

线程池的创立与分类

Exectors类是concurrent包下的用于疾速创立线程的工具类,该类中定义了一系列创立不同线程类型的静态方法,理论还是调用ThreadPoolExecutor类的有参函数,上面看下对应的办法源码

--- newFixedThreadPoolpublic static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>());}--- 调用有参函数public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,        Executors.defaultThreadFactory(), defaultHandler);}
  1. newFixedThreadPool : 固定数量的线程池,可用于限度特定线程启用数量的场景,调用ThreadPoolExecutor构造函数中的参数定义如下

    • corePoolSize : 外围线程数量
    • maximumPoolSize : 最大线程数量
    • keepAliveTime : 当线程的数量大于外围线程时,闲暇线程在终止之前期待新工作的最大工夫
    • unit : 参数keepAliveTime的工夫单位
    • workQueue : 寄存期待执行工作的阻塞队列,罕用的组赛队列如下

      1. ArrayBlockingQueue : 基于数组的有界阻塞队列,遵循FIFO(先进先出)准则,构造函数提供设置队列大小参数,采纳ReentrantLock(基于AQS实现)获取重入锁,如果向已满的队列插入则以后线程阻塞
      2. LinkedBlockingQueue : 基于链表的无界阻塞队列,默认大小为Integer.MAX_VALUE,向该队列插入数据时会封装到Node<>节点所对应的链表中,队列外部应用了putLock和takeLock标识增加、删除锁,二者可并发执行
      3. SynchronousQueue : 单向链表同步队列,具体需查看源码(常识盲区,未钻研到该队列)
      4. PriorityBlockingQueue : 具备优先级排序的无界阻塞队列,默认以天然排序形式或者通过传入可比拟的Comparator比拟器进行排序
    • threadFactory : 默认线程创立工厂
    • defaultHandler : 回绝策略,默认应用ThreadPoolExecutor.AbortPolicy,示意当队列满了并且工作线程大于线程池最大线程数量,此时间接抛出异样,

      1. CallerRunsPolicy : 用于被回绝工作的处理程序,它间接在 execute 办法的调用线程中运行被回绝的工作;如果执行程序已敞开,则会抛弃该工作
      2. DiscardOldestPolicy : 抛弃最老的一个申请,也就是行将被执行的一个工作,并尝试再次提交当前任务
      3. DiscardPolicy : 默认抛弃被回绝的工作

      个别场景下默认应用ThreadPoolExecutor.AbortPolicy回绝策略

<br/>

  1. newSingleThreadExecutor : 单线程的线程池,只有一个外围线程在执行,可用于须要依照特定程序执行的场景

    public static ExecutorService newSingleThreadExecutor() {     return new FinalizableDelegatedExecutorService         (new ThreadPoolExecutor(1, 1,                                 0L, TimeUnit.MILLISECONDS,                                 new LinkedBlockingQueue<Runnable>()));}

    通过入参能够看到只应用一个线程,采纳LinkedBlockingQueue无界队列,keepAliveTime是0s,阐明线程创立了不会超时终止,该线程程序执行所有工作

  2. newCachedThreadPool : 外围线程为0,非核心线程数为int的最大值

    public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                 60L, TimeUnit.SECONDS,                                 new SynchronousQueue<Runnable>());}

    keepAliveTime是60s,采纳SynchronousQueue同步阻塞队列,当有新的工作进来此时如果有闲暇的线程则重复使用,否则就从新创立一个新的线程,线程闲暇60s后会被回收,对于同步阻塞队列能够看这篇文章SynchronousQueue

  3. newScheduledThreadPool : 外围线程为传入的固定数值,非核心线程数为int的最大值,可用于延时或定时工作的执行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE,       DEFAULT_KEEPALIVE_MILLIS,        MILLISECONDS,       new DelayedWorkQueue());}

    办法参数中的DelayedWorkQueue底层也是基于数组实现的最小堆的具备优先级的队列,队列中的工作依照执行工夫升序排列,执行工夫越凑近以后工夫的工作排在最后面,此时也会最先执行该工作

<br/>

线程池外部执行机制

外部执行机制根本依照ThreadPoolExecutor构造函数传入的参数来解决提交进来的工作

线程池ThreadPoolExecutor中的execute执行办法

先看下execute办法源码

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }

办法中的ctl是ThreadPoolExecutor中申明的提供原子操作的Integer对象,用于获取以后线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  1. 判断以后线程数量是否小于外围线程数量corePoolSize,如果满足条件调用addWorker()办法创立一个新的外围线程
  2. 如果大于corePoolSize,接着判断以后线程池是否是运行状态并且通过workQueue.offer()写入阻塞队列
  3. 此时再次查看线程池状态是否正在运行,否则从队列中移除工作并执行回绝策略;如果是运行状态,调用workerCountOf()判断以后线程池线程数,数量为0就新创建一个新的线程
  4. 如果首次判断线程池状态非运行状态,调用addWorker()创立心线程如果失败,执行回绝策略

<br/>
ThreadPoolExecutor中提供了shutdown()、shutdownNow()办法用于敞开线程池,调用shutdown时不再承受新的工作,之前提交的工作等执行完结再敞开线程池,调用shutdownNow时会尝试进行线程池中的工作而后再敞开,并且返回未解决完的List<> tasks工作列表

ForkJoinPool初探

ThreadPoolExecutor阻塞队列中的工作都是单个线程去执行,如果此时须要进行密集型计算工作(比方大数组排序、遍历零碎文件夹并计算文件数量),就可能呈现线程池中一个线程忙碌而其余线程闲暇,导致CPU负载不平衡系统资源节约,ForkJoinPool就是用于将单个密集型计算工作拆分成多个小工作,通过fork让线程池其它线程来执行小工作,通过join合并线程执行工作后果,采纳并行执行机制,进步CPU的使用率

ForkJoinPool构造函数

public ForkJoinPool() {    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),        defaultForkJoinWorkerThreadFactory, null, false);}public ForkJoinPool(int parallelism) {    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);}    public ForkJoinPool(int parallelism,    ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {        this(checkParallelism(parallelism),checkFactory(factory),handler,             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,             "ForkJoinPool-" + nextPoolId() + "-worker-");    checkPermission();}

结构函数参数定义如下

  • parallelism : 并行线程数,默认为Runtime.getRuntime().availableProcessors(),最小为1
  • factory : 线程创立工厂,对象类型为ForkJoinWorkerThread
  • handler : 线程执行时异样解决
  • asyncMode : 为true示意解决工作的线程以队列模式依照先进先出(FIFO)程序,此时不反对工作合并,false则是依照栈的模式后进先出(LIFO)程序,默认是false反对工作合并(join)

获取ForkJoinPool对象能够间接应用commonPool()办法,

val pool = ForkJoinPool.commonPool()public static ForkJoinPool commonPool() {    // assert common != null : "static init error";    return common;}

而common对应的初始化放在动态代码块中,且最终调用了ForkJoinPool的构造函数

static{    //...    common = java.security.AccessController.doPrivileged            (new java.security.PrivilegedAction<ForkJoinPool>() {                public ForkJoinPool run() { return makeCommonPool(); }});    //...}private static ForkJoinPool makeCommonPool() {    //...    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,                            "ForkJoinPool.commonPool-worker-");}

ForkJoinTask工作

ForkJoinTask抽象类实现了Future接口,同时提供了RecursiveAction和RecursiveTask两个实现类,该实现类提供泛型类型申明,源码如下

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {    V result;                                //返回后果    protected abstract V compute();         //执行工作    public final V getRawResult() {         //获取result        return result;    }    protected final boolean exec() {        //ForkJoinTask中的形象办法        result = compute();        return true;    }}

RecursiveAction绝对应RecursiveTask返回值为void,上面看下ForkJoinTask的三个外围办法

  • fork : 在工作执行过程中将大工作拆分为多个小的子工作
  • join : 调用子工作的join()办法期待工作返回后果,如果子工作执行异样,join()会抛出异样,quietlyJoin()办法不会抛出异样,须要调用getException()或getRawResult()手动解决异样和后果
  • invoke : 在以后线程同步执行该工作,同join一样,如果子工作执行异样,invoke()会抛出异样,quietlyInvoke()办法不会抛出异样,须要调用getException()或getRawResult()手动解决异样和后果

执行ForkJoinTask工作

应用ForkJoinPool时,能够通过以下三个办法执行ForkJoinTask工作,

public <T> T invoke(ForkJoinTask<T> task)public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)public void execute(ForkJoinTask<?> task)
  • invoke : 执行有返回值的工作,同步阻塞办法直到工作执行结束
  • submit : 执行没有返回值的工作
  • execute : 执行带有ForkJoinTask对象返回的工作,非阻塞办法,调用后ForkJoinPool会立刻执行并返回以后执行的task对象

invoke()、submit()是对ExecutorService接口的办法实现,同时ForkJoinPool 也定义了用来执行ForkJoinTask的execute办法

work-stealing模式

对于work-stealing模式形容可参见上面这篇博文,外围就是每个工作线程都有本人的工作队列,当某个线程实现工作后会去"拿"别的队列里的工作去执行,work-stealing模式

ForkJoinPool 与 ThreadPoolExecutor 比拟

ThreadPoolExecutor 与 ForkJoinPool 都实现了ExecutorService,不同之处在于前者只能执行 Runnable 和 Callable 工作,执行程序是依照其在阻塞队列中的程序来执行;后者除了能执行前者的工作类型外还扩大处ForkJoinTask类型工作,从而满足work-stealing这种算法模式,ForkJoinPool波及的技术点还有很多,须要持续深刻摸索,例如ForkJoinPool中的线程状态、fork()、compute()、join()的调用程序、子工作拆分粒度等细节内容...