关于高并发:高并发深度解析ScheduledThreadPoolExecutor类的源代码

在【高并发专题】的专栏中,咱们深度剖析了ThreadPoolExecutor类的源代码,而ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类。明天咱们就来一起手撕ScheduledThreadPoolExecutor类的源代码。

构造方法

咱们先来看下ScheduledThreadPoolExecutor的构造方法,源代码如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

从代码构造上来看,ScheduledThreadPoolExecutor类是ThreadPoolExecutor类的子类,ScheduledThreadPoolExecutor类的构造方法实际上调用的是ThreadPoolExecutor类的构造方法。

schedule办法

接下来,咱们看一下ScheduledThreadPoolExecutor类的schedule办法,源代码如下所示。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    //如果传递的Runnable对象和TimeUnit工夫单位为空
    //抛出空指针异样
    if (command == null || unit == null)
        throw new NullPointerException();
    //封装工作对象,在decorateTask办法中间接返回ScheduledFutureTask对象
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    //执行延时工作
    delayedExecute(t);
    //返回工作
    return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    //如果传递的Callable对象和TimeUnit工夫单位为空
    //抛出空指针异样
    if (callable == null || unit == null)
        throw new NullPointerException();
    //封装工作对象,在decorateTask办法中间接返回ScheduledFutureTask对象
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
    //执行延时工作
    delayedExecute(t);
    //返回工作
    return t;
}

从源代码能够看出,ScheduledThreadPoolExecutor类提供了两个重载的schedule办法,两个schedule办法的第一个参数不同。能够传递Runnable接口对象,也能够传递Callable接口对象。在办法外部,会将Runnable接口对象和Callable接口对象封装成RunnableScheduledFuture对象,实质上就是封装成ScheduledFutureTask对象。并通过delayedExecute办法来执行延时工作。

在源代码中,咱们看到两个schedule都调用了decorateTask办法,接下来,咱们就看看decorateTask办法。

decorateTask办法

decorateTask办法源代码如下所示。

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

通过源码能够看出decorateTask办法的实现比较简单,接管一个Runnable接口对象或者Callable接口对象和封装的RunnableScheduledFuture工作,两个办法都是将RunnableScheduledFuture工作间接返回。在ScheduledThreadPoolExecutor类的子类中能够重写这两个办法。

接下来,咱们持续看下scheduleAtFixedRate办法。

scheduleAtFixedRate办法

scheduleAtFixedRate办法源代码如下所示。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    //传入的Runnable对象和TimeUnit为空,则抛出空指针异样
    if (command == null || unit == null)
        throw new NullPointerException();
    //如果执行周期period传入的数值小于或者等于0
    //抛出非法参数异样
    if (period <= 0)
        throw new IllegalArgumentException();
    //将Runnable对象封装成ScheduledFutureTask工作,
    //并设置执行周期
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
    //调用decorateTask办法,实质上还是间接返回ScheduledFutureTask对象
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    //设置执行的工作
    sft.outerTask = t;
    //执行延时工作
    delayedExecute(t);
    //返回执行的工作
    return t;
}

通过源码能够看出,scheduleAtFixedRate办法将传递的Runnable对象封装成ScheduledFutureTask工作对象,并设置了执行周期,下一次的执行工夫绝对于上一次的执行工夫来说,加上了period时长,时长的具体单位由TimeUnit决定。采纳固定的频率来执行定时工作。

ScheduledThreadPoolExecutor类中另一个定时调度工作的办法是scheduleWithFixedDelay办法,接下来,咱们就一起看看scheduleWithFixedDelay办法。

scheduleWithFixedDelay办法

scheduleWithFixedDelay办法的源代码如下所示。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    //传入的Runnable对象和TimeUnit为空,则抛出空指针异样
    if (command == null || unit == null)
        throw new NullPointerException();
    //工作延时时长小于或者等于0,则抛出非法参数异样
    if (delay <= 0)
        throw new IllegalArgumentException();
    //将Runnable对象封装成ScheduledFutureTask工作
    //并设置固定的执行周期来执行工作
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
    //调用decorateTask办法,实质上间接返回ScheduledFutureTask工作
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    //设置执行的工作
    sft.outerTask = t;
    //执行延时工作
    delayedExecute(t);
    //返回工作
    return t;
}

从scheduleWithFixedDelay办法的源代码,咱们能够看出在将Runnable对象封装成ScheduledFutureTask时,设置了执行周期,然而此时设置的执行周期与scheduleAtFixedRate办法设置的执行周期不同。此时设置的执行周期规定为:下一次工作执行的工夫是上一次工作实现的工夫加上delay时长,时长单位由TimeUnit决定。也就是说,具体的执行工夫不是固定的,然而执行的周期是固定的,整体采纳的是绝对固定的提早来执行定时工作。

如果大家仔细的话,会发现在scheduleWithFixedDelay办法中设置执行周期时,传递的delay值为正数,如下所示。

ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));

这里的正数示意的是绝对固定的提早。

在ScheduledFutureTask类中,存在一个setNextRunTime办法,这个办法会在run办法执行完工作后调用,这个办法更能体现scheduleAtFixedRate办法和scheduleWithFixedDelay办法的不同,setNextRunTime办法的源码如下所示。

private void setNextRunTime() {
    //间隔下次执行工作的时长
    long p = period;
    //固定频率执行,
    //上次执行工作的工夫
    //加上工作的执行周期
    if (p > 0)
        time += p;
    //绝对固定的提早
    //应用的是零碎以后工夫
    //加上工作的执行周期
    else
        time = triggerTime(-p);
}

在setNextRunTime办法中通过对下次执行工作的时长进行判断来确定是固定频率执行还是绝对固定的提早。

triggerTime办法

在ScheduledThreadPoolExecutor类中提供了两个triggerTime办法,用于获取下一次执行工作的具体工夫。triggerTime办法的源码如下所示。

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这两个triggerTime办法的代码比较简单,就是获取下一次执行工作的具体工夫。有一点须要留神的是:delay < (Long.MAX_VALUE >> 1判断delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,则间接返回delay,否则须要解决溢出的状况。

咱们看到在triggerTime办法中解决避免溢出的逻辑应用了overflowFree办法,接下来,咱们就看看overflowFree办法的实现。

overflowFree办法

overflowFree办法的源代码如下所示。

private long overflowFree(long delay) {
    //获取队列中的节点
    Delayed head = (Delayed) super.getQueue().peek();
    //获取的节点不为空,则进行后续解决
    if (head != null) {
        //从队列节点中获取延迟时间
        long headDelay = head.getDelay(NANOSECONDS);
        //如果从队列中获取的延迟时间小于0,并且传递的delay
        //值减去从队列节点中获取延迟时间小于0
        if (headDelay < 0 && (delay - headDelay < 0))
            //将delay的值设置为Long.MAX_VALUE + headDelay
            delay = Long.MAX_VALUE + headDelay;
    }
    //返回延迟时间
    return delay;
}

通过对overflowFree办法的源码剖析,能够看出overflowFree办法实质上就是为了限度队列中的所有节点的延迟时间在Long.MAX_VALUE值之内,避免在ScheduledFutureTask类中的compareTo办法中溢出。

ScheduledFutureTask类中的compareTo办法的源码如下所示。

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

compareTo办法的次要作用就是对各提早工作进行排序,间隔下次执行工夫靠前的工作就排在后面。

delayedExecute办法

delayedExecute办法是ScheduledThreadPoolExecutor类中提早执行工作的办法,源代码如下所示。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果以后线程池曾经敞开
    //则执行线程池的回绝策略
    if (isShutdown())
        reject(task);
    //线程池没有敞开
    else {
        //将工作增加到阻塞队列中
        super.getQueue().add(task);
        //如果以后线程池是SHUTDOWN状态
        //并且以后线程池状态下不能执行工作
        //并且胜利从阻塞队列中移除工作
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            //勾销工作的执行,但不会中断执行中的工作
            task.cancel(false);
        else
            //调用ThreadPoolExecutor类中的ensurePrestart()办法
            ensurePrestart();
    }
}

能够看到在delayedExecute办法外部调用了canRunInCurrentRunState办法,canRunInCurrentRunState办法的源码实现如下所示。

boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}

能够看到canRunInCurrentRunState办法的逻辑比较简单,就是判断线程池以后状态下可能执行工作。

另外,在delayedExecute办法外部还调用了ThreadPoolExecutor类中的ensurePrestart()办法,接下来,咱们看下ThreadPoolExecutor类中的ensurePrestart()办法的实现,如下所示。

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

在ThreadPoolExecutor类中的ensurePrestart()办法中,首先获取以后线程池中线程的数量,如果线程数量小于corePoolSize则调用addWorker办法传递null和true,如果线程数量为0,则调用addWorker办法传递null和false。

对于addWork()办法的源码解析,大家能够参考【高并发专题】中的《高并发之——通过ThreadPoolExecutor类的源码深度解析线程池执行工作的外围流程》一文,这里,不再赘述。

reExecutePeriodic办法

reExecutePeriodic办法的源代码如下所示。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    //线程池以后状态下可能执行工作
    if (canRunInCurrentRunState(true)) {
        //将工作放入队列
        super.getQueue().add(task);
        //线程池以后状态下不能执行工作,并且胜利移除工作
        if (!canRunInCurrentRunState(true) && remove(task))
            //勾销工作
            task.cancel(false);
        else
            //调用ThreadPoolExecutor类的ensurePrestart()办法
            ensurePrestart();
    }
}

总体来说reExecutePeriodic办法的逻辑比较简单,然而,这里须要留神和delayedExecute办法的不同点:调用reExecutePeriodic办法的时候曾经执行过一次工作,所以,并不会触发线程池的回绝策略;传入reExecutePeriodic办法的工作肯定是周期性的工作。

onShutdown办法

onShutdown办法是ThreadPoolExecutor类中的钩子函数,它是在ThreadPoolExecutor类中的shutdown办法中调用的,而在ThreadPoolExecutor类中的onShutdown办法是一个空办法,如下所示。

void onShutdown() {
}

ThreadPoolExecutor类中的onShutdown办法交由子类实现,所以ScheduledThreadPoolExecutor类覆写了onShutdown办法,实现了具体的逻辑,ScheduledThreadPoolExecutor类中的onShutdown办法的源码实现如下所示。

@Override
void onShutdown() {
    //获取队列
    BlockingQueue<Runnable> q = super.getQueue();
    //在线程池曾经调用shutdown办法后,是否继续执行现有提早工作
    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    //在线程池曾经调用shutdown办法后,是否继续执行现有定时工作
    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
    //在线程池曾经调用shutdown办法后,不继续执行现有提早工作和定时工作
    if (!keepDelayed && !keepPeriodic) {
        //遍历队列中的所有工作
        for (Object e : q.toArray())
            //勾销工作的执行
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        //清空队列
        q.clear();
    }
    //在线程池曾经调用shutdown办法后,继续执行现有提早工作和定时工作
    else {
        //遍历队列中的所有工作
        for (Object e : q.toArray()) {
            //当前任务是RunnableScheduledFuture类型
            if (e instanceof RunnableScheduledFuture) {
                //将工作强转为RunnableScheduledFuture类型
                RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                //在线程池调用shutdown办法后不持续的提早工作或周期工作
                //则从队列中删除并勾销工作
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) {
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    //最终调用tryTerminate()办法
    tryTerminate();
}

ScheduledThreadPoolExecutor类中的onShutdown办法的次要逻辑就是先判断线程池调用shutdown办法后,是否继续执行现有的提早工作和定时工作,如果不再执行,则勾销工作并清空队列;如果继续执行,将队列中的工作强转为RunnableScheduledFuture对象之后,从队列中删除并勾销工作。大家须要好好了解这两种解决形式。最初调用ThreadPoolExecutor类的tryTerminate办法。无关ThreadPoolExecutor类的tryTerminate办法的源码解析,大家能够参考【高并发专题】中的《高并发之——通过源码深度剖析线程池中Worker线程的执行流程》一文,这里不再赘述。

至此,ScheduledThreadPoolExecutor类中的外围办法的源代码,咱们就剖析完了。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理