关于android:Java深入研究ThreadPoolExecutor线程池

3次阅读

共计 6677 个字符,预计需要花费 17 分钟才能阅读完成。

在 Java 代码中咱们经常会开启异步线程去执行一些网络申请, 或是开启子线程去读写文件, 这些线程的开启与执行在并发量较小的场景下能够失常运行, 如果波及并发量比拟大、线程数量无限、响应速度要快的业务场景下, 此时就不容许独自创立线程去执行工作, 而是基于线程池治理、散发线程机制去执行线程工作, 从而升高资源耗费、进步响应速度,对立治理线程资源

线程池的创立与分类

Exectors 类是 concurrent 包下的用于疾速创立线程的工具类, 该类中定义了一系列创立不同线程类型的静态方法, 理论还是调用 ThreadPoolExecutor 类的有参函数, 上面看下对应的办法源码

--- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
--- 调用有参函数
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
}
  1. newFixedThreadPool : 固定数量的线程池, 可用于限度特定线程启用数量的场景, 调用 ThreadPoolExecutor 构造函数中的参数定义如下

    • corePoolSize : 外围线程数量
    • maximumPoolSize : 最大线程数量
    • keepAliveTime : 当线程的数量大于外围线程时, 闲暇线程在终止之前期待新工作的最大工夫
    • unit : 参数 keepAliveTime 的工夫单位
    • workQueue : 寄存期待执行工作的阻塞队列, 罕用的组赛队列如下

      1. ArrayBlockingQueue : 基于数组的有界阻塞队列, 遵循 FIFO(先进先出)准则, 构造函数提供设置队列大小参数, 采纳 ReentrantLock(基于 AQS 实现)获取重入锁, 如果向已满的队列插入则以后线程阻塞
      2. LinkedBlockingQueue : 基于链表的无界阻塞队列, 默认大小为 Integer.MAX_VALUE, 向该队列插入数据时会封装到 Node<> 节点所对应的链表中, 队列外部应用了 putLock 和 takeLock 标识增加、删除锁,二者可并发执行
      3. SynchronousQueue : 单向链表同步队列, 具体需查看源码(常识盲区, 未钻研到该队列)
      4. PriorityBlockingQueue : 具备优先级排序的无界阻塞队列, 默认以天然排序形式或者通过传入可比拟的 Comparator 比拟器进行排序
    • threadFactory : 默认线程创立工厂
    • defaultHandler : 回绝策略, 默认应用 ThreadPoolExecutor.AbortPolicy, 示意当队列满了并且工作线程大于线程池最大线程数量, 此时间接抛出异样,

      1. CallerRunsPolicy : 用于被回绝工作的处理程序,它间接在 execute 办法的调用线程中运行被回绝的工作;如果执行程序已敞开,则会抛弃该工作
      2. DiscardOldestPolicy : 抛弃最老的一个申请,也就是行将被执行的一个工作,并尝试再次提交当前任务
      3. DiscardPolicy : 默认抛弃被回绝的工作

      个别场景下默认应用 ThreadPoolExecutor.AbortPolicy 回绝策略

<br/>

  1. newSingleThreadExecutor : 单线程的线程池, 只有一个外围线程在执行, 可用于须要依照特定程序执行的场景

    public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
             (new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>()));
    }

    通过入参能够看到只应用一个线程, 采纳 LinkedBlockingQueue 无界队列,keepAliveTime 是 0s, 阐明线程创立了不会超时终止, 该线程程序执行所有工作

  2. newCachedThreadPool : 外围线程为 0, 非核心线程数为 int 的最大值

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
    }

    keepAliveTime 是 60s, 采纳 SynchronousQueue 同步阻塞队列, 当有新的工作进来此时如果有闲暇的线程则重复使用, 否则就从新创立一个新的线程, 线程闲暇 60s 后会被回收, 对于同步阻塞队列能够看这篇文章 SynchronousQueue

  3. newScheduledThreadPool : 外围线程为传入的固定数值, 非核心线程数为 int 的最大值, 可用于延时或定时工作的执行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE,
           DEFAULT_KEEPALIVE_MILLIS, 
           MILLISECONDS,
           new DelayedWorkQueue());
    }

    办法参数中的 DelayedWorkQueue 底层也是基于数组实现的最小堆的具备优先级的队列, 队列中的工作依照执行工夫升序排列, 执行工夫越凑近以后工夫的工作排在最后面, 此时也会最先执行该工作

<br/>

线程池外部执行机制

外部执行机制根本依照 ThreadPoolExecutor 构造函数传入的参数来解决提交进来的工作

线程池 ThreadPoolExecutor 中的 execute 执行办法

先看下 execute 办法源码

public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

办法中的 ctl 是 ThreadPoolExecutor 中申明的提供原子操作的 Integer 对象, 用于获取以后线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  1. 判断以后线程数量是否小于外围线程数量 corePoolSize, 如果满足条件调用 addWorker()办法创立一个新的外围线程
  2. 如果大于 corePoolSize, 接着判断以后线程池是否是运行状态并且通过 workQueue.offer()写入阻塞队列
  3. 此时再次查看线程池状态是否正在运行, 否则从队列中移除工作并执行回绝策略; 如果是运行状态, 调用 workerCountOf()判断以后线程池线程数, 数量为 0 就新创建一个新的线程
  4. 如果首次判断线程池状态非运行状态, 调用 addWorker()创立心线程如果失败, 执行回绝策略

<br/>
ThreadPoolExecutor 中提供了 shutdown()、shutdownNow()办法用于敞开线程池, 调用 shutdown 时不再承受新的工作, 之前提交的工作等执行完结再敞开线程池, 调用 shutdownNow 时会尝试进行线程池中的工作而后再敞开, 并且返回未解决完的 List<> tasks 工作列表

ForkJoinPool 初探

ThreadPoolExecutor 阻塞队列中的工作都是单个线程去执行, 如果此时须要进行密集型计算工作(比方大数组排序、遍历零碎文件夹并计算文件数量), 就可能呈现线程池中一个线程忙碌而其余线程闲暇, 导致 CPU 负载不平衡系统资源节约,ForkJoinPool 就是用于将单个密集型计算工作拆分成多个小工作, 通过 fork 让线程池其它线程来执行小工作, 通过 join 合并线程执行工作后果, 采纳并行执行机制, 进步 CPU 的使用率

ForkJoinPool 构造函数

public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
        defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
    
public ForkJoinPool(int parallelism,
    ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();}

结构函数参数定义如下

  • parallelism : 并行线程数, 默认为 Runtime.getRuntime().availableProcessors(), 最小为 1
  • factory : 线程创立工厂, 对象类型为 ForkJoinWorkerThread
  • handler : 线程执行时异样解决
  • asyncMode : 为 true 示意解决工作的线程以队列模式依照先进先出 (FIFO) 程序, 此时不反对工作合并,false 则是依照栈的模式后进先出 (LIFO) 程序, 默认是 false 反对工作合并(join)

获取 ForkJoinPool 对象能够间接应用 commonPool()办法,

val pool = ForkJoinPool.commonPool()

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}

而 common 对应的初始化放在动态代码块中,且最终调用了 ForkJoinPool 的构造函数

static{
    //...
    common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() {return makeCommonPool(); }});
    //...
}

private static ForkJoinPool makeCommonPool() {
    //...
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

ForkJoinTask 工作

ForkJoinTask 抽象类实现了 Future 接口, 同时提供了 RecursiveAction 和 RecursiveTask 两个实现类, 该实现类提供泛型类型申明, 源码如下

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    V result;                                // 返回后果
    protected abstract V compute();         // 执行工作

    public final V getRawResult() {         // 获取 result
        return result;
    }

    protected final boolean exec() {        //ForkJoinTask 中的形象办法
        result = compute();
        return true;
    }
}

RecursiveAction 绝对应 RecursiveTask 返回值为 void, 上面看下 ForkJoinTask 的三个外围办法

  • fork : 在工作执行过程中将大工作拆分为多个小的子工作
  • join : 调用子工作的 join()办法期待工作返回后果, 如果子工作执行异样,join()会抛出异样,quietlyJoin()办法不会抛出异样, 须要调用 getException()或 getRawResult()手动解决异样和后果
  • invoke : 在以后线程同步执行该工作, 同 join 一样,如果子工作执行异样,invoke()会抛出异样,quietlyInvoke()办法不会抛出异样, 须要调用 getException()或 getRawResult()手动解决异样和后果

执行 ForkJoinTask 工作

应用 ForkJoinPool 时, 能够通过以下三个办法执行 ForkJoinTask 工作,

public <T> T invoke(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public void execute(ForkJoinTask<?> task)
  • invoke : 执行有返回值的工作, 同步阻塞办法直到工作执行结束
  • submit : 执行没有返回值的工作
  • execute : 执行带有 ForkJoinTask 对象返回的工作, 非阻塞办法, 调用后 ForkJoinPool 会立刻执行并返回以后执行的 task 对象

invoke()、submit()是对 ExecutorService 接口的办法实现, 同时 ForkJoinPool 也定义了用来执行 ForkJoinTask 的 execute 办法

work-stealing 模式

对于 work-stealing 模式形容可参见上面这篇博文,外围就是每个工作线程都有本人的工作队列, 当某个线程实现工作后会去 ” 拿 ” 别的队列里的工作去执行,work-stealing 模式

ForkJoinPool 与 ThreadPoolExecutor 比拟

ThreadPoolExecutor 与 ForkJoinPool 都实现了 ExecutorService, 不同之处在于前者只能执行 Runnable 和 Callable 工作, 执行程序是依照其在阻塞队列中的程序来执行; 后者除了能执行前者的工作类型外还扩大处 ForkJoinTask 类型工作, 从而满足 work-stealing 这种算法模式,ForkJoinPool 波及的技术点还有很多, 须要持续深刻摸索, 例如 ForkJoinPool 中的线程状态、fork()、compute()、join()的调用程序、子工作拆分粒度等细节内容 …

正文完
 0