线程池的状态

只有理解线程池的几个状态,能力读懂它的外围源码。所以先说说这几个状态

running:为线程池初始化时的默认状态,此状态会接管工作进行解决

shutdown: 该状态下的线程池不接管任何工作,但会期待正在运行的工作执行完。通常调用shutdown() 办法实现设置

stop: 该状态的线程池不接管任何工作,同时不会期待正在运行的工作执行结束。通常调用shutdownNow() 办法实现设置

tidying:该状态下的线程池内,没有任何线程和工作

terminated:该状态为线程池的终态,通常调用tryTerminate()办法实现设置

大多数状况下线程池的一个生命周期流转大略是 running -> (shutdown,stop)-> tidying -> terminated

这几个状态在ThreadPoolExecutor源码中,通过一个ctl的整型原子变量标识,高3位标识线程状态,低29位标识线程数量。翻看源码就能看到

外围源码剖析

  • execute(Runnable command)

为线程池的外围办法,调用该办法工作就会执行,间接看上面代码正文吧

  public void execute(Runnable command) {        if (command == null) throw new NullPointerException();        int c = ctl.get();//获取ctl原子变量        //如果以后线程池的线程数量小于corePoolSize,增加Worker对象。Worker对象是什么前面说        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                 return;//返回,完结            c = ctl.get();        }        // 如果以后线程池的线程数量 > corePoolSize        // 且以后线程是否处于running ,则增加工作到队列        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();           // 二次查看,以后线程不是处于running,则移除工作            if (! isRunning(recheck) && remove(command))            // 执行回绝策略                reject(command);                //线程数量等于零,那就在增加Worker对象呗            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        // 如果工作队列满,则增加Worker对象,如果增加失败执行回绝策略        else if (!addWorker(command, false))            reject(command);    }

以上为外围源码的剖析,无非就是依据线程池状况增加Worker、工作入队、执行回绝策略。能够看看上面这个流程图,可能会更清晰

到这里,咱们能够来讲讲addWorker 了。这个办法会封装成一个Worker对象,而后运行工作。看看Worker对象的类图:

Worker实现Runnable接口、继承AbstractQueuedSynchronizer,持有一个Thread的成员变量。所以能够把Worker对象看成一个线程,同时领有AbstractQueuedSynchronizer的属性和办法,因而它可能进行加锁和开释锁的操作。

ok,逐渐跟进来看看addWorker办法外面的逻辑。

  private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();                        //以后线程池状态            int rs = runStateOf(c);            // 如果以后线程池状态不非法就不让增加            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;                                    for (;;) {               //获取以后线程数量                int wc = workerCountOf(c);                // 如果wc 大于ctl所能示意的最大线程数或者大于最大线程数则不让增加                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                // 通过CAS操作,减少线程池中的Worker数。如果增加胜利完结双层循环                if (compareAndIncrementWorkerCount(c))                    break retry;                //如果CAS操作失败,内层循环继续执行                   c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {           //创立Worker对象,传入工作            w = new Worker(firstTask);            // 获取Worker对象的线程变量            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                //加mainLock锁,防并发                mainLock.lock();                try {                    //以后线程池状态                    int rs = runStateOf(ctl.get());                     // 如果Worker对象的线程状态不非法,抛异样                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive())                             throw new IllegalThreadStateException();                         // 如果非法增加到workers汇合                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                      // 一个变量标识,表明workers汇合是否有增加新的worker对象                              workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                   //启动线程                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }

整体还不算简单,外围就是依据传入的工作创立一个Worker对象,而后启动Worker。

上面来看看Worker启动的逻辑,后面说过了Worker实现Runnable接口,所以启动将会触发执行run办法,而run办法最终调的是runWorker()办法。

 final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {           //死循环获取工作,而后执行工作。这里getTask()办法会有阻塞状况的,咱们这里晓得一下就行,上面马上讲。            while (task != null || (task = getTask()) != null) {               //获取w锁。后面说过了,Worker对象继承AbstractQueuedSynchronizer,所以自身就内置了一把锁                w.lock();                // 判断同一个时刻以后线程和线程池的状态是否非法,不非法完结呗                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                   //工作执行前的解决逻辑                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                    //工作执行后的解决逻辑                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    //以后Worker实现的工作数量                    w.completedTasks++;                    //开释w锁                    w.unlock();                }            }            completedAbruptly = false;        } finally {            //解决Worker退出的逻辑            processWorkerExit(w, completedAbruptly);        }    }

整个办法的逻辑其实也不算简单,就是以后Worker一直死循环获取队列外面是否有工作。有,就加锁而后执行工作。无,就阻塞期待获取工作。那什么状况下才会跳出整个死循环,执行processWorkerExit呢?这里就须要看下getTask() 办法逻辑了。

    private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // 判断线程池状态和工作队列的状况,不满足条件间接返回 null,完结。            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            int wc = workerCountOf(c);                        // 超时工夫的标识,[是否设置了外围线程数的超时工夫 或者 以后线程数量是否大于外围线程数 ],    //因为咱们晓得线程池运行的线程数量如果大于外围线程数,多进去的那局部线程是须要被回收的。            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            if ((wc > maximumPoolSize || (timed && timedOut))                && (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c))                    return null;                continue;            }            try {            // 如果timed为false,则始终阻塞期待,直到获取到元素,而后返回            // 如果timed为true,则始终阻塞期待keepAliveTime超时后返回,            //到这里其实就晓得如何完结runWorker办法的那个死循环了,也就意味着Worker它的线程生命周期完结了。                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

最初,来看下processWorkerExit() 办法解决了哪些逻辑

    private void processWorkerExit(Worker w, boolean completedAbruptly) {        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted            decrementWorkerCount();        final ReentrantLock mainLock = this.mainLock;        //获取mainLock锁        mainLock.lock();        try {        //增加工作数量,而后移除worker            completedTaskCount += w.completedTasks;            workers.remove(w);        } finally {        // 开释mainLock锁            mainLock.unlock();        }        //尝试将线程池状态设置为 terminate        tryTerminate();                //次要判断以后线程池的线程数是否小于corePoolSize,如果小于持续增加Worker对象        int c = ctl.get();        if (runStateLessThan(c, STOP)) {            if (!completedAbruptly) {                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;                if (min == 0 && ! workQueue.isEmpty())                    min = 1;                if (workerCountOf(c) >= min)                    return; // replacement not needed            }            addWorker(null, false);        }    }

这个办法次要就是移除Worker对象,而后尝试将线程池的状态更改为terminate。这里须要讲一下tryTerminate办法逻辑,因为它和线程池awaitTermination()办法有肯定的关联,来看看它的代码。

    final void tryTerminate() {        for (;;) {            int c = ctl.get();            //判断线程池状态,还在运行或者曾经是 terminate的状态间接完结了            if (isRunning(c) ||                runStateAtLeast(c, TIDYING) ||                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))                return;             // 就是中断闲暇的Worker,前面讲shutDown办法的时候聊            if (workerCountOf(c) != 0) {                 interruptIdleWorkers(ONLY_ONE);                return;            }                        final ReentrantLock mainLock = this.mainLock;            //获取mainLock锁            mainLock.lock();            try {            //线程池设置成TIDYING状态                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                    try {                    //钩子办法,线程池终止时执行的逻辑                        terminated();                    } finally {                        ctl.set(ctlOf(TERMINATED, 0));                    // termination为mainLock锁的condition实例,这个是来实现线程之间的通信。                    //其实这里是来唤醒awaitTermination()办法,前面剖析awaitTermination源码会看到。                        termination.signalAll();                    }                    return;                }            } finally {            // 开释锁                mainLock.unlock();            }                   }    }

到这里,线程池execute办法大抵的逻辑就完了。能够再看看时序图,理清下几个办法和类之间的调用。

  • shutdown()

中断线程池的线程,会期待正在执行的线程完结执行,来看看源码它是怎么实现的

    public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        //获取mainLock锁,避免其余线程执行        mainLock.lock();        try {        //查看权限,确保用户线程有敞开线程池的权限            checkShutdownAccess();        //通过CAS将线程池状态设置成 SHUTDOWN            advanceRunState(SHUTDOWN);          //中断所有闲暇的Workers , 上面剖析这个办法            interruptIdleWorkers();          //钩子办法,让子类进行收尾的逻辑            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {        // 开释mainLock锁            mainLock.unlock();        }        //execute办法,咱们剖析过了,次要就是尝试将线程池的状态设置为terminate        tryTerminate();    }

该办法咱们比拟关注的点是 interruptIdleWorkers办法,是怎么中断闲暇Worker,而后是如何保障Worker执行结束的?看看代码就晓得了

    private void interruptIdleWorkers(boolean onlyOne) {        final ReentrantLock mainLock = this.mainLock;        //获取mainLock锁        mainLock.lock();        try {        //轮询workers逐个中断            for (Worker w : workers) {                Thread t = w.thread;                //判断 如果以后线程未中断且可能获取w锁,则执行中断                // 如果以后线程未中断但不能获取w锁,那么就会阻塞,直到获取锁为止。                //这里的w锁,就是后面在剖析execute时,有个死循环不断取工作,取到工作就会获取w锁。                //所以这边如果获取不到w锁,就证实还有工作没有执行完。                if (!t.isInterrupted() && w.tryLock()) {                    try {                    //中断线程                        t.interrupt();                    } catch (SecurityException ignore) {                    } finally {                        w.unlock();                    }                }                if (onlyOne)                    break;            }        } finally {            mainLock.unlock();        }    }

到这里,外围逻辑就是通过w这个锁来实现的。

  • shutdownNow
    public List<Runnable> shutdownNow() {        List<Runnable> tasks;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(STOP);            interruptWorkers();            tasks = drainQueue();        } finally {            mainLock.unlock();        }        tryTerminate();        return tasks;    private void interruptWorkers() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (Worker w : workers)                w.interruptIfStarted();        } finally {            mainLock.unlock();        }    }    }

源码和shutdown差不多,只不过将线程池状态设置为stop,而后调用interruptWorkers 办法,看看worker办法。

    private void interruptWorkers() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (Worker w : workers)                w.interruptIfStarted();        } finally {            mainLock.unlock();        }    }

代码中并没有获取w锁的逻辑,所以这个办法会间接中断所有线程,并不会期待那些正在执行工作的worker把工作执行完。

  • awaitTermination

调用awaitTermination办法会始终阻塞期待线程池状态变为 terminated 才返回 或者期待超时返回。来看看代码就明确了

    public boolean awaitTermination(long timeout, TimeUnit unit)        throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (;;) {            //如果曾经是terminated状态间接返回                if (runStateAtLeast(ctl.get(), TERMINATED))                    return true;                if (nanos <= 0)                    return false;                // (1)期待mainLock锁的condition实例来唤醒,不然继续阻塞。                nanos = termination.awaitNanos(nanos);            }        } finally {            mainLock.unlock();        }    }

(1)处的代码曾经通知了该办法什么时候返回,就是mainLock锁的termination条件变量被唤醒返回。在下面剖析中termination条件变量被唤醒是在执行tryTerminate()时实现的,因为外部调用termination.signalAll()。而tryTerminate() 办法被shutDown() 和shutDownNow() 调用过,所以如果要让awaitTermination 返回,调用这2个办法就行。