关于java:BlockQueue-基于TransferQueue的SynchronousQueue

4次阅读

共计 4430 个字符,预计需要花费 12 分钟才能阅读完成。

原来认为 TransferQueue 和 TransferStack 的实现应该是一样的,一个是 Queue,一个是 Stack,take 的时候都是从 head 获取,只是 put 的时候不一样,Queue 是放在队尾,Stack 是压栈放在栈首。所以想两者的区别只是 put 不一样就能够了。

然而理论他们的算法差距不止是在 put,差距还是比拟大的,了解了 TransferStack 的源码,并不是天经地义间接就能轻松了解 TransferQueue 的。

TransferQueue#QNode

与 TransferStack 相似,通过 QNode(TransferStack 的叫 SNode)存储“数据”或者获取数据的“申请”。

QNode 次要属性:

static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

next: 下一节点。
waiter:该节点绑定的线程,与 SNode 的 waiter 含意一样。
item:与 SNode 的 item 含意统一。
isData:相似于 SNode 的 mode,true 示意是数据,false 示意该节点是 request。
head:头节点,或者叫首节点。
tail:尾结点。

比拟厌恶的是,他没有 match!他也没有 FULFILLING 的 mode,所以也就暗示了他的算法逻辑和 TransferStack 的差距可能会比拟大。

QNode 提供了几个原子性的操作:

  1. casNext:cas 形式替换以后节点的下一节点
  2. casItem:cas 形式替换以后节点的 item
  3. tryCancel;cas 形式设置以后节点的 item 为本人,用这种形式示意该节点被 calcel
  4. advanceHead:cas 的形式批改头节点,其实就是队列从 head 出队列
  5. advanceTail:从队尾入队

TransferQueue# 初始化

 TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

构造函数创立了一个“dummy node”并设置为 head、tail, 须要留神,这个“dummy node”在 take 操作的时候必须要跳过。

TransferQueue#transfer

TransferQueue 的重头戏,入队列和出队列都是这一个办法。

代码的解析放在正文中了:

E transfer(E e, boolean timed, long nanos) {

       QNode s = null; // constructed/reused as needed
       boolean isData = (e != null);

       for (;;) {
           QNode t = tail;
           QNode h = head;
           // 如果以后队列尚未实现初始化,则期待初始化实现
           if (t == null || h == null)         // saw uninitialized value
               continue;                       // spin
           // 队列空,或者以后操作与 tail 的 mode 雷同
           // 这种状况下,队列内的节点与以后申请节点无奈匹配,所以,以后申请节点入队 
           if (h == t || t.isData == isData) { // empty or same-mode
               QNode tn = t.next;
               // 并发操作过程中,t 曾经不是尾结点了(其余线程有节点入队了),重来主循环
               if (t != tail)                  // inconsistent read
                   continue;
               // 有新节点退出队尾了,从新设置尾结点,重来主循环
               if (tn != null) {               // lagging tail
                   advanceTail(t, tn);
                   continue;
               }
               // 等超时的,返回 null
               if (timed && nanos <= 0)        // can't wait
                   return null;
               // 创立新节点
               if (s == null)
                   s = new QNode(e, isData);
               // 新节点退出队尾,退出失败的话,重来
               if (!t.casNext(null, s))        // failed to link in
                   continue;
               // 新节点设置为尾结点 --- 实现入队
               advanceTail(t, s);              // swing tail and wait
               // 通过自旋或者阻塞以后过程,期待匹配
               Object x = awaitFulfill(s, e, timed, nanos);
               // 期待匹配过程中,节点勾销,则 clean 之后,返回 null
               if (x == s) {                   // wait was cancelled
                   clean(t, s);
                   return null;
               }
               
               // 实现匹配了,要么就是自旋过程中间接实现了匹配,要么就是阻塞后被匹配胜利的节点唤醒了
               // 如果 s 还在队列中
               if (!s.isOffList()) {           // not already unlinked
                   // 则 s 的上一节点如果是首节点的话,换成 s 为首节点,原来的首节点 t 移除队列
                   // 接下来的操作逻辑基于:s 曾经匹配胜利,原来的头节点出队列,s 如果是数据节点的话解除 s 对数据的援用,s 节点变为一个 dummy 类的节点变更为 head
                   advanceHead(t, s);          // unlink if head
                   // 匹配胜利后,如果 x!=null,阐明以后节点 s 是 "request",匹配到了“data”if (x != null)              // and forget fields
                       // s 的 item 指向本人,因为 x 就要返回来,开释掉 s 对 x 的援用
                       s.item = s;
                  // s 的期待线程置空
                  s.waiter = null;
               }
               // 匹配胜利,返回
               return (x != null) ? (E)x : e;

           // 否则,队列不空并且 mode 不同,能够匹配
           } else {                            // complementary-mode
               // 筹备进行匹配,留神获取的是首节点的下一个节点,跳过了首节点
               QNode m = h.next;               // node to fulfill
               // 处理过程中队列产生了变动,重来
               if (t != tail || m == null || h != head)
                   continue;                   // inconsistent read

               Object x = m.item;
               // 如果 m 曾经被其余节点匹配过,或者 m 被勾销,或者匹配失败,就把 m 更换为头节点,持续主循环
               // 如果 m 没有被匹配也没有被勾销,则通过 casItem 的形式进行匹配,匹配实现后,m 节点的 isData 会变更为反向
               // 即:request 节点变更为 data,data 节点变更为 request,// 这个操作是为了在唤醒对应的阻塞过程后使得阻塞过程满足实现匹配的条件, 相似于 Stack 的 match 赋值
               if (isData == (x != null) ||    // m already fulfilled
                   x == m ||                   // m cancelled
                   !m.casItem(x, e)) {         // lost CAS
                   advanceHead(h, m);          // dequeue and retry
                   continue;
               }
               // 匹配实现的话,首节点出队列,须要留神,m 节点并没有出队列,而是变为了首节点,// 阻塞过程被唤醒后会把 m 节点解决为相似于 dummy 节点。首节点在下次匹配的时候会被跳过
               advanceHead(h, m);              // successfully fulfilled
               // 唤醒 m 节点的阻塞过程
               LockSupport.unpark(m.waiter);
               // 返回
               return (x != null) ? (E)x : e;
           }
       }
   }

TransferQueue#awaitFulfill

awaitFulfill 的作用是通过自旋、或者阻塞以后线程来期待节点被匹配。

参数变成 4 个了:
QNode s:期待匹配的节点。
E e:匹配判断规范,和 TransferStack 的齐全不一样了,TransferStack 是通过 match 属性来判断的,TransferQueue 是用了这么个玩意儿,很不容易了解,初始送进来的 e 就是 s 的 item,稍后解释具体匹配规定。
booean timed:true 则示意限时期待。
long nanos:限时期待时长。

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
       // 自旋时长、timeout 时长等
       final long deadline = timed ? System.nanoTime() + nanos : 0L;
       Thread w = Thread.currentThread();
       int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
       for (;;) {if (w.isInterrupted())
               s.tryCancel(e);
           Object x = s.item;
           // 刚开始 e ==x,用 x!= e 来判断是否匹配的话,必定是没有实现匹配
           // 在自旋或线程阻塞期待的过程中,变动的是 x,也就是队列中 s 的 item 在其余线程实现匹配后会被更换
           // 具体参考 transfer 办法中的匹配局部的 casItem,就是干这个的,替换实现之后唤醒本过程,x!= e 就成立了
           if (x != e)
               return x;
           // 超时判断,如果超时则勾销以后节点
           if (timed) {nanos = deadline - System.nanoTime();
               if (nanos <= 0L) {s.tryCancel(e);
                   continue;
               }
           }
           // 首先自旋
           if (spins > 0)
               --spins;
           // 线程赋值,筹备阻塞
           else if (s.waiter == null)
               s.waiter = w;
           // 阻塞线程
           else if (!timed)
               LockSupport.park(this);
           else if (nanos > spinForTimeoutThreshold)
               LockSupport.parkNanos(this, nanos);
            }
        }

代码正文中解释过了,刚开始调用的时候 e ==x,用 x!= e 来判断是否匹配的话,必定是没有实现匹配。

在自旋或线程阻塞期待的过程中,变动的是 x,也就是队列中 s 的 item 在其余线程实现匹配后会被更换,具体参考 transfer 办法中的匹配局部的 casItem,就是干这个的,替换实现之后唤醒本过程,x!= e 就成立了,匹配得以实现。

TransferQueue#clean

和 TransferStack 的 clean 办法性能一样,就是要清理掉尚未实现匹配就被勾销掉的节点,查看以后节点如果不是尾结点的话就清出队列。

小结

SynchronousQueue 的源码就剖析实现了,尽管代码量不大,但却不太容易了解。须要重复浏览、对照写入队列、从队列获取数据两种类型的操作、构想多线程并发状况下的操作场景、在空白纸上画出队列在各场景、各给定工夫点的状态,有助于更好的了解代码逻辑。

Thanks a lot!

上一篇 BlockQueue – 基于 TransferStack 的 SynchronousQueue

正文完
 0