SynchronousQueue 是一个由链表或栈构造组成的阻塞队列,实用于传递性场景,即生产者线程解决的数据间接传递给消费者线程。
队列中不间接存储数据元素(队列容量固定为 0)。采纳无锁算法,每个线程的存入或取出操作没有被匹配时,将会阻塞在队列中期待匹配,外部的链表或栈构造用于暂存阻塞的线程。当消费者生产速度赶不上生产速度,队列会阻塞重大。
1. 继承体系
java.util.concurrent.SynchronousQueue
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
2. 数据结构
双重栈(Dual stack)或 双重队列(Dual Queue)。
定义了外部类 Transferer,作为栈和队列实现的公共 API。
// 传输器,即两个线程替换元素应用的货色。提供两种实现形式:队列、栈private transient volatile Transferer<E> transferer;
3. 构造函数
如果是偏心模式就应用队列,如果是非偏心模式就应用栈,默认应用栈(非偏心)。
/** * Creates a {@code SynchronousQueue} with nonfair access policy. */public SynchronousQueue() { this(false);}/** * Creates a {@code SynchronousQueue} with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
4. 属性
/** The number of CPUs, for spin control */static final int NCPUS = Runtime.getRuntime().availableProcessors();// 指定超时工夫状况下,以后线程最大自旋次数。 // CPU小于2,不须要自旋,否则自旋32次。 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; // 未指定超时工夫状况下,以后线程最大自旋次数。// 16倍static final int maxUntimedSpins = maxTimedSpins * 16; // 指定超时工夫状况下,若自旋完结后如果剩余时间大于该值,则阻塞相应工夫static final long spinForTimeoutThreshold = 1000L;
5. 容量
队列容量固定为 0。
/** * Always returns {@code true}. * A {@code SynchronousQueue} has no internal capacity. * * @return {@code true} */public boolean isEmpty() { return true;}/** * Always returns zero. * A {@code SynchronousQueue} has no internal capacity. * * @return zero */public int size() { return 0;}
6. 存入取出操作
SynchronousQueue 中的传输器定义了公共办法 Transferer#transfer:
java.util.concurrent.SynchronousQueue.Transferer
/** * Shared internal API for dual stacks and queues. */abstract static class Transferer<E> { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; // 传输的数据元素。非空示意须要向消费者传递数据,为空示意须要向生产者申请数据 * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout // 该操作是否会超时 * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, // 非空示意数据元素传递或接管胜利,为空示意失败 * the operation failed due to timeout or interrupt -- // 失败的起因有两种:1.超时;2.中断,通过 Thread.interrupted 来检测中断 * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ abstract E transfer(E e, boolean timed, long nanos);}
SynchronousQueue 因为继承了 BlockingQueue,遵循办法约定:
抛出异样 非凡值 阻塞 超时插入 add(e) offer(e) put(e) offer(e, time, unit)移除 remove() poll() take() poll(time, unit)查看 element() peek() 不可用 不可用
底层都是通过调用 Transferer#transferer 来实现。
add(e) transferer.transfer(e, true, 0)offer(e) transferer.transfer(e, true, 0)put(e) transferer.transfer(e, false, 0)offer(e, time, unit) transferer.transfer(e, true, unit.toNanos(timeout))remove() transferer.transfer(null, true, 0)poll() transferer.transfer(null, true, 0)take() transferer.transfer(null, false, 0)poll(time, unit) transferer.transfer(null, true, unit.toNanos(timeout))
其中 put 和 take 都是设置为不超时,示意始终阻塞直到实现。
7. 栈实现
栈定义
双重栈(Dual stack),指的是栈中的节点具备两种模式。
TransferStack 进行了扩大,其节点具备三种模式:申请模式、数据模式、匹配模式。
java.util.concurrent.SynchronousQueue.TransferStack
/** Dual stack */static final class TransferStack<E> extends Transferer<E> { /* Modes for SNodes, ORed together in node fields */ /** Node represents an unfulfilled consumer */ static final int REQUEST = 0; // 申请模式,消费者申请数据 /** Node represents an unfulfilled producer */ static final int DATA = 1; // 数据模式,生产者提供数据 /** Node is fulfilling another unfulfilled DATA or REQUEST */ static final int FULFILLING = 2; // 匹配模式,示意数据从正一个节点传递给另外的节点 /** Returns true if m has fulfilling bit set. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** The head (top) of the stack */ volatile SNode head; boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); } /** * Creates or resets fields of a node. Called only from transfer * where the node to push on stack is lazily created and * reused when possible to help reduce intervals between reads * and CASes of head and to avoid surges of garbage when CASes * to push nodes fail due to contention. */ static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; }}
节点定义
- 应用 mode 标记该节点的模式。
- 以后节点匹配胜利,则 match 设置为所匹配的节点。
- 以后节点勾销匹配,则 match 设置为本身。
- 应用 waiter 存储操作该节点的线程,期待匹配时挂起该线程,匹配胜利时需唤醒该线程。
java.util.concurrent.SynchronousQueue.TransferStack.SNode
/** Node class for TransferStacks. */static final class SNode { volatile SNode next; // next node in stack // 栈中下一个节点 volatile SNode match; // the node matched to this // 以后节点所匹配的节点 volatile Thread waiter; // to control park/unpark // 期待着的线程 Object item; // data; or null for REQUESTs // 数据元素 int mode; // 节点的模式:REQUEST、DATA、FULFILLING // Note: item and mode fields don't need to be volatile // since they are always written before, and read after, // other volatile/atomic operations. SNode(Object item) { this.item = item; } // 若下一个节点为cmp,将其替换为节点val boolean casNext(SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } /** * Tries to match node s to this node, if so, waking up thread. * Fulfillers call tryMatch to identify their waiters. * Waiters block until they have been matched. * * @param s the node to match * @return true if successfully matched to s */ // m.tryMatch(s),示意节点m尝试与节点s进行匹配。留神入参节点s由以后线程所持有,而节点m的线程是阻塞状态的(期待匹配) boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // 如果m还没有匹配者,就把s作为它的匹配者,设置m.match为s Thread w = waiter; // 节点m的线程 if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); // 唤醒m中的线程,两者匹配结束 // 补充:节点m的线程从TransferStack#awaitFulfill办法中被唤醒,查看得悉被匹配胜利了,返回匹配的节点 } return true; } return match == s; // 可能其它线程先一步匹配了m,判断是否是s } /** * Tries to cancel a wait by matching node to itself. */ // 将节点的match属性设为本身,示意勾销 void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } boolean isCancelled() { return match == this; }}
transfer
根本思维,在循环中尝试三种操作:
- 以后栈内为空,或者栈顶节点模式与以后节点模式统一,尝试着把以后节点入栈并且期待匹配。若匹配胜利,返回该节点的数据。若勾销匹配,则返回空。
- 如果栈顶节点模式与以后节点模式互补,尝试把以后节点入栈进行匹配,再把匹配的两个节点弹出栈。
- 如果栈顶节点的模式为匹配中,则帮助匹配和弹出。
java.util.concurrent.SynchronousQueue.TransferStack#transfer
/** * Puts or takes an item. */@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; // 元素为空,为申请模式,否则为数据模式(留神,以后节点入栈前不会是匹配模式!) for (;;) { // 自旋CAS SNode h = head; // 模式雷同 if (h == null || h.mode == mode) { // empty or same-mode // 栈顶没有元素,或者栈顶元素跟以后元素是一个模式的(申请模式或数据模式) if (timed && nanos <= 0) { // can't wait // 已超时 if (h != null && h.isCancelled()) // 头节点h不为空且已勾销 casHead(h, h.next); // pop cancelled node // 把h.next作为新的头节点 // 把曾经勾销的头节点弹出,并进入下一次循环 else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // 把节点s入栈,作为新的头节点,s.next指向h(因为是模式雷同的,所以只能入栈) SNode m = awaitFulfill(s, timed, nanos); // 期待匹配,自旋阻塞以后线程 if (m == s) { // wait was cancelled // 期待被勾销了,须要革除节点s clean(s); // 出栈 return null; } if ((h = head) != null && h.next == s) // 执行到这里,阐明节点s已匹配胜利 // 若栈顶有数据,头节点为h,下一个节点为s,须要将节点h和s出栈 // 因为是期待匹配,这里是等到了其余线程入栈新的节点,跟以后节点s匹配了 casHead(h, s.next); // help s's fulfiller // 把s.next作为新的头节点 return (E) ((mode == REQUEST) ? m.item : s.item); // 若以后为申请模式,返回匹配到的节点m的数据;否则返回节点s的数据 } } // 模式互补(执行到这里,阐明栈顶元素跟以后元素的模式不同) else if (!isFulfilling(h.mode)) { // try to fulfill // 这里判断栈顶模式不是FULFILLING(匹配中),阐明是互补的 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry // 弹出已勾销的栈顶元素,并重试 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 把节点s入栈,并设为匹配模式(FULFILLING|mode的后果合乎TransferStack#isFulfilling) for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone // 如果m为null,阐明除了s节点外的节点都被其它线程先一步匹配掉了,就清空栈并跳出外部循环,到内部循环再从新入栈判断 casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { // 如果m和s尝试匹配胜利,就弹出栈顶的两个元素s和m casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // m和s匹配失败,阐明m曾经先一步被其它线程匹配了,须要革除 s.casNext(m, mn); // help unlink // 若s的下一个节点为m,则将m换成mn } } } // 模式互补(执行到这里,阐明栈顶元素跟以后元素的模式不同,且栈顶是匹配模式,阐明栈顶和栈顶上面的节点正在产生匹配,以后申请须要做帮助工作) else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone // 节点m是h匹配的节点,然而m为空,阐明m曾经被其它线程先一步匹配了 casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match // 帮助匹配,如果节点m和h匹配,则弹出h和m casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink // 如果节点m和h不匹配,则将h的下一个节点换为mn(即m曾经被其余线程匹配了,须要革除它) } } }}
留神:
- TransferStack#casHead 能够用于入栈,也能够用于出栈。
mode = FULFILLING|mode
失去的后果满足 TransferStack#isFulfilling,阐明是匹配模式。- TransferStack#awaitFulfill 是被动期待匹配
- TransferStack.SNode#tryMatch 是被动进行匹配
- 节点从阻塞中被其余线程唤醒时,个别是位于栈的第二个节点,此时栈的头节点是新入栈的与之匹配的节点。
awaitFulfill
节点s自旋或阻塞,直到被其余节点匹配。
java.util.concurrent.SynchronousQueue.TransferStack#awaitFulfill
/** * Spins/blocks until node s is matched by a fulfill operation. * * @param s the waiting node * @param timed true if timed wait * @param nanos timeout value * @return matched node, or s if cancelled */SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 节点s自旋或阻塞,直到被其余节点匹配 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 到期工夫 Thread w = Thread.currentThread(); // 以后线程 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 自旋次数(自旋完结后再判断是否须要阻塞) for (;;) { if (w.isInterrupted()) // 以后线程中断了,尝试勾销节点s s.tryCancel(); // 勾销操作,设置 s.match = s SNode m = s.match; // 查看节点s是否匹配到了节点m(由其它线程的m匹配到以后线程的s,或者s已勾销) if (m != null) return m; // 如果匹配到了,间接返回m if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 已超时,尝试勾销节点s s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 如果spins大于0,则始终自减直到为0,才会执行去elseif的逻辑 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 如果s的waiter为null,把以后线程注入进去,并进入下一次自旋 else if (!timed) LockSupport.park(this); // 如果无设置超时,则阻塞期待被其它线程唤醒,唤醒后持续自旋并查看是否匹配到了元素 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); // 如果设置超时且还有剩余时间,就阻塞相应工夫 }}
在自旋过程中,执行以下逻辑:
- 查看以后线程是否已中断,是则勾销以后节点。
- 查看以后线程是否已匹配胜利,是则返回匹配的节点。
- 自旋完结,若容许超时,判断是否达到超时工夫,未达到则阻塞剩余时间。
- 自旋完结,若不容许超时,则阻塞期待唤醒。
期待匹配的过程中,首先进行自旋再判断是否进入阻塞,是因为线程的阻塞和唤醒操作波及到零碎内核态与用户态之间的切换,比拟耗时。
8. 队列实现
队列定义
双重队列(Dual Queue),指的是队列中的节点具备两种模式。
TransferStack 进行了扩大,队列中大部分节点具备两种模式:申请节点、数据节点。
队列的头节点不属于这两种模式,而是空节点(dummy node)。
java.util.concurrent.SynchronousQueue.TransferQueue
/** Dual Queue */static final class TransferQueue<E> extends Transferer<E> { // 队列实现 /** Head of queue */ transient volatile QNode head; // 头节点 /** Tail of queue */ transient volatile QNode tail; // 尾节点 /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe; TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. // 队列的头节点,是一个空节点 head = h; tail = h; } /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ // CAS将头节点(TransferQueue#head属性的值)从节点h改为节点nh,再将h出队 void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next // 将节点h的next指针设为本身,示意h出队,不便垃圾回收 } /** * Tries to cas nt as new tail. */ void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); }}
节点定义
- 应用 isData 标记该节点的模式。
- 以后节点匹配胜利,则 item 设置为所匹配的节点的数据。
- 以后节点勾销匹配,则 item 设置为本身。
- 以后节点出队,则 next 指向本身。
- 应用 waiter 存储操作该节点的线程,期待匹配时挂起该线程,匹配胜利时需唤醒该线程。
java.util.concurrent.SynchronousQueue.TransferQueue.QNode
/** Node class for TransferQueue. */static final class QNode { volatile QNode next; // next node in queue // 队列的下一个节点 volatile Object item; // CAS'ed to or from null // 数据元素 volatile Thread waiter; // to control park/unpark // 期待着的线程 final boolean isData; // true示意为DATA类型,false示意为REQUEST类型 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * Tries to cancel by CAS'ing ref to this as item. */ void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } boolean isCancelled() { return item == this; } /** * Returns true if this node is known to be off the queue * because its next pointer has been forgotten due to * an advanceHead operation. */ // 如果节点曾经出队了,则返回 true。由 TransferQueue#advanceHead 操作将 next 指向本身。 boolean isOffList() { return next == this; }}
transfer
根本思维,在循环中尝试两种操作:
- 如果队列为空,或者尾节点模式与以后节点模式雷同,则入队(尾部)并期待匹配。
- 如果队列中有期待中的节点,且首个非空节点与以后节点的模式互补,则匹配并出队(头部)。
/** * Puts or takes an item. */@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { // empty or same-mode // 队列为空,或者队尾节点模式与以后节点模式雷同,只能入队 QNode tn = t.next; if (t != tail) // inconsistent read // 尾节点已过期,需从新获取尾节点 continue; if (tn != null) { // lagging tail // 尾节点已过期,需从新获取尾节点 advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait // 已超时 return null; if (s == null) s = new QNode(e, isData); // 以后节点设为s if (!t.casNext(null, s)) // failed to link in // 把以后节点s插入节点t之后,若失败则重试 continue; advanceTail(t, s); // swing tail and wait // 将节点s设为新的尾节点 Object x = awaitFulfill(s, e, timed, nanos); // 节点s期待匹配 if (x == s) { // wait was cancelled // 阐明以后节点s已勾销,须要出队 clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked // 执行到这里,阐明节点s匹配胜利。如果s尚未出队,则进入以下逻辑 advanceHead(t, s); // unlink if head // 已知t是s的上一个节点,这里判断如果t是头节点,则把t出队,把节点s作为新的头节点 // 上面操作进一步把s设为空节点,即dummy node if (x != null) // and forget fields s.item = s; // 把节点s的数据域设为本身,示意已勾销,不再匹配 s.waiter = null; } return (x != null) ? (E)x : e; // REQUEST类型,返回匹配到的数据x,否则返回e } else { // complementary-mode // 互补 QNode m = h.next; // node to fulfill // 首个非空节点,将它与申请节点匹配 if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled // 并不是互补的,阐明m曾经被先一步匹配了 x == m || // m cancelled // m曾经勾销 !m.casItem(x, e)) { // lost CAS // 若CAS失败阐明m数据元素不为x,即节点m曾经被匹配了。若CAS胜利则阐明匹配胜利。 advanceHead(h, m); // dequeue and retry // 把头节点出队,把节点m作为新的头节点(dummy node),持续重试 continue; } advanceHead(h, m); // successfully fulfilled // 匹配胜利,把节点m作为新的头节点(dummy node) LockSupport.unpark(m.waiter); // 唤醒m上的线程 return (x != null) ? (E)x : e; } }}
留神:
- 入队的时候,总是先获取最新的尾节点,阐明 tail 是精确的(比照其余并发队列)。
- 出队的时候,总是先获取最新的头节点,阐明 head 是精确的(比照其余并发队列)。
- TransferQueue#awaitFulfill 是被动期待匹配。
m.casItem(x, e)
是被动进行匹配,这里 m 为队首第一个非空节点,x 为 m.item,e 为入参数据元素。- 依据 FIFO 规定,当阻塞期待中的节点被唤醒时,表明该节点要么已超时,要么已从队尾排到了队头且匹配胜利。
- 因为头节点是空节点(dummy node),从队头进行匹配时,不是比拟 head 节点,而是比拟 head.next 节点。
- 从队头匹配胜利时,以后节点并不会入队,而是把旧的头节点出队,把匹配的节点设为新的头节点(dummy node)。
awaitFulfill
节点s自旋或阻塞,直到被其余节点匹配。
java.util.concurrent.SynchronousQueue.TransferQueue#awaitFulfill
/** * Spins/blocks until node s is fulfilled. * * @param s the waiting node * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // 节点s期待匹配,其数据元素为e /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 查看节点s是否匹配到了数据x,留神这里是跟栈构造不一样的中央! if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
除了检测是否被匹配胜利和勾销节点的形式,与 TransferStack#awaitFulfill 不同之外,整体逻辑是一样的。
作者:Sumkor
链接:https://segmentfault.com/a/11...