共计 6125 个字符,预计需要花费 16 分钟才能阅读完成。
零 后期筹备
0 FBI WARNING
文章异样啰嗦且绕弯。
1 版本
JDK 版本 : Adoptopenjdk 14.0.1
IDE : idea 2020.1
Netty 版本 : netty-all 4.1.46.Final
2 HashedWheelTimer 简介
HashedWheelTimer 是 Netty 中实现提早工作的工具类。
3 Demo
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* netty 工夫轮 demo
*/
public class TimeWheel {public static void main(String[] args) {
// 创立工夫轮对象
HashedWheelTimer wheel = new HashedWheelTimer();
// 打印以后工夫
System.out.println(System.currentTimeMillis());
// 创立一个提早工作,用于打印
wheel.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {System.out.println("test print" + System.currentTimeMillis());
}
},1000, TimeUnit.MILLISECONDS);
// 线程休眠,避免线程退出
try {Thread.sleep(1000000000L);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
4 创立 HashedTWheelTimer
4.1 结构器
上述 demo 中创立 HashedTWheelTimer:
HashedWheelTimer wheel = new HashedWheelTimer();
追踪它的实现:
/**
* 其它所有结构器都最终调用到这个结构器
* threadFactory - 线程工厂,默认是 Executors 中的 defaultThreadFactory
* tickDuration - 工夫轮两次 tick 之间的间隔时间,默认值 100
* unit - tick 的工夫单位,默认为 毫秒
* ticksPerWheel - 一轮 tick 的数量
* leakDetection - 用于优雅敞开的援用对象,默认 true
* maxPendingTimeouts - 最大工作数,默认 -1,即为不限度
**/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration,
TimeUnit unit,
int ticksPerWheel,
boolean leakDetection,
long maxPendingTimeouts) {
// 做局部参数的非空和合规验证
ObjectUtil.checkNotNull(threadFactory, "threadFactory");
ObjectUtil.checkNotNull(unit, "unit");
ObjectUtil.checkPositive(tickDuration, "tickDuration");
ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");
// wheel 是一个 HashedWheelBucket 数组,是工作存储器,具体见 4.2
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// 获取实在的 tick 工夫,并测验合法性
long duration = unit.toNanos(tickDuration);
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));
}
// MILLISECOND_NANOS = 1000000
// duration 不得小于 1000000
if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {this.tickDuration = duration;}
// 工作线程,具体见 4.3
workerThread = threadFactory.newThread(worker);
// 判断是否须要创立一个用于优雅敞开的弱援用对象
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
// 最大工作数
this.maxPendingTimeouts = maxPendingTimeouts;
// 工夫轮在一个 jvm 过程中的最大初始化数量不得大于 64 个,不然的话会在此处报错
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();
}
}
4.2 HashedWheelBucket
wheel 是一个 HashedWheelBucket 数组对象:
private final HashedWheelBucket[] wheel;
wheel 的初始化在 createWheel(…) 办法中进行:
// HashedWheelBucket.class
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
// ticks 是在创立 HashedWheelBucket 的时候输出的分片数
// 分片数不可小于 0,或者大于 1073741824
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0:" + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30:" + ticksPerWheel);
}
// ticks 必须是 2 的倍数,所以此处会对使用者传入的 ticksPerWheel 进行修改
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 创立数组并初始化
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}
// 返回
return wheel;
}
HashedWheelBucket 是 HashedWheelTimer 的外部类:
private static final class HashedWheelBucket {
// HashedWheelBucket 实质上是 HashedWheelTimeout 的链表,而 HashedWheelTimeout 实质上是要执行的工作
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
// ...
}
当使用者存入一个工作的时候,会调用到 HashedWheelBucket 的 addTimeout(…) 办法:
// 实质上是将工作放入到链表中
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {head = tail = timeout;} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
而取出工作要生产的时候,调用 pollTimeout() 办法:
// 实质上是将工作从链表中拿出
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {return null;}
HashedWheelTimeout next = head.next;
if (next == null) {tail = this.head = null;} else {
this.head = next;
next.prev = null;
}
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
4.3 workerThread
workerThread 是一个线程对象:
private final Thread workerThread;
调用线程工厂的 newThread 办法实现初始化:
workerThread = threadFactory.newThread(worker);
worker 对象是一个 Worker 对象。Worker 是 HashedWheelTimer 的外部类,是 Runnable 的实现类:
private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
// 记录开始工夫
startTime = System.nanoTime();
if (startTime == 0) {startTime = 1;}
// CountDownLatch,确保先实现初始化之后才会塞入工作
startTimeInitialized.countDown();
do {
// 在这个办法中线程会休眠,直到下一个 tick 的工夫
final long deadline = waitForNextTick();
if (deadline > 0) {int idx = (int) (tick & mask);
// 清理曾经被勾销的 task
processCancelledTasks();
// 选定以后要解决的 bucket
HashedWheelBucket bucket = wheel[idx];
// 将贮存在队列里的工作搁置到对应的 bucket 里
transferTimeoutsToBuckets();
// 执行以后 bucket 里所有的工作
bucket.expireTimeouts(deadline);
// 切换到下一个 tick
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 后续代码用以优雅敞开
for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {break;}
if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();}
// 略 ...
}
5 增加工作
回到 demo 的增加工作的代码:
wheel.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {System.out.println("test print" + System.currentTimeMillis());
}
},1000, TimeUnit.MILLISECONDS);
追踪 newTimeout(…) 办法:
// HashedWheelTimer.java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 非空验证
ObjectUtil.checkNotNull(task, "task");
ObjectUtil.checkNotNull(unit, "unit");
// 工作数量,并与设定的最大值进行比拟
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending"
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果是第一次增加工作,那么在此处会启动工作线程
start();
// 该当被执行的延迟时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 时钟回拨的状况
if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}
// 将 task 包装成一个 timeout 对象
HashedWheelTimeout timeout = new HashedWheelTimeout(this,task,deadline);
// 放到队列里
timeouts.add(timeout);
return timeout;
}
本文仅为集体的学习笔记,可能存在谬误或者表述不清的中央,有缘补充
正文完