共计 2430 个字符,预计需要花费 7 分钟才能阅读完成。
ThreadPoolTaskScheduler 外围就是 schedule 办法
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler = this.errorHandler;
if (errorHandler == null) {errorHandler = TaskUtils.getDefaultErrorHandler(true);
}
// 最终调用 ReschedulingRunnable.schedule 办法
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();}
catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + executor + "] did not accept task:" + task, ex);
}
}
后续进入 ReschedulingRunnable.schedule 办法,该类中 executor 属性为 ScheduledThreadPoolExecutor 类,属性为 ScheduledThreadPoolExecutor 类继承了 ThreadPoolExecutor 线程池,然而自定义了 DelayedWorkQueue 提早队列,而不是应用 ThreadPoolExecutor 类自带的队列,周期工作提早执行的根本原因就是 DelayedWorkQueue 这个提早队列。
public ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {return null;}
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
DelayedWorkQueue 类通过一个最小堆来存储 ThreadPoolTaskScheduler 中的工作,各工作会进行比拟,最快要执行的工作放在最小堆顶部。放入最小堆,通过 siftUp,取出最小堆,通过 siftDown。
/**
* 上浮
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 始终遍历到根节点下方
while (k > 0) {
// 二叉堆,最高节点坐标为 0
// 父节点,(k - 1)/2
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 指标比父节点大
// 不须要再上浮,间接跳出
if (key.compareTo(e) >= 0)
break;
// 指标节点比父节点小
// 持续上浮,以后坐标填入父节点
queue[k] = e;
setIndex(e, k);
// 以后坐标设为父节点坐标
k = parent;
}
// 指标上浮到最小节点坐标,填入该坐标
queue[k] = key;
setIndex(key, k);
}
/**
* 下沉
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// half = size/2;
// 二叉堆,最高节点坐标为 0
// 任何节点,其左子节点坐标 (k*2)+1,右子节点坐标 (k*2)+2
int half = size >>> 1;
// 须要拿到子节点,所以只须要到 size/2 即可,不须要到最底层
while (k < half) {
// 左节点坐标
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
// 右节点坐标
int right = child + 1;
// 左节点比右节点大
if (right < size && c.compareTo(queue[right]) > 0)
// 最小 = 右子节点
c = queue[child = right];
// 指标最小比子节点小
// 不再须要下沉,间接退出,指标填入以后坐标
if (key.compareTo(c) <= 0)
break;
// 指标比子节点大
// 持续下沉,小子节点放到以后节点坐标
queue[k] = c;
setIndex(c, k);
// 以后坐标设为子节点坐标
// 坐标一直下沉
k = child;
}
// 以后坐标为指标对象最小状况下的坐标
// 讲指标对象放入该坐标
queue[k] = key;
setIndex(key, k);
}
当从 DelayedWorkQueue 队列中取出工作时,会取出最小堆顶部的工作,也就是最快要执行的工作,而后线程期待指定工夫。等待时间完结后,通过自旋实现工作。
public RunnableScheduledFuture<?> take() throws InterruptedException {
...
for (;;) {RunnableScheduledFuture<?> first = queue[0];
...
if (delay <= 0)
return finishPoll(first);
...
available.awaitNanos(delay);
...
}
...
}
正文完