一、前言
最近在浏览Redisson的源码时看到了外部应用了netty提供的这个组件,就想着看下这个定时器具体是如何实现的?
先介绍一下HashedWheelTimer,它是基于工夫轮实现
的一个定时器,它的长处是实现绝对简略,毛病是无奈准确、准时地执行定时工作,只能是近似执行
。
因为工夫轮中每个刻度大小可能是100ms也可能1ms,所以在执行工作时,工夫上会存在一点误差,在大部分网络应用中,IO工作的执行工夫往往不须要那么准确,因而默认每个刻度小大是100ms,但你能够本人来调整刻度大小,最小是1ms。
简略介绍完HahsedWheelTimer,接下来咱们先来看下工夫轮的构造
二、工夫轮的构造
工夫轮相似于一个时钟,它和时钟一样是有刻度的,每个刻度大小能够是100ms也能够是1ms,如下图
上图的工夫轮有6个刻度,每个刻度大小是100ms,也就是每过100ms会顺时针挪动一个刻度,走完一圈须要600ms(上面要介绍的HashedWheelTimer默认刻度数是512,每个刻度大小默认是100ms)
工作原理:每往工夫轮提交一个延时工作,会判断该工作的执行工夫要放在哪个刻度上
,比方在工夫轮启动后的第100ms,提交了一个延时400ms执行的工作,那么该工作应该放在刻度5上,如果提交了一个提早700ms执行的工作,那么该工作会放在刻度2上,并且会记录该工作还须要走一圈工夫轮能力执行。工夫轮每挪动一个刻度,就会执行以后刻度上的工作,一个刻度上的工作可能会有多个。
因为HashedWheelTimer是基于工夫轮的定时器,所以接下来看一下HashedWheelTimer是如何实现的?
三、HashedWheelTimer的相干组件
这里咱们能够先看下HashedWheelTimer的UML图,可能对相干组件先有个整体的意识,如下
- Timer: 定时器接口,提供
提交延时工作newTimeout、进行定时器
等办法 - HashedWheelTimer: 实现Timer接口,外部蕴含
工作线程Worker、工夫轮wheel、延时工作队列timeouts、线程池taskExecutor等
- HashedWheelBucket:下面的工夫轮wheel是一个HashedWheelBucket数组,
每一个刻度对应一个HashedWheelBucket,而每一个HashedWheelBucket外部是一个HashedWheelTimeout的双向链表
,如下图 - TimerTask: 延时工作接口,外部只提供一个run办法用于执行
- Timeout: 对Timer、TimerTask的封装
- HashedWheelTimeout: 蕴含了
工作的执行工夫dealline、所须要的圈数remainingRounds、双向链表中上一个以及下一个HashedWheelTimeout、所在的HashedWheelBucket等
四、HashedWheelTimer的工作流程
大抵工作流程如下图:
从上图能够看到,次要分为4步骤,然而精确来说应该是有5步:
- 提交延时工作给HashedWheelTimer,延时工作会先放到工作队列timeouts中
- 工作线程Worker会从工作队列timeouts中获取工作
- 将获取到的HashedWheelTimeout工作放到指定的HashedWheelBucket中
- 取出以后刻度对应的HashedWheelBucket的所有HashedWheelTimeout来执行
- 将刻度tick加1,再回到第二步,如此循环
五、源码解读
5.1 HahedWheelTimer的要害属性
要害属性如下:
- Worker worker:工作线程Worker
- int workerState:工作线程状态
- long tickDuration:刻度大小,
默认是100ms
- HashedWheelBucket[] wheel:
工夫轮的每个刻度会对应一个HashedWheelBucket
- Queue<HashedWheelTimeout> timeouts:
工作队列
- Queue<HashedWheelTimeout> cancelledTimeouts:已勾销的工作队列
- AtomicLong pendingTimeouts:正在解决的工作数
- Executor taskExecutor:线程池,用于执行工作
- long startTime:
定时器的启动工夫
5.2 提交延时工作给HahedWheelTimer
通过newTimeout办法来提交延时工作,newTimeout办法步骤如下:
- 启动工作线程Worker,如果是首次启动,设置启动工夫startTime,如果已启动,则跳过
- 计算延时工作的deadline(
以后工夫 + 延迟时间 - 启动工夫startTime
),用于判断后续放到工夫轮的哪个HashedWheelBucket中 - 将延时工作封装为HashedWheelTimeout,并
增加到工作队列timeouts
中
联合源码来看:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 局部代码省略
// 启动工作线程Worker
start();
// 计算deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 封装为HashedWheelTimeout,并增加到工作队列timeouts中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
5.3 工作线程Worker运行的具体步骤
Worker类中有一个要害的属性tick
,代表绝对于定时器的启动工夫startTime,以后曾经走到了哪个刻度
,tick只会始终往上递增,初始值为0
具体步骤如下:
- 等到下一个刻度降临,即以后工夫 > 以后刻度tick的完结工夫
a)计算以后刻度tick的完结工夫
,比方Worker刚启动,以后刻度tick为0,那么刻度tick的完结工夫 = tickDuration * (tick + 1),即100ms
b) 判断以后工夫(绝对于启动工夫startTime
)是否大于以后刻度的完结工夫,如果大于,阐明以后工夫曾经过了以后刻度的完结工夫,开始筹备解决以后刻度的所有工作。如果小于,阐明以后工夫还没到以后刻度的完结工夫,被动sleep一段时间后持续判断,直到以后工夫大于以后刻度的完结工夫。 - 从工作队列timeouts中获取工作,将延时工作的deadline除以tickDuration,计算出该工作的
总刻度数以及还须要的圈数
,通过 总刻度数 & (wheel.length -1 )来算出放在哪个HashedWheelBucket中(比方算出A工作的总刻度数 = 1026,以后刻度 = 25,工夫轮的刻度有512个,那么算出还须要的圈数是1【如果以后刻度 = 1,那么还须要的圈数会是2】,放在下标为2的HashedWheelBucket中
) - 获取以后刻度对应的HashedWheelBucket,从head开始逐个遍历工作链表,如果延时工作的所需圈数为0,开始执行工作,否则所需圈数减1。
刻度tick加1
,回到第一步,如此循环
联合源码来看
public void run() {
// 初始化定时器的启动工夫startTime
startTime = System.nanoTime();
startTimeInitialized.countDown();
do {
// 1、等到下一个刻度降临,即以后工夫 > 以后刻度tick的完结工夫
final long deadline = waitForNextTick();
if (deadline > 0) {
// 获取以后刻度tick对应的HashedWheelBucket
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 2、从工作队列timeouts中获取工作,并将工作放入到对应的HashedWheelBucket中
transferTimeoutsToBuckets();
// 3、执行以后刻度tick对应的HashedWheelBucket中的所有工作
bucket.expireTimeouts(deadline);
// 4、将以后刻度tick加1
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 局部代码省略
}
这里咱们先来看下第一步waitForNextTick办法的具体实现
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (;;) {
// 绝对于startTime的以后工夫
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// 如果以后刻度tick的完结工夫 < 以后工夫,阐明以后工夫曾经过了以后刻度的完结工夫,间接返回以后工夫
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// 否则被动sleep一段时间,下面的条件成立
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
接着看下第二步的transferTimeoutsToBuckets办法,如下
private void transferTimeoutsToBuckets() {
// 这里一次最多从队列里获取100000个工作
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// 代表队列里曾经没有工作,间接返回
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
// 计算总刻度数 = 延时工作的deadline / 刻度大小
long calculated = timeout.deadline / tickDuration;
// 计算还须要的圈数 = 总刻度数 - 以后刻度 / 工夫轮的刻度数
timeout.remainingRounds = (calculated - tick) / wheel.length;
final long ticks = Math.max(calculated, tick);
// 计算放在哪个下标
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 将该工作放入到对应的HashedWheelBucket中
bucket.addTimeout(timeout);
}
}
最初看下第三步bucket.expireTimeouts,源码如下:
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 解决该HashedWheelBucket的所有工作
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
// 将工作从双向链表中移除
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 执行工作
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// 如果所需圈数 > 0,则将其减1
timeout.remainingRounds --;
}
timeout = next;
}
}
至此,工作线程Worker运行的具体步骤以及局部源码的解读就实现了
六、总结
HashedWheelTimer只是定时器的一种简略实现,像java中常见的定时器还有Timer、ScheduledThreadPoolExecutor等,从下面剖析它的实现原理可知,它无奈利用于须要准确执行的场景,然而在网络应用中,IO工作的执行工夫往往不须要准确,所以它能够在工作较多、但工作不须要准确执行
的场景下进行应用。
发表回复