共计 7958 个字符,预计需要花费 20 分钟才能阅读完成。
Timer 和 ScheduledExecutorService 是 JDK 内置的定时工作计划,而业内还有一个经典的定时工作的设计叫工夫轮 (Timing Wheel), Netty 外部基于工夫轮实现了一个 HashedWheelTimer 来优化百万量级 I / O 超时的检测,它是一个高性能,低消耗的数据结构,它适宜用非准实时,提早的短平快工作,例如心跳检测。本文次要介绍工夫轮(Timing Wheel) 及其应用。@pdai
-
SpringBoot 定时工作 – Netty HashedWheelTimer 形式
-
常识筹备
- 什么是工夫轮(Timing Wheel)
- Netty 的 HashedWheelTimer 要解决什么问题
- HashedWheelTimer 的应用形式
-
实现案例
- Pom 依赖
- 2 个简略例子
-
进一步了解
- HashedWheelTimer 是如何实现的?
- 什么是多级 Timing Wheel?
- 示例源码
-
常识筹备
须要对工夫轮(Timing Wheel),以及 Netty 的 HashedWheelTimer 要解决什么问题有初步的意识。
什么是工夫轮(Timing Wheel)
工夫轮 (Timing Wheel) 是 George Varghese 和 Tony Lauck 在 1996 年的论文 ’Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility’ 实现的,它在 Linux 内核中应用宽泛,是 Linux 内核定时器的实现办法和根底之一。
工夫轮 (Timing Wheel) 是一种环形的数据结构,就像一个时钟能够分成很多格子(Tick),每个格子代表工夫的距离,它指向存储的具体任务(timerTask)的一个链表。
以上述在论文中的图片例子,这里一个轮子蕴含 8 个格子(Tick), 每个 tick 是一秒钟;
工作的增加:如果一个工作要在 17 秒后执行,那么它须要转 2 轮,最终加到 Tick= 1 地位的链表中。
工作的执行:在时钟转 2Round 到 Tick= 1 的地位,开始执行这个地位指向的链表中的这个工作。(# 这里示意残余须要转几轮再执行这个工作)
Netty 的 HashedWheelTimer 要解决什么问题
HashedWheelTimer 是 Netty 依据工夫轮 (Timing Wheel) 开发的工具类,它要解决什么问题呢?这外面有两个要点:提早工作 + 低时效性。@pdai
在 Netty 中的一个典型利用场景是判断某个连贯是否 idle,如果 idle(如客户端因为网络起因导致到服务器的心跳无奈送达),则服务器会被动断开连接,开释资源。判断连贯是否 idle 是通过定时工作实现的,然而 Netty 可能维持数百万级别的长连贯,对每个连贯去定义一个定时工作是不可行的,所以如何晋升 I / O 超时调度的效率呢?
Netty 依据工夫轮 (Timing Wheel) 开发了 HashedWheelTimer 工具类,用来优化 I / O 超时调度 (实质上是提早工作);之所以采纳工夫轮(Timing Wheel) 的构造还有一个很重要的起因是 I / O 超时这种类型的工作对时效性不须要十分精准。
HashedWheelTimer 的应用形式
在理解工夫轮(Timing Wheel)和 Netty 的 HashedWheelTimer 要解决的问题后,咱们看下 HashedWheelTimer 的应用形式
通过构造函数看主要参数
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {}
具体参数阐明如下:
threadFactory
:线程工厂,用于创立工作线程,默认是 Executors.defaultThreadFactory()tickDuration
:tick 的周期,即多久 tick 一次unit
: tick 周期的单位ticksPerWheel
:工夫轮的长度,一圈下来有多少格leakDetection
:是否开启内存透露检测,默认是 truemaxPendingTimeouts
:最多执行的工作数,默认是 -1,即不限度。在高并发量状况下才会设置这个参数。
实现案例
这里展现下 HashedWheelTimer 的根本应用案例。@pdai
Pom 依赖
引入 pom 的依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
2 个简略例子
例子 1:5 秒后执行 TimerTask
@SneakyThrows
public static void simpleHashedWheelTimer() {log.info("init task 1...");
HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
// add a new timeout
timer.newTimeout(timeout -> {log.info("running task 1...");
}, 5, TimeUnit.SECONDS);
}
执行后果如下:
23:32:21.364 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 1...
...
23:32:27.454 [pool-1-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 1...
例子 2:工作生效后 cancel 并让它从新在 3 秒后执行。
@SneakyThrows
public static void reScheduleHashedWheelTimer() {log.info("init task 2...");
HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
Thread.sleep(5000);
// add a new timeout
Timeout tm = timer.newTimeout(timeout -> {log.info("running task 2...");
}, 5, TimeUnit.SECONDS);
// cancel
if (!tm.isExpired()) {log.info("cancel task 2...");
tm.cancel();}
// reschedule
timer.newTimeout(tm.task(), 3, TimeUnit.SECONDS);
}
23:28:36.408 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 2...
23:28:41.412 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - cancel task 2...
23:28:45.414 [pool-2-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 2...
进一步了解
咱们通过如下问题进一步了解 HashedWheelTimer。@pdai
HashedWheelTimer 是如何实现的?
简略看下 HashedWheelTimer 是如何实现的
Worker
:worker 工作线程次要负责任务调度触发,单线程运行。HashedWheelBucket
:工夫轮下面的格子,外部持有 HashedWheelTimeout 组成的链表构造的头尾节点,多个格子组成的工夫轮造成一圈又一圈的工作环HashedWheelTimeout
:往工夫轮外面提交的工作会被封装成 HashedWheelTimeout
构造函数
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {this.tickDuration = duration;}
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();
}
}
创立 wheel
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
//ticksPerWheel may not be greater than 2^30
checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {normalizedTicksPerWheel <<= 1;}
return normalizedTicksPerWheel;
}
工作的增加
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {checkNotNull(task, "task");
checkNotNull(unit, "unit");
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending"
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
执行办法
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start() {switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {startTimeInitialized.await();
} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}
}
}
进行办法
@Override
public Set<Timeout> stop() {if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from" +
TimerTask.class.getSimpleName());
}
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {boolean closed = leak.close(this);
assert closed;
}
}
return Collections.emptySet();}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {workerThread.interrupt();
try {workerThread.join(100);
} catch (InterruptedException ignored) {interrupted = true;}
}
if (interrupted) {Thread.currentThread().interrupt();}
} finally {INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {boolean closed = leak.close(this);
assert closed;
}
}
return worker.unprocessedTimeouts();}
什么是多级 Timing Wheel?
多级的工夫轮是比拟好了解的,时钟是有小时,分钟,秒的,秒转一圈 (Round) 分钟就转一个格(Tick), 分钟转一圈 (Round) 小时就转一格(Tick)。
PS:显然 HashedWheelTimer 是一层工夫轮。
示例源码
https://github.com/realpdai/t…
更多内容
辞别碎片化学习,无套路一站式体系化学习后端开发: Java 全栈常识体系 https://pdai.tech