关于java:周期性任务线程池-ScheduledThreadPoolExecutor-DelayedWorkQueue

36次阅读

共计 7467 个字符,预计需要花费 19 分钟才能阅读完成。

ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的扩大类,用来实现提早执行的工作、或者周期性执行的工作。

一般来讲,周期性工作或者定时工作蕴含两大组件:一个是执行工作的线程池,一个是存储工作的存储器。还记得 Quartz 吗?企业级定时工作框架,最重要的内容其实也是这两局部:SimpleThreadPool 和 JobStore。

ScheduledThreadPoolExecutor 也不例外,由线程池和工作队列组成。线程池继承自 ThreadPoolExecutor,工作队列 DelayedWorkQueue,理解了 ThreadPoolExecutor 和 DelayedWorkQueue,也就根本理解了 ScheduledThreadPoolExecutor。

此外,ScheduledThreadPoolExecutor 的非凡之处还在于他所执行的工作必须是 ScheduledFutureTask,ScheduledFutureTask 是“将来要执行的工作”,“将来”由 delay 指定。即便是通过 ScheduledThreadPoolExecutor 提交“立刻”而不是“将来”要执行的工作,也要通过指定 delay 时长为 0 的 ScheduledFutureTask 来提交。ScheduledFutureTask 工作提交之后退出阻塞队列 DelayedWorkQueue 期待调度。

ScheduledThreadPoolExecutor 的创立

提供了四个构造方法,都是通过调用父类 ThreadPoolExecutor 的构造方法实现 ScheduledThreadPoolExecutor 对象的创立:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

corePoolSize 通过构造方法的参数指定,maximumPoolSize 在构造方法中都固定设置为 Integer.MAX_VALUE,也就是不受限制。

keepAliveTime 设置为 0,前面咱们会晓得其实 ScheduledThreadPoolExecutor 的线程数不会超过 corePoolSize,而且如果 allowCoreThreadTimeOut 放弃默认的话(false),那其实这个 keepAliveTime 是没有意义的。

四个构造方法均设置阻塞队列为 new DelayedWorkQueue(),即仅反对 DelayedWorkQueue。

ScheduledThreadPoolExecutor 的线程池治理

ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的扩大,线程池治理局部没有做扩大,保留了 ThreadPoolExecutor 的原有性能。

每次工作退出队列后会调用 ensurePrestart(ThreadPoolExecutor 实现)办法创立并启动一个线程,ThreadPoolExecutor 的 ensurePrestart 办法:

 void ensurePrestart() {int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

如果线程数小于 corePoolSize 则调用 addWorker(null, true) 创立外围线程线程,否则如果线程数为 0(设置 corePoolSize= 0 则可能走到这个分支)创立非核心线程。

corePoolSize 大于 0 的状况下,ScheduledThreadPoolExecutor 启动的线程数不会大于外围线程数。而且每一个线程创立的时候都不会有 firstTask,线程总是从阻塞队列里获取工作执行。

提交工作

ScheduledThreadPoolExecutor 提供 execute(Executor 接口的工作提交办法)、submit、schedule、scheduleAtFixedRate、scheduleWithFixedDelay 等办法提交工作。

尽管说 ScheduledThreadPoolExecutor 只承受 ScheduledFutureTask,但这并不是说应用层只能提交给他 ScheduledFutureTask 的工作,利用通过以上各办法提交工作的时候的 Task 是非常灵活的:能够是 Callable,也能够是 Runnable,ScheduledThreadPoolExecutor 外部再把它们包装为 ScheduledFutureTask — 对应用层来说是通明的。

提供了这么多提交工作的办法,无非是为了反对应用层以更加灵便的形式提交工作,其实底层执行逻辑大同小异。

咱们就以 scheduleAtFixedRate 为例来剖析工作提交过程:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

scheduleAtFixedRate 办法的目标是提交一个在提交后延时(参数 initialDelay 指定)启动、以固定工夫周期(参数 period 指定)反复执行的工作。

次要执行了以下动作:

  1. 首先创立了 ScheduledFutureTask 工作
  2. 之后将它包装成 RunnableScheduledFuture,其实 decorateTask 办法间接返回了创立好的 ScheduledFutureTask,没什么好剖析的
  3. 而后调用 delayedExecute 启动线程执行工作

ScheduledFutureTask 工作

这个也是 ScheduledThreadPoolExecutor 的重头戏!

ScheduledFutureTask 继承自 FutureTask,并实现了 RunnableScheduledFuture 接口,所以他是一个复合体:

  1. 能够异步执行带有返回的工作(Future 接口)
  2. 能够执行周期性工作(RunnableScheduledFuture 接口)

先意识几个重要属性:
time: 工作应该被执行的工夫(纳秒)
period:周期执行工作的间隔时间(纳秒),负数示意 fix-rate, 正数表述 fix-delay(fixed-rate 和 fixed-delay 后面 jdk timer 的文章讲过,含意一样)
outerTask:指向本人的 RunnableScheduledFuture 对象

compareTo 办法:进入 DelayedWorkQueue 队列时须要调用 CompareTo 办法比拟大小,以便把最近执行的工作放在堆头。compareTo 办法最终比拟的其实就是 time 属性。

run 办法:

public void run() {boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

如果是一次性工作则间接调用 FutureTask 的 run 办法执行工作,否则,如果是周期性工作,首先调用 FutureTask 的 runAndReset 办法,调用胜利的话,设置下次执行工夫,而后通过调用 reExecutePeriodic 将当前任务再次退出队列。

runAndReset 办法首先执行当前任务,执行实现后从新设置当前任务状态为 NEW,筹备下次执行。

通过剖析 runAndReset 办法能够晓得,周期性工作执行后不再可能获取到返回(回顾一下 FutureTask 的代码逻辑,状态设置为 NEW 之后,就不再可能获取到返回了)。

delayedExecute 启动线程

办法代码很简略:

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())
            reject(task);
        else {super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();}
    }

工作退出队列,之后调用 ensurePrestart 办法启动线程!

工作间接进入队列,之后启动线程,把工作的执行齐全交给 ThreadPoolExecutor 的工作执行线程 Worker 去调度了:Worker 线程调用 getTask 办法从队列获取并执行工作!

咱们当初能够大胆猜想一下了:周期性工作是有严格的执行工夫要求的,没到执行工夫的工作是不能执行的,因为 ThreadPoolExecutor 的工作执行线程的逻辑中并没有执行工夫的判断,那么,这个逻辑应该是在 getTask 办法向队列获取工作的时候、由队列的出队办法实现的。

当初轮到阻塞队列 DelayedWorkQueue 出场了,咱们带着这个疑难来钻研一下 DelayedWorkQueue 队列。

阻塞队列 DelayedWorkQueue

DelayedWorkQueue 底层是以数组实现的堆构造。堆构造是一个齐全二叉树,能够确保每一个节点都比他的叶子节点大(或者小),这样的话堆头节点(也就是数组的第一个元素)就肯定是最大(或最小的)。

DelayedWorkQueue 是存储 ScheduledFutureTask 的队列,最近执行的工作须要寄存在堆头,每一个节点都应该比他的叶子节点小。

每次工作退出队列、节点出队、删除节点等操作都须要依照工作执行工夫 time 从新调整队列。

初始化容量为 16,新节点退出队列时如果队列容量不够则扩容原来容量的 50%(新容量 = 1.5 * 旧容量)。

新节点入队的时候调用 siftUp 办法从新调整队列,以便新节点退出到堆的适合地位。

private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

siftUp 办法基于齐全二叉树的一个个性:齐全二叉树的第 k 个节点的父节点在数组中的地位为:(k-1)/ 2 取整。

节点退出队列的时候默认退出到尾部 k(以后数组的 size),获取到 k 的父节点、比拟以后节点和父节点,如果以后节点大于父节点(调用了 ScheduledFutureTask 的 compareTo 办法,比拟的是 time),阐明以后节点找到了正确的地位,否则以后节点与父节点替换地位,持续寻找父节点比拟、直到找到根节点。

这样,新退出的节点通过 siftUp 操作之后会依据工作触发工夫 time 进入到队列的适合地位。

节点须要从队列 remove 的时候也须要执行相似的操作(调用 siftUp 或 siftDown 办法)确保堆的正确程序。

这样一来,DelayedWorkQueue 队列能够始终保障堆头(也就是数组的第一个元素)就是最近须要执行的工作。工作执行线程在获取最近须要被执行工作的时候,不须要遍历整个队列、只须要获取堆头第一个节点执行即可。

堆构造非常适合周期性工作或定时工作这一利用场景:节点退出工作时的时效性要求不高(因为是须要延时执行的工作嘛,时效性要求必定就不高了),获取数据的时效性要求高(到了工作的执行工夫了,最好当然是能即时获取到、立刻执行),可能十分无效的进步工作执行效率。

理解了堆构造的个性,晓得了堆构造入队出队的排序逻辑,接下来还须要去验证咱们的猜想: 出队逻辑会判断以后是否曾经到了节点的执行工夫。

因为 ThreadPoolExecutor 的 getTask() 办法调用队列的 take 办法获取工作,所以,间接看 DelayedWorkQueue 的 take() 办法就能够了,

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {for (;;) {RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {available.awaitNanos(delay);
                            } finally {if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();}
        }

逻辑很清晰:

  1. 队列上锁
  2. 获取堆头的工作,如果为空(空队列,没有工作),期待(留神期待是会开释锁资源的、期待被唤醒之后从新获取锁资源,无关 ReentrantLock 咱们前面会专门做详细分析)
  3. 否则, 判断堆头工作曾经到执行工夫了,堆头工作出队列并返回堆头工作。
  4. 否则,堆头工作执行工夫未到,采纳 Leader-Follower 模式期待

下面第 3 点验证了咱们的猜想!

Leader-Follower 模式是指线程池中多个线程在期待执行工作的时候,线程会竞争 Leader,只有一个线程会在竞争中获胜成为 Leader,其余线程就都是 Follower。Leader 获权仅期待指定工夫(以后距下次工作执行的时间差)、Follower 线程则须要无限期期待(被勾销或者被其余线程唤醒)。期待过程中如果有新的节点退出队列并成为堆头的话(新退出的工作变成了最近要被执行的工作),此时须要设置 leader 为空并唤醒期待线程从新竞争 leader。Leader-Follower 模式能够无效防止所有期待线程都进入无限期、被动期待其余线程唤醒的期待模式、在期待时长达到后被动唤醒执行工作。

小结

周期性工作线程池 ScheduledThreadPoolExecutor 扒完了,简略总结下:

  1. 周期性线程池能够解决立刻执行的工作、提早执行的一次性工作、提早执行的周期性工作(FixedRate 和 FixedDelay 两种模式)
  2. 创立的时候指定外围线程数量,线程池最终启动的线程数量不会超过外围线程数量,每提交一个工作的同时启动一个线程、直到线程池数量达到外围线程数
  3. 不论是立刻执行的工作、还是提早执行的工作,工作提交后间接退出阻塞队列,期待线程从队列中获取并执行
  4. 阻塞队列采纳 DelayedWorkQueue,堆构造,最近执行的工作始终放在堆头
  5. 线程池中的线程采纳 Leader-Follower 模式竞争工作,能够认为竞争成为 Leader 的线程取得了堆头工作的优先执行权

Thanks a lot!

上一篇 线程池 – ThreadPoolExecutor 源码剖析

正文完
 0