零 后期筹备
0 FBI WARNING
文章异样啰嗦且绕弯。
1 版本
JDK 版本 : Adoptopenjdk 14.0.1
IDE : idea 2020.1
Netty 版本 : netty-all 4.1.46.Final
2 HashedWheelTimer 简介
HashedWheelTimer 是 Netty 中实现提早工作的工具类。
3 Demo
import io.netty.util.HashedWheelTimer;import io.netty.util.Timeout;import io.netty.util.TimerTask;import java.util.concurrent.TimeUnit;/*** netty 工夫轮 demo*/public class TimeWheel {public static void main(String[] args) {// 创立工夫轮对象HashedWheelTimer wheel = new HashedWheelTimer();// 打印以后工夫System.out.println(System.currentTimeMillis());// 创立一个提早工作,用于打印wheel.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("test print " + System.currentTimeMillis());}},1000, TimeUnit.MILLISECONDS);// 线程休眠,避免线程退出try {Thread.sleep(1000000000L);} catch (InterruptedException e) {e.printStackTrace();}}}
4 创立 HashedTWheelTimer
4.1 结构器
上述 demo 中创立 HashedTWheelTimer:
HashedWheelTimer wheel = new HashedWheelTimer();
追踪它的实现:
/*** 其它所有结构器都最终调用到这个结构器* threadFactory - 线程工厂,默认是 Executors 中的 defaultThreadFactory* tickDuration - 工夫轮两次 tick 之间的间隔时间,默认值 100* unit - tick 的工夫单位,默认为 毫秒* ticksPerWheel - 一轮 tick 的数量* leakDetection - 用于优雅敞开的援用对象,默认 true* maxPendingTimeouts - 最大工作数,默认 -1,即为不限度**/public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration,TimeUnit unit,int ticksPerWheel,boolean leakDetection,long maxPendingTimeouts) {// 做局部参数的非空和合规验证ObjectUtil.checkNotNull(threadFactory, "threadFactory");ObjectUtil.checkNotNull(unit, "unit");ObjectUtil.checkPositive(tickDuration, "tickDuration");ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");// wheel 是一个 HashedWheelBucket 数组,是工作存储器,具体见 4.2wheel = createWheel(ticksPerWheel);mask = wheel.length - 1;// 获取实在的 tick 工夫,并测验合法性long duration = unit.toNanos(tickDuration);if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}// MILLISECOND_NANOS = 1000000// duration 不得小于 1000000if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}// 工作线程,具体见 4.3workerThread = threadFactory.newThread(worker);// 判断是否须要创立一个用于优雅敞开的弱援用对象leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;// 最大工作数this.maxPendingTimeouts = maxPendingTimeouts;// 工夫轮在一个 jvm 过程中的最大初始化数量不得大于 64 个,不然的话会在此处报错if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}
4.2 HashedWheelBucket
wheel 是一个 HashedWheelBucket 数组对象:
private final HashedWheelBucket[] wheel;
wheel 的初始化在 createWheel(...) 办法中进行:
// HashedWheelBucket.classprivate static HashedWheelBucket[] createWheel(int ticksPerWheel) {// ticks 是在创立 HashedWheelBucket 的时候输出的分片数// 分片数不可小于 0,或者大于 1073741824if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}if (ticksPerWheel > 1073741824) {throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);}// ticks 必须是 2 的倍数,所以此处会对使用者传入的 ticksPerWheel 进行修改ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);// 创立数组并初始化HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}// 返回return wheel;}
HashedWheelBucket 是 HashedWheelTimer 的外部类:
private static final class HashedWheelBucket {// HashedWheelBucket 实质上是 HashedWheelTimeout 的链表,而 HashedWheelTimeout 实质上是要执行的工作private HashedWheelTimeout head;private HashedWheelTimeout tail;// ...}
当使用者存入一个工作的时候,会调用到 HashedWheelBucket 的 addTimeout(...) 办法:
// 实质上是将工作放入到链表中public void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}}
而取出工作要生产的时候,调用 pollTimeout() 办法:
// 实质上是将工作从链表中拿出private HashedWheelTimeout pollTimeout() {HashedWheelTimeout head = this.head;if (head == null) {return null;}HashedWheelTimeout next = head.next;if (next == null) {tail = this.head = null;} else {this.head = next;next.prev = null;}head.next = null;head.prev = null;head.bucket = null;return head;}
4.3 workerThread
workerThread 是一个线程对象:
private final Thread workerThread;
调用线程工厂的 newThread 办法实现初始化:
workerThread = threadFactory.newThread(worker);
worker 对象是一个 Worker 对象。Worker 是 HashedWheelTimer 的外部类,是 Runnable 的实现类:
private final class Worker implements Runnable {private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();private long tick;@Overridepublic void run() {// 记录开始工夫startTime = System.nanoTime();if (startTime == 0) {startTime = 1;}// CountDownLatch,确保先实现初始化之后才会塞入工作startTimeInitialized.countDown();do {// 在这个办法中线程会休眠,直到下一个 tick 的工夫final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);// 清理曾经被勾销的 taskprocessCancelledTasks();// 选定以后要解决的 bucketHashedWheelBucket bucket = wheel[idx];// 将贮存在队列里的工作搁置到对应的 bucket 里transferTimeoutsToBuckets();// 执行以后 bucket 里所有的工作bucket.expireTimeouts(deadline);// 切换到下一个 ticktick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// 后续代码用以优雅敞开for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}processCancelledTasks();}// 略 ...}
5 增加工作
回到 demo 的增加工作的代码:
wheel.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("test print " + System.currentTimeMillis());}},1000, TimeUnit.MILLISECONDS);
追踪 newTimeout(...) 办法:
// HashedWheelTimer.javapublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {// 非空验证ObjectUtil.checkNotNull(task, "task");ObjectUtil.checkNotNull(unit, "unit");// 工作数量,并与设定的最大值进行比拟long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}// 如果是第一次增加工作,那么在此处会启动工作线程start();// 该当被执行的延迟时间long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// 时钟回拨的状况if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}// 将 task 包装成一个 timeout 对象HashedWheelTimeout timeout = new HashedWheelTimeout(this,task,deadline);// 放到队列里timeouts.add(timeout);return timeout;}
本文仅为集体的学习笔记,可能存在谬误或者表述不清的中央,有缘补充