关于高并发:高并发深度解析ScheduledThreadPoolExecutor类的源代码

11次阅读

共计 9859 个字符,预计需要花费 25 分钟才能阅读完成。

在【高并发专题】的专栏中,咱们深度剖析了 ThreadPoolExecutor 类的源代码,而 ScheduledThreadPoolExecutor 类是 ThreadPoolExecutor 类的子类。明天咱们就来一起手撕 ScheduledThreadPoolExecutor 类的源代码。

构造方法

咱们先来看下 ScheduledThreadPoolExecutor 的构造方法,源代码如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

从代码构造上来看,ScheduledThreadPoolExecutor 类是 ThreadPoolExecutor 类的子类,ScheduledThreadPoolExecutor 类的构造方法实际上调用的是 ThreadPoolExecutor 类的构造方法。

schedule 办法

接下来,咱们看一下 ScheduledThreadPoolExecutor 类的 schedule 办法,源代码如下所示。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 如果传递的 Runnable 对象和 TimeUnit 工夫单位为空
    // 抛出空指针异样
    if (command == null || unit == null)
        throw new NullPointerException();
    // 封装工作对象,在 decorateTask 办法中间接返回 ScheduledFutureTask 对象
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    // 执行延时工作
    delayedExecute(t);
    // 返回工作
    return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    // 如果传递的 Callable 对象和 TimeUnit 工夫单位为空
    // 抛出空指针异样
    if (callable == null || unit == null)
        throw new NullPointerException();
    // 封装工作对象,在 decorateTask 办法中间接返回 ScheduledFutureTask 对象
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
    // 执行延时工作
    delayedExecute(t);
    // 返回工作
    return t;
}

从源代码能够看出,ScheduledThreadPoolExecutor 类提供了两个重载的 schedule 办法,两个 schedule 办法的第一个参数不同。能够传递 Runnable 接口对象,也能够传递 Callable 接口对象。在办法外部,会将 Runnable 接口对象和 Callable 接口对象封装成 RunnableScheduledFuture 对象,实质上就是封装成 ScheduledFutureTask 对象。并通过 delayedExecute 办法来执行延时工作。

在源代码中,咱们看到两个 schedule 都调用了 decorateTask 办法,接下来,咱们就看看 decorateTask 办法。

decorateTask 办法

decorateTask 办法源代码如下所示。

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {return task;}

通过源码能够看出 decorateTask 办法的实现比较简单,接管一个 Runnable 接口对象或者 Callable 接口对象和封装的 RunnableScheduledFuture 工作,两个办法都是将 RunnableScheduledFuture 工作间接返回。在 ScheduledThreadPoolExecutor 类的子类中能够重写这两个办法。

接下来,咱们持续看下 scheduleAtFixedRate 办法。

scheduleAtFixedRate 办法

scheduleAtFixedRate 办法源代码如下所示。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    // 传入的 Runnable 对象和 TimeUnit 为空,则抛出空指针异样
    if (command == null || unit == null)
        throw new NullPointerException();
    // 如果执行周期 period 传入的数值小于或者等于 0
    // 抛出非法参数异样
    if (period <= 0)
        throw new IllegalArgumentException();
    // 将 Runnable 对象封装成 ScheduledFutureTask 工作,// 并设置执行周期
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
    // 调用 decorateTask 办法,实质上还是间接返回 ScheduledFutureTask 对象
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // 设置执行的工作
    sft.outerTask = t;
    // 执行延时工作
    delayedExecute(t);
    // 返回执行的工作
    return t;
}

通过源码能够看出,scheduleAtFixedRate 办法将传递的 Runnable 对象封装成 ScheduledFutureTask 工作对象,并设置了执行周期,下一次的执行工夫绝对于上一次的执行工夫来说,加上了 period 时长,时长的具体单位由 TimeUnit 决定。采纳固定的频率来执行定时工作。

ScheduledThreadPoolExecutor 类中另一个定时调度工作的办法是 scheduleWithFixedDelay 办法,接下来,咱们就一起看看 scheduleWithFixedDelay 办法。

scheduleWithFixedDelay 办法

scheduleWithFixedDelay 办法的源代码如下所示。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    // 传入的 Runnable 对象和 TimeUnit 为空,则抛出空指针异样
    if (command == null || unit == null)
        throw new NullPointerException();
    // 工作延时时长小于或者等于 0,则抛出非法参数异样
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 将 Runnable 对象封装成 ScheduledFutureTask 工作
    // 并设置固定的执行周期来执行工作
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
    // 调用 decorateTask 办法,实质上间接返回 ScheduledFutureTask 工作
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // 设置执行的工作
    sft.outerTask = t;
    // 执行延时工作
    delayedExecute(t);
    // 返回工作
    return t;
}

从 scheduleWithFixedDelay 办法的源代码,咱们能够看出在将 Runnable 对象封装成 ScheduledFutureTask 时,设置了执行周期,然而此时设置的执行周期与 scheduleAtFixedRate 办法设置的执行周期不同。此时设置的执行周期规定为:下一次工作执行的工夫是上一次工作实现的工夫加上 delay 时长,时长单位由 TimeUnit 决定。也就是说,具体的执行工夫不是固定的,然而执行的周期是固定的,整体采纳的是绝对固定的提早来执行定时工作。

如果大家仔细的话,会发现在 scheduleWithFixedDelay 办法中设置执行周期时,传递的 delay 值为正数,如下所示。

ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));

这里的正数示意的是绝对固定的提早。

在 ScheduledFutureTask 类中,存在一个 setNextRunTime 办法,这个办法会在 run 办法执行完工作后调用,这个办法更能体现 scheduleAtFixedRate 办法和 scheduleWithFixedDelay 办法的不同,setNextRunTime 办法的源码如下所示。

private void setNextRunTime() {
    // 间隔下次执行工作的时长
    long p = period;
    // 固定频率执行,// 上次执行工作的工夫
    // 加上工作的执行周期
    if (p > 0)
        time += p;
    // 绝对固定的提早
    // 应用的是零碎以后工夫
    // 加上工作的执行周期
    else
        time = triggerTime(-p);
}

在 setNextRunTime 办法中通过对下次执行工作的时长进行判断来确定是固定频率执行还是绝对固定的提早。

triggerTime 办法

在 ScheduledThreadPoolExecutor 类中提供了两个 triggerTime 办法,用于获取下一次执行工作的具体工夫。triggerTime 办法的源码如下所示。

private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这两个 triggerTime 办法的代码比较简单,就是获取下一次执行工作的具体工夫。有一点须要留神的是:delay < (Long.MAX_VALUE >> 1 判断 delay 的值是否小于 Long.MAX_VALUE 的一半,如果小于 Long.MAX_VALUE 值的一半,则间接返回 delay,否则须要解决溢出的状况。

咱们看到在 triggerTime 办法中解决避免溢出的逻辑应用了 overflowFree 办法,接下来,咱们就看看 overflowFree 办法的实现。

overflowFree 办法

overflowFree 办法的源代码如下所示。

private long overflowFree(long delay) {
    // 获取队列中的节点
    Delayed head = (Delayed) super.getQueue().peek();
    // 获取的节点不为空,则进行后续解决
    if (head != null) {
        // 从队列节点中获取延迟时间
        long headDelay = head.getDelay(NANOSECONDS);
        // 如果从队列中获取的延迟时间小于 0,并且传递的 delay
        // 值减去从队列节点中获取延迟时间小于 0
        if (headDelay < 0 && (delay - headDelay < 0))
            // 将 delay 的值设置为 Long.MAX_VALUE + headDelay
            delay = Long.MAX_VALUE + headDelay;
    }
    // 返回延迟时间
    return delay;
}

通过对 overflowFree 办法的源码剖析,能够看出 overflowFree 办法实质上就是为了限度队列中的所有节点的延迟时间在 Long.MAX_VALUE 值之内,避免在 ScheduledFutureTask 类中的 compareTo 办法中溢出。

ScheduledFutureTask 类中的 compareTo 办法的源码如下所示。

public int compareTo(Delayed other) {if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

compareTo 办法的次要作用就是对各提早工作进行排序,间隔下次执行工夫靠前的工作就排在后面。

delayedExecute 办法

delayedExecute 办法是 ScheduledThreadPoolExecutor 类中提早执行工作的办法,源代码如下所示。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果以后线程池曾经敞开
    // 则执行线程池的回绝策略
    if (isShutdown())
        reject(task);
    // 线程池没有敞开
    else {
        // 将工作增加到阻塞队列中
        super.getQueue().add(task);
        // 如果以后线程池是 SHUTDOWN 状态
        // 并且以后线程池状态下不能执行工作
        // 并且胜利从阻塞队列中移除工作
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            // 勾销工作的执行,但不会中断执行中的工作
            task.cancel(false);
        else
            // 调用 ThreadPoolExecutor 类中的 ensurePrestart() 办法
            ensurePrestart();}
}

能够看到在 delayedExecute 办法外部调用了 canRunInCurrentRunState 办法,canRunInCurrentRunState 办法的源码实现如下所示。

boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}

能够看到 canRunInCurrentRunState 办法的逻辑比较简单,就是判断线程池以后状态下可能执行工作。

另外,在 delayedExecute 办法外部还调用了 ThreadPoolExecutor 类中的 ensurePrestart() 办法,接下来,咱们看下 ThreadPoolExecutor 类中的 ensurePrestart() 办法的实现,如下所示。

void ensurePrestart() {int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

在 ThreadPoolExecutor 类中的 ensurePrestart() 办法中,首先获取以后线程池中线程的数量,如果线程数量小于 corePoolSize 则调用 addWorker 办法传递 null 和 true,如果线程数量为 0,则调用 addWorker 办法传递 null 和 false。

对于 addWork() 办法的源码解析,大家能够参考【高并发专题】中的《高并发之——通过 ThreadPoolExecutor 类的源码深度解析线程池执行工作的外围流程》一文,这里,不再赘述。

reExecutePeriodic 办法

reExecutePeriodic 办法的源代码如下所示。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 线程池以后状态下可能执行工作
    if (canRunInCurrentRunState(true)) {
        // 将工作放入队列
        super.getQueue().add(task);
        // 线程池以后状态下不能执行工作,并且胜利移除工作
        if (!canRunInCurrentRunState(true) && remove(task))
            // 勾销工作
            task.cancel(false);
        else
            // 调用 ThreadPoolExecutor 类的 ensurePrestart() 办法
            ensurePrestart();}
}

总体来说 reExecutePeriodic 办法的逻辑比较简单,然而,这里须要留神和 delayedExecute 办法的不同点:调用 reExecutePeriodic 办法的时候曾经执行过一次工作,所以,并不会触发线程池的回绝策略;传入 reExecutePeriodic 办法的工作肯定是周期性的工作。

onShutdown 办法

onShutdown 办法是 ThreadPoolExecutor 类中的钩子函数,它是在 ThreadPoolExecutor 类中的 shutdown 办法中调用的,而在 ThreadPoolExecutor 类中的 onShutdown 办法是一个空办法,如下所示。

void onShutdown() {}

ThreadPoolExecutor 类中的 onShutdown 办法交由子类实现,所以 ScheduledThreadPoolExecutor 类覆写了 onShutdown 办法,实现了具体的逻辑,ScheduledThreadPoolExecutor 类中的 onShutdown 办法的源码实现如下所示。

@Override
void onShutdown() {
    // 获取队列
    BlockingQueue<Runnable> q = super.getQueue();
    // 在线程池曾经调用 shutdown 办法后,是否继续执行现有提早工作
    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // 在线程池曾经调用 shutdown 办法后,是否继续执行现有定时工作
    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 在线程池曾经调用 shutdown 办法后,不继续执行现有提早工作和定时工作
    if (!keepDelayed && !keepPeriodic) {
        // 遍历队列中的所有工作
        for (Object e : q.toArray())
            // 勾销工作的执行
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        // 清空队列
        q.clear();}
    // 在线程池曾经调用 shutdown 办法后,继续执行现有提早工作和定时工作
    else {
        // 遍历队列中的所有工作
        for (Object e : q.toArray()) {
            // 当前任务是 RunnableScheduledFuture 类型
            if (e instanceof RunnableScheduledFuture) {
                // 将工作强转为 RunnableScheduledFuture 类型
                RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                // 在线程池调用 shutdown 办法后不持续的提早工作或周期工作
                // 则从队列中删除并勾销工作
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) {if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    // 最终调用 tryTerminate() 办法
    tryTerminate();}

ScheduledThreadPoolExecutor 类中的 onShutdown 办法的次要逻辑就是先判断线程池调用 shutdown 办法后,是否继续执行现有的提早工作和定时工作,如果不再执行,则勾销工作并清空队列;如果继续执行,将队列中的工作强转为 RunnableScheduledFuture 对象之后,从队列中删除并勾销工作。大家须要好好了解这两种解决形式。最初调用 ThreadPoolExecutor 类的 tryTerminate 办法。无关 ThreadPoolExecutor 类的 tryTerminate 办法的源码解析,大家能够参考【高并发专题】中的《高并发之——通过源码深度剖析线程池中 Worker 线程的执行流程》一文,这里不再赘述。

至此,ScheduledThreadPoolExecutor 类中的外围办法的源代码,咱们就剖析完了。

正文完
 0