共计 8864 个字符,预计需要花费 23 分钟才能阅读完成。
作为阻塞队列的一员,DelayQueue(延迟队列)由于其特殊含义而使用在特定的场景之中,主要在于 Delay 这个词上,那么其内部是如何实现的呢?今天一起通过 DelayQueue 的源码来看一看其是如何完成 Delay 操作的
前言
JDK 版本号:1.8.0_171
DelayQueue 内部通过优先级队列 PriorityQueue 来实现队列元素的排序操作,之前已经介绍过 PriorityBlockingQueue 的源码实现,两者比较类似,可自行回顾下,既然用到了优先级队列,则需要保证其队列元素的可比较性,以及延迟队列的特性(可计算延迟时间,通过延迟时间进行比较排序),故这里其中的队列元素需要实现 Delayed 接口,DelayQueue 主要就在于理解这两部分内容
- DelayQueue 内部通过优先级队列 PriorityQueue 来实现队列元素的排序操作
- DelayQueue 队列元素需要实现 Delayed 接口(包含 compareTo 接口)
使用示例
下面示例代码部分已经显示了 DelayQueue 的用法,从名字命名上也能理解出其含义,延迟队列,主要在于延迟消费,如何实现呢?这里就需要用到 Delayed 接口,后面会进行说明,在使用时需要实现 Delayed 接口和 compareTo 接口
- 通过 getDelay 方法判断当前对象延迟时间是否已经到期
- 通过 compareTo 方法对其队列元素排序完成其队列元素出队的先后顺序
自己可以先试试运行结果,理解看看,可以看下调用 poll 和 take 的结果。如果用过 rocketmq,可以类比其中的延迟消息队列,等到规定的时间再进行消费,只不过 mq 中的实现要比这复杂
public class TestDelayQueue {public static void main(String[] args) throws InterruptedException {DelayQueue<DelayItem> delayQueue = new DelayQueue();
// 20s 后
delayQueue.add(new DelayItem(20, "aaaaaa"));
// 10 秒后
delayQueue.add(new DelayItem(10, "bbbbbb"));
// 30 秒后
delayQueue.add(new DelayItem(30, "cccccc"));
while (0 < delayQueue.size()) {Thread.sleep(1000);
DelayItem d = delayQueue.poll();
// DelayItem d = delayQueue.take();
System.out.println(null != d ? d.getItem() : "null");
}
}
static class DelayItem implements Delayed {
private long delayTime;
private String item;
public DelayItem(long delayTime, String item) {super();
// 当前时间
LocalDateTime localDateTime = LocalDateTime.now();
this.delayTime = localDateTime.getSecond() + delayTime;
this.item = item;
}
@Override
public long getDelay(TimeUnit unit) {LocalDateTime localDateTime = LocalDateTime.now();
return unit.convert(delayTime - localDateTime.getSecond(), TimeUnit.SECONDS);
}
@Override
public int compareTo(Delayed o) {return this.getDelay(TimeUnit.SECONDS) - o.getDelay(TimeUnit.SECONDS) < 0 ? -1 : 1;
}
public String getItem() {return item;}
}
}
类定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
Delayed
首先要说明的是 Delayed 接口,类定义部分也已经明确指出其使用(E extends Delayed),我们在操作时放入 DelayQueue 队列元素必须实现这个接口,实现其中的 getDelay 方法和 compareTo 方法,在使用示例代码部分我也说明了这两个方法的作用
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
常量 / 变量
其中使用了 PriorityQueue 来完成有序出队操作,与之前讲解过的 PriorityBlockingQueue 类似,有些许不同,可自行参考源码部分,也可以去看我之前的一篇专门讲解 PriorityBlockingQueue 源码的文章,主要异同在于 PriorityQueue 是非线程安全的,而 PriorityBlockingQueue 是线程安全的,内部排序机制使用的都是堆排序
如果你了解过 PriorityQueue 或 PriorityBlockingQueue 则在这里使用这个类是很容易理解源码实现人员的目的的,建议先去了解其实现,要不直接看这个源码比较有难度
由于需要实现延迟队列,使用 PriorityQueue 根据时间排序(自行实现具体细节,例如上边示例根据时间来排序),通过 Delayed 接口限制使用 DelayQueue 的场景
/**
* 可重入锁 ReentrantLock
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 内部使用 PriorityQueue 来完成 DelayQueue 的操作
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* leader 线程
* 指定了用于等待队列元素出队的线程
* 如果非空,则这个线程可以阻塞等待一段时间(时间通过计算获得),其他线程则无限等待
* 避免其他线程不必要的等待
* 这个线程等待一段时间然后出队操作,其他线程则无限等待,* 如果等待过程中入队了过期时间更短的元素(优先级队列堆顶元素变化),则会重置 leader 为 null, 并会唤醒等待的线程去争抢 leader 来获取执行出队的权利
*/
private Thread leader = null;
/**
* Condition 对象完成线程等待和唤醒任务
*/
private final Condition available = lock.newCondition();
构造方法
构造方法比较简单,无参构造没有进行任何操作,有参构造方法直接传入对应类型的集合,循环 add 放入队列
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {this.addAll(c);
}
重要方法
offer
入队操作,先获得 lock,之后通过优先级队列的 offer 方法完成入队,同时判断是否要重置 leader
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {q.offer(e);
// 此节点为当前优先级队列堆顶节点,即 0 的索引位置
// 即此次添加的节点即为下次要获取的堆顶节点(出队节点)// 如果非堆顶节点则表示堆顶节点未变化则不要重置 leader
if (q.peek() == e) {
// leader 线程置空,让出队线程争抢 leader 优先执行权
leader = null;
// 唤醒阻塞的线程
available.signal();}
return true;
} finally {lock.unlock();
}
}
poll
出队操作,先获得 lock,之后通过优先级队列的 poll 方法完成出队,当然需要判断堆顶元素是否已到期。等待超时方法较为复杂,需耐心理解
/**
* Retrieves and removes the head of this queue, or returns {@code null}
* if this queue has no elements with an expired delay.
*
* @return the head of this queue, or {@code null} if this
* queue has no elements with an expired delay
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 堆顶元素
E first = q.peek();
// 堆为空或者堆顶元素延迟时间还未到期则返回 null,否则通过 poll 出队
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();} finally {lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 时间转成纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {for (;;) {
// 堆顶元素
E first = q.peek();
// 空表示队列为空
if (first == null) {
// 等待时间小于等于 0 则直接返回 null, 否则就阻塞等待 nanos 时间
if (nanos <= 0)
return null;
else
// 如果中途被唤醒则更新 nanos,剩余等待时间
nanos = available.awaitNanos(nanos);
} else {
// 堆顶元素的延迟时间
long delay = first.getDelay(NANOSECONDS);
// 延迟时间到期直接出队操作
if (delay <= 0)
return q.poll();
// 延迟时间未到期直接返回 null
if (nanos <= 0)
return null;
// 延迟时间未到期同时设置了超时时间进入下面进行处理
// 处于等待状态不要引用 first
first = null; // don't retain ref while waiting
// 超时时间小于延迟时间或者 leader 非空阻塞等待 nanos
// 超时时间小于延迟时间则当前线程最多等待 nanos 超时时间即可
// leader 非空则表明其他线程已经获得优先执行权,最多等待 nanos 超时时间即可
// 在等待中有可能被唤醒再此循环执行
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
// 超时时间大于延迟时间同时 leader 线程为空进入下面处理
Thread thisThread = Thread.currentThread();
// 先设置 leader 线程获取执行权
leader = thisThread;
try {
// 阻塞等待 delay 即可出队操作
// 万一等待过程中被唤醒则通过剩余等待时间循环判断处理
// 有可能在等待中入队了延迟时间更短的元素,此时需释放 leader 重新争抢优先执行权
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
// 释放 leader 执行权,重新争抢 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader 空且队列非空则唤醒其他阻塞的线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();}
}
take
出队操作,先获得 lock,再通过判断最终执行 poll 完成出队操作,和 poll 的超时等待方法类似
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {for (;;) {E first = q.peek();
if (first == null)
// 队列无数据则阻塞等待
available.await();
else {
// 获取其堆顶元素延迟时间
long delay = first.getDelay(NANOSECONDS);
// 延迟时间已到期,可以进行出队操作了
if (delay <= 0)
return q.poll();
// 延迟时间还未到期
// 置空 first,等待时间内去掉引用
first = null; // don't retain ref while waiting
// leader 线程非空,表示其他线程已经获取优先执行权,阻塞等待
if (leader != null)
available.await();
else {
// leader 为空则指向当前线程,表示当前线程获得执行权
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞等待 delay 秒之后继续
// 也有可能新入队元素(堆顶元素变化时)被唤醒需重新获取 leader 执行权
available.awaitNanos(delay);
} finally {
// leader 置空,释放优先执行权
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader 空且队列非空则唤醒其他阻塞的线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();}
}
drainTo
转移队列操作,内部是先通过 peek 方法先获取队列堆顶元素,判断其是否已到期,如到期则添加元素到新队列中,同时对原队列出队操作,当然,只转移已经到期的所有元素
/**
* Returns first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*
* 命名上完全能了解其含义
* 获取队列中的堆顶元素,延迟时间还未到期则返回 null
* 被 drainTo 所使用,参照下面方法
*
*/
private E peekExpired() {// assert lock.isHeldByCurrentThread();
E first = q.peek();
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
public int drainTo(Collection<? super E> c) {if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
// 转移已过期的元素到新队列中
for (E e; (e = peekExpired()) != null;) {c.add(e); // In this order, in case add() throws.
q.poll();
++n;
}
return n;
} finally {lock.unlock();
}
}
public int drainTo(Collection<? super E> c, int maxElements) {
// 判空
if (c == null)
throw new NullPointerException();
// 非本对象
if (c == this)
throw new IllegalArgumentException();
// 长度判断
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
// 转移已过期的元素到新队列中,最多转移 maxElements 个元素
for (E e; n < maxElements && (e = peekExpired()) != null;) {c.add(e); // In this order, in case add() throws.
q.poll();
++n;
}
return n;
} finally {lock.unlock();
}
}
其他方法如 peek,size,clear,toArray,remove 等都是通过优先级队列 PriorityQueue 来实现的,只是每次操作时需要先获得可重入锁保证线程安全
迭代器
迭代器的实现不是很复杂,迭代器复制了队列中的所有元素,需要注意的是,迭代器中的 remove 方法会通过 removeEQ 方法直接删除原 PriorityQueue 队列中的元素,不是删除拷贝的数据元素
/**
*
* 本质上调用 PriorityQueue.toArray
* 将 PriorityQueue 的底层数组拷贝作为迭代器的 array
* 故这里保存了所有的元素,不仅仅是已过期的元素
*/
public Iterator<E> iterator() {return new Itr(toArray());
}
/**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator<E> {
// 保存 PriorityQueue 的数组
final Object[] array; // Array of all elements
// 下次 next 返回的元素索引
int cursor; // index of next element to return
// 上次返回的 return 元素索引
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {return cursor < array.length;}
@SuppressWarnings("unchecked")
public E next() {if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}
// 删除元素,需要注意,会直接把原队列中的元素删除
public void remove() {if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
总结
DelayQueue 作为一个特殊的阻塞队列,主要在于 Delay 特性上,内部通过优先级阻塞队列和 Delayed 接口实现延迟的操作,如果之前已经了解了优先级队列,则非常容易理解其源码实现逻辑,复杂点的部分也就在于在多线程环境下入队一个新的更短的元素时内部做的处理,通过争抢 leader 来确定优先出队的那个线程,做不同的处理,比较有意思,可以参考文章多理解理解,不算过于复杂
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢