关于java:Netty八基于时间轮的定时器HashedWheelTimer

59次阅读

共计 5581 个字符,预计需要花费 14 分钟才能阅读完成。

一、前言

最近在浏览 Redisson 的源码时看到了外部应用了 netty 提供的这个组件,就想着看下这个定时器具体是如何实现的?

先介绍一下 HashedWheelTimer,它是 基于工夫轮实现 的一个定时器,它的长处是 实现绝对简略,毛病是无奈准确、准时地执行定时工作,只能是近似执行

因为工夫轮中每个刻度大小可能是 100ms 也可能 1ms,所以在执行工作时,工夫上会存在一点误差,在大部分网络应用中,IO 工作的执行工夫往往不须要那么准确,因而默认每个刻度小大是 100ms,但你能够本人来调整刻度大小,最小是 1ms。

简略介绍完 HahsedWheelTimer,接下来咱们先来看下工夫轮的构造

二、工夫轮的构造

工夫轮相似于一个时钟,它和时钟一样是有刻度的,每个刻度大小能够是 100ms 也能够是 1ms,如下图

上图的工夫轮有 6 个刻度,每个刻度大小是 100ms,也就是每过 100ms 会顺时针挪动一个刻度,走完一圈须要 600ms(上面要介绍的 HashedWheelTimer 默认刻度数是 512,每个刻度大小默认是 100ms)

工作原理 :每往工夫轮提交一个延时工作,会 判断该工作的执行工夫要放在哪个刻度上 ,比方在工夫轮启动后的第 100ms,提交了一个延时 400ms 执行的工作,那么该工作应该放在刻度 5 上,如果提交了一个提早 700ms 执行的工作,那么该工作会放在刻度 2 上,并且会记录该工作还须要走一圈工夫轮能力执行。 工夫轮每挪动一个刻度,就会执行以后刻度上的工作,一个刻度上的工作可能会有多个

因为 HashedWheelTimer 是基于工夫轮的定时器,所以接下来看一下 HashedWheelTimer 是如何实现的?

三、HashedWheelTimer 的相干组件

这里咱们能够先看下 HashedWheelTimer 的 UML 图,可能对相干组件先有个整体的意识,如下

  • Timer: 定时器接口,提供 提交延时工作 newTimeout、进行定时器 等办法
  • HashedWheelTimer: 实现 Timer 接口,外部蕴含 工作线程 Worker、工夫轮 wheel、延时工作队列 timeouts、线程池 taskExecutor 等
  • HashedWheelBucket:下面的工夫轮 wheel 是一个 HashedWheelBucket 数组,每一个刻度对应一个 HashedWheelBucket,而每一个 HashedWheelBucket 外部是一个 HashedWheelTimeout 的双向链表,如下图
  • TimerTask: 延时工作接口,外部只提供一个 run 办法用于执行
  • Timeout: 对 Timer、TimerTask 的封装
  • HashedWheelTimeout: 蕴含了 工作的执行工夫 dealline、所须要的圈数 remainingRounds、双向链表中上一个以及下一个 HashedWheelTimeout、所在的 HashedWheelBucket 等

四、HashedWheelTimer 的工作流程

大抵工作流程如下图:

从上图能够看到,次要分为 4 步骤,然而精确来说应该是有 5 步:

  1. 提交延时工作给 HashedWheelTimer,延时工作会先放到工作队列 timeouts 中
  2. 工作线程 Worker 会从工作队列 timeouts 中获取工作
  3. 将获取到的 HashedWheelTimeout 工作放到指定的 HashedWheelBucket 中
  4. 取出以后刻度对应的 HashedWheelBucket 的所有 HashedWheelTimeout 来执行
  5. 将刻度 tick 加 1,再回到第二步,如此循环

五、源码解读

5.1 HahedWheelTimer 的要害属性

要害属性如下:

  • Worker worker:工作线程 Worker
  • int workerState:工作线程状态
  • long tickDuration:刻度大小,默认是 100ms
  • HashedWheelBucket[] wheel工夫轮的每个刻度会对应一个 HashedWheelBucket
  • Queue<HashedWheelTimeout> timeouts工作队列
  • Queue<HashedWheelTimeout> cancelledTimeouts:已勾销的工作队列
  • AtomicLong pendingTimeouts:正在解决的工作数
  • Executor taskExecutor:线程池,用于执行工作
  • long startTime定时器的启动工夫

5.2 提交延时工作给 HahedWheelTimer

通过 newTimeout 办法来提交延时工作,newTimeout 办法步骤如下:

  1. 启动工作线程 Worker,如果是首次启动,设置启动工夫 startTime,如果已启动,则跳过
  2. 计算延时工作的 deadline(以后工夫 + 延迟时间 - 启动工夫 startTime),用于判断后续放到工夫轮的哪个 HashedWheelBucket 中
  3. 将延时工作封装为 HashedWheelTimeout,并 增加到工作队列 timeouts

联合源码来看:

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // 局部代码省略
    
    // 启动工作线程 Worker
    start();

    // 计算 deadline
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}
    
    // 封装为 HashedWheelTimeout,并增加到工作队列 timeouts 中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

5.3 工作线程 Worker 运行的具体步骤

Worker 类中有一个 要害的属性 tick,代表 绝对于定时器的启动工夫 startTime,以后曾经走到了哪个刻度,tick 只会始终往上递增,初始值为 0

具体步骤如下:

  1. 等到下一个刻度降临,即以后工夫 > 以后刻度 tick 的完结工夫
    a) 计算以后刻度 tick 的完结工夫 ,比方 Worker 刚启动,以后刻度 tick 为 0,那么刻度 tick 的完结工夫 = tickDuration * (tick + 1),即 100ms
    b) 判断以后工夫( 绝对于启动工夫 startTime)是否大于以后刻度的完结工夫,如果大于,阐明以后工夫曾经过了以后刻度的完结工夫,开始筹备解决以后刻度的所有工作。如果小于,阐明以后工夫还没到以后刻度的完结工夫,被动 sleep 一段时间后持续判断,直到以后工夫大于以后刻度的完结工夫。
  2. 从工作队列 timeouts 中获取工作,将延时工作的 deadline 除以 tickDuration,计算出该工作的 总刻度数以及还须要的圈数 ,通过 总刻度数 &(wheel.length -1)来算出放在哪个 HashedWheelBucket 中(比方算出 A 工作的总刻度数 = 1026,以后刻度 = 25,工夫轮的刻度有 512 个,那么算出还须要的圈数是 1【如果以后刻度 = 1,那么还须要的圈数会是 2】,放在下标为 2 的 HashedWheelBucket 中
  3. 获取以后刻度对应的 HashedWheelBucket,从 head 开始逐个遍历工作链表,如果延时工作的所需圈数为 0,开始执行工作,否则所需圈数减 1。
  4. 刻度 tick 加 1 ,回到第一步,如此循环

联合源码来看

public void run() {
    // 初始化定时器的启动工夫 startTime
    startTime = System.nanoTime();
    startTimeInitialized.countDown();

    do {
        // 1、等到下一个刻度降临,即以后工夫 > 以后刻度 tick 的完结工夫
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // 获取以后刻度 tick 对应的 HashedWheelBucket
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            
            // 2、从工作队列 timeouts 中获取工作,并将工作放入到对应的 HashedWheelBucket 中        
            transferTimeoutsToBuckets();
            // 3、执行以后刻度 tick 对应的 HashedWheelBucket 中的所有工作
            bucket.expireTimeouts(deadline);
            // 4、将以后刻度 tick 加 1
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // 局部代码省略
}

这里咱们先来看下第一步 waitForNextTick 办法的具体实现

private long waitForNextTick() {long deadline = tickDuration * (tick + 1);

    for (;;) {
        // 绝对于 startTime 的以后工夫
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
   
        // 如果以后刻度 tick 的完结工夫 < 以后工夫,阐明以后工夫曾经过了以后刻度的完结工夫,间接返回以后工夫
        if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}
        }
        
        // 否则被动 sleep 一段时间,下面的条件成立
        try {Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}
        }
    }
}

接着看下第二步的 transferTimeoutsToBuckets 办法,如下

private void transferTimeoutsToBuckets() {
    // 这里一次最多从队列里获取 100000 个工作
    for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // 代表队列里曾经没有工作,间接返回
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}
        
        // 计算总刻度数 = 延时工作的 deadline / 刻度大小
        long calculated = timeout.deadline / tickDuration;
        // 计算还须要的圈数 = 总刻度数 - 以后刻度 / 工夫轮的刻度数
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        
        final long ticks = Math.max(calculated, tick);
        // 计算放在哪个下标
        int stopIndex = (int) (ticks & mask);
        HashedWheelBucket bucket = wheel[stopIndex];
        // 将该工作放入到对应的 HashedWheelBucket 中
        bucket.addTimeout(timeout);
    }
}

最初看下第三步 bucket.expireTimeouts,源码如下:

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // 解决该 HashedWheelBucket 的所有工作
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            // 将工作从双向链表中移除
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // 执行工作
                timeout.expire();} else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {next = remove(timeout);
        } else {
            // 如果所需圈数 > 0,则将其减 1
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

至此,工作线程 Worker 运行的具体步骤以及局部源码的解读就实现了

六、总结

HashedWheelTimer 只是定时器的一种简略实现,像 java 中常见的定时器还有 Timer、ScheduledThreadPoolExecutor 等,从下面剖析它的实现原理可知,它无奈利用于须要准确执行的场景,然而在网络应用中,IO 工作的执行工夫往往不须要准确,所以它能够在 工作较多、但工作不须要准确执行 的场景下进行应用。

正文完
 0