乐趣区

关于java:BlockQueue-基于TransferStack的SynchronousQueue

ThreadPoolExecutor 以 BlockingQueue 存储待执行工作,包含 SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue,明天的目标是源码角度深入研究 SynchronousQueue。

之后打算是持续钻研 LinkedBlockingQueue 和 ArrayBlockingQueue,搬开所有绊脚石之后再开始线程池。

基本概念 #BlockingQueue

BlockingQueue 是 SynchronousQueue 的爹,他们的先人是 Queue,所以他们都会听从 Queue 的一些根本逻辑:比方按程序存入数据、按程序(FIFO 或者 LIFO)取出数据,都是从队首(head)获取数据,FIFO 队列新数据从队尾入队、LIFO 队列队列新数据入队首。

对于 BlockingQueue,咱们还是认真看一下他的 javaDoc:

BlockingQueue 是一个非凡的 Queue:当获取数据的时候阻塞期待队列非空,存入数据的时候阻塞期待队列可用(有界队列未满)。
BlockingQueue 的办法(获取数据或者存入数据)不能立刻胜利、然而未来某一时间点可能会胜利的状况下,有四种不同的解决形式:一种是抛出异样,第二种是返回非凡值(空值或者 false),第三种是无限期阻塞以后线程晓得胜利,第四种是阻塞期待设定的工夫。
具体如下表:

Throws exception Special value Blocks Times out
Insert add add(e) offer offer(e) put put(e) offer(Object, long, TimeUnit) offer(e, time, unit)
Remove #remove remove() #poll poll() #take take() #poll(long, TimeUnit) poll(time, unit)
Examine #element element() #peek peek() not applicable not applicable

BlockingQueue 不承受空值 null,尝试写入 null 会抛出异样,因为 BlockingQueue 用 null 示意操作失败。
BlockingQueue 能够是有界的也能够是无界的,有界队列保护残余容量属性 remainingCapacity,超出该属性后会阻塞写入操作。
无界队列没有容量限度,始终保护 remainingCapacity 为 Integer.MAX_VALUE
BlockingQueue 是线程平安的,BlockingQueue 的实现类的办法都是原子操作、或者通过其余并发管制形式实现线程安全性。

基本概念 SynchronousQueue

SynchronousQueue 是一个每次写入数据都必须期待其余线程获取数据(反之亦然)的 BlockingQueue。SynchronousQueue 没有容量的概念、一条数据都不能存储。你不能对 SynchronousQueue 队列执行 peek 操作因为只有执行 remove 操作能力获取到数据,只有其余线程要 remove 数据的时候你能力插入数据,你也不能进行迭代因为他基本就没有数据。队列的队首数据就是第一个写入数据的线程尝试写入的数据,如果没有写入线程则队列中就没有数据可获取,poll() 办法会返回 null。对于汇合类的其余办法、比方 contains,SynchronousQueue 的返回等同于空集合。
SynchronousQueue 不容许空(null)元素。
通过结构参数设置 fairness 为 true 提供偏心队列,偏心队列听从 FIFO(先进先出)。

基本概念 Transferer

Transferer 是 SynchronousQueue 的外部类,他的 JavaDoc 也很长:

Transferer 扩大实现了 W. N. Scherer III 和 M. L.Scott 在 ”Nonblocking Concurrent Objects with Condition Synchronization” 中形容的双栈(dual stack)或者双队列(dual queue)算法。
后进先出(Lifo)栈用来实现非偏心模式,先进先出(Fifo)队列用来实现偏心模式。两者的性能是一样的,个别状况下 Fifo 反对大吞吐量、Lifo maintains higher thread locality(道歉,没搞懂什么意思)。
dual queue(或 dual stack)在任一时间要么持有数据(写入操作提供的)、要么持有申请(获取数据的申请),向正好持有数据的队列申请数据、或者向持有申请的队列写入数据被称之为 ”fulfill”。最乏味的个性是对队列的任何操作都能晓得队列过后处于什么状态,因而操作不必要上锁。
queue 和 stack 都扩大自虚构类 Transferer,Transferer 定义了一个办法 transfer,能够同时实现 put 和 take 性能。把 put 和 take 对立在一个 transfer 办法中的起因是 dual 数据结构使得 put 和 take 操作是对称操作,所以两个办法的大部分代码都能够被合并。

好了,无关 JavaDoc 形容的个性就到这里了,SynchronousQueue 的 JavaDoc 很不容易了解,代码也是。

开始剖析源码:

  1. 构造函数:决定 SynchronousQueue 底层数据结构是用 Queue 还是 Stack
  2. 因为 SynchronousQueue 是比拟非凡的:不存储数据的(JavaDoc 提到过),所以须要明确的是相干汇合办法的返回也相应比拟非凡,比方 size=0,isEmpty=true 等等,这部分源码就不看了,特地简略
  3. 队列存、取数据的办法,最终调用的都是 Transferer 的 tranfer 办法,所以咱们次要就看这个办法

SynchronousQueue 构造方法

提供一个有参构造方法接管一个布尔量 fair,咱们后面说过,Queue 是偏心的、Stack 是非偏心的,所以 fair=true 的话创立 TransferQueue,否则创立 TransferStack。

TransferStack#SNode

TransferStack(TransferQueue 也一样)的源码尽管不多,然而必须首先理解分明他的数据结构,否则不太容易读懂。

节点 SNode: 也就是存储到栈内的内容,留神我这里没有说存储在栈内的数据而是说内容,是因为 TransferStack 的特殊性导致说数据容易引起误会:栈内有两种类型的节点,一种是“data”,能够了解为“生产者”放到栈内等得消费者生产的数据,另一种是“request”,能够了解为消费者的生产申请,也就是说申请和数据都会入栈,都属于“节点”。

TransferStack 通过外部类 SNode 定义节点,次要属性:

static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;

next: 下一节点。
match:以后节点的匹配节点,比方一个申请数据的 Request 节点入栈后,正好有一个 data 节点入栈,他们两个如果匹配胜利的话,match 就是对方节点。
waiter:如果以后节点即便在自旋期待后依然没有被匹配,比方一个申请线程发送获取数据的申请后,该申请会以申请节点(Request 节点)入栈,始终没有数据送进来,则以后节点的 waiters 就记录为以后线程,之后以后线程本人挂起,期待匹配。这个期待匹配的过程是被动的,只能被另外一个 data 线程送进来的 data 节点匹配,匹配之后 data 线程通过 Request 节点的 waiters 获取到其对应的线程后唤醒该线程。
item:data 类的节点,记录送进来的待生产的数据,Request 类的节点,item 为 null。
mode:以后节点的 mode,有三个 mode:data mode 示意以后节点是数据节点(生产者发来的),Request mode 示意以后节点是申请节点(消费者发来的),还有一个比拟非凡的 mode 是:匹配中的数据节点或匹配中的申请节点,这个 mode 前面剖析 tranfer 代码的时候再说。
head:头节点,栈构造嘛,入栈节点始终是头节点,也只有头节点具备失常出栈的权限。

SNode 提供了几个原子性的操作:

  1. casNext:cas 形式替换以后节点的下一节点
  2. tryCancel;这个实现比拟非凡:以后节点的 match 如果为 null 的话则将 match 指向本人。用这种形式示意该节点被 calcel
  3. casHead:cas 的形式批改头节点,其实就是入栈或出栈操作

TransferStack#transfer

E transfer(E e, boolean timed, long nanos) {
      SNode s = null; // constructed/reused as needed
      // 如果 e 为 null 的话就是 REQUEST 操作,否则就是 DATA 操作
      int mode = (e == null) ? REQUEST : DATA;
      for (;;) {
            // 取头节点(首节点)h
           SNode h = head;
           // 空栈,或者首节点 mode 与以后操作的 mode 雷同,阐明以后节点与首节点不可能匹配了
           if (h == null || h.mode == mode) {  // empty or same-mode
               // 工夫到,等不了了
               if (timed && nanos <= 0) {      // can't wait
                   // 首节点被 calcel 了
                   if (h != null && h.isCancelled())
                       // 首节点出栈
                       casHead(h, h.next);     // pop cancelled node
                   // 既然等不及了,就返回 null
                   else
                       return null;
               // 否则,以后节点入栈,如果入栈胜利,s 就是首节点了
               } else if (casHead(h, s = snode(s, e, h, mode))) {
                   // 调用 awaitFulfill 阻塞期待匹配节点
                   SNode m = awaitFulfill(s, timed, nanos);
                   // 阻塞期待调用后果如果是 s 的话,阐明 s 被勾销了
                   if (m == s) {               // wait was cancelled
                       clean(s);
                       return null;
                   }
                   // 否则就是阻塞期待后匹配胜利了,那么判断如果头节点不空并且下一节点是 s 的话
                   // 阐明除了等来一个匹配节点之外,没有其余节点退出,那么这一对儿匹配节点都出栈
                   if ((h = head) != null && h.next == s)
                       casHead(h, s.next);     // help s's fulfiller
                   // 胜利匹配,能够返回了
                   return (E) ((mode == REQUEST) ? m.item : s.item);
               }
           // 否则,存在头节点并且以后节点 mode 不同能够匹配,并且头节点尚未被其余线程匹配
           } else if (!isFulfilling(h.mode)) { // try to fulfill
               // 如果头节点曾经被勾销,则出栈
               if (h.isCancelled())            // already cancelled
                   casHead(h, h.next);         // pop and retry
               // 否则以后节点以 FULLFILLING 模式入栈,s 变为首节点
               else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                   // 始终循环直到胜利匹配或栈内的期待节点忽然隐没
                   for (;;) { // loop until matched or waiters disappear
                       // m 为 s 的下一节点
                       SNode m = s.next;       // m is s's match
                       // m 空,阐明以前的期待节点忽然隐没,比方期待节点超时勾销
                       if (m == null) {        // all waiters are gone
                           // 清空栈
                           casHead(s, null);   // pop fulfill node
                           // 清空 s,从新进入主循环
                           s = null;           // use new node next time
                           break;              // restart main loop
                       }
                       // 否则,能够开始匹配了,mn 为 m 的下一节点,为出栈做好筹备
                       SNode mn = m.next;
                       // 如果 m 和 s 能匹配胜利,则 m 和 s 都出栈,返回后果
                       if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and m
                           return (E) ((mode == REQUEST) ? m.item : s.item);
                       // 匹配没有胜利,这种状况应该是 m 超时勾销掉了,则 m 出栈
                       } else                  // lost match
                           s.casNext(m, mn);   // help unlink
                   }
               }
           // 否则,栈首节点处于匹配中 FULLFILLING 的状态(其余线程正在匹配然而尚未实现)// 这种状况下,新来的节点不入栈,先帮助实现栈首节点的匹配
           } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    // 首节点的下一节点为空(被勾销了),则清空栈
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        // 否则,去匹配首节点 h 和他的下一节点 m,如果匹配胜利了则 h 和 m 出栈
                        // 这种状况下是不须要返回,因为是帮助其余线程实现匹配,本人的匹配工作尚未开始呢...,其余线程如果取得执行权之后,会发现曾经有人帮忙他实现匹配了,所以会很快返回后果
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

TransferStack#awaitFulfill

awaitFulfill 的作用是通过自旋、或者阻塞以后线程来期待节点被匹配。

3 个参数:
SNode s:期待匹配的节点。
booean timed:true 则示意限时期待。
long nanos:限时期待时长。

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            // 计算期待时长
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 获取以后线程
            Thread w = Thread.currentThread();
            // 计算自旋时长
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            // 自旋开始
            for (;;) {
                // 如果以后线程被中断,则 calcel 掉以后节点:将 s 的 match 指向本人
                if (w.isInterrupted())
                    s.tryCancel();
                // 自旋过程中实现匹配,则间接返回匹配节点
                SNode m = s.match;
                if (m != null)
                    return m;
                // 如果是显示匹配并且匹配超时,则 cancel 掉 s 节点
                if (timed) {nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {s.tryCancel();
                        continue;
                    }
                }
                // 自旋时长未到则持续自旋
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                // 实现自旋后记录以后线程
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                // 阻塞以后线程
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

TransferStack#clean

clean 办法在 transfer 办法中调用,如果以后节点在期待匹配的过程中曾经被 cancel 掉的话。

代码不贴出了,根本逻辑就是首先清掉 s 及 s 关联的线程(s=null,s.waiter=null), 而后追查并清理掉被 cancel 掉的 head 节点(从 head 到 s 之后的第一个未被 cancel 掉的节点一一查看),直到确保栈的 head 节点失常(未被 calcel)。

而后从 head 开始、到 s 之后的第一个未被 cancel 掉的节点一一查看,如果有节点被标记为 cancel 则该节点出栈。

执行实现之后,不止是 s 节点被清理,栈内从 head 节点开始直到 s 节点的下一个未被 cancel 掉的节点之间的节点,如果被 cancel 掉的话,全副会被清理出栈。

小结

基于 TransferStack 的 SynchronousQueue 的源码就剖析实现了,感觉不对照代码逐行阐明的话,就很不容易说分明 TransferStack 的 transfer、awaitFulfill 办法的代码逻辑,所以就采纳在源码中逐行正文的形式来阐明了。

篇幅起因,TransferQueue 下次再说!

Thanks a lot!

上一篇 Runable 和 Callable 的区别?你必须要搞清楚 Thread 以及 FutureTask!

退出移动版