共计 7023 个字符,预计需要花费 18 分钟才能阅读完成。
ThreadPoolExecutor 以 BlockingQueue 存储待执行工作,包含 SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue,明天的目标是源码角度深入研究 SynchronousQueue。
之后打算是持续钻研 LinkedBlockingQueue 和 ArrayBlockingQueue,搬开所有绊脚石之后再开始线程池。
基本概念 #BlockingQueue
BlockingQueue 是 SynchronousQueue 的爹,他们的先人是 Queue,所以他们都会听从 Queue 的一些根本逻辑:比方按程序存入数据、按程序(FIFO 或者 LIFO)取出数据,都是从队首(head)获取数据,FIFO 队列新数据从队尾入队、LIFO 队列队列新数据入队首。
对于 BlockingQueue,咱们还是认真看一下他的 javaDoc:
BlockingQueue 是一个非凡的 Queue:当获取数据的时候阻塞期待队列非空,存入数据的时候阻塞期待队列可用(有界队列未满)。
BlockingQueue 的办法(获取数据或者存入数据)不能立刻胜利、然而未来某一时间点可能会胜利的状况下,有四种不同的解决形式:一种是抛出异样,第二种是返回非凡值(空值或者 false),第三种是无限期阻塞以后线程晓得胜利,第四种是阻塞期待设定的工夫。
具体如下表:
Throws exception | Special value | Blocks | Times out | |
Insert | add add(e) | offer offer(e) | put put(e) | offer(Object, long, TimeUnit) offer(e, time, unit) |
Remove | #remove remove() | #poll poll() | #take take() | #poll(long, TimeUnit) poll(time, unit) |
Examine | #element element() | #peek peek() | not applicable | not applicable |
BlockingQueue 不承受空值 null,尝试写入 null 会抛出异样,因为 BlockingQueue 用 null 示意操作失败。
BlockingQueue 能够是有界的也能够是无界的,有界队列保护残余容量属性 remainingCapacity,超出该属性后会阻塞写入操作。
无界队列没有容量限度,始终保护 remainingCapacity 为 Integer.MAX_VALUE
BlockingQueue 是线程平安的,BlockingQueue 的实现类的办法都是原子操作、或者通过其余并发管制形式实现线程安全性。
基本概念 SynchronousQueue
SynchronousQueue 是一个每次写入数据都必须期待其余线程获取数据(反之亦然)的 BlockingQueue。SynchronousQueue 没有容量的概念、一条数据都不能存储。你不能对 SynchronousQueue 队列执行 peek 操作因为只有执行 remove 操作能力获取到数据,只有其余线程要 remove 数据的时候你能力插入数据,你也不能进行迭代因为他基本就没有数据。队列的队首数据就是第一个写入数据的线程尝试写入的数据,如果没有写入线程则队列中就没有数据可获取,poll() 办法会返回 null。对于汇合类的其余办法、比方 contains,SynchronousQueue 的返回等同于空集合。
SynchronousQueue 不容许空(null)元素。
通过结构参数设置 fairness 为 true 提供偏心队列,偏心队列听从 FIFO(先进先出)。
基本概念 Transferer
Transferer 是 SynchronousQueue 的外部类,他的 JavaDoc 也很长:
Transferer 扩大实现了 W. N. Scherer III 和 M. L.Scott 在 ”Nonblocking Concurrent Objects with Condition Synchronization” 中形容的双栈(dual stack)或者双队列(dual queue)算法。
后进先出(Lifo)栈用来实现非偏心模式,先进先出(Fifo)队列用来实现偏心模式。两者的性能是一样的,个别状况下 Fifo 反对大吞吐量、Lifo maintains higher thread locality(道歉,没搞懂什么意思)。
dual queue(或 dual stack)在任一时间要么持有数据(写入操作提供的)、要么持有申请(获取数据的申请),向正好持有数据的队列申请数据、或者向持有申请的队列写入数据被称之为 ”fulfill”。最乏味的个性是对队列的任何操作都能晓得队列过后处于什么状态,因而操作不必要上锁。
queue 和 stack 都扩大自虚构类 Transferer,Transferer 定义了一个办法 transfer,能够同时实现 put 和 take 性能。把 put 和 take 对立在一个 transfer 办法中的起因是 dual 数据结构使得 put 和 take 操作是对称操作,所以两个办法的大部分代码都能够被合并。
好了,无关 JavaDoc 形容的个性就到这里了,SynchronousQueue 的 JavaDoc 很不容易了解,代码也是。
开始剖析源码:
- 构造函数:决定 SynchronousQueue 底层数据结构是用 Queue 还是 Stack
- 因为 SynchronousQueue 是比拟非凡的:不存储数据的(JavaDoc 提到过),所以须要明确的是相干汇合办法的返回也相应比拟非凡,比方 size=0,isEmpty=true 等等,这部分源码就不看了,特地简略
- 队列存、取数据的办法,最终调用的都是 Transferer 的 tranfer 办法,所以咱们次要就看这个办法
SynchronousQueue 构造方法
提供一个有参构造方法接管一个布尔量 fair,咱们后面说过,Queue 是偏心的、Stack 是非偏心的,所以 fair=true 的话创立 TransferQueue,否则创立 TransferStack。
TransferStack#SNode
TransferStack(TransferQueue 也一样)的源码尽管不多,然而必须首先理解分明他的数据结构,否则不太容易读懂。
节点 SNode: 也就是存储到栈内的内容,留神我这里没有说存储在栈内的数据而是说内容,是因为 TransferStack 的特殊性导致说数据容易引起误会:栈内有两种类型的节点,一种是“data”,能够了解为“生产者”放到栈内等得消费者生产的数据,另一种是“request”,能够了解为消费者的生产申请,也就是说申请和数据都会入栈,都属于“节点”。
TransferStack 通过外部类 SNode 定义节点,次要属性:
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
next: 下一节点。
match:以后节点的匹配节点,比方一个申请数据的 Request 节点入栈后,正好有一个 data 节点入栈,他们两个如果匹配胜利的话,match 就是对方节点。
waiter:如果以后节点即便在自旋期待后依然没有被匹配,比方一个申请线程发送获取数据的申请后,该申请会以申请节点(Request 节点)入栈,始终没有数据送进来,则以后节点的 waiters 就记录为以后线程,之后以后线程本人挂起,期待匹配。这个期待匹配的过程是被动的,只能被另外一个 data 线程送进来的 data 节点匹配,匹配之后 data 线程通过 Request 节点的 waiters 获取到其对应的线程后唤醒该线程。
item:data 类的节点,记录送进来的待生产的数据,Request 类的节点,item 为 null。
mode:以后节点的 mode,有三个 mode:data mode 示意以后节点是数据节点(生产者发来的),Request mode 示意以后节点是申请节点(消费者发来的),还有一个比拟非凡的 mode 是:匹配中的数据节点或匹配中的申请节点,这个 mode 前面剖析 tranfer 代码的时候再说。
head:头节点,栈构造嘛,入栈节点始终是头节点,也只有头节点具备失常出栈的权限。
SNode 提供了几个原子性的操作:
- casNext:cas 形式替换以后节点的下一节点
- tryCancel;这个实现比拟非凡:以后节点的 match 如果为 null 的话则将 match 指向本人。用这种形式示意该节点被 calcel
- casHead:cas 的形式批改头节点,其实就是入栈或出栈操作
TransferStack#transfer
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// 如果 e 为 null 的话就是 REQUEST 操作,否则就是 DATA 操作
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
// 取头节点(首节点)h
SNode h = head;
// 空栈,或者首节点 mode 与以后操作的 mode 雷同,阐明以后节点与首节点不可能匹配了
if (h == null || h.mode == mode) { // empty or same-mode
// 工夫到,等不了了
if (timed && nanos <= 0) { // can't wait
// 首节点被 calcel 了
if (h != null && h.isCancelled())
// 首节点出栈
casHead(h, h.next); // pop cancelled node
// 既然等不及了,就返回 null
else
return null;
// 否则,以后节点入栈,如果入栈胜利,s 就是首节点了
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 调用 awaitFulfill 阻塞期待匹配节点
SNode m = awaitFulfill(s, timed, nanos);
// 阻塞期待调用后果如果是 s 的话,阐明 s 被勾销了
if (m == s) { // wait was cancelled
clean(s);
return null;
}
// 否则就是阻塞期待后匹配胜利了,那么判断如果头节点不空并且下一节点是 s 的话
// 阐明除了等来一个匹配节点之外,没有其余节点退出,那么这一对儿匹配节点都出栈
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 胜利匹配,能够返回了
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 否则,存在头节点并且以后节点 mode 不同能够匹配,并且头节点尚未被其余线程匹配
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 如果头节点曾经被勾销,则出栈
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
// 否则以后节点以 FULLFILLING 模式入栈,s 变为首节点
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 始终循环直到胜利匹配或栈内的期待节点忽然隐没
for (;;) { // loop until matched or waiters disappear
// m 为 s 的下一节点
SNode m = s.next; // m is s's match
// m 空,阐明以前的期待节点忽然隐没,比方期待节点超时勾销
if (m == null) { // all waiters are gone
// 清空栈
casHead(s, null); // pop fulfill node
// 清空 s,从新进入主循环
s = null; // use new node next time
break; // restart main loop
}
// 否则,能够开始匹配了,mn 为 m 的下一节点,为出栈做好筹备
SNode mn = m.next;
// 如果 m 和 s 能匹配胜利,则 m 和 s 都出栈,返回后果
if (m.tryMatch(s)) {casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
// 匹配没有胜利,这种状况应该是 m 超时勾销掉了,则 m 出栈
} else // lost match
s.casNext(m, mn); // help unlink
}
}
// 否则,栈首节点处于匹配中 FULLFILLING 的状态(其余线程正在匹配然而尚未实现)// 这种状况下,新来的节点不入栈,先帮助实现栈首节点的匹配
} else { // help a fulfiller
SNode m = h.next; // m is h's match
// 首节点的下一节点为空(被勾销了),则清空栈
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
// 否则,去匹配首节点 h 和他的下一节点 m,如果匹配胜利了则 h 和 m 出栈
// 这种状况下是不须要返回,因为是帮助其余线程实现匹配,本人的匹配工作尚未开始呢...,其余线程如果取得执行权之后,会发现曾经有人帮忙他实现匹配了,所以会很快返回后果
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
TransferStack#awaitFulfill
awaitFulfill 的作用是通过自旋、或者阻塞以后线程来期待节点被匹配。
3 个参数:
SNode s:期待匹配的节点。
booean timed:true 则示意限时期待。
long nanos:限时期待时长。
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 计算期待时长
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 获取以后线程
Thread w = Thread.currentThread();
// 计算自旋时长
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋开始
for (;;) {
// 如果以后线程被中断,则 calcel 掉以后节点:将 s 的 match 指向本人
if (w.isInterrupted())
s.tryCancel();
// 自旋过程中实现匹配,则间接返回匹配节点
SNode m = s.match;
if (m != null)
return m;
// 如果是显示匹配并且匹配超时,则 cancel 掉 s 节点
if (timed) {nanos = deadline - System.nanoTime();
if (nanos <= 0L) {s.tryCancel();
continue;
}
}
// 自旋时长未到则持续自旋
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 实现自旋后记录以后线程
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
// 阻塞以后线程
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
TransferStack#clean
clean 办法在 transfer 办法中调用,如果以后节点在期待匹配的过程中曾经被 cancel 掉的话。
代码不贴出了,根本逻辑就是首先清掉 s 及 s 关联的线程(s=null,s.waiter=null), 而后追查并清理掉被 cancel 掉的 head 节点(从 head 到 s 之后的第一个未被 cancel 掉的节点一一查看),直到确保栈的 head 节点失常(未被 calcel)。
而后从 head 开始、到 s 之后的第一个未被 cancel 掉的节点一一查看,如果有节点被标记为 cancel 则该节点出栈。
执行实现之后,不止是 s 节点被清理,栈内从 head 节点开始直到 s 节点的下一个未被 cancel 掉的节点之间的节点,如果被 cancel 掉的话,全副会被清理出栈。
小结
基于 TransferStack 的 SynchronousQueue 的源码就剖析实现了,感觉不对照代码逐行阐明的话,就很不容易说分明 TransferStack 的 transfer、awaitFulfill 办法的代码逻辑,所以就采纳在源码中逐行正文的形式来阐明了。
篇幅起因,TransferQueue 下次再说!
Thanks a lot!
上一篇 Runable 和 Callable 的区别?你必须要搞清楚 Thread 以及 FutureTask!