共计 18673 个字符,预计需要花费 47 分钟才能阅读完成。
LinkedTransferQueue 是一个由链表构造组成的无界阻塞 TransferQueue 队列。
接口 TransferQueue 和实现类 LinkedTransferQueue 从 Java 7 开始退出 J.U.C 之中。
1. 继承体系
java.util.concurrent.LinkedTransferQueue
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable
2. 数据结构
LinkedTransferQueue 的数据结构为链表,是一个松弛的双重队列(Dual Queues with Slack)。
双重队列 指的是链表中的节点存在两种模式:数据节点(提供数据)、申请节点(申请数据)。
对于 TransferQueue#transfer:
线程入队非数据节点时,如果没有匹配到数据节点则阻塞,直到其余线程提供数据节点与之匹配。
线程入队数据节点时,如果没有匹配到非数据节点则阻塞,直到其余线程提供非数据节点与之匹配。
2.1 节点定义
- 应用 isData 标记该节点的模式。
- 以后节点匹配胜利,则 item 设置为所匹配的节点的数据。此时
item == null
与 isData 的值相同。 - 以后节点勾销匹配,则 item 设置为本身。
- 以后节点出队,则 next 指向本身。
- 应用 waiter 存储操作该节点的线程,期待匹配时挂起该线程,匹配胜利时需唤醒该线程。
与 SynchronousQueue.TransferQueue.QNode 的定义是一样的。
节点的匹配状态由 item 属性来管制:
对于数据节点,在匹配的时候,把该节点的 item 域从非空数据 CAS 设置为空;对于非数据节点,则相同。
static final class Node {
final boolean isData; // 是否是数据节点
volatile Object item; // isData 为 true 时才初始化,匹配时 CAS 批改该字段。应用 Object 而不是泛型 E,容许将 item 指向本身
volatile Node next;
volatile Thread waiter; // null until waiting
// CAS methods for fields
final boolean casNext(Node cmp, Node val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
final boolean casItem(Object cmp, Object val) {// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(Object item, boolean isData) {UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
final void forgetNext() {UNSAFE.putObject(this, nextOffset, this);
}
// 节点被勾销或被匹配之后会调用:设置 item 自连贯,waiter 为 null
final void forgetContents() {UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
// 是否已匹配(已勾销或已匹配:item 自连贯;已匹配:item == null 与 isData 的值相同)final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
// 是否是一个未匹配的申请节点(!isData 为申请节点,item 为空阐明未被批改,而一旦被匹配或勾销则会批改 item)final boolean isUnmatchedRequest() {return !isData && item == null;}
// 如果给定节点不能连贯在以后节点后则返回 true
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
// 以后节点未匹配,且数据模式与给定节点相同,则返回 true
return d != haveData && (x = item) != this && (x != null) == d;
}
/**
* Tries to artificially match a data node -- used by remove.
*/
final boolean tryMatchData() {
// assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {throw new Error(e);
}
}
}
2.2 head 和 tail 节点
// 队列头节点,第一次增加节点之前为空
transient volatile Node head;
// 队列尾节点,第一次增加节点之前为空
private transient volatile Node tail;
LinkedTransferQueue 队列的 松弛 体现在:
采纳无锁算法来维持链表的 head 和 tail 节点的地位,head 和 tail 节点并不严格指向链表的头尾节点。
比方,一个蕴含 4 个无效节点的队列构造可能出现为以下模式:
head 节点指向一个已匹配(matched)节点,该节点又指向队列中第一个未匹配(unmatched)节点。
tail 节点指向队列中最初一个节点。
head tail
| |
v v
M -> U -> U -> U -> U
因为队列中的节点须要保护其匹配状态,而一旦节点被匹配了,其匹配状态不会再扭转。
因而,能够在链表头部寄存零个或多个曾经被匹配的前置节点,在链表尾部寄存零个或多个尚未匹配的后置节点。
因为前置和后置节点都容许为零,意味着 LinkedTransferQueue 并不应用 dummy node 作为头节点。
head tail
| |
v v
M -> M -> U -> U -> U -> U
益处是:每次入队出队操作,不会立刻更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的间隔超过一个“松弛阀值”之后才会更新,能够节俭 CAS 操作的开销。
LinkedTransferQueue 与 ConcurrentLinkedQueue 一样采纳了松弛的队列构造。
3. 构造函数
默认构造函数为空,当第一次退出元素时才初始化 head/tail 节点。
/**
* Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {}
/**
* Creates a {@code LinkedTransferQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedTransferQueue(Collection<? extends E> c) {this();
addAll(c);
}
与之相比,ConcurrentLinkedQueue 和 SynchronousQueue.TransferQueue 初始化时都会结构空节点(dummy node)。
// java.util.concurrent.ConcurrentLinkedQueue#ConcurrentLinkedQueue()
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}
// java.util.concurrent.SynchronousQueue.TransferQueue#TransferQueue
TransferQueue() {QNode h = new QNode(null, false);
head = h;
tail = h;
}
4. 数据存取及传递
LinkedTransferQueue 中定义了传递数据的 4 种形式:
/*
* Possible values for "how" argument in xfer method.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer // 立刻返回
private static final int ASYNC = 1; // for offer, put, add // 异步,不会阻塞
private static final int SYNC = 2; // for transfer, take // 同步,阻塞直到匹配
private static final int TIMED = 3; // for timed poll, tryTransfer // 超时,阻塞直到超时
传递数据的办法定义:
java.util.concurrent.LinkedTransferQueue#xfer
/**
* Implements all queuing methods. See above for explanation.
*
* @param e the item or null for take // 存入、取出、移交的数据元素
* @param haveData true if this is a put, else a take // 是否具备数据
* @param how NOW, ASYNC, SYNC, or TIMED // 4 种模式
* @param nanos timeout in nanosecs, used only if mode is TIMED // 超时工夫
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos)
LinkedTransferQueue 因为继承了 BlockingQueue,遵循办法约定:
抛出异样 非凡值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
查看 element() peek() 不可用 不可用
此外,TransferQueue 新增了以下办法:
// 尝试移交元素,立刻返回
boolean tryTransfer(E e);
// 尝试移交元素,阻塞直到胜利、超时或中断
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 移交元素,阻塞直到胜利或中断
void transfer(E e) throws InterruptedException;
// 判断是否有消费者
boolean hasWaitingConsumer();
// 查看消费者的数量
int getWaitingConsumerCount();
底层都是通过调用 LinkedTransferQueue#xfer 来实现。
入队:add(e) xfer(e, true, ASYNC, 0)
offer(e) xfer(e, true, ASYNC, 0)
put(e) xfer(e, true, ASYNC, 0)
offer(e, time, unit) xfer(e, true, ASYNC, 0)
出队:remove() xfer(null, false, NOW, 0)
poll() xfer(null, false, NOW, 0)
take() xfer(null, false, SYNC, 0)
poll(time, unit) xfer(null, false, TIMED, unit.toNanos(timeout))
移交元素:tryTransfer(e) xfer(e, true, NOW, 0)
tryTransfer(e, time, unit) xfer(e, true, TIMED, unit.toNanos(timeout)
transfer(e) xfer(e, true, SYNC, 0)
因为队列是无界的,入队办法(add/put/offer)既不会抛异样,也不会阻塞或超时。
4.1 xfer
/**
* Implements all queuing methods. See above for explanation.
*
* @param e the item or null for take // 存入、取出、移交的数据元素
* @param haveData true if this is a put, else a take // 是否具备数据
* @param how NOW, ASYNC, SYNC, or TIMED // 4 种模式
* @param nanos timeout in nanosecs, used only if mode is TIMED // 超时工夫
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos) {if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node // 从头节点开始遍历,初始时 h 和 p 都指向头节点
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched // 节点 p 尚未匹配过:item 不是 p,item 是否有值与 isData 相符
if (isData == haveData) // can't match // 节点 p 无奈匹配:节点 p 与入参节点类型雷同。此时需跳出本层循环,尝试入队
break;
if (p.casItem(item, e)) { // match // 节点 p 匹配胜利:item 域的值从 item 变更为 e
for (Node q = p; q != h;) {
// 若 q != h,阐明以后匹配的节点 p 不是头节点,而是位于头节点之后。// 阐明队列头部具备多于一个的已匹配节点,须要设置新的头节点,把已匹配的节点出队
// 循环以节点 p 为起始,始终往后遍历已匹配的节点
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
// 如果节点 h 是头节点,而 q 是已匹配节点,分为两种状况:// 1. 若 q.next 为空,则将 q 设为新的头节点;// 2. 若 q.next 不为空,则将 q.next 设为新的头节点(留神此时 q 还在队列中,但不可达)h.forgetNext(); // 旧的头节点 h 出队(若 h 之前还有节点,则 h 自连贯代表着以 h 为尾节点的旧链表将被回收)break;
} // advance and retry
// 进入这里,h 不是头节点,阐明其余线程批改过 head,须要取最新的 head 作进一步判断
// 1. 如果 head 为空,或者 head.next 为空,或者 head.next 未匹配,则跳出不再遍历 head.next 了
// 2. 尽管获得最新 head,然而 head.next 是已匹配节点,须要从 head.next 开始遍历,从新设置 head
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter); // 唤醒 p 中期待的线程
return LinkedTransferQueue.<E>cast(item);
}
}
// 来到这里,阐明节点 p 是已匹配节点,无奈与入参节点匹配,须要持续遍历 p 的下一个节点
Node n = p.next;
// 若 p != n 则持续遍历下一个节点;若 p == n 阐明 p 曾经出队,这种状况是其余线程批改了 head 导致的,须要取新的 head 从新开始遍历
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) { // No matches available // 来到这里,阐明没有匹配胜利,则依照 4 种模式的规定入队。if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData); // 尝试入队
if (pred == null)
continue retry; // lost race vs opposite mode // 入队失败,重试
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos); // 阻塞期待,直到匹配或超时
}
return e; // not waiting
}
}
根本思维:
- 尝试匹配已存在的节点(xfer)。
- 尝试退出新节点(tryAppend)。
- 期待匹配或勾销(awaitMatch)。
代码流程:
- 从 head 节点开始遍历链表,寻找第一个未匹配节点 p。
- 在寻找未匹配节点 p 的过程中,如果遇到自连贯节点(p.next == p),阐明其余线程批改了 head,须要取新的 head 从新开始遍历。
- 如果找到了未匹配节点(item 非自连贯且无批改),则与以后节点(即入参的数据)进行比照,查看是否匹配(节点模式相同则匹配)。
- 如果匹配胜利,则将 p.item 设为以后节点的数据,并唤醒阻塞在节点 p 上的线程。
- 匹配胜利之后,如果节点 p 不是头节点,阐明队列头部存在多个已匹配节点,须要更新 head 将它们出队。
- 匹配胜利之后,如果节点 p 是头节点,则不更新 head,让节点 p 保留在队列中。
- 如果无奈匹配,则按 4 种模式规定入队,期待被匹配。
4.2 匹配过程图示
匹配开始时,节点 p 和 h 均处于 head 的地位,假如此时链表的快照如下:
head/p/h
|
v
M -> M -> U -> U -> U -> U
应用 p = p.next 遍历链表,当节点 p 匹配胜利时,此时队列如下(U2M 示意节点从未匹配变成已匹配):
h p
| |
v v
M -> M -> U2M -> U -> U -> U
因为 p != h,此时须要从新设置 head,分为两种状况:
状况一
如果在此期间没有其余线程批改过 head,阐明当前情况如下:
head/h p
| |
v v
M -> M -> M -> U -> U -> U
通过 casHead 将 p.next 设为新的 head,并对旧的头节点 h 执行 forgetNext 设为自连贯,从原链表中断开。
h p head
| | |
v v v
M -> M M -> U -> U -> U
因为节点 p 是不可达的,会被 GC 回收,最终已匹配的节点都会从链表中革除。
head
|
v
U -> U -> U
状况二
如果在此期间其余线程批改过 head,那么 casHead 之前须要先获取最新的 head,再判断是否进一步重设 head。
如果获取最新的 head 之后,head.next 为已匹配节点,须要从 head.next 开始从新遍历再一次设置 head。
4.3 tryAppend
在 xfer 办法中,如果在队列头部匹配失败,则会依照 4 中规定从队列尾部入队:
- NOW:不尝试入队,间接返回。
- SYNC:尝试入队,若入队胜利则阻塞,期待被匹配直到胜利。
- TIMED:尝试入队,若入队胜利则阻塞,期待被匹配直到超时。
- ASYNC:尝试入队,若入队胜利则返回,不期待被匹配。
java.util.concurrent.LinkedTransferQueue#xfer
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData); // 尝试入队
if (pred == null) // 若入队失败,从新从头部匹配
continue retry; // lost race vs opposite mode
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
节点 s 尝试从队列尾部入队的办法:
java.util.concurrent.LinkedTransferQueue#tryAppend
/**
* Tries to append node s as tail. // 将节点 s 作为尾节点入队
*
* @param s the node to append
* @param haveData true if appending in data mode
* @return null on failure due to losing race with append in
* different mode, else s's predecessor, or s itself if no
* predecessor
*/
private Node tryAppend(Node s, boolean haveData) {for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {if (casHead(null, s)) // 队列为空,将 s 作为头节点。留神,这里插入第一个元素的时候 tail 指针并没有指向 s
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode // 节点 p 之后无奈连贯节点,返回 null(p 与 s 匹配,不须要将 s 入队)else if ((n = p.next) != null) // not last; keep traversing // 节点 p 不是尾节点(因为 tail 并不严格指向尾节点),需持续遍历 p.next
// 如果节点 p 与 t 不等,且 t 不是 tail 节点(阐明其余线程批改了 tail,不用遍历 p.next 了),则取新的 tail 赋值给 p 和 t,从新从 tail 节点开始遍历
// 否则(尝试遍历 p.next):1. 如果 p 与 p.next 不等,从 p.next 持续遍历;2. 如果 p 与 p.next 相等,则设 p 为空(阐明队列为空,后续会将 s 作为头节点)p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))// 进入这里,阐明 p 是尾节点。若 CAS 失败,阐明其余线程在 p 后加了节点,需持续遍历 p.next
p = p.next; // re-read on CAS failure
else { // 进入这里,阐明 p 是尾节点,且 CAS 将 s 设为 p.next 胜利。if (p != t) { // update if slack now >= 2 // p != t 阐明松弛度大于等于 2,须要从新设置 tail 节点
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
代码流程:
- 从 tail 节点开始遍历,游标节点为 p,目标是找到尾节点入队。
- 如果发现队列为空,则将节点 s 作为 head 头节点,返回节点 s。
- 如果发现节点 p 之后无奈连贯节点(节点 p 与 s 匹配,不须要将 s 入队),返回 null(后续重试从队首匹配)。
- 遍历过程需时刻关注 tail 是否发生变化,若发生变化了则从新从无效节点(新的 tail 或 head)遍历链表,否则持续遍历 p.next。
- 如果遍历找到了尾节点(p.next == null),则将节点 s 追加在尾节点 p 前面,接着判断是否通过 casTail 更新 tail 节点,最初返回旧的尾节点 p。
4.4 awaitMatch
当节点 s 入队胜利之后,须要在队列之中自旋、期待被其余线程匹配。
java.util.concurrent.LinkedTransferQueue#awaitMatch
/**
* Spins/yields/blocks until node s is matched or caller gives up.
*
* @param s the waiting node // 以后节点,处于期待之中
* @param pred the predecessor of s, or s itself if it has no // 以后节点的上一个节点,若为 s 阐明没有上一个节点,若为 null 则是未知的(作为预留)* predecessor, or null if unknown (the null case does not occur
* in any current calls but may in possible future extensions)
* @param e the comparison value for checking match // 以后节点的数据域值
* @param timed if true, wait only until timeout elapses // 是否期待超时
* @param nanos timeout in nanosecs, used only if timed is true // 超时工夫
* @return matched item, or e if unmatched on interrupt or timeout
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {final long deadline = timed ? System.nanoTime() + nanos : 0L; // 到期工夫
Thread w = Thread.currentThread(); // 以后线程,即操作节点 s 的线程
int spins = -1; // initialized after first item and cancel checks // 自旋次数
ThreadLocalRandom randomYields = null; // bound if needed // 随机数,随机让一些自旋的线程让出 CPU
for (;;) {
Object item = s.item;
if (item != e) { // matched // 节点 s 的数据域 item 前后不相等,阐明节点 s 曾经被其余线程匹配了
// assert item != s;
s.forgetContents(); // avoid garbage // s 设置 item 自连贯,waiter 为 null,这里没有使 s =s.next,s 仍挂在链表上
return LinkedTransferQueue.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0)) && // 以后线程被勾销,或者已超时
s.casItem(e, s)) { // cancel // 须要把节点 s 勾销掉,设置 item 自连贯
unsplice(pred, s); // 把 s 跟它的上一个节点 pred 断开连接
return e;
}
if (spins < 0) { // establish spins at/near front // 初始化 spins
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();}
else if (spins > 0) { // spin // 自减,随机让出 CPU
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
// 让出 CPU 工夫片,若又抢夺到了 CPU 工夫片则继续执行,否则期待直到再次取得 CPU 工夫片(由其余线程让出)}
else if (s.waiter == null) { // 当 spin == 0 时才会执行这里及前面的逻辑
s.waiter = w; // request unpark then recheck // 自旋完结了,设置 s.waiter
}
else if (timed) { // 如果有超时,计算超时工夫,并阻塞肯定工夫
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else { // 不是超时的,间接阻塞,期待被唤醒
LockSupport.park(this);
}
}
}
在期待匹配的自旋过程中,执行以下逻辑:
- 若查看到以后线程的节点 s 已匹配胜利(其余线程的批改使 s.item 发生变化),则设置 s.item 自连贯,返回匹配到的数据(节点 s 仍保留在链表上)。
- 若查看以后线程产生了中断、超时,则设置 s.item 自连贯,并尝试把 s 从链表上断开,返回节点 s 原有的数据 e。
- 若以后线程未设置自旋次数,则初始化自旋次数。
- 若以后线程的自旋次数大于零,则自减并随机让出 CPU 工夫片,待从新取得 CPU 工夫片后,持续自旋。
- 若以后线程的自旋次数等于零,如果设置了超时工夫,且还有剩余时间,则阻塞指定的工夫。
- 若以后线程的自旋次数等于零,且没有设置超时工夫,则进入阻塞,期待被唤醒(被匹配或中断)。
期待匹配的过程中,首先进行自旋,等自旋次数归零后,再判断是否进入阻塞,是因为线程的阻塞和唤醒操作波及到零碎内核态与用户态之间的切换,比拟耗时。
4.5 自旋次数
在 LinkedTransferQueue#awaitMatch 中,节点在队列之中自旋和阻塞之前,首先须要获取自旋的次数。
if (spins < 0) {if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();}
与自旋次数相干的属性:
/** True if on multiprocessor */
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
/**
* The number of times to spin (with randomly interspersed calls
* to Thread.yield) on multiprocessor before blocking when a node
* is apparently the first waiter in the queue. See above for
* explanation. Must be a power of two. The value is empirically
* derived -- it works pretty well across a variety of processors,
* numbers of CPUs, and OSes.
*/
// 当一个节点是队列中的第一个 waiter 时,在多处理器上进行自旋的次数(随机交叉调用 thread.yield)
private static final int FRONT_SPINS = 1 << 7;
/**
* The number of times to spin before blocking when a node is
* preceded by another node that is apparently spinning. Also
* serves as an increment to FRONT_SPINS on phase changes, and as
* base average frequency for yielding during spins. Must be a
* power of two.
*/
// 以后继节点正在解决,以后节点在阻塞之前的自旋次数。private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
与自旋次数相干的办法:
java.util.concurrent.LinkedTransferQueue#spinsFor
/**
* Returns spin/yield value for a node with given predecessor and
* data mode. See above for explanation.
*/
private static int spinsFor(Node pred, boolean haveData) {if (MP && pred != null) {if (pred.isData != haveData) // phase change
return FRONT_SPINS + CHAINED_SPINS;
if (pred.isMatched()) // probably at front
return FRONT_SPINS;
if (pred.waiter == null) // pred apparently spinning
return CHAINED_SPINS;
}
return 0;
}
须要留神的是,以后节点 s 在队列之中开始自旋之前,它的上一个的节点 pred 可能也正处于自旋之中。
初始化节点 s 自旋次数的流程:
- 如果是多核处理器且上一个的节点 pred 不为空,才计算自旋次数,否则不进行自旋。
- 以后节点与上一个节点的模式不同,且上一个节点已匹配(pred.isData != s.isData,模式不同然而 pred 与 s 无奈匹配,阐明 pred 已匹配),则以后节点自旋次数:FRONT_SPINS + CHAINED_SPINS(自旋最屡次)
- 以后节点与上一个节点的模式雷同,且上一个节点已匹配(非自旋非阻塞),则以后节点自旋次数:FRONT_SPINS
- 以后节点与上一个节点的模式雷同,且上一个节点还在自旋当中,则以后节点自旋次数:CHAINED_SPINS(自旋起码次)
对于自旋次数多少的设计,我的了解是,节点被匹配的可能性越高,则自旋次数越多;被匹配的可能性越低,则自旋次数越少,尽早进入阻塞免得 CPU 空转。
而位于队列头部的节点是越有可能被匹配的,须要自旋最屡次。
4.6 unsplice
在 LinkedTransferQueue#awaitMatch 中,节点在队列之中期待匹配时,如果检测到线程中断或已超时,须要勾销匹配并将节点从链表中断开。
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) {unsplice(pred, s);
return e;
}
此外,LinkedTransferQueue 中移除节点也会调用到 LinkedTransferQueue#unsplice 办法。
与革除节点相干的属性:
/** The number of apparent failures to unsplice removed nodes */
// 从链表上革除节点失败的次数
private transient volatile int sweepVotes;
/**
* The maximum number of estimated removal failures (sweepVotes)
* to tolerate before sweeping through the queue unlinking
* cancelled nodes that were not unlinked upon initial
* removal. See above for explanation. The value must be at least
* two to avoid useless sweeps when removing trailing nodes.
*/
// sweepVotes 的阀值,达到该阈值才清理有效节点
static final int SWEEP_THRESHOLD = 32;
革除指定节点的办法:
java.util.concurrent.LinkedTransferQueue#unsplice
/**
* Unsplices (now or later) the given deleted/cancelled node with
* the given predecessor.
*
* @param pred a node that was at one time known to be the
* predecessor of s, or null or s itself if s is/was at head
* @param s the node to be unspliced
*/
// 把 s 跟它的上一个节点 pred 断开连接(立刻断开,或者隔段时间再断开)final void unsplice(Node pred, Node s) {s.forgetContents(); // forget unneeded fields // s 设置 item 自连贯,waiter 为 null
if (pred != null && pred != s && pred.next == s) { // 校验 pred 是否是 s 的上一个节点
// s 是尾节点,或者 (s 不是尾节点,s 未出队,pred 批改下一个节点为 s.next 胜利,pred 曾经被匹配)
Node n = s.next;
if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) {
// 自旋的目标是:设置新的头节点,把旧的头节点出队
for (;;) { // check if at, or could be, head
// pred 或 s 为头节点,或者头节点为空,间接返回,不须要解决 sweepVotes
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
// 头节点未被匹配,跳出循环(阐明 head 是精确的,后续须要解决 sweepVotes)if (!h.isMatched())
break;
// head 曾经被匹配了,且 head.next 为空,阐明当初队列为空了,间接返回,不须要解决 sweepVotes
Node hn = h.next;
if (hn == null)
return; // now empty
// 走到这里,阐明 head 曾经被匹配了,但 head.next 不为空。// 若 head 不是自连贯,尝试将 head.next 设置为新的头节点
if (hn != h && casHead(h, hn))
h.forgetNext(); // advance head // 旧的头节点设为自连贯,示意出队
// 后续持续从新的头节点遍历,把已匹配的节点出队,重设头节点
}
// 进入这里,阐明 1. s 可能是尾节点 2. pred、s 均不为头节点 3. 队列不为空
if (pred.next != pred && s.next != s) { // recheck if offlist // 再一次校验 pred 和 s 是否未出队
for (;;) { // sweep now if enough votes
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) { // 阈值为 32
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {sweep(); // 达到阀值,进行“大扫除”,革除队列中的有效节点
break;
}
}
}
}
}
}
代码流程:
- 首先把节点 s 设置 item 自连贯,设置 waiter 为 null。
- 如果节点 s 是尾节点,或者 pred 已出队,则一直开 pred 与 s 的连贯,否则断开。
- 从 head 至 pred/s 遍历链表,将已匹配的节点出队(next 自连贯),重设头节点。
- 重设头节点过后,如果 pred/s 都还在队列中且均不为头节点,则通过累加 sweepVotes 进行大扫除。
为什么要累计 sweepVotes 进行大扫除,官网的解释:
除了通过节点自连贯(self-linking)来不便垃圾回收以外,还须要在链表上解开对有效节点的连贯。
一般来说,如果想要在链表上移除节点 s,只须要把 s 的上一个节点的 next 属性改掉即可。
然而用这种形式来让节点 s 不可达,在以下两种场景下是无奈保障的:
- 如果节点 s 是链表上的尾节点,其余节点利用它来入链,此时只有当其余节点追加到节点 s 之后,才可能移除节点 s。
- 如果节点 s 的上一个节点也是有效节点(已匹配或已勾销),此时也无需将节点 s 和它断开。
这样会造成一种结果:当队列头部是一个阻塞的 take 申请节点,该节点之后是大量的超时工夫很短的 poll 申请节点,一旦过了超时工夫,队列中就会呈现大量可达的有效节点。
(把这种状况代入 unsplice,大部分节点都能够通过 pred.casNext 移除,蛊惑)
鉴于此,须要在一段时间之后遍历整个链表,把已匹配的节点清理出队。
/**
* Unlinks matched (typically cancelled) nodes encountered in a
* traversal from head.
*/
private void sweep() {for (Node p = head, s, n; p != null && (s = p.next) != null; ) { // 初始时,p = head,s = p.next
if (!s.isMatched())
// Unmatched nodes are never self-linked // 未匹配的节点不会是自连贯的!p = s; // 如果节点 s 未匹配,则让 p = p.next 持续遍历下一个节点
else if ((n = s.next) == null) // trailing node is pinned // 遍历完结,跳出循环
break;
else if (s == n) // stale // s 自连贯,阐明 s 已出队,以后是在废除的链表上遍历,须要从新从 head 开始遍历
// No need to also check for p == s, since that implies s == n
p = head;
else
// 进入这里,阐明节点 s 是已匹配的,不是尾节点,不是自连贯。// 此时将节点 s 出队,尽管没有使 s 自连贯,s 不可达会被回收(留神 s.item 会不会被回收则取决于 item 本身是否可达)p.casNext(s, n);
}
}
5. 容量
size 办法遍历队列中全副的数据节点,并进行计数,最大值为 Integer.MAX_VALUE。
当遍历过程中其余线程批改了 head,须要取新的 head 从新开始遍历。
size 办法并不是一个恒定时长的操作。
java.util.concurrent.LinkedTransferQueue#size
public int size() {return countOfMode(true);
}
/**
* Traverses and counts unmatched nodes of the given mode.
* Used by methods size and getWaitingConsumerCount.
*/
private int countOfMode(boolean data) {
int count = 0;
for (Node p = head; p != null;) {if (!p.isMatched()) {if (p.isData != data)
return 0;
if (++count == Integer.MAX_VALUE) // saturated
break;
}
Node n = p.next;
if (n != p) // 节点 p 未出队,持续遍历 p.next
p = n;
else { // 节点 p 已出队,取新的 head 从新开始遍历
count = 0;
p = head;
}
}
return count;
}
6. 总结
- LinkedTransferQueue 是一个无界阻塞队列,采纳 CAS 和自旋来保障线程平安。
- 具备 SynchronousQueue 的个性,实用于生产者间接传递数据给消费者的场景。
- 借鉴 ConcurrentLinkedQueue 的涣散构造,每次操作队列都不及时更新 head/tail,升高 CAS 资源损耗。
- 采纳双重队列作为底层构造,申请数据应用申请节点,提供数据应用数据节点,模式相同的节点能够相互匹配。
- 遵循 FIFO 规定,存取操作都先跟队列头部的节点比拟,若匹配则出队,否则从队列尾部入队。
- 对于是否入队及阻塞定义了四种模式:NOW、ASYNC、SYNC、TIMED。
- 入队之后,先自旋肯定次数后再应用 LockSupport 来阻塞线程,期待匹配。
作者:Sumkor
链接:https://segmentfault.com/a/11…