前言

JUC 上面的相干源码持续往下浏览,这就看到了非阻塞的无界限程平安队列 —— ConcurrentLinkedQueue,来一起看看吧。

公众号:『 刘志航 』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

基于链接节点的无界限程平安队列,对元素FIFO(先进先出)进行排序。 队列的头部是队列中最长工夫的元素,队列的尾部是队列中最短时间的元素。 在队列的尾部插入新元素,队列检索操作获取队列头部的元素。

当许多线程共享对公共汇合的拜访 ConcurrentLinkedQueue 是一个适合的抉择。 与大多数其余并发汇合实现一样,此类不容许应用null元素。

根本应用

public class ConcurrentLinkedQueueTest {    public static void main(String[] args) {        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();        // 将指定元素插入此队列的尾部。        queue.add("liuzhihang");        // 将指定元素插入此队列的尾部。        queue.offer("liuzhihang");        // 获取但不移除此队列的头,队列为空返回 null。        queue.peek();        // 获取并移除此队列的头,此队列为空返回 null。        queue.poll();            }}

源码剖析

根本构造

参数介绍

private static class Node<E> {        // 节点中的元素    volatile E item;    // 下一个节点    volatile Node<E> next;    Node(E item) {        UNSAFE.putObject(this, itemOffset, item);    }    // CAS 的形式设置节点元素    boolean casItem(E cmp, E val) {        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);    }    // 设置下一个节点    void lazySetNext(Node<E> val) {        UNSAFE.putOrderedObject(this, nextOffset, val);    }    // CAS 的形式设置下一个节点    boolean casNext(Node<E> cmp, Node<E> val) {        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);    }    // 省略 ……}

在 ConcurrentLinkedQueue 外部含有一个外部类 Node,如上所示,这个外部类用来标识链表中的一个节点,通过代码能够看出,在 ConcurrentLinkedQueue 中的链表为单向链表

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>        implements Queue<E>, java.io.Serializable {            // 其余省略    // 头结点    private transient volatile Node<E> head;          // 尾节点    private transient volatile Node<E> tail;}

头尾节点应用 volatile 润饰,保障内存可见性。

构造函数

public ConcurrentLinkedQueue() {    head = tail = new Node<E>(null);}

当创建对象时,头尾节点都是指向一个空节点。

增加元素

public boolean add(E e) {    return offer(e);}public boolean offer(E e) {        // 验证是否为空    checkNotNull(e);    // 创立节点    final Node<E> newNode = new Node<E>(e);    // 循环入队列    // t 是以后尾节点,p 初始为 t    for (Node<E> t = tail, p = t;;) {        // q 为尾节点的下一个节点        Node<E> q = p.next;        if (q == null) {            // 为空,阐明前面没有节点,则 CAS 设置尾节点            if (p.casNext(null, newNode)) {                // 此时 p.next 是 newNode                // 如果 p != t 阐明有并发                if (p != t)                     // 其余线程曾经更新了 tail                    // q = p.next 所以 q == null 不正确了                    // q 取到了 t.next                    // 此时将 tail 更新为 新节点                    casTail(t, newNode);  // Failure is OK.                return true;            }            // Lost CAS race to another thread; re-read next        }        // 多线程状况下, poll ,操作移除元素,可能会导致 p == q         // 此时要从新查找        else if (p == q)            //             p = (t != (t = tail)) ? t : head;        else            // 查看 tail 并更新            p = (p != t && t != (t = tail)) ? t : q;    }}

画图阐明:

  • 单线程状况下:
  1. 当执行到 Node<E> q = p.next; 时,当前情况如图所示:

  1. 判断 q == null,满足条件,此时便会执行 p.casNext(null, newNode) 应用 CAS 设置 p.next。
  2. 设置胜利之后,p == t 没有变动,所以程序退出。
  • 多线程状况下:
  1. 当执行到 Node<E> q = p.next; 时,当前情况如图所示:

  1. 多个线程执行 p.casNext(null, newNode) 应用 CAS 设置 p.next。
  2. A 线程 CAS 设置胜利:

  1. B 线程 CAS 执行失败, 从新循环,会执行到 p = (p != t && t != (t = tail)) ? t : q

  1. 再次循环就能够胜利设置上了。

获取元素

public E poll() {    restartFromHead:    // 有限循环    for (;;) {        for (Node<E> h = head, p = h, q;;) {            // 头结点的 iterm            E item = p.item;            // 以后节点如果不为 null CAS 设置为 null            if (item != null && p.casItem(item, null)) {                // CAS 胜利 则标记移除                if (p != h) // hop two nodes at a time                    updateHead(h, ((q = p.next) != null) ? q : p);                return item;            }            // 以后队列未空 返回 null            else if ((q = p.next) == null) {                updateHead(h, p);                return null;            }            // 自援用了, 从新进行循环            else if (p == q)                continue restartFromHead;            else                p = q;        }    }}

画图过程如下:

  1. 在执行内层循环时,如果队列为空:E item = p.item; 此时,iterm 为 null,会 updateHead(h, p) 并返回 null。
  2. 假如同时有并发插入操作,增加了一个元素,此时如图所示:

这时会执行最初的 else 将 p = q

  1. 持续循环获取 item,并执行 p.casItem(item, null) , 而后判断 p != h,更新 head 并返回 item。

这里的状况比较复杂,这里只是列举一种,如果须要能够本人多列举几种。

而查看元素的代码和获取元素代码相似就不多介绍了。

size 操作

public int size() {    int count = 0;    for (Node<E> p = first(); p != null; p = succ(p))        if (p.item != null)            // Collection.size() spec says to max out            if (++count == Integer.MAX_VALUE)                break;    return count;}

CAS 没有加锁,所以 size 是不精确的。并且 size 会遍历一遍列表,比拟消耗性能。

总结

ConcurrentLinkedQueue 在工作中应用的绝对较少,所以浏览相干源码的时候也只是大略看了一下,理解罕用 API,以及底层原理。

简略总结就是应用单向链表来保留队列元素,外部应用非阻塞的 CAS 算法,没有加锁。所以计算 size 时可能不精确,同样 size 会遍历链表,所以并不倡议应用。