ConcurrentLinkedQueue 是一个由链表构造组成的无界非阻塞队列,是 JDK 中惟一一个并发平安的非阻塞队列。应用无锁算法来保障线程平安,为了缩小 CAS 操作造成的资源抢夺损耗,其链表构造被设计为“松弛”的,本文对 ConcurrentLinkedQueue 的入队和出队过程进行图解,直观展现其内部结构。
1. 继承体系
java.util.concurrent.ConcurrentLinkedQueue
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable
2. 数据结构
ConcurrentLinkedQueue 的数据结构为链表。
2.1 链表节点
须要留神的是,item 为空示意有效节点,非空示意无效节点。
有效节点是须要从链表中清理掉的节点,ConcurrentLinkedQueue 队列中为什么要存储有效节点呢,持续往下看。
java.util.concurrent.ConcurrentLinkedQueue.Node
private static class Node<E> { volatile E item; // 节点的数据 volatile Node<E> next; // 下一个节点 /** * Constructs a new node. Uses relaxed write because item can * only be seen after publication via casNext. */ Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 相比 putObjectVolatile(),putOrderedObject() 不保障内存可见性,然而性能较高 void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }}
2.2 head 和 tail 节点
队列中定义了 head 和 tail 节点。
因为采纳了非阻塞算法(non-blocking algorithms),head 和 tail 节点并不严格指向链表的头尾节点,也就是每次入队出队操作并不会及时更新 head 和 tail 节点。
通过规定“不变式”和“可变式”来保护非阻塞算法的正确性。
不变式:并发对象须要始终放弃的个性。
不变式是并发对象的各个办法之间必须恪守的“契约”,每个办法在调用前和调用后都必须放弃不变式。
采纳不变式,就能够隔离的剖析每个办法,而不必思考它们之间所有可能的交互。
根本不变式
在执行办法之前和之后,队列必须要放弃的不变式(The fundamental invariants):
- 当入队插入新节点之后,队列中有一个 next 域为 null 的节点(真正的尾节点)。
- 从 head 开始遍历队列,能够拜访所有 item 域不为 null 的节点(无效节点)。
head 的不变式和可变式
/** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */private transient volatile Node<E> head;
head 的不变式:
- 所有的无效节点,都能从 head 通过调用 succ() 办法遍历可达。
- head 不能为 null。
- head 节点的 next 域不能引用到本身。
head 的可变式:
- head 节点的 item 域可能为 null,也可能不为 null。
- 容许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不肯定能达到 tail。
tail 的不变式和可变式
/** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */private transient volatile Node<E> tail;
tail 的不变式:
- 通过 tail 调用 succ() 办法,最初节点总是可达的。
- tail 不能为 null。
tail 的可变式:
- tail 节点的 item 域可能为 null,也可能不为 null。
- 容许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不肯定能达到 tail。
- tail 节点的 next 域能够援用到本身。
3. 构造函数
默认创立空节点,head 和 tail 都指向该节点。
/** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */public ConcurrentLinkedQueue() { head = tail = new Node<E>(null);}/** * Creates a {@code ConcurrentLinkedQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; for (E e : c) { checkNotNull(e); Node<E> newNode = new Node<E>(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node<E>(null); head = h; tail = t;}
4. 入队
4.1 源码剖析
因为是无界队列,add(e)
办法不必抛出异样。不反对增加 null。
java.util.concurrent.ConcurrentLinkedQueue#add
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never throw * {@link IllegalStateException} or return {@code false}. * * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */public boolean add(E e) { return offer(e);}
入队的外围逻辑:
java.util.concurrent.ConcurrentLinkedQueue#offer
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); // 留神tail不肯定是尾节点(甚至tail有可能存在于废除的链上,后有解释),然而也无妨从tail节点开始遍历链表 for (Node<E> t = tail, p = t;;) { // 初始时t和p都指向tail节点 Node<E> q = p.next; if (q == null) { // 应用p.next是否为空来判断p是否是尾节点,比拟精确 // p is last node // 进入这里阐明此时p是尾节点 if (p.casNext(null, newNode)) { // 若节点p的下一个节点为null,则设置为newNode // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time // 不论p与t是否雷同,都应该casTail。然而这里只在p与t不同时才casTail,导致tail节点不总是尾节点,目标是缩小对tail的CAS casTail(t, newNode); // Failure is OK. // 将尾节点tail由t改为newNode,更新失败了也没关系,因为tail是不是尾节点不重要:) return true; } // Lost CAS race to another thread; re-read next // CAS失败,阐明其余线程先一步操作使得p的下一个节点不为null,需从新获取尾节点 } else if (p == q) // 如果p的next等于p,阐明p曾经出队了,须要从新设置p、t的值 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. // 1. 若节点t不再是tail,阐明其余线程退出过元素(批改过tail),则取最新tail作为t和p,从新的tail节点持续遍历链表 // 2. 若节点t仍旧是tail,阐明从tail节点开始遍历链表曾经不论用了,则把head作为p,从head节点从头遍历链表(留神这一步造成后续遍历中p!=t成立) p = (t != (t = tail)) ? t : head; // 这里没有更新tail,仍留在废链上 else // Check for tail updates after two hops. // 进入这里,阐明p.next不为null,且p未出队,须要判断: // 1. 若p与t相等,则t留在原位,p=p.next始终往下遍历(留神这一步造成后续遍历中p!=t成立)。 // 2. 若p与t不等,需进一步判断t与tail是否相等。若t不为tail,则取最新tail作为t和p;若t为tail,则p=p.next始终往下遍历。 // 就是说从tail节点往后遍历链表的过程,需时刻关注tail是否发生变化 p = (p != t && t != (t = tail)) ? t : q; }}
更新 tail 节点:
java.util.concurrent.ConcurrentLinkedQueue#casTail
private boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);}
入队的根本思维:
- 从 tail 节点开始遍历到尾节点,若定位到尾节点(p.next == null),则入队。
- 遍历过程中,如果遍历到有效节点(p.next == p),须要从新从无效节点(tail 或 head)开始遍历。
- 遍历过程中,时刻关注 tail 节点是否有效。若有效了须要从新从最新的 tail 开始遍历,否则持续遍历以后的下一个节点。
须要留神的点:
- 入队过程中没有频繁执行 casTail(出队过程不会执行 casTail),因而 tail 地位有滞后,不肯定指向尾节点,甚至可能位于废除的链上。
- 应用 p.next == null 来判断尾节点,比应用 tail 精确。
- 通过 tail 遍历节点可能会遍历到有效节点,然而从 head 遍历总能拜访到无效节点。
4.2 入队过程图示
执行offer(e)
入队,tail 并不总是指向尾节点,多个元素入队过程如下:
增加第一个元素(t 与 p 相等,不会更新 tail):
增加第二个元素(t 与 p 不相等,更新 tail):
增加第三个元素(t 与 p 相等,不会更新 tail):
4.3 tail 位于废除链
因为出队 poll()
逻辑并不会执行 casTail()
来保护 tail 所在位置,因而 tail 可能滞后于 head,甚至位于废除链上,如下图所示:
此时从 tail 往后遍历会拜访到有效节点 p,该节点满足 p == p.next
。
如果想要持续拜访到无效节点,需分两种状况:
- 从遍历开始至今,tail 的地位无变动,此时须要从 head 节点开始往下能力遍历到无效节点。
- 从遍历开始至今,tail 的地位产生了变动,阐明其余线程更新了 tail 的地位,此时从新的 tail 开始往下遍历即可。
5. 出队
java.util.concurrent.ConcurrentLinkedQueue#poll
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { // 初始时h和p都指向head节点,从head节点开始遍历链表 E item = p.item; if (item != null && p.casItem(item, null)) { // p.item不为空,把p节点的数据域设为空,返回p节点的数据 // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time // 若p.next不为空,则把p.next设为头节点,把h和p出队;若p.next为空,则把p设为头节点,把h出队 updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 进入这里,阐明p.item必然为空。若p.next也为空,阐明队列中没有数据了,须要返回null updateHead(h, p); // 把头节点设为p,把h出队 return null; } else if (p == q) // 如果p的next等于p,阐明p曾经出队了,从新从头节点开始遍历 continue restartFromHead; else p = q; // p = p.next 持续遍历链表 } }}
更新 head 节点:
java.util.concurrent.ConcurrentLinkedQueue#updateHead
/** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) // 节点h和p不等,且以后头节点为h,则把头节点设为p h.lazySetNext(h); // 原头节点h的next指向本身,示意h出队}
出队的根本思维:
- 从 head 节点开始遍历找出首个无效节点(p.item != null),返回该节点的数据(p.item)。
- 遍历过程中,如果遍历到尾节点(p.next == null),则返回空。
- 遍历过程中,如果遍历到有效节点(p.next == p),阐明其余线程批改了 head,须要从新从无效节点(新的 head)开始遍历。
须要留神的是,并不是每次出队时都执行 updateHead()
更新 head 节点:
- 当 head 节点里有元素时,间接弹出 head 节点里的元素,而不会更新 head 节点。
- 只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
采纳这种形式同样是为了缩小应用 CAS 更新 head 节点的耗费,从而进步出队效率。
5.1 出队过程图示
场景一:队列中具备两个节点,头节点为有效节点。因为 p != h,此时须要把头节点出队。
场景二:队列中具备两个节点,头节点为无效节点。因为 p == h,此时不须要把头节点出队。
6. 容量
不必定义初始容量,毋庸扩容,容量最大值为 Integer.MAX_VALUE。
获取队列的容量:从头开始遍历队列中的无效节点,并计数。留神是遍历过程是弱统一的。
java.util.concurrent.ConcurrentLinkedQueue#size
public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个有数据的节点开始,始终遍历链表 if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) // 自增直到最大值 break; return count;}
java.util.concurrent.ConcurrentLinkedQueue#succ
/** * Returns the successor of p, or the head node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; // 如果p曾经出队了,则从新从头节点开始,否则持续遍历下一个节点}
7. IDEA 调试的问题
编写单元测试,对 ConcurrentLinkedQueue#offer 进行调试。
@Testpublic void test() { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); queue.offer("a"); queue.offer("b"); queue.offer("c"); queue.offer("d");}
浏览源码可知入队过程并不会批改 head 节点,然而从 IDEA 的 debug 后果看到 head 节点产生了变动!
这是因为 IDEA 的 debug 过程会调用 ConcurrentLinkedQueue#toString 导致的,敞开即可。
ConcurrentLinkedQueue#toString 办法会创立迭代器,会调用到 ConcurrentLinkedQueue#first 办法,该办法会将首个无效节点作为头节点。
java.util.concurrent.ConcurrentLinkedQueue.Itr#Itr
java.util.concurrent.ConcurrentLinkedQueue.Itr#advance
java.util.concurrent.ConcurrentLinkedQueue#first
Node<E> first() { // 获取第一个具备非空元素的节点 restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } }}
从新打断点调试,能够看到入队后 head 节点不变。
8. 总结
- ConcurrentLinkedQueue 是非阻塞队列,采纳 CAS 和自旋保障并发平安。
- ConcurrentLinkedQueue 的 tail 并不是严格指向尾节点,通过缩小出队时对 tail 的 CAS 以提高效率。
- ConcurrentLinkedQueue 的 head 所指节点可能是空节点,也可能是数据节点,通过缩小出队时对 head 的 CAS 以提高效率。
- 采纳非阻塞算法,容许队列处于不统一状态(head/tail),通过保障不变式和可变式,来保护非阻塞算法的正确性。
- 因为是非阻塞队列,无奈应用在线程池中。
作者:Sumkor
链接:https://segmentfault.com/a/11...