共计 9167 个字符,预计需要花费 23 分钟才能阅读完成。
阻塞队列中目前还剩下一个比较特殊的队列实现,相比较前面讲解过的队列,本文中要讲的 LinkedBlockingDeque 比较容易理解了,但是与之前讲解过的阻塞队列又有些不同,从命名上你应该能看出一些端倪,接下来就一起看看这个特殊的阻塞队列
前言
JDK 版本号:1.8.0_171
LinkedBlockingDeque 在结构上有别于之前讲解过的阻塞队列,它不是 Queue 而是 Deque,中文翻译成双端队列,双端队列指可以从任意一端入队或者出队元素的队列,实现了在队列头和队列尾的高效插入和移除
LinkedBlockingDeque 是链表实现的线程安全的无界的同时支持 FIFO、LIFO 的双端阻塞队列,可以回顾下之前的 LinkedBlockingQueue 阻塞队列特点,本质上是类似的,但是又有些不同:
- 内部是通过 Node 节点组成的链表来实现的,当然为了支持双端操作,结点结构不同
- LinkedBlockingQueue 通过两个 ReentrantLock 锁保护竞争资源,实现了多线程对竞争资源的互斥访问,入队和出队互不影响,可同时操作,然而 LinkedBlockingDeque 只设置了一个全局 ReentrantLock 锁,两个条件对象实现互斥访问,性能上要比 LinkedBlockingQueue 差一些
- 无界,默认链表长度为 Integer.MAX_VALUE,本质上还是有界
- 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待
Queue 和 Deque 的关系有点类似于单链表和双向链表,LinkedBlockingQueue 和 LinkedBlockingDeque 的内部结点实现就是单链表和双向链表的区别,具体可参考源码
在第二点中可能有些人有些疑问,两个互斥锁和一个互斥锁的区别在哪里?我们可以考虑以下场景:
A 线程先进行入队操作,B 线程随后进行出队操作,如果是 LinkedBlockingQueue,A 线程入队过程还未结束(已获得锁还未释放),B 线程出队操作不会被阻塞等待(锁不同),如果是 LinkedBlockingDeque 则 B 线程会被阻塞等待(同一把锁)A 线程完成操作才继续执行
LinkedBlockingQueue 一般的操作是获取一把锁就可以,但有些操作例如 remove 操作,则需要同时获取两把锁,之前的 LinkedBlockingQueue 讲解曾经说明过,这里就不详细讲解了
类定义
实现 BlockingDeque 接口,其中定义了双端队列应该实现的方法,具体方法不说明了,主要是每个方法都分为了 First 和 Last 两种方式,从头部或者尾部进行队列操作
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable
常量 / 变量
/**
* 头结点
*/
transient Node<E> first;
/**
* 尾结点
*/
transient Node<E> last;
/** 双端队列实际结点个数 */
private transient int count;
/** 双端队列容量 */
private final int capacity;
/** 互斥重入锁 */
final ReentrantLock lock = new ReentrantLock();
/** 非空条件对象 */
private final Condition notEmpty = lock.newCondition();
/** 非满条件对象 */
private final Condition notFull = lock.newCondition();
内部类
为了实现双端队列,内部使用了双向链表,不像 LinkedBlockingQueue 使用的是单链表,前驱和后继指针的特殊情况需要注意
/** 双向链表 Node */
static final class Node<E> {
/**
* 节点数据值,移除则为 null
*/
E item;
/**
* 下列情况之一:
* - 前驱节点
* - 前驱是尾节点,则 prev 指向当前节点
* - 无前驱节点则为 null
*/
Node<E> prev;
/**
* 下列情况之一:
* - 后继节点
* - 后继是头节点,则 next 指向当前节点
* - 无后继节点则为 null
*/
Node<E> next;
Node(E x) {item = x;}
}
构造方法
构造方法比较简单,可设置容量上限,不设置默认 Integer.MAX_VALUE,类似之前的 LinkedBlockingQueue
public LinkedBlockingDeque() {this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
public LinkedBlockingDeque(Collection<? extends E> c) {this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
// 确保可见性
lock.lock(); // Never contended, but necessary for visibility
try {for (E e : c) {if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {lock.unlock();
}
}
重要方法
offerFirst/offerLast
在入队操作中,将每种方法都分成了队首入队和队尾入队两种,在获取锁之后最终调用方法是 linkFirst/linkLast。在 putFirst 中如果队列已满则通过 notFull.await() 阻塞操作,比较容易理解
入队方法如下:
类别 | 失败返回特殊值 | 失败抛异常 | 阻塞等待 | 超时阻塞等待 |
---|---|---|---|---|
队首 | offerFirst | push/addFirst | putFirst | offerFirst(E e, long timeout, TimeUnit unit) |
队尾 | offer/offerLast | add/addLast | put/putLast | offer/offerLast(E e, long timeout, TimeUnit unit) |
public boolean offerFirst(E e) {if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {return linkFirst(node);
} finally {lock.unlock();
}
}
public boolean offerLast(E e) {if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {return linkLast(node);
} finally {lock.unlock();
}
}
linkFirst/linkLast
将结点添加到头部或者尾部,如果队列已满则返回 false,在调用方法之前已经获取互斥锁才进行操作
/**
* Links node as first element, or returns false if full.
*/
private boolean linkFirst(Node<E> node) {// assert lock.isHeldByCurrentThread();
// 队列已满
if (count >= capacity)
return false;
// 更新 first
Node<E> f = first;
node.next = f;
first = node;
// last 为空则表示原队列为空,当前队列仅有 node 则 last 更新指向 node 即可
if (last == null)
last = node;
else
// 原队列非空,更新原 first 的前驱指针
f.prev = node;
++count;
// 通过条件对象唤醒出队操作阻塞线程
notEmpty.signal();
return true;
}
/**
* Links node as last element, or returns false if full.
*/
private boolean linkLast(Node<E> node) {// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
// 更新 last
Node<E> l = last;
node.prev = l;
last = node;
// first 为空则表示原队列为空,当前队列仅有 node 则 first 更新指向 node 即可
if (first == null)
first = node;
else
// 原队列非空,更新原 last 的后继指针
l.next = node;
// 实际节点数量 + 1
++count;
// 通过条件对象唤醒出队操作阻塞线程
notEmpty.signal();
return true;
}
pollFirst/pollLast
在出队操作中,同样将每种方法都分成了队首出队和队尾出队两种,在获取锁之后最终调用方法是 unlinkFirst/unlinkLast。在 takeFirst 中如果队列为空则通过 notEmpty.await() 阻塞操作,比较容易理解
出队方法如下:
类别 | 失败返回特殊值 | 失败抛异常 | 阻塞等待 | 超时阻塞等待 |
---|---|---|---|---|
队首 | poll/pollFirst | pop/remove/removeFirst | take/takeFirst | poll/pollFirst(long timeout, TimeUnit unit) |
队尾 | pollLast | removeLast | takeLast | pollLast(long timeout, TimeUnit unit) |
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {return unlinkFirst();
} finally {lock.unlock();
}
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {return unlinkLast();
} finally {lock.unlock();
}
}
unlinkFirst/unlinkLast
移除返回队头元素或队尾元素,假如队列为空则返回 null
/**
* Removes and returns first element, or null if empty.
*/
private E unlinkFirst() {// assert lock.isHeldByCurrentThread();
Node<E> f = first;
// 判空
if (f == null)
return null;
// 更新 first
Node<E> n = f.next;
E item = f.item;
// item 置空,next 指向自己,方便 gc 回收
f.item = null;
f.next = f; // help GC
first = n;
// 当前队列为空,last 指向置空
if (n == null)
last = null;
else
// 当前队列非空,新的头结点前驱置空
n.prev = null;
// 实际结点数量 - 1
--count;
// 通过条件对象唤醒入队操作阻塞线程
notFull.signal();
return item;
}
/**
* Removes and returns last element, or null if empty.
*/
private E unlinkLast() {// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
// 更新 last
Node<E> p = l.prev;
// item 置空,prev 指向自己,方便 gc 回收
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
// 当前队列为空,last 指向置空
if (p == null)
first = null;
else
// 当前队列非空,新的尾结点后继置空
p.next = null;
--count;
// 通过条件对象唤醒入队操作阻塞线程
notFull.signal();
return item;
}
unlink
将队列中匹配的结点删除,链表中进行解除前后关联即可,注意下如果 x 处于队列中间(非头和尾结点),则 x 本身的前驱和后继指针不会被更新修改,为的是防止迭代器循环到 x 找不到前后结点,避免迭代器异常
/**
* Unlinks x.
*/
void unlink(Node<E> x) {// assert lock.isHeldByCurrentThread();
// 前驱结点和后继结点
Node<E> p = x.prev;
Node<E> n = x.next;
// 前驱为空,相当于删除头结点
if (p == null) {unlinkFirst();
} else if (n == null) {
// 后继为空,相当于删除尾结点
unlinkLast();} else {
// 前驱后继都不为空,解除删除结点与前后结点的关系,item 置空
// 注意,这里 x 本身的前驱和后继没有被更新修改,为的是防止迭代器循环到 x 找不到前后结点,避免迭代器异常
p.next = n;
n.prev = p;
x.item = null;
--count;
// 通过条件对象唤醒入队操作阻塞线程
notFull.signal();}
}
peekFirst/peekLast
peek 操作直接通过首尾节点指向获得对应的 item 即可,不会删除节点
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {return (first == null) ? null : first.item;
} finally {lock.unlock();
}
}
public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {return (last == null) ? null : last.item;
} finally {lock.unlock();
}
}
removeFirstOccurrence/removeLastOccurrence
删除第一个 / 最后一个满足条件的队列结点,removeFirstOccurrence 从前向后进行匹配,removeLastOccurrence 从后向前进行匹配,找到第一个满足条件的结点进行删除操作
public boolean removeFirstOccurrence(Object o) {if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 从前向后循环判断相等则通过 unlink 移除
for (Node<E> p = first; p != null; p = p.next) {if (o.equals(p.item)) {unlink(p);
return true;
}
}
return false;
} finally {lock.unlock();
}
}
public boolean removeLastOccurrence(Object o) {if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 从后向前循环判断相等则通过 unlink 移除
for (Node<E> p = last; p != null; p = p.prev) {if (o.equals(p.item)) {unlink(p);
return true;
}
}
return false;
} finally {lock.unlock();
}
}
drainTo
转移队列操作
public int drainTo(Collection<? super E> c, int maxElements) {if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {int n = Math.min(maxElements, count);
for (int i = 0; i < n; i++) {
// 先添加防止未添加到新队列中时原队列结点出队
c.add(first.item); // In this order, in case add() throws.
// 解除关联
unlinkFirst();}
return n;
} finally {lock.unlock();
}
}
clear
清空队列操作,比较简单,很好理解
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {for (Node<E> f = first; f != null;) {
f.item = null;
Node<E> n = f.next;
f.prev = null;
f.next = null;
f = n;
}
first = last = null;
count = 0;
notFull.signalAll();} finally {lock.unlock();
}
}
迭代器
由于其双向链表的实现,迭代器可分为升序迭代器(Itr)和倒序迭代器(DescendingItr),通过 AbstractItr 封装公共操作方法,Itr 和 DescendingItr 分别实现对应不同的方法,模板方法设计模式写法可借鉴,一个从头节点开始向后进行遍历,一个从尾节点向后进行遍历
private abstract class AbstractItr implements Iterator<E> {
/**
* next 方法返回的 node
*/
Node<E> next;
/**
* 保存 next 的 item,防止 hasNext 为 true 后节点被删除再调用 next 获取不到值的情况
*/
E nextItem;
/**
* 最近一次调用 next 返回的节点,如果通过调用 remove 删除了此元素,则重置为 null
*/
private Node<E> lastRet;
// 两个不同迭代器实现不同
abstract Node<E> firstNode();
abstract Node<E> nextNode(Node<E> n);
// 构造方法初始化,设置 next 和 nextItem
AbstractItr() {
// set to initial position
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {next = firstNode();
nextItem = (next == null) ? null : next.item;
} finally {lock.unlock();
}
}
/**
* 返回后继节点
*/
private Node<E> succ(Node<E> n) {for (;;) {Node<E> s = nextNode(n);
if (s == null)
return null;
else if (s.item != null)
return s;
else if (s == n)
return firstNode();
else
n = s;
}
}
/**
* 设置下一次执行 next 应该返回的值
*/
void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// assert next != null;
next = succ(next);
nextItem = (next == null) ? null : next.item;
} finally {lock.unlock();
}
}
public boolean hasNext() {return next != null;}
public E next() {if (next == null)
throw new NoSuchElementException();
lastRet = next;
E x = nextItem;
advance();
return x;
}
// 移除操作,注意,这里直接在原队列中移除了 lastRet 对应的节点
public void remove() {
Node<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet = null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {if (n.item != null)
unlink(n);
} finally {lock.unlock();
}
}
}
/** Forward iterator */
// 从头节点向后遍历迭代器
private class Itr extends AbstractItr {Node<E> firstNode() {return first;}
Node<E> nextNode(Node<E> n) {return n.next;}
}
/** Descending iterator */
// 从尾节点向后遍历迭代器
private class DescendingItr extends AbstractItr {Node<E> firstNode() {return last;}
Node<E> nextNode(Node<E> n) {return n.prev;}
}
总结
源码分析完毕,整理 LinkedBlockingDeque 有如下特点:
- 链表实现的线程安全的无界的同时支持 FIFO、LIFO 的双端阻塞队列
- 通过结点同时持有前驱结点和后继结点的引用支持队列的头和尾的双端操作
- 迭代器支持正向和反向两种迭代方式
- 大部分方法都争抢唯一一把可重入互斥锁,在竞争激烈的多线程并发环境下吞吐量比较低
- 删除中间结点(非头尾)时,保留结点前后前驱指针,防止迭代器异常
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢