在 JDK8 的阻塞队列实现中还有两个未进行说明,今天继续对其中的一个阻塞队列 LinkedTransferQueue 进行源码分析,如果之前的队列分析已经让你对阻塞队列有了一定的了解,相信本文要讲解的 LinkedTransferQueue 的源码也能很快被理解,接下来一起学习吧
前言
JDK 版本号:1.8.0_171
LinkedTransferQueue 是基于链表的 FIFO 无界阻塞队列,在源码分析前,需要提前对源码实现整体有个印象,便于细节的理解。注释部分对于这个类进行了一些说明和介绍,如果有能力的话可以阅读理解,对于其中的部分这里进行简单说明:
LinkedTransferQueue 使用了松弛型双重队列,双重的意思可以理解为两种类型的节点(请求数据的消费者和生产数据的生产者),也就是说队列中保存了这两种类型的节点,理解上要稍微复杂些,其实之前 SynchronousQueue 中就使用了类似的队列,队列维护了两个指针:head 指向第一个匹配节点 (M)(如果为空则为空);tail 指向队列中的最后一个节点(如果为空则为空)
在双重队列中为了减少 CAS 的开销,加入了 Slack(松弛度)的处理方式,在节点被匹配(被删除)之后,不会立即更新 head/tail,而是当 head/tail 节点和最近一个未匹配的节点之间的距离超过一个阈值之后才会更新,在 LinkedTransferQueue 中松弛度值设置为 2,这是一个经验值,不多深究。同时为了避免匹配节点在队列中的堆积,在 CAS 更新 head 时,会把已匹配的 head 的 next 引用指向自己。当我们进行遍历时,遇到这种节点,表示当前线程已经落后于其他线程,需要重新获取 head 来进行遍历
其与其他阻塞队列不同之处在于,LinkedTransferQueue 允许消费者线程获取元素时,如果未请求到数据,则可以生成一个数据节点(节点 item 为 null)入队,然后消费者线程在这个节点线程上等待,直到之后生产者线程入队时发现有一个 item 为 null 的数据节点,生产者线程就不再进行入队操作了,直接就将元素填充到该节点的 item,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用处返回,生产者同样直接返回
类定义
实现 TransferQueue 接口中的方法是 LinkedTransferQueue 操作的核心部分
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable
TransferQueue
先了解其中的每个方法的含义,便于下面源码实现的理解
- tryTransfer(E):立即匹配节点返回。立刻去匹配一个请求节点(消费数据的消费者线程),如果未匹配上,则直接返回 false,注意理解未匹配上时也不会入队数据节点
- transfer(E):匹配请求节点,未匹配上则进行等待直到匹配
- tryTransfer(E,long,TimeUnit):匹配请求节点,未匹配上则阻塞等待指定时间,还未匹配上则返回 false,超时未匹配上时同 tryTransfer(E) 一样不会入队数据节点
- hasWaitingConsumer():判断当前是否有等待的消费者
- getWaitingConsumerCount():获取等待的消费者的估计值
常量 / 变量
/**
* 如果机器为多处理器则为 true,MP 为 multiprocessor 缩写
*/
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
/**
* 节点自旋等待的次数 128
*/
private static final int FRONT_SPINS = 1 << 7;
/**
* 当前驱节点在处理,当前节点自旋等待的次数 64
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
* sweepVotes 的阈值,达到这个阈值上限则进行一次清理操作
*/
static final int SWEEP_THRESHOLD = 32;
/** 队列头节点 */
transient volatile Node head;
/** 队列尾节点 */
private transient volatile Node tail;
/** 解除删除节点关联失败的次数 */
private transient volatile int sweepVotes;
/*
* xfer 方法 how 参数可能的取值类型
* 队列操作统一方法根据类型进行不同的处理
*/
/** poll 和 tryTransfer 使用 */
private static final int NOW = 0; // for untimed poll, tryTransfer
/** offer, put, add 方法使用 */
private static final int ASYNC = 1; // for offer, put, add
/** transfer, take 方法使用 */
private static final int SYNC = 2; // for transfer, take
/** 超时等待的 poll 和 tryTransfer 使用 */
private static final int TIMED = 3; // for timed poll, tryTransfer
上述参数中对 xfer 参数类型进行详细说明:
- NOW 表示无需等待直接返回结果,用于需要立即返回匹配节点的生产者(tryTransfer)和消费者(poll)
- ASYNC 异步返回,用于立即将生产者的数据节点入队,异步等待消费者来消费(offer, put, add)
- SYNC 同步返回,等待直到匹配节点出现返回,用于消费者等待生产者的数据节点(take)或生产者生产数据等待消费者的消费(transfer)
- TIMED 超时返回,等待超时时间后返回匹配结果(poll, tryTransfer)
内部类
LinkedTransferQueue 中链表的节点实现 Node 与 SynchronousQueue 中的实现类似,需要注意的是当节点已经匹配或被取消时我们必然需要将节点离队,通过 forgetNext 和 forgetContents 来将节点排除队列匹配操作
/**
* 队列 Node 实现
* CAS 更新 Node 成员变量
*/
static final class Node {
// 数据节点和请求节点类型区分标识
final boolean isData; // false if this is a request node
// 数据节点保存数据,请求节点为 null
volatile Object item; // initially non-null if isData; CASed to match
// 指向队列中下一个节点
volatile Node next;
// 当前节点对应的等待线程
volatile Thread waiter; // null until waiting
// CAS methods for fields
final boolean casNext(Node cmp, Node val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
final boolean casItem(Object cmp, Object val) {// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 构造方法需传入的参数
*/
Node(Object item, boolean isData) {UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
/**
* 将 next 指向自己,避免无用 Node 过长影响垃圾回收
* 在 cas 更新 head 后调用
*/
final void forgetNext() {UNSAFE.putObject(this, nextOffset, this);
}
/**
* 匹配或者节点被取消的时候被调用,设置 item 指向自己,waiter 为 null
*/
final void forgetContents() {UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
/**
* 如果此节点已匹配或者是被取消匹配的节点,则返回 true
* x == this 调用了 forgetContents
* (x == null) == isData 表示请求节点匹配了数据节点(请求节点的 item 更新为数据节点的数据)* 或者数据节点匹配了请求节点(数据节点的 item 更新为 null)*/
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
/**
* 当前节点是否是未匹配的请求节点
* !isData 请求节点
* item == null 还未匹配被更新
*/
final boolean isUnmatchedRequest() {return !isData && item == null;}
/**
* 能否将指定的节点 node(haveData 类型)追加到当前节点后。如果 node 节点属性与当前节点相反,且当前节点还未进行匹配则不能追加
*/
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
/**
* 尝试人为匹配数据节点,匹配成功返回 true,设置 item 为 null(不用再匹配了)
* 相当于移除当前数据节点,用在 remove 方法中
*/
final boolean tryMatchData() {
// assert isData;
Object x = item;
// item 非空且未指向自己则表示当前节点为还未匹配的数据节点
// 之后尝试将 item 置为 null 同时唤醒等待的线程
if (x != null && x != this && casItem(x, null)) {LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
// CAS 操作
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {throw new Error(e);
}
}
}
构造方法
构造方法比较容易理解,addAll 最终循环调用 add 方法一个一个进行添加
public LinkedTransferQueue() {}
public LinkedTransferQueue(Collection<? extends E> c) {this();
addAll(c);
}
重要方法
put/offer/add
put,offer,add 都是调用 xfer(e, true, ASYNC, 0)
,需要注意,offer 设置超时的那个方法没用,使用时需要注意!ASYNC 表示异步操作,相当于这些方法执行后直接入队元素然后结束不会像 SynchronousQueue 队列那样阻塞等待匹配元素出现
public void put(E e) {xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {xfer(e, true, ASYNC, 0);
return true;
}
transfer/tryTransfer
当我们需要不同的队列入队操作时,根据需要使用下列方法
// 由于中断操作导致失败会抛错
public void transfer(E e) throws InterruptedException {if (xfer(e, true, SYNC, 0) != null) {Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();}
}
// 立刻尝试匹配返回,不进行任何等待操作,xfer 源码部分有判断这个标识
public boolean tryTransfer(E e) {return xfer(e, true, NOW, 0) == null;
}
// 尝试匹配未匹配等待超时时间才返回,如被中断则抛错
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();}
take/poll
出队操作,请求数据节点,这里 xfer 方法的参数也能看出其使用方法的不同,take 方法获取不到对应的匹配节点会阻塞操作,而 poll 方法在未设置超时时间时以 NOW 模式,相当于直接获取数据,不管有没有都会直接返回结果,不会进行阻塞等待
public E take() throws InterruptedException {E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();}
public E poll() {return xfer(null, false, NOW, 0);
}
xfer
LinkedTransferQueue 核心方法,所有的操作最终都通过 xfer 实现,通过 how 参数的不同进行不同的处理,在匹配上时判断当前 head 的 slack 阈值,如果达到上限则进行 head 更新
/**
* @param e 数据节点(e 非空)或者请求节点(e 为 null)* @param haveData 数据节点为 true,请求节点为 false
* @param how NOW, ASYNC, SYNC, or TIMED 4 种类型,上面已经介绍过
* @param nanos TIMED 模式下设置的超时时间
* @return 节点匹配上则返回对应的匹配项否则传入的参数 e
* @throws
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
// 校验节点
if (haveData && (e == null))
throw new NullPointerException();
// 假如需要的话,这里先声明待添加的节点
Node s = null; // the node to append, if needed
// 标签 continue 跳转
retry:
for (;;) { // restart on append race
// 从头节点开始尝试匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
// item != p 表示 p 节点 item 未指向自己,未执行 forgetContents,未被取消或匹配
// (item != null) == isData 表示 p 是一个还未匹配的数据节点或请求节点
// 不满足条件可能需要执行后面逻辑
if (item != p && (item != null) == isData) { // unmatched
// 相同节点类型,说明和队列中所有节点相同类型,无需匹配,跳出这个循环根据类型继续接下来的操作
if (isData == haveData) // can't match
break;
// 执行到这说明 p 节点还未匹配上且与当前节点是相异类型,cas 更新 item 成功则表示匹配上了
// 注意这里只更新了 head 指向的节点,因为本次线程的 e 节点到这里还未入队
// 这里将 p 的 item 指向为对应操作的节点 e,表示 p 对应的节点已经与此次的 e 匹配上了
// 如果未更新成功,说明 p 已经被其他人匹配上,执行后面逻辑继续循环
if (p.casItem(item, e)) { // match
// p 当前已经不是指向 h 了,说明 p 已经被循环 next 更新过了
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
// h 依旧指向头节点尝试更新 head 指向
// 松弛度等于 2 则更新 head,h->q->n
if (head == h && casHead(h, n == null ? q : n)) {
// 将 h 的 next 更新方便回收
h.forgetNext();
break;
} // advance and retry
// head 为空或者 head 的 next 节为空或者 head 的 next 节点未被匹配或取消
// 此时跳出循环,slack 较小不需要更新 head
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// p 已经与 e 匹配上了,唤醒 p 节点对应的等待线程
LockSupport.unpark(p.waiter);
// 转换类型返回
return LinkedTransferQueue.<E>cast(item);
}
}
// 已被其他线程匹配则遍历下一个节点
Node n = p.next;
// p == n 即 p == p.next 执行了 forgetNext
// 说明头节点指向已经更新了,p 节点已经离队需要重新从头开始匹配
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 循环队列未匹配上同时为非 NOW 模式,NOW 则直接返回入参 e
if (how != NOW) { // No matches available
// s 为空则初始化 s 节点
if (s == null)
s = new Node(e, haveData);
// 尝试添加节点 s 到队列尾部
Node pred = tryAppend(s, haveData);
// null 表示当前有匹配的节点了,从 retry 开始重新开始判断处理
// 在后面的方法中会分析 tryAppend
if (pred == null)
continue retry; // lost race vs opposite mode
// 执行到这里说明 pred 非 null,s 添加到队列中了,pred 代表的是 s 的前驱节点或者 s 本身
// 处理 SYNC/TIMED 模式
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
为了方便理解,这里通过流程图显示 xfer 的操作流程:
tryAppend
尝试将 s 节点添加到队列尾部,同时当 tail 的 slack 达到阈值时则更新 tail 指向,不同的返回值对应不同的处理过程,查看 xfer 源码上的处理,返回值含义如下:
- s 更新头节点返回 s 本身
- s 添加到尾节点则返回 s 的前驱节点
- 有可匹配节点则返回 null 不需再添加到尾部,应回到上层处理(xfer 中处理了,重新匹配)
/**
*
* @param s 添加到队列的节点元素
* @param haveData 数据节点入队为 true,请求节点入队为 false
*/
private Node tryAppend(Node s, boolean haveData) {
// p 指向尾结点
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 头尾节点为空则表示队列为空
if (p == null && (p = head) == null) {
// 队列为空时尝试更新头节点为 s 即可,失败重新循环处理
if (casHead(null, s))
return s; // initialize
}
// 队列非空同时添加的 s 节点与 p 节点数据类型不同表示两者可以匹配则返回 null 进行标记处理(xfer 中使用到了)else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
// p 节点非当前尾节点(可能被其他线程更新了 tail)else if ((n = p.next) != null) // not last; keep traversing
// 满足条件更新 t 指向 tail,p 指向 t 重新循环开始
p = p != t && t != (u = tail) ? (t = u) : // stale tail
// 在队列更新 p = p.next 重新开始循环或离队状态则置 p 为 null 循环从头节点开始
(p != n) ? n : null; // restart if off list
// p 为当前尾结点,尝试更新 p 的 next 指向 s 失败则更新 p 指向 p 的 next
// 失败说明别的线程更新了 p 的 next,此时更新 p 重新循环执行
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
// 执行到这表明更新 p 的 next 为 s 成功
// p 和 t 已经不同了,p 可能循环了几次才成功更新 next,t 还是之前的指向,需要更新
// p != t 为真时 slack >=2
// 如果 t = p 更新 next 为 s 成功,则 slack = 1, 这个条件不会进去
// t != p,整个节点关联为...->t->...->p->s,t 到 s 距离 >= 2
if (p != t) { // update if slack now >= 2
// 更新 tail, 不停的判断是否是 tail,不是则持续向前直到尾部节点,然后更新 tail 退出 while
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
// 返回添加节点的前驱节点 p
return p;
}
}
}
awaitMatch
xfer 中处理 SYNC/TIMED 模式时调用,处理阻塞等待和超时等待匹配节点方式,参考上面 xfer 中调用的地方,其中的入参说明如下:
- s:阻塞等待的节点
- pred:s 的前驱节点或者 s 自己,null 的情况已经被处理掉了,参考 xfer 方法已说明过
- e:对应的比较的数据,s 的 item
- timed:是否进行超时时间的设置
- nanos:设置超时时间
注意返回值的含义,匹配上返回对应匹配节点的 item,如未匹配上,中断或者超时则返回入参 e
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 计算过期时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 初始化
int spins = -1; // initialized after first item and cancel checks
// 使用 ThreadLocalRandom 产生并发随机数
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// item != e 说明 s 的 item 已经被更新了,表示已经与其他节点匹配上了
// item 更新成对应匹配节点的 item,参考 xfer 匹配节点过程理解
if (item != e) { // matched
// assert item != s;
// 已经被匹配上了,将当前节点 forgetContents,避免垃圾堆积
s.forgetContents(); // avoid garbage
// 类型转化返回结束
return LinkedTransferQueue.<E>cast(item);
}
// 还未被匹配,先判断当前线程是否被中断或者超时
// 第一个条件为 true 时,s 节点尝试更新 item 指向自己(取消操作,这里 s 是本次操作的节点,取消了就不用再继续处理了)if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 将 s 和其前驱节点解除关联
unsplice(pred, s);
return e;
}
// 到这里表明没匹配上同时也没被中断或超时
// 自旋次数设置,单核机器不会进行自旋
if (spins < 0) { // establish spins at/near front
// 计算 spins
if ((spins = spinsFor(pred, s.isData)) > 0)
// 调用 current 获取并发随机数产生类
randomYields = ThreadLocalRandom.current();}
// 自旋次数循环递减
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
// 随机等于 0 时,当前线程让步,给其他线程执行机会
Thread.yield(); // occasionally yield}
// 执行到这已经进行过自旋了 spins = 0,说明暂时无匹配节点先保存当前线程
// 这里设置完了还继续循环处理
else if (s.waiter == null) {s.waiter = w; // request unpark then recheck}
// 设置超时则超时等待
else if (timed) {nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
// 阻塞等待当前线程直到被其他线程唤醒
else {LockSupport.park(this);
}
}
}
spinsFor
根据当前节点的前驱节点和当前节点的数据类型返回对应的自旋值,不同情况下返回不同的数值
private static int spinsFor(Node pred, boolean haveData) {
// 多 CPU,前驱节点非空才能自旋操作
if (MP && pred != null) {
// 前驱和当前节点类型不同则自旋 FRONT_SPINS + CHAINED_SPINS
if (pred.isData != haveData) // phase change
return FRONT_SPINS + CHAINED_SPINS;
// 前驱节点已经被匹配了,返回自旋 FRONT_SPINS 次数
if (pred.isMatched()) // probably at front
return FRONT_SPINS;
// 前驱等待线程为空,还没更新 waiter,说明前驱节点在自旋操作,返回 CHAINED_SPINS
if (pred.waiter == null) // pred apparently spinning
return CHAINED_SPINS;
}
return 0;
}
succ
返回 p 的后继节点,如果 p.next 指向 p(p 节点已经离队),则返回 head 头节点
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
firstOfMode/firstDataNode/firstDataItem/countOfMode
找到第一个未匹配节点,数据类型一致则返回节点,不一致则返回 null。hasWaitingConsumer 使用 firstOfMode 来进行了判断,firstDataNode,firstDataItem(peek 使用了)类似不详细进行说明了,countOfMode 计算对应类型节点的数量,源码也比较简单
private Node firstOfMode(boolean isData) {
// 从头开始进行循环判断
for (Node p = head; p != null; p = succ(p)) {
// 未匹配节点
if (!p.isMatched())
// 数据类型一致则返回 p,否则返回 null
return (p.isData == isData) ? p : null;
}
return null;
}
final Node firstDataNode() {for (Node p = head; p != null;) {
Object item = p.item;
if (p.isData) {if (item != null && item != p)
return p;
}
// 头节点未被匹配同时非数据节点则队列中此刻应该只有请求节点不需要再循环判断下去了
else if (item == null)
break;
if (p == (p = p.next))
p = head;
}
return null;
}
private E firstDataItem() {for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
if (p.isData) {if (item != null && item != p)
return LinkedTransferQueue.<E>cast(item);
}
else if (item == null)
return null;
}
return null;
}
private int countOfMode(boolean data) {
int count = 0;
for (Node p = head; p != null;) {if (!p.isMatched()) {if (p.isData != data)
return 0;
if (++count == Integer.MAX_VALUE) // saturated
break;
}
Node n = p.next;
if (n != p)
p = n;
else {
count = 0;
p = head;
}
}
return count;
}
unsplice
前驱节点与已删除或者取消状态的 s 节点取消连接,将两个节点取消关联
/**
*
* @param pred s 的前驱节点或者为 null 或者为 s 自己(当 s 为头节点时)* @param s 取消或删除的节点
*/
final void unsplice(Node pred, Node s) {
// 清理 s 节点变量
s.forgetContents(); // forget unneeded fields
// 确认 pred 的 next 指向 s 即两者之间还有关联才处理
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
// s 的 next 为空表示 s 为尾结点
// s 的后继非 s 且 pred 更新 next 成功且 pred 已被匹配,尝试解除 s 节点
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
// 检查是否是头节点并更新
for (;;) { // check if at, or could be, head
Node h = head;
// 头节点为前驱节点
// 头节点为 s 节点
// 头节点为空,则表示为空队列
if (h == pred || h == s || h == null)
return; // at head or list empty
// 头节点未被匹配则跳出循环
if (!h.isMatched())
break;
// 到这说明 h 已经被匹配,需要更新 head
Node hn = h.next;
// 头节点后继节点为空,验证队列为空
if (hn == null)
return; // now empty
// 头节点后继节点非头节点并且尝试更新头节点为后继节点
if (hn != h && casHead(h, hn))
// 清理原头节点
h.forgetNext(); // advance head}
// 解除前后节点链接失败则统计阈值处理
// 再次检查是否离队
if (pred.next != pred && s.next != s) { // recheck if offlist
// 根据 SWEEP_THRESHOLD 阈值进行判断处理
for (;;) { // sweep now if enough votes
int v = sweepVotes;
// 小于阈值则尝试将阈值加 1
if (v < SWEEP_THRESHOLD) {if (casSweepVotes(v, v + 1))
break;
}
// 大于等于阈值则将阈值归 0 同时通过 sweep 方法进行清理
else if (casSweepVotes(v, 0)) {sweep();
break;
}
}
}
}
}
}
sweep
从头节点开始遍历清理匹配节点(取消的节点)的节点关联关系
private void sweep() {
// 从头节点开始,p 开始为头节点,s 为 p 的后继节点
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
// s 为未匹配的节点,开始遍历下一个
if (!s.isMatched())
// Unmatched nodes are never self-linked
p = s;
// s 已经被匹配了,如果 s 为尾节点,遍历完了,终止
else if ((n = s.next) == null) // trailing node is pinned
break;
// s 的 next 指向自己,说明 s 已经离队
else if (s == n) // stale
// No need to also check for p == s, since that implies s == n
// 从头重新开始
p = head;
else
// 更新 p 的 next
p.casNext(s, n);
}
}
findAndRemove
移除对应的节点
private boolean findAndRemove(Object e) {if (e != null) {
// 循环
for (Node pred = null, p = head; p != null;) {
// 匹配 item
Object item = p.item;
// 数据节点比较 item 是否相等,相等则通过 tryMatchData 自我匹配,然后 unsplice 取消前后节点关系
if (p.isData) {if (item != null && item != p && e.equals(item) &&
p.tryMatchData()) {unsplice(pred, p);
return true;
}
}
// 请求节点同时还未被匹配,队列中没有数据节点,直接跳出
else if (item == null)
break;
pred = p;
// p 已经是旧的数据,需要更新 p 指向 head 重新循环处理
if ((p = p.next) == pred) { // stale
pred = null;
p = head;
}
}
}
return false;
}
总结
主要的源码部分基本已分析完毕,关于迭代器的部分不再详述,读者可自行阅读理解
LinkedTransferQueue 作为一个基于链表的 FIFO 无界阻塞队列,使用了一些复杂的概念,双重队列,松弛度都是需要好好理解的部分,应该先从整体了解其流程处理,再细看其内部实现,其核心方法在于 xfer,可以参考流程图进行梳理,作为阻塞队列,使用好 LinkedTransferQueue 是不容易的,方法的使用需要参考源码,否则用错地方导致线上事故得不偿失,希望本文对各位有所帮助
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢