乐趣区

线程池ThreadPoolExecutor-了解

本文章出处 线程池 ThreadPoolExecutor 了解
转载请说明

常用线程池类型

Java 通过 Executors 静态方法创建 4 种不同类型线程池。

  • newSingleThreadExecutor 创建单例的线程池,保证执行任务顺序,超出线程任务将会在任务中等待,所有的任务都按照 FIFO 队列顺序执行。
  • newFixedThreadPool 创建一个固定大小的线程组,指定工作线程数量,当任务超过指定工作数量时,在队列中排队等待执行。
  • newCachedThreadPool 创建一个可以缓存线程池,这个线程池活动线程是 0,最大线程 Integer.MAX, 当不断有新的任务添加到线程池中,池内线程数量不够时,可以立刻创建新的线程执行任务。当空闲的线程超过 60s 就被系统回收掉。
  • newScheduleThreadPool 创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于 Timer。
  • newWorkStealingPool 会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

像 newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool 都时内部封装 ThreadPoolExecutor 生成线程池的,下面具体分析 ThreadPoolExecutor 这个类。

ThreadPoolExecutor 构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize 线程核心线程数,不会被回收的线程。
  • maximumPoolSize 线程池能够申请最大线程数量
  • workQueue 同步性队列转载执行的任务
  • keepAliveTime 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
  • threadFactory 线程工厂
  • handler 当任务数量超过队列容量时,需要处理这种情况,饱和策略,主要有 4 种处理策略

    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:使用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

线程池疑问

创建线程池基本核心构造参数我们已经知道了,但是我们还有很多问题没有搞明白的。怎么知道线程池内每个线程运行状态,是在工作中还是空闲呢?是不是有一个专门线程去标记空闲线程活动时间?线程是如何实现共用线程。带着这些问题去阅读代码。

线程池内线程状态

以下内容都是来自 ThreadPoolExecutor 代码注释。
线程池内的线程状态都是有一个 AtomicInteger ctl 保持的,是一个原子整数,包装了两个领域含义。

  • workerCount 有效的线程数,线程总数 2 ^ 29 -1 , 线程启动数量不包括线程停止的数量,而该值可能是
    与活动线程的实际数量暂时不同。例如当 ThreadFactory 创建线程失败时,线程正在执行退出,统计线程数量依然包括退出的线程。
  • runState 线程状态

    • RUNNING 正在接受新的任务并且处理队列中的任务
    • SHUTDOWN 不接受新的任务,但是能处理任务
    • STOP 不能接受新的任务,不能处理队列中的任务,但是可以中断正在执行的任务。
    • TIDYING 所有的任务终止,workerCount 为 0,线程全部过渡到 TIDYING 状态,即将运行 terminated() 钩子方法
    • TERMINATEDterminated() 钩子方法执行完成

这些状态都有一个转换顺序

  • RUNNING -> SHUTDOWN 执行 shutdown()
  • (RUNNING or SHUTDOWN) -> STOP 执行 shutdownNow()
  • SHUTDOWN -> TIDYING 当任务队列和线程池都是空
  • STOP -> TIDYING 线程池都是空
  • TIDYING -> TERMINATED 当 terminated()钩子方法执行完
    这些状态具体代码实现
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

execute 方法解析

    public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        /*
         * 处理 3 个步骤
         * 1. 如果正在运行的线程数量小于核心线程数,直接创建一个新的线程去执行任务
         * 调用 addWorker 方法自动检查 线程状态和数量,避免在不能添加线程时添加线程出现错误警报
         *
         * 2. 如果任务可以成功进入队列,我们仍然需要双重检查是否添加一个线程
         *   因为存在上次检查时有线程死亡或者当我们进入方法时线程池正在关闭
         *   因此,我们重新检查状态,如果停止,则回滚排队,如果没有,则启动新线程。*
         * 3. 添加任务失败,则尝试创建一个线程,如果失败了,使用拒绝策略
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 当前线程数量小于核心线程数
            if (addWorker(command, true)) // 创建线程
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) { // 线程池状态 RUNNING 并且 任务添加成功
            int recheck = ctl.get(); // 第二重检查
            if (! isRunning(recheck) && remove(command)) // 判断线程池状态  删除任务修改状态
                reject(command);
            else if (workerCountOf(recheck) == 0)  // 线程池数量为 0
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 线程池状态不为 RUNNING 或者 队列已满再或者线程大于最大线程数并且任务队列满了
            reject(command);
    }
 

下一步我们进入 addWorker 创建线程的核心方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //retry 标记,第一次看到 ?
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) // 至少 SHUTDOWN
                && (runStateAtLeast(c, STOP) // 至少 STOP  都是不合法
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) { // 状态合法
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 大于核心线程或者最大线程都不需要创建线程,和掩码相与防止最大线程数超过 2 ^ 29 - 1 细节啊
                    return false;
                if (compareAndIncrementWorkerCount(c))  // ctl 自增成功,跳出整个循环
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN)) // 状态至少 SHUTDOWN 重新进入循环 
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {w = new Worker(firstTask); // 创建线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 在加锁期间重新检查线程池状态
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) // 刚创建线程已经开始执行任务,这是有问题
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {mainLock.unlock();
                }
                if (workerAdded) {t.start(); // 启动任务
                    workerStarted = true;
                }
            }
        } finally {if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker() 主要流程检查线程池状态是否合法,创建新的线程,加入 workers 中, 调用 start() 执行任务。我们去了解下 Worker 类

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // TODO: switch to AbstractQueuedLongSynchronizer and move
        // completedTasks into the lock word.

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {runWorker(this);
        }
}

Worker 其实就是 Runnable 包装类,但是增加了任务中断功能,他的主要任务就是维护中断状态,继承 AQS 可以简化获取和释放围绕每个任务执行的锁定,防止旨在唤醒等待任务的工作线程的中断。
了解 Worker 怎么执行任务的进入runWorker()

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 取出任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) { // 如果当前 worker 没有任务,从队列中获取任务,直到队列为空
                w.lock();
                // 处理线程中断机制 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task); // 前置处理,类似拦截器机制,需要子类去实现
                    try {task.run(); // 调用任务方法
                        afterExecute(task, null); // 后置处理
                    } catch (Throwable ex) {afterExecute(task, ex); // 异常处理
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;  // 执行任务数量 + 1
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly); // 线程生命周期走完,执行回收工作
        }
    }

结合 Worker 构造函数,Worker 在初始化就自己给自己上锁了,避免线程在任务还没有开始的情况下就被中断了。启动线程执行 runWorker 方法,取出任务,释放锁,如果 Worker 中的任务为空,从队列中拉取任务。处理线程中断,主要依据第一线程状态已经至少 STOP 状态,然后清除中断状态,在判断线程没有中断信号了,再发送中断信号。按照作者注释的意思就是当线程池已经在停止过程中,线程应该中断,但是必须双重检查防止关闭过程中竞争发送中继信号。调用 run 方法执行任务。为什么要上锁执行任务,主要是执行任务过程,必须要获取锁才能中断线程的,但是 Worker 本身不支持重入锁的,只有在任务开始关闭过程才能中断。
在这里我们终于看到线程共用方式了,通过线程不断从队列中获取任务,然后再进行调用 run 方法执行任务,当线程退出获取队列循环,线程生命周期就结束了。

geTask()

    private Runnable getTask() {
        boolean timedOut = false; // 上一次拉取是否超时

        for (;;) {int c = ctl.get();

            // 检查线程池状态是 SHUTDOWN  不接受新的任务
            // 任务队列为空
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount(); // 核心线程数 workerCount -1
                return null;
            }

            int wc = workerCountOf(c); 

            // allowCoreThreadTimeOut  空闲情况下是否回收核心线程数 默认是 false
           // 当前线程数大于 核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // wc 大于最大线程数,先处理线程数量
           // 线程在存活的时间内没有获取到任务,则需要回收掉,上一个循环的,线程数 -1
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { //wc 不要为 0,任务队列为空的情况
                if (compareAndDecrementWorkerCount(c)) // 线程 - 1 成功没有其他线程竞争,没有新增任务
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? 
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时会返回空
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true; 
            } catch (InterruptedException retry) { // 中断等待获取任务,放弃执行任务
                timedOut = false;
            }
        }
    }

这里我们知道空闲时间是怎么回收线程的,通过同步性队列 poll() + 超时时间知道一个线程在这个时间内没有任务执行,线程池处于空闲状态的,返回 null 给调用方法,跳出 while 循环,结束整个线程的生命周期。

进入 processWorkerExit()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果没有执行到任务,核心线程 -1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); // 移除当前 worker,线程会被回收掉
        } finally {mainLock.unlock();
        }

        tryTerminate(); // 判断线程池内状态,是否对线程池发出关闭信号

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // 线程池在 RUNNABLE 或者 SHUTDOWN 状态,线程池任然可以执行任务或者接受任务
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) // 线程池内线程已经被回收完了并且任务还没有执行完
                    min = 1;
                if (workerCountOf(c) >= min) // 线程池内线程数量大于核心线程池,不需要新建线程去处理
                    return; // replacement not needed
            }
            addWorker(null, false); // 创建新的线程处理任务
        }
    }

进入 tryTerminate()

在线程池 SHUTDOWN 状态线程为 0 和任务队列为空的情况,或者 STOP 状态核心队列为空情况,线程池状向 TIDYING 转移,传播关闭池信号。

    final void tryTerminate() {for (;;) {int c = ctl.get();
            if (isRunning(c) || //RUNNING 状态不需要处理
                runStateAtLeast(c, TIDYING) || // 已经进入 TIDYING,也不做处理 
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) // 任务队列不为空,不满足条件
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE); 尝试去中断一个 worker 
                return;
            }
        
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 加锁修改线程池状态
            try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 进入 TIDYING 状态
                    try {terminated();
                    } finally {ctl.set(ctlOf(TERMINATED, 0));  // 执行完 terminated() 进入 TERMINATED 状态 
                        termination.signalAll();}
                    return;
                }
            } finally {mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown()

再去了解下线程池终止方法

   public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {checkShutdownAccess();
            advanceRunState(SHUTDOWN); // 修改线程池状态为 SHUTDOWN
            interruptIdleWorkers(); // 中断线程
            onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
        }
        tryTerminate();}

进入 interruptIdleWorkers() 怎么中断线程

    private void interruptIdleWorkers() {interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); // 加锁主要是 workers 是一个不安全集合
        try {for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) { // 没有中断和 能够获取到锁,说明此线程池没有在执行任务,Worker 是不支持重入的
                    try {t.interrupt(); 
                    } catch (SecurityException ignore) { } finally {w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {mainLock.unlock();
        }
    }

处理方法挺简单的,修改线程池状态不要接收新的任务,将 works 中空闲线程取出发出中断信号。

shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();  // 删除队列中的任务,返回给 tasks} finally {mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

shutdownNow 会将队列中还没有来得及处理任务全部删除掉,直接调用 tryTerminate()终止线程池生命周期。

总结

现在我们知道线程池内部机制是如何创建线程,共用线程,空闲回收,线程池的生命周期。调用 execute()提交任务,如果当前线程池数量小于核心线程数,调用 addWorker()创建一个新的线程池去执行任务,否则直接加入到队列中。在 addWorker()启动一个线程去不断从队列拉取任务,直到一个队列存活时间没有任务执行或者队列为空,线程才会被回收掉。

退出移动版