乐趣区

关于java:追踪解析-Netty-的-HashedWheelTimer-源码

零 后期筹备

0 FBI WARNING

文章异样啰嗦且绕弯。

1 版本

JDK 版本 : Adoptopenjdk 14.0.1

IDE : idea 2020.1

Netty 版本 : netty-all 4.1.46.Final

2 HashedWheelTimer 简介

HashedWheelTimer 是 Netty 中实现提早工作的工具类。

3 Demo


import io.netty.util.HashedWheelTimer;

import io.netty.util.Timeout;

import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**

* netty 工夫轮 demo

*/

public class TimeWheel {public static void main(String[] args) {

// 创立工夫轮对象

HashedWheelTimer wheel = new HashedWheelTimer();

// 打印以后工夫

System.out.println(System.currentTimeMillis());

// 创立一个提早工作,用于打印

wheel.newTimeout(new TimerTask() {

@Override

public void run(Timeout timeout) throws Exception {System.out.println("test print" + System.currentTimeMillis());

}

},1000, TimeUnit.MILLISECONDS);

// 线程休眠,避免线程退出

try {Thread.sleep(1000000000L);

} catch (InterruptedException e) {e.printStackTrace();

}

}

}

4 创立 HashedTWheelTimer

4.1 结构器

上述 demo 中创立 HashedTWheelTimer:


HashedWheelTimer wheel = new HashedWheelTimer();

追踪它的实现:


/**

* 其它所有结构器都最终调用到这个结构器

* threadFactory - 线程工厂,默认是 Executors 中的 defaultThreadFactory

* tickDuration - 工夫轮两次 tick 之间的间隔时间,默认值 100

* unit - tick 的工夫单位,默认为 毫秒

* ticksPerWheel - 一轮 tick 的数量

* leakDetection - 用于优雅敞开的援用对象,默认 true

* maxPendingTimeouts - 最大工作数,默认 -1,即为不限度

**/

public HashedWheelTimer(

ThreadFactory threadFactory,

long tickDuration,

TimeUnit unit,

int ticksPerWheel,

boolean leakDetection,

long maxPendingTimeouts) {

// 做局部参数的非空和合规验证

ObjectUtil.checkNotNull(threadFactory, "threadFactory");

ObjectUtil.checkNotNull(unit, "unit");

ObjectUtil.checkPositive(tickDuration, "tickDuration");

ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");

// wheel 是一个 HashedWheelBucket 数组,是工作存储器,具体见 4.2

wheel = createWheel(ticksPerWheel);

mask = wheel.length - 1;

// 获取实在的 tick 工夫,并测验合法性

long duration = unit.toNanos(tickDuration);

if (duration >= Long.MAX_VALUE / wheel.length) {

throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));

}

// MILLISECOND_NANOS = 1000000

// duration 不得小于 1000000

if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",tickDuration, MILLISECOND_NANOS);

this.tickDuration = MILLISECOND_NANOS;

} else {this.tickDuration = duration;}

// 工作线程,具体见 4.3

workerThread = threadFactory.newThread(worker);

// 判断是否须要创立一个用于优雅敞开的弱援用对象

leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

// 最大工作数

this.maxPendingTimeouts = maxPendingTimeouts;

// 工夫轮在一个 jvm 过程中的最大初始化数量不得大于 64 个,不然的话会在此处报错

if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&

WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();

}

}

4.2 HashedWheelBucket

wheel 是一个 HashedWheelBucket 数组对象:


private final HashedWheelBucket[] wheel;

wheel 的初始化在 createWheel(…) 办法中进行:


// HashedWheelBucket.class

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {

// ticks 是在创立 HashedWheelBucket 的时候输出的分片数

// 分片数不可小于 0,或者大于 1073741824

if (ticksPerWheel <= 0) {

throw new IllegalArgumentException("ticksPerWheel must be greater than 0:" + ticksPerWheel);

}

if (ticksPerWheel > 1073741824) {

throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30:" + ticksPerWheel);

}

// ticks 必须是 2 的倍数,所以此处会对使用者传入的 ticksPerWheel 进行修改

ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);

// 创立数组并初始化

HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];

for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}

// 返回

return wheel;

}

HashedWheelBucket 是 HashedWheelTimer 的外部类:


private static final class HashedWheelBucket {

// HashedWheelBucket 实质上是 HashedWheelTimeout 的链表,而 HashedWheelTimeout 实质上是要执行的工作

private HashedWheelTimeout head;

private HashedWheelTimeout tail;

// ...

}

当使用者存入一个工作的时候,会调用到 HashedWheelBucket 的 addTimeout(…) 办法:


// 实质上是将工作放入到链表中

public void addTimeout(HashedWheelTimeout timeout) {

assert timeout.bucket == null;

timeout.bucket = this;

if (head == null) {head = tail = timeout;} else {

tail.next = timeout;

timeout.prev = tail;

tail = timeout;

}

}

而取出工作要生产的时候,调用 pollTimeout() 办法:


// 实质上是将工作从链表中拿出

private HashedWheelTimeout pollTimeout() {

HashedWheelTimeout head = this.head;

if (head == null) {return null;}

HashedWheelTimeout next = head.next;

if (next == null) {tail = this.head = null;} else {

this.head = next;

next.prev = null;

}

head.next = null;

head.prev = null;

head.bucket = null;

return head;

}

4.3 workerThread

workerThread 是一个线程对象:


private final Thread workerThread;

调用线程工厂的 newThread 办法实现初始化:


workerThread = threadFactory.newThread(worker);

worker 对象是一个 Worker 对象。Worker 是 HashedWheelTimer 的外部类,是 Runnable 的实现类:


private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

private long tick;

@Override

public void run() {

// 记录开始工夫

startTime = System.nanoTime();

if (startTime == 0) {startTime = 1;}

// CountDownLatch,确保先实现初始化之后才会塞入工作

startTimeInitialized.countDown();

do {

// 在这个办法中线程会休眠,直到下一个 tick 的工夫

final long deadline = waitForNextTick();

if (deadline > 0) {int idx = (int) (tick & mask);

// 清理曾经被勾销的 task

processCancelledTasks();

// 选定以后要解决的 bucket

HashedWheelBucket bucket = wheel[idx];

// 将贮存在队列里的工作搁置到对应的 bucket 里

transferTimeoutsToBuckets();

// 执行以后 bucket 里所有的工作

bucket.expireTimeouts(deadline);

// 切换到下一个 tick

tick++;

}

} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// 后续代码用以优雅敞开

for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);

}

for (;;) {HashedWheelTimeout timeout = timeouts.poll();

if (timeout == null) {break;}

if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);

}

}

processCancelledTasks();}

// 略 ...

}

5 增加工作

回到 demo 的增加工作的代码:


wheel.newTimeout(new TimerTask() {

@Override

public void run(Timeout timeout) throws Exception {System.out.println("test print" + System.currentTimeMillis());

}

},1000, TimeUnit.MILLISECONDS);

追踪 newTimeout(…) 办法:


// HashedWheelTimer.java

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

// 非空验证

ObjectUtil.checkNotNull(task, "task");

ObjectUtil.checkNotNull(unit, "unit");

// 工作数量,并与设定的最大值进行比拟

long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();

throw new RejectedExecutionException("Number of pending timeouts ("

+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending"

+ "timeouts (" + maxPendingTimeouts + ")");

}

// 如果是第一次增加工作,那么在此处会启动工作线程

start();

// 该当被执行的延迟时间

long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// 时钟回拨的状况

if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}

// 将 task 包装成一个 timeout 对象

HashedWheelTimeout timeout = new HashedWheelTimeout(this,task,deadline);

// 放到队列里

timeouts.add(timeout);

return timeout;

}

本文仅为集体的学习笔记,可能存在谬误或者表述不清的中央,有缘补充

退出移动版