共计 3549 个字符,预计需要花费 9 分钟才能阅读完成。
提早队列是这样一种队列:元素有一个过期工夫,元素在队列中程序是依照过期工夫排序的,只有达到过期工夫的元素能力出队,最先达到过期工夫的元素最先出队。
提早队列能够用到这样的场景:比方有一个工作须要 10 分钟后执行,那么就能够把这个工作存入提早队列,并且设置过期工夫为 10 分钟,队列的另一头有一个消费者去生产,没达到过期工夫时消费者是取不到这个工作的,达到过期工夫后消费者能力取到这个工作。
像这种多少分钟之后执行的工作特地适宜应用提早队列实现,如果不应用提早队列,把工作存入数据库定期扫描也能实现,然而会有一个问题:如果扫描时间距离长了,执行工夫不准,如果扫描间隔时间短了,那么对数据库压力比拟大。
DelayQueue
DelayQueue 是 JDK 中自带的提早队列实现,向该队列中提交的工作须要实现 Delayed 接口,这个接口只有一个办法 long getDelay(TimeUnit unit)
这个接口如果返回 0 或正数示意工作曾经达到了过期工夫,能够执行;否则示意还没有达到过期工夫,不能执行。
DelayQueue 应用示例
public class DelayedTask implements Delayed {
private String taskName;
private long avaibleTime;
public String getTaskName() {return taskName;}
public DelayedTask(String taskName, long delayTime) {
this.taskName = taskName;
this.avaibleTime = System.currentTimeMillis() + delayTime;}
@Override
public long getDelay(TimeUnit unit) {return unit.convert(avaibleTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {return (int)(this.avaibleTime - ((DelayedTask)o).avaibleTime);
}
}
下面代码中咱们定义了 DelayedTask 工作并且实现了 Delayed 接口,在 DelayedTask 中保护了一个成员变量 avaibleTime 寄存工作到期工夫,在 getDelay 办法实现中用 avaibleTime 减去以后工夫戳,如果是一个正数或 0,示意工作曾经达到了到期工夫,能够执行了。
测试
DelayQueue<DelayedTask> queue = new DelayQueue<>();
new Thread(()->{
try {
//take 办法会阻塞,直到有工作到期
DelayedTask task = queue.take();
System.out.println(task.getTaskName());
} catch (InterruptedException e) {e.printStackTrace();
}
}).start();
queue.put(new DelayedTask("工作", 3000));
三秒种后输入 工作
实现原理
次要属性
// 优先队列
private final PriorityQueue q = new PriorityQueue();
//leader 线程示意正在期待出队的线程
private Thread leader = null;
// 可用条件
private final Condition available = lock.newCondition();
DelayQueue 的底层实现是优先队列
take 办法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); ①
try {for (;;) {E first = q.peek();
if (first == null) ②
available.await();
else {long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) ③
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null) ④
available.await();
else {Thread thisThread = Thread.currentThread();
leader = thisThread; ⑤
try {available.awaitNanos(delay);
} finally {if (leader == thisThread)
leader = null;
}
}
}
}
} finally {if (leader == null && q.peek() != null)
available.signal(); ⑥
lock.unlock();}
}
在 take 办法中,首先获取锁,而后查看队首元素
- 如果队首元素为 null,阐明队列是空的,调用 available.await() 期待 ②
-
如果队首元素不为 null,调用 getDelay 判断该工作是否到期
- 如果返回正数或 0,阐明工作曾经到期,间接获取队首元素返回 ③
-
如果不是正数或 0,阐明工作还没到期。这时判断 leader 是否为 null
-
- 如果 leader 不为 null,阐明以后有一个 leader 在期待队列头元素到期,调用 available.await() 将以后线程挂起 ④
- 如果 leader 为 null,将以后线程设置为 leader,并调用 available.await(delay) 挂起期待残余到期工夫。当线程被零碎唤醒后,将 leader 设置为 null,从新去获取元素,这时队首元素就曾经到期了,这个线程会在 ③ 处返回。
如图,当初有三个线程去生产这个队列,队列中的 TaskA 没到执行工夫,Thread1 是以后的 Leader 线程,它会期待提早纳秒数,而后拉取队列头节点生产并且调用 signal() 唤醒 Follower 节点。如下图,Thread3 节点被唤醒,它会先看一下头节点是否到期,如果没到期就期待到它到期。
工夫轮算法
如上图,在工夫轮算法中,有一个轮,这个轮由 n 个槽组成,有一个指针向钟表一样每隔一个工夫间走到下个槽位上,而后这个槽位上的工作就能够执行。假如当初有 8 个槽组成工夫轮,指针每秒挪动一个槽位,当初指针指向 1 这个槽,假如当初有一个工作要在 5 秒钟之后执行,那么把这个工作就放在 5 + 1 = 6 这个槽上,这样在 5 秒钟后指针就会挪动到这个槽,而后这个槽对应的工作就会被执行。
如果有工作要在 20 秒后执行,那么就须要指针多转两圈,将工作放在(20%8 + 1)= 5 这个槽上。
这个圈数须要记录在槽中的数据结构外面。这个数据结构最重要的是两个指针,一个是触发工作的函数指针,另外一个是触发的总第几圈数。工夫轮能够用简略的数组或者是环形链表来实现。
netty 的 HashedWheelTimer 就采纳了工夫轮算法。
相比 DelayQueue 的数据结构,工夫轮在算法复杂度上有肯定劣势。DelayQueue 因为波及到排序,须要调整数据的地位,插入和移除的复杂度是 O(lgn),而工夫轮在插入和移除的复杂度都是 O(1)。
应用 zset 实现提早队列
音讯的生产者应用 zadd quque [以后工夫戳 + 延迟时间] [序列化成字符串的音讯]
将音讯插入音讯队列。
开启多个线程一直执行 zrangebyscore quque [0] [以后工夫戳] limit 0 1
命令,ZRANGEBYSCORE 这个命令示意获取队列中的成员,并按分数从小到大排列,limit 0 1
偏移量为 0,获取一条数据,所以返回的就是工夫戳最小的那条。
判断这条数据是否达到了执行工夫,如果达到了执行工夫,执行 zrem member
,ZREM 执行这个命令有两个目标:
1. 从提早队列中删除这条音讯避免其它线程再次获取;
2. 可能存在多个线程同时获取到这条音讯,所以这里靠 zrem 只有返回 > 0,才阐明以后线程胜利获取到音讯,能够生产音讯,如果返回 = 0,阐明已有其它线程领先获取到音讯了,以后线程不能够生产音讯。
优化:
因为咱们是用多个线程去同时 zrangebyscore 而后再通过 zrem 是否胜利来确定是否争抢到音讯,没有抢到音讯的线程实际上就白白执行了一次 zrangebyscore,能够应用 Lua 脚本将 zrangebyscore 和 zrem 放到一起在服务端执行。