关于java:阅读-JDK-8-源码SynchronousQueue

33次阅读

共计 14351 个字符,预计需要花费 36 分钟才能阅读完成。

SynchronousQueue 是一个由链表或栈构造组成的阻塞队列,实用于传递性场景,即生产者线程解决的数据间接传递给消费者线程。
队列中不间接存储数据元素(队列容量固定为 0)。采纳无锁算法,每个线程的存入或取出操作没有被匹配时,将会阻塞在队列中期待匹配,外部的链表或栈构造用于暂存阻塞的线程。当消费者生产速度赶不上生产速度,队列会阻塞重大。

1. 继承体系

java.util.concurrent.SynchronousQueue

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

2. 数据结构

双重栈(Dual stack)或 双重队列(Dual Queue)。

定义了外部类 Transferer,作为栈和队列实现的公共 API。

// 传输器,即两个线程替换元素应用的货色。提供两种实现形式:队列、栈
private transient volatile Transferer<E> transferer;

3. 构造函数

如果是偏心模式就应用队列,如果是非偏心模式就应用栈,默认应用栈(非偏心)。

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}

4. 属性

/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

// 指定超时工夫状况下,以后线程最大自旋次数。// CPU 小于 2,不须要自旋,否则自旋 32 次。static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; 

// 未指定超时工夫状况下,以后线程最大自旋次数。// 16 倍
static final int maxUntimedSpins = maxTimedSpins * 16; 

// 指定超时工夫状况下,若自旋完结后如果剩余时间大于该值,则阻塞相应工夫
static final long spinForTimeoutThreshold = 1000L;

5. 容量

队列容量固定为 0。

/**
 * Always returns {@code true}.
 * A {@code SynchronousQueue} has no internal capacity.
 *
 * @return {@code true}
 */
public boolean isEmpty() {return true;}

/**
 * Always returns zero.
 * A {@code SynchronousQueue} has no internal capacity.
 *
 * @return zero
 */
public int size() {return 0;}

6. 存入取出操作

SynchronousQueue 中的传输器定义了公共办法 Transferer#transfer:

java.util.concurrent.SynchronousQueue.Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;   // 传输的数据元素。非空示意须要向消费者传递数据,为空示意须要向生产者申请数据
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout                // 该操作是否会超时
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null, // 非空示意数据元素传递或接管胜利,为空示意失败
     *         the operation failed due to timeout or interrupt --  // 失败的起因有两种:1. 超时;2. 中断,通过 Thread.interrupted 来检测中断
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

SynchronousQueue 因为继承了 BlockingQueue,遵循办法约定:

     抛出异样    非凡值     阻塞     超时
插入  add(e)     offer(e)  put(e)   offer(e, time, unit)
移除  remove()   poll()    take()   poll(time, unit)
查看  element()  peek()    不可用   不可用 

底层都是通过调用 Transferer#transferer 来实现。

add(e)                 transferer.transfer(e, true, 0)
offer(e)               transferer.transfer(e, true, 0)
put(e)                 transferer.transfer(e, false, 0)
offer(e, time, unit)   transferer.transfer(e, true, unit.toNanos(timeout))

remove()               transferer.transfer(null, true, 0)
poll()                 transferer.transfer(null, true, 0)
take()                 transferer.transfer(null, false, 0)
poll(time, unit)       transferer.transfer(null, true, unit.toNanos(timeout))

其中 put 和 take 都是设置为不超时,示意始终阻塞直到实现。

7. 栈实现

栈定义

双重栈(Dual stack),指的是栈中的节点具备两种模式。
TransferStack 进行了扩大,其节点具备三种模式:申请模式、数据模式、匹配模式。

java.util.concurrent.SynchronousQueue.TransferStack

/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {

    /* Modes for SNodes, ORed together in node fields */
    /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0; // 申请模式,消费者申请数据
    /** Node represents an unfulfilled producer */
    static final int DATA       = 1; // 数据模式,生产者提供数据
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2; // 匹配模式,示意数据从正一个节点传递给另外的节点
    
    /** Returns true if m has fulfilling bit set. */
    static boolean isFulfilling(int m) {return (m & FULFILLING) != 0; }
    
    /** The head (top) of the stack */
    volatile SNode head;
    
    boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    /**
     * Creates or resets fields of a node. Called only from transfer
     * where the node to push on stack is lazily created and
     * reused when possible to help reduce intervals between reads
     * and CASes of head and to avoid surges of garbage when CASes
     * to push nodes fail due to contention.
     */
    static SNode snode(SNode s, Object e, SNode next, int mode) {if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }
}

节点定义

  1. 应用 mode 标记该节点的模式。
  2. 以后节点匹配胜利,则 match 设置为所匹配的节点。
  3. 以后节点勾销匹配,则 match 设置为本身。
  4. 应用 waiter 存储操作该节点的线程,期待匹配时挂起该线程,匹配胜利时需唤醒该线程。

java.util.concurrent.SynchronousQueue.TransferStack.SNode

/** Node class for TransferStacks. */
static final class SNode {
    volatile SNode next;        // next node in stack         // 栈中下一个节点
    volatile SNode match;       // the node matched to this   // 以后节点所匹配的节点
    volatile Thread waiter;     // to control park/unpark     // 期待着的线程
    Object item;                // data; or null for REQUESTs // 数据元素
    int mode;                   // 节点的模式:REQUEST、DATA、FULFILLING
    // Note: item and mode fields don't need to be volatile
    // since they are always written before, and read after,
    // other volatile/atomic operations.

    SNode(Object item) {this.item = item;}

    // 若下一个节点为 cmp,将其替换为节点 val
    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * Tries to match node s to this node, if so, waking up thread.
     * Fulfillers call tryMatch to identify their waiters.
     * Waiters block until they have been matched.
     *
     * @param s the node to match
     * @return true if successfully matched to s
     */
    // m.tryMatch(s),示意节点 m 尝试与节点 s 进行匹配。留神入参节点 s 由以后线程所持有,而节点 m 的线程是阻塞状态的(期待匹配)boolean tryMatch(SNode s) {
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // 如果 m 还没有匹配者,就把 s 作为它的匹配者,设置 m.match 为 s
            Thread w = waiter;  // 节点 m 的线程
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w); // 唤醒 m 中的线程,两者匹配结束
                // 补充:节点 m 的线程从 TransferStack#awaitFulfill 办法中被唤醒,查看得悉被匹配胜利了,返回匹配的节点
            }
            return true;
        }
        return match == s; // 可能其它线程先一步匹配了 m,判断是否是 s
    }
    
    /**
     * Tries to cancel a wait by matching node to itself.
     */
    // 将节点的 match 属性设为本身,示意勾销
    void tryCancel() {UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }

    boolean isCancelled() {return match == this;}
}    

transfer

根本思维,在循环中尝试三种操作:

  1. 以后栈内为空,或者栈顶节点模式与以后节点模式统一,尝试着把以后节点入栈并且期待匹配。若匹配胜利,返回该节点的数据。若勾销匹配,则返回空。
  2. 如果栈顶节点模式与以后节点模式互补,尝试把以后节点入栈进行匹配,再把匹配的两个节点弹出栈。
  3. 如果栈顶节点的模式为匹配中,则帮助匹配和弹出。

java.util.concurrent.SynchronousQueue.TransferStack#transfer

/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA; // 元素为空,为申请模式,否则为数据模式(留神,以后节点入栈前不会是匹配模式!)for (;;) { // 自旋 CAS
        SNode h = head;
        
        // 模式雷同
        if (h == null || h.mode == mode) {  // empty or same-mode // 栈顶没有元素,或者栈顶元素跟以后元素是一个模式的(申请模式或数据模式)if (timed && nanos <= 0) {      // can't wait // 已超时
                if (h != null && h.isCancelled()) // 头节点 h 不为空且已勾销
                    casHead(h, h.next);     // pop cancelled node // 把 h.next 作为新的头节点 // 把曾经勾销的头节点弹出,并进入下一次循环
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) { // 把节点 s 入栈,作为新的头节点,s.next 指向 h(因为是模式雷同的,所以只能入栈)SNode m = awaitFulfill(s, timed, nanos); // 期待匹配,自旋阻塞以后线程
                if (m == s) {               // wait was cancelled // 期待被勾销了,须要革除节点 s
                    clean(s);               // 出栈
                    return null;
                }
                if ((h = head) != null && h.next == s) // 执行到这里,阐明节点 s 已匹配胜利
                    // 若栈顶有数据,头节点为 h,下一个节点为 s,须要将节点 h 和 s 出栈
                    // 因为是期待匹配,这里是等到了其余线程入栈新的节点,跟以后节点 s 匹配了
                    casHead(h, s.next);     // help s's fulfiller // 把 s.next 作为新的头节点
                return (E) ((mode == REQUEST) ? m.item : s.item); // 若以后为申请模式,返回匹配到的节点 m 的数据;否则返回节点 s 的数据
            }
        } 
        
        // 模式互补(执行到这里,阐明栈顶元素跟以后元素的模式不同)else if (!isFulfilling(h.mode)) {   // try to fulfill // 这里判断栈顶模式不是 FULFILLING(匹配中),阐明是互补的
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry // 弹出已勾销的栈顶元素,并重试
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 把节点 s 入栈,并设为匹配模式(FULFILLING|mode 的后果合乎 TransferStack#isFulfilling)for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone 
                        // 如果 m 为 null,阐明除了 s 节点外的节点都被其它线程先一步匹配掉了,就清空栈并跳出外部循环,到内部循环再从新入栈判断
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // 如果 m 和 s 尝试匹配胜利,就弹出栈顶的两个元素 s 和 m
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match  // m 和 s 匹配失败,阐明 m 曾经先一步被其它线程匹配了,须要革除
                        s.casNext(m, mn);   // help unlink // 若 s 的下一个节点为 m,则将 m 换成 mn
                }
            }
        } 
        
        // 模式互补(执行到这里,阐明栈顶元素跟以后元素的模式不同,且栈顶是匹配模式,阐明栈顶和栈顶上面的节点正在产生匹配,以后申请须要做帮助工作)else {                              // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone // 节点 m 是 h 匹配的节点,然而 m 为空,阐明 m 曾经被其它线程先一步匹配了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match // 帮助匹配,如果节点 m 和 h 匹配,则弹出 h 和 m
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink // 如果节点 m 和 h 不匹配,则将 h 的下一个节点换为 mn(即 m 曾经被其余线程匹配了,须要革除它)}
        }
    }
}

留神:

  1. TransferStack#casHead 能够用于入栈,也能够用于出栈。
  2. mode = FULFILLING|mode 失去的后果满足 TransferStack#isFulfilling,阐明是匹配模式。
  3. TransferStack#awaitFulfill 是被动期待匹配
  4. TransferStack.SNode#tryMatch 是被动进行匹配
  5. 节点从阻塞中被其余线程唤醒时,个别是位于栈的第二个节点,此时栈的头节点是新入栈的与之匹配的节点。

awaitFulfill

节点 s 自旋或阻塞,直到被其余节点匹配。

java.util.concurrent.SynchronousQueue.TransferStack#awaitFulfill

/**
 * Spins/blocks until node s is matched by a fulfill operation.
 *
 * @param s the waiting node
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched node, or s if cancelled
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 节点 s 自旋或阻塞,直到被其余节点匹配
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // 到期工夫
    Thread w = Thread.currentThread(); // 以后线程
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 自旋次数(自旋完结后再判断是否须要阻塞)for (;;) {if (w.isInterrupted()) // 以后线程中断了,尝试勾销节点 s
            s.tryCancel(); // 勾销操作,设置 s.match = s
        SNode m = s.match; // 查看节点 s 是否匹配到了节点 m(由其它线程的 m 匹配到以后线程的 s,或者 s 已勾销)if (m != null)
            return m; // 如果匹配到了,间接返回 m
        if (timed) {nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { // 已超时,尝试勾销节点 s
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0; // 如果 spins 大于 0,则始终自减直到为 0,才会执行去 elseif 的逻辑
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter // 如果 s 的 waiter 为 null,把以后线程注入进去,并进入下一次自旋
        else if (!timed)
            LockSupport.park(this); // 如果无设置超时,则阻塞期待被其它线程唤醒,唤醒后持续自旋并查看是否匹配到了元素
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos); // 如果设置超时且还有剩余时间,就阻塞相应工夫
    }
}

在自旋过程中,执行以下逻辑:

  1. 查看以后线程是否已中断,是则勾销以后节点。
  2. 查看以后线程是否已匹配胜利,是则返回匹配的节点。
  3. 自旋完结,若容许超时,判断是否达到超时工夫,未达到则阻塞剩余时间。
  4. 自旋完结,若不容许超时,则阻塞期待唤醒。

期待匹配的过程中,首先进行自旋再判断是否进入阻塞,是因为线程的阻塞和唤醒操作波及到零碎内核态与用户态之间的切换,比拟耗时。

8. 队列实现

队列定义

双重队列(Dual Queue),指的是队列中的节点具备两种模式。
TransferStack 进行了扩大,队列中大部分节点具备两种模式:申请节点、数据节点。
队列的头节点不属于这两种模式,而是空节点(dummy node)。

java.util.concurrent.SynchronousQueue.TransferQueue

/** Dual Queue */
static final class TransferQueue<E> extends Transferer<E> { // 队列实现

    /** Head of queue */
    transient volatile QNode head; // 头节点
    /** Tail of queue */
    transient volatile QNode tail; // 尾节点
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node. // 队列的头节点,是一个空节点
        head = h;
        tail = h;
    }

    /**
     * Tries to cas nh as new head; if successful, unlink
     * old head's next node to avoid garbage retention.
     */
    // CAS 将头节点(TransferQueue#head 属性的值)从节点 h 改为节点 nh,再将 h 出队
    void advanceHead(QNode h, QNode nh) {
        if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
            h.next = h; // forget old next // 将节点 h 的 next 指针设为本身,示意 h 出队,不便垃圾回收
    }

    /**
     * Tries to cas nt as new tail.
     */
    void advanceTail(QNode t, QNode nt) {if (tail == t)
            UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    }
}    

节点定义

  1. 应用 isData 标记该节点的模式。
  2. 以后节点匹配胜利,则 item 设置为所匹配的节点的数据。
  3. 以后节点勾销匹配,则 item 设置为本身。
  4. 以后节点出队,则 next 指向本身。
  5. 应用 waiter 存储操作该节点的线程,期待匹配时挂起该线程,匹配胜利时需唤醒该线程。

java.util.concurrent.SynchronousQueue.TransferQueue.QNode

/** Node class for TransferQueue. */
static final class QNode {
    volatile QNode next;          // next node in queue     // 队列的下一个节点
    volatile Object item;         // CAS'ed to or from null // 数据元素
    volatile Thread waiter;       // to control park/unpark // 期待着的线程
    final boolean isData;         // true 示意为 DATA 类型,false 示意为 REQUEST 类型

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    /**
     * Tries to cancel by CAS'ing ref to this as item.
     */
    void tryCancel(Object cmp) {UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    boolean isCancelled() {return item == this;}

    /**
     * Returns true if this node is known to be off the queue 
     * because its next pointer has been forgotten due to
     * an advanceHead operation.
     */
    // 如果节点曾经出队了,则返回 true。由 TransferQueue#advanceHead 操作将 next 指向本身。boolean isOffList() {return next == this;}
}

transfer

根本思维,在循环中尝试两种操作:

  1. 如果队列为空,或者尾节点模式与以后节点模式雷同,则入队(尾部)并期待匹配。
  2. 如果队列中有期待中的节点,且首个非空节点与以后节点的模式互补,则匹配并出队(头部)。
/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData) { // empty or same-mode // 队列为空,或者队尾节点模式与以后节点模式雷同,只能入队
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read // 尾节点已过期,需从新获取尾节点
                continue;
            if (tn != null) {               // lagging tail // 尾节点已过期,需从新获取尾节点
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait // 已超时
                return null;
            if (s == null)
                s = new QNode(e, isData);   // 以后节点设为 s
            if (!t.casNext(null, s))        // failed to link in // 把以后节点 s 插入节点 t 之后,若失败则重试
                continue;

            advanceTail(t, s);              // swing tail and wait // 将节点 s 设为新的尾节点
            Object x = awaitFulfill(s, e, timed, nanos); // 节点 s 期待匹配
            if (x == s) {                   // wait was cancelled // 阐明以后节点 s 已勾销,须要出队
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked // 执行到这里,阐明节点 s 匹配胜利。如果 s 尚未出队,则进入以下逻辑
                advanceHead(t, s);          // unlink if head 
                // 已知 t 是 s 的上一个节点,这里判断如果 t 是头节点,则把 t 出队,把节点 s 作为新的头节点
                // 上面操作进一步把 s 设为空节点,即 dummy node
                if (x != null)              // and forget fields
                    s.item = s;             // 把节点 s 的数据域设为本身,示意已勾销,不再匹配
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;  // REQUEST 类型,返回匹配到的数据 x,否则返回 e

        } else {                            // complementary-mode // 互补
            QNode m = h.next;               // node to fulfill // 首个非空节点,将它与申请节点匹配
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled // 并不是互补的,阐明 m 曾经被先一步匹配了
                x == m ||                   // m cancelled // m 曾经勾销
                !m.casItem(x, e)) {         // lost CAS // 若 CAS 失败阐明 m 数据元素不为 x,即节点 m 曾经被匹配了。若 CAS 胜利则阐明匹配胜利。advanceHead(h, m);          // dequeue and retry // 把头节点出队,把节点 m 作为新的头节点(dummy node),持续重试
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled // 匹配胜利,把节点 m 作为新的头节点(dummy node)LockSupport.unpark(m.waiter);   // 唤醒 m 上的线程
            return (x != null) ? (E)x : e;
        }
    }
}

留神:

  1. 入队的时候,总是先获取最新的尾节点,阐明 tail 是精确的(比照其余并发队列)。
  2. 出队的时候,总是先获取最新的头节点,阐明 head 是精确的(比照其余并发队列)。
  3. TransferQueue#awaitFulfill 是被动期待匹配。
  4. m.casItem(x, e) 是被动进行匹配,这里 m 为队首第一个非空节点,x 为 m.item,e 为入参数据元素。
  5. 依据 FIFO 规定,当阻塞期待中的节点被唤醒时,表明该节点要么已超时,要么已从队尾排到了队头且匹配胜利。
  6. 因为头节点是空节点(dummy node),从队头进行匹配时,不是比拟 head 节点,而是比拟 head.next 节点。
  7. 从队头匹配胜利时,以后节点并不会入队,而是把旧的头节点出队,把匹配的节点设为新的头节点(dummy node)。

awaitFulfill

节点 s 自旋或阻塞,直到被其余节点匹配。

java.util.concurrent.SynchronousQueue.TransferQueue#awaitFulfill

/**
 * Spins/blocks until node s is fulfilled.
 *
 * @param s the waiting node
 * @param e the comparison value for checking match
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched item, or s if cancelled
 */
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // 节点 s 期待匹配,其数据元素为 e
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item; // 查看节点 s 是否匹配到了数据 x,留神这里是跟栈构造不一样的中央!if (x != e)
            return x;
        if (timed) {nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

除了检测是否被匹配胜利和勾销节点的形式,与 TransferStack#awaitFulfill 不同之外,整体逻辑是一样的。


作者:Sumkor
链接:https://segmentfault.com/a/11…

正文完
 0