前言
DelayQueue 是 BlockingQueue 接口的实现类,它是带有延时性能的无界阻塞队列,意思就是每个元素都有过期工夫,当从队列获取元素时,只有过期元素才会出队列。队列的头部元素是最快要过期的元素。
实现原理
先来看看它的类构造:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {}
能够看到,寄存的元素必须实现了 Delayed 接口,它的定义如下:
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}
再来看要害属性:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
再来看重要办法:
offer()
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();}
return true;
} finally {lock.unlock();
}
}
poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();} finally {lock.unlock();
}
}
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {for (;;) {E first = q.peek();
if (first == null)
available.await();
else {long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {Thread thisThread = Thread.currentThread();
leader = thisThread;
try {available.awaitNanos(delay);
} finally {if (leader == thisThread)
leader = null;
}
}
}
}
} finally {if (leader == null && q.peek() != null)
available.signal();
lock.unlock();}
}
今天再剖析吧,先睡了,晚安全世界!