共计 13021 个字符,预计需要花费 33 分钟才能阅读完成。
今天继续来讲解阻塞队列,一个比较特殊的阻塞队列 SynchronousQueue,通过 Executors 框架提供的线程池 cachedThreadPool 中我们可以看到其被使用作为可缓存线程池的队列实现,下面通过源码来了解其内部实现,便于后面帮助我们更好的使用线程池
前言
JDK 版本号:1.8.0_171
synchronousQueue 是一个没有数据缓冲的阻塞队列,生产者线程的插入操作 put() 必须等待消费者的删除操作 take(),反过来也一样。当然,也可以不进行等待直接返回,例如 poll 和 offer
在使用上很好理解,每次操作都需要找到对应的匹配操作,如 A 线程通过 put 插入操作填入值 1,如果无其他线程操作则需要阻塞等待一个线程执行 take 操作 A 线程才能继续,反过来同样道理,这样看似乎 synchronousQueue 是没有队列进行保存数据的,每次操作都在等待其互补操作一起执行
这里和其他阻塞队列不同之处在于,内部类将入队出队操作统一封装成了一个接口实现,内部类数据保存的是每个操作动作,比如 put 操作,保存插入的值,并根据标识来判断是入队还是出队操作,如果是 take 操作,则值为 null,通过标识符能判断出来是出队操作
多思考下,我们需要找到互补的操作必然需要一个公共的区域来判断已经发生的所有操作,内部类就是用来进行这些操作的,SynchronousQueue 分为公平策略(FIFO)和非公平策略 (LIFO),两种策略分别对应其两个内部类实现,公平策略使用队列结构实现,非公平策略使用栈结构实现
由于篇幅过长,本篇先说明 SynchronousQueue 相关知识和公平策略下的实现类 TransferQueue,下篇将说明非公平策略下的实现类 TransferStack 和其他知识
类定义
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
常量 / 变量
/** The number of CPUs, for spin control */
// cpu 数量,会在自旋控制时使用
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 自旋次数,指定了超时时间时使用,这个常量配合 CAS 操作使用,相当于循环次数
// 如果 CAS 操作失败,则根据这个参数判断继续循环
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 自旋次数,未指定超时时间时使用
static final int maxUntimedSpins = maxTimedSpins * 16;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
// 自旋超时时间阈值,在设置的时间超过这个时间时以这个时间为准,单位,纳秒
static final long spinForTimeoutThreshold = 1000L;
// 后进先出队列和先进先出队列
@SuppressWarnings("serial")
static class WaitQueue implements java.io.Serializable { }
static class LifoWaitQueue extends WaitQueue {private static final long serialVersionUID = -3633113410248163686L;}
static class FifoWaitQueue extends WaitQueue {private static final long serialVersionUID = -3623113410248163686L;}
// 序列化操作使用
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
/**
* The transferer. Set only in constructor, but cannot be declared
* as final without further complicating serialization. Since
* this is accessed only at most once per public method, there
* isn't a noticeable performance penalty for using volatile
* instead of final here.
*/
// 所有的队列操作都通过 transferer 来执行,统一方法执行
// 初始化时会根据所选的策略实例化对应的内部实现类
private transient volatile Transferer<E> transferer;
从上边也能看出没有设置变量来保存入队出队操作的数据,统一操作方法都放置到了 Transferer 中
构造方法
构造方法很清晰,根据所选的策略实现对应的 Transferer 内部接口实现类来进行队列操作
// 默认非公平策略
public SynchronousQueue() {this(false);
}
// 可选策略,通过两个内部类 TransferQueue 和 TransferStack 来实现公平策略(队列)和非公平策略(栈)public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
抽象类 Transferer
抽象内部类 Transferer,transfer 方法用来替代 put 和 take 操作,每个参数解释如下:
- e:非空时,说明本身为 put 操作,生产者入队操作,需要消费者出队操作来获取入队的值;空时,说明本身是消费者,消费者消费操作需要生产者入队操作拿到入队的值
- timed:操作是否设置超时
- nanos:超时时间,单位: 纳秒
返回值:非空则表明操作成功,返回消费的 item 或生产的 item;空则表明由于超时或中断引起操作失败。调用者可以通过检查 Thread.interrupted 判断是哪种原因
/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
重要方法
put/offer
入队操作通过内部类调用 transfer,传参含义如下已在上面内部抽象类中说明,入队元素 e 非空
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();
throw new InterruptedException();}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
// 超时返回 false
return false;
// 线程中断抛错
throw new InterruptedException();}
public boolean offer(E e) {if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
take/poll
出队操作通过内部类调用 transfer,入队元素 e 为 null
public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();}
public E poll() {return transferer.transfer(null, true, 0);
}
其他方法以空队列为标准进行处理,比如队列长度直接返回 0,判空总是返回 true,其他方法类似,直接参考源码,比较简单,不多说
内部实现类
上面已经看到了最重要的核心方法在于 transferer.transfer 方法,那么其具体的实现类中这个方法是如何实现的呢?
先说明公平策略下的实现类 TransferQueue
TransferQueue
基于 Transferer 实现公平策略下的实现类 TransferQueue,既然是公平策略,则需要先进先出,这里 queue 也表明其结构特点,内部通过 QNode 类实现链表的队列形态,通过 CAS 操作更新链表元素
有两种状态需要注意:
- 取消操作(被外部中断或者超时):item == this;
- 离队操作(已成功匹配,找到互补操作):next == this;
QNode
QNode 即为队列的链表实现,其中的变量属性 isData 也可以看出其保存的是每次的操作动作而不仅仅是入队的值,入队操作会以 QNode 保存,出队操作同样会以 QNode 保存,同时变量都是通过 CAS 操作更新
static final class QNode {
// next 指向链表下一个节点
volatile QNode next; // next node in queue
// 队列元素的值
volatile Object item; // CAS'ed to or from null
// 保存等待的线程
volatile Thread waiter; // to control park/unpark
// 是否有数据,队列元素的类型标识,入队时有数据值为 true,出队时无数据值为 false
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// cas 操作更新 next
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// cas 操作更新 item
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// cas 操作取消操作,将当前的 QNode 的 item 赋值为当前的 QNode
void tryCancel(Object cmp) {UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 判断是否取消成功,紧跟着 tryCancel 操作后进行判断
boolean isCancelled() {return item == this;}
// 判断当前节点是否已处于离队状态,这里看到是将节点 next 指向自己
boolean isOffList() {return next == this;}
// 获取 item 和 next 的偏移量, 操作 CAS 使用
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {throw new Error(e);
}
}
}
变量
队头队尾元素引用设置,需要注意的是 cleanMe 节点的含义,在具体方法操作中会进行说明
/** Head of queue */
// 队头
transient volatile QNode head;
/** Tail of queue */
// 队尾
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
// 标记节点,清理链表尾部节点时,不直接删除尾部节点,而是将尾节点的前驱节点 next 指向设置为 cleanMe
// 防止此时向尾部插入节点的线程失败导致出现数据问题
transient volatile QNode cleanMe;
// 偏移量获取
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {throw new Error(e);
}
}
构造方法
头尾节点初始化操作
TransferQueue() {
// 初始化一个值为 null 的 QNode,初始化头尾节点
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
advanceHead/advanceTail/casCleanMe
CAS 更新变量操作
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
// 尝试将 nh 更新为新的队头
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
// 原头节点 next 指向更新为自己,使得 h 为离队状态,isOffList 方法为 true
h.next = h; // forget old next
}
/**
* Tries to cas nt as new tail.
*/
// 尝试更新队尾节点
void advanceTail(QNode t, QNode nt) {if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* Tries to CAS cleanMe slot.
*/
// 尝试更新 cleanMe 节点
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
transfer
入队和出队操作,统一使用一个方法,即实现接口中的 transfer 方法来完成,需要明白的是保存的是每次操作这个动作
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// e 为 null 时相当于出队操作 isData 为 false, 入队操作为 true
boolean isData = (e != null);
for (;;) {
// 获取最新的尾节点和头节点
QNode t = tail;
QNode h = head;
// 头,尾节点为空,未初始化,则循环 spin
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 首尾节点相同则为空队列或尾节点类型和新操作的类型相同,都是入队操作或出队操作
// 为何只判断尾部,因为如果头节点和尾结点不同在队列中不可能存在
// 一入队和一出队直接进入 else 匹配上不会再保存在链表中
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 尾节点已经被其他线程更新修改,则重新循环判断
if (t != tail) // inconsistent read
continue;
// tn 非空,说明其他线程已经添加了节点,尝试更新尾节点,重新循环判断
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
// 设置超时时间并且超时时间小于等于 0 则直接返回 null
if (timed && nanos <= 0) // can't wait
return null;
// s 为 null 则初始化节点 s
if (s == null)
s = new QNode(e, isData);
// 尝试将 s 添加到尾节点的 next 上,失败则重新循环
if (!t.casNext(null, s)) // failed to link in
continue;
// 尝试更新尾节点,尾节点此时为 s
advanceTail(t, s); // swing tail and wait
// 通过 awaitFulfill 方法自旋阻塞找到匹配操作的节点 item, 这个下面进行说明
Object x = awaitFulfill(s, e, timed, nanos);
// 表示当前线程已经中断或者超时,在 awaitFulfill 超时或者中断时更新 s.item 指向自己
if (x == s) { // wait was cancelled
// 清理节点,取消本次操作
clean(t, s);
return null;
}
// 判断 s 是否已从队列移除,正常情况下,出队和入队操作匹配上 s 节点肯定是需要被清理掉的
if (!s.isOffList()) { // not already unlinked
// 未被从队列清除则尝试更新队头
advanceHead(t, s); // unlink if head
// 当前线程为出队操作时,s 节点取消操作
if (x != null) // and forget fields
s.item = s;
// 清除等待线程
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 与上次队列操作非同一类型操作,上次入队,这次为出队,上次出队,这次为入队才会执行
// 匹配操作才会执行下面逻辑
} else { // complementary-mode
QNode m = h.next; // node to fulfill
// 头节点或尾节点被其他线程更新或者为空队列则循环操作
if (t != tail || m == null || h != head)
continue; // inconsistent read
// 头节点的下一个节点对应的 item
Object x = m.item;
// 同类型,被取消操作或更新 item 失败则更新头节点指向重新操作
if (isData == (x != null) || // m already fulfilled 相同类型操作说明 m 已经被其他线程操作匹配
x == m || // m cancelled 取消操作标识
// CAS 更新 item 为匹配上的操作值,比如当前是出队操作,m 为入队操作 x 为入队的值,那么此时要替换为出队值 null
// CAS 操作失败
!m.casItem(x, e)) { // lost CAS
// 删除匹配上的头节点,更新头节点
advanceHead(h, m); // dequeue and retry
continue;
}
// 更新头节点
advanceHead(h, m); // successfully fulfilled
// 释放 m 的等待线程锁使得 m 操作结束
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
awaitFulfill
在 transfer 相同类型操作时被调用,正常情况下(不算超时和中断)阻塞线程直到与之匹配的操作到来再继续执行
/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
// 自旋或阻塞直到超时或被唤醒匹配上节点
// 比如此时是入队操作,上次也是入队操作,在未设置超时时,这里可能需要自旋或阻塞等待一个出队操作来唤醒本次入队操作
// 相当于互补匹配上同时继续完成后续操作,出队操作拿走入队操作的值才能完成 入队操作被出队操作获取值才能完成
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
// 获取超时时间点
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 仅在 head.next== s 时才使用 spins(自旋次数),同时判断是否设置了超时
// 非 head.next 则不走 spins,相当于只是在第一次操作入链表时执行自旋 spins 操作,不是上来就进行阻塞
// 也能明白,在入队和出队操作匹配时 新操作是和头节点匹配,故自旋一定次数而不是直接阻塞来提升执行效率,减少线程切换开销
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 判断当前线程是否中断,外部中断操作,相当于取消本次操作
if (w.isInterrupted())
// 尝试将 s 节点的 item 设置为 s 自己,这样判断的时候就知道这个节点是被取消的
s.tryCancel(e);
Object x = s.item;
// s 的 item 已经改变,直接返回 x
// 没改变的情况下即没有匹配的操作,有匹配上的 item 即 x 将被改变,取消时如上也会改变,如下超时也会改变
// 故 return 后还需要要区分出取消和超时的情况
if (x != e)
return x;
// 线程超时将 s 节点的 item 设置为 s 自己
if (timed) {nanos = deadline - System.nanoTime();
if (nanos <= 0L) {s.tryCancel(e);
continue;
}
}
// 需要自旋时循环
if (spins > 0)
--spins;
// 设置 s 的等待线程
else if (s.waiter == null)
s.waiter = w;
// 未设置超时,直接阻塞
else if (!timed)
LockSupport.park(this);
// 设置超时时间阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
clean
清理 s 节点,同时需要关注 cleanMe 节点,整体处理过程如下:
- 删除的节点不是尾节点,使用 pred.casNext(s, s.next) 方式来进行清理
- 删除节点是队尾节点,如果 cleanMe 为空,则将其前继节点 pred 更新为 cleanMe, 为下次删除做准备
- 如果 cleanMe 不为空,则根据 cleanMe 删除上次需要删除的节点, 然后将 cleanMe 置空, 如果此次 pred 非之前 cleanMe,则 cleanMe 置为 pred,为下一次删除操作做准备
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
// 中断取消操作将 pred 节点代替 s 节点,修改前后节点之间的关联
void clean(QNode pred, QNode s) {
// 清理前先将等待线程置空
s.waiter = null; // forget thread
// pred 与 s 的前后关系
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
// hn 非空且被取消操作,更新头节点为 hn
if (hn != null && hn.isCancelled()) {advanceHead(h, hn);
continue;
}
// 尾节点
QNode t = tail; // Ensure consistent read for tail
// 空队列返回
if (t == h)
return;
// 尾节点下一个
QNode tn = t.next;
// 尾节点已被其他线程更新
if (t != tail)
continue;
// 非空 更新尾节点
if (tn != null) {advanceTail(t, tn);
continue;
}
// s 不是尾节点
if (s != t) { // If not tail, try to unsplice
// s 的下一个节点
QNode sn = s.next;
// 更新 pred 节点后一个节点为 s 的下一个节点,相当于删除 s 在链表中的关系
if (sn == s || pred.casNext(s, sn))
return;
}
// 执行到这里说明 s 为尾节点则需要处理 cleanMe 节点
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
// 被清除的节点,从下面 else 部分代码也可以看出如果为空,传入的节点为清理节点的前一个节点
// 这里代表上次需要清理的 cleanMe 节点
// 这里 d 代表真正需要清理的节点即 dp.next
QNode d = dp.next;
QNode dn;
if (d == null || // 清除节点为 null,相当于已经清理了
d == dp || // dp 节点处于离队状态
!d.isCancelled() || // 清除节点被取消
(d != t && // 清除节点非尾节点
(dn = d.next) != null && // 清除节点下一节点非 null
dn != d && // 清除节点下一节点在队列中
dp.casNext(d, dn))) // 清理 d 与其他节点的关系
casCleanMe(dp, null); // 清理完毕设置为 null
// 相当于 s 为需要清理的节点,上边已经清理过了,不需要再次清理
if (dp == pred)
return; // s is already saved node
// 更新 cleanMe 为 pred,为下次清理准备
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
举例说明
TransferQueue 的源码操作上面已经说明完毕,为了更好的理解其内部数据转换,举个例子同时画图进行说明方便各位理解:
public class SynchronousQueueTest {public static void main(String[] args) {BlockingQueue<String> sq = new SynchronousQueue<>(true);
new Thread(() -> {
try {System.out.println(sq.take());
} catch (InterruptedException e) {e.printStackTrace();
}
}).start();
new Thread(() -> {
try {sq.put("test");
} catch (InterruptedException e) {e.printStackTrace();
}
}).start();}
}
上面代码很简单,公平模式下,一个线程执行 take 操作,一个线程执行 put 操作,那么 SynchronousQueue 内部是如何处理的呢?我们以图进行说明
- 创建公平策略下的 SynchronousQueue
- 一线程执行 take 操作,以先执行 take 的线程为例子进行说明,此时另一线程 put 操作还未执行,take 操作阻塞等待
- 另一线程执行 put 操作,唤醒之前阻塞等待的 take 操作,同时处理完成
之后会进行节点的清理和头尾节点的指向更新,这部分自行读者可自行画图理解
总结
SynchronousQueue 是一个无数据缓冲的阻塞队列,在不进行超时和中断的情况下,入队操作需匹配出队操作才能继续执行下去,相当于进行互补操作,同时执行,成双成对完成,在理解这点的基础上,我们可以看到其拥有以下特点:
- 内部实现主要在于两个内部实现类,同时需要理解内部类中节点保存的不是数据而是每次操作这个动作
- 通过封装的一个接口 transfer 来完成入队和出队操作
- cleanMe 节点操作时的特殊考虑
非公平策略下的实现类 TransferStack 和其他知识将放在下篇文章进行说明
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢