共计 3549 个字符,预计需要花费 9 分钟才能阅读完成。
提早队列是这样一种队列:元素有一个过期工夫,元素在队列中程序是依照过期工夫排序的,只有达到过期工夫的元素能力出队,最先达到过期工夫的元素最先出队。
提早队列能够用到这样的场景:比方有一个工作须要 10 分钟后执行,那么就能够把这个工作存入提早队列,并且设置过期工夫为 10 分钟,队列的另一头有一个消费者去生产,没达到过期工夫时消费者是取不到这个工作的,达到过期工夫后消费者能力取到这个工作。
像这种多少分钟之后执行的工作特地适宜应用提早队列实现,如果不应用提早队列,把工作存入数据库定期扫描也能实现,然而会有一个问题:如果扫描时间距离长了,执行工夫不准,如果扫描间隔时间短了,那么对数据库压力比拟大。
DelayQueue
DelayQueue 是 JDK 中自带的提早队列实现,向该队列中提交的工作须要实现 Delayed 接口,这个接口只有一个办法 long getDelay(TimeUnit unit)
这个接口如果返回 0 或正数示意工作曾经达到了过期工夫,能够执行;否则示意还没有达到过期工夫,不能执行。
DelayQueue 应用示例
public class DelayedTask implements Delayed { | |
private String taskName; | |
private long avaibleTime; | |
public String getTaskName() {return taskName;} | |
public DelayedTask(String taskName, long delayTime) { | |
this.taskName = taskName; | |
this.avaibleTime = System.currentTimeMillis() + delayTime;} | |
@Override | |
public long getDelay(TimeUnit unit) {return unit.convert(avaibleTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public int compareTo(Delayed o) {return (int)(this.avaibleTime - ((DelayedTask)o).avaibleTime); | |
} | |
} |
下面代码中咱们定义了 DelayedTask 工作并且实现了 Delayed 接口,在 DelayedTask 中保护了一个成员变量 avaibleTime 寄存工作到期工夫,在 getDelay 办法实现中用 avaibleTime 减去以后工夫戳,如果是一个正数或 0,示意工作曾经达到了到期工夫,能够执行了。
测试
DelayQueue<DelayedTask> queue = new DelayQueue<>(); | |
new Thread(()->{ | |
try { | |
//take 办法会阻塞,直到有工作到期 | |
DelayedTask task = queue.take(); | |
System.out.println(task.getTaskName()); | |
} catch (InterruptedException e) {e.printStackTrace(); | |
} | |
}).start(); | |
queue.put(new DelayedTask("工作", 3000)); |
三秒种后输入 工作
实现原理
次要属性
// 优先队列 | |
private final PriorityQueue q = new PriorityQueue(); | |
//leader 线程示意正在期待出队的线程 | |
private Thread leader = null; | |
// 可用条件 | |
private final Condition available = lock.newCondition(); |
DelayQueue 的底层实现是优先队列
take 办法
public E take() throws InterruptedException { | |
final ReentrantLock lock = this.lock; | |
lock.lockInterruptibly(); ① | |
try {for (;;) {E first = q.peek(); | |
if (first == null) ② | |
available.await(); | |
else {long delay = first.getDelay(NANOSECONDS); | |
if (delay <= 0) ③ | |
return q.poll(); | |
first = null; // don't retain ref while waiting | |
if (leader != null) ④ | |
available.await(); | |
else {Thread thisThread = Thread.currentThread(); | |
leader = thisThread; ⑤ | |
try {available.awaitNanos(delay); | |
} finally {if (leader == thisThread) | |
leader = null; | |
} | |
} | |
} | |
} | |
} finally {if (leader == null && q.peek() != null) | |
available.signal(); ⑥ | |
lock.unlock();} | |
} |
在 take 办法中,首先获取锁,而后查看队首元素
- 如果队首元素为 null,阐明队列是空的,调用 available.await() 期待 ②
-
如果队首元素不为 null,调用 getDelay 判断该工作是否到期
- 如果返回正数或 0,阐明工作曾经到期,间接获取队首元素返回 ③
-
如果不是正数或 0,阐明工作还没到期。这时判断 leader 是否为 null
-
- 如果 leader 不为 null,阐明以后有一个 leader 在期待队列头元素到期,调用 available.await() 将以后线程挂起 ④
- 如果 leader 为 null,将以后线程设置为 leader,并调用 available.await(delay) 挂起期待残余到期工夫。当线程被零碎唤醒后,将 leader 设置为 null,从新去获取元素,这时队首元素就曾经到期了,这个线程会在 ③ 处返回。
如图,当初有三个线程去生产这个队列,队列中的 TaskA 没到执行工夫,Thread1 是以后的 Leader 线程,它会期待提早纳秒数,而后拉取队列头节点生产并且调用 signal() 唤醒 Follower 节点。如下图,Thread3 节点被唤醒,它会先看一下头节点是否到期,如果没到期就期待到它到期。
工夫轮算法
如上图,在工夫轮算法中,有一个轮,这个轮由 n 个槽组成,有一个指针向钟表一样每隔一个工夫间走到下个槽位上,而后这个槽位上的工作就能够执行。假如当初有 8 个槽组成工夫轮,指针每秒挪动一个槽位,当初指针指向 1 这个槽,假如当初有一个工作要在 5 秒钟之后执行,那么把这个工作就放在 5 + 1 = 6 这个槽上,这样在 5 秒钟后指针就会挪动到这个槽,而后这个槽对应的工作就会被执行。
如果有工作要在 20 秒后执行,那么就须要指针多转两圈,将工作放在(20%8 + 1)= 5 这个槽上。
这个圈数须要记录在槽中的数据结构外面。这个数据结构最重要的是两个指针,一个是触发工作的函数指针,另外一个是触发的总第几圈数。工夫轮能够用简略的数组或者是环形链表来实现。
netty 的 HashedWheelTimer 就采纳了工夫轮算法。
相比 DelayQueue 的数据结构,工夫轮在算法复杂度上有肯定劣势。DelayQueue 因为波及到排序,须要调整数据的地位,插入和移除的复杂度是 O(lgn),而工夫轮在插入和移除的复杂度都是 O(1)。
应用 zset 实现提早队列
音讯的生产者应用 zadd quque [以后工夫戳 + 延迟时间] [序列化成字符串的音讯]
将音讯插入音讯队列。
开启多个线程一直执行 zrangebyscore quque [0] [以后工夫戳] limit 0 1
命令,ZRANGEBYSCORE 这个命令示意获取队列中的成员,并按分数从小到大排列,limit 0 1
偏移量为 0,获取一条数据,所以返回的就是工夫戳最小的那条。
判断这条数据是否达到了执行工夫,如果达到了执行工夫,执行 zrem member
,ZREM 执行这个命令有两个目标:
1. 从提早队列中删除这条音讯避免其它线程再次获取;
2. 可能存在多个线程同时获取到这条音讯,所以这里靠 zrem 只有返回 > 0,才阐明以后线程胜利获取到音讯,能够生产音讯,如果返回 = 0,阐明已有其它线程领先获取到音讯了,以后线程不能够生产音讯。
优化:
因为咱们是用多个线程去同时 zrangebyscore 而后再通过 zrem 是否胜利来确定是否争抢到音讯,没有抢到音讯的线程实际上就白白执行了一次 zrangebyscore,能够应用 Lua 脚本将 zrangebyscore 和 zrem 放到一起在服务端执行。