关于线程池:多线程学习Executor框架

前言

Executor框架提供了组件来治理Java中的线程,Executor框架将其分为工作线程执行工作工作执行后果三局部。本篇文章将对Executor框架中的组件进行学习。

参考:《Java并发编程的艺术》

注释

一. Executor的组件

前言中曾经提到,Executor框架将线程的治理分为工作线程执行工作工作执行后果三局部。上面以图表模式对这三局部进行阐明。

阐明
工作 Executor框架提供了Runnable接口和Callable接口,工作须要实现这两个接口能力被线程执行。
线程执行工作 Executor框架提供了接口Executor和继承于ExecutorExecutorService接口来定义工作执行机制。Executor框架中的线程池类ThreadPoolExecutorScheduledThreadPoolExecutor均实现了ExecutorService接口。
工作执行后果 Executor框架提供了Future接口和实现了Future接口的FutureTask类来定义工作执行后果。

组件之间的类图关系如下所示。

二. ThreadPoolExecutor的解析

ThreadPoolExecutor继承于AbstractExecutorService,并实现了ExecutorService接口,是Executor框架的外围类,用于治理线程。在多线程学习-线程池应用中曾经对ThreadPoolExecutor的原理,创立,执行和敞开进行了简略学习。在本大节将对ThreadPoolExecutor的具体实现进行学习。

ThreadPoolExecutor应用了原子整型ctl来示意线程池状态Worker数量ctl是一个原子整型,前3位示意线程池状态,后29位示意Worker数量。ThreadPoolExecutor中这部分的源码如下所示。

public class ThreadPoolExecutor extends AbstractExecutorService {

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    //取整型前3位,即获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //取整型后29位,即获取Worker数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //依据线程池状态和Worker数量拼装ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //线程池状态判断
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    //线程池状态判断
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    //判断线程池状态是否为RUNNING
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    ......
    
}

ThreadPoolExecutor中规定了线程池的状态如下。

  • RUNNING,线程池承受新工作,会执行工作阻塞队列中的工作,ctl前三位示意为111
  • SHUTDOWN,线程池回绝新工作,会执行工作阻塞队列中的工作,ctl前三位示意为000
  • STOP,线程池回绝新工作,不会执行工作阻塞队列中的工作,尝试中断正在执行的工作,ctl前三位示意为001
  • TIDYING,所有工作被敞开,Worker数量为0,ctl前三位示意为010
  • TERMINATEDterminated()执行结束,ctl前三位示意为011

得益于ctl的构造,所以无论Worker数量是多少,ThreadPoolExecutor中线程池状态存在如下关系。

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

因而runStateLessThan()runStateAtLeast()isRunning()办法能够不便的对线程池状态进行判断。

ThreadPoolExecutor中执行工作的入口办法为execute(),其实现如下。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //如果Worker数量小于外围线程数,则创立Worker并执行工作
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果Worker数量大于等于外围线程数,则将工作增加到工作阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //如果线程池状态忽然不再是RUNNING,则尝试将工作从工作阻塞队列中删除,删除胜利则为该工作执行回绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //如果线程池中Worker数量忽然为0,则创立一个Worker来执行工作
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //执行到这里示意线程池状态曾经不是RUNNING或者工作阻塞队列已满
    //此时尝试新建一个Worker来执行工作
    //如果新建一个Worker来执行工作失败,表明线程池状态不再是RUNNING或者Worker数量曾经达到最大线程数,此时执行回绝策略
    else if (!addWorker(command, false))
        reject(command);
}

execute()中会依据Worker数量和线程池状态来决定是新建Worker来执行工作还是将工作增加到工作阻塞队列。新建Worker来执行工作的实现如下所示。

private boolean addWorker(Runnable firstTask, boolean core) {
    //标记外层for循环
    retry:
    for (;;) {
        int c = ctl.get();
        //获取线程池状态
        int rs = runStateOf(c);

        //rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
        //线程池状态为RUNNING时,能够创立Worker
        //线程池状态为SHUTDOWN,且工作阻塞队列不为空时,能够创立初始工作为null的Worker
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //获取Worker数量
            int wc = workerCountOf(c);
            
            //如果Worker数量大于CAPACITY,回绝创立Worker
            //core为true示意创立外围线程Worker,如果Worker数量此时曾经大于等于外围线程数,则回绝创立Worker,转而应该将工作增加到工作阻塞队列
            //core为false示意创立非核心线程Worker,如果Worker数量此时曾经大于等于最大线程数,则回绝创立Worker,转而应该执行回绝策略
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //以CAS形式将Worker数量加1
            //加1胜利表明无竞争产生,从外层for循环跳出
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //加1失败表明有竞争产生,此时须要从新获取ctl的值
            c = ctl.get();
            //从新获取ctl后如果发现线程池状态产生了扭转,此时从新执行外层for循环,即须要基于新的线程池状态判断是否容许创立Worker
            //从新获取ctl后如果线程池状态未产生扭转,则继续执行内层for循环,即尝试再一次以CAS形式将Worker数量加1
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创立一个Worker
        w = new Worker(firstTask);
        //获取Worker的线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //因为线程池中存储Worker的汇合为HashSet,因而将Worker增加到Worker汇合时须要获取全局锁保障线程平安
            mainLock.lock();
            try {
                //再一次获取线程池状态
                int rs = runStateOf(ctl.get());

                //如果线程池状态还是为RUNNING或者线程池状态为SHUTDOWN但创立的Worker的初始工作为null,则容许将创立进去的Worker增加到汇合
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //检查一下Worker的线程是否能够启动(处于活动状态的线程无奈再启动)
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //将Worker增加到Worker汇合
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize用于记录线程池最多存在过的Worker数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动Worker线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //Worker线程没有胜利启动起来,此时须要对该Worker的创立执行回滚操作
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker()办法中只容许两种状况能够创立Worker

  • 线程池状态为RUNNING,能够创立Worker
  • 线程池状态为SHUTDOWN,且工作阻塞队列不为空,能够创立初始工作为null的Worker

一旦Worker创立胜利,就会将Worker的线程启动来,如果Worker创立失败或者Worker的线程启动失败,则会调用addWorkerFailed()办法执行回滚操作,其实现如下所示。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //如果Worker增加到了Worker汇合中,则将Worker从Worker汇合中删除
        if (w != null)
            workers.remove(w);
        //以CAS形式将Worker数量减1
        decrementWorkerCount();
        //尝试终止线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

因为Worker本身实现了Runnable,因而Worker本身就是一个工作,实际上Worker的线程执行的工作就是Worker自身,因而addWorker()中将Worker的线程启动时,会调用Workerrun()办法,其实现如下。

public void run() {
    runWorker(this);
}

Workerrun()办法中调用了ThreadPoolExecutorrunWorker()办法,其实现如下所示。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        //如果task为null,则从工作阻塞队列中获取工作
        //通常Worker启动时会先执行初始工作,而后再去工作阻塞队列中获取工作
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //线程池正在进行时,须要确保以后Worker的线程是被中断的
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Worker执行工作产生异样或者从getTask()中获取工作为空时会执行这里的逻辑
        //processWorkerExit()会将Worker从Worker汇合中删除,并尝试终止线程池
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()办法就是先让Worker将初始工作(如果有的话)执行完,而后循环从工作阻塞队列中获取工作来执行,如果Worker执行工作产生异样或者从工作阻塞队列获取工作失败(获取到的工作为null),则调用processWorkerExit()办法来将本身从Worker汇合中删除。上面先看一下getTask()办法的实现。

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //如果线程池状态为SHUTDOWN,且工作阻塞队列为空,则不再容许从工作阻塞队列中获取工作
        //如果线程池状态为STOP,则不再容许从工作阻塞队列中获取工作
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //如果allowCoreThreadTimeOut为true,或者以后线程数大于外围线程数,此时timed为true,表明从工作阻塞队列以超时退出的形式获取工作
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //如果以后线程数大于最大线程数,则以后Worker应该被删除
        //如果以后Worker上一次从工作阻塞队列中获取工作时超时退出,且工作阻塞队列当初还是为空,则以后Worker应该被删除
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //从工作阻塞队列中获取工作
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                //获取到工作则返回该工作
                return r;
            //timedOut为true表明Worker上一次从工作阻塞队列中获取工作时超时退出
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask()办法在如下状况不容许Worker从工作阻塞队列中获取工作。

  • 线程池状态为SHUTDOWN,且工作阻塞队列为空;
  • 线程池状态为STOP

如果Worker有资格从工作阻塞队列获取工作,那么当allowCoreThreadTimeOut为true,或者以后线程数大于外围线程数时,Worker以超时退出的形式获取工作,否则Worker以始终阻塞的形式获取工作。

WorkergetTask()办法中获取工作失败时,getTask()办法会返回null,从而导致Worker会执行processWorkerExit()办法来删除本身,其实现如下所示。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //completedAbruptly为true表明是执行工作时产生异样导致Worker须要被删除
    if (completedAbruptly)
        //修改Worker数量
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        //将Worker从Worker汇合中删除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //尝试终止线程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

WorkerprocessWorkerExit()办法中删除本身之后,还会调用tryTerminate()尝试终止线程池,tryTerminate()办法很精华,前面会对其进行详细分析,这里暂且不谈。至此,Worker的创立,执行工作,获取工作和删除的整个流程曾经大体剖析结束。上面对ThreadPoolExecutor中敞开线程池的shutdown()shutdownNow()办法进行剖析。

首先剖析shutdown()办法,其实现如下。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //循环通过CAS形式将线程池状态置为SHUTDOWN
        advanceRunState(SHUTDOWN);
        //中断闲暇Worker
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池
    tryTerminate();
}

shutdown()办法中首先会将线程池状态置为SHUTDOWN,而后调用interruptIdleWorkers()办法中断闲暇Worker,最初调用tryTerminate()办法来尝试终止线程池。那么这里要解释一下什么是闲暇Worker,先看一下interruptIdleWorkers()的实现。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //中断线程前须要先尝试获取Worker的锁
            //只能获取到闲暇Worker的锁,所以shutdown()办法只会中断闲暇Worker
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

调用interruptIdleWorkers()办法中断Worker前首先须要尝试获取Worker的锁,已知Worker除了实现Runnable接口外,还继承于AbstractQueuedSynchronizer,因而Worker自身是一把锁,而后在runWorker()Worker执行工作前都会先获取Worker的锁,这里看一下Workerlock()办法的实现。

public void lock() { 
    acquire(1);
}

protected boolean tryAcquire(int unused) {
    //以CAS形式将state从0设置为1
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

能够发现,Workerlock()中调用了acquire()办法,该办法由AbstractQueuedSynchronizer抽象类提供,在acquire()中会调用其子类实现的tryAcquire()办法,tryAcquire()办法会以CAS形式将state从0设置为1,因而这样的设计让Worker是一把不可重入锁。回到interruptIdleWorkers()办法,后面提到该办法中断Worker前会尝试获取Worker的锁,可能获取到锁才会中断Worker,而因为Worker是不可重入锁,所以正在执行工作的Worker是无奈获取到锁的,只有那些没有执行工作的Worker的锁才可能被获取,因而所谓的中断闲暇Worker,理论就是中断没有执行工作的Worker,那些执行工作的Workershutdown()办法被调用时不会被中断,这些Worker执行完工作后会持续从工作阻塞队列中获取工作来执行,直到工作阻塞队列为空,此时没有被中断过的Worker也会被删除掉,等到线程池中没有Worker以及工作阻塞队列没有工作后,线程池才会被终止掉。

对于shutdown()办法,一句话总结就是:将线程池状态置为SHUTDOWN并拒绝接受新工作,等到线程池Worker数量为0,工作阻塞队列为空时,敞开线程池。

当初再来剖析shutdownNow()办法。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //循环通过CAS形式将线程池状态置为STOP
        advanceRunState(STOP);
        //中断所有Worker
        interruptWorkers();
        //将工作阻塞队列中的工作获取进去并返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池
    tryTerminate();
    return tasks;
}

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //中断线程池中所有Worker
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

shutdownNow()办法中首先会将线程池状态置为STOP,而后调用interruptWorkers()办法中断线程池中的所有Worker,接着调用tryTerminate()办法来尝试终止线程池,最初shutdownNow()办法会将工作阻塞队列中还未被执行的工作返回。shutdownNow()办法调用之后,线程池中的所有Worker都会被中断,包含正在执行工作的Worker,等到所有Worker都被删除之后,线程池即被终止,也就是说,shutdownNow()不会保障以后时刻正在执行的工作会被平安的执行完,并且会放弃执行工作阻塞队列中的所有工作。

对于线程池的敞开,还有一个重要的办法,那就是后面屡次提到的tryTerminate()办法,该办法能确保线程池能够被正确的敞开,其实现如下所示。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //如果线程池状态为RUNNING,则没有资格终止线程池
        //如果线程池状态大于等于TIDYING,则没有资格终止线程池
        //如果线程池状态为SHUTDOWN但工作阻塞队列不为空,则没有资格终止线程池
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //线程池状态为SHUTDOWN且工作阻塞队列为空会执行到这里
        //线程池状态为STOP会执行到这里
        //Worker数量不为0,表明以后还有正在执行工作的Worker或者闲暇的Worker,此时中断一个闲暇的Worker
        //在这里被中断的闲暇Worker会在getTask()办法中返回null,从而执行processWorkerExit(),最终该Worker会被删除
        //processWorkerExit()办法中又会调用tryTerminate(),因而将shutdown信号在闲暇Worker之间进行了流传
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //将线程池状态置为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //终止线程池
                    terminated();
                } finally {
                    //将线程池状态最终置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

tryTerminate()办法的官网正文中给出了两种线程池会被终止的状况:

  • 线程池的状态为SHUTDOWNWorker数量为0,工作阻塞队列为空;
  • 线程池的状态为STOPWorker数量为0。

官网正文中还阐明在所有可能导致线程池终止的操作中都应该调用tryTerminate()办法来尝试终止线程池,因而线程池中Worker被删除时工作阻塞队列中工作被删除时会调用tryTerminate(),以达到在线程池合乎终止条件时及时终止线程池。

大节:ThreadPoolExecutor执行工作的入口办法为execute(),如果Worker数量小于外围线程数,则创立Worker并执行工作,如果Worker数量大于等于外围线程数,则将工作增加到工作阻塞队列,如果工作阻塞队列已满但Worker数量小于最大线程数,则创立Worker并执行工作,如果Worker数量曾经大于等于最大线程数,此时执行回绝策略。执行完工作的Worker会调用getTask()办法从工作阻塞队列获取工作,如果Worker从工作阻塞队列获取工作失败那么线程池会删除这个Worker。调用shutdown()办法会置线程池状态为SHUTDOWN,此时会等到线程池Worker数量为0,工作阻塞队列为空时将线程池终止;调用shutdownNow()办法会置线程池状态为STOP,此时会等到线程池Worker数量为0时将线程池终止。

三. ScheduledThreadPoolExecutor的解析

ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,扩大实现了延时执行工作定时执行工作的性能。ScheduledThreadPoolExecutor存储工作的队列为DelayedWorkQueue,是一个基于小根堆实现的延时优先级队列,ScheduledThreadPoolExecutor会将每一个提交到线程池的工作先封装为ScheduledFutureTask,而后再插入到DelayedWorkQueue中。上面将联合源码,对ScheduledThreadPoolExecutor的原理进行解析。

1. 工作提交

ScheduledThreadPoolExecutor提供了四个提交工作的办法,如下所示。

//提交无返回值的延时工作
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

//提交有返回值的延时工作
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

//提交固定周期的定时工作
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

//提交固定延时的定时工作
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

可知ScheduledThreadPoolExecutor反对执行如下的工作。

  • 无返回值的延时工作
  • 有返回值的延时工作
  • 固定周期的定时工作
  • 固定延时的定时工作

这里解释一下固定周期的定时工作固定延时的定时工作固定周期的定时工作的下一次执行工夫点为上一次执行工夫点加上周期时间,而固定延时的定时工作的下一次执行工夫点为上一次执行完结工夫点加上延时工夫。

在提交工作的四个办法中,均是将提交的工作封装为ScheduledFutureTask,在将工作封装成ScheduledFutureTask时须要指定工作首次执行的工夫点(即初始延时),和工作的执行距离(为正值示意固定周期,为负值示意固定时延,为0示意仅执行一次),而指定工作首次执行的工夫点时,为了避免工夫点的值不在长整型的最大值范畴内,须要在triggerTime()办法中进行解决,如下所示。

//获取延时操作的触发工夫
//触发工夫 = 以后工夫 + 延时工夫
private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

//如果延时工夫大于等于长整型最大值的一半,则执行overflowFree()办法对延时工夫进行解决
//执行overflowFree()办法就是为了使得最终失去的触发工夫的值在长整型最大值以内,以避免compareTo()时值溢出
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

封装好的ScheduledFutureTask会在delayedExecute()办法中增加到延时队列,如下所示。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果线程池状态不是RUNNING,则执行回绝策略
    if (isShutdown())
        reject(task);
    else {
        //将工作增加到延时队列中
        super.getQueue().add(task);
        //此时如果线程池状态为非RUNNING,并且线程池策略为非RUNNING状态下延时工作或定时工作不再执行,则将工作从延时队列中删除
        //将工作从延时队列中删除后,还须要敞开工作,如果工作尚未执行,那么敞开工作后工作就不会再被执行
        //如果工作正在被执行,则不会尝试中断执行工作的Worker来敞开工作
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //创立初始工作为null的Worker,即便外围线程数为0也须要确保线程池至多有一个Worker在工作
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

如果线程池状态为RUNNING,那么会将封装好的ScheduledFutureTask增加到延时队列,同时会创立一个初始工作为null的Worker,之所以创立的Worker初始工作为null,是因为ScheduledThreadPoolExecutor中的工作ScheduledFutureTask都会有一个初始延时,即提交到线程池的工作不会立刻被执行,所以Worker均须要到延时队列中去获取工作。

2. 工作-ScheduledFutureTask

已知提交到ScheduledThreadPoolExecutor中的工作均会被封装成ScheduledFutureTask,因而这里对ScheduledFutureTask的原理进行学习。

首先查看ScheduledFutureTask的类图,如下所示。

ScheduledFutureTask继承了FutureTask的性能,同时实现了RunnableScheduledFuture接口的办法。上面看一下ScheduledFutureTask的字段。

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    //工作增加到ScheduledThreadPoolExecutor中时调配的一个序列号
    private final long sequenceNumber;

    //工作下次执行的工夫点
    private long time;

    //工作的执行距离
    //大于0示意工作是以固定周期执行的工作
    //小于0示意工作是以固定延时执行的工作
    //等于0示意工作是非反复执行的工作
    private final long period;

    //指向当前任务自身
    RunnableScheduledFuture<V> outerTask = this;

    //工作在延时队列中的索引
    int heapIndex;

    ......

}

因为ScheduledFutureTask是作为元素存储在基于小根堆实现的延时优先级队列中,所以ScheduledFutureTask提供了sequenceNumber和time这两个字段用于堆中元素之间的比拟,而后heapIndex就是ScheduledFutureTask在延时队列中的索引,即元素在堆数组中的索引。

ScheduledFutureTask提供了三个构造方法,如下所示。

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask的构造方法中,会先结构父类FutureTask,而后依据构造方法入参设置工作下次的执行工夫点和工作的执行距离,并为任务分配sequenceNumber。

上面再看一下ScheduledFutureTask是如何进行比拟的。ScheduledFutureTask实现了Comparable接口,其实现的compareTo()办法如下所示。

public int compareTo(Delayed other) {
    if (other == this)
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        //优先比拟工作下一次执行的工夫点先后,越先执行time越小
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        //如果工作下一次执行的工夫点雷同,则比拟sequenceNumber
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

两个ScheduledFutureTask进行比拟时,会先比拟下一次工作执行的工夫先后,如果下一次工作执行的工夫一样,则再依据sequenceNumber进行比拟。

最初再看一下ScheduledFutureTaskrun()办法,如下所示。

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        //如果以后线程池状态下不容许执行工作,则敞开工作
        cancel(false);
    else if (!periodic)
        //执行延时工作
        ScheduledFutureTask.super.run();
    //执行定时工作,而后为定时工作设置下一次执行的工夫点并增加到延时队列中
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

ScheduledFutureTask在执行run()办法时会先依据periodic字段的值判断工作是延时工作(只在延时工夫到了后执行一次)还是定时工作(须要依据指定距离反复执行),如果是定时工作,那么在定时工作执行完之后,会为定时工作设置下一次执行的工夫点并从新增加到延时队列中。设置定时工作下一次执行的工夫点的setNextRunTime()实现如下。

//设置工作的下一次执行的工夫点
private void setNextRunTime() {
    long p = period;
    //如果是以固定周期执行的工作,则下一次执行的工夫点就是上一次执行的工夫点加上执行距离
    if (p > 0)
        time += p;
    //如果是以固定延时执行的工作,则下一次执行的工夫点就是以后工夫加上执行距离
    else
        time = triggerTime(-p);
}

3. 延时队列-DelayedWorkQueue

DelayedWorkQueueScheduledThreadPoolExecutor线程池应用的工作阻塞队列。DelayedWorkQueue是基于小根堆实现的延时优先级队列,队列中的元素就是ScheduledFutureTask,因而DelayedWorkQueue的队列头节点工作总是最优先被执行的工作。先看一下DelayedWorkQueue的字段,如下所示。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    //堆数组的初始大小
    private static final int INITIAL_CAPACITY = 16;
    
    //堆数组,数组中的元素实际上是ScheduledFutureTask
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    
    private final ReentrantLock lock = new ReentrantLock();
    
    //延时队列元素个数
    private int size = 0;

    //在延时队列头期待工作的领导线程
    private Thread leader = null;

    private final Condition available = lock.newCondition();
    
    ......
    
}

特地阐明一下leader字段和available字段,首先是leader字段,示意在延时队列头期待工作的第一个线程,即如果延时队列头的工作须要被执行时,这个工作会被leader字段指向的线程取得。同时所有在延时队列头期待工作的线程,均会在available上进入期待状态,并且在延时队列头的工作须要被执行时或者延时队列头的工作被更新时唤醒所有在available上期待的线程。

已知DelayedWorkQueue是一个基于小根堆实现的延时优先级队列,那么往DelayedWorkQueue中插入和删除工作后,均须要放弃堆的性质,在DelayedWorkQueue中,次要是siftUp()siftDown()这两个办法来放弃堆的性质,siftUp()是用于往DelayedWorkQueue中插入工作时来放弃堆的性质,而siftDown()是用于DelayedWorkQueue弹出工作后放弃堆的性质,其实现如下。

siftUp()实现如下所示。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        //计算父节点索引
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        //将插入元素与父节点元素进行比拟
        //如果插入元素大于等于父节点元素,则循环完结
        if (key.compareTo(e) >= 0)
            break;
        //如果插入元素小于父节点元素,则插入元素与父节点元素调换地位
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

siftDown()实现如下所示。

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        //计算左子节点索引,并将左子节点索引赋值给child
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        //计算右子节点索引
        int right = child + 1;
        //令c示意左子节点和右子节点中元素值更小的元素
        //令child示意左子节点和右子节点中元素值更小的节点索引
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        //将以后元素值与c的值进行比拟,如果以后元素值曾经小于等于c的值,则退出循环
        if (key.compareTo(c) <= 0)
            break;
        //如果以后元素值大于c的值,则将以后元素与c调换地位
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

了解了siftUp()siftDown()这两个办法之后,先来看一下DelayedWorkQueue中增加工作的实现。因为DelayedWorkQueue实现了BlockingQueue接口,因而对外提供了put()add()offer()和超时退出的offer()这四个办法来增加工作,然而因为DelayedWorkQueue在容量满时会进行扩容,能够当成一个无界队列来对待,所以DelayedWorkQueueput()add()和超时退出的offer()办法均是调用的offer()办法,上面看一下offer()办法的实现。

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        //延时队列已满时须要进行扩容解决
        if (i >= queue.length)
            //扩容后容量为扩容前容量的1.5倍
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            //向延时队列插入工作
            //与向堆插入元素一样,将工作插入到工作堆的开端节点,并逐渐与父节点进行比拟和替换,直到满足堆的性质为止
            siftUp(i, e);
        }
        if (queue[0] == e) {
            //如果插入的工作最终成为延时队列头节点工作,那么重置领导线程leader并唤醒所有期待获取工作的线程
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

同样的,DelayedWorkQueue对外提供了remove()poll()take()和超时退出的poll()这四个办法来移除或获取工作,这里重点剖析一下take()和超时退出的poll()这两个办法。

take()的实现如下所示。

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            //如果头节点工作为空,间接进入期待状态
            if (first == null)
                available.await();
            else {
                //delay示意延时队列头节点工作的残余等待时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //如果延时队列头节点工作的残余等待时间小于等于0,则弹出头节点工作并放弃堆性质
                    return finishPoll(first);
                first = null;
                if (leader != null)
                    //如果曾经存在领导线程,则进入期待状态
                    available.await();
                else {
                    //如果不存在领导线程,则将以后Worker的线程置为领导线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //期待delay的工夫,即等到延时队列头工作能够执行
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

take()办法中,首先判断延时队列头节点工作是否为空,如果为空则间接在available上进入期待状态,如果不为空则再判断工作是否曾经能够执行,若能够执行则间接弹出工作并返回,若还不能执行那么就再判断领导线程是否曾经存在,如果存在那么阐明以后线程不是在延时队列头期待工作的第一个线程,须要在available上进入期待状态,如果不存在就阐明以后线程是在延时队列头期待工作的第一个线程,须要将以后线程置为领导线程,而后在available上进入期待状态直到头节点工作能够执行。

超时退出的poll()take()办法的大体实现一样,只是超时退出的poll()还须要额定退出对Worker从延时队列获取工作的等待时间的判断,其实现如下所示。

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    //nanos示意Worker从延时队列获取工作的等待时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            //延时队列为空时
            //如果nanos小于等于0,则间接返回null
            //如果nanos大于0,则进入期待状态并期待nanos的工夫
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                //delay示意延时队列头节点工作的残余等待时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //如果延时队列头节点工作的残余等待时间小于等于0,则弹出头节点工作并放弃堆性质
                    return finishPoll(first);
                if (nanos <= 0)
                    //如果Worker从延时队列获取工作的等待时间小于等于0,则返回null
                    return null;
                first = null;
                //如下状况会进入期待状态并期待nanos的工夫
                //Worker从延时队列获取工作的等待时间小于延时队列头节点工作的残余等待时间
                //Worker从延时队列获取工作的等待时间大于等于延时队列头节点工作的残余等待时间,但领导线程不为空
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    //如果Worker从延时队列获取工作的等待时间大于等于延时队列头节点工作的残余等待时间,且领导线程为空
                    //将领导线程置为以后Worker的线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //期待delay的工夫,即等到延时队列头工作能够执行
                        long timeLeft = available.awaitNanos(delay);
                        //从新计算Worker从延时队列获取工作的等待时间
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

4. 大节

本节次要是对ScheduledThreadPoolExecutor工作提交工作延时优先级队列进行了剖析,了解了这三个局部,也就会对ScheduledThreadPoolExecutor的大抵工作原理有一个较为清晰的意识。而ScheduledThreadPoolExecutor敞开线程池是复用的ThreadPoolExecutor中的shutdown()shutdownNow办法,故本节不再赘述。

四. 工作和工作的执行后果

1. 工作

Executor框架中,Runnable接口和Callable接口用于定义工作,Runnable接口的实现类能够被ThreadThreadPoolExecutorScheduledThreadPoolExecutor执行,Callable接口的实现类能够被ThreadPoolExecutorScheduledThreadPoolExecutor执行,此外,Runnable接口和Callable接口最大的不同在于:Runnable工作没有返回值,Callable工作有返回值。

Executor框架提供的工具类Executors能够将Runnable封装成Callable,办法签名如下所示。

public static Callable<Object> callable(Runnable task)

public static <T> Callable<T> callable(Runnable task, T result)

2. 工作的执行后果

Executor框架提供了Future接口来示意工作的异步计算结果Future接口定义如下所示。

public interface Future<V> {

    //敞开工作的执行,在工作曾经执行结束或者曾经被敞开时,返回false
    //在该办法调用时如果工作正在被执行,mayInterruptIfRunning决定是否打断执行工作的线程来进行工作
    boolean cancel(boolean mayInterruptIfRunning);

    //判断工作是否被敞开过
    //如果在工作执行完之前执行过敞开工作的操作,该办法返回true
    boolean isCancelled();

    //判断工作是否执行结束
    //因为中断,异样和敞开而导致的工作执行结束,该办法均会返回true
    boolean isDone();

    //期待工作执行结束并获取执行后果
    V get() throws InterruptedException, ExecutionException;

    //在指定工夫内期待工作执行结束并获取执行后果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

此外,Executor框架中还有一个RunnableFuture接口,该接口继承于Runnable接口和Future接口,即该接口的实现类即能够作为Runnable被执行,也能作为Future获取执行后果。

所以Executor框架提供了一个RunnableFuture接口的实现类FutureTask,而且,在调用ThreadPoolExecutorsubmit()办法提交工作(Runnable或者Callable)或者向ScheduledThreadPoolExecutor提交工作(Runnable或者Callable)时,所提交的工作均会被封装为FutureTask,而后封装成FutureTask的工作会作为Runnable被增加到工作阻塞队列中,同时也会作为Future被返回。

FutureTask的类图如下所示。

3. FutureTask

FutureTaskExecutor框架中重要的组件,上面对其原理进行学习。FutureTask的关键字段如下所示。

public class FutureTask<V> implements RunnableFuture<V> {

    //工作状态,会有以下几种变动状况:
    //NEW -> COMPLETING -> NORMAL
    //NEW -> COMPLETING -> EXCEPTIONAL
    //NEW -> CANCELLED
    //NEW -> INTERRUPTING -> INTERRUPTED
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    //须要被执行的工作
    private Callable<V> callable;
    //工作执行的后果或抛出的异样
    private Object outcome;
    //正在执行工作的线程
    private volatile Thread runner;
    //记录调用get()办法期待的线程
    private volatile WaitNode waiters;
    
    ......

}

对于FutureTask的状态字段稍后再剖析,当初先看一下FutureTask中的须要被执行的工作callable字段,可知该字段为Callable接口,而后面曾经剖析晓得无论是ThreadPoolExecutor还是ScheduledThreadPoolExecutor,均能够将RunnableCallable封装为FutureTask,而FutureTask中却只有Callable,那么必定是在某个中央将Runnable转换为了Callable,这个中央就是FutureTask的构造函数,如下所示。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
}

能够看到,FutureTask的构造函数中,如果传进来的工作为Runnable,那么会应用工具类ExecutorsRunnable转换为Callable。同时构造函数中还会将状态设置为NEW

上面剖析FutureTaskrun()办法,源码如下所示。

public void run() {
    //判断工作状态是否为NEW,状态为NEW的工作才容许被执行
    //如果工作状态为NEW,则以CAS形式将工作的runner字段设置为以后执行工作的线程,设置胜利才容许执行工作
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行工作
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //工作执行时抛出异样,将异样赋值给outcome字段
                //而后以CAS形式将状态先置为COMPLETING,而后置为EXCEPTIONAL,最初唤醒所有调用get()办法进入期待的线程
                setException(ex);
            }
            if (ran)
                //工作执行时未抛出异样,将执行后果赋值给outcome字段
                //而后以CAS形式将状态先置为COMPLETING,而后置为NORMAL,最初唤醒所有调用get()办法进入期待的线程
                set(result);
        }
    } finally {
        //在工作执行完结后将runner字段置为null
        //在工作执行完结以前runner字段不能为null,以避免工作被并发屡次执行
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            //执行到这里,示意工作在执行时(工作状态为NEW)被调用了cancel(true)办法,并且cancel(true)中将工作状态置为了INTERRUPTING或INTERRUPTED
            //如果工作状态为INTERRUPTING,则循环调用Thread.yield()来放弃工夫片,直到工作状态变为INTERRUPTED
            handlePossibleCancellationInterrupt(s);
    }
}

run()办法中,次要的步骤如下。

  • 先在工作状态为NEW的状况下以CAS形式将执行工作的线程runner字段置为以后线程,工作状态不为NEW或者CAS失败,都间接退出run()办法,避免工作被并发屡次执行;
  • 工作执行胜利或者工作执行抛出异样,会别离以CAS形式将工作状态先置为COMPLETING,如果CAS胜利,之后调用cancel()办法会间接返回false,然而如果CAS操作之前先调用了cancel()办法,那么会导致CAS失败,此时工作的状态为INTERRUPTINGCANCELLED或者INTERRUPTED
  • run()办法最初会判断工作的状态是否为INTERRUPTINGINTERRUPTED,如果满足,表明在工作执行结束并翻转工作状态之前cancel(true)办法被调用了,此时如果工作状态为INTERRUPTING,则循环调用Thread.yield()来放弃工夫片以期待工作状态翻转为INTERRUPTED

同时FutureTask还提供了一个runAndReset()办法,该办法与run()办法略有不同,如下所示。

  • runAndReset()不关怀工作的执行后果,即不会将工作执行后果赋值给outcome字段;
  • runAndReset()不会被动去翻转工作的状态,即工作失常执行结束之后状态还是为NEW,以实用于工作须要屡次被执行的状况,比方定时工作。

上面剖析FutureTaskget()办法,FutureTask提供了始终期待直到工作执行结束的get()办法,和指定等待时间的get()办法,如下所示。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

get()办法中,会先在awaitDone()办法中获取工作状态,如果工作状态为NEW,则进入期待状态。获取到工作状态之后,在report()办法中依据获取到的工作状态来返回工作执行后果。

awaitDone()办法实现如下所示。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            //调用get()办法的线程如果被中断,则将其从期待链表中删除,并抛出中断异样
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            //如果工作状态大于COMPLETING,阐明工作执行结束,或抛出异样,或被调用了cancel()办法
            if (q != null)
                q.thread = null;
            //返回工作的状态
            return s;
        }
        else if (s == COMPLETING)
            //如果工作状态为COMPLETING,则放弃工夫片,目标是为了调用get()办法的线程再次取得工夫片时工作状态翻转为NORMAL或者EXCEPTIONAL
            Thread.yield();
        else if (q == null)
            //执行到这里,阐明工作状态为NEW
            //基于调用get()办法的线程创立一个WaitNode节点
            q = new WaitNode();
        else if (!queued)
            //如果WaitNode节点没有增加到期待链表,则将其退出到期待链表中
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            //如果调用get()办法时指定了等待时间,则应用LockSupport进入期待状态并指定等待时间
            //如果等待时间到,工作状态还是NEW,则移除WaitNode节点并返回工作状态
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            //调用get()办法的线程进入期待状态
            LockSupport.park(this);
    }
}

awaitDone()中,次要步骤如下所示。

  • 如果工作状态大于COMPLETING,即工作执行结束,或抛出异样,或被调用了cancel()办法,此时返回工作状态;
  • 如果工作状态为COMPLETING,示意工作曾经执行结束(或抛出异样),然而执行后果尚未赋值给outcome字段,此时调用Thread.yield()放弃工夫片,因为工作状态从COMPLETINGNORMALEXCEPTIONAL转换的工夫十分短,所以Thread.yield()可能让调用get()办法的线程更快的响应工作状态的转换,最终目标是为了让调用get()办法的线程再次取得工夫片时工作状态曾经翻转为NORMAL或者EXCEPTIONAL
  • 如果工作状态为NEW,那么阐明工作还未开始执行或者工作正在执行,此时基于调用get()办法的线程创立一个WaitNode节点并退出期待链表中;
  • 调用get()办法的线程进入期待状态,只有在等待时间到(如果指定了的话),或者工作执行结束,或者工作执行抛出异样,或者cancel()办法被调用时,所有期待线程才会完结期待状态。

report()办法的实现如下所示。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

上面最初看一下cancel()办法的实现,如下所示。

public boolean cancel(boolean mayInterruptIfRunning) {
    //如果工作状态不是NEW,则返回false
    //如果以CAS形式将工作状态置为INTERRUPTING或者CANCELLED失败,则返回false
    //即工作曾经执行结束或者曾经被敞开时,返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        //mayInterruptIfRunning为true示意须要中断正在执行工作的线程,并最终将工作状态置为INTERRUPTED
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //唤醒所有调用get()办法进入期待的线程
        finishCompletion();
    }
    return true;
}

cancel()办法调用时如果工作曾经执行结束(状态为COMPLETINGNORMALEXCEPTIONAL),或者工作曾经被敞开(状态为CANCELLEDINTERRUPTINGINTERRUPTED),则间接返回false,并且会依据mayInterruptIfRunning参数来决定是否会中断正在执行工作的线程。

4. 大节

Executor框架中,Runnable接口和Callable接口示意须要被执行的工作,不同在于前者无返回值而后者有返回值。Future接口示意异步计算的后果,同时RunnableFuture接口继承了Runnable接口和Future接口,RunnableFuture接口的实现类FutureTask既能作为Runnable工作来执行,也能作为Future来获取计算结果。

总结

本篇文章对Executor框架的次要组件进行了学习,次要分为工作线程执行工作工作执行后果三局部,具体学习了工作相干接口Runnable接口和Callable接口,线程执行工作相干组件ThreadPoolExecutorScheduledThreadPoolExecutor线程池,以及执行后果相干组件FutureTask

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理