关于netty:Netty源码解析-FastThreadLocal与HashedWheelTimer

45次阅读

共计 4618 个字符,预计需要花费 12 分钟才能阅读完成。

Netty 源码剖析系列文章已靠近序幕,本文再来剖析 Netty 中两个常见组件:FastThreadLoca 与 HashedWheelTimer。
源码剖析基于 Netty 4.1.52

FastThreadLocal

FastThreadLocal 比较简单。
FastThreadLocal 和 FastThreadLocalThread 是配套应用的。
FastThreadLocalThread 继承了 Thread,FastThreadLocalThread#threadLocalMap 是一个 InternalThreadLocalMap,该 InternalThreadLocalMap 对象只能用于以后线程。
InternalThreadLocalMap#indexedVariables 是一个数组,寄存了以后线程所有 FastThreadLocal 对应的值。
而每个 FastThreadLocal 都有一个 index,用于定位 InternalThreadLocalMap#indexedVariables。

FastThreadLocal#get

public final V get() {
    // #1
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // #2
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {return (V) v;
    }
    // #3
    return initialize(threadLocalMap);
}

#1 获取该线程的 InternalThreadLocalMap
如果是 FastThreadLocalThread,间接获取 FastThreadLocalThread#threadLocalMap。
否则,从 UnpaddedInternalThreadLocalMap.slowThreadLocalMap 获取该线程 InternalThreadLocalMap。
留神,UnpaddedInternalThreadLocalMap.slowThreadLocalMap 是一个 ThreadLocal,这里理论回退到应用 ThreadLocal 了。
#2 每个 FastThreadLocal 都有一个 index。
通过该 index,获取 InternalThreadLocalMap#indexedVariables 中寄存的值
#3 找不到值,通过 initialize 办法构建新对象。

能够看到,FastThreadLocal 中连 hash 算法都不必,通过下标获取对应的值,复杂度为 log(1),天然很快啦。

HashedWheelTimer

HashedWheelTimer 是 Netty 提供的工夫轮调度器。
工夫轮是一种充分利用线程资源进行批量化任务调度的调度模型,可能高效的治理各种延时工作。
简略说,就是将延时工作寄存到一个环形队列中,并通过执行线程定时执行该队列的工作。

例如,
环形队列上有 60 个格子,
执行线程每秒挪动一个格子,则环形队列每轮可寄存 1 分钟内的工作。
当初有两个定时工作
task1,32 秒后执行
task2,2 分 25 秒后执行
而执行线程以后位于第 6 格子
则 task1 放到 32+6=38 格,轮数为 0
task2 放到 25+6=31 个,轮数为 2
执行线程将执行以后格子轮数为 0 的工作,并将其余工作轮数减 1。

毛病,工夫轮调度器的工夫精度不高。
因为工夫轮算法的精度取决于执行线程挪动速度。
例如下面例子中执行线程每秒挪动一个格子,则调度精度小于一秒的工作就无奈准时调用。

HashedWheelTimer 关键字段

// 工作执行器,负责执行工作
Worker worker = new Worker();
// 工作执行线程
Thread workerThread;
//  HashedWheelTimer 状态,0 - init, 1 - started, 2 - shut down
int workerState;
// 工夫轮队列,应用数组实现
HashedWheelBucket[] wheel;
// 暂存新增的工作
Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 已勾销工作
Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

增加提早工作 HashedWheelTimer#newTimeout

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    ...

    // #1
    start();

    // #2
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    ...
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

#1 如果 HashedWheelTimer 未启动,则启动该 HashedWheelTimer
HashedWheelTimer#start 办法负责是启动 workerThread 线程
#2 startTime 是 HashedWheelTimer 启动工夫
deadline 是绝对 HashedWheelTimer 启动的延迟时间
构建 HashedWheelTimeout,增加到 HashedWheelTimer#timeouts

工夫轮运行 Worker#run

public void run() {
    ...

    // #1
    startTimeInitialized.countDown();

    do {
        // #2
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // #3
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // #4
            transferTimeoutsToBuckets();
            // #5
            bucket.expireTimeouts(deadline);
            // #6
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // #7
    ...
}

#1 HashedWheelTimer#start 办法阻塞 HashedWheelTimer 线程直到 Worker 启动实现,这里解除 HashedWheelTimer 线程阻塞。
#2 计算下一格子开始执行的工夫,而后 sleep 到下次格子开始执行工夫
#2 tick 是从 HashedWheelTimer 启动后挪动的总格子数,这里获取 tick 对应的格子索引。
因为 Long 类型足够大,这里并不思考溢出问题。
#4 将 HashedWheelTimer#timeouts 的工作迁徙到对应的格子中
#5 解决已到期工作
#6 挪动到下一个格子
#7 这里是 HashedWheelTimer#stop 后的逻辑解决,勾销工作,进行工夫轮

迁徙工作 Worker#transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // #1
    for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}
        // #2
        long calculated = timeout.deadline / tickDuration;
        // #3
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        // #4
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // #5
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

#1 留神,每次只迁徙 100000 个工作,免得阻塞线程
#2 工作延迟时间 / 每格工夫数,失去该工作需提早的总格子挪动数
#3 (总格子挪动数 – 已挪动格子数)/ 每轮格子数,失去轮数
#4 如果工作在 timeouts 队列放得太久导致曾经过了执行工夫,则应用以后 tick,也就是放到以后 bucket,以便尽快执行该工作
#5 计算 tick 对应格子索引,放到对应的格子地位

执行到期工作 HashedWheelBucket#expireTimeouts

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // #1
        if (timeout.remainingRounds <= 0) {
            // #2
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // #3
                timeout.expire();} else {
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {next = remove(timeout);
        } else {
            // #4
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

#1 抉择轮数小于等于 0 的工作
#2 移除工作
#3 批改状态为过期,并执行工作
#4 其余工作轮数减 1

ScheduledExecutorService 应用堆 (DelayedWorkQueue) 保护工作,新增工作复杂度为 O(logN)。
而 HashedWheelTimer 新增工作复杂度为 O(1),所以在工作十分多时,HashedWheelTimer 能够体现出它的劣势。
然而工作较少甚至没有工作时,HashedWheelTimer 的执行线程都须要一直挪动,也会造成性能耗费。
留神,HashedWheelTimer 应用同一个线程调用和执行工作,如果某些工作执行工夫过久,则影响后续定时工作执行。当然,咱们也能够思考在工作中另起线程执行逻辑。
另外,如果工作过多,也会导致工作长期滞留在 HashedWheelTimer#timeouts 中而不能及时执行。

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!

正文完
 0