共计 8999 个字符,预计需要花费 23 分钟才能阅读完成。
ConcurrentLinkedQueue 是一个由链表构造组成的无界非阻塞队列,是 JDK 中惟一一个并发平安的非阻塞队列。应用无锁算法来保障线程平安,为了缩小 CAS 操作造成的资源抢夺损耗,其链表构造被设计为“松弛”的,本文对 ConcurrentLinkedQueue 的入队和出队过程进行图解,直观展现其内部结构。
1. 继承体系
java.util.concurrent.ConcurrentLinkedQueue
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable
2. 数据结构
ConcurrentLinkedQueue 的数据结构为链表。
2.1 链表节点
须要留神的是,item 为空示意有效节点,非空示意无效节点。
有效节点是须要从链表中清理掉的节点,ConcurrentLinkedQueue 队列中为什么要存储有效节点呢,持续往下看。
java.util.concurrent.ConcurrentLinkedQueue.Node
private static class Node<E> {
volatile E item; // 节点的数据
volatile Node<E> next; // 下一个节点
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 相比 putObjectVolatile(),putOrderedObject() 不保障内存可见性,然而性能较高
void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
}
2.2 head 和 tail 节点
队列中定义了 head 和 tail 节点。
因为采纳了非阻塞算法(non-blocking algorithms),head 和 tail 节点并不严格指向链表的头尾节点,也就是每次入队出队操作并不会及时更新 head 和 tail 节点。
通过规定“不变式”和“可变式”来保护非阻塞算法的正确性。
不变式:并发对象须要始终放弃的个性。
不变式是并发对象的各个办法之间必须恪守的“契约”,每个办法在调用前和调用后都必须放弃不变式。
采纳不变式,就能够隔离的剖析每个办法,而不必思考它们之间所有可能的交互。
根本不变式
在执行办法之前和之后,队列必须要放弃的不变式(The fundamental invariants):
- 当入队插入新节点之后,队列中有一个 next 域为 null 的节点(真正的尾节点)。
- 从 head 开始遍历队列,能够拜访所有 item 域不为 null 的节点(无效节点)。
head 的不变式和可变式
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
* Invariants:
* - all live nodes are reachable from head via succ()
* - head != null
* - (tmp = head).next != tmp || tmp != head
* Non-invariants:
* - head.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
*/
private transient volatile Node<E> head;
head 的不变式:
- 所有的无效节点,都能从 head 通过调用 succ() 办法遍历可达。
- head 不能为 null。
- head 节点的 next 域不能引用到本身。
head 的可变式:
- head 节点的 item 域可能为 null,也可能不为 null。
- 容许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不肯定能达到 tail。
tail 的不变式和可变式
/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ()
* - tail != null
* Non-invariants:
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
*/
private transient volatile Node<E> tail;
tail 的不变式:
- 通过 tail 调用 succ() 办法,最初节点总是可达的。
- tail 不能为 null。
tail 的可变式:
- tail 节点的 item 域可能为 null,也可能不为 null。
- 容许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不肯定能达到 tail。
- tail 节点的 next 域能够援用到本身。
3. 构造函数
默认创立空节点,head 和 tail 都指向该节点。
/**
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}
/**
* Creates a {@code ConcurrentLinkedQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
4. 入队
4.1 源码剖析
因为是无界队列,add(e)
办法不必抛出异样。不反对增加 null。
java.util.concurrent.ConcurrentLinkedQueue#add
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {return offer(e);
}
入队的外围逻辑:
java.util.concurrent.ConcurrentLinkedQueue#offer
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 留神 tail 不肯定是尾节点(甚至 tail 有可能存在于废除的链上,后有解释),然而也无妨从 tail 节点开始遍历链表
for (Node<E> t = tail, p = t;;) { // 初始时 t 和 p 都指向 tail 节点
Node<E> q = p.next;
if (q == null) { // 应用 p.next 是否为空来判断 p 是否是尾节点,比拟精确
// p is last node // 进入这里阐明此时 p 是尾节点
if (p.casNext(null, newNode)) { // 若节点 p 的下一个节点为 null,则设置为 newNode
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
// 不论 p 与 t 是否雷同,都应该 casTail。然而这里只在 p 与 t 不同时才 casTail,导致 tail 节点不总是尾节点,目标是缩小对 tail 的 CAS
casTail(t, newNode); // Failure is OK. // 将尾节点 tail 由 t 改为 newNode,更新失败了也没关系,因为 tail 是不是尾节点不重要:)
return true;
}
// Lost CAS race to another thread; re-read next // CAS 失败,阐明其余线程先一步操作使得 p 的下一个节点不为 null,需从新获取尾节点
}
else if (p == q) // 如果 p 的 next 等于 p,阐明 p 曾经出队了,须要从新设置 p、t 的值
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 1. 若节点 t 不再是 tail,阐明其余线程退出过元素(批改过 tail),则取最新 tail 作为 t 和 p,从新的 tail 节点持续遍历链表
// 2. 若节点 t 仍旧是 tail,阐明从 tail 节点开始遍历链表曾经不论用了,则把 head 作为 p,从 head 节点从头遍历链表(留神这一步造成后续遍历中 p!= t 成立)p = (t != (t = tail)) ? t : head;
// 这里没有更新 tail,仍留在废链上
else
// Check for tail updates after two hops.
// 进入这里,阐明 p.next 不为 null,且 p 未出队,须要判断:// 1. 若 p 与 t 相等,则 t 留在原位,p=p.next 始终往下遍历(留神这一步造成后续遍历中 p!= t 成立)。// 2. 若 p 与 t 不等,需进一步判断 t 与 tail 是否相等。若 t 不为 tail,则取最新 tail 作为 t 和 p;若 t 为 tail,则 p =p.next 始终往下遍历。// 就是说从 tail 节点往后遍历链表的过程,需时刻关注 tail 是否发生变化
p = (p != t && t != (t = tail)) ? t : q;
}
}
更新 tail 节点:
java.util.concurrent.ConcurrentLinkedQueue#casTail
private boolean casTail(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
入队的根本思维:
- 从 tail 节点开始遍历到尾节点,若定位到尾节点(p.next == null),则入队。
- 遍历过程中,如果遍历到有效节点(p.next == p),须要从新从无效节点(tail 或 head)开始遍历。
- 遍历过程中,时刻关注 tail 节点是否有效。若有效了须要从新从最新的 tail 开始遍历,否则持续遍历以后的下一个节点。
须要留神的点:
- 入队过程中没有频繁执行 casTail(出队过程不会执行 casTail),因而 tail 地位有滞后,不肯定指向尾节点,甚至可能位于废除的链上。
- 应用 p.next == null 来判断尾节点,比应用 tail 精确。
- 通过 tail 遍历节点可能会遍历到有效节点,然而从 head 遍历总能拜访到无效节点。
4.2 入队过程图示
执行 offer(e)
入队,tail 并不总是指向尾节点,多个元素入队过程如下:
增加第一个元素(t 与 p 相等,不会更新 tail):
增加第二个元素(t 与 p 不相等,更新 tail):
增加第三个元素(t 与 p 相等,不会更新 tail):
4.3 tail 位于废除链
因为出队 poll()
逻辑并不会执行 casTail()
来保护 tail 所在位置,因而 tail 可能滞后于 head,甚至位于废除链上,如下图所示:
此时从 tail 往后遍历会拜访到有效节点 p,该节点满足 p == p.next
。
如果想要持续拜访到无效节点,需分两种状况:
- 从遍历开始至今,tail 的地位无变动,此时须要从 head 节点开始往下能力遍历到无效节点。
- 从遍历开始至今,tail 的地位产生了变动,阐明其余线程更新了 tail 的地位,此时从新的 tail 开始往下遍历即可。
5. 出队
java.util.concurrent.ConcurrentLinkedQueue#poll
public E poll() {
restartFromHead:
for (;;) {for (Node<E> h = head, p = h, q;;) { // 初始时 h 和 p 都指向 head 节点,从 head 节点开始遍历链表
E item = p.item;
if (item != null && p.casItem(item, null)) { // p.item 不为空,把 p 节点的数据域设为空,返回 p 节点的数据
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
// 若 p.next 不为空,则把 p.next 设为头节点,把 h 和 p 出队;若 p.next 为空,则把 p 设为头节点,把 h 出队
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) { // 进入这里,阐明 p.item 必然为空。若 p.next 也为空,阐明队列中没有数据了,须要返回 null
updateHead(h, p); // 把头节点设为 p,把 h 出队
return null;
}
else if (p == q) // 如果 p 的 next 等于 p,阐明 p 曾经出队了,从新从头节点开始遍历
continue restartFromHead;
else
p = q; // p = p.next 持续遍历链表
}
}
}
更新 head 节点:
java.util.concurrent.ConcurrentLinkedQueue#updateHead
/**
* Tries to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p)) // 节点 h 和 p 不等,且以后头节点为 h,则把头节点设为 p
h.lazySetNext(h); // 原头节点 h 的 next 指向本身,示意 h 出队
}
出队的根本思维:
- 从 head 节点开始遍历找出首个无效节点(p.item != null),返回该节点的数据(p.item)。
- 遍历过程中,如果遍历到尾节点(p.next == null),则返回空。
- 遍历过程中,如果遍历到有效节点(p.next == p),阐明其余线程批改了 head,须要从新从无效节点(新的 head)开始遍历。
须要留神的是,并不是每次出队时都执行 updateHead()
更新 head 节点:
- 当 head 节点里有元素时,间接弹出 head 节点里的元素,而不会更新 head 节点。
- 只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
采纳这种形式同样是为了缩小应用 CAS 更新 head 节点的耗费,从而进步出队效率。
5.1 出队过程图示
场景一:队列中具备两个节点,头节点为有效节点。因为 p != h,此时须要把头节点出队。
场景二:队列中具备两个节点,头节点为无效节点。因为 p == h,此时不须要把头节点出队。
6. 容量
不必定义初始容量,毋庸扩容,容量最大值为 Integer.MAX_VALUE。
获取队列的容量:从头开始遍历队列中的无效节点,并计数。留神是遍历过程是弱统一的。
java.util.concurrent.ConcurrentLinkedQueue#size
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个有数据的节点开始,始终遍历链表
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE) // 自增直到最大值
break;
return count;
}
java.util.concurrent.ConcurrentLinkedQueue#succ
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next; // 如果 p 曾经出队了,则从新从头节点开始,否则持续遍历下一个节点
}
7. IDEA 调试的问题
编写单元测试,对 ConcurrentLinkedQueue#offer 进行调试。
@Test
public void test() {ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("a");
queue.offer("b");
queue.offer("c");
queue.offer("d");
}
浏览源码可知入队过程并不会批改 head 节点,然而从 IDEA 的 debug 后果看到 head 节点产生了变动!
这是因为 IDEA 的 debug 过程会调用 ConcurrentLinkedQueue#toString 导致的,敞开即可。
ConcurrentLinkedQueue#toString 办法会创立迭代器,会调用到 ConcurrentLinkedQueue#first 办法,该办法会将首个无效节点作为头节点。
java.util.concurrent.ConcurrentLinkedQueue.Itr#Itr
java.util.concurrent.ConcurrentLinkedQueue.Itr#advance
java.util.concurrent.ConcurrentLinkedQueue#first
Node<E> first() { // 获取第一个具备非空元素的节点
restartFromHead:
for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
从新打断点调试,能够看到入队后 head 节点不变。
8. 总结
- ConcurrentLinkedQueue 是非阻塞队列,采纳 CAS 和自旋保障并发平安。
- ConcurrentLinkedQueue 的 tail 并不是严格指向尾节点,通过缩小出队时对 tail 的 CAS 以提高效率。
- ConcurrentLinkedQueue 的 head 所指节点可能是空节点,也可能是数据节点,通过缩小出队时对 head 的 CAS 以提高效率。
- 采纳非阻塞算法,容许队列处于不统一状态(head/tail),通过保障不变式和可变式,来保护非阻塞算法的正确性。
- 因为是非阻塞队列,无奈应用在线程池中。
作者:Sumkor
链接:https://segmentfault.com/a/11…