共计 6262 个字符,预计需要花费 16 分钟才能阅读完成。
ConcurrentLinkedQueue 原理探索
介绍
ConcurrentLinkedQueue 是 线程平安的无界非阻塞队列,底层应用单向链表实现,对于入队和出队操作应用 CAS 实现线程平安。
ConcurrentLinkedQueue 外部的队列应用单向链表形式实现,其中有 两个 volatile 类型的 Node 节点
别离用来寄存队列的头、尾节点。
上面无参构造函数中能够晓得,默认头尾节点都是指向 item 为 null 的 哨兵节点,新元素插入队尾,获取元素从头部出队。
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}
在 Node 节点中保护一个应用 volatile 润饰的变量 item,用来寄存节点的值;next 寄存链表的下一个节点;其外部则应用 UNSafe 工具类提供的 CAS 算法来保障出入队时操作链表的原子性。
offer 操作
offer 操作是在队尾减少一个元素,如果传递的参数是 null 则抛出异样,否则因为 ConcurrentLinkedQueue 是无界队列,该办法会始终返回 true,另外,因为应用的是 CAS 算法,所以该办法不会被阻塞挂起调用方线程。上面咱们看源码:
public boolean offer(E e) {
//(1)为 null 则抛出异样
checkNotNull(e);
//(2)结构 Node 节点,外部调用 unsafe.putObject
final Node<E> newNode = new Node<E>(e);
//(3)从尾节点插入
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
//(4)如果 q == null 则阐明 p 是尾节点,执行插入
if (q == null) {
//(5)应用 CAS 设置 p 节点的 next 节点
if (p.casNext(null, newNode)) {
//(6)如果 p != t,则将入队节点设置成 tail 节点,更新失败了也没关系,因为失败了示意有其余线程胜利更新了 tail 节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
else if (p == q)
//(7)多线程操作时,因为 poll 操作移除元素后可能会把 head 变成自援用,也就是 head 的 next 变成了 head,这里须要从新找新的 head
p = (t != (t = tail)) ? t : head;
else
//(8)寻找尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
咱们解析一下这个办法的执行流程,首先当调用该办法时,代码(1)对传参进行查看,为 null 则抛出异样,否则执行代码(2)并应用 item 作为结构函数参数创立一个新的节点,而后代码(3)从队列尾部节点开始循环,打算从尾部增加元素,到代码(4)时队列状态如下图。
这时候节点 p、t、head、tail 都指向 item 为 null 的哨兵节点,因为哨兵节点 next 节点为 null,所以 q 也是 null。
代码(4)发现 q == null 则执行代码(5),应用 CAS 判断 p 节点的 next 是否为 null,为 null 则应用 newNode 替换 p 的 next 节点,而后执行代码(6),但这时候 p == t 所以没有设置尾部节点,而后退出 offer 办法。目前队列状态如下图:
方才咱们解说的是一个线程调用 offer 办法的状况,然而如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的状况。
假如线程 1 调用了 offer(item1),线程 2 调用了 offer(item2),都同时执行到了代码(5),p.casNext(null,newNode)。因为 CAS 是原子性的,咱们假如线程 1 先执行 CAS 操作,发现 p.next 为 null,则更新为 item1,这时候线程 2 也会判断 p.next 是否为 null,发现不为 null,则跳转到代码(3),而后执行代码(4)。这时队列散布如下图:
随后线程 2 发现不满足代码(4),则跳转执行代码(8),而后把 q 赋值给了 p。队列状态如下:
而后线程 2 再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图:
这时候q == null
,所以线程 2 会执行代码(5),通过 CAS 操作判断 p.next 节点是否为 null,是 null 则应用 item2 替换,不是则持续循环尝试。假如 CAS 胜利,那么执行代码(6),因为p != t
,所以设置 tail 节点为 item2,而后退出 offer 办法。这时候队列散布如下:
到当初咱们就差代码(7)没有讲过,其实在这一步次要是在执行 poll 操作后才会执行。在执行 poll 可能会产生如下状况:
咱们剖析一下如果是这种状态时调用 offer 增加元素,在执行到代码(4)时状态图如下:
这里因为 q 节点不为空并且 p == q
所以执行到代码(7)因为 t == tail
所以 p 被赋值为 head,而后从新循环,循环后执行到代码(4),队列状态如下:
这时候因为 q == null
,所以执行代码(5)进行 CAS 操作,如果以后没有其余线程执行 offer 操作,则 CAS 操作会胜利,p.next 节点被设置为新增节点。而后执行代码(6),因为p != t
所以设置新节点为队列的尾部节点,当初队列状态如下图:
📢 须要留神,这里自援用节点会被垃圾回收掉
总结:offer 操作的关键步骤是代码(5),通过 CAS 操作来管制同一时刻只有一个线程能够追加元素到队列开端,而失败的线程会进行循环尝试 CAS 操作,直到 CAS 胜利。
add 操作
add 操作是在链表开端增加一个元素,外部还是调用的 offer 办法。
public boolean add(E e) {return offer(e);
}
poll 操作
poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回 null。上面看看该办法的实现原理。
public E poll() {
//(1)goto 标记
restartFromHead:
//(2)有限循环
for (;;) {for (Node<E> h = head, p = h, q;;) {
//(3)保留以后节点
E item = p.item;
//(4)以后节点有值切 CAS 变为 null
if (item != null && p.casItem(item, null)) {
//(5)CAS 胜利后标记以后节点并移除
if (p != h) // hop two nodes at a time
// 更新头节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//(6)以后队列为空则返回 null
else if ((q = p.next) == null) {updateHead(h, p);
return null;
}
//(7)如果以后节点被自援用,则从新寻找头节点
else if (p == q)
continue restartFromHead;
else //(8)如果下一个元素不为空,则将头节点的下一个节点设置成头节点
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))
h.lazySetNext(h);
}
因为 poll 办法是从头部获取一个元素并移除,所以代码(2)内层循环是从 head 节点开始,代码(3)获取以后队列头节点,队列一开始为空时队列状态如下:
因为 head 节点指向的 item 为 null 的哨兵节点,所以会执行到代码(6),假如这个过程中没有线程调用 offer 办法,则此时 q 等于 null,这时队列状态如下:
随后执行 updateHead 办法,因为h == p
,所以没有设置头节点,而后返回 null。
假如执行到代码(6)时,曾经有其它线程调用了 offer 办法并胜利增加了一个元素到队列,这时候 q 指向的是新减少的元素节点,如下图:
而后不满足代码(6)(q = p.next) == null
,执行代码(7)的时候 p != q
则执行代码(8),而后将 p 指向了节点 q,队列状态如下:
而后程序又去执行代码(3),p 当初指向的不为 null,则执行 p.casItem(item, null)
通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时 CAS 胜利,执行代码(5),此时 p != h
则设置头节点为 p,并且设置 h 的 next 节点为本人,poll 而后返回被移除的节点 item。队列图如下:
目前的队列状态就是咱们方才讲 offer 操作时,执行到代码(7)的状态。
当初咱们还有代码(7)没执行过,咱们看一下什么时候执行。假如线程 1 执行 poll 操作时,队列状态如下:
那么执行 p.casItem(item, null)
通过 CAS 操作尝试设置 p 的 item 值为 null,假如 CAS 设置胜利则标记该节点并从队列中将其移除。队列状态如下:
而后,因为p != h
,所以会执行 updateHead 办法,如果线程 1 执行 updateHead 前另外一个线程 2 开始 poll 操作,这时候线程 2 的 p 指向 head 节点,然而还没有执行到代码(6),这时候队列状态如下:
而后线程 1 执行 updateHead 操作,执行结束后线程 1 退出,这时候队列状态如下:
而后线程 2 继续执行代码(6), q = p.next
,因为该节点是自援用节点,所以p == q
,所以会执行代码(7)跳到外层循环 restartFromHead,获取以后队列头 head,当初的状态如下:
总结:poll 办法在移除一个元素时,只是简略地应用 CAS 操作把以后节点的 item 值设置为 null,而后通过从新设置头节点将该元素从队列外面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被批改了,要跳到外层循环从新获取新的头节点。
peek 操作
peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。上面看下其实现原理。
public E peek() {
//1.
restartFromHead:
for (;;) {for (Node<E> h = head, p = h, q;;) {
//2.
E item = p.item;
//3.
if (item != null || (q = p.next) == null) {updateHead(h, p);
return item;
}
//4.
else if (p == q)
continue restartFromHead;
//5.
else
p = q;
}
}
}
peek 操作的代码构造与 poll 操作相似,不同之处在于代码(3)中少了 castItem 操作。其实这很失常,因为 peek 只是获取队列头元素值,并不清空其值。依据后面的介绍咱们晓得第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次执行 peek 时在代码(3)中会发现item == null
,而后执行q = p.next
,这时候 q 节点指向的才是队列外面第一个真正的元素,或者如果队列为 null 则 q 指向 null。
当队列为空时,队列如下:
这时候执行 updateHead,因为 h 节点等于 p 节点,所以不进行任何操作,而后 peek 操作会返回 null。
当队列中至多有一个元素时(假如只有一个),队列状态如下:
这时候执行代码(5), p 指向了 q 节点,而后执行代码(3),此时队列状态如下:
执行代码(3)时发现 item 不为 null,所以执行 updateHead 办法,因为h != p
,所以设置头节点,设置后队列状态如下:
最初剔除哨兵节点。
总结 :peek 操作的代码与 poll 操作相似,只是前者只获取队列头元素然而并不从队列里将它删除,而后者获取后须要从队列外面将它删除。另外, 在第一次调用 peek 操作时,会删除哨兵节点,并让队列的 head 节点指向队列外面第一个元素或者为 null。
size 操作
计算以后队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁,所以从调用 size 办法到返回后果期间有可能增删元素,导致统计的元素个数不准确。
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
// 获取第一个队列元素,(剔除哨兵节点),没有则为 null
Node<E> first() {
restartFromHead:
for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
remove 操作
如果队列外面存在该元素则删除该元素,如果存在多个则删除第一个,并返回 true,否则返回 false。
public boolean remove(Object o) {if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
// 若不匹配,则获取 next 节点持续匹配
if (!o.equals(item)) {next = succ(p);
continue;
}
// 相等则应用 CAS 设置为 null
removed = p.casItem(item, null);
}
// 获取删除节点的后继节点
next = succ(p);
// 如果有前驱节点,并且 next 节点不为 null,则链接前驱节点到 next 节点
if (pred != null && next != null) // unlink
// 将被删除的节点通过 CAS 移除队列
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}
contains 操作
判断队列外面是否含有指定对象,因为是遍历整个队列,所以像 size 操作一样后果也不是那么准确,有可能调用该办法时元素还在队列外面,然而遍历过程中其余线程才把该元素删除了,那么就会返回 false。
public boolean contains(Object o) {if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
return false;
}
总结
ConcurrentLinkedQueue 的底层应用单向链表数据结构来保留队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来保护的,创立队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素。因为应用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不准确,所以在并发状况下 size 办法不是很有用。