关于java:ThreadPoolExecutor源码分析

38次阅读

共计 8511 个字符,预计需要花费 22 分钟才能阅读完成。

线程池的状态

只有理解线程池的几个状态,能力读懂它的外围源码。所以先说说这几个状态

running:为线程池初始化时的默认状态,此状态会接管工作进行解决

shutdown: 该状态下的线程池不接管任何工作,但会期待正在运行的工作执行完。通常调用 shutdown() 办法实现设置

stop: 该状态的线程池不接管任何工作,同时不会期待正在运行的工作执行结束。通常调用 shutdownNow() 办法实现设置

tidying: 该状态下的线程池内,没有任何线程和工作

terminated: 该状态为线程池的终态,通常调用 tryTerminate() 办法实现设置

大多数状况下线程池的一个生命周期流转大略是 running -> (shutdown,stop)-> tidying -> terminated

这几个状态在 ThreadPoolExecutor 源码中,通过一个 ctl 的整型原子变量标识,高 3 位标识线程状态,低 29 位标识线程数量。翻看源码就能看到

外围源码剖析

  • execute(Runnable command)

为线程池的外围办法, 调用该办法工作就会执行,间接看上面代码正文吧

  public void execute(Runnable command) {if (command == null) throw new NullPointerException();
        int c = ctl.get();// 获取 ctl 原子变量

        // 如果以后线程池的线程数量小于 corePoolSize,增加 Worker 对象。Worker 对象是什么前面说
        if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) 
                return;// 返回,完结
            c = ctl.get();}

        // 如果以后线程池的线程数量 > corePoolSize
        // 且以后线程是否处于 running,则增加工作到队列
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
           // 二次查看,以后线程不是处于 running,则移除工作
            if (! isRunning(recheck) && remove(command))
            // 执行回绝策略
                reject(command);
                // 线程数量等于零,那就在增加 Worker 对象呗
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        // 如果工作队列满,则增加 Worker 对象,如果增加失败执行回绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

以上为外围源码的剖析,无非就是依据线程池状况增加 Worker、工作入队、执行回绝策略。能够看看上面这个流程图,可能会更清晰

到这里,咱们能够来讲讲 addWorker 了。这个办法会封装成一个 Worker 对象,而后运行工作。看看 Worker 对象的类图:

Worker 实现 Runnable 接口、继承 AbstractQueuedSynchronizer,持有一个 Thread 的成员变量。所以能够把 Worker 对象看成一个线程,同时领有 AbstractQueuedSynchronizer 的属性和办法,因而它可能进行加锁和开释锁的操作。

ok,逐渐跟进来看看 addWorker 办法外面的逻辑。

  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {int c = ctl.get();
            
            // 以后线程池状态
            int rs = runStateOf(c);

            // 如果以后线程池状态不非法就不让增加
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            
            
            for (;;) {
               // 获取以后线程数量
                int wc = workerCountOf(c);
                // 如果 wc 大于 ctl 所能示意的最大线程数或者大于最大线程数则不让增加
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过 CAS 操作,减少线程池中的 Worker 数。如果增加胜利完结双层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果 CAS 操作失败,内层循环继续执行   
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 创立 Worker 对象,传入工作
            w = new Worker(firstTask);
            // 获取 Worker 对象的线程变量
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加 mainLock 锁,防并发
                mainLock.lock();
                try {
                    // 以后线程池状态
                    int rs = runStateOf(ctl.get());
                     // 如果 Worker 对象的线程状态不非法,抛异样
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                         // 如果非法增加到 workers 汇合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 一个变量标识,表明 workers 汇合是否有增加新的 worker 对象      
                        workerAdded = true;
                    }
                } finally {mainLock.unlock();
                }
                if (workerAdded) {
                   // 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

整体还不算简单,外围就是依据传入的工作创立一个 Worker 对象,而后启动 Worker。

上面来看看 Worker 启动的逻辑,后面说过了 Worker 实现 Runnable 接口,所以启动将会触发执行 run 办法,而 run 办法最终调的是 runWorker() 办法。

 final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {// 死循环获取工作,而后执行工作。这里 getTask() 办法会有阻塞状况的,咱们这里晓得一下就行,上面马上讲。while (task != null || (task = getTask()) != null) {
               // 获取 w 锁。后面说过了,Worker 对象继承 AbstractQueuedSynchronizer, 所以自身就内置了一把锁
                w.lock();
                // 判断同一个时刻以后线程和线程池的状态是否非法,不非法完结呗
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   // 工作执行前的解决逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {
                    // 工作执行后的解决逻辑
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 以后 Worker 实现的工作数量
                    w.completedTasks++;
                    // 开释 w 锁
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {
            // 解决 Worker 退出的逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

整个办法的逻辑其实也不算简单,就是以后 Worker 一直死循环获取队列外面是否有工作。有,就加锁而后执行工作。无,就阻塞期待获取工作。那什么状况下才会跳出整个死循环,执行 processWorkerExit 呢?这里就须要看下 getTask() 办法逻辑了。

    private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // 判断线程池状态和工作队列的状况,不满足条件间接返回 null,完结。if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            
            // 超时工夫的标识,[是否设置了外围线程数的超时工夫 或者 以后线程数量是否大于外围线程数],// 因为咱们晓得线程池运行的线程数量如果大于外围线程数,多进去的那局部线程是须要被回收的。boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            // 如果 timed 为 false,则始终阻塞期待,直到获取到元素,而后返回
            // 如果 timed 为 true,则始终阻塞期待 keepAliveTime 超时后返回,// 到这里其实就晓得如何完结 runWorker 办法的那个死循环了,也就意味着 Worker 它的线程生命周期完结了。Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

最初,来看下 processWorkerExit() 办法解决了哪些逻辑

    private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        // 获取 mainLock 锁
        mainLock.lock();
        try {
        // 增加工作数量,而后移除 worker
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
        // 开释 mainLock 锁
            mainLock.unlock();}
        // 尝试将线程池状态设置为 terminate
        tryTerminate();
        
        // 次要判断以后线程池的线程数是否小于 corePoolSize,如果小于持续增加 Worker 对象
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

这个办法次要就是移除 Worker 对象,而后尝试将线程池的状态更改为 terminate。这里须要讲一下 tryTerminate 办法逻辑,因为它和线程池 awaitTermination() 办法有肯定的关联,来看看它的代码。

    final void tryTerminate() {for (;;) {int c = ctl.get();
            // 判断线程池状态,还在运行或者曾经是 terminate 的状态间接完结了
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
             // 就是中断闲暇的 Worker,前面讲 shutDown 办法的时候聊
            if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE);
                return;
            }
            

            final ReentrantLock mainLock = this.mainLock;
            // 获取 mainLock 锁
            mainLock.lock();
            try {
            // 线程池设置成 TIDYING 状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                    // 钩子办法,线程池终止时执行的逻辑
                        terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));
                    // termination 为 mainLock 锁的 condition 实例,这个是来实现线程之间的通信。// 其实这里是来唤醒 awaitTermination() 办法,前面剖析 awaitTermination 源码会看到。termination.signalAll();}
                    return;
                }
            } finally {
            // 开释锁
                mainLock.unlock();}
           
        }
    }

到这里,线程池 execute 办法大抵的逻辑就完了。能够再看看时序图,理清下几个办法和类之间的调用。

  • shutdown()

中断线程池的线程,会期待正在执行的线程完结执行,来看看源码它是怎么实现的

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 获取 mainLock 锁,避免其余线程执行
        mainLock.lock();
        try {
        // 查看权限,确保用户线程有敞开线程池的权限
            checkShutdownAccess();
        // 通过 CAS 将线程池状态设置成 SHUTDOWN
            advanceRunState(SHUTDOWN);
          // 中断所有闲暇的 Workers , 上面剖析这个办法
            interruptIdleWorkers();
          // 钩子办法,让子类进行收尾的逻辑
            onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {
        // 开释 mainLock 锁
            mainLock.unlock();}
        //execute 办法,咱们剖析过了,次要就是尝试将线程池的状态设置为 terminate
        tryTerminate();}

该办法咱们比拟关注的点是 interruptIdleWorkers 办法,是怎么中断闲暇 Worker,而后是如何保障 Worker 执行结束的?看看代码就晓得了

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        // 获取 mainLock 锁
        mainLock.lock();
        try {
        // 轮询 workers 逐个中断
            for (Worker w : workers) {
                Thread t = w.thread;
                // 判断 如果以后线程未中断且可能获取 w 锁,则执行中断
                // 如果以后线程未中断但不能获取 w 锁,那么就会阻塞,直到获取锁为止。// 这里的 w 锁,就是后面在剖析 execute 时,有个死循环不断取工作,取到工作就会获取 w 锁。// 所以这边如果获取不到 w 锁,就证实还有工作没有执行完。if (!t.isInterrupted() && w.tryLock()) {
                    try {
                    // 中断线程
                        t.interrupt();} catch (SecurityException ignore) { } finally {w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {mainLock.unlock();
        }
    }

到这里,外围逻辑就是通过 w 这个锁来实现的。

  • shutdownNow
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();} finally {mainLock.unlock();
        }
        tryTerminate();
        return tasks;    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {for (Worker w : workers)
                w.interruptIfStarted();} finally {mainLock.unlock();
        }
    }
    }

源码和 shutdown 差不多,只不过将线程池状态设置为 stop,而后调用 interruptWorkers 办法,看看 worker 办法。

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {for (Worker w : workers)
                w.interruptIfStarted();} finally {mainLock.unlock();
        }
    }

代码中并没有获取 w 锁的逻辑,所以这个办法会间接中断所有线程,并不会期待那些正在执行工作的 worker 把工作执行完。

  • awaitTermination

调用 awaitTermination 办法会始终阻塞期待线程池状态变为 terminated 才返回 或者期待超时返回。来看看代码就明确了

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {for (;;) {
            // 如果曾经是 terminated 状态间接返回
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                //(1)期待 mainLock 锁的 condition 实例来唤醒,不然继续阻塞。nanos = termination.awaitNanos(nanos);
            }
        } finally {mainLock.unlock();
        }
    }

(1)处的代码曾经通知了该办法什么时候返回,就是 mainLock 锁的 termination 条件变量被唤醒返回。在下面剖析中 termination 条件变量被唤醒是在执行 tryTerminate() 时实现的,因为外部调用 termination.signalAll()。而 tryTerminate() 办法被 shutDown() 和 shutDownNow() 调用过,所以如果要让 awaitTermination 返回,调用这 2 个办法就行。

正文完
 0