乐趣区

定时器线程池ScheduledThreadPoolExecutor

前言

定时器线程池提供了定时执行任务的能力,即可以延迟执行,可以周期性执行。但定时器线程池也还是线程池,最底层实现还是 ThreadPoolExecutor,可以参考我的另外一篇文章多线程–精通 ThreadPoolExecutor。

特点说明

1. 构造函数

 public ScheduledThreadPoolExecutor(int corePoolSize) {
 // 对于其他几个参数在 ThreadPoolExecutor 中都已经详细分析过了,所以这里, 将不再展开
 // 这里我们可以看到调用基类中的方法时有个特殊的入参 DelayedWorkQueue。// 同时我们也可以发现这里并没有设置延迟时间、周期等参数入口。// 所以定时执行的实现必然在 DelayedWorkQueue 这个对象中了。super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

2.DelayedWorkQueue

DelayedWorkQueue 是在 ScheduledThreadPoolExecutor 的一个内部类,实现了 BlockingQueue 接口
里面存放任务队列的数组如下:

private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

我们分析过 ThreadPoolExecutor,它从任务队列中获取任务的方式为 poll 和 take 两种,所以看一下 poll 和 take 两个方法的源码,回顾一下,ThreadPoolExecutor 它会调用 poll 或 take 方法,先 poll,再 take,只要其中一个接口有返回就行

public RunnableScheduledFuture<?> poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {RunnableScheduledFuture<?> first = queue[0];
                // 这里有个 getDelay,这是关键点,获取执行延时时间
                // 但是如果我们有延时设置的话,这就返回空了,然后就会调用 take 方法
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return finishPoll(first);
            } finally {lock.unlock();
            }
        }

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {for (;;) {RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                    // 获取延时时间
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                            // 使用锁,执行延时等待。// 使用锁,执行延时等待。// 使用锁,执行延时等待。available.awaitNanos(delay);
                            } finally {if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();}
        }

3.RunnableScheduledFuture

在 ScheduledThreadPoolExecutor 内部有一个 ScheduledFutureTask 类实现了 RunnableScheduledFuture,ScheduledFutureTask 这个类采用了装饰者设计模式,在执行 Runnable 的方法基础上还执行了一些额外的功能。
我们需要特别注意几个参数 period、time。
(1)time
首先看一下 time 的作用,可以发现 time 是用于获取执行延时时间的,也就是 delay 是根据 time 生成的

public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);
        }

(2)period
这个参数不是说设置执行几个周期,而是用于判断是否需要按周期执行,以及执行周期,也就是本次执行与下次执行间隔的时间

// 判断是否需要按周期执行,如果周期设置成 0,不是无间隔执行,而是只执行一次,这个需要特别注意
  public boolean isPeriodic() {return period != 0;}
 private void setNextRunTime() {
            long p = period;
            if (p > 0)
            // 这里将周期加给 time,这样获取的延迟时间就是周期时间了。time += p;
            else
                time = triggerTime(-p);
        }

(3)执行

 public void run() {
            // 先判断是否为周期性的任务
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
            // 如果不是周期性的,就执行调用父类的 run 方法,也就是构造函数中传入的 Runnable 对象的 run 方法。ScheduledFutureTask.super.run();
                // 在 if 的括号中先执行了任务
            else if (ScheduledFutureTask.super.runAndReset()) {
            // 如果是周期性的,就需要设置下次执行的时间,然后利用 reExecutePeriodic 方法,将任务再次丢入任务队列中。// 这里尤其需要注意的是 if 中的逻辑执行失败,如果没有捕捉异常,那么后面的逻辑就不会再执行了,也就是说中间有一次执行失败,后面这个周期性的任务就失效了。setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

总结

ScheduledThreadPoolExecutor 通过 time 参数,设置当前任务执行的等待时间,再通过 period 设置任务下次执行需要等待的时间。这两个参数都不是设置在线程池中的,而是携带在任务中的,这就可以把线程池和任务进行完全解耦。
注意点:
(1)任务的执行等待时间是在队列的 take 方法中的。
(2)period 参数设置成 0,任务将只会执行一次,而不会执行多次
(3)如果要自己实现周期性 Task,周期性任务在执行过程中,一定要注意捕捉异常,否则某一次执行失败,将导致后续的任务周期失效,任务将不再继续执行。

退出移动版