共计 3254 个字符,预计需要花费 9 分钟才能阅读完成。
前言
JUC 上面的相干源码持续往下浏览,这就看到了非阻塞的无界限程平安队列 —— ConcurrentLinkedQueue,来一起看看吧。
公众号:『刘志航』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!
介绍
基于链接节点的无界限程平安队列,对元素 FIFO(先进先出)进行排序。队列的头部是队列中最长工夫的元素,队列的尾部是队列中最短时间的元素。在队列的尾部插入新元素,队列检索操作获取队列头部的元素。
当许多线程共享对公共汇合的拜访 ConcurrentLinkedQueue 是一个适合的抉择。与大多数其余并发汇合实现一样,此类不容许应用 null 元素。
根本应用
public class ConcurrentLinkedQueueTest {public static void main(String[] args) {ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// 将指定元素插入此队列的尾部。queue.add("liuzhihang");
// 将指定元素插入此队列的尾部。queue.offer("liuzhihang");
// 获取但不移除此队列的头,队列为空返回 null。queue.peek();
// 获取并移除此队列的头,此队列为空返回 null。queue.poll();}
}
源码剖析
根本构造
参数介绍
private static class Node<E> {
// 节点中的元素
volatile E item;
// 下一个节点
volatile Node<E> next;
Node(E item) {UNSAFE.putObject(this, itemOffset, item);
}
// CAS 的形式设置节点元素
boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 设置下一个节点
void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS 的形式设置下一个节点
boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 省略 ……
}
在 ConcurrentLinkedQueue 外部含有一个外部类 Node,如上所示,这个外部类用来标识链表中的一个节点,通过代码能够看出,在 ConcurrentLinkedQueue 中的链表为 单向链表
。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
// 其余省略
// 头结点
private transient volatile Node<E> head;
// 尾节点
private transient volatile Node<E> tail;
}
头尾节点应用 volatile
润饰,保障内存可见性。
构造函数
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}
当创建对象时,头尾节点都是指向一个空节点。
增加元素
public boolean add(E e) {return offer(e);
}
public boolean offer(E e) {
// 验证是否为空
checkNotNull(e);
// 创立节点
final Node<E> newNode = new Node<E>(e);
// 循环入队列
// t 是以后尾节点,p 初始为 t
for (Node<E> t = tail, p = t;;) {
// q 为尾节点的下一个节点
Node<E> q = p.next;
if (q == null) {
// 为空,阐明前面没有节点,则 CAS 设置尾节点
if (p.casNext(null, newNode)) {
// 此时 p.next 是 newNode
// 如果 p!= t 阐明有并发
if (p != t)
// 其余线程曾经更新了 tail
// q = p.next 所以 q == null 不正确了
// q 取到了 t.next
// 此时将 tail 更新为 新节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 多线程状况下,poll,操作移除元素,可能会导致 p == q
// 此时要从新查找
else if (p == q)
//
p = (t != (t = tail)) ? t : head;
else
// 查看 tail 并更新
p = (p != t && t != (t = tail)) ? t : q;
}
}
画图阐明:
- 单线程状况下:
- 当执行到
Node<E> q = p.next;
时,当前情况如图所示:
- 判断
q == null
,满足条件,此时便会执行p.casNext(null, newNode)
应用 CAS 设置 p.next。 - 设置胜利之后,
p == t
没有变动,所以程序退出。
- 多线程状况下:
- 当执行到
Node<E> q = p.next;
时,当前情况如图所示:
- 多个线程执行
p.casNext(null, newNode)
应用 CAS 设置 p.next。 - A 线程 CAS 设置胜利:
- B 线程 CAS 执行失败,从新循环,会执行到
p = (p != t && t != (t = tail)) ? t : q
。
- 再次循环就能够胜利设置上了。
获取元素
public E poll() {
restartFromHead:
// 有限循环
for (;;) {for (Node<E> h = head, p = h, q;;) {
// 头结点的 iterm
E item = p.item;
// 以后节点如果不为 null CAS 设置为 null
if (item != null && p.casItem(item, null)) {
// CAS 胜利 则标记移除
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 以后队列未空 返回 null
else if ((q = p.next) == null) {updateHead(h, p);
return null;
}
// 自援用了,从新进行循环
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
画图过程如下:
- 在执行内层循环时,如果队列为空:
E item = p.item;
此时,iterm 为 null,会updateHead(h, p)
并返回 null。 - 假如同时有并发插入操作,增加了一个元素,此时如图所示:
这时会执行最初的 else 将 p = q
- 持续循环获取 item,并执行
p.casItem(item, null)
,而后判断p != h
,更新 head 并返回 item。
这里的状况比较复杂,这里只是列举一种,如果须要能够本人多列举几种。
而查看元素的代码和获取元素代码相似就不多介绍了。
size 操作
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
CAS 没有加锁,所以 size 是不精确的。并且 size 会遍历一遍列表,比拟消耗性能。
总结
ConcurrentLinkedQueue 在工作中应用的绝对较少,所以浏览相干源码的时候也只是大略看了一下,理解罕用 API,以及底层原理。
简略总结就是应用 单向链表 来保留队列元素,外部应用非阻塞的 CAS 算法,没有加锁。所以计算 size 时可能不精确,同样 size 会遍历链表,所以并不倡议应用。