概述
延时工作置信大家都不生疏,在事实的业务中利用场景能够说是亘古未有。例如订单下单 15 分钟未领取间接勾销,外卖超时主动赔付等等。这些状况下,咱们该怎么设计咱们的服务的实现呢?
笨一点的办法天然是定时工作去数据库进行轮询。然而当业务量较大,事件处理比拟费时的时候,咱们的零碎和数据库往往会面临微小的压力,如果采纳这种形式或者会导致数据库和零碎的解体。那么有什么好方法吗?明天我来为大家介绍几种实现延时工作的方法。
JAVA DelayQueue
你没看错,java 外部有内置延时队列,位于 java concurrent 包内。
DelayQueue
是一个 jdk 中自带的延时队列实现,他的实现依赖于可重入锁 ReentrantLock
以及条件锁 Condition
和优先队列PriorityQueue
。而且实质上他也是一个阻塞队列。那么他是如何实现延时成果的呢。
DelayQueue 的实现原理
首先 DelayQueue
队列中的元素必须继承一个接口叫做Delayed
,咱们找到这个类
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}
发现这个类外部定义了一个返回值为 long
的办法getDelay
,这个办法用来定义队列中的元素的过期工夫,所有须要放在队列中的元素,必须实现这个办法。
而后咱们来看看提早队列的队列是如何操作的,咱们就拿最典型的 offer
和take
来看:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();}
return true;
} finally {lock.unlock();
}
}
offer
操作平平无奇,甚至间接调用到了优先队列的 offer 来将队列依据延时进行排序,只不过加了个锁,做了些数据的调整,没有什么深刻的中央,然而 take
的实现看上去就很简单了。(留神,Dalaye
d 继承了 Comparable
办法,所以是能够间接用优先队列来排序的,只有你本人实现了 compareTo
办法)我尝试加了些正文让各位看得更明确些:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自选操作
for (;;) {
// 获取队列第一个元素,如果队列为空
// 阻塞住直到有新元素退出队列,offer 等办法调用 signal 唤醒线程
E first = q.peek();
if (first == null)
available.await();
else {
// 如果队列中有元素
long delay = first.getDelay(NANOSECONDS);
// 判断延时工夫,如果到工夫了,间接取出数据并 return
if (delay <= 0)
return q.poll();
first = null;
// 如果 leader 为空则阻塞
if (leader != null)
available.await();
else {
// 获取以后线程
Thread thisThread = Thread.currentThread();
// 设置 leader 为以后线程
leader = thisThread;
try {
// 阻塞延时工夫
available.awaitNanos(delay);
} finally {if (leader == thisThread)
leader = null;
}
}
}
}
} finally {if (leader == null && q.peek() != null)
available.signal();
lock.unlock();}
}
咱们能够看到 take
的实现依附了有限自旋,直到第一个队列元素过了超时工夫后才会返回,否则期待他的只有被阻塞。
DelayQueue 实现延时队列的优缺点
看了源码后,咱们应该对 DelayQueue
的实现有了一个大抵的理解,也对他的优缺点有了肯定的了解。他的长处很显著:
- java 原生反对,不须要引入第三方工具
- 线程平安,即插即用使用方便
然而他的毛病也是很显著的:
- 不反对分布式,并且数据放在内存中,没有长久化的反对,服务宕机会失落数据
- 插入时应用的是优先队列的排序,工夫复杂度较高,并且对于队列中的工作不能很好的治理
所以有没有更好的延时队列的实现呢,咱们持续看上来~
工夫轮算法
工夫轮算法是一个被设计进去解决延时工作的算法,事实中的利用能够在 kafka
以及 netty
等我的项目中找到相似的实现。
工夫轮的具体实现
所谓工夫轮,顾名思义,他是一个相似于时钟的构造,即他的主构造是一个环形数组,如图:
环形数组中寄存的是一个一个的链表,链表中寄存着须要执行的工作,咱们设定好数组中执行的距离,假如咱们的环形数组的长度是 60,每个数组的执行距离为 1s,那么咱们会在每过 1s 就会执行数组下一个元素中的链表中的元素。如果只是这样,那么咱们将无奈解决 60 秒之外的延时工作,这显然不适合,所以咱们会在每个工作中加上一个参数 圈数,来表明工作会在几圈后执行。如果咱们有一个工作是在 150s 后执行,那么他应该在 30s 的地位,同时圈数应该为 2。咱们每次执行一个链表中的工作的时候会把当圈须要执行的工作取出执行,而后把他从链表中删除,如果工作不是当圈执行,则批改他的圈数,将圈数减 1,于是一个简略的工夫轮出炉了。
那么这样的工夫轮有什么优缺点呢?
先来说长处吧:
- 相比
DelayQueue
来说,工夫轮的插入更加的高效,工夫复杂度为 O(1) - 实现简略清晰,任务调度更加不便正当
当然他的毛病也不少:
- 他和
DelayQueue
一样不反对分布式,并且数据放在内存中,没有长久化的反对,服务宕机会失落数据 - 数组间的距离设置会影响工作的精度
- 因为不同圈数的工作会在同一个链表中,执行到每个数组元素时须要遍历所有的链表数据,效率会很低
进阶优化版工夫轮算法
方才提到了一些工夫轮算法的毛病,那么是不是有一些办法来进行下优化?这里我来介绍一下工夫轮的优化版本。
之前咱们提到不同圈数的工作会在同一个链表中被反复遍历影响效率,这种状况下咱们能够进行如下优化:将工夫轮进行分层
咱们能够看到图中,咱们采纳了多层级的设计,上图中分了三层,每层都是 60 格,第一个轮盘中的距离为 1 小时,咱们的数据每一次都是插入到这个轮盘中,每当这个轮盘通过一个小时起初到下一个刻度,就会取出其中的所有元素,依照延迟时间放入到第二个象征着分钟的轮盘中,以此类推。
这样的实现益处能够说是不言而喻的:
- 首先防止了当时间跨度较大时空间的节约
- 每一次达到刻度的时候咱们不必再像以前那样遍历链表取出须要的数据,而是能够一次性全副拿进去,大大节约了操作的工夫
工夫轮算法的利用
工夫轮算法可能在之前大家没有据说过,然而他在各个中央都有着不小的作用。linux 的定时器的实现中就有工夫轮的身影,同样如果你是一个爱好看源码的读者,你也可能会在 kafka
以及 netty
中找到他的实现。
kafka
kafka 中利用了工夫轮算法,他的实现和之前提到的进阶版工夫轮没有太大的区别,只有在一点上:kafka
外部实现的工夫轮利用到了DelayQueue
。
@nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) {_ => new TimerTaskList(taskCounter) }
private[this] var currentTime = startMs - (startMs % tickMs)
@volatile private[this] var overflowWheel: TimingWheel = null
private[this] def addOverflowWheel(): Unit = {
synchronized {if (overflowWheel == null) {
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue
)
}
}
}
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {false} else if (expiration < currentTime + tickMs) {false} else if (expiration < currentTime + interval) {
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
if (bucket.setExpiration(virtualId * tickMs)) {queue.offer(bucket)
}
true
} else {if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {currentTime = timeMs - (timeMs % tickMs)
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}
下面是 kafka 外部的实现(应用的语言是 scala),咱们能够看到实现十分的简洁,并且应用到了 DelayQueue
。咱们方才曾经探讨过了DelayQueue
的优缺点,查看源码后咱们曾经能够有一个大抵的论断了:DelayQueue
在 kafka 的工夫轮中的作用是负责推动工作的,为的就是避免在工夫轮中因为工作比拟稠密而造成的 ” 空推动 ”。DelayQueue
的触发机制能够很好的防止这一点,同时因为 DelayQueue
的插入效率较低,所以仅用于底层的推动,工作的插入由工夫轮来操作,两者配置,能够实现效率和资源的均衡。
netty
netty
的外部也有工夫轮的实现HashedWheelTimer
HashedWheelTimer
的实现要比 kafka 外部的实现简单许多,和 kafka 不同的是,它的外部推动不是依附的 DelayQueue
而是本人实现了一套,源码太长,有趣味的读者能够本人去看一下。
小结
工夫轮说了这么多,咱们能够看到他的效率是很出众的,然而还是有这么一个问题:他不反对分布式。当咱们的业务很简单,须要分布式的时候,工夫轮显得力不从心,那么这个时候有什么好一点的延时队列的抉择呢?咱们或者能够尝试应用第三方的工具
redis 延时队列
其实啊说起延时,咱们如果罕用 redis
的话,就会想起 redis
是存在过期机制的,那么咱们是否能够利用这个机制来实现一个延时队列呢?
redis
自带 key 的过期机制,而且能够设置过期后的回调办法。基于此个性,咱们能够非常容易就实现一个延时队列,工作进来时,设定定时工夫,并且配置好过期回调办法即可。
除了应用 redis
的过期机制之外,咱们也能够利用它自带的 zset
来实现延时队列。zset
反对高性能的排序,因而咱们工作进来时能够将工夫戳作为排序的根据,以此将工作的执行先后进行有序的排列,这样也能实现延时队列。
zset
实现延时队列的益处:
- 反对高性能排序
redis
自身的高可用和高性能以及持久性
mq 延时队列
rocketmq 延时音讯
rocketmq
人造反对延时音讯,他的延时音讯分为 18 个等级,每个等级对应不同的延时工夫。
那么他的原理是怎么的呢?
rocketmq
的 broker
收到音讯后会将音讯写入commitlog
,并且判断这个音讯是否是延时音讯(即 delay 属性是否大于 0),之后如果判断的确是延时音讯,那么他不会马上写入,而是通过转发的形式将音讯放入对应的延时topic
(18 个延时级别对应 18 个topic
)
rocketmq
会有一个定时工作进行轮询,如果工作的延迟时间曾经到了就发往指定的topic
。
这个设计比拟的简略粗犷,然而毛病也非常显著:
- 延时是固定的,如果想要的提早超出 18 个级别就没方法实现
- 无奈实现精准延时,队列的沉积等等状况也会导致执行产生误差
rocketmq 的精准延时音讯
rocketmq
自身是不反对的准确提早的,他的商业版本 ons
倒是反对。不过 rocketmq
的社区中有相应的解决方案。计划是借助于工夫轮算法来实现的,感兴趣的敌人能够自行去社区查看。(社区中的一些未被合并的 pr 是不错的实现参考)
总结
延时队列的实现千千万,然而如果要在生产中大规模应用,那么大部分状况下其实都避不开工夫轮算法。改良过的工夫轮算法能够做到精准延时,长久化,高性能,高可用性,堪称是完满。然而话又说回来,其余的延时形式就无用了吗?其实不是的,所有的形式都是须要匹配本人的应用场景。如果你是极少量数据的轮询,那么定时轮询数据库或者才是最佳的解决方案,而不是无脑的引入简单的延时队列。如果是单机的工作,那么 jdk 的延时队列也是不错的抉择。
本文介绍的这些延时队列只是为了向大家展现他们的原理和优缺点,具体的应用还须要联合本人业务的场景。