乐趣区

关于java:通俗易懂的JUC源码剖析ConcurrentLinkedQueue

简介

ConcurrentLinkedQueue 是 JUC 包下的线程平安的无界非阻塞队列,它与 BlockingQueue 接口实现类最大的不同就是,BlockingQueue 是阻塞队列,而 ConcurrentLinkedQueue 是非阻塞队列。这里的阻塞非阻塞,指的是队列满了或为空的时候,线程移除或放入元素的时候,是否须要阻塞挂起。BlockingQueue 底层是用锁实现的,而 ConcurrentLinkedQueue 底层应用 CAS 实现的。

实现原理

先来看重要的属性和数据结构:

// 头结点
private transient volatile Node<E> head;
// 尾结点
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}

其中,Node 是它的外部类:

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    Node(E item) {UNSAFE.putObject(this, itemOffset, item);
    }
    // CAS 形式批改以后结点的元素
    boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    // 提早设置以后结点的后继结点
    void lazySetNext(Node<E> val) {
        // 有序、提早版本的 putObjectVolatile 办法,不保障值的扭转被其余线程立刻可见。只有在 field 被 volatile 润饰时无效
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    // CAS 形式设置以后结点的后继结点
    boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    static {
           try {UNSAFE = sun.misc.Unsafe.getUnsafe();
               Class<?> k = Node.class;
               itemOffset = UNSAFE.objectFieldOffset
                   (k.getDeclaredField("item"));
               nextOffset = UNSAFE.objectFieldOffset
                   (k.getDeclaredField("next"));
            } catch (Exception e) {throw new Error(e);
            }
    }
}

不难看出,ConcurrentLinkedQueue 是用单向链表实现的。

再来看它的重要办法实现:
offer 操作

public boolean offer(E e) {
    // e 为空则抛空指针
    checkNotNull(e);
    // 结构待插入的元素结点
    final Node<E> newNode = new Node<E>(e);
    // 多线程环境下,从尾结点处,循环尝试插入
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
               // q 为 null 阐明 p 是尾结点,尝试 CAS 插入
               // p is last node
               if (p.casNext(null, newNode)) {
                   // Successful CAS is the linearization point
                   // for e to become an element of this queue, 
                   // and for newNode to become "live". 
                   if (p != t) // hop two nodes at a time
                      casTail(t, newNode); // Failure is OK.
                   return true;
               }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to 
            // jump to head, from which all live nodes are always 
            // reachable.  Else the new tail is a better bet. 
            p = (t != (t = tail)) ? t : head;
        else 
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
 }
}

我滴妈,逻辑太简单了,后续再剖析,明天先睡了。

退出移动版