SynchronousQueue 原理详解 - 非公平模式
开篇
说明:本文分析采用的是 jdk1.8
约定:下面内容中 Ref-xxx 代表的是引用地址,引用对应的节点
前面已经讲解了公平模式的内容,今天来讲解下关于非公平模式下的 SynchronousQueue 是如何进行工作的,在源码分析的时候,先来简单看一下非公平模式的简单原理,它采用的栈这种 FILO 先进后出的方式进行非公平处理,它内部有三种状态,分别是 REQUEST,DATA,FULFILLING,其中 REQUEST 代表的数据请求的操作也就是 take 操作,而 DATA 表示的是数据也就是 Put 操作将数据存放到栈中,用于消费者进行获取操作,而 FULFILLING 代表的是可以进行互补操作的状态,其实和前面讲的公平模式也很类似。
当有相同模式情况下进行入栈操作,相同操作指的是 REQUEST 和 DATA 两种类型中任意一种进行操作时,模式相同则进行入栈操作,如下图所示:
同 REQUEST 进行获取数据时的入栈情况:
同样的 put 的操作,进行数据操作时为 DATA 类型的操作,此时队列情况为:
不同模式下又是如何进行操作的?当有不同模式进来的时候,他不是将当前的模式压入栈顶,而是将 FullFill 模式和当前模式进行按位或之后压入栈顶,也就是压入一个进行 FullFill 请求的模式进入栈顶,请求配对操作,如下图所示:
通过上图可见,本来栈中有一个 DATA 模式的数据等待消费者进行消费,这时候来了一个 REQUEST 模式的请求操作来进行消费数据,这时候并没有将 REQUEST 模式直接压入栈顶,而是将其转换为 FULLFILLING 模式,并且保留了原有的类型,这是进行 FULLFILLING 的请求,请求和栈顶下方元素进行匹配,当匹配成功后将栈顶和匹配元素同时进行出栈操作,详细请见下文分析:
TransferStack
字段信息
/** 消费者模式 */
static final int REQUEST = 0;
/** 提供者模式 */
static final int DATA = 1;
/** 互补模式 */
static final int FULFILLING = 2;
/** 栈顶指针 */
volatile SNode head;
方法
方法名 | 描述 |
---|---|
isFulfilling | 判断指定类型是否是互补模式 |
casHead | 替换当前头结点 |
snode | 生成 SNode 节点对象 |
transfer | 主要处理逻辑 |
awaitFulfill | 等待 fulfill 操作 |
shouldSpin | 判断节点 s 是头结点或是 fulfill 节点则返回 true |
SNode 内容
字段信息
volatile SNode next; // 栈下一个元素
volatile SNode match; // 匹配的节点
volatile Thread waiter; // 控制 park/unpark 的线程
Object item; // 数据或请求
int mode; // 模式,上面介绍的三种模式
方法
方法名 | 描述 |
---|---|
casNext | 判断指定类型是否是互补模式 |
tryMatch | 尝试匹配节点,如果存在匹配节点则判断是否是当前节点,直接返回判断结果,如果没有则替换 match 内容并且唤醒线程 |
tryCancel | 生成 SNode 节点对象 |
isCancelled | 主要处理逻辑 |
经过上面内容的分析,接下来就进入正题,让我们整体先看一下下 transfer 都为我们做了些什么内容,下面是 transfer 源码内容:
E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 栈顶指针为空或者是模式相同
if (timed && nanos <= 0) { // 制定了 timed 并且时间小于等于 0 则取消操作。if (h != null && h.isCancelled())
casHead(h, h.next); // 判断头结点是否被取消了取消了就弹出队列,将头结点指向下一个节点
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {// 初始化新节点并且修改栈顶指针
SNode m = awaitFulfill(s, timed, nanos); // 进行等待操作
if (m == s) { // 返回内容是本身则进行清理操作
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 尝试去匹配
if (h.isCancelled()) // 判断是否已经被取消了
casHead(h, h.next); // 弹出取消的节点并且从新进入主循环
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {// 新建一个 Full 节点压入栈顶
for (;;) { // 循环直到匹配
SNode m = s.next; // s 的下一个节点为匹配节点
if (m == null) { // 代表没有等待内容了
casHead(s, null); // 弹出 full 节点
s = null; // 设置为 null 用于下次生成新的节点
break; // 退回到主循环中
}
SNode mn = m.next;
if (m.tryMatch(s)) {casHead(s, mn); // 弹出 s 节点和 m 节点两个节点
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // 如果失去了匹配
s.casNext(m, mn); // 帮助取消连接
}
}
} else { // 这里是帮助进行 fillull
SNode m = h.next; // m 是头结点的匹配节点
if (m == null) // 如果 m 不存在则直接将头节点赋值为 nll
casHead(h, null); // 弹出 fulfill 节点
else {
SNode mn = m.next;
if (m.tryMatch(h)) // h 节点尝试匹配 m 节点
casHead(h, mn); // 弹出 h 和 m 节点
else // 丢失匹配则直接将头结点的下一个节点赋值为头结点的下下节点
h.casNext(m, mn);
}
}
}
}
- 模式相同的时候则进行等待操作,入队等待操作
- 当模式不相同时,首先判断头结点是否是 fulfill 节点如果不是则进行匹配操作,如果是 fulfill 节点先帮助头结点的 fulfill 节点进行匹配操作
接下来再来看一下 awaitFulfill
方法内容
SNode awaitFulfill(SNode s, boolean timed, long nanos) {final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 等待线程
Thread w = Thread.currentThread();
// 等待时间设置
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {if (w.isInterrupted()) // 判断当前线程是否被中断
s.tryCancel(); // 尝试取消操作
SNode m = s.match; // 获取当前节点的匹配节点,如果节点不为 null 代表匹配或取消操作,则返回
if (m != null)
return m;
if (timed) {nanos = deadline - System.nanoTime();
if (nanos <= 0L) {s.tryCancel();
continue;
}
}
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
通过上面的源码,其实我们之前分析同步模式的时候差不太多,变化的地方其中包括返回内容判断这里判断的是 match 节点是否为 null,还有就是 spins 时间设置这里发现了 shoudSpin
用来判断是否进行轮训,来看一下 shouldSpin
方法:
/**
* 判断节点是否是 fulfill 节点,或者是头结点为空再或者是头结点和当前节点相等时则不需要进行轮训操作
*/
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
实际上就是判断节点是否是 fulfill 节点,或者是头结点为空再或者是头结点和当前节点相等时则不需要进行轮训操作,如果满足上述条件就不小进行轮训等到操作了直接进行等待就行了。
接下来我们来用例子一点点解析原理:
首先先进行一个 put 操作,这样可以简单分析下内部信息。
/**
* SynchronousQueue 原理内容
*
* @author battleheart
*/
public class SynchronousQueueDemo1 {public static void main(String[] args) throws Exception {SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Thread thread1 = new Thread(() -> {
try {queue.put(1);
} catch (InterruptedException e) {e.printStackTrace();
}
});
thread1.start();}
}
首先它会进入到 transfer 方法中,进行第一步的判断他的类型信息,如下所示:
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
通过上面代码可以看到 e = 1 所以是 DATA 类型,接下来进行判断是如何进行操作,当前堆栈是空的,如何判断堆栈为空呢?上面也讲到了 head
节点为空时则代表堆栈为空,接下来就要判断如果 head 节点为空或 head 指向的节点和当前操作内容模式相同,则进行等待操作,如下代码所示:
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
显然头结点是空的,所以进入到第一个 fi 语句中执行等待操作,如果指定了 timed 则判断时间是否小于 0,如果小于 0 则直接 null,反之判断当前节点是否不是头结点以及头结点是否取消,潘祖条件弹出头结点,并将下一个节点设置为头结点,上述条件在当前例子中都不满足,所以要进入到下面这段代码中,首先进行对 s 进行初始化值,并且进行入栈操作,casHead(h, s = snode(s, e, h, mode))
,下面看一下栈中的情况如下图所示:
当执行完了入栈操作之后接下来要执行 awaitFulfill
这里的操作就是轮训以及将当前节点的线程赋值,并且挂起当前线程。此时的栈的情况如下图所示:
当有同样的模式进行操作时候也是重复上述的操作内容,我们这里模拟两次 put 操作,让让我们看一下栈中的情况如下图所示:
通过上图可以看到,其实就是将头结点移动到了新的节点上,然后新节点的 next 节点维护这下一个节点的引用,好了,上述内容分析是同模式的操作,接下来我们试着进行 take 操作时,这时候会发什么内容呢?
/**
* SynchronousQueue 例子二进行两次 put 操作和一次 take 操作
*
* @author battleheart
*/
public class SynchronousQueueDemo1 {public static void main(String[] args) throws Exception {SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Thread thread1 = new Thread(() -> {
try {queue.put(1);
} catch (InterruptedException e) {e.printStackTrace();
}
});
thread1.start();
Thread.sleep(2000);
Thread thread2 = new Thread(() -> {
try {queue.put(2);
} catch (InterruptedException e) {e.printStackTrace();
}
});
thread2.start();
Thread.sleep(2000);
Thread thread6 = new Thread(() -> {
try {queue.take();
} catch (InterruptedException e) {e.printStackTrace();
}
});
thread6.start();}
}
上面例子正好符合上面例子两次 put 操作的截图,进行两次 put 操作过后再进行 take 操作,接下来我们来看一下 take 操作是如何进行操作的,换句话说当有不同模式的操作时又是如何进行处理呢?上面分析的内容是同种操作模式下的,当有不同操作则会走下面内容:
else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
最下面的 else 我们等会来进行分析,我们看到如果不是同模式的话,则会先判断是否是 fulfill 模式,如果不是 fulfill 模式,则进入到第一个 if 语句中,显然通过 图示 6
可以得出,头结点 head
模式并不是 fillfull 模式,则进入到该 if 语句中,上来首先判断当前头结点是否被取消了,如果被取消则将头结点移动到栈顶下一个节点,反之则将 s 节点赋值为 fulfill 模式按位或当前节点模式,个人认为目的是既保留了原有模式也变成了 fulfill 模式,我们开篇就讲到了,REQUEST=0,二进制则是 00,而 DATA=1,其二进制为 01,而 FULFILLING=2,其二进制表示 10,也就是说如果当前节点是 REQUEST 的话那么节点的内容值时 00|10=10,如果节点是 DATA 模式则 s 节点的模式时 01|10=11,这样的话 11 既保留了原有模式也是 FULFILLING 模式,然后将头节点移动到当前 s 节点,也就是将 FULFILLING 模式节点入栈操作,目前分析到这里时casHead(h, s=snode(s, e, h, FULFILLING|mode)
,栈的情况如下图所示:
接下来运行 for 循环里面内容,先运行如下内容:
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
先判断当前节点也就是头结点 s 的下一个节点上图中 head= s 节点,所以 s.next 节点代表的是Ref-750
,判断当前节点是否为空,如果为空的话代表没有可匹配的节点,先对 head 进行替换为 null 代表堆栈为空,然后将当前 s 节点设置为 null,退出 fulfill 匹配模式进入到主循环中,会重新进行对当前节点进行操作,是消费还是匹配,显然本例子中 m 节点是不为空的,所以这里不会运行,跳过之后运行下面内容:
SNode mn = m.next;
if (m.tryMatch(s)) {casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
mn 节点在上图中对应的是Ref-681
,这里是重点,m.tryMatch(s)
,m 节点尝试匹配 s 节点,进入到方法里,到这一步是我们再来看一下头结点的元素的内容:
并且唤醒 m 节点的,告诉 m 节点,你现在有匹配的对象了你可以被唤醒了,这里唤醒之后就会进入到 awaitFulfill
下面的操作
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
运行这里的线程显然是上图中的 m 节点,因为 m 节点被唤醒了,m== s 代表的是取消了节点,显然没有进行该操作,然后就是帮助头结点进行 fulfill 操作,这里重点说一下这段代码:
if ((h = head) != null && h.next == s)
casHead(h, s.next);
获取当前头结点,也就是上图中的头结点如果不为空而且 h.next 节点为 m 节点正好是 m 节点进行操作时的 s 节点,也就是说这个语句是成立的,直接将头节点指向了上图的 mn 节点,这里的操作和 take 中的下面操作是一样的,也就是帮助 fulfill 操作弹出栈顶和栈顶匹配的节点内容,下面代码:
SNode mn = m.next;
if (m.tryMatch(s)) {casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
重点是 casHead 的代码,弹出 s 和 m 两个节点,此时栈中内容如下图所示:
主要的流程分析完毕了,但是细心的朋友会发现,最后面还有一个帮助 fulfill 的操作,(transfer 中)代码如下所示:
else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
个人理解是这样的,我们上面也分析到了如果模式是相同模式情况和如果是不同模式且模式不为匹配模式的情况,但是还会有另外一种情况就是如果是不同模式并且头结点是匹配模式的就会进入到帮助去 fullfill 的情况,我来画图说明一下该情况:
如上图所示,上一个匹配操作没有进行完然后又来了一个请求操作,他就会帮助 head 进行匹配操作,也就是运行上面的代码逻辑,逻辑和匹配内容是一样的。
接下来让我们看一下取消的 clean 方法内容:
void clean(SNode s) {
s.item = null; // 将 item 值设置为 null
s.waiter = null; // 将线程设置为 null
SNode past = s.next; // s 节点下一个节点如果不为空,并且节点是取消节点则指向下下个节点,这里是结束的标识,代表没有了。if (past != null && past.isCancelled())
past = past.next;
// 如果取消的是头节点则运行下面的清理操作,操作逻辑很简单就是判断头结点是不是取消节点,如果是则将节点一定到下一个节点
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// 取消不是头结点的嵌套节点。while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
通过源码可以看到首先是先找到一个可以结束的标识 past,也就说到这里就结束了,判断是否不是头节点被取消了,如果是头节点被取消了则进行第一个 while 语句,操作也很简单就是将头节点替换头结点的下一个节点,如果不是头节点被取消了则进行下面的 while 语句操作,其实就是将取消的上一个节点的下一个节点指定为被取消节点的下一个节点,到此分析完毕了。
结束语
如果有分析不正确的请各位指正,我这边改正~