ThreadPoolTaskScheduler 外围就是schedule 办法
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { ScheduledExecutorService executor = getScheduledExecutor(); try { ErrorHandler errorHandler = this.errorHandler; if (errorHandler == null) { errorHandler = TaskUtils.getDefaultErrorHandler(true); } // 最终调用ReschedulingRunnable.schedule 办法 return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule(); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); }}
后续进入ReschedulingRunnable.schedule 办法,该类中executor 属性为ScheduledThreadPoolExecutor 类,属性为ScheduledThreadPoolExecutor 类继承了ThreadPoolExecutor 线程池,然而自定义了DelayedWorkQueue 提早队列,而不是应用ThreadPoolExecutor 类自带的队列,周期工作提早执行的根本原因就是DelayedWorkQueue 这个提早队列。
public ScheduledFuture<?> schedule() { synchronized (this.triggerContextMonitor) { this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; }}
DelayedWorkQueue 类通过一个最小堆来存储ThreadPoolTaskScheduler 中的工作,各工作会进行比拟,最快要执行的工作放在最小堆顶部。放入最小堆,通过siftUp,取出最小堆,通过siftDown。
/** * 上浮 */private void siftUp(int k, RunnableScheduledFuture<?> key) { // 始终遍历到根节点下方 while (k > 0) { // 二叉堆,最高节点坐标为0 // 父节点,(k - 1)/2 int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; // 指标比父节点大 // 不须要再上浮,间接跳出 if (key.compareTo(e) >= 0) break; // 指标节点比父节点小 // 持续上浮,以后坐标填入父节点 queue[k] = e; setIndex(e, k); // 以后坐标设为父节点坐标 k = parent; } // 指标上浮到最小节点坐标,填入该坐标 queue[k] = key; setIndex(key, k);}/** * 下沉 */private void siftDown(int k, RunnableScheduledFuture<?> key) { // half = size/2; // 二叉堆,最高节点坐标为0 // 任何节点,其左子节点坐标(k*2)+1,右子节点坐标(k*2)+2 int half = size >>> 1; // 须要拿到子节点,所以只须要到size/2 即可,不须要到最底层 while (k < half) { // 左节点坐标 int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; // 右节点坐标 int right = child + 1; // 左节点比右节点大 if (right < size && c.compareTo(queue[right]) > 0) // 最小 = 右子节点 c = queue[child = right]; // 指标最小比子节点小 // 不再须要下沉,间接退出,指标填入以后坐标 if (key.compareTo(c) <= 0) break; // 指标比子节点大 // 持续下沉,小子节点放到以后节点坐标 queue[k] = c; setIndex(c, k); // 以后坐标设为子节点坐标 // 坐标一直下沉 k = child; } // 以后坐标为指标对象最小状况下的坐标 // 讲指标对象放入该坐标 queue[k] = key; setIndex(key, k);}
当从DelayedWorkQueue 队列中取出工作时,会取出最小堆顶部的工作,也就是最快要执行的工作,而后线程期待指定工夫。等待时间完结后,通过自旋实现工作。
public RunnableScheduledFuture<?> take() throws InterruptedException { ... for (;;) { RunnableScheduledFuture<?> first = queue[0]; ... if (delay <= 0) return finishPoll(first); ... available.awaitNanos(delay); ... } ...}