ThreadPoolExecutor以BlockingQueue存储待执行工作,包含SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue,明天的目标是源码角度深入研究SynchronousQueue。
之后打算是持续钻研LinkedBlockingQueue和ArrayBlockingQueue,搬开所有绊脚石之后再开始线程池。
基本概念#BlockingQueue
BlockingQueue是SynchronousQueue的爹,他们的先人是Queue,所以他们都会听从Queue的一些根本逻辑:比方按程序存入数据、按程序(FIFO或者LIFO)取出数据,都是从队首(head)获取数据,FIFO队列新数据从队尾入队、LIFO队列队列新数据入队首。
对于BlockingQueue,咱们还是认真看一下他的javaDoc:
BlockingQueue是一个非凡的Queue:当获取数据的时候阻塞期待队列非空,存入数据的时候阻塞期待队列可用(有界队列未满)。
BlockingQueue的办法(获取数据或者存入数据)不能立刻胜利、然而未来某一时间点可能会胜利的状况下,有四种不同的解决形式:一种是抛出异样,第二种是返回非凡值(空值或者false),第三种是无限期阻塞以后线程晓得胜利,第四种是阻塞期待设定的工夫。
具体如下表:
Throws exception | Special value | Blocks | Times out | |
Insert | add add(e) | offer offer(e) | put put(e) | offer(Object, long, TimeUnit) offer(e, time, unit) |
Remove | #remove remove() | #poll poll() | #take take() | #poll(long, TimeUnit) poll(time, unit) |
Examine | #element element() | #peek peek() | not applicable | not applicable |
BlockingQueue不承受空值null,尝试写入null会抛出异样,因为BlockingQueue用null示意操作失败。
BlockingQueue能够是有界的也能够是无界的,有界队列保护残余容量属性remainingCapacity,超出该属性后会阻塞写入操作。
无界队列没有容量限度,始终保护remainingCapacity为Integer.MAX_VALUE
BlockingQueue是线程平安的,BlockingQueue的实现类的办法都是原子操作、或者通过其余并发管制形式实现线程安全性。
基本概念SynchronousQueue
SynchronousQueue是一个每次写入数据都必须期待其余线程获取数据(反之亦然)的BlockingQueue。SynchronousQueue没有容量的概念、一条数据都不能存储。你不能对SynchronousQueue队列执行peek操作因为只有执行remove操作能力获取到数据,只有其余线程要remove数据的时候你能力插入数据,你也不能进行迭代因为他基本就没有数据。队列的队首数据就是第一个写入数据的线程尝试写入的数据,如果没有写入线程则队列中就没有数据可获取,poll()办法会返回null。对于汇合类的其余办法、比方contains,SynchronousQueue的返回等同于空集合。
SynchronousQueue不容许空(null)元素。
通过结构参数设置fairness为true提供偏心队列,偏心队列听从FIFO(先进先出)。
基本概念 Transferer
Transferer是SynchronousQueue的外部类,他的JavaDoc也很长:
Transferer扩大实现了W. N. Scherer III和M. L.Scott在"Nonblocking Concurrent Objects with Condition Synchronization"中形容的双栈(dual stack)或者双队列(dual queue)算法。
后进先出(Lifo)栈用来实现非偏心模式,先进先出(Fifo)队列用来实现偏心模式。两者的性能是一样的,个别状况下Fifo反对大吞吐量、Lifo maintains higher thread locality(道歉,没搞懂什么意思)。
dual queue(或dual stack)在任一时间要么持有数据(写入操作提供的)、要么持有申请(获取数据的申请),向正好持有数据的队列申请数据、或者向持有申请的队列写入数据被称之为"fulfill"。最乏味的个性是对队列的任何操作都能晓得队列过后处于什么状态,因而操作不必要上锁。
queue和stack都扩大自虚构类Transferer,Transferer定义了一个办法transfer,能够同时实现put和take性能。把put和take对立在一个transfer办法中的起因是dual数据结构使得put和take操作是对称操作,所以两个办法的大部分代码都能够被合并。
好了,无关JavaDoc形容的个性就到这里了,SynchronousQueue的JavaDoc很不容易了解,代码也是。
开始剖析源码:
- 构造函数:决定SynchronousQueue底层数据结构是用Queue还是Stack
- 因为SynchronousQueue是比拟非凡的:不存储数据的(JavaDoc提到过),所以须要明确的是相干汇合办法的返回也相应比拟非凡,比方size=0,isEmpty=true等等,这部分源码就不看了,特地简略
- 队列存、取数据的办法,最终调用的都是Transferer的tranfer办法,所以咱们次要就看这个办法
SynchronousQueue构造方法
提供一个有参构造方法接管一个布尔量fair,咱们后面说过,Queue是偏心的、Stack是非偏心的,所以fair=true的话创立TransferQueue,否则创立TransferStack。
TransferStack#SNode
TransferStack(TransferQueue也一样)的源码尽管不多,然而必须首先理解分明他的数据结构,否则不太容易读懂。
节点SNode:也就是存储到栈内的内容,留神我这里没有说存储在栈内的数据而是说内容,是因为TransferStack的特殊性导致说数据容易引起误会:栈内有两种类型的节点,一种是“data”,能够了解为“生产者”放到栈内等得消费者生产的数据,另一种是“request”,能够了解为消费者的生产申请,也就是说申请和数据都会入栈,都属于“节点”。
TransferStack通过外部类SNode定义节点,次要属性:
static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode;
next:下一节点。
match:以后节点的匹配节点,比方一个申请数据的Request节点入栈后,正好有一个data节点入栈,他们两个如果匹配胜利的话,match就是对方节点。
waiter:如果以后节点即便在自旋期待后依然没有被匹配,比方一个申请线程发送获取数据的申请后,该申请会以申请节点(Request节点)入栈,始终没有数据送进来,则以后节点的waiters就记录为以后线程,之后以后线程本人挂起,期待匹配。这个期待匹配的过程是被动的,只能被另外一个data线程送进来的data节点匹配,匹配之后data线程通过Request节点的waiters获取到其对应的线程后唤醒该线程。
item:data类的节点,记录送进来的待生产的数据,Request类的节点,item为null。
mode:以后节点的mode,有三个mode:data mode示意以后节点是数据节点(生产者发来的),Request mode示意以后节点是申请节点(消费者发来的),还有一个比拟非凡的mode是:匹配中的数据节点或匹配中的申请节点,这个mode前面剖析tranfer代码的时候再说。
head:头节点,栈构造嘛,入栈节点始终是头节点,也只有头节点具备失常出栈的权限。
SNode提供了几个原子性的操作:
- casNext:cas形式替换以后节点的下一节点
- tryCancel;这个实现比拟非凡:以后节点的match如果为null的话则将match指向本人。用这种形式示意该节点被calcel
- casHead:cas的形式批改头节点,其实就是入栈或出栈操作
TransferStack#transfer
E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed //如果e为null的话就是REQUEST操作,否则就是DATA操作 int mode = (e == null) ? REQUEST : DATA; for (;;) { //取头节点(首节点)h SNode h = head; //空栈,或者首节点mode与以后操作的mode雷同,阐明以后节点与首节点不可能匹配了 if (h == null || h.mode == mode) { // empty or same-mode //工夫到,等不了了 if (timed && nanos <= 0) { // can't wait //首节点被calcel了 if (h != null && h.isCancelled()) //首节点出栈 casHead(h, h.next); // pop cancelled node //既然等不及了,就返回null else return null; //否则,以后节点入栈,如果入栈胜利,s就是首节点了 } else if (casHead(h, s = snode(s, e, h, mode))) { //调用awaitFulfill阻塞期待匹配节点 SNode m = awaitFulfill(s, timed, nanos); //阻塞期待调用后果如果是s的话,阐明s被勾销了 if (m == s) { // wait was cancelled clean(s); return null; } //否则就是阻塞期待后匹配胜利了,那么判断如果头节点不空并且下一节点是s的话 //阐明除了等来一个匹配节点之外,没有其余节点退出,那么这一对儿匹配节点都出栈 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller //胜利匹配,能够返回了 return (E) ((mode == REQUEST) ? m.item : s.item); } //否则,存在头节点并且以后节点mode不同能够匹配,并且头节点尚未被其余线程匹配 } else if (!isFulfilling(h.mode)) { // try to fulfill //如果头节点曾经被勾销,则出栈 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry //否则以后节点以FULLFILLING模式入栈,s变为首节点 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //始终循环直到胜利匹配或栈内的期待节点忽然隐没 for (;;) { // loop until matched or waiters disappear //m为s的下一节点 SNode m = s.next; // m is s's match //m空,阐明以前的期待节点忽然隐没,比方期待节点超时勾销 if (m == null) { // all waiters are gone //清空栈 casHead(s, null); // pop fulfill node //清空s,从新进入主循环 s = null; // use new node next time break; // restart main loop } //否则,能够开始匹配了,mn为m的下一节点,为出栈做好筹备 SNode mn = m.next; //如果m和s能匹配胜利,则m和s都出栈,返回后果 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); //匹配没有胜利,这种状况应该是m超时勾销掉了,则m出栈 } else // lost match s.casNext(m, mn); // help unlink } } //否则,栈首节点处于匹配中FULLFILLING的状态(其余线程正在匹配然而尚未实现) //这种状况下,新来的节点不入栈,先帮助实现栈首节点的匹配 } else { // help a fulfiller SNode m = h.next; // m is h's match //首节点的下一节点为空(被勾销了),则清空栈 if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { //否则,去匹配首节点h和他的下一节点m,如果匹配胜利了则h和m出栈 //这种状况下是不须要返回,因为是帮助其余线程实现匹配,本人的匹配工作尚未开始呢...,其余线程如果取得执行权之后,会发现曾经有人帮忙他实现匹配了,所以会很快返回后果 SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
TransferStack#awaitFulfill
awaitFulfill的作用是通过自旋、或者阻塞以后线程来期待节点被匹配。
3个参数:
SNode s:期待匹配的节点。
booean timed:true则示意限时期待。
long nanos:限时期待时长。
SNode awaitFulfill(SNode s, boolean timed, long nanos) { //计算期待时长 final long deadline = timed ? System.nanoTime() + nanos : 0L; //获取以后线程 Thread w = Thread.currentThread(); //计算自旋时长 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); //自旋开始 for (;;) { //如果以后线程被中断,则calcel掉以后节点:将s的match指向本人 if (w.isInterrupted()) s.tryCancel(); //自旋过程中实现匹配,则间接返回匹配节点 SNode m = s.match; if (m != null) return m; //如果是显示匹配并且匹配超时,则cancel掉s节点 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } //自旋时长未到则持续自旋 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; //实现自旋后记录以后线程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter //阻塞以后线程 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
TransferStack#clean
clean办法在transfer办法中调用,如果以后节点在期待匹配的过程中曾经被cancel掉的话。
代码不贴出了,根本逻辑就是首先清掉s及s关联的线程(s=null,s.waiter=null),而后追查并清理掉被cancel掉的head节点(从head到s之后的第一个未被cancel掉的节点一一查看),直到确保栈的head节点失常(未被calcel)。
而后从head开始、到s之后的第一个未被cancel掉的节点一一查看,如果有节点被标记为cancel则该节点出栈。
执行实现之后,不止是s节点被清理,栈内从head节点开始直到s节点的下一个未被cancel掉的节点之间的节点,如果被cancel掉的话,全副会被清理出栈。
小结
基于TransferStack的SynchronousQueue的源码就剖析实现了,感觉不对照代码逐行阐明的话,就很不容易说分明TransferStack的transfer、awaitFulfill办法的代码逻辑,所以就采纳在源码中逐行正文的形式来阐明了。
篇幅起因,TransferQueue下次再说!
Thanks a lot!
上一篇 Runable和Callable的区别?你必须要搞清楚Thread以及FutureTask!