关于后端:详解-JUC-线程池中的-ThreadPoolExecutor

心愿美妙的货色可能美妙地终结,是一种低微的人情世故。

前提

很早之前就打算看一次JUC线程池ThreadPoolExecutor的源码实现,因为近段时间比较忙,始终没有工夫整顿出源码剖析的文章。之前在剖析扩大线程池实现可回调的Future时候已经提到并发巨匠Doug Lea在设计线程池ThreadPoolExecutor的提交工作的顶层接口Executor只有一个无状态的执行办法:

public interface Executor {
    void execute(Runnable command);
}

ExecutorService提供了很多扩大办法底层基本上是基于Executor#execute()办法进行扩大。本文着重剖析ThreadPoolExecutor#execute()的实现,笔者会从实现原理、源码实现等角度联合简化例子进行具体的剖析。ThreadPoolExecutor的源码从JDK8到JDK11根本没有变动,本文编写的时候应用的是JDK11。

ThreadPoolExecutor的原理

ThreadPoolExecutor外面应用到JUC同步器框架AbstractQueuedSynchronizer(俗称AQS)、大量的位操作、CAS操作。ThreadPoolExecutor提供了固定沉闷线程(外围线程)、额定的线程(线程池容量 – 外围线程数这部分额定创立的线程,上面称为非核心线程)、工作队列以及回绝策略这几个重要的性能。

JUC同步器框架

ThreadPoolExecutor外面应用到JUC同步器框架,次要用于四个方面:

  • 全局锁mainLock成员属性,是可重入锁ReentrantLock类型,次要是用于拜访工作线程Worker汇合和进行数据统计记录时候的加锁操作。
  • 条件变量terminationCondition类型,次要用于线程进行期待终结awaitTermination()办法时的带期限阻塞。
  • 工作队列workQueueBlockingQueue类型,工作队列,用于寄存待执行的工作。
  • 工作线程,外部类Worker类型,是线程池中真正的工作线程对象。

对于AQS笔者之前写过一篇相干源码剖析的文章:JUC同步器框架AbstractQueuedSynchronizer源码图文剖析。

外围线程

这里先参考ThreadPoolExecutor的实现并且进行简化,实现一个只有外围线程的线程池,要求如下:

  • 临时不思考工作执行异常情况下的解决。
  • 工作队列为无界队列。
  • 线程池容量固定为外围线程数量。
  • 临时不思考回绝策略。
public class CoreThreadPool implements Executor {
    private BlockingQueue<Runnable> workQueue;
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private int coreSize;
    private int threadCount = 0;

    public CoreThreadPool(int coreSize) {
        this.coreSize = coreSize;
        this.workQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public void execute(Runnable command) {
        if (++threadCount <= coreSize) {
            new Worker(command).start();
        } else {
            try {
                workQueue.put(command);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    private class Worker extends Thread {
        private Runnable firstTask;

        public Worker(Runnable runnable) {
            super(String.format("Worker-%d", COUNTER.getAndIncrement()));
            this.firstTask = runnable;
        }

        @Override
        public void run() {
            Runnable task = this.firstTask;
            while (null != task || null != (task = getTask())) {
                try {
                    task.run();
                } finally {
                    task = null;
                }
            }
        }
    }

    private Runnable getTask() {
        try {
            return workQueue.take();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        CoreThreadPool pool = new CoreThreadPool(5);
        IntStream.range(0, 10)
                .forEach(i -> pool.execute(() ->
                        System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i))));
        Thread.sleep(Integer.MAX_VALUE);
    }
}

某次运行后果如下:

Thread:Worker-0,value:0  
Thread:Worker-3,value:3  
Thread:Worker-2,value:2  
Thread:Worker-1,value:1  
Thread:Worker-4,value:4  
Thread:Worker-1,value:5  
Thread:Worker-2,value:8  
Thread:Worker-4,value:7  
Thread:Worker-0,value:6  
Thread:Worker-3,value:9

设计此线程池的时候,外围线程是懒创立的,如果线程闲暇的时候则阻塞在工作队列的take()办法,其实对于ThreadPoolExecutor也是相似这样实现,只是如果应用了keepAliveTime并且容许外围线程超时(allowCoreThreadTimeOut设置为true)则会应用BlockingQueue#poll(keepAliveTime)进行轮询代替永恒阻塞。

其余附加性能

构建ThreadPoolExecutor实例的时候,须要定义maximumPoolSize(线程池最大线程数)和corePoolSize(外围线程数)。当工作队列是有界的阻塞队列,外围线程满负载,工作队列曾经满的状况下,会尝试创立额定的maximumPoolSize - corePoolSize个线程去执行新提交的工作。当ThreadPoolExecutor这里实现的两个次要附加性能是:

  • 肯定条件下会创立非核心线程去执行工作,非核心线程的回收周期(线程生命周期终结时刻)是keepAliveTime,线程生命周期终结的条件是:下一次通过工作队列获取工作的时候并且存活工夫超过keepAliveTime
  • 提供回绝策略,也就是在外围线程满负载、工作队列已满、非核心线程满负载的条件下会触发回绝策略。

源码剖析

先剖析线程池的要害属性,接着剖析其状态管制,最初重点剖析ThreadPoolExecutor#execute()办法。

要害属性

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 控制变量-寄存状态和线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 工作队列,必须是阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 工作线程汇合,寄存线程池中所有的(沉闷的)工作线程,只有在持有全局锁mainLock的前提下能力拜访此汇合
    private final HashSet<Worker> workers = new HashSet<>();
    // 全局锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // awaitTermination办法应用的期待条件变量
    private final Condition termination = mainLock.newCondition();
    // 记录峰值线程数
    private int largestPoolSize;
    // 记录曾经胜利执行结束的工作数
    private long completedTaskCount;
    // 线程工厂,用于创立新的线程实例
    private volatile ThreadFactory threadFactory;
    // 拒绝执行处理器,对应不同的回绝策略
    private volatile RejectedExecutionHandler handler;
    // 闲暇线程期待工作的工夫周期,单位是纳秒
    private volatile long keepAliveTime;
    // 是否容许外围线程超时,如果为true则keepAliveTime对外围线程也失效
    private volatile boolean allowCoreThreadTimeOut;
    // 外围线程数
    private volatile int corePoolSize;
    // 线程池容量
    private volatile int maximumPoolSize;
    // 省略其余代码
}

上面看参数列表最长的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

能够自定义外围线程数、线程池容量(最大线程数)、闲暇线程期待工作周期、工作队列、线程工厂、回绝策略。上面简略剖析一下每个参数的含意和作用:

  • corePoolSize:int类型,外围线程数量。
  • maximumPoolSize:int类型,最大线程数量,也就是线程池的容量。
  • keepAliveTime:long类型,线程闲暇等待时间,也和工作线程的生命周期无关,下文会剖析。
  • unitTimeUnit类型,keepAliveTime参数的工夫单位,实际上keepAliveTime最终会转化为纳秒。
  • workQueueBlockingQueue类型,期待队列或者叫工作队列。
  • threadFactoryThreadFactory类型,线程工厂,用于创立工作线程(包含外围线程和非核心线程),默认应用Executors.defaultThreadFactory()作为内建线程工厂实例,个别自定义线程工厂能力更好地跟踪工作线程。
  • handler
  • RejectedExecutionHandler

    类型,线程池的拒绝执行处理器,更多时候称为回绝策略,回绝策略执行的机会是当阻塞队列已满、没有闲暇的线程(包含外围线程和非核心线程)并且持续提交工作。提供了4种内建的回绝策略实现:

  • AbortPolicy:间接回绝策略,也就是不会执行工作,间接抛出RejectedExecutionException,这是默认的回绝策略
  • DiscardPolicy:摈弃策略,也就是间接疏忽提交的工作(艰深来说就是空实现)。
  • DiscardOldestPolicy:摈弃最老工作策略,也就是通过poll()办法取出工作队列队头的工作摈弃,而后执行以后提交的工作。
  • CallerRunsPolicy:调用者执行策略,也就是以后调用Executor#execute()的线程间接调用工作Runnable#run()个别不心愿工作失落会选用这种策略,但从理论角度来看,原来的异步调用用意会进化为同步调用

状态管制

状态管制次要围绕原子整型成员变量ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 通过ctl值获取运行状态
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// 通过ctl值获取工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

// 通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// CAS操作线程数减少1
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// CAS操作线程数缩小1
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// 线程数间接缩小1
private void decrementWorkerCount() {
    ctl.addAndGet(-1);
}

接下来剖析一下线程池的状态变量,工作线程下限数量位的长度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整数29:

咱们晓得,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于寄存0或者1。在ThreadPoolExecutor实现中,应用32位的整型包装类型寄存工作线程数和线程池状态。其中,低29位用于寄存工作线程数,而高3位用于寄存线程池状态,所以线程池的状态最多只能有2^3种。工作线程下限数量为2^29 – 1,超过5亿,这个数量在短时间内不必思考会超限。

接着看工作线程下限数量掩码COUNT_MASK,它的值是(1 < COUNT_BITS) - l,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:

而后就是线程池的状态常量,这里只详细分析其中一个,其余类同,这里看RUNNING状态:

// -1的补码为:111-11111111111111111111111111111
// 左移29位后:111-00000000000000000000000000000
// 10进制值为:-536870912
// 高3位111的值就是示意线程池正在处于运行状态
private static final int RUNNING = -1 << COUNT_BITS;

控制变量ctl的组成就是通过线程池运行状态rs和工作线程数wc通过或运算失去的:

// rs=RUNNING值为:111-00000000000000000000000000000
// wc的值为0:000-00000000000000000000000000000
// rs | wc的后果为:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

那么咱们怎么从ctl中取出高3位的线程池状态?下面源码中提供的runStateOf()办法就是提取运行状态:

// 先把COUNT_MASK取反(~COUNT_MASK),
失去:111-00000000000000000000000000000
// ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 两者做一次与运算即可失去高3位xxx
private static int runStateOf(int c){
    return c & ~COUNT_MASK;
}

同理,取出低29位的工作线程数量只须要把ctlCOUNT_MASK(000-11111111111111111111111111111)做一次与运算即可。

工作线程数为0的前提下,小结一下线程池的运行状态常量:

这里有一个比拟非凡的技巧,因为运行状态值寄存在高3位,所以能够间接通过十进制值(甚至能够疏忽低29位,间接用ctl进行比拟,或者应用ctl和线程池状态常量进行比拟)来比拟和判断线程池的状态:

工作线程数为0的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)

上面这三个办法就是应用这种技巧:

// ctl和状态常量比拟,判断是否小于
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// ctl和状态常量比拟,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// ctl和状态常量SHUTDOWN比拟,判断是否处于RUNNING状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

最初是线程池状态的跃迁图:

PS:线程池源码中有很多两头变量用了简略的单字母示意,例如c就是示意ctl、wc就是示意worker count、rs就是示意running status。

execute办法源码剖析

线程池异步执行工作的办法实现是ThreadPoolExecutor#execute(),源码如下:

// 执行命令,其中命令(上面称工作)对象是Runnable的实例
public void execute(Runnable command) {
    // 判断命令(工作)对象非空
    if (command == null)
        throw new NullPointerException();
    // 获取ctl的值
    int c = ctl.get();
    // 判断如果当前工作线程数小于外围线程数,则创立新的外围线程并且执行传入的工作
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
        // 如果创立新的外围线程胜利则间接返回
            return;
        // 这里阐明创立外围线程失败,须要更新ctl的长期变量c
        c = ctl.get();
    }
    // 走到这里阐明创立新的外围线程失败,也就是当前工作线程数大于等于corePoolSize
    // 判断线程池是否处于运行中状态,同时尝试用非阻塞办法向工作队列放入工作(放入工作失败返回false)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 这里是向工作队列投放工作胜利,对线程池的运行中状态做二次查看
        // 如果线程池二次查看状态是非运行中状态,则从工作队列移除以后的工作调用回绝策略解决之(也就是移除后面胜利入队的工作实例)
        if (! isRunning(recheck) && remove(command))
        // 调用回绝策略解决工作 - 返回
            reject(command);
        // 走到上面的else if分支,阐明有以下的前提:
        // 0、待执行的工作曾经胜利退出工作队列
        // 1、线程池可能是RUNNING状态
        // 2、传入的工作可能从工作队列中移除失败(移除失败的惟一可能就是工作曾经被执行了)
        // 如果当前工作线程数量为0,则创立一个非核心线程并且传入的工作对象为null - 返回
        // 也就是创立的非核心线程不会马上运行,而是期待获取工作队列的工作去执行
        // 如果前工作线程数量不为0,原来应该是最初的else分支,然而能够什么也不做,因为工作曾经胜利入队列,总会有适合的机会调配其余闲暇线程去执行它
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 走到这里阐明有以下的前提:
    // 0、线程池中的工作线程总数曾经大于等于corePoolSize(简略来说就是外围线程曾经全副懒创立结束)
    // 1、线程池可能不是RUNNING状态
    // 2、线程池可能是RUNNING状态同时工作队列曾经满了
    // 如果向工作队列投放工作失败,则会尝试创立非核心线程传入工作执行
    // 创立非核心线程失败,此时须要拒绝执行工作
    else if (!addWorker(command, false))
    // 调用回绝策略解决工作 - 返回
        reject(command);
}

这里简略剖析一下整个流程:

  1. 如果当前工作线程总数小于corePoolSize,则间接创立外围线程执行工作(工作实例会传入间接用于结构工作线程实例)。
  2. 如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞办法向工作队列放入工作,这里会二次查看线程池运行状态,如果当前工作线程数量为0,则创立一个非核心线程并且传入的工作对象为null。
  3. 如果向工作队列投放工作失败(工作队列曾经满了),则会尝试创立非核心线程传入工作实例执行。
  4. 如果创立非核心线程失败,此时须要拒绝执行工作,调用回绝策略解决工作。

这里是一个纳闷点:为什么须要二次查看线程池的运行状态,当前工作线程数量为0,尝试创立一个非核心线程并且传入的工作对象为null?这个能够看API正文:

如果一个工作胜利退出工作队列,咱们仍然须要二次查看是否须要增加一个工作线程(因为所有存活的工作线程有可能在最初一次查看之后曾经终结)或者执行以后办法的时候线程池是否曾经shutdown了。所以咱们须要二次查看线程池的状态,必须时把工作从工作队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。

工作提交流程从调用者的角度来看如下:

addWorker办法源码剖析

boolean addWorker(Runnable firstTask, boolean core)办法的第一的参数能够用于间接传入工作实例,第二个参数用于标识将要创立的工作线程是否外围线程。办法源码如下:

// 增加工作线程,如果返回false阐明没有新创建工作线程,如果返回true阐明创立和启动工作线程胜利
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 留神这是一个死循环 - 最外层循环
    for (int c = ctl.get();;) {
        // 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
        // 1. 线程池状态至多为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
        // 2. 线程池状态至多为STOP状态,也就是rs >= STOP(1),或者传入的工作实例firstTask不为null,或者工作队列为空
        // 其实这个判断的边界是线程池状态为shutdown状态下,不会再承受新的工作,在此前提下如果状态曾经到了STOP、或者传入工作不为空、或者工作队列为空(曾经没有积压工作)都不须要增加新的线程
        if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        // 留神这也是一个死循环 - 二层循环
        for (;;) {
            // 这里每一轮循环都会从新获取工作线程数wc
            // 1. 如果传入的core为true,示意将要创立外围线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false示意创立外围线程失败
            // 1. 如果传入的core为false,示意将要创非建外围线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false示意创立非核心线程失败
            if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 胜利通过CAS更新工作线程数wc,则break到最外层的循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到这里阐明了通过CAS更新工作线程数wc失败,这个时候须要从新判断线程池的状态是否由RUNNING曾经变为SHUTDOWN
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态曾经由RUNNING曾经变为SHUTDOWN,则从新跳出到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // 如果线程池状态仍然是RUNNING,CAS更新工作线程数wc失败阐明有可能是并发更新导致的失败,则在内层循环重试即可
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 标记工作线程是否启动胜利
    boolean workerStarted = false;
    // 标记工作线程是否创立胜利
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 传入工作实例firstTask创立Worker实例,Worker结构外面会通过线程工厂创立新的Thread对象,所以上面能够间接操作Thread t = w.thread
        // 这一步Worker实例曾经创立,然而没有退出工作线程汇合或者启动它持有的线程Thread实例
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 这里须要全局加锁,因为会扭转一些指标值和非线程平安的汇合
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 这里次要在加锁的前提下判断ThreadFactory创立的线程是否存活或者判断获取锁胜利之后线程池状态是否曾经更变为SHUTDOWN
                // 1. 如果线程池状态仍然为RUNNING,则只须要判断线程实例是否存活,须要增加到工作线程汇合和启动新的Worker
                // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的工作实例firstTask为null,则须要增加到工作线程汇合和启动新的Worker
                // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的工作实例firstTask不为null,则不会增加到工作线程汇合和启动新的Worker
                // 这一步其实有可能创立了新的Worker实例然而并不启动(长期对象,没有任何强援用),这种Worker有可能胜利下一轮GC被收集的垃圾对象
                if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把创立的工作线程实例增加到工作线程汇合
                    workers.add(w);
                    int s = workers.size();
                    // 尝试更新历史峰值工作线程数,也就是线程池峰值容量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 这里更新工作线程是否启动胜利标识为true,前面才会调用Thread#start()办法启动实在的线程实例
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果胜利增加工作线程,则调用Worker外部的线程实例t的Thread#start()办法启动实在的线程实例
            if (workerAdded) {
                t.start();
                // 标记线程启动胜利
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,须要从工作线程汇合移除对应的Worker
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 增加Worker失败
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 从工作线程汇合移除之
        if (w != null)
            workers.remove(w);
        // wc数量减1
        decrementWorkerCount();
        // 基于状态判断尝试终结线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

笔者发现了Doug Lea大神非常喜爱简单的条件判断,而且单行简单判断不喜爱加花括号,像上面这种代码在他编写的很多类库中都比拟常见:

if (runStateAtLeast(c, SHUTDOWN)
        && (runStateAtLeast(c, STOP)
        || firstTask != null
        || workQueue.isEmpty()))
    return false;
// ....
//  代码拆分一下如下
boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN);     # rs >= SHUTDOWN(0)
boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty();
if (atLeastShutdown && atLeastStop){
    return false;
}

下面的剖析逻辑中须要留神一点,Worker实例创立的同时,在其构造函数中会通过ThreadFactory创立一个Java线程Thread实例,前面会加锁后二次查看是否须要把Worker实例增加到工作线程汇合workers中和是否须要启动Worker中持有的Thread实例,只有启动了Thread实例实例,Worker才真正开始运作,否则只是一个无用的长期对象。Worker自身也实现了Runnable接口,它能够看成是一个Runnable的适配器。

工作线程外部类Worker源码剖析

线程池中的每一个具体的工作线程被包装为外部类Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS),实现了Runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    // 保留ThreadFactory创立的线程实例,如果ThreadFactory创立线程失败则为null
    final Thread thread;
    // 保留传入的Runnable工作实例
    Runnable firstTask;
    // 记录每个线程实现的工作总数
    volatile long completedTasks;

    // 惟一的构造函数,传入工作实例firstTask,留神能够为null
    Worker(Runnable firstTask) {
        // 禁止线程中断,直到runWorker()办法执行
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 通过ThreadFactory创立线程实例,留神一下Worker实例本身作为Runnable用于创立新的线程实例
        this.thread = getThreadFactory().newThread(this);
    }

    // 委托到内部的runWorker()办法,留神runWorker()办法是线程池的办法,而不是Worker的办法
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    //  是否持有独占锁,state值为1的时候示意持有锁,state值为0的时候示意曾经开释锁
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 独占模式下尝试获取资源,这里没有判断传入的变量,间接CAS判断0更新为1是否胜利,胜利则设置独占线程为以后线程
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 独占模式下尝试是否资源,这里没有判断传入的变量,间接把state设置为0
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // 加锁
    public void lock()        { acquire(1); }

    // 尝试加锁
    public boolean tryLock()  { return tryAcquire(1); }

    // 解锁
    public void unlock()      { release(1); }

    // 是否锁定
    public boolean isLocked() { return isHeldExclusively(); }

    // 启动后进行线程中断,留神这里会判断线程实例的中断标记位是否为false,只有中断标记位为false才会中断
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker的构造函数外面的逻辑非常重要,通过ThreadFactory创立的Thread实例同时传入Worker实例,因为Worker自身实现了Runnable,所以能够作为工作提交到线程中执行。只有Worker持有的线程实例w调用Thread#start()办法就能在适合机会执行Worker#run()。简化一下逻辑如下:

// addWorker()办法中结构
Worker worker = createWorker();
// 通过线程池结构时候传入
ThreadFactory threadFactory = getThreadFactory();
// Worker构造函数中
Thread thread = threadFactory.newThread(worker);
// addWorker()办法中启动
thread.start();

Worker继承自AQS,这里应用了AQS的独占模式,有个技巧是结构Worker的时候,把AQS的资源(状态)通过setState(-1)设置为-1,这是因为Worker实例刚创立时AQSstate的默认值为0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()办法。Worker中两个笼罩AQS的办法tryAcquire()tryRelease()都没有判断内部传入的变量,前者间接CAS(0,1),后者间接setState(0)。接着看外围办法ThreadPoolExecutor#runWorker()

final void runWorker(Worker w) {
    // 获取以后线程,实际上和Worker持有的线程实例是雷同的
    Thread wt = Thread.currentThread();
    // 获取Worker中持有的初始化时传入的工作对象,这里留神寄存在长期变量task中
    Runnable task = w.firstTask;
    // 设置Worker中持有的初始化时传入的工作对象为null
    w.firstTask = null;
    // 因为Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,容许线程中断
    w.unlock(); // allow interrupts
    // 记录线程是否因为用户异样终结,默认是true
    boolean completedAbruptly = true;
    try {
        // 初始化工作对象不为null,或者从工作队列获取工作不为空(从工作队列获取到的工作会更新到长期变量task中)
        // getTask()因为应用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结
        while (task != null || (task = getTask()) != null) {
            // Worker加锁,实质是AQS获取资源并且尝试CAS更新state由0更变为1
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在进行(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态
            // 否则,要保障以后线程不是中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子办法,工作执行前
                beforeExecute(wt, task);
                try {
                    task.run();
                    // 钩子办法,工作执行后 - 失常状况
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 钩子办法,工作执行后 - 异常情况
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // 清空task长期变量,这个很重要,否则while会死循环执行同一个task
                task = null;
                // 累加Worker实现的工作数
                w.completedTasks++;
                // Worker解锁,实质是AQS开释资源,设置state为0
                w.unlock();
            }
        }
        // 走到这里阐明某一次getTask()返回为null,线程失常退出
        completedAbruptly = false;
    } finally {
        // 解决线程退出,completedAbruptly为true阐明因为用户异样导致线程非正常退出
        processWorkerExit(w, completedAbruptly);
    }
}

这里重点拆解剖析一下判断当前工作线程中断状态的代码:

if ((runStateAtLeast(ctl.get(), STOP) ||
        (Thread.interrupted() &&
                runStateAtLeast(ctl.get(), STOP))) &&
        !wt.isInterrupted())
    wt.interrupt();
// 先简化一下判断逻辑,如下
// 判断线程池状态是否至多为STOP,rs >= STOP(1)
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// 判断线程池状态是否至多为STOP,同时判断以后线程的中断状态并且清空以后线程的中断状态
boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if (atLeastStop || interruptedAndAtLeastStop && !wt.isInterrupted()){
    wt.interrupt();
}

Thread.interrupted()办法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个办法是因为在执行下面这个if逻辑同时内部有可能调用shutdownNow()办法,shutdownNow()办法中也存在中断所有Worker线程的逻辑,然而因为shutdownNow()办法中会遍历所有Worker做线程中断,有可能无奈及时在工作提交到Worker执行之前进行中断,所以这个中断逻辑会在Worker外部执行,就是if代码块的逻辑。这里还要留神的是:STOP状态下会回绝所有新提交的工作,不会再执行工作队列中的工作,同时会中断所有Worker线程。也就是,即便工作Runnable曾经runWorker()中前半段逻辑取出,只有还没走到调用其Runnable#run(),都有可能被中断。假如刚好产生了进入if代码块的逻辑同时内部调用了shutdownNow()办法,那么if逻辑内会判断线程中断状态并且重置,那么shutdownNow()办法中调用的interruptWorkers()就不会因为中断状态判断呈现问题导致二次中断线程(会导致异样)。

小结一下下面runWorker()办法的外围流程:

  1. Worker先执行一次解锁操作,用于解除不可中断状态。
  2. 通过while循环调用getTask()办法从工作队列中获取工作(当然,首轮循环也有可能是内部传入的firstTask工作实例)。
  3. 如果线程池更变为STOP状态,则须要确保工作线程是中断状态并且进行中断解决,否则要保障工作线程必须不是中断状态。
  4. 执行工作实例Runnale#run()办法,工作实例执行之前和之后(包含失常执行结束和异样执行状况)别离会调用钩子办法beforeExecute()afterExecute()
  5. while循环跳出意味着runWorker()办法完结和工作线程生命周期完结(Worker#run()生命周期完结),会调用processWorkerExit()解决工作线程退出的后续工作。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理