Netty源码剖析系列文章已靠近序幕,本文再来剖析Netty中两个常见组件:FastThreadLoca与HashedWheelTimer。
源码剖析基于Netty 4.1.52
FastThreadLocal
FastThreadLocal比较简单。
FastThreadLocal和FastThreadLocalThread是配套应用的。
FastThreadLocalThread继承了Thread,FastThreadLocalThread#threadLocalMap 是一个InternalThreadLocalMap,该InternalThreadLocalMap对象只能用于以后线程。
InternalThreadLocalMap#indexedVariables是一个数组,寄存了以后线程所有FastThreadLocal对应的值。
而每个FastThreadLocal都有一个index,用于定位InternalThreadLocalMap#indexedVariables。
FastThreadLocal#get
public final V get() { // #1 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // #2 Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } // #3 return initialize(threadLocalMap);}
#1
获取该线程的InternalThreadLocalMap
如果是FastThreadLocalThread,间接获取FastThreadLocalThread#threadLocalMap。
否则,从UnpaddedInternalThreadLocalMap.slowThreadLocalMap获取该线程InternalThreadLocalMap。
留神,UnpaddedInternalThreadLocalMap.slowThreadLocalMap是一个ThreadLocal,这里理论回退到应用ThreadLocal了。#2
每个FastThreadLocal都有一个index。
通过该index,获取InternalThreadLocalMap#indexedVariables中寄存的值#3
找不到值,通过initialize办法构建新对象。
能够看到,FastThreadLocal中连hash算法都不必,通过下标获取对应的值,复杂度为log(1),天然很快啦。
HashedWheelTimer
HashedWheelTimer是Netty提供的工夫轮调度器。
工夫轮是一种充分利用线程资源进行批量化任务调度的调度模型,可能高效的治理各种延时工作。
简略说,就是将延时工作寄存到一个环形队列中,并通过执行线程定时执行该队列的工作。
例如,
环形队列上有60个格子,
执行线程每秒挪动一个格子,则环形队列每轮可寄存1分钟内的工作。
当初有两个定时工作
task1,32秒后执行
task2,2分25秒后执行
而执行线程以后位于第6格子
则task1放到32+6=38格,轮数为0
task2放到25+6=31个,轮数为2
执行线程将执行以后格子轮数为0的工作,并将其余工作轮数减1。
毛病,工夫轮调度器的工夫精度不高。
因为工夫轮算法的精度取决于执行线程挪动速度。
例如下面例子中执行线程每秒挪动一个格子,则调度精度小于一秒的工作就无奈准时调用。
HashedWheelTimer关键字段
// 工作执行器,负责执行工作Worker worker = new Worker();// 工作执行线程Thread workerThread;// HashedWheelTimer状态, 0 - init, 1 - started, 2 - shut downint workerState;// 工夫轮队列,应用数组实现HashedWheelBucket[] wheel;// 暂存新增的工作Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();// 已勾销工作Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
增加提早工作 HashedWheelTimer#newTimeout
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { ... // #1 start(); // #2 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; ... HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout;}
#1
如果HashedWheelTimer未启动,则启动该HashedWheelTimer
HashedWheelTimer#start办法负责是启动workerThread线程#2
startTime是HashedWheelTimer启动工夫
deadline是绝对HashedWheelTimer启动的延迟时间
构建HashedWheelTimeout,增加到HashedWheelTimer#timeouts
工夫轮运行 Worker#run
public void run() { ... // #1 startTimeInitialized.countDown(); do { // #2 final long deadline = waitForNextTick(); if (deadline > 0) { // #3 int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // #4 transferTimeoutsToBuckets(); // #5 bucket.expireTimeouts(deadline); // #6 tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // #7 ...}
#1
HashedWheelTimer#start办法阻塞HashedWheelTimer线程直到Worker启动实现,这里解除HashedWheelTimer线程阻塞。#2
计算下一格子开始执行的工夫,而后sleep到下次格子开始执行工夫#2
tick是从HashedWheelTimer启动后挪动的总格子数,这里获取tick对应的格子索引。
因为Long类型足够大,这里并不思考溢出问题。#4
将HashedWheelTimer#timeouts的工作迁徙到对应的格子中#5
解决已到期工作#6
挪动到下一个格子#7
这里是HashedWheelTimer#stop后的逻辑解决,勾销工作,进行工夫轮
迁徙工作 Worker#transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() { // #1 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { continue; } // #2 long calculated = timeout.deadline / tickDuration; // #3 timeout.remainingRounds = (calculated - tick) / wheel.length; // #4 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. // #5 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout); }}
#1
留神,每次只迁徙100000个工作,免得阻塞线程#2
工作延迟时间/每格工夫数, 失去该工作需提早的总格子挪动数#3
(总格子挪动数 - 已挪动格子数)/每轮格子数,失去轮数#4
如果工作在timeouts队列放得太久导致曾经过了执行工夫,则应用以后tick, 也就是放到以后bucket,以便尽快执行该工作#5
计算tick对应格子索引,放到对应的格子地位
执行到期工作 HashedWheelBucket#expireTimeouts
public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; while (timeout != null) { HashedWheelTimeout next = timeout.next; // #1 if (timeout.remainingRounds <= 0) { // #2 next = remove(timeout); if (timeout.deadline <= deadline) { // #3 timeout.expire(); } else { throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { // #4 timeout.remainingRounds --; } timeout = next; }}
#1
抉择轮数小于等于0的工作#2
移除工作#3
批改状态为过期,并执行工作#4
其余工作轮数减1
ScheduledExecutorService应用堆(DelayedWorkQueue)保护工作,新增工作复杂度为O(logN)。
而 HashedWheelTimer 新增工作复杂度为O(1),所以在工作十分多时, HashedWheelTimer 能够体现出它的劣势。
然而工作较少甚至没有工作时,HashedWheelTimer的执行线程都须要一直挪动,也会造成性能耗费。
留神,HashedWheelTimer应用同一个线程调用和执行工作,如果某些工作执行工夫过久,则影响后续定时工作执行。当然,咱们也能够思考在工作中另起线程执行逻辑。
另外,如果工作过多,也会导致工作长期滞留在HashedWheelTimer#timeouts中而不能及时执行。
如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!