乐趣区

关于java:Netty八基于时间轮的定时器HashedWheelTimer

一、前言

最近在浏览 Redisson 的源码时看到了外部应用了 netty 提供的这个组件,就想着看下这个定时器具体是如何实现的?

先介绍一下 HashedWheelTimer,它是 基于工夫轮实现 的一个定时器,它的长处是 实现绝对简略,毛病是无奈准确、准时地执行定时工作,只能是近似执行

因为工夫轮中每个刻度大小可能是 100ms 也可能 1ms,所以在执行工作时,工夫上会存在一点误差,在大部分网络应用中,IO 工作的执行工夫往往不须要那么准确,因而默认每个刻度小大是 100ms,但你能够本人来调整刻度大小,最小是 1ms。

简略介绍完 HahsedWheelTimer,接下来咱们先来看下工夫轮的构造

二、工夫轮的构造

工夫轮相似于一个时钟,它和时钟一样是有刻度的,每个刻度大小能够是 100ms 也能够是 1ms,如下图

上图的工夫轮有 6 个刻度,每个刻度大小是 100ms,也就是每过 100ms 会顺时针挪动一个刻度,走完一圈须要 600ms(上面要介绍的 HashedWheelTimer 默认刻度数是 512,每个刻度大小默认是 100ms)

工作原理 :每往工夫轮提交一个延时工作,会 判断该工作的执行工夫要放在哪个刻度上 ,比方在工夫轮启动后的第 100ms,提交了一个延时 400ms 执行的工作,那么该工作应该放在刻度 5 上,如果提交了一个提早 700ms 执行的工作,那么该工作会放在刻度 2 上,并且会记录该工作还须要走一圈工夫轮能力执行。 工夫轮每挪动一个刻度,就会执行以后刻度上的工作,一个刻度上的工作可能会有多个

因为 HashedWheelTimer 是基于工夫轮的定时器,所以接下来看一下 HashedWheelTimer 是如何实现的?

三、HashedWheelTimer 的相干组件

这里咱们能够先看下 HashedWheelTimer 的 UML 图,可能对相干组件先有个整体的意识,如下

  • Timer: 定时器接口,提供 提交延时工作 newTimeout、进行定时器 等办法
  • HashedWheelTimer: 实现 Timer 接口,外部蕴含 工作线程 Worker、工夫轮 wheel、延时工作队列 timeouts、线程池 taskExecutor 等
  • HashedWheelBucket:下面的工夫轮 wheel 是一个 HashedWheelBucket 数组,每一个刻度对应一个 HashedWheelBucket,而每一个 HashedWheelBucket 外部是一个 HashedWheelTimeout 的双向链表,如下图
  • TimerTask: 延时工作接口,外部只提供一个 run 办法用于执行
  • Timeout: 对 Timer、TimerTask 的封装
  • HashedWheelTimeout: 蕴含了 工作的执行工夫 dealline、所须要的圈数 remainingRounds、双向链表中上一个以及下一个 HashedWheelTimeout、所在的 HashedWheelBucket 等

四、HashedWheelTimer 的工作流程

大抵工作流程如下图:

从上图能够看到,次要分为 4 步骤,然而精确来说应该是有 5 步:

  1. 提交延时工作给 HashedWheelTimer,延时工作会先放到工作队列 timeouts 中
  2. 工作线程 Worker 会从工作队列 timeouts 中获取工作
  3. 将获取到的 HashedWheelTimeout 工作放到指定的 HashedWheelBucket 中
  4. 取出以后刻度对应的 HashedWheelBucket 的所有 HashedWheelTimeout 来执行
  5. 将刻度 tick 加 1,再回到第二步,如此循环

五、源码解读

5.1 HahedWheelTimer 的要害属性

要害属性如下:

  • Worker worker:工作线程 Worker
  • int workerState:工作线程状态
  • long tickDuration:刻度大小,默认是 100ms
  • HashedWheelBucket[] wheel工夫轮的每个刻度会对应一个 HashedWheelBucket
  • Queue<HashedWheelTimeout> timeouts工作队列
  • Queue<HashedWheelTimeout> cancelledTimeouts:已勾销的工作队列
  • AtomicLong pendingTimeouts:正在解决的工作数
  • Executor taskExecutor:线程池,用于执行工作
  • long startTime定时器的启动工夫

5.2 提交延时工作给 HahedWheelTimer

通过 newTimeout 办法来提交延时工作,newTimeout 办法步骤如下:

  1. 启动工作线程 Worker,如果是首次启动,设置启动工夫 startTime,如果已启动,则跳过
  2. 计算延时工作的 deadline(以后工夫 + 延迟时间 - 启动工夫 startTime),用于判断后续放到工夫轮的哪个 HashedWheelBucket 中
  3. 将延时工作封装为 HashedWheelTimeout,并 增加到工作队列 timeouts

联合源码来看:

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // 局部代码省略
    
    // 启动工作线程 Worker
    start();

    // 计算 deadline
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}
    
    // 封装为 HashedWheelTimeout,并增加到工作队列 timeouts 中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

5.3 工作线程 Worker 运行的具体步骤

Worker 类中有一个 要害的属性 tick,代表 绝对于定时器的启动工夫 startTime,以后曾经走到了哪个刻度,tick 只会始终往上递增,初始值为 0

具体步骤如下:

  1. 等到下一个刻度降临,即以后工夫 > 以后刻度 tick 的完结工夫
    a) 计算以后刻度 tick 的完结工夫 ,比方 Worker 刚启动,以后刻度 tick 为 0,那么刻度 tick 的完结工夫 = tickDuration * (tick + 1),即 100ms
    b) 判断以后工夫( 绝对于启动工夫 startTime)是否大于以后刻度的完结工夫,如果大于,阐明以后工夫曾经过了以后刻度的完结工夫,开始筹备解决以后刻度的所有工作。如果小于,阐明以后工夫还没到以后刻度的完结工夫,被动 sleep 一段时间后持续判断,直到以后工夫大于以后刻度的完结工夫。
  2. 从工作队列 timeouts 中获取工作,将延时工作的 deadline 除以 tickDuration,计算出该工作的 总刻度数以及还须要的圈数 ,通过 总刻度数 &(wheel.length -1)来算出放在哪个 HashedWheelBucket 中(比方算出 A 工作的总刻度数 = 1026,以后刻度 = 25,工夫轮的刻度有 512 个,那么算出还须要的圈数是 1【如果以后刻度 = 1,那么还须要的圈数会是 2】,放在下标为 2 的 HashedWheelBucket 中
  3. 获取以后刻度对应的 HashedWheelBucket,从 head 开始逐个遍历工作链表,如果延时工作的所需圈数为 0,开始执行工作,否则所需圈数减 1。
  4. 刻度 tick 加 1 ,回到第一步,如此循环

联合源码来看

public void run() {
    // 初始化定时器的启动工夫 startTime
    startTime = System.nanoTime();
    startTimeInitialized.countDown();

    do {
        // 1、等到下一个刻度降临,即以后工夫 > 以后刻度 tick 的完结工夫
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // 获取以后刻度 tick 对应的 HashedWheelBucket
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            
            // 2、从工作队列 timeouts 中获取工作,并将工作放入到对应的 HashedWheelBucket 中        
            transferTimeoutsToBuckets();
            // 3、执行以后刻度 tick 对应的 HashedWheelBucket 中的所有工作
            bucket.expireTimeouts(deadline);
            // 4、将以后刻度 tick 加 1
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // 局部代码省略
}

这里咱们先来看下第一步 waitForNextTick 办法的具体实现

private long waitForNextTick() {long deadline = tickDuration * (tick + 1);

    for (;;) {
        // 绝对于 startTime 的以后工夫
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
   
        // 如果以后刻度 tick 的完结工夫 < 以后工夫,阐明以后工夫曾经过了以后刻度的完结工夫,间接返回以后工夫
        if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}
        }
        
        // 否则被动 sleep 一段时间,下面的条件成立
        try {Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}
        }
    }
}

接着看下第二步的 transferTimeoutsToBuckets 办法,如下

private void transferTimeoutsToBuckets() {
    // 这里一次最多从队列里获取 100000 个工作
    for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // 代表队列里曾经没有工作,间接返回
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}
        
        // 计算总刻度数 = 延时工作的 deadline / 刻度大小
        long calculated = timeout.deadline / tickDuration;
        // 计算还须要的圈数 = 总刻度数 - 以后刻度 / 工夫轮的刻度数
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        
        final long ticks = Math.max(calculated, tick);
        // 计算放在哪个下标
        int stopIndex = (int) (ticks & mask);
        HashedWheelBucket bucket = wheel[stopIndex];
        // 将该工作放入到对应的 HashedWheelBucket 中
        bucket.addTimeout(timeout);
    }
}

最初看下第三步 bucket.expireTimeouts,源码如下:

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // 解决该 HashedWheelBucket 的所有工作
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            // 将工作从双向链表中移除
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // 执行工作
                timeout.expire();} else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {next = remove(timeout);
        } else {
            // 如果所需圈数 > 0,则将其减 1
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

至此,工作线程 Worker 运行的具体步骤以及局部源码的解读就实现了

六、总结

HashedWheelTimer 只是定时器的一种简略实现,像 java 中常见的定时器还有 Timer、ScheduledThreadPoolExecutor 等,从下面剖析它的实现原理可知,它无奈利用于须要准确执行的场景,然而在网络应用中,IO 工作的执行工夫往往不须要准确,所以它能够在 工作较多、但工作不须要准确执行 的场景下进行应用。

退出移动版