乐趣区

HashedWheelTimer定时任务算法解析

1、原理
HashedWheelTimer 是采用一种定时轮的方式来管理和维护大量的 Timer 调度算法.Linux 内核中的定时器采用的就是这个方案。一个 HashedWheelTimer 是环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽又对应一个类似 Map 结构的对象,使用双向链表存储定时任务,指针周期性的跳动,跳动到一个槽位,就执行该槽位的定时任务。环形结构可以根据超时时间的 hash 值 (这个 hash 值实际上就是 ticks & mask) 将 task 分布到不同的槽位中, 当 tick 到那个槽位时, 只需要遍历那个槽位的 task 即可知道哪些任务会超时(而使用线性结构, 你每次 tick 都需要遍历所有 task), 所以, 我们任务量大的时候, 相应的增加 wheel 的 ticksPerWheel 值, 可以减少 tick 时遍历任务的个数.
2、结构图

3、效率
3.1 优点

可以添加、删除、取消定时任务
能高效的处理大批定时任务

3.2 缺点

对内存要求较高,占用较高的内存
时间精度要求不高

4、结合源码分析
首先来看 HashedWheelTimer 的构造函数,HashedWheelTimer 有很多构造方法,但是最后都是调用一个:
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {

if (threadFactory == null) {
throw new NullPointerException(“threadFactory”);
}
if (unit == null) {
throw new NullPointerException(“unit”);
}
if (tickDuration <= 0) {
throw new IllegalArgumentException(“tickDuration must be greater than 0: ” + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(“ticksPerWheel must be greater than 0: ” + ticksPerWheel);
}

// Normalize ticksPerWheel to power of two and initialize the wheel.
// 构造时间轮的槽位数,槽位数只能是 2 的幂次方
wheel = createWheel(ticksPerWheel);
// 时间轮槽位数
mask = wheel.length – 1;

// Convert tickDuration to nanos.
// 初始化时间周期
this.tickDuration = unit.toNanos(tickDuration);

// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
“tickDuration: %d (expected: 0 < tickDuration in nanos < %d”,
tickDuration, Long.MAX_VALUE / wheel.length));
}
// 初始化轮询时间轮的线程,使用这个线程周期性的轮询时间轮
workerThread = threadFactory.newThread(worker);

this.maxPendingTimeouts = maxPendingTimeouts;

if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
时间轮实际就是一个 HashedWeelBucket 数组,上面这个构造方法就是在初始化这个数组,槽位数就是数组长度,tickDuration 是时间周期,workerThread 线程用来轮询数组;
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
“ticksPerWheel must be greater than 0: ” + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
“ticksPerWheel may not be greater than 2^30: ” + ticksPerWheel);
}
// HashedWheelBucket 数组长度是 2 的幂次方,获取 <=ticksPerWheel 最大的 2 的幂次方
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
初始化的 HashedWheelBucket 数组的长度必须是 2 的幂次方。HashedWheelTimer 初始化完了,记下来就是如何向时间轮里添加定时任务,其实很简单,只要调用 newTimeOut()方法即可
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException(“task”);
}
if (unit == null) {
throw new NullPointerException(“unit”);
}

long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException(“Number of pending timeouts (”
+ pendingTimeoutsCount + “) is greater than or equal to maximum allowed pending ”
+ “timeouts (” + maxPendingTimeouts + “)”);
}

// 开启时间轮轮询
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) – startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 将定时任务封装成 HashedWheelTimeout
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 将定时任务存储到任务链表中
timeouts.add(timeout);
return timeout;
}
在 newTimeOut()方法中会去开启轮询时间轮的线程(即 workerThread),接下来在看如何轮询:
public void start() {
// 判断 HashedWheelTimer 状态,如果状态开启,则开启轮询线程
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException(“cannot be started once stopped”);
default:
throw new Error(“Invalid WorkerState”);
}

// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
// 阻塞当前线程,目的是保证轮询线程 workerThread 开启
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore – it will be ready very soon.
}
}
}
在这个方法中会去开启 workerThread 线程,执行 workerThread 线程中 run()方法
public void run() {
// Initialize the startTime.
// 初始化开始时间
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it’s not 0 when initialized.
startTime = 1;
}

// Notify the other threads waiting for the initialization at start().
// 唤醒阻塞的线程
startTimeInitialized.countDown();

do {
// 根据周期时间 tickDuration,进行周期性的 tick 下一个槽位
final long deadline = waitForNextTick();
if (deadline > 0) {
// 获取下一个槽位的角标
int idx = (int) (tick & mask);
processCancelledTasks();
// 获取该角标对应的 HashedWheelBucket 对象
HashedWheelBucket bucket =
wheel[idx];
// 将存储在链表 timeOuts 中的定时任务存储到对应的槽位的 HashedWheelBucket 对象中
transferTimeoutsToBuckets();
// 执行槽位中定时任务
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (; ;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
在上面方法中,轮询时间轮,执行对应槽位的定时任务,在执行之前,会先将存储在链表中任务按照各自的时间放入对应的槽位中,接下来咱们来看如何根据周期时间进行 tick
private long waitForNextTick() {
// 获取下一个槽位的等待时间
long deadline = tickDuration * (tick + 1);

for (; ;) {
// 获取当前时间间隔
final long currentTime = System.nanoTime() – startTime;
// 计算 tick 到下一个槽位需要等待的时间
long sleepTimeMs = (deadline – currentTime + 999999) / 1000000;

// 当前时间间隔大于等于下一个槽位周期时间,不需要等待,直接返回(从这个地方就可以得出 HashedWheelTimer 对时间精度要求不高,并不是严格按照延迟时间来执行的)
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
// 当前时间间隔小于下一个槽位周期时间,则进行休眠
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
分析了如何实现时间间隔轮询,接下来分析如何将任务存储到 HashedWheelBucket 中
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 遍历 timeouts 链表,默认遍历链表 100000 个任务
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
// 任务的状态等于取消,直接跳过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}

// 设置任务需要轮询的圈数,如:槽位 =8,周期 tickDuration=100ms,任务时间 =900ms,则说明需要轮询一圈后,才能会执行到该任务,即 remainingRounds= 1,槽位角标 stopIndex=1
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated – tick) / wheel.length;

// Ensure we don’t schedule for past.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
// 将定时任务存储到对应的 HashedWheelBucket 槽位中
bucket.addTimeout(timeout);
}
}
HashedWheelBucket 是一个包含双向链表的对象,addTimeout 将任务存储到链表的末端
void expireTimeouts(long deadline) {
// 获取链表表头任务
HashedWheelTimeout timeout = head;

// process all timeouts
while (timeout != null) {
// 获取表头的下一个任务
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
// 将要执行的任务从链表中删除
next = remove(timeout);
// 任务的时间小于间隔时间,执行任务
if (timeout.deadline <= deadline) {
// 执行任务
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
“timeout.deadline (%d) > deadline (%d)”, timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds–;
}
timeout = next;
}
}
上面这个方法就是遍历槽位中链表中的任务进行执行
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}

try {
// ** 这个地方就是真正执行封装的 task 任务,执行具体的任务逻辑 **
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(“An exception was thrown by ” + TimerTask.class.getSimpleName() + ‘.’, t);
}
}
}
以上就是 HashedWheelTimer 执行的整个过程,在分析的过程中最好还是结合具体的实例去分析,这样会更有利于自己的理解。

退出移动版