乐趣区

关于java:阅读-JDK-8-源码ConcurrentLinkedQueue

ConcurrentLinkedQueue 是一个由链表构造组成的无界非阻塞队列,是 JDK 中惟一一个并发平安的非阻塞队列。应用无锁算法来保障线程平安,为了缩小 CAS 操作造成的资源抢夺损耗,其链表构造被设计为“松弛”的,本文对 ConcurrentLinkedQueue 的入队和出队过程进行图解,直观展现其内部结构。

1. 继承体系

java.util.concurrent.ConcurrentLinkedQueue

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable

2. 数据结构

ConcurrentLinkedQueue 的数据结构为链表。

2.1 链表节点

须要留神的是,item 为空示意有效节点,非空示意无效节点。
有效节点是须要从链表中清理掉的节点,ConcurrentLinkedQueue 队列中为什么要存储有效节点呢,持续往下看。

java.util.concurrent.ConcurrentLinkedQueue.Node

private static class Node<E> {
    volatile E item;       // 节点的数据
    volatile Node<E> next; // 下一个节点
    
    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // 相比 putObjectVolatile(),putOrderedObject() 不保障内存可见性,然而性能较高
    void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
}    

2.2 head 和 tail 节点

队列中定义了 head 和 tail 节点。
因为采纳了非阻塞算法(non-blocking algorithms),head 和 tail 节点并不严格指向链表的头尾节点,也就是每次入队出队操作并不会及时更新 head 和 tail 节点。
通过规定“不变式”和“可变式”来保护非阻塞算法的正确性。

不变式:并发对象须要始终放弃的个性。
不变式是并发对象的各个办法之间必须恪守的“契约”,每个办法在调用前和调用后都必须放弃不变式。
采纳不变式,就能够隔离的剖析每个办法,而不必思考它们之间所有可能的交互。

根本不变式

在执行办法之前和之后,队列必须要放弃的不变式(The fundamental invariants):

  • 当入队插入新节点之后,队列中有一个 next 域为 null 的节点(真正的尾节点)。
  • 从 head 开始遍历队列,能够拜访所有 item 域不为 null 的节点(无效节点)。

head 的不变式和可变式

/**
 * A node from which the first live (non-deleted) node (if any)
 * can be reached in O(1) time.
 * Invariants:
 * - all live nodes are reachable from head via succ()
 * - head != null
 * - (tmp = head).next != tmp || tmp != head
 * Non-invariants:
 * - head.item may or may not be null.
 * - it is permitted for tail to lag behind head, that is, for tail
 *   to not be reachable from head!
 */
private transient volatile Node<E> head;

head 的不变式:

  • 所有的无效节点,都能从 head 通过调用 succ() 办法遍历可达。
  • head 不能为 null。
  • head 节点的 next 域不能引用到本身。

head 的可变式:

  • head 节点的 item 域可能为 null,也可能不为 null。
  • 容许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不肯定能达到 tail。

tail 的不变式和可变式

/**
 * A node from which the last node on list (that is, the unique
 * node with node.next == null) can be reached in O(1) time.
 * Invariants:
 * - the last node is always reachable from tail via succ()
 * - tail != null
 * Non-invariants:
 * - tail.item may or may not be null.
 * - it is permitted for tail to lag behind head, that is, for tail
 *   to not be reachable from head!
 * - tail.next may or may not be self-pointing to tail.
 */
private transient volatile Node<E> tail;

tail 的不变式:

  • 通过 tail 调用 succ() 办法,最初节点总是可达的。
  • tail 不能为 null。

tail 的可变式:

  • tail 节点的 item 域可能为 null,也可能不为 null。
  • 容许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不肯定能达到 tail。
  • tail 节点的 next 域能够援用到本身。

3. 构造函数

默认创立空节点,head 和 tail 都指向该节点。

/**
 * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
 */
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}

/**
 * Creates a {@code ConcurrentLinkedQueue}
 * initially containing the elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

4. 入队

4.1 源码剖析

因为是无界队列,add(e)办法不必抛出异样。不反对增加 null。

java.util.concurrent.ConcurrentLinkedQueue#add

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never throw
 * {@link IllegalStateException} or return {@code false}.
 *
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {return offer(e);
}

入队的外围逻辑:

java.util.concurrent.ConcurrentLinkedQueue#offer

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 留神 tail 不肯定是尾节点(甚至 tail 有可能存在于废除的链上,后有解释),然而也无妨从 tail 节点开始遍历链表
    for (Node<E> t = tail, p = t;;) { // 初始时 t 和 p 都指向 tail 节点
        Node<E> q = p.next;
        if (q == null) { // 应用 p.next 是否为空来判断 p 是否是尾节点,比拟精确
            // p is last node // 进入这里阐明此时 p 是尾节点
            if (p.casNext(null, newNode)) { // 若节点 p 的下一个节点为 null,则设置为 newNode
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time 
                // 不论 p 与 t 是否雷同,都应该 casTail。然而这里只在 p 与 t 不同时才 casTail,导致 tail 节点不总是尾节点,目标是缩小对 tail 的 CAS
                    casTail(t, newNode);  // Failure is OK. // 将尾节点 tail 由 t 改为 newNode,更新失败了也没关系,因为 tail 是不是尾节点不重要:)
                return true;
            }
            // Lost CAS race to another thread; re-read next // CAS 失败,阐明其余线程先一步操作使得 p 的下一个节点不为 null,需从新获取尾节点
        }
        else if (p == q) // 如果 p 的 next 等于 p,阐明 p 曾经出队了,须要从新设置 p、t 的值
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet. 
            // 1. 若节点 t 不再是 tail,阐明其余线程退出过元素(批改过 tail),则取最新 tail 作为 t 和 p,从新的 tail 节点持续遍历链表
            // 2. 若节点 t 仍旧是 tail,阐明从 tail 节点开始遍历链表曾经不论用了,则把 head 作为 p,从 head 节点从头遍历链表(留神这一步造成后续遍历中 p!= t 成立)p = (t != (t = tail)) ? t : head;
            // 这里没有更新 tail,仍留在废链上
        else
            // Check for tail updates after two hops. 
            // 进入这里,阐明 p.next 不为 null,且 p 未出队,须要判断:// 1. 若 p 与 t 相等,则 t 留在原位,p=p.next 始终往下遍历(留神这一步造成后续遍历中 p!= t 成立)。// 2. 若 p 与 t 不等,需进一步判断 t 与 tail 是否相等。若 t 不为 tail,则取最新 tail 作为 t 和 p;若 t 为 tail,则 p =p.next 始终往下遍历。// 就是说从 tail 节点往后遍历链表的过程,需时刻关注 tail 是否发生变化
            p = (p != t && t != (t = tail)) ? t : q;  
    }
}

更新 tail 节点:

java.util.concurrent.ConcurrentLinkedQueue#casTail

private boolean casTail(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

入队的根本思维:

  1. 从 tail 节点开始遍历到尾节点,若定位到尾节点(p.next == null),则入队。
  2. 遍历过程中,如果遍历到有效节点(p.next == p),须要从新从无效节点(tail 或 head)开始遍历。
  3. 遍历过程中,时刻关注 tail 节点是否有效。若有效了须要从新从最新的 tail 开始遍历,否则持续遍历以后的下一个节点。

须要留神的点:

  1. 入队过程中没有频繁执行 casTail(出队过程不会执行 casTail),因而 tail 地位有滞后,不肯定指向尾节点,甚至可能位于废除的链上。
  2. 应用 p.next == null 来判断尾节点,比应用 tail 精确。
  3. 通过 tail 遍历节点可能会遍历到有效节点,然而从 head 遍历总能拜访到无效节点。

4.2 入队过程图示

执行 offer(e) 入队,tail 并不总是指向尾节点,多个元素入队过程如下:

增加第一个元素(t 与 p 相等,不会更新 tail):

增加第二个元素(t 与 p 不相等,更新 tail):

增加第三个元素(t 与 p 相等,不会更新 tail):

4.3 tail 位于废除链

因为出队 poll() 逻辑并不会执行 casTail() 来保护 tail 所在位置,因而 tail 可能滞后于 head,甚至位于废除链上,如下图所示:

此时从 tail 往后遍历会拜访到有效节点 p,该节点满足 p == p.next

如果想要持续拜访到无效节点,需分两种状况:

  1. 从遍历开始至今,tail 的地位无变动,此时须要从 head 节点开始往下能力遍历到无效节点。
  2. 从遍历开始至今,tail 的地位产生了变动,阐明其余线程更新了 tail 的地位,此时从新的 tail 开始往下遍历即可。

5. 出队

java.util.concurrent.ConcurrentLinkedQueue#poll

public E poll() {
    restartFromHead:
    for (;;) {for (Node<E> h = head, p = h, q;;) { // 初始时 h 和 p 都指向 head 节点,从 head 节点开始遍历链表
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // p.item 不为空,把 p 节点的数据域设为空,返回 p 节点的数据
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    // 若 p.next 不为空,则把 p.next 设为头节点,把 h 和 p 出队;若 p.next 为空,则把 p 设为头节点,把 h 出队
                    updateHead(h, ((q = p.next) != null) ? q : p); 
                return item;
            }
            else if ((q = p.next) == null) { // 进入这里,阐明 p.item 必然为空。若 p.next 也为空,阐明队列中没有数据了,须要返回 null
                updateHead(h, p); // 把头节点设为 p,把 h 出队
                return null;
            }
            else if (p == q) // 如果 p 的 next 等于 p,阐明 p 曾经出队了,从新从头节点开始遍历
                continue restartFromHead;
            else
                p = q; // p = p.next 持续遍历链表
        }
    }
}

更新 head 节点:

java.util.concurrent.ConcurrentLinkedQueue#updateHead

/**
 * Tries to CAS head to p. If successful, repoint old head to itself
 * as sentinel for succ(), below.
 */
final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p)) // 节点 h 和 p 不等,且以后头节点为 h,则把头节点设为 p
        h.lazySetNext(h); // 原头节点 h 的 next 指向本身,示意 h 出队
}

出队的根本思维:

  1. 从 head 节点开始遍历找出首个无效节点(p.item != null),返回该节点的数据(p.item)。
  2. 遍历过程中,如果遍历到尾节点(p.next == null),则返回空。
  3. 遍历过程中,如果遍历到有效节点(p.next == p),阐明其余线程批改了 head,须要从新从无效节点(新的 head)开始遍历。

须要留神的是,并不是每次出队时都执行 updateHead() 更新 head 节点:

  1. 当 head 节点里有元素时,间接弹出 head 节点里的元素,而不会更新 head 节点。
  2. 只有当 head 节点里没有元素时,出队操作才会更新 head 节点。

采纳这种形式同样是为了缩小应用 CAS 更新 head 节点的耗费,从而进步出队效率。

5.1 出队过程图示

场景一:队列中具备两个节点,头节点为有效节点。因为 p != h,此时须要把头节点出队。

场景二:队列中具备两个节点,头节点为无效节点。因为 p == h,此时不须要把头节点出队。

6. 容量

不必定义初始容量,毋庸扩容,容量最大值为 Integer.MAX_VALUE。

获取队列的容量:从头开始遍历队列中的无效节点,并计数。留神是遍历过程是弱统一的。

java.util.concurrent.ConcurrentLinkedQueue#size

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个有数据的节点开始,始终遍历链表
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE) // 自增直到最大值
                break;
    return count;
}

java.util.concurrent.ConcurrentLinkedQueue#succ

/**
 * Returns the successor of p, or the head node if p.next has been
 * linked to self, which will only be true if traversing with a
 * stale pointer that is now off the list.
 */
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next; // 如果 p 曾经出队了,则从新从头节点开始,否则持续遍历下一个节点
}

7. IDEA 调试的问题

编写单元测试,对 ConcurrentLinkedQueue#offer 进行调试。

@Test
public void test() {ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    queue.offer("a");
    queue.offer("b");
    queue.offer("c");
    queue.offer("d");
}

浏览源码可知入队过程并不会批改 head 节点,然而从 IDEA 的 debug 后果看到 head 节点产生了变动!

这是因为 IDEA 的 debug 过程会调用 ConcurrentLinkedQueue#toString 导致的,敞开即可。

ConcurrentLinkedQueue#toString 办法会创立迭代器,会调用到 ConcurrentLinkedQueue#first 办法,该办法会将首个无效节点作为头节点。

java.util.concurrent.ConcurrentLinkedQueue.Itr#Itr
java.util.concurrent.ConcurrentLinkedQueue.Itr#advance
java.util.concurrent.ConcurrentLinkedQueue#first

Node<E> first() { // 获取第一个具备非空元素的节点
    restartFromHead:
    for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

从新打断点调试,能够看到入队后 head 节点不变。

8. 总结

  1. ConcurrentLinkedQueue 是非阻塞队列,采纳 CAS 和自旋保障并发平安。
  2. ConcurrentLinkedQueue 的 tail 并不是严格指向尾节点,通过缩小出队时对 tail 的 CAS 以提高效率。
  3. ConcurrentLinkedQueue 的 head 所指节点可能是空节点,也可能是数据节点,通过缩小出队时对 head 的 CAS 以提高效率。
  4. 采纳非阻塞算法,容许队列处于不统一状态(head/tail),通过保障不变式和可变式,来保护非阻塞算法的正确性。
  5. 因为是非阻塞队列,无奈应用在线程池中。

作者:Sumkor
链接:https://segmentfault.com/a/11…

退出移动版