JDK源码那些事儿之SynchronousQueue下篇

37次阅读

共计 9700 个字符,预计需要花费 25 分钟才能阅读完成。

之前一篇文章已经讲解了阻塞队列 SynchronousQueue 的大部分内容,其中默认的非公平策略还未说明,本文就紧接上文继续讲解其中的非公平策略下的内部实现,顺便简单说明其涉及到的线程池部分的使用

前言

回顾一下,SynchronousQueue 通过两个内部类实现了公平策略和非公平策略的无缓存阻塞队列,每种操作都需要对应的互补操作同时进行才能完成,例如,入队操作必然对应出队操作,在不涉及超时和中断的情况下,必须等待另一个线程进行出队操作,两两匹配才能执行,否则就阻塞等待

之前已经对公平策略下的内部类实现 TransferQueue 做了详细的说明,今天就非公平策略下的内部实现类 TransferStack 进行说明

TransferStack

不同于公平策略下的操作,只有一种状态需要注意:

  • 取消操作(被外部中断或者超时):match == this;

SNode

SNode 基于栈的节点实现,变量与 QNode 有些不同,其中 match 在两个操作匹配上之后可以通过这个变量找到其匹配的节点,节点类型 mode 在使用上也有所不同,下面使用到时会进行说明,其他参数可参考 TransferQueue 的 QNode 说明

    static final class SNode {
        // next 指向栈中下一个元素
        volatile SNode next;        // next node in stack
        // 和当前节点匹配的节点
        volatile SNode match;       // the node matched to this
        // 等待线程
        volatile Thread waiter;     // to control park/unpark
        // 节点内容
        Object item;                // data; or null for REQUESTs
        // 节点类型
        int mode;
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.
        
        SNode(Object item) {this.item = item;}
        
        // CAS 更新 next 字段
        boolean casNext(SNode cmp, SNode val) {
            return cmp == next &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * Tries to match node s to this node, if so, waking up thread.
         * Fulfillers call tryMatch to identify their waiters.
         * Waiters block until they have been matched.
         *
         * @param s the node to match
         * @return true if successfully matched to s
         */
        // 尝试 s 节点与当前节点进行匹配,成功则唤醒等待线程继续执行
        // 在使用到时才能理解,同时可参考我举例上的图示说明部分
        boolean tryMatch(SNode s) {
            // match == null 表明当前节点未被其他节点匹配上
            // cas 更新 match 字段为 s
            if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                // 当前节点等待线程未被其他线程操作
                if (w != null) {    // waiters need at most one unpark
                    // 唤醒等待线程同时将 waiter 置空
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            // 判断当前节点是否已与 s 进行匹配
            return match == s;
        }

        /**
         * Tries to cancel a wait by matching node to itself.
         */
        // 尝试取消操作 将 match 置为 this
        void tryCancel() {UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }
        
        // 判断 tryCancel 是否操作成功
        boolean isCancelled() {return match == this;}
        
        // 获取 match 和 next 在对象中的偏移量
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long matchOffset;
        private static final long nextOffset;

        static {
            try {UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = SNode.class;
                matchOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("match"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {throw new Error(e);
            }
        }
    }

变量

变量部分需要注意的就在于这 3 种类型,其中 FULFILLING 需要注意的是这个变量不是直接进行使用的,而是与其他两种操作进行位操作时使用

为什么是 2 呢?因为 2 在二进制中表示 10,在高位上有个 1,而 REQUEST 与 DATA 的 0 和 1 二进制只在最低位上用二进制 0 和 1 表示即可,我们可以通过 FULFILLING 与其他两种类型位操作使得高位不同来判断节点是否已经被其他节点匹配互补上,此时还能通过最低位判断出此节点是什么操作,当然,由于是栈结构,主要在栈顶元素,这里通过高位的不同来判断出是去匹配节点操作还是帮助匹配的两个节点进行一些操作,在后面要说明的 transfer 部分你会看到有 3 个条件分支执行,第 3 个即为帮助已经确定匹配的两个节点进行一些操作以便尽快完成出栈让自己继续执行匹配操作

    /** Node represents an unfulfilled consumer */
    // 数据请求操作 如 take 操作 代表未被匹配上的消费者
    static final int REQUEST    = 0;
    /** Node represents an unfulfilled producer */
    // 数据保存操作 如 put 操作 代表未被匹配上的生产者
    static final int DATA       = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    // 有节点与其匹配,相当于已经有互补操作,使用上不是直接使用,可参考后面的源码部分
    static final int FULFILLING = 2;
    
    /** The head (top) of the stack */
    // 栈顶指针
    volatile SNode head;
    
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    static {
        try {UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        } catch (Exception e) {throw new Error(e);
        }
    }

casHead

CAS 更新栈顶指针,比较简单

    boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

isFulfilling

判断 m 对应的节点是否已经被匹配,和 FULFILLING 进行位与操作,判断 m 对应的栈节点处于 FULFILLING 状态,即已经匹配上了,在 transfer 里与栈顶节点非相同操作时会入栈一个节点,此节点的 mode 和普通节点不一样,会通过 FULFILLING|mode 操作更新 mode,故这里最低位来区分是保存数据还是请求数据,高位来区分此节点是否是已经找到匹配节点的节点,当然,只在此次操作中使用,具体参见下面方法的说明

        /** Returns true if m has fulfilling bit set. */
        static boolean isFulfilling(int m) {return (m & FULFILLING) != 0; }

snode

创建或重置 SNode 节点,如果为空则创建新的 SNode 节点,不为空则重置节点的 mode 和 next 属性

    /**
     * Creates or resets fields of a node. Called only from transfer
     * where the node to push on stack is lazily created and
     * reused when possible to help reduce intervals between reads
     * and CASes of head and to avoid surges of garbage when CASes
     * to push nodes fail due to contention.
     */
    static SNode snode(SNode s, Object e, SNode next, int mode) {if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }

transfer

类似于在公平模式下的 TransferQueue.transfer,入队和出队操作,统一使用一个方法,即实现接口中的 transfer 方法来完成,需要明白的是保存的是每次操作这个动作,当然,与 TransferQueue.transfer 有所不同的在于这里有 3 个条件分支,按顺序含义如下:

  • 栈为空或栈顶元素操作类型和当前操作类型相同,入栈阻塞等待
  • 栈顶非匹配互补节点(匹配互补节点:已经和其他节点匹配上了,mode 值高位为 1),进行匹配操作
  • 帮助已经匹配的栈顶节点操作

需要注意的就是上面多次提醒的 mode 变量部分,需要好好理解

    /**
     * Puts or takes an item.
     */
    @SuppressWarnings("unchecked")
    E transfer(E e, boolean timed, long nanos) {
        
        SNode s = null; // constructed/reused as needed
        // 节点类型,是 put 还是 take 操作,即是保存数据还是请求数据
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            // 获取栈顶指针
            SNode h = head;
            // 栈为空
            // 或栈顶节点和当前操作节点为相同操作
            if (h == null || h.mode == mode) {  // empty or same-mode
                // 设置超时时间且超时时间小于等于 0
                if (timed && nanos <= 0) {      // can't wait
                    if (h != null && h.isCancelled())
                        // 栈顶非空且栈顶节点为取消操作状态
                        // 出栈,尝试将栈顶节点更新
                        casHead(h, h.next);     // pop cancelled node
                    else
                        return null;
                // 创建节点,尝试更新栈顶节点    
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    // 通过 awaitFulfill 方法自旋阻塞找到匹配操作的节点, 这个下面进行说明
                    // 可以类比公平模式下的 awaitFulfill
                    SNode m = awaitFulfill(s, timed, nanos);
                    // 取消或超时
                    if (m == s) {               // wait was cancelled
                        // 清理节点,取消本次操作
                        clean(s);
                        return null;
                    }
                    // 栈顶节点更新为 s 的 next 元素
                    // 执行到这一步时应该是栈顶两个节点进行了匹配
                    // 出栈栈顶 2 个节点元素,帮助更新栈顶元素为第三个节点元素即为 s.next
                    // 当然,也可能另一个栈顶节点线程帮助更新了
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller
                    // 判断下,如果当前是请求数据,即 take 操作,返回 m.item 值,即返回匹配节点的 item
                    // 当前是保存数据,即 put 操作,返回 s.item    
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            // 与栈顶节点非相同操作,栈顶元素非匹配互补节点
            } else if (!isFulfilling(h.mode)) { // try to fulfill
                // 栈顶元素处于取消操作状态
                if (h.isCancelled())            // already cancelled
                    // 尝试出栈更新栈顶元素
                    casHead(h, h.next);         // pop and retry
                // 入栈新创建的节点,同时 FULFILLING|mode 位与操作 
                // s 的 mode 为 10 或者 11 
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    // 进入这里表明 s 已经为栈顶节点,而且 s.next 是其匹配节点
                    // 循环直到匹配上
                    for (;;) { // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        // 空则可能被其他线程匹配上了则更新头节点为 null,重新进入外层循环
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            // 这里 s 节点需置空,因为比较特殊,mode 不同于普通节点
                            // 重新循环时根据情况重新创建节点
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        // 
                        SNode mn = m.next;
                        // 尝试 m 与 s 进行匹配,实际上是更新 m 节点的 match 为 s, 同时唤醒 m 的等待线程
                        if (m.tryMatch(s)) {
                            // 成功则出栈栈顶两个元素,即更新栈顶节点
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                  // lost match
                            // 未匹配上,可能被其他节点匹配上了,尝试更新 s 的 next 指针,再继续匹配
                            s.casNext(m, mn);   // help unlink
                    }
                }
            // 不满足上边两个条件,即此时栈顶为匹配节点,还未匹配完成,这里帮忙完成匹配出栈操作
            // 注意,这里只是帮助更新 head 和 next 并不做其他操作,参考上面方法的处理
            } else {                            // help a fulfiller
                SNode m = h.next;               // m is h's match
                if (m == null)                  // waiter is gone
                    casHead(h, null);           // pop fulfilling node
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }

awaitFulfill

与 TransferQueue.awaitFulfill 类似,在当前操作同之前操作相同时,未设置操作时间同时未被外部线程中断则需阻塞等待匹配节点唤醒当前阻塞的线程,整体上非常相似,由于 match 的存在使得判断对应的匹配节点要比 TransferQueue.awaitFulfill 简单许多

    /**
     * Spins/blocks until node s is matched by a fulfill operation.
     *
     * @param s the waiting node
     * @param timed true if timed wait
     * @param nanos timeout value
     * @return matched node, or s if cancelled
     */
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        // 获取超时时间点
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 当前线程
        Thread w = Thread.currentThread();
        // shouldSpin 判断是否需要进行自旋,下一个方法进行说明
        int spins = (shouldSpin(s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);            
        for (;;) {
            // 判断当前线程是否中断,外部中断操作,相当于取消本次操作
            if (w.isInterrupted())
                // 尝试将 s 节点的 match 设置为 s 自己,这样判断的时候就知道这个节点是被取消的
                s.tryCancel();
            SNode m = s.match;
            // match 非空则表示当前节点已经被匹配 match 匹配上
            if (m != null)
                return m;
            // 超时配置处理
            if (timed) {nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {s.tryCancel();
                    continue;
                }
            }
            // 自旋 spins
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            // 设置等待线程
            else if (s.waiter == null)
                s.waiter = w; // establish waiter so can park next iter
            // 未设置超时,直接阻塞
            else if (!timed)
                LockSupport.park(this);
            // 设置超时时间阻塞
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

shouldSpin

判断是否需要自旋操作,满足下列情况之一即需要自旋:

  • 栈顶节点等于 s 节点
  • 栈顶节点为空
  • 栈顶节点为已和其他节点匹配的节点(mode = FULFILLING|mode)
    /**
     * Returns true if node s is at head or there is an active
     * fulfiller.
     */
    boolean shouldSpin(SNode s) {
        SNode h = head;
        return (h == s || h == null || isFulfilling(h.mode));
    }

clean

清理操作,清理栈节点 s 的关联关系,同时会清理整个栈节点的取消操作节点,无 cleanMe 节点,比 TransferQueue.clean 操作要简单许多

    /**
     * Unlinks s from the stack.
     */
    void clean(SNode s) {
        // item,waiter 置空
        s.item = null;   // forget item
        s.waiter = null; // forget thread
        
        // s 的下一个节点处于取消操作状态,则 past 指向 past 的下一个节点
        SNode past = s.next;
        if (past != null && past.isCancelled())
            past = past.next;

        // Absorb cancelled nodes at head
        // 头节点被取消操作则进行将 next 节点更新为头节点
        SNode p;
        while ((p = head) != null && p != past && p.isCancelled())
            casHead(p, p.next);

        // Unsplice embedded nodes
        // 头节点调整完毕,现在将栈节点中每个节点都会进行检查一遍,更新前后节点的关系,将取消操作的节点进行排除
        while (p != null && p != past) {
            SNode n = p.next;
            if (n != null && n.isCancelled())
                p.casNext(n, n.next);
            else
                p = n;
        }
    }

举例说明

参考公平模式下的代码,通过下列最简单的示例进行说明,一个线程 take 操作,一个线程 put 操作,画图进行说明

public class SynchronousQueueTest {public static void main(String[] args) {BlockingQueue<String> sq = new SynchronousQueue<>();
        new Thread(() -> {
            try {System.out.println(sq.take());
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {sq.put("test");
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }).start();}
}

1. 创建非公平策略下的 SynchronousQueue,new TransferStack<E>() 无参构造方法默认,变量上没有进行任何操作

2. 一线程执行 take 操作,以先执行 take 的线程为例子进行说明,此时另一线程 put 操作还未执行,take 操作阻塞等待

3. 另一线程执行 put 操作,通过!isFulfilling 判断出当前栈顶未与其他节点匹配,则其尝试与栈顶节点匹配,成功则唤醒之前阻塞等待的 take 操作,同时处理完成

最终执行 return (E) ((mode == REQUEST) ? m.item : s.item),获取操作结果,当然,其中还有一个条件分支可以帮助匹配互补更新操作,这部分自行读者可自行画图理解

线程池的使用

线程池使用 Executors.newCachedThreadPool() 方法创建可缓冲线程池,这里看下源码实现:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

核心线程设置为 0,最大线程池设置 Integer.MAX_VALUE,存活时间 60s,阻塞队列使用 SynchronousQueue,默认非公平模式,可缓冲线程池通过复用空闲线程提高效率,当然,如果我们使用这种方式创建线程池可能会带来一些问题

这会造成什么问题呢?

这里最大线程数设置为 Integer.MAX_VALUE,可能会创建非常多的线程,甚至导致 OOM,所以阿里规范中提及了这部分内容,指出了其中存在的隐患,需要规避资源耗尽的风险,开发人员应直接使用 ThreadPoolExecutor 来创建线程池,每个参数需要根据自己的需求进行设置

总结

至此,SynchronousQueue 的非公平策略的内部实现也已讲解完毕,非公平策略下要注意其对于 mode 部分状态的处理,通过高位和低位分别区分是否已匹配和是什么类型的操作(生产者还是消费者),理解了这部分,对于非公平模式下的整体操作流程也能很快熟悉,相对来说不是十分复杂,多画图观察代码执行过程能帮助更好的理解

SynchronousQueue 作为一个无数据缓冲的阻塞队列,其内部通过两个内部类(队列和栈)分别实现了公平策略和非公平策略下的队列操作,其实我们需要记住的在于其操作必须是成双成对的,在无超时无中断的情况下,一个线程执行入队操作,必然需要另一个线程执行出队操作,此时两操作互相匹配,同时完成操作,这也是其取名为 Synchronous(同时发生)的含义吧

以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢

正文完
 0