前言

后面在学习 JUC 源码时,很多代码举例中都应用了线程池 ThreadPoolExecutor,并且在工作中也常常用到线程池,所以当初就一步一步看看,线程池的源码,理解其背地的外围原理。

公众号:『 刘志航 』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

什么是线程池

线程池(英语:thread pool):一种线程应用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池保护着多个线程,期待着监督管理者调配可并发执行的工作。这防止了在解决短时间工作时创立与销毁线程的代价。线程池不仅可能保障内核的充分利用,还能避免过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

—— 维基百科

为什么要应用线程池

  1. 升高资源耗费:通过池化技术反复利用已创立的线程,升高线程创立和销毁造成的损耗。
  2. 进步响应速度:工作达到时,无需期待线程创立即可立刻执行。
  3. 进步线程的可管理性:线程是稀缺资源,如果无限度创立,不仅会耗费系统资源,还会因为线程的不合理散布导致资源调度失衡,升高零碎的稳定性。应用线程池能够进行对立的调配、调优和监控。

如何应用线程池

线程池应用有很多种形式,不过依照《Java 开发手册》形容,尽量还是要应用 ThreadPoolExecutor 进行创立。

代码举例:

ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<>(1024),                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),                new ThreadPoolExecutor.AbortPolicy());

那创立线程池的这些结构参数有什么含意?线程池的运行原理是什么?上面则开始通过源码及作图一步一步的理解。

源码剖析

参数介绍

public class ThreadPoolExecutor extends AbstractExecutorService {    /**    * ctx 为原子类型的变量, 有两个概念    * workerCount, 示意无效的线程数    * runState, 示意线程状态, 是否正在运行, 敞开等    */    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    // 29    private static final int COUNT_BITS = Integer.SIZE - 3;    // 容量 2²-1    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits 线程池的五中状态    // 即高3位为111, 承受新工作并解决排队工作    private static final int RUNNING    = -1 << COUNT_BITS;    // 即高3位为000, 不承受新工作, 但解决排队工作    private static final int SHUTDOWN   =  0 << COUNT_BITS;    // 即高3位为001, 不承受新工作, 不解决排队工作, 并中断正在进行的工作    private static final int STOP       =  1 << COUNT_BITS;    // 即高3位为010, 所有工作都已终止, 工作线程为0, 线程转换到状态TIDYING, 将运行terminate()钩子办法    private static final int TIDYING    =  2 << COUNT_BITS;    // 即高3位为011, 标识terminate()曾经实现    private static final int TERMINATED =  3 << COUNT_BITS;    // Packing and unpacking ctl 用来计算线程的办法    private static int runStateOf(int c)     { return c & ~CAPACITY; }    private static int workerCountOf(int c)  { return c & CAPACITY; }    private static int ctlOf(int rs, int wc) { return rs | wc; }}

结构参数及含意

public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    // 省略}

参数阐明:

  1. corePoolSize - 外围线程数,提交工作时,如果以后线程池的数量小于 corePoolSize,则创立新线程执行工作。
  2. maximumPoolSize - 最大线程数,如果阻塞队列已满,并且线程数小于 maximumPoolSize,则会创立新线程执行工作。
  3. keepAliveTime - 当线程数大于外围线程数时,且线程闲暇,keepAliveTime 工夫后会销毁线程。
  4. unit - keepAliveTime 的工夫单位。
  5. workQueue - 阻塞队列,当线程数大于外围线程数时,用来保留工作。
  6. threadFactory - 线程创立的工厂。
  7. handler - 线程饱和策略。

线程池执行流程

execute 源码

public class ThreadPoolExecutor extends AbstractExecutorService {    public void execute(Runnable command) {        // 空则抛出异样        if (command == null)            throw new NullPointerException();        // 获取以后线程池的状态        int c = ctl.get();        // 计算工作线程数 并判断是否小于外围线程数        if (workerCountOf(c) < corePoolSize) {            // addWorker提交工作, 提交胜利则完结            if (addWorker(command, true))                return;            // 提交失败再次获取以后状态            c = ctl.get();        }        // 判断线程状态, 并插入队列, 失败则移除        if (isRunning(c) && workQueue.offer(command)) {            // 再次获取状态            int recheck = ctl.get();            // 如果状态不是RUNNING, 并移除失败            if (! isRunning(recheck) && remove(command))                // 调用回绝策略                reject(command);            // 如果工作线程为0 则调用 addWorker            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        // 提交工作失败 走回绝策略        else if (!addWorker(command, false))            reject(command);    }}

execute 办法流程和流程图画的雷同,值得注意的是:

  1. 以后线程数小于外围线程数,则会创立新线程,这里即便是外围线程数有闲暇线程也会创立新线程!
  2. 而外围线程外面的闲暇线程会一直执行阻塞队列外面的工作。
  • workQueue阻塞队列:
  1. ArrayBlockingQueue: 是一个基于数组构造的有界阻塞队列,此队列按 FIFO(先进先出) 准则对元素进行排序。
  2. LinkedBlockingQueue: 一个基于链表构造的阻塞队列,此队列按 FIFO(先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。动态工厂办法Executors.newFixedThreadPool()应用了这个队列。
  3. SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作。否则插入操作始终处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,动态工厂办法Executors.newCachedThreadPool应用了这个队列。
  4. PriorityBlockingQueue: 一个具备优先级的有限阻塞队列。
  • 线程工厂:
// 默认工厂ThreadFactory threadFactory = Executors.defaultThreadFactory();// google guava工具提供ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

个别创立工厂,是为了更好的排查问题,也倡议应用工厂指定线程名字。

  • handler线程回绝策略:

当线程池达到最大线程数,并且队列满了,新的线程要采取的解决策略。

  1. AbortPolicy 回绝新工作并抛出RejectedExecutionException异样。
  2. CallerRunsPolicy 间接在调用程序的线程中运行。
  3. DiscardOldestPolicy 放弃最早的工作, 即队列最后面的工作。
  4. DiscardPolicy 抛弃,不解决。

addWorker 源码

public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * 查看工作是否能够提交     *     */    private boolean addWorker(Runnable firstTask, boolean core) {        retry:        // 外层循环         for (;;) {            // 获取以后状态            int c = ctl.get();            int rs = runStateOf(c);            // 查看线程池是否敞开            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            // 内层循环 CAS 减少线程个数            for (;;) {                int wc = workerCountOf(c);                // 工作线程大于容量 或者大于 外围或最大线程数                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                // CAS 线程数减少, 胜利则调到外层循环                if (compareAndIncrementWorkerCount(c))                    break retry;                // 失败则再次获取线程状态                c = ctl.get();  // Re-read ctl                // 不相等则从新走外层循环                if (runStateOf(c) != rs)                    continue retry;                // 否则内层持续循环            }        }        /**         * 创立新worker 开始新线程         * 此时曾经 CAS 胜利了         */        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            // 创立 Worker            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                // 加锁,避免多线程同时执行线程池的 execute                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        // 判断线程是否存活, 已存活抛出非法异样                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        // 增加工作                        workers.add(w);                        int s = workers.size();                        // 设置池最大大小, 并将 workerAdded设置为 true                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    // 解锁                    mainLock.unlock();                }                // 增加胜利 开始启动线程 并将 workerStarted 设置为 true                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            // 启动线程失败            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }    /**     * 启动线程失败, 加锁     * 移除线程, 并缩小线程总数     * 转换状态     */    private void addWorkerFailed(Worker w) {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (w != null)                workers.remove(w);            decrementWorkerCount();            tryTerminate();        } finally {            mainLock.unlock();        }    }}

addWorker 代码比拟长,次要分为两局部:

  1. 双重循环,应用 CAS 减少线程数。
  2. 创立工作线程 Worker ,并应用独占锁,将其增加到线程池,并启动。

总结

Q&A

Q: 线程池的原理及相干参数?

A: 主要参数为外围线程数、阻塞队列、最大线程数、回绝策略。

Q: 线程池的线程是怎么回收的?
A: 线程被创立之后,如果 task == null 或者调用 getTask 获取工作为 null,则调用 processWorkerExit 对线程执行清理工作。

清理时只是从 HashSet<Worker> workers 中移除该 Worker,之后该线程会被 JVM 主动回收。

Q: 外围线程是不是就不能够回收了?
A: 外围线程数只会减少,而又没有回收,这时候如果线程池没有工作,就会始终维持外围线程。

当然也能够通过调用 allowCoreThreadTimeOut 办法,设置是否容许回收外围线程。

结束语

通过浏览 ThreadPoolExecutor 理解线程池的根本构造和原理,至于其余的更多扩大,文章篇幅无限,就须要小伙伴们本人浏览了。

相干举荐

  • ReentrantLock 源码、画图一起看一看!
  • ReentrantReadWriteLock 的原理!
  • Spring 自调用事务生效,你是怎么解决的?