一、工夫轮简介
1.1 为什么要应用工夫轮
在平时开发中,常常会与定时工作打交道。上面举几个定时工作解决的例子。
1)心跳检测 。在 Dubbo 中,须要有心跳机制来维持 Consumer 与 Provider 的长连贯,默认的心跳距离是 60s。当 Provider 在 3 次心跳工夫内没有收到心跳响应,会敞开连贯通道。当 Consumer 在 3 次心跳工夫内没有收到心跳响应,会进行重连。Provider 侧和 Consumer 侧的心跳检测机制都是通过定时工作实现的,而且是本篇文章要剖析的工夫轮 HashedWheelTimer 解决的。
2)超时解决 。在 Dubbo 中发动 RPC 调用时,通常会配置超时工夫,当消费者调用服务提供者呈现超时进行肯定的逻辑解决。那么怎么检测工作调用超时了呢?咱们能够利用定时工作,每次创立一个 Future,记录这个 Future 的创立工夫与超时工夫,后盾有一个定时工作进行检测,当 Future 达到超时工夫并且没有被解决时,就须要对这个 Future 执行超时逻辑解决。
3)Redisson 分布式锁续期 。在分布式锁解决中,通常会指定分布式锁的超时工夫,同样会在 finally 块里开释分布式锁。然而有一个问题时,通常分布式锁的超时工夫不好判断,如果设置短了业务却没执行实现就把锁开释掉了,或者超时工夫设置很长,同样也会存在一些问题。Redisson 提供了一种看门狗机制,通过工夫轮定时给分布式锁续期,也就是缩短分布式锁的超时工夫。
能够看到,上述几个例子都与定时工作无关,那么传统的定时工作有什么毛病呢?为什么要应用工夫轮来实现?
如果应用一般的定时工作解决机制来解决例 2)中的超时状况:
1)简略地,能够针对每一次申请创立一个线程,而后 Sleep 到超时工夫,之后若判断超时则进行超时逻辑解决。存在的问题是如果面临是高并发申请,针对每个申请都要去创立线程,这样太消耗资源了。
2)针对计划 1 的有余,能够改成一个线程来解决所有的定时工作,比方这个线程能够每隔 50ms 扫描所有须要解决的超时工作,如果发现有超时工作,则进行解决。然而,这样也存在一个问题,可能一段时间内都没有工作达到超时工夫,那么就让 CPU 多了很多无用的轮询遍历操作。
针对上述计划的有余,能够采纳工夫轮来进行解决。上面先来简略介绍下工夫轮的概念。
1.2 单层工夫轮
咱们先以单层工夫轮为例,假如工夫轮的周期是 1 秒,工夫轮中有 10 个槽位,则每个槽位代表 100ms。假如咱们当初有 3 个工作,别离是工作 A(220ms 后执行)、B(410ms 之后运行)、C(1930ms 之后运行)。则这三个工作在工夫轮所处的槽位如下图,能够看到工作 A 被放到了槽位 2,工作 B 被放到了槽位 4,工作 C 被放到了槽位 9。
当工夫轮转动到对应的槽时,就会从槽中取出工作判断是否须要执行。同时能够发现有一个残余周期的概念,这是因为工作 C 的执行工夫为 1930ms,超过了工夫轮的周期 1 秒,所以能够标记它的残余周期为 1,当工夫轮第一次转动到它的地位时,发现它的残余周期为 1,示意还没有到要解决的工夫,将残余周期减 1,工夫轮持续转动,当下一次转动到 C 工作地位时,发现残余周期为 0,示意工夫到了须要解决该定时工作了。Dubbo 中采纳的就是这种单层工夫轮机制。
1.3 多层工夫轮
既然有单层工夫轮,那么自然而然能够想到利用多层工夫轮来解决上述工作执行工夫超出工夫轮周期的状况。上面以两层工夫轮为例,第一层工夫轮周期为 1 秒,第二层工夫轮周期为 10 秒。
还是以上述 3 个工作为例,能够看到工作 A 和 B 散布在第一层工夫轮上,而工作 C 散布在第二层工夫轮的槽 1 处。当第一层工夫轮转动时,工作 A 和工作 B 会被先后执行。1 秒钟之后,第一层工夫轮实现了一个周期转动。从新开始第 0 跳,这时第二层工夫轮从槽 0 跳到了槽 1 处,将槽 1 处的工作,也就是工作 C 取出放入到第一层工夫轮的槽位 9 处,当第一层工夫轮转动到槽位 9 处,工作 C 就会被执行。这种将第二层的工作取出放入第一层中称为降级,它是为了保障工作被解决的工夫精度。Kafka 外部就是采纳的这种多层工夫轮机制。
二、工夫轮原理
上面先来看一下 Dubbo 中的工夫轮的构造,能够看到,它和时钟很像,它被划分成了一个个 Bucket,每个 Bucket 有一个头指针和尾指针,别离指向双向链表的头节点和尾节点,双向链表中存储的就是要解决的工作。工夫轮不停转动,当指向 Bucket0 所负责保护的双向链表时,就将它所存储的工作遍历取出来解决。
上面咱们先来介绍下 Dubbo 中工夫轮 HashedWheelTimer 所波及到的一些外围概念,在解说完这些外围概念之后,再来对工夫轮的源码进行剖析。
2.1 TimerTask
在 Dubbo 中,TimerTask 封装了要执行的工作,它就是上图双向链表中节点所封装的工作。所有的定时工作都须要继承 TimerTask 接口。如下图,能够看到 Dubbo 中的心跳工作 HeartBeatTask、注册失败重试工作 FailRegisteredTask 等都实现了 TimerTask 接口。
public interface TimerTask {void run(Timeout timeout) throws Exception;
}
2.2 Timeout
TimerTask 中 run 办法的入参是 Timeout,Timeout 与 TimerTask 一一对应,Timeout 的惟一实现类 HashedWheelTimeout 中就封装了 TimerTask 属性,能够了解为 HashedWheelTimeout 就是上述双向链表的一个节点,因而它也蕴含两个 HashedWheelTimeout 类型的指针,别离指向以后节点的上一个节点和下一个节点。
public interface Timeout {
// Timer 就是定时器, 也就是 Dubbo 中的工夫轮
Timer timer();
// 获取该节点要执行的工作
TimerTask task();
// 判断该节点封装的工作有没有过期、被勾销
boolean isExpired();
boolean isCancelled();
// 勾销该节点的工作
boolean cancel();}
HashedWheelTimeout 是 Timeout 的惟一实现,它的作用有两个:
- 它是工夫轮槽所保护的双向链表的节点,其中封装了理论要执行的工作 TimerTask。
- 通过它能够查看定时工作的状态、对定时工作进行勾销、从双向链表中移除等操作。
上面来看一下 Timeout 的实现类 HashedWheelTimeout 的外围字段与实现。
1) int ST_INIT = 0、int ST_CANCELLED = 1、int ST_EXPIRED = 2
HashedWheelTimeout 里定义了三种状态,别离示意工作的初始化状态、被勾销状态、已过期状态
2) STATE_UPDATER
用于更新定时工作的状态
3) HashedWheelTimer timer
指向工夫轮对象
4) TimerTask task
理论要执行的工作
5) long deadline
指定时工作执行的工夫,这个工夫是在创立 HashedWheelTimeout 时指定的
计算公式是: currentTime(创立 HashedWheelTimeout 的工夫) + delay(工作延迟时间)
- startTime(HashedWheelTimer 的启动工夫),工夫单位为纳秒
6) int state = ST_INIT
工作初始状态
7) long remainingRounds
指当前任务残余的时钟周期数. 工夫轮所能示意的工夫长度是无限的,在工作到期工夫与以后时刻
的时间差超过工夫轮单圈能示意的时长,就呈现了套圈的状况,须要该字段值示意残余的时钟周期
8) HashedWheelTimeout next、HashedWheelTimeout prev
别离对应以后定时工作在链表中的前驱节点和后继节点,这也验证了工夫轮中每个槽所对应的工作链表是
一个双链表
9) HashedWheelBucket bucket
工夫轮中的一个槽,对应工夫轮圆圈的一个个小格子,每个槽保护一个双向链表,当工夫轮指针转到以后
槽时,就会从槽所负责的双向链表中取出工作进行解决
HashedWheelTimeout 提供了 remove 操作,能够从双向链表中移除以后本身节点,并将以后工夫轮所保护的定时工作数量减一。
void remove() {
// 获取当前任务属于哪个槽
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
// 从槽中移除本人,也就是从双向链表中移除节点,// 剖析 bucket 的办法时会剖析
bucket.remove(this);
} else {
// pendingTimeouts 示意以后工夫轮所保护的定时工作的数量
timer.pendingTimeouts.decrementAndGet();}
}
HashedWheelTimeout 提供了 cancel 操作,能够勾销工夫轮中的定时工作。当定时工作被勾销时,它会首先被暂存到 canceledTimeouts 队列中。在工夫轮转动到槽进行工作解决之前和工夫轮退出运行时都会调用 cancel,而 cancel 会调用 remove,从而清理该队列中被勾销的定时工作。
@Override
public boolean cancel() {
// 通过 CAS 进行状态变更
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}
// 工作被勾销时,工夫轮会将它暂存到工夫轮所保护的 canceledTimeouts 队列中.
// 在工夫轮转动到槽进行工作解决之前和工夫轮退出运行时都会调用 cancel,而
// cancel 会调用 remove,从而清理该队列中被勾销的定时工作
timer.cancelledTimeouts.add(this);
return true;
}
HashedWheelTimeout 提供了 expire 操作,当工夫轮指针转动到某个槽时,会遍历该槽所保护的双向链表,判断节点的状态,如果发现工作已到期,会通过 remove 办法移除,而后调用 expire 办法执行该定时工作。
public void expire() {
// 批改定时工作状态为已过期
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}
try {
// 真正的执行定时工作所要代表的逻辑
task.run(this);
} catch (Throwable t) {
// 打印日志,能够看到当工夫轮中定时工作执行异样时,// 不会抛出异样,影响到工夫轮中其余定时工作执行
}
}
2.3 HashedWheelBucket
后面也介绍过了,它是工夫轮中的槽,它外部保护了双向链表的首尾指针。上面咱们来看一下它外部的外围资源和实现。
1) HashedWheelTimeout head、HashedWheelTimeout tail
指向该槽所保护的双向链表的首节点和尾节点
HashedWheelBucket 提供了 addTimeout 办法,用于增加工作到双向链表的尾节点。
void addTimeout(HashedWheelTimeout timeout) {
// 增加之前判断一下该工作以后没有被被关联到一个槽上
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {head = tail = timeout;} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
HashedWheelBucket 提供了 remove 办法,用于从双向链表中删除指定节点。外围逻辑如下图所示,依据要删除的节点找到其前置节点和后置节点,而后别离调整前置节点的 next 指针和后置节点的 prev 指针。删除过程中须要思考一些边界状况。删除之后将 pendingTimeouts,也就是以后工夫轮的待处理工作数减一。remove 代码逻辑较简略,这边就不贴代码了。
HashedWheelBucket 提供了 expireTimeouts 办法,当工夫轮指针转动到某个槽时,通过该办法解决该槽上双向链表的定时工作,分为 3 种状况:
- 定时工作已到期,则会通过 remove 办法取出,并调用其 expire 办法执行工作逻辑。
- 定时工作已被勾销,则通过 remove 办法取出间接抛弃。
- 定时工作还未到期,则会将 remainingRounds(残余时钟周期)减一。
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 工夫轮指针转到某个槽时从双向链表头节点开始遍历
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// remainingRounds <= 0 示意到期了
if (timeout.remainingRounds <= 0) {
// 从链表中移除该节点
next = remove(timeout);
// 判断该定时工作的确是到期了
if (timeout.deadline <= deadline) {
// 执行该工作
timeout.expire();} else {// 抛异样}
} else if (timeout.isCancelled()) {
// 工作被勾销,移除后间接抛弃
next = remove(timeout);
} else {
// 残余时钟周期减一
timeout.remainingRounds--;
}
// 持续判断下一个工作节点
timeout = next;
}
}
HashedWheelBucket 也提供了 clearTimeouts 办法,该办法会在工夫轮进行的时候被应用,它会遍历并移除所有双向链表中的节点,并返回所有未超时和未被勾销的工作。
2.4 Worker
Worker 实现了 Runnable 接口,工夫轮外部通过 Worker 线程来解决放入工夫轮中的定时工作。上面先来看一下它的外围字段和 run 办法逻辑。
1) Set<Timeout> unprocessedTimeouts
当工夫轮进行时,用于寄存工夫轮中未过期的和未被勾销的工作
2) long tick
工夫轮指针,指向工夫轮中某个槽,当工夫轮转动时该 tick 会自增
public void run() {
// 初始化 startTime, 所有工作的的 deadline 都是绝对于这个工夫点
startTime = System.nanoTime();
// 唤醒阻塞在 start() 的线程
startTimeInitialized.countDown();
// 只有工夫轮的状态为 WORKER_STATE_STARTED, 就循环的转动 tick,
// 解决槽中的定时工作
do {
// 判断是否到了解决槽的工夫了,还没到则 sleep 一会
final long deadline = waitForNextTick();
if (deadline > 0) {
// 获取 tick 对应的槽索引
int idx = (int) (tick & mask);
// 清理用户被动勾销的定时工作, 这些定时工作在用户勾销时,
// 会记录到 cancelledTimeouts 队列中. 在每次指针转动
// 的时候, 工夫轮都会清理该队列
processCancelledTasks();
// 依据以后指针定位对应槽
HashedWheelBucket bucket = wheel[idx];
// 将缓存在 timeouts 队列中的定时工作转移到工夫轮中对应的槽中
transferTimeoutsToBuckets();
// 解决该槽位的双向链表中的定时工作
bucket.expireTimeouts(deadline);
tick++;
}
// 检测时间轮的状态, 如果工夫轮处于运行状态, 则循环执行上述步骤,
// 一直执行定时工作
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this)
== WORKER_STATE_STARTED);
// 这里应该是工夫轮进行了, 革除所有槽中的工作, 并退出到未解决工作列表,
// 以供 stop() 办法返回
for (HashedWheelBucket bucket : wheel) {bucket.clearTimeouts(unprocessedTimeouts);
}
// 将还没有退出到槽中的待处理定时工作队列中的工作取出, 如果是未勾销
// 的工作, 则退出到未解决工作队列中, 以供 stop() 办法返回
for (; ;) {HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {break;}
if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);
}
}
// 最初再次清理 cancelledTimeouts 队列中用户被动勾销的定时工作
processCancelledTasks();}
上面对 run 办法中波及到的一些办法进行介绍:
1)waitForNextTick
逻辑比较简单,它会判断有没有达到解决下一个槽工作的工夫了,如果还没有达到则 sleep 一会。
2)processCancelledTasks
遍历 cancelledTimeouts,获取被勾销的工作并从双向链表中移除。
private void processCancelledTasks() {for (; ;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
timeout.remove();}
}
3)transferTimeoutsToBuckets
当调用 newTimeout 办法时,会先将要解决的工作缓存到 timeouts 队列中,等工夫轮指针转动时对立调用 transferTimeoutsToBuckets 办法解决,将工作转移到指定的槽对应的双向链表中,每次转移 10 万个,免得阻塞工夫轮线程。
private void transferTimeoutsToBuckets() {
// 每次 tick 只解决 10w 个工作, 免得阻塞 worker 线程
for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();
// 没有工作了间接跳出循环
if (timeout == null) {
// all processed
break;
}
// 还没有放入到槽中就勾销了, 间接略过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}
// 计算工作须要通过多少个 tick
long calculated = timeout.deadline / tickDuration;
// 计算工作的轮数
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 如果工作在 timeouts 队列外面放久了, 以至于曾经过了执行工夫, 这个时候
// 就应用以后 tick, 也就是放到以后 bucket, 此办法调用完后就会被执行.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
// 将工作退出到相应的槽中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
2.5 HashedWheelTimer
最初,咱们来剖析工夫轮 HashedWheelTimer,它实现了 Timer 接口,提供了 newTimeout 办法能够向工夫轮中增加定时工作,该工作会先被暂存到 timeouts 队列中,等工夫轮转动到某个槽时,会将该 timeouts 队列中的工作转移到某个槽所负责的双向链表中。它还提供了 stop 办法用于终止工夫轮,该办法会返回工夫轮中未解决的工作。它也提供了 isStop 办法用于判断工夫轮是否终止了。
先来看一下 HashedWheelTimer 的外围字段。
1) HashedWheelBucket[] wheel
该数组就是工夫轮的环形队列,数组每个元素都是一个槽,一个槽负责保护一个双向链表,用于存储定时
工作。它会被在构造函数中初始化,当指定为 n 时,它实际上会取最靠近 n 的且为 2 的幂次方值。2) Queue<HashedWheelTimeout> timeouts
timeouts 用于缓存内部向工夫轮提交的定时工作
3) Queue<HashedWheelTimeout> cancelledTimeouts
cancelledTimeouts 用于暂存被勾销的定时工作,工夫轮会在解决槽负责的双向链表之前,先解决这两
个队列中的数据。4) Worker worker
工夫轮解决定时工作的逻辑
5) Thread workerThread
工夫轮解决定时工作的线程
6) AtomicLong pendingTimeouts
工夫轮残余的待处理的定时工作数量
7) long tickDuration
工夫轮每个槽所代表的工夫长度
8) int workerState
工夫轮状态,可选值有 init、started、shut down
上面来看一下工夫轮的构造函数,用于初始化一个工夫轮。首先它会对传入参数 ticksPerWheel 进行转换解决,返回大于该值的 2 的幂次方,它示意工夫轮上有多少个槽,默认是 512 个。而后创立大小为该值的 HashedWheelBucket[] 数组。接着通过传入的 tickDuration 对工夫轮的 tickDuration 赋值,默认是 100ms。节通过 threadFactory 创立 workerThread 工作线程,该线程就是负责解决工夫轮中的定时工作的线程。
public HashedWheelTimer(ThreadFactory threadFactory,
long tickDuration, TimeUnit unit,
int ticksPerWheel,
long maxPendingTimeouts) {
// 圆环上一共有多少个工夫距离, HashedWheelTimer 对其正则化
// 将其换算为大于等于该值的 2^n
wheel = createWheel(ticksPerWheel);
// 这用来疾速计算工作应该呆的槽
mask = wheel.length - 1;
// 工夫轮每个槽的工夫距离
this.tickDuration = unit.toNanos(tickDuration);
// threadFactory 是创立线程的线程工厂对象
workerThread = threadFactory.newThread(worker);
// 最多容许多少个工作期待执行
this.maxPendingTimeouts = maxPendingTimeouts;
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
// 计算真正该当创立多少个槽
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 初始化工夫轮数组
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {wheel[i] = new HashedWheelBucket();}
return wheel;
}
初始化工夫轮之后,就能够向其中提交定时工作了,能够通过工夫轮提供的 newTimeout 办法来实现。首先将待处理的工作数量加 1,而后启动工夫轮线程,这时 worker 的 run 办法就会被系统调度运行。而后将该定时工作封装成 HashedWheelTimeout 退出到 timeouts 队列中。start 之后,工夫轮就开始运行起来了,直到外界调用 stop 办法终止退出。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 待处理的工作数量加 1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 启动工夫轮
start();
// 计算该定时工作的 deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 创立一个 HashedWheelTimeout 对象,它首先会被暂存到 timeouts 队列中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
public void start() {
/**
* 判断以后工夫轮的状态
* 1) 如果是初始化, 则启动 worker 线程, 启动整个工夫轮
* 2) 如果曾经启动则略过
* 3) 如果是曾经进行, 则报错
*/
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
// 应用 cas 来判断启动工夫轮
if (WORKER_STATE_UPDATER.compareAndSet(this,
WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
// 抛异样
default:
throw new Error("Invalid WorkerState");
}
// 期待 worker 线程初始化工夫轮的启动工夫
while (startTime == 0) {
try {
// 这里应用 countDownLatch 来确保调度的线程曾经被启动
startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}
}
}
三、工夫轮利用
到这里,Dubbo 中的工夫轮原理就剖析完了。接下来响应本文结尾的三个例子,联合它们来剖析下工夫轮在 Dubbo 或 Redisson 中是如何应用的。
1)HeartbeatTimerTask
在 Dubbo 的 HeaderExchangeClient 类中会向工夫轮中提交该心跳工作。
private void startHeartBeatTask(URL url) {
// Client 的具体实现决定是否启动该心跳工作
if (!client.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp =
() -> Collections.singletonList(HeaderExchangeClient.this);
// 计算心跳距离, 最小距离不能低于 1s
int heartbeat = getHeartbeat(url);
long heartbeatTick = calculateLeastDuration(heartbeat);
// 创立心跳工作
this.heartBeatTimerTask =
new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
// 提交到 IDLE_CHECK_TIMER 这个工夫轮中期待执行, 等工夫到了工夫轮就会去取出该工作进行调度执行
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
}
}
// 下面用到的 IDLE_CHECK_TIMER 就是咱们本文的剖析的工夫轮
private static final HashedWheelTimer IDLE_CHECK_TIMER =
new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
// 上述创立心跳工作时, 创立了一个 HeartbeatTimerTask 对象, 能够看下该工作具体要做什么
@Override
protected void doTask(Channel channel) {
try {
// 获取最初一次读写工夫
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
// 最初一次读写工夫超过心跳工夫, 就会发送心跳申请
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
// 表明它是一个心跳申请
req.setEvent(HEARTBEAT_EVENT);
channel.send(req);
}
} catch (Throwable t) {}}
2)Redisson 锁续期机制
当获取锁胜利后,Redisson 会封装一个锁续期工作放入工夫轮中,默认 10s 检查一下,用于对获取到的锁进行续期,缩短持有锁的工夫。如果业务机器宕机了,那么该续期的定时工作也就没法跑了,就没法续期了,那等加锁工夫到了锁就主动开释了。逻辑封装在 RedissonLock 中的 renewExpiration() 办法中。
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {return;}
// 这边 newTimeout 点进去发现就是往工夫轮中提交了一个工作
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {return;}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {return;}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock "+ getName() +" expiration", e);
return;
}
if (res) {
// 续期胜利后持续调度, 又往工夫轮中放一个续期工作
renewExpiration();}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
// 通过 lua 脚本对锁进行续期
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return 1;" +
"end;" +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
3)超时重试
应用形式和 HeartbeatTimerTask 形式相似,读者能够本人入手去剖析下它是在哪里被引入的。
四、总结
在本篇文章中,先是举了 3 个例子来阐述为什么须要应用工夫轮,应用工夫轮的长处,在文末处也别离对这 3 个例子在 Dubbo 或 Redisson 中的应用做了介绍。接着通过画图解说了单层工夫轮与多层工夫轮机制,让读者对工夫轮算法有了一个简略的意识。在第二局部,顺次解说了 Dubbo 工夫轮中波及到的 TimerTask、Timeout、HashedWheelBucket、Worker、HashedWheelTimer,剖析了它们的原理与源码实现。
作者:vivo 互联网服务器团队 -Li Wanghong