共计 11259 个字符,预计需要花费 29 分钟才能阅读完成。
本文基于 JDK-8u261 源码剖析
1 简介
因为 CLH 队列中的线程,什么线程获取到锁,什么线程进入队列排队,什么线程开释锁,这些都是不受咱们管制的。所以条件队列的呈现为咱们提供了主动式地、只有满足指定的条件后能力线程阻塞和唤醒的形式。对于条件队列首先须要阐明一些概念:条件队列是 AQS 中除了 CLH 队列之外的另一种队列,每创立一个 Condition 实际上就是创立了一个条件队列,而每调用一次 await 办法实际上就是往条件队列中入队,每调用一次 signal 办法实际上就是往条件队列中出队。不像 CLH 队列上节点的状态有多个,条件队列上节点的状态只有一个:CONDITION。所以如果条件队列上一个节点不再是 CONDITION 状态时,就意味着这个节点该出队了。须要留神的是,条件队列只能运行在独占模式下。
个别在应用条件队列作为阻塞队列来应用时都会创立两个条件队列:notFull和notEmpty。notFull 示意当条件队列已满的时候,put 办法会处于期待状态,直到队列没满;notEmpty 示意当条件队列为空的时候,take 办法会处于期待状态,直到队列有数据了。
而 notFull.signal 办法和 notEmpty.signal 办法会将条件队列上的节点移到 CLH 队列中(每次只转移一个)。也就是说,存在一个节点从条件队列被转移到 CLH 队列的状况产生 。同时也意味着, 条件队列上不会产生锁资源竞争,所有的锁竞争都是产生在 CLH 队列上的。
其余一些条件队列和 CLH 队列之间的差别如下:
- 条件队列应用 nextWaiter 指针来指向下一个节点,是一个单向链表构造,不同于 CLH 队列的双向链表构造;
- 条件队列应用 firstWaiter 和 lastWaiter 来指向头尾指针,不同于 CLH 队列的 head 和 tail;
- 条件队列中的第一个节点也不会像 CLH 队列一样,是一个非凡的空节点;
- 不同于 CLH 队列中会用很多的 CAS 操作来管制并发,条件队列进队列的前提是曾经获取到了独占锁资源,所以很多中央不须要思考并发。
上面就是具体的源码剖析了。条件队列以 ArrayBlockingQueue 来举例:
2 结构器
1 /**
2 * ArrayBlockingQueue:
3 */
4 public ArrayBlockingQueue(int capacity) {5 this(capacity, false);
6}
7
8 public ArrayBlockingQueue(int capacity, boolean fair) {9 if (capacity <= 0)
10 throw new IllegalArgumentException();
11 // 寄存理论数据的数组
12 this.items = new Object[capacity];
13 // 独占锁应用 ReentrantLock 来实现(fair 示意的就是偏心锁还是非偏心锁,默认为非偏心锁)14 lock = new ReentrantLock(fair);
15 //notEmpty 条件队列
16 notEmpty = lock.newCondition();
17 //notFull 条件队列
18 notFull = lock.newCondition();
19 }
3 put 办法
1 /**
2 * ArrayBlockingQueue:
3 */
4 public void put(E e) throws InterruptedException {
5 // 非空校验
6 checkNotNull(e);
7 final ReentrantLock lock = this.lock;
8 /*
9 获取独占锁资源,响应中断模式。其实现代码和 lock 办法还有 Semaphore 的 acquire 办法是相似的
10 因为这里剖析的是条件队列,于是就不再剖析该办法的细节了
11 */
12 lock.lockInterruptibly();
13 try {14 while (count == items.length)
15 // 如果数组中数据曾经满了的话,就在 notFull 中入队一个新节点,并阻塞以后线程
16 notFull.await();
17 // 增加数组元素并唤醒 notEmpty
18 enqueue(e);
19 } finally {
20 // 开释锁资源
21 lock.unlock();
22 }
23 }
4 await 办法
如果在 put 的时候发现数组已满,或者在 take 的时候发现数组是空的,就会调用 await 办法来将以后节点放入条件队列中:
1 /**
2 * AbstractQueuedSynchronizer:
3 */
4 public final void await() throws InterruptedException {
5 // 如果以后线程被中断就抛出异样
6 if (Thread.interrupted())
7 throw new InterruptedException();
8 // 把以后节点退出到条件队列中
9 Node node = addConditionWaiter();
10 // 开释之前获取到的锁资源,因为后续会阻塞该线程,所以如果不开释的话,其余线程将会期待该线程被唤醒
11 int savedState = fullyRelease(node);
12 int interruptMode = 0;
13 // 如果以后节点不在 CLH 队列中则阻塞住,期待 unpark 唤醒
14 while (!isOnSyncQueue(node)) {15 LockSupport.park(this);
16 /*
17 这里被唤醒可能是失常的 signal 操作也可能是被中断了。但无论是哪种状况,都会将以后节点插入到 CLH 队列尾,18 并退出循环(留神,这里被唤醒除了下面两种状况之外,还有一种状况是操作系统级别的虚伪唤醒(spurious wakeup),19 也就是以后线程毫无理由就会被唤醒了,所以下面须要应用 while 来躲避掉这种状况)20 */
21 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
22 break;
23 }
24 // 走到这里阐明以后节点曾经插入到了 CLH 队列中(被 signal 所唤醒或者被中断)。而后在 CLH 队列中进行获取锁资源的操作
25 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
26 /*
27 <<<THROW_IE 和 REINTERRUPT 的解释详见 transferAfterCancelledWait 办法 >>>
28
29 之前剖析过的如果 acquireQueued 办法返回 true,阐明以后线程被中断了
30 返回 true 意味着在 acquireQueued 办法中此时会再一次被中断(留神,这意味着有两个代码点判断线程是否被中断:31 一个是在第 15 行代码处,另一个是在 acquireQueued 办法外面),如果之前没有被中断,则 interruptMode=0,32 而在 acquireQueued 办法外面线程被中断返回了,这个时候将 interruptMode 从新修改为 REINTERRUPT 即可
33 至于为什么不修改为 THROW_IE 是因为在这种状况下,第 15 行代码处曾经通过调用 signal 办法失常唤醒了,34 节点曾经放进了 CLH 队列中。而此时的中断是在 signal 操作之后,在第 25 行代码处去抢锁资源的时候产生的
35 这个时候中断不中断曾经无所谓了,所以就不须要抛出 InterruptedException
36 */
37 interruptMode = REINTERRUPT;
38 /*
39 走到这里阐明以后节点曾经获取到了锁资源(获取不到的话就会被再次阻塞在 acquireQueued 办法里)40 如果 interruptMode=REINTERRUPT 的话,阐明之前曾经调用过 signal 办法了,也就是说该节点曾经从条件队列中剔除掉了,41 nextWaiter 指针必定为空,所以在这种状况下是不须要执行 unlinkCancelledWaiters 办法的
42 而如果 interruptMode=THROW_IE 的话,阐明之前还没有调用过 signal 办法来从条件队列中剔除该节点。这个时候就须要调用
43 unlinkCancelledWaiters 办法来剔除这个节点了(在之前的 transferAfterCancelledWait 办法中
44 曾经把该节点的状态改为了初始状态 0),顺便把所有其余不是 CONDITION 状态的节点也一并剔除掉。留神:如果以后节点是条件队列中的
45 最初一个节点的话,并不会被清理。不妨,等到下次增加节点或调用 signal 办法的时候就会被清理了
46 */
47 if (node.nextWaiter != null)
48 unlinkCancelledWaiters();
49 // 依据不同模式解决中断(失常模式不须要解决)50 if (interruptMode != 0)
51 reportInterruptAfterWait(interruptMode);
52 }
5 addConditionWaiter 办法
在条件队列中增加一个节点的逻辑:
1 /**
2 * AbstractQueuedSynchronizer:
3 */
4 private Node addConditionWaiter() {
5 Node t = lastWaiter;
6 /*
7 如果最初一个节点不是 CONDITION 状态,就删除条件队列中所有不是 CONDITION 状态的节点
8 至于为什么只须要判断最初一个节点的状态就能晓得整个队列中是否有不是 CONDITION 的节点,前面会阐明
9 */
10 if (t != null && t.waitStatus != Node.CONDITION) {
11 // 删除所有不是 CONDITION 状态的节点
12 unlinkCancelledWaiters();
13 t = lastWaiter;
14 }
15 // 创立一个类型为 CONDITION 的新节点
16 Node node = new Node(Thread.currentThread(), Node.CONDITION);
17 if (t == null)
18 // t 为 null 意味着此时条件队列中为空,间接将头指针指向这个新节点即可
19 firstWaiter = node;
20 else
21 // t 不为 null 的话就阐明此时条件队列中有节点,间接在尾处退出这个新节点
22 t.nextWaiter = node;
23 // 尾指针指向这个新节点,增加节点结束
24 lastWaiter = node;
25 /*
26 留神,这里不必像 CLH 队列中的 enq 办法一样,如果插入失败就会自旋直到插入胜利为止
27 因为此时还没有开释独占锁
28 */
29 return node;
30 }
31
32 /**
33 * 第 12 行代码处:34 * 删除条件队列当中所有不是 CONDITION 状态的节点
35 */
36 private void unlinkCancelledWaiters() {
37 Node t = firstWaiter;
38 /*
39 在上面的每次循环中,trail 指向的是从头到循环的节点为止,最初一个是 CONDITION 状态的节点
40 这样做是因为要剔除队列两头不是 CONDITION 的节点,就须要保留上一个是 CONDITION 节点的指针,41 而后间接 trail.nextWaiter = next 就能够断开了
42 */
43 Node trail = null;
44 while (t != null) {
45 Node next = t.nextWaiter;
46 if (t.waitStatus != Node.CONDITION) {
47 t.nextWaiter = null;
48 if (trail == null)
49 firstWaiter = next;
50 else
51 trail.nextWaiter = next;
52 if (next == null)
53 lastWaiter = trail;
54 } else
55 trail = t;
56 t = next;
57 }
58 }
6 fullyRelease 办法
开释锁资源,包含可重入锁的所有锁资源:
1 /**
2 * AbstractQueuedSynchronizer:
3 */
4 final int fullyRelease(Node node) {
5 boolean failed = true;
6 try {7 int savedState = getState();
8 /*
9 开释锁资源。留神这里是开释所有的锁,包含可重入锁有屡次加锁的话,会一次性全副开释。因为在上一行
10 代码 savedState 存的是所有的锁资源,而这里就是开释这些所有的资源,这也就是办法名中“fully”的含意
11 */
12 if (release(savedState)) {
13 failed = false;
14 return savedState;
15 } else {
16 /*
17 开释失败就抛异样,也就是说没有开释洁净,可能是在并发的情景下 state 被批改了的起因,18 也可能是其余起因。留神如果在这里抛出异样了那么会走第 166 行代码
19 */
20 throw new IllegalMonitorStateException();
21 }
22 } finally {
23 /*
24 如果开释锁失败,就把节点置为 CANCELLED 状态。比拟精妙的一点是,在之前 addConditionWaiter 办法中的第 10 行代码处,25 判断条件队列中是否有不是 CONDITION 的节点时,只须要判断最初一个节点的状态是否是 CONDITION 就行了
26 按常理来说,是须要遍历整个队列能力晓得的。然而条件队列每次增加新节点都是插在尾处,而如果开释锁失败,27 会将这个新增加的、在队列尾巴的新节点置为 CANCELLED 状态。而之前的 CONDITION 节点必然都是在队头
28 因为如果此时再有新的节点入队的话,会首先在 addConditionWaiter 办法中的第 12 行代码处将所有不是 CONDITION 的节点都剔除了
29 也就是说无论什么状况下,如果队列中有不是 CONDITION 的节点,那它肯定在队尾,所以只须要判断它就能够了
30 */
31 if (failed)
32 node.waitStatus = Node.CANCELLED;
33 }
34 }
7 isOnSyncQueue 办法
判断节点是否在 CLH 队列中,以此来判断唤醒时 signal 办法是否实现。当然,在 transferAfterCancelledWait 办法中也会调用到本办法:
1 /**
2 * AbstractQueuedSynchronizer:
3 * 判断节点是否在 CLH 队列中
4 */
5 final boolean isOnSyncQueue(Node node) {
6 /*
7 如果以后节点的状态是 CONDITION 或者节点没有 prev 指针(prev 指针只在 CLH 队列中的节点有,8 尾插法保障 prev 指针肯定有)的话,就返回 false
9 */
10 if (node.waitStatus == Node.CONDITION || node.prev == null)
11 return false;
12 // 如果以后节点有 next 指针(next 指针只在 CLH 队列中的节点有,条件队列中的节点是 nextWaiter)的话,就返回 true
13 if (node.next != null)
14 return true;
15 // 如果下面无奈疾速判断的话,就只能从 CLH 队列中进行遍历,一个一个地去进行判断了
16 return findNodeFromTail(node);
17 }
18
19 /**
20 * 遍历判断以后节点是否在 CLH 队列其中
21 */
22 private boolean findNodeFromTail(Node node) {
23 Node t = tail;
24 for (; ;) {25 if (t == node)
26 return true;
27 if (t == null)
28 return false;
29 t = t.prev;
30 }
31 }
8 checkInterruptWhileWaiting 办法
判断唤醒时属于的状态(0 / THROW_IE / REINTERRUPT):
1 /**
2 * AbstractQueuedSynchronizer:
3 * 如果以后线程没有被中断过,则返回 0
4 * 如果以后线程被中断时没有被 signal 过,则返回 THROW_IE
5 * 如果以后线程被中断时曾经 signal 过了,则返回 REINTERRUPT
6 */
7 private int checkInterruptWhileWaiting(Node node) {8 return Thread.interrupted() ?
9 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
10 0;
11 }
12
13 /**
14 * 本办法是用来判断以后线程被中断时有没有产生过 signal,以此来辨别出 THROW_IE 和 REINTERRUPT。判断的根据是:15 * 如果产生过 signal,则以后节点的状态曾经不是 CONDITION 了,并且在 CLH 队列中也能找到该节点。详见 transferForSignal 办法
16 * <p>
17 * THROW_IE:示意在线程中断产生时还没有调用过 signal 办法,这个时候咱们将这个节点放进 CLH 队列中去抢资源,18 * 直到抢到锁资源后,再把这个节点从 CLH 队列和条件队列中都删除掉,最初再抛出 InterruptedException
19 * <p>
20 * REINTERRUPT:示意在线程中断产生时曾经调用过 signal 办法了,这个时候发不产生中断实际上曾经没有意义了,21 * 因为该节点此时曾经被放进到了 CLH 队列中。而且在 signal 办法中曾经将这个节点从条件队列中剔除掉了
22 * 此时咱们将这个节点放进 CLH 队列中去抢资源,直到抢到锁资源后(抢到资源的同时就会将这个节点从 CLH 队列中删除),23 * 再次中断以后线程即可,并不会抛出 InterruptedException
24 */
25 final boolean transferAfterCancelledWait(Node node) {
26 // 判断一下以后的节点状态是否是 CONDITION
27 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
28 /*
29 如果 CAS 胜利了就示意以后节点是 CONDITION 状态,此时就意味着 interruptMode 为 THROW_IE
30 而后会进行 CLH 队列入队,随后进行抢锁资源的操作
31 */
32 enq(node);
33 return true;
34 }
35 /*
36 如果 CAS 失败了的话就意味着以后节点曾经不是 CONDITION 状态了,阐明此时曾经调用过 signal 办法了,37 然而因为之前曾经开释锁资源了,signal 办法中的 transferForSignal 办法将节点状态改为 CONDITION
38 和将节点入 CLH 队列的这两个操作不是原子操作,所以可能存在并发的问题。也就是说可能会存在将节点状态改为 CONDITION 后,39 然而还没入 CLH 队列这个工夫点。上面的代码思考的就是这种场景。这个时候只须要一直让渡以后线程资源,40 期待 signal 办法将节点增加 CLH 队列结束后即可
41 */
42 while (!isOnSyncQueue(node))
43 Thread.yield();
44 return false;
45 }
9 reportInterruptAfterWait 办法
中断唤醒最初的解决:
1 /**
2 * AbstractQueuedSynchronizer:
3 */
4 private void reportInterruptAfterWait(int interruptMode)
5 throws InterruptedException {6 if (interruptMode == THROW_IE)
7 // 如果是 THROW_IE 最终就会抛出 InterruptedException 异样
8 throw new InterruptedException();
9 else if (interruptMode == REINTERRUPT)
10 // 如果是 REINTERRUPT 就仅仅是“中断”以后线程而已(只是设置中断标记位为 true)11 selfInterrupt();
12 }
10 enqueue 办法
ArrayBlockingQueue 的入队逻辑:
1 /**
2 * ArrayBlockingQueue:
3 */
4 private void enqueue(E x) {5 final Object[] items = this.items;
6 // 插入数据
7 items[putIndex] = x;
8 //putIndex 记录的是下次插入的地位。如果 putIndex 曾经是最初一个了,从新复位为 0,意味着数据可能会被笼罩
9 if (++putIndex == items.length)
10 putIndex = 0;
11 // 以后数组中的数量 +1
12 count++;
13 /*
14 如果 notEmpty 条件队列不为空的话,唤醒 notEmpty 条件队列中的第一个节点去 CLH 队列当中去排队抢资源
15 如果 notEmpty 里没有节点的话,阐明此时数组没空。signal 办法将不会有任何作用,因为此时没有阻塞住的 take 线程
16 */
17 notEmpty.signal();
18 }
11 signal 办法
查看是否须要唤醒条件队列中的节点,须要就进行唤醒(将节点从条件队列中转移到 CLH 队列中):
1 /**
2 * AbstractQueuedSynchronizer:
3 */
4 public final void signal() {
5 // 如果以后线程不是加锁时候的线程,就抛出异样
6 if (!isHeldExclusively())
7 throw new IllegalMonitorStateException();
8 Node first = firstWaiter;
9 if (first != null)
10 // 如果 notEmpty 条件队列中有节点的话,就告诉去 CLH 队列中排队抢资源
11 doSignal(first);
12 }
13
14 private void doSignal(Node first) {
15 do {16 if ((firstWaiter = first.nextWaiter) == null)
17 // 等于 null 意味着循环到此时条件队列曾经空了,那么把 lastWaiter 也置为 null
18 lastWaiter = null;
19 // 断开 notEmpty 条件队列中以后节点的 nextWaiter 指针,也就相当于剔除以后节点,期待 GC
20 first.nextWaiter = null;
21 } while (!transferForSignal(first) &&
22 // 如果以后节点曾经不是 CONDITION 状态的话(就阐明以后节点曾经生效了),就抉择下一个节点尝试放进 CLH 队列中
23 (first = firstWaiter) != null);
24 }
25
26 /**
27 * 将 notEmpty 条件队列中的节点从条件队列挪动到 CLH 队列当中
28 * 第 21 行代码处:29 */
30 final boolean transferForSignal(Node node) {
31 /*
32 如果 notEmpty 条件队列中的节点曾经不是 CONDITION 状态的时候,就间接返回 false,33 跳过该节点,相当于把该节点剔除出条件队列
34 */
35 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
36 return false;
37
38 // 走到这里阐明该节点的状态曾经被批改成了初始状态 0。把其退出到 CLH 队列尾部,并返回前一个节点
39 Node p = enq(node);
40 int ws = p.waitStatus;
41 /*
42 再来温习一下,SIGNAL 状态示意以后节点是阻塞状态的话,上一个节点就是 SIGNAL。notEmpty 条件队列中的
43 节点此时还是处于阻塞状态,所以此时将这个节点挪动到 CLH 队列后就须要将前一个节点的状态改为 SIGNAL
44 如果 CAS 批改失败了的话,就将这个节点所在的线程唤醒去竞争锁资源,终局必定是没抢到(因为锁资源是
45 以后线程所持有着),所以会在 acquireQueued 办法中持续被阻塞住的,而且在这其中会再次修改前一个节点
46 的 SIGNAL 状态(必然是要批改胜利的,如果批改不胜利,就会始终在 acquireQueued 办法中循环去 CAS 批改)47 当然如果前一个节点是 CANCELLED 状态的话,也去唤醒这个节点。这样 acquireQueued 办法中有机会去剔除掉
48 这些 CANCELLED 节点,相当于做了次清理工作
49 须要提一下的是,该处是唤醒被阻塞住的 take 线程(之前数组始终是空的,当初增加了一个节点
50 后数组就不为空了,所以须要唤醒之前被阻塞住的一个拿取线程。假如这个被唤醒的线程是线程 2,执行唤醒动作
51 的是线程 1)。如后面所说,线程 2 会进入到 acquireQueued 办法中再次被阻塞住。直到线程 1 走到 put 办法中的
52 最初一步 unlock 解锁的时候会被再次唤醒(也不肯定就是这次会被唤醒,也有可能唤醒的是其余的线程(如果说
53 是线程 3)。但只有线程 3 最初执行 unlock 办法的时候,就会持续去唤醒,相当于把这个唤醒的动作给传递上来了
54 那么线程 2 最终就会有机会被唤醒(等到它变成 CLH 队列中的第一个节点的时候))55 */
56 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
57 LockSupport.unpark(node.thread);
58 return true;
59 }
12 take 办法
ArrayBlockingQueue 的 take 办法:
1 /**
2 * ArrayBlockingQueue:
3 */
4 public E take() throws InterruptedException {
5 final ReentrantLock lock = this.lock;
6 // 响应中断模式下的加锁
7 lock.lockInterruptibly();
8 try {9 while (count == 0)
10 // 如果数组为空的话,就在 notEmpty 中入队一个新节点,并阻塞以后线程
11 notEmpty.await();
12 // 删除数组元素并唤醒 notFull
13 return dequeue();
14 } finally {
15 // 解锁
16 lock.unlock();
17 }
18 }
19
20 /**
21 * 第 13 行代码处:22 */
23 private E dequeue() {24 final Object[] items = this.items;
25 // 记录旧值并最终返回进来
26 @SuppressWarnings("unchecked")
27 E x = (E) items[takeIndex];
28 // 将数组元素清空
29 items[takeIndex] = null;
30 //takeIndex 记录的是下次拿取的地位。如果 takeIndex 曾经是最初一个了,从新复位为 0
31 if (++takeIndex == items.length)
32 takeIndex = 0;
33 // 以后数组中的数量 -1
34 count--;
35 //elementDequeued 办法在数组中移除数据时会被调用,以保障 Itrs 迭代器和队列数据的一致性
36 if (itrs != null)
37 itrs.elementDequeued();
38 /*
39 如果 notFull 条件队列不为空的话,唤醒 notFull 条件队列中的第一个节点去 CLH 队列当中去排队抢资源
40 如果 notFull 里没有节点的话,阐明此时数组没满。signal 办法将不会有任何作用,因为此时没有阻塞住的 put 线程
41 */
42 notFull.signal();
43 return x;
44 }
更多内容请关注微信公众号:奇客工夫