乐趣区

关于java:JDK-ThreadPoolExecutor核心原理与实践

一、内容概括

本文内容次要围绕 JDK 中的 ThreadPoolExecutor 开展,首先形容了 ThreadPoolExecutor 的结构流程以及外部状态治理的机理,随后用大量篇幅深刻源码探索了 ThreadPoolExecutor 线程调配、工作解决、回绝策略、启动进行等过程,其中对 Worker 内置类进行重点剖析,内容不仅蕴含其工作原理,更对其设计思路进行了肯定剖析。文章内容既蕴含了源码流程剖析,还具备设计思路探讨和二次开发实际。

二、结构 ThreadPoolExecutor

2.1 线程池参数列表

大家能够通过如下构造方法创立线程池(其实还有其它结构器,大家能够深刻源码进行查看,但最终都是调用上面的结构器创立线程池);

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}

其中的结构参数的作用如下:

  • corePoolSize:外围线程数。提交工作时,当线程池中的线程数 小于 corePoolSize 时,会 新 创立一个外围线程执行工作。当线程数 等于 corePoolSize 时,会将工作 增加进工作队列。
  • maximumPoolSize:最大线程数。提交工作时,当 工作队列已满 并且线程池中的总线程数 不大于 maximumPoolSize 时,线程池会令非核心线程执行提交的工作。当 大于 maximumPoolSize 时,会执行回绝策略。
  • keepAliveTime:非核心线程 闲暇时 的存活工夫。
  • unit:keepAliveTime 的单位。
  • workQueue:工作队列(阻塞队列)。
  • threadFactory:线程工厂。线程池用来新创建线程的工厂类。
  • handler:回绝策略,线程池遇到无奈解决的状况时会执行该回绝策略抉择摈弃或疏忽工作等。

2.2 执行流程概述

由结构参数的作用咱们可知,线程池中由几个重要的组件:外围线程池  闲暇(非核心)线程池 和  阻塞队列。这里首先给出线程池的外围执行流程图,大家首先对其有个印象,之后剖析源码就会轻松一些了。

上面对流程图中一些正文阐明下:cap 示意池的容量,size 示意池中正在运行的线程数。对于阻塞队列来说,cap 示意队列容量,size 示意曾经入队的工作数量。cpS<cpc 示意运行中的外围线程数小于线程池设置外围线程数的状况。

1)当外围线程池 未“满”时,会创立新的外围线程执行提交的工作。这里的“满”指的是外围线程池中的数量(size)小于容量(cap),此时会通过线程工厂新创建线程执行提交工作。

2)当外围线程池 已“满”时,会将提交的工作 push 进工作队列中,期待外围线程的开释。一旦外围线程开释后,将会从工作队列中 pull task 继续执行。因为应用的是阻塞队列,对于曾经开释的外围线程,也会阻塞在获取工作的过程中。

3)当工作队列也满了时(这里的满是指真的满了,当然暂不思考无界队列状况),会从闲暇线程池中持续创立线程执行提交的工作。但闲暇线程池中的线程是有 存活工夫(keepAliveTime)的,当线程执行完工作后,只能存活 keepAliveTime 时长,工夫一过,线程就得被销毁。

4)当闲暇线程池的线程数一直减少,直到 ThreadPoolExecutor 中的总线程数大于 maximumPoolSize 时,会拒绝执行工作,将提交的工作交给 RejectedExecutionHandler 进行后续解决。

下面所说的外围线程池和闲暇线程池只是形象进去的一个概念,前面咱们将对其具体内容进行剖析。

2.3 罕用线程池

在进入 ThreadPoolExecutor 的源码剖析前,咱们先介绍下罕用的线程池(其实并不罕用,只是 JDK 自带了)。这些线程池可由 Executors 这个工具类(或叫线程池工厂)来创立。

2.3.1 FixedThreadPool

固定线程数线程池的创立形式如下:其中外围线程数与最大线程数固定且相等,采纳以链表为底层构造的无界阻塞队列。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

特点

  • 外围线程数与最大线程数相等,因而不会创立闲暇线程。keepAliveTime 设置与否无关紧要。
  • 采纳无界队列,工作会被有限增加,直至内存溢出(OOM)。
  • 因为无界队列不可能被占满,工作在执行前不可能被回绝(前提是线程池始终处于运行状态)。

利用场景

  • 实用于线程数固定的场景
  • 实用负载比拟重的服务器

2.3.2 SingleThreadExecutor

单线程线程池的创立形式如下:其中外围线程数与最大线程数都为 1,采纳以链表为底层构造的无界阻塞队列。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

特点

  • 与 FixedThreadPool 相似,只是线程数为 1 而已。

利用场景

  • 实用单线程的场景。
  • 实用于对提交工作的解决有程序性要求的场景。

2.3.3 CachedThreadPool

缓冲线程池的创立形式如下:其中外围线程数为 0,最大线程数为 Integer.MAX_VALUE(能够了解为无穷大)。采纳同步阻塞队列。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

特点

  • 外围线程数为 0,则初始就创立闲暇线程,并且闲暇线程的只能期待工作 60s,60s 内没有提交工作,闲暇线程将被销毁。
  • 最大线程数为无穷大,这样会造成巨量线程同时运行,CPU 负载过高,导致利用解体。
  • 采纳同步阻塞队列,即队列不存储工作。提交一个生产一个。因为最大线程数为无穷大,因而,只有提交工作就肯定会被生产(利用未解体前)。

利用场景

  • 实用于耗时短、异步的小程序。
  • 实用于负载较轻的服务器。

三、线程池状态以及沉闷线程数

ThreadPoolExecutor 中有两个十分重要的参数:线程池状态 (rs)以及 沉闷线程数(wc)。前者用于标识以后线程池的状态,并依据状态量来控制线程池应该做什么;后者用于标识沉闷线程数,依据数量管制应该在外围线程池还是闲暇线程池创立线程。

ThreadPoolExecutor 用一个 Integer 变量(ctl)来设置这两个参数。咱们晓得,在不同操作系统下,Java 中的 Integer 变量都是 32 位,ThreadPoolExecutor 应用前 3 位(31~29)示意线程池状态,用后 29 位(28~0)示意沉闷线程数。

这样设置的目标是什么呢?

咱们晓得,在并发场景中同时保护两个变量的代价是十分大的,往往须要进行加锁来保障两个变量的变动是原子性的。而将两个参数用一个变量保护,便只需一条语句就能保障两个变量的原子性。这种形式大大降低了应用过程中的并发问题。

有了下面的概念,咱们从源码层面看看 ThreadPoolExecutor 的几种状态,以及 ThreadPoolExecutor 如何同时操作状态和沉闷线程数这两个参数的。

ThreadPoolExecutor 对于状态初始化的源码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor 应用原子 Integer 定义了 ctl 变量。ctl 在一个 int 中包装了沉闷线程数和线程池运行时状态两个变量。为了达到这样的目标,ThreadPoolExecutor 的线程数被限度在 2^29-1(大概 500 million)个,而不是 2^31-1(2 billion)个,因为前 3 位被用于标识 ThreadPoolExecutor 的状态。如果将来 ThreadPoolExecutor 中的线程数不够用了,能够把 ctl 设置为原子 long 类型,再调整下相应的掩码就行了。

COUNT_BITS 概念上用于示意状态位与线程数位的分界值,理论用于状态变量等移位操作。此处为 Integer.sixze-3=32-3=29。

CAPACITY 示意 ThreadPoolExecutor 的最大容量。由下图能够看出,通过移位操作后,一个 int 值的后 29 位达到最大值:全为 1。这 29 位示意沉闷线程数,全为 1 时表明达到 ThreadPoolExecutor 能包容的最大线程数。前 3 位为 0,示意该变量只与沉闷线程数相干,与状态无关。这也是为了便于后续的位操作。

RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 示意 ThreadPoolExecutor 的 5 个状态。这 5 个状态对应的可执行操作如下:

RUNNING:可接管新工作,可继续解决阻塞队列中的工作。

SHUTDOWN:不可接管新工作,可持续解决阻塞队列中的工作。

STOP:不可接管新工作,中断阻塞队列中所有工作。

TIDYING:所有工作间接终止,所有线程清空。

TERMINATED:线程池敞开。

这 5 个状态的计算过程如下图所示,通过移位计算后,数值的后 29 位全为 0,前 3 位别离代表不同的状态。

通过以上的变量定义后,ThreadPoolExecutor 将状态与线程数拆散,别离设置再一个 int 值的不同间断位上,这也为上面的操作带来了极大的便当。

接下来咱们来看看 ThreadPoolExecutor 是如何获取状态和线程数的。

3.1 runStateOf(c)办法

private static int runStateOf(int c) {return c & ~CAPACITY;}

runStateOf() 办法是用于获取线程池状态的办法。其中形参 c 个别是 ctl 变量,蕴含了状态和线程数,runStateOf()移位计算的过程如下图所示。

CAPACITY 取反后高三地位 1,低 29 地位 0。取反后的值与 ctl 进行‘与’操作。因为任何值‘与’1 等于原值,‘与’0 等于 0。因而‘与’操作过后,ctl 的高 3 位保留原值,低 29 地位 0。这样就将状态值从 ctl 中分离出来。

3.2 workerCountOf(c)办法

private static int workerCountOf(int c) {return c & CAPACITY;}

workerCountOf(c) 办法的剖析思路与上述相似,就是把后 29 位从 ctl 中分离出来,取得沉闷线程数。如下图所示,这里就不再赘述。

3.3 ctlOf(rs, wc)办法

private static int ctlOf(int rs, int wc) {return rs | wc;}

ctlOf(rs, wc)通过状态值和线程数值计算出 ctl 值。rs 是 runState 的缩写,wc 是 workerCount 的缩写。rs 的后 29 位为 0,wc 的前三位为 0,两者通过‘或’操作计算出来的最终值同时保留了 rs 的前 3 位和 wc 的后 29 位,即 ctl 值。

ThreadPoolExecutor 中还有一些其它操作 ctl 的办法,剖析思路与下面都大同小异,大家有趣味能够本人看看。

本小结最初再来看看 ThreadPoolExecutor 状态转换的路径,也能够了解为生命周期。

四、execute()执行流程

4.1 execute 办法

execute() 源码如下所示:

public void execute(Runnable command) {
  // 如果待执行的工作为 null,间接返回空指针异样。如果工作都没有,上面的步骤都没有执行的必要啦。if (command == null) throw new NullPointerException();
  // 获取 ctl 的值,ctl = (runState + workerCount)
  int c = ctl.get();
  // 如果 workerCount(工作线程数) < 外围线程数
  if (workerCountOf(c) < corePoolSize) {// 执行 addWorker 办法。addWorker()办法会在上面进行详细分析,这里能够简略了解为增加工作线程解决工作。这里的 true 示意:在小于外围线程数时增加 worker 线程,即增加外围线程。if (addWorker(command, true))
      // 增加胜利则间接返回
      return;
    // 增加失败,从新获取 ctl 的值,避免在增加 worker 时状态扭转
    c = ctl.get();}
  // 运行到这里示意外围线程数已满,因而上面 addWorker 中第二个参数为 false。判断线程池是否是运行状态,如果是则尝试将工作增加至 工作队列 中
  if (isRunning(c) && workQueue.offer(command)) {
    // 再次获取 ctl 的值,进行 double-check
    int recheck = ctl.get();
    // 如果线程池为非运行状态,则尝试从工作队列中移除工作
    if (! isRunning(recheck) && remove(command))
      // 移除胜利后执行回绝策略
      reject(command);
    // 如果线程池为运行状态、或移除工作失败
    else if (workerCountOf(recheck) == 0)
      // 执行 addWorker 办法,此时增加的是非核心线程(闲暇线程,有存活工夫)addWorker(null, false);
  }
  // 如果线程池是非运行状态,或者 工作队列 增加工作失败,再次尝试 addWorker() 办法
  else if (!addWorker(command, false))
    // addWorker() 失败,执行回绝策略
    reject(command);
}

源码剖析间接看正文就行了,每一行都有,灰常灰常的具体了。

从源码中能够看到,execute() 办法次要封装了 ThreadPoolExecutor 创立线程的判断逻辑,外围线程和闲暇线程的创立机会,回绝策略的执行机会都在该办法进行判断。这里通过上面的流程图对上述源码进行总结下。

通过创立线程去执行提交的工作逻辑封装在 addWorker() 办法中。下一大节咱们未来剖析执行提交工作的具体逻辑。execute() 办法中还有几个办法这里阐明下。

3.1.1 workerCountOf()

从 ctl 中获取沉闷线程数,在第二大节曾经介绍过了。

3.1.2 isRunning()

private static boolean isRunning(int c) {return c < SHUTDOWN;}

根据 ctl 的值判断 ThreadPoolExecutor 是否运行状态。源码中直接判断 ctl < SHUTDOWN 是否成立,这是因为运行状态下的 ctl 最高位为 1,必定是正数;而其它状态最高位为 0,必定是负数。因而判断 ctl 的大小即可判断是否为运行态。

3.1.3 reject()

final void reject(Runnable command) {handler.rejectedExecution(command, this);
}

间接调用初始化时的 RejectedExecutionHandler 接口的 rejectedExecution() 办法。这也是典型的策略模式的应用,真正的回绝操作被封装在实现了 RejectedExecutionHandler 接口的实现类中。这里就不进行开展。

4.2 addWorker 办法

addWorker()源码剖析如下:

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  // 死循环执行逻辑。确保多线程环境下在预期条件下退出循环。for (;;) {
    // 获取 ctl 值并从中提取线程池 运行状态
    int c = ctl.get();
    int rs = runStateOf(c);
    // 如果 rs > SHUTDOWN,此时不容许接管新工作,也不容许执行工作队列中的工作,间接返回 fasle。// 如果 rs == SHUTDOWN,工作为 null,并且工作队列不为空,此时走上面的 '执行工作队列中工作' 的逻辑。// 这里设置 firstTask == null 是因为:线程池在 SHUTDOWN 状态下,不容许增加新工作,只容许执行工作队列中残余的工作。if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      return false;
    for (;;) {
      // 获取沉闷线程数
      int wc = workerCountOf(c);
      // 如果沉闷线程数 >= 容量,不容许增加新工作
      // 如果 core 为 true,示意创立外围线程,如果 沉闷线程数 > 外围线程数,则不容许创立线程
      // 如果 core 为 false,示意创立闲暇线程,如果 沉闷线程数 > 最大线程数,则不容许创立线程
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 尝试减少外围线程数,减少胜利间接中断最外层死循环,开始创立 worker 线程
      // 减少失败则继续执行循环内逻辑
      if (compareAndIncrementWorkerCount(c))
        break retry;
      // 获取 ctl 值,判断运行状态是否扭转
      c = ctl.get();
      // 如果运行状态曾经扭转,则从从新执行外层死循环
      // 如果运行状态未扭转,继续执行内层死循环
      if (runStateOf(c) != rs)
        continue retry;
    }
  }
  // 用于记录 worker 线程的状态
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // new 一个新的 worker 线程,每一个 Worker 内持有真正执行工作的线程。w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
      // 加锁,保障 workerAdded 状态更改的原子性
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // 获取线程池状态
        int rs = runStateOf(ctl.get());
        // 如果为运行状态,则创立 worker 线程
        // 如果为 SHUTDOWN 状态,并且 firstTask == null,此时将创立线程执行 工作队列 中的工作。if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          // 如果线程在未启动前就曾经运行,抛出异样
          if (t.isAlive())
            throw new IllegalThreadStateException();
          // 本地缓存 worker 线程
          workers.add(w);
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;
          // worker 线程增加胜利,更改为 true 状态
          workerAdded = true;
        }
      } finally {mainLock.unlock();
      }
      // 更改状态胜利后启动 worker 线程
      if (workerAdded) {
        // 启动 worker 线程
        t.start();
        // 更改启动状态
        workerStarted = true;
      }
    }
  } finally {
    // 如果工作线程状态未扭转,则解决失败逻辑
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

addWorker() 通过内外两层死循环判断 ThreadPoolExecutor 运行状态并通过 CAS 胜利更新沉闷线程数。这是为了保障线程池中的多个线程在并发环境下都可能依照预期的条件退出循环。

随后办法会 new 一个 Worker 并启动 Worker 内置的工作线程。这里通过 workerAdded 和 workerStarted 两个状态判断 Worker 是否被胜利缓存与启动。

批改 workerAdded 过程会应用 ThreadPoolExecutor 的 mainlock 上锁保障原子性,避免多线程并发环境下,向 workers 中增加数据以及获取 workers 数量这两个过程呈现预期之外的状况。

addWorker() 启动 worker 线程的步骤是先 new 一个 Worker 对象,而后从中获取工作线程,再 start,因而真正的线程启动过程还是在 Worker 对象中。

这里通过一张流程图对 addWorker 总结下:

addWorker 还有几个办法也在这里剖析下:

4.2.1 runStateOf()

从 ctl 中获取 ThreadPoolExecutor 状态,详细分析看第二章。

4.2.2 workerCountOf()

从 ctl 中获取 ThreadPoolExecutor 沉闷线程数,详细分析看第二章。

4.2.3 compareAndIncrementWorkerCount()

int c = ctl.get();
if (compareAndIncrementWorkerCount(c)) {...}
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}

通过 CAS 的形式令 ctl 中沉闷线程数 +1。这里为什么只有让 ctl 的值 + 1 就能更改线程数了呢?因为 ctl 线程数的值存储在后 29 位中,在不溢出的状况下,+ 1 只会影响后 29 位的数值,只会令线程数 +1。而不影响线程池状态。

4.2.4 addWorkerFailed()

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {if (w != null)
            // 移除 worker
            workers.remove(w);
        // 沉闷线程数 -1
        decrementWorkerCount();
        // 尝试进行线程池
        tryTerminate();} finally {mainLock.unlock();
    }
}
 
private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

该办法是在工作线程启动失败后执行的办法。什么状况下会呈现这种问题呢?在胜利减少沉闷线程数后并胜利 new Worker 后,线程池状态扭转为 > SHUTDOWN,既不可承受新工作,又不能执行工作队列残余的工作,此时线程池应该间接进行。

该办法就是在这种状况下:

  • 从 workers 缓存池中移除新创建的 Worker;
  • 通过死循环 +CAS 确保沉闷线程数减 1;
  • 执行 tryTerminate() 办法,尝试进行线程池。

执行完 tryTerminate() 办法后,线程池将会进入到 TERMINATED状态。

4.2.5 tryTerminate()

final void tryTerminate() {for (;;) {int c = ctl.get();
        // 如果以后线程池状态为以下之一,无奈间接进入 TERMINATED 状态,间接返回 false,示意尝试失败
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 如果沉闷线程数不为 0,中断所有的 worker 线程,这个会在上面具体解说,这里会关系到 Worker 尽管继承了 AQS,然而并未应用外面的 CLH 的起因。if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // 加上全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {// 首先通过 CAS 将 ctl 扭转成 (rs=TIDYING, wc=0),因为通过下面的判断保障了当先线程池可能达到这个状态。if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 钩子函数,用户能够通过继承 ThreadPoolExecutor 实现自定义的办法。terminated();} finally {// 将 ctl 扭转成 (rs=TERMINATED, wc=0),此时线程池将敞开。ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒其它线程,唤醒其实也没用了,其它线程唤醒后通过判断得悉线程池 TERMINATED 后也会退出。termination.signalAll();}
                return;
            }
        } finally {
            // 开释全局锁
            mainLock.unlock();}
    }
}

五、Worker 内置类剖析

5.1 Worker 对象剖析

Worker 对象的源码剖析:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  // 工作线程
  final Thread thread;
  // 提交的待执行工作
  Runnable firstTask;
  // 曾经实现的任务量
  volatile long completedTasks;
  Worker(Runnable firstTask) {
    // 初始化状态
    setState(-1);
    this.firstTask = firstTask;
    // 通过线程工厂创立线程
    this.thread = getThreadFactory().newThread(this);
  }
  // 执行提交工作的办法,具体执行逻辑封装在 runWorker() 中,当 addWorker() 中 t.start()后,将执行该办法
  public void run() {runWorker(this);
  }
  // 实现 AQS 中的一些办法
  protected boolean isHeldExclusively() { ...}
  protected boolean tryAcquire(int unused) {...}
  protected boolean tryRelease(int unused) {...}
  public void lock()        { ...}
  public boolean tryLock()  { ...}
  public void unlock()      { ...}
  public boolean isLocked() { ...}
  // 中断持有的线程
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try { t.interrupt(); }
      catch (SecurityException ignore) {}}
  }
}

从下面源码能够看出:Worker 实现了 Runnable 接口,阐明 Worker 是一个工作;Worker 又继承了 AQS,阐明 Worker 同时具备锁的性质,但 Worker 并没有像 ReentrantLock 等锁工具应用了 CLH 的性能,因为线程池中并不存在多个线程拜访同一个 Worker 的场景,这里只是应用了 AQS 中状态保护的性能,这个具体会在上面进行具体阐明。

每个 Worker 对象会持有一个工作线程 thread,在 Worker 初始化时,通过线程工厂创立该工作线程并将本人作为工作传入工作线程当中。因而,线程池中工作的运行其实并不是间接执行提交工作的 run()办法,而是执行 Worker 中的 run()办法,在该办法中再执行提交工作的 run()办法。

Worker 中的 run() 办法是委托给 ThreadPoolExecutor 中的 runWorker() 执行具体逻辑。

这里用一张图总结下:

  • Worker 自身是一个工作,并且持有用户提交的工作和工作线程。
  • 工作线程持有的工作是 this 自身,因而调用工作线程的 start()办法其实是执行 this 自身的 run()办法。
  • this 自身的 run()委托全局的 runWorker()办法执行具体逻辑。
  • runWorker()办法中执行用户提交工作的 run()办法,执行用户具体逻辑。

5.2 runWorker 办法

runWorker() 源码如下所示:

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
  // 拷贝提交的工作,并将 Worker 中的 firstTask 置为 null,便于下一次从新赋值。Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock();
  boolean completedAbruptly = true;
  try {// 执行完持有工作后,通过 getTask() 一直从工作队列中获取工作
    while (task != null || (task = getTask()) != null) {w.lock();
      try {// ThreadPoolExecutor 的钩子函数,用户能够实现 ThreadPoolExecutor,并重写 beforeExecute() 办法,从而在工作执行前 实现用户定制的操作逻辑。beforeExecute(wt, task);
        Throwable thrown = null;
        try {// 执行提交工作的 run() 办法
          task.run();} catch (RuntimeException x) {...} finally {
          // ThreadPoolExecutor 的钩子函数,同 beforeExecute,只不过在工作执行完后执行。afterExecute(task, thrown);
        }
      } finally {
        // 便于工作回收
        task = null;
        w.completedTasks++;
        w.unlock();}
    }
    completedAbruptly = false;
  } finally {
    // 执行到这里示意工作队列中没了工作,或者线程池敞开了,此时须要将 worker 从缓存冲革除
    processWorkerExit(w, completedAbruptly);
  }
}

runWorker() 是真正执行提交工作的办法,但其并没有通过 Thread.start()办法执行工作,而是间接执行工作的 run()办法。

runWorker() 会从工作队列中一直获取工作并执行。

runWorker() 提供了两个钩子函数,如果 jdk 的 ThreadPoolExecutor 无奈满足开发人员的需要,开发人员能够继承 ThreadPoolExecutor 并重写 beforeExecute()和 afterExecute()办法定制工作执行前须要执行的逻辑。比方设置一些监控指标或者打印日志等。

5.2.1 getTask()

private Runnable getTask() {
    boolean timedOut = false;
    // 死循环保障肯定获取到工作
    for (;;) {
        ...
        try {
            // 从工作队列中获取工作
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {timedOut = false;}
    }
}

5.2.2 processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    ...
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 从缓存中移除 worker
        workers.remove(w);
    } finally {mainLock.unlock();
    }
    // 尝试进行线程池
    tryTerminate();
    ...
}

六、shutdown()执行流程

线程池领有两个被动敞开的办法;

shutdown():敞开线程池中所有闲暇 Worker 线程,扭转线程池状态为 SHUTDOWN;

shutdownNow():敞开线程池中所有 Worker 线程,扭转线程池状态为 STOP,并返回所有正在期待解决的工作列表。

这里为什么要将 Worker 线程辨别为闲暇和非闲暇呢?

由下面的 runWorker() 办法,咱们晓得 Worker 线程在现实状况下会在 while 循环中一直从工作队列中获取工作并执行,此时的 Worker 线程就是非闲暇的;没有在执行工作的 worker 线程则是闲暇的。因为线程池的 SHUTDOWN 状态不容许接管新工作,只容许执行工作队列中残余的工作,因而须要中断所有闲暇的 Worker 线程,非闲暇线程则继续执行工作队列的工作,直至队列为空。而线程池的 STOP 状态既不容许承受新工作,也不容许执行残余的工作,因而须要敞开所有 Worker 线程,包含正在运行的。

6.1 shutdown()

shutdown() 源码如下:

public void shutdown() {
  // 上全局锁
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 校验是否有敞开线程池的权限,这里次要通过 SecurityManager 校验以后线程与每个 Worker 线程的“modifyThread”权限
    checkShutdownAccess();
    // 批改线程池状态
    advanceRunState(SHUTDOWN);
    // 敞开所有闲暇线程
    interruptIdleWorkers();
    // 钩子函数,用户能够继承 ThreadPoolExecutor 并实现自定义钩子,ScheduledThreadPoolExecutor 便实现了本人的钩子函数
    onShutdown();} finally {mainLock.unlock();
  }
  // 尝试敞开线程池
  tryTerminate();}

shutdown() 将 ThreadPoolExecutor 的敞开步骤封装在几个办法中,并且通过全局锁保障只有一个线程能被动敞开 ThreadPoolExecutor。ThreadPoolExecutor 同样提供了一个钩子函数 onShutdown() 让开发人员定制化敞开过程。比方 ScheduledThreadPoolExecutor 就会在敞开时对工作队列进行清理。

上面对其中的办法进行剖析。

checkShutdownAccess()

private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
 
private void checkShutdownAccess() {SecurityManager security = System.getSecurityManager();
  if (security != null) {
    // 校验以后线程的权限,其中 shutdownPerm 就是一个具备 modifyThread 参数的 RuntimePermission 对象。security.checkPermission(shutdownPerm);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {for (Worker w : workers)
        // 校验所有 worker 线程是否具备 modifyThread 权限
        security.checkAccess(w.thread);
    } finally {mainLock.unlock();
    }
  }
}

advanceRunState()

// targetState = SHUTDOWN
private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();
    // 判断以后线程池状态 >= SHUTDOWN 是否成立,如果不成立的话,通过 CAS 进行批改
    if (runStateAtLeast(c, targetState) ||
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
      break;
  }
}
private static boolean runStateAtLeast(int c, int s) {return c >= s;}

该办法中判断线当前程池状态 >= SHUTDOWN 是否成立其实也是用到了之前线程池状态定义的技巧。对于非运行状态的其它状态都为负数,且高三位都不同,TERMINATED(011)> TIDYING(010)> STOP(001)> SHUTDOWN(000)而高三位的大小取决了整个数的大小。因而对于不同状态,无论沉闷线程数是多少,线程池的状态始终决定着 ctl 值的大小。即 TERMINATED 状态下的 ctl 值 > TIDYING 状态下的 ctl 值恒成立。

interruptIdleWorkers()

private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {for (Worker w : workers) {
      Thread t = w.thread;
      // 判断 worker 线程是否曾经被标记中断了,如果没有,则尝试获取 worker 线程的锁
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          // 中断线程
          t.interrupt();} catch (SecurityException ignore) { } finally {w.unlock();
        }
      }
      // 如果 onlyOne 为 true 的话最多中断一个线程
      if (onlyOne)
        break;
    }
  } finally {mainLock.unlock();
  }
}

刚办法会尝试获取 Worker 的锁,只有获取胜利的状况下才会中断线程。这里也与后面说的 Worker 尽管继承了 AQS 但却没应用 CLH 无关,前面会进行剖析。

tryTerminate() 办法曾经在后面剖析过了,这里不过多叙述。

6.2 shutdownNow()

public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 校验敞开线程池权限
    checkShutdownAccess();
    // 批改线程池状态为 STOP
    advanceRunState(STOP);
    // 中断所有线程
    interruptWorkers();
    // 获取队列中所有正在期待解决的工作列表
    tasks = drainQueue();} finally {mainLock.unlock();
  }
  // 尝试敞开线程池
  tryTerminate();
  // 返回工作列表
  return tasks;
}

该办法与 shutdown() 比拟类似,都将外围步骤封装在了几个办法中,其中 checkShutdownAccess() 和 advanceRunState() 雷同。上面对不同的办法进行阐明

interruptWorkers()

private void interruptWorkers() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 遍历所有的 Worker,只有 Worker 启动了就将其中断
    for (Worker w : workers)
      w.interruptIfStarted();} finally {mainLock.unlock();
  }
}
void interruptIfStarted() {
  Thread t;
  // state >= 0 示意 worker 曾经启动,Worker 启动并且持有线程不为 null 并且持有线程未被标记中断,则中断该线程
  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    try {t.interrupt();
    } catch (SecurityException ignore) {}}
}

该办法并没有尝试去获取 Worker 的锁,而是间接中断线程。因为 STOP 状态下的线程池不容许解决工作队列中正在期待的工作。

drainQueue()

// 将工作队列中的工作增加进列表中返回,通常状况下应用 drainTo() 就行了,但如果队列是提早队列或是其余无奈通过 drainTo()办法转移工作时,再通过循环遍历进行转移
private List<Runnable> drainQueue() {...}

七、Worker 继承 AQS 的起因

首先说论断——Worker 继承 AQS 是为了应用其中状态治理的性能,并没有像 ReentrantLock 应用 AQS 中 CLH 的性质。

咱们先来看看 Worker 中与 AQS 相干的办法:

// 参数为 unused,从命名也能够晓得该参数未被应用
protected boolean tryAcquire(int unused) {
  // 通过 CAS 扭转将状态由 0 扭转为 1
  if (compareAndSetState(0, 1)) {
    // 设置以后线程独占
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
  }
  return false;
}
// 该办法只在 runWorker() 中被应用
public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }

Worker 中的 tryAcquire 只是将状态改为 1,而参数未被应用,因而咱们能够判定,Worker 中的状态可能取值为(0, 1)。这里没有思考初始化状态 - 1 是避免出现混同。

再看 lock() 办法,lock() 办法被调用的惟一地位就是在 runWorker() 中启动 worker 线程前。而 runWorker() 是通过 Worker 中的 run() 调用的。Worker 作为工作只被传递给自身持有的工作线程中,因而 Worker 中的 run() 办法只能被自身持有的工作线程通过 start() 调用,因而 runWorker() 只会被 Worker 自身持有的工作线程所调用,lock() 办法也只会被单线程调用,不存在多个线程竞争同一把锁的状况,也就不存在多线程环境下,只有一个线程能取得锁导致其余期待线程被增加进 CLH 队列的状况。所以 Worker 并没没有应用 CLH 的性能。

这也就很好阐明了 tryAcquire() 办法并没有应用传递的参数,因为 Worker 只存在两种状态,要么被上锁(非闲暇,state=1),要么未被上锁(闲暇,state=0)。无需通过传递参数设置其余的状态。

final void runWorker(Worker w) {
  ...
  try {while (task != null || (task = getTask()) != null) {
      // 惟一被调用的中央
      w.lock();
      ...
    }
  }
}

以上剖析阐明了 Worker 没有应用 AQS 的 CLH 性能。那么 Worker 是如何应用状态治理的性能的呢?

在敞开线程池的 shutdown() 办法中,有一个步骤是中断所有的闲暇 Worker 线程。而在中断所有 Worker 线程前会判断 Worker 线程是否能被获取到锁,通过 tryLock() -> tryAcquire() 判断 Worker 的状态是否为 0,只有可能获取到锁的 Worker 才会被中断,而能被获取到锁的 Worker 即为闲暇 Worker(state=0)。而不能被获取到锁的 Worker 表名曾经执行过 lock() 办法了,此时 Worker 在 While 循环不断获取阻塞队列的工作执行,在 shutdown()办法中不能被中断。

private void interruptIdleWorkers(boolean onlyOne) {
    ...
  try {for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {...}
    }
  }
}

因而 Worker 的状态治理其实是通过 state 的值(0 或 1)判断 Worker 是否为闲暇的,如果是闲暇的,则能够在线程池敞开时被中断掉,否则得始终在 while 循环中获取阻塞队列中的工作并执行,直至队列中工作为空后才被开释。如下图所示:

八、回绝策略

本章只探讨 ThreadPoolExecutor 内置的四个回绝策略 handler。

8.1 CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 如果线程池未被敞开,间接在以后线程中执行工作
    if (!e.isShutdown()) {r.run();
    }
  }
}

间接在调用线程中执行被回绝的工作。只有线程池为 RUNNING 状态,工作仍被执行。如果为非 RUNNING 状态,工作将间接被疏忽,这也合乎线程池状态的行为。

8.2 AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 抛出回绝异样
    throw new RejectedExecutionException("Task" + r.toString() +
                                         "rejected from" +
                                         e.toString());
  }
}

工作被回绝后间接抛出回绝异样。

8.3 DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }
    // 空办法,什么都不执行
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

摈弃该工作。回绝办法为空,示意什么都不执行,等同于将工作摈弃。

8.4 DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {
      // 从阻塞队列中获取(移除)队头的工作,e.getQueue().poll();
      // 再次尝试 execute 当前任务
      e.execute(r);
    }
  }
}

移除阻塞队列中最早进入队列中(队头)的工作,而后再次尝试执行 execute()办法,将当前任务入队。这是典型的喜新厌旧的策略。

九、ThreadPoolExecutor 二次开发实际

介绍完了 ThreadPoolExecutor 的外围原理,咱们来看看 vivo 自研的 NexTask 并发框架是如何玩转线程池并晋升业务人员的开发速度和代码执行速度。

NexTask 对业务罕用模式、算法、场景进行抽象化,以组件的模式落地。它提供了一个疾速、轻量级、简略易用并且屏蔽了底层技术细节的形式,可能让开发人员疾速编写并发程序,更大程度上为开发赋能。

首先给出 NexTask 架构图,而后咱们针对架构图中应用到了 ThreadPoolExecutor 的中央进行详细分析。

// Executor 局部代码:public class Executor {
  ...
    private static DefaultTaskProcessFactory taskProcessFactory =
    new DefaultTaskProcessFactory();
  // 对外提供的 API,用户疾速创立工作处理器
  public static TaskProcess getCommonTaskProcess(String name) {return TaskProcessManager.getTaskProcess(name, taskProcessFactory);
    }
  public static TaskProcess getTransactionalTaskProcess(String name) {return TaskProcessManager.getTaskProcessTransactional(name, taskProcessFactory);
    }
  ...
}

Executor 是对外提供的接口,开发人员能够应用它具备的简略易用的 API,疾速通过工作管理器 TaskProcessManager 创立工作处理器 TaskProcess。

// TaskProcessManager 局部代码:public class TaskProcessManager {
  // 缓存 map,< 业务名称, 针对该业务的工作处理器 >
  private static Map<String, TaskProcess> taskProcessContainer =
            new ConcurrentHashMap<String, TaskProcess>();
  ...
}

TaskProcessManager 持有一个 ConcurrentHashMap 本地缓存有所的工作处理器,每个工作处理器与特定的业务名称一一映射。在获取工作处理器时,通过具体的业务名称从缓存中获取,不仅可能保障各个业务间的工作解决互相隔离,同时可能避免屡次创立、销毁线程池造成的资源损耗。

// TaskProcess 局部代码:public class TaskProcess {
  // 线程池
  private ExecutorService executor;
  // 线程池初始化
  private void createThreadPool() {
        executor = new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2048), new DefaultThreadFactory(domain),
                new ThreadPoolExecutor.AbortPolicy());
    }
  // 多线程提交工作进行解决
  public <T> List<T> executeTask(List<TaskAction<T>> tasks) {int size = tasks.size();
    // 创立一个与工作数雷同的 CountDownLatch,保障所有工作全副解决完后一起返回后果
    final CountDownLatch latch = new CountDownLatch(size);
    // 返回后果初始化
    List<Future<T>> futures = new ArrayList<Future<T>>(size);
    List<T> resultList = new ArrayList<T>(size);
    //  遍历所有工作,提交到线程池
    for (final TaskAction<T> runnable : tasks) {Future<T> future = executor.submit(new Callable<T>() {
            @Override
            public T call() throws Exception {
          // 解决具体的工作逻辑
                try {return runnable.doInAction(); }
          // 解决实现后,CountDownLatch - 1
          finally {latch.countDown(); }
                }
            });
            futures.add(future);
        }
        try {
      // 期待所有工作解决实现
            latch.await(50, TimeUnit.SECONDS);
        } catch (Exception e) {log.info("Executing Task is interrupt.");
        }
    // 封装后果并返回
        for (Future<T> future : futures) {
            try {T result = future.get();// wait
                if (result != null) {resultList.add(result);
                }
            } catch (Exception e) {throw new RuntimeException(e);
            }
        }
        return resultList;
    }
  ...
}

每个 TaskProcess 都持有一个线程池,由线程池的初始化过程能够看到,TaskProcess 采纳的是有界阻塞队列,队列中最多寄存 2048 个工作,一旦超过这个数量后,将会间接回绝接管工作并抛出回绝解决异样。

TaskProcess 会遍历用户提交的工作列表,并通过 submit() 办法将其提交至线程池解决,submit() 底层其实还是调用的 ThreadPoolExecutor#execute() 办法,只不过会在调用前将工作封装成 RunnableFuture,这里就是 FutureTask 框架的内容了,就不进行开展。

TaskProcess 会在每次解决工作时,创立一个 CountDownLatch,并在工作完结后执行 CountDownLatch.countDown(),这样就能保障所有工作在执行实现阻塞以后线程,直至所有工作解决完后对立获取后果并返回。

十、总结

JDK 尽管为开发人员提供了 Executors 工具类以及内置的多种线程池,但那些线程池的应用十分局限,无奈满足日益简单的业务场景。阿里官网的编程规约中也举荐开发人员不要间接应用 JDK 自带的线程池,而是依据本身业务场景通过 ThreadPoolExecutor 进行创立线程池。因而,理解 ThreadPoolExecutor 外部原理对日常开发中纯熟应用线程池也是至关重要的。

本文次要是对 ThreadPoolExecutor 外部外围原理进行探索,介绍了其构造方法及其各个结构参数的具体意义,以及线程池外围 ctl 参数的转化办法。随后花了大量篇幅深刻 ThreadPoolExecutor 源码介绍线程池的启动与敞开流程、外围内置类 Worker 等。ThreadPoolExecutor 还有其余办法本文暂未介绍,读者能够在读完本文的根底上自行浏览其余源码,置信会有肯定帮忙。

作者:vivo 互联网服务器团队 -Xu Weiteng

退出移动版