原来认为TransferQueue和TransferStack的实现应该是一样的,一个是Queue,一个是Stack,take的时候都是从head获取,只是put的时候不一样,Queue是放在队尾,Stack是压栈放在栈首。所以想两者的区别只是put不一样就能够了。

然而理论他们的算法差距不止是在put,差距还是比拟大的,了解了TransferStack的源码,并不是天经地义间接就能轻松了解TransferQueue的。

TransferQueue#QNode

与TransferStack相似,通过QNode(TransferStack的叫SNode)存储“数据”或者获取数据的“申请”。

QNode次要属性:

static final class QNode {            volatile QNode next;          // next node in queue            volatile Object item;         // CAS'ed to or from null            volatile Thread waiter;       // to control park/unpark            final boolean isData;

next:下一节点。
waiter:该节点绑定的线程,与SNode的waiter含意一样。
item:与SNode的item含意统一。
isData:相似于SNode的mode,true示意是数据,false示意该节点是request。
head:头节点,或者叫首节点。
tail:尾结点。

比拟厌恶的是,他没有match!他也没有FULFILLING的mode,所以也就暗示了他的算法逻辑和TransferStack的差距可能会比拟大。

QNode提供了几个原子性的操作:

  1. casNext:cas形式替换以后节点的下一节点
  2. casItem:cas形式替换以后节点的item
  3. tryCancel;cas形式设置以后节点的item为本人,用这种形式示意该节点被calcel
  4. advanceHead:cas的形式批改头节点,其实就是队列从head出队列
  5. advanceTail:从队尾入队

TransferQueue#初始化

 TransferQueue() {            QNode h = new QNode(null, false); // initialize to dummy node.            head = h;            tail = h;        }

构造函数创立了一个“dummy node”并设置为head、tail,须要留神,这个“dummy node”在take操作的时候必须要跳过。

TransferQueue#transfer

TransferQueue的重头戏,入队列和出队列都是这一个办法。

代码的解析放在正文中了:

E transfer(E e, boolean timed, long nanos) {       QNode s = null; // constructed/reused as needed       boolean isData = (e != null);       for (;;) {           QNode t = tail;           QNode h = head;           //如果以后队列尚未实现初始化,则期待初始化实现           if (t == null || h == null)         // saw uninitialized value               continue;                       // spin           //队列空,或者以后操作与tail的mode雷同           //这种状况下,队列内的节点与以后申请节点无奈匹配,所以,以后申请节点入队            if (h == t || t.isData == isData) { // empty or same-mode               QNode tn = t.next;               //并发操作过程中,t曾经不是尾结点了(其余线程有节点入队了),重来主循环               if (t != tail)                  // inconsistent read                   continue;               //有新节点退出队尾了,从新设置尾结点,重来主循环               if (tn != null) {               // lagging tail                   advanceTail(t, tn);                   continue;               }               //等超时的,返回null               if (timed && nanos <= 0)        // can't wait                   return null;               //创立新节点               if (s == null)                   s = new QNode(e, isData);               //新节点退出队尾,退出失败的话,重来               if (!t.casNext(null, s))        // failed to link in                   continue;               //新节点设置为尾结点 --- 实现入队               advanceTail(t, s);              // swing tail and wait               //通过自旋或者阻塞以后过程,期待匹配               Object x = awaitFulfill(s, e, timed, nanos);               //期待匹配过程中,节点勾销,则clean之后,返回null               if (x == s) {                   // wait was cancelled                   clean(t, s);                   return null;               }                              //实现匹配了,要么就是自旋过程中间接实现了匹配,要么就是阻塞后被匹配胜利的节点唤醒了               //如果s还在队列中               if (!s.isOffList()) {           // not already unlinked                   //则s的上一节点如果是首节点的话,换成s为首节点,原来的首节点t移除队列                   //接下来的操作逻辑基于:s曾经匹配胜利,原来的头节点出队列,s如果是数据节点的话解除s对数据的援用,s节点变为一个dummy类的节点变更为head                   advanceHead(t, s);          // unlink if head                   //匹配胜利后,如果x!=null,阐明以后节点s是"request",匹配到了“data”                   if (x != null)              // and forget fields                       //s的item指向本人,因为x就要返回来,开释掉s对x的援用                       s.item = s;                  //s的期待线程置空                  s.waiter = null;               }               //匹配胜利,返回               return (x != null) ? (E)x : e;           //否则,队列不空并且mode不同,能够匹配           } else {                            // complementary-mode               //筹备进行匹配,留神获取的是首节点的下一个节点,跳过了首节点               QNode m = h.next;               // node to fulfill               //处理过程中队列产生了变动,重来               if (t != tail || m == null || h != head)                   continue;                   // inconsistent read               Object x = m.item;               //如果m曾经被其余节点匹配过,或者m被勾销,或者匹配失败,就把m更换为头节点,持续主循环               //如果m没有被匹配也没有被勾销,则通过casItem的形式进行匹配,匹配实现后,m节点的isData会变更为反向               //即:request节点变更为data,data节点变更为request,               //这个操作是为了在唤醒对应的阻塞过程后使得阻塞过程满足实现匹配的条件,相似于Stack的match赋值               if (isData == (x != null) ||    // m already fulfilled                   x == m ||                   // m cancelled                   !m.casItem(x, e)) {         // lost CAS                   advanceHead(h, m);          // dequeue and retry                   continue;               }               //匹配实现的话,首节点出队列,须要留神,m节点并没有出队列,而是变为了首节点,               //阻塞过程被唤醒后会把m节点解决为相似于dummy节点。首节点在下次匹配的时候会被跳过               advanceHead(h, m);              // successfully fulfilled               //唤醒m节点的阻塞过程               LockSupport.unpark(m.waiter);               //返回               return (x != null) ? (E)x : e;           }       }   }

TransferQueue#awaitFulfill

awaitFulfill的作用是通过自旋、或者阻塞以后线程来期待节点被匹配。

参数变成4个了:
QNode s:期待匹配的节点。
E e:匹配判断规范,和TransferStack的齐全不一样了,TransferStack是通过match属性来判断的,TransferQueue是用了这么个玩意儿,很不容易了解,初始送进来的e就是s的item,稍后解释具体匹配规定。
booean timed:true则示意限时期待。
long nanos:限时期待时长。

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {       //自旋时长、timeout时长等       final long deadline = timed ? System.nanoTime() + nanos : 0L;       Thread w = Thread.currentThread();       int spins = ((head.next == s) ?                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);       for (;;) {           if (w.isInterrupted())               s.tryCancel(e);           Object x = s.item;           //刚开始e==x,用x!=e来判断是否匹配的话,必定是没有实现匹配           //在自旋或线程阻塞期待的过程中,变动的是x,也就是队列中s的item在其余线程实现匹配后会被更换           //具体参考transfer办法中的匹配局部的casItem,就是干这个的,替换实现之后唤醒本过程,x!=e就成立了           if (x != e)               return x;           //超时判断,如果超时则勾销以后节点           if (timed) {               nanos = deadline - System.nanoTime();               if (nanos <= 0L) {                   s.tryCancel(e);                   continue;               }           }           //首先自旋           if (spins > 0)               --spins;           //线程赋值,筹备阻塞           else if (s.waiter == null)               s.waiter = w;           //阻塞线程           else if (!timed)               LockSupport.park(this);           else if (nanos > spinForTimeoutThreshold)               LockSupport.parkNanos(this, nanos);            }        }

代码正文中解释过了,刚开始调用的时候e==x,用x!=e来判断是否匹配的话,必定是没有实现匹配。

在自旋或线程阻塞期待的过程中,变动的是x,也就是队列中s的item在其余线程实现匹配后会被更换,具体参考transfer办法中的匹配局部的casItem,就是干这个的,替换实现之后唤醒本过程,x!=e就成立了,匹配得以实现。

TransferQueue#clean

和TransferStack的clean办法性能一样,就是要清理掉尚未实现匹配就被勾销掉的节点,查看以后节点如果不是尾结点的话就清出队列。

小结

SynchronousQueue的源码就剖析实现了,尽管代码量不大,但却不太容易了解。须要重复浏览、对照写入队列、从队列获取数据两种类型的操作、构想多线程并发状况下的操作场景、在空白纸上画出队列在各场景、各给定工夫点的状态,有助于更好的了解代码逻辑。

Thanks a lot!

上一篇 BlockQueue - 基于TransferStack的SynchronousQueue