关于spring:AQS源码深入分析之条件队列你知道Java中的阻塞队列是如何实现的吗

本文基于JDK-8u261源码剖析


1 简介

因为CLH队列中的线程,什么线程获取到锁,什么线程进入队列排队,什么线程开释锁,这些都是不受咱们管制的。所以条件队列的呈现为咱们提供了主动式地、只有满足指定的条件后能力线程阻塞和唤醒的形式。对于条件队列首先须要阐明一些概念:条件队列是AQS中除了CLH队列之外的另一种队列,每创立一个Condition实际上就是创立了一个条件队列,而每调用一次await办法实际上就是往条件队列中入队,每调用一次signal办法实际上就是往条件队列中出队。不像CLH队列上节点的状态有多个,条件队列上节点的状态只有一个:CONDITION。所以如果条件队列上一个节点不再是CONDITION状态时,就意味着这个节点该出队了。须要留神的是,条件队列只能运行在独占模式下

个别在应用条件队列作为阻塞队列来应用时都会创立两个条件队列:notFullnotEmpty。notFull示意当条件队列已满的时候,put办法会处于期待状态,直到队列没满;notEmpty示意当条件队列为空的时候,take办法会处于期待状态,直到队列有数据了。

而notFull.signal办法和notEmpty.signal办法会将条件队列上的节点移到CLH队列中(每次只转移一个)。也就是说,存在一个节点从条件队列被转移到CLH队列的状况产生。同时也意味着,条件队列上不会产生锁资源竞争,所有的锁竞争都是产生在CLH队列上的

其余一些条件队列和CLH队列之间的差别如下:

  • 条件队列应用nextWaiter指针来指向下一个节点,是一个单向链表构造,不同于CLH队列的双向链表构造;
  • 条件队列应用firstWaiter和lastWaiter来指向头尾指针,不同于CLH队列的head和tail;
  • 条件队列中的第一个节点也不会像CLH队列一样,是一个非凡的空节点;
  • 不同于CLH队列中会用很多的CAS操作来管制并发,条件队列进队列的前提是曾经获取到了独占锁资源,所以很多中央不须要思考并发。

上面就是具体的源码剖析了。条件队列以ArrayBlockingQueue来举例:


2 结构器

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  public ArrayBlockingQueue(int capacity) {
 5    this(capacity, false);
 6}
 7
 8  public ArrayBlockingQueue(int capacity, boolean fair) {
 9    if (capacity <= 0)
10        throw new IllegalArgumentException();
11    //寄存理论数据的数组
12    this.items = new Object[capacity];
13    //独占锁应用ReentrantLock来实现(fair示意的就是偏心锁还是非偏心锁,默认为非偏心锁)
14    lock = new ReentrantLock(fair);
15    //notEmpty条件队列
16    notEmpty = lock.newCondition();
17    //notFull条件队列
18    notFull = lock.newCondition();
19  }

3 put办法

  1  /**
  2   * ArrayBlockingQueue:
  3   */
  4  public void put(E e) throws InterruptedException {
  5    //非空校验
  6    checkNotNull(e);
  7    final ReentrantLock lock = this.lock;
  8    /*
  9    获取独占锁资源,响应中断模式。其实现代码和lock办法还有Semaphore的acquire办法是相似的
 10    因为这里剖析的是条件队列,于是就不再剖析该办法的细节了
 11     */
 12    lock.lockInterruptibly();
 13    try {
 14        while (count == items.length)
 15            //如果数组中数据曾经满了的话,就在notFull中入队一个新节点,并阻塞以后线程
 16            notFull.await();
 17        //增加数组元素并唤醒notEmpty
 18        enqueue(e);
 19    } finally {
 20        //开释锁资源
 21        lock.unlock();
 22    }
 23  }

4 await办法

如果在put的时候发现数组已满,或者在take的时候发现数组是空的,就会调用await办法来将以后节点放入条件队列中:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  public final void await() throws InterruptedException {
 5    //如果以后线程被中断就抛出异样
 6    if (Thread.interrupted())
 7        throw new InterruptedException();
 8    //把以后节点退出到条件队列中
 9    Node node = addConditionWaiter();
10    //开释之前获取到的锁资源,因为后续会阻塞该线程,所以如果不开释的话,其余线程将会期待该线程被唤醒
11    int savedState = fullyRelease(node);
12    int interruptMode = 0;
13    //如果以后节点不在CLH队列中则阻塞住,期待unpark唤醒
14    while (!isOnSyncQueue(node)) {
15        LockSupport.park(this);
16        /*
17        这里被唤醒可能是失常的signal操作也可能是被中断了。但无论是哪种状况,都会将以后节点插入到CLH队列尾,
18        并退出循环(留神,这里被唤醒除了下面两种状况之外,还有一种状况是操作系统级别的虚伪唤醒(spurious wakeup),
19        也就是以后线程毫无理由就会被唤醒了,所以下面须要应用while来躲避掉这种状况)
20         */
21        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
22            break;
23    }
24    //走到这里阐明以后节点曾经插入到了CLH队列中(被signal所唤醒或者被中断)。而后在CLH队列中进行获取锁资源的操作
25    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
26        /*
27        <<<THROW_IE和REINTERRUPT的解释详见transferAfterCancelledWait办法>>>
28
29        之前剖析过的如果acquireQueued办法返回true,阐明以后线程被中断了
30        返回true意味着在acquireQueued办法中此时会再一次被中断(留神,这意味着有两个代码点判断线程是否被中断:
31        一个是在第15行代码处,另一个是在acquireQueued办法外面),如果之前没有被中断,则interruptMode=0,
32        而在acquireQueued办法外面线程被中断返回了,这个时候将interruptMode从新修改为REINTERRUPT即可
33        至于为什么不修改为THROW_IE是因为在这种状况下,第15行代码处曾经通过调用signal办法失常唤醒了,
34        节点曾经放进了CLH队列中。而此时的中断是在signal操作之后,在第25行代码处去抢锁资源的时候产生的
35        这个时候中断不中断曾经无所谓了,所以就不须要抛出InterruptedException
36         */
37        interruptMode = REINTERRUPT;
38    /*
39    走到这里阐明以后节点曾经获取到了锁资源(获取不到的话就会被再次阻塞在acquireQueued办法里)
40    如果interruptMode=REINTERRUPT的话,阐明之前曾经调用过signal办法了,也就是说该节点曾经从条件队列中剔除掉了,
41    nextWaiter指针必定为空,所以在这种状况下是不须要执行unlinkCancelledWaiters办法的
42    而如果interruptMode=THROW_IE的话,阐明之前还没有调用过signal办法来从条件队列中剔除该节点。这个时候就须要调用
43    unlinkCancelledWaiters办法来剔除这个节点了(在之前的transferAfterCancelledWait办法中
44    曾经把该节点的状态改为了初始状态0),顺便把所有其余不是CONDITION状态的节点也一并剔除掉。留神:如果以后节点是条件队列中的
45    最初一个节点的话,并不会被清理。不妨,等到下次增加节点或调用signal办法的时候就会被清理了
46     */
47    if (node.nextWaiter != null)
48        unlinkCancelledWaiters();
49    //依据不同模式解决中断(失常模式不须要解决)
50    if (interruptMode != 0)
51        reportInterruptAfterWait(interruptMode);
52  }

5 addConditionWaiter办法

在条件队列中增加一个节点的逻辑:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  private Node addConditionWaiter() {
 5    Node t = lastWaiter;
 6    /*
 7    如果最初一个节点不是CONDITION状态,就删除条件队列中所有不是CONDITION状态的节点
 8    至于为什么只须要判断最初一个节点的状态就能晓得整个队列中是否有不是CONDITION的节点,前面会阐明
 9     */        
10    if (t != null && t.waitStatus != Node.CONDITION) {
11        //删除所有不是CONDITION状态的节点
12        unlinkCancelledWaiters();
13        t = lastWaiter;
14    }
15    //创立一个类型为CONDITION的新节点
16    Node node = new Node(Thread.currentThread(), Node.CONDITION);
17    if (t == null)
18        //t为null意味着此时条件队列中为空,间接将头指针指向这个新节点即可
19        firstWaiter = node;
20    else
21        //t不为null的话就阐明此时条件队列中有节点,间接在尾处退出这个新节点
22        t.nextWaiter = node;
23    //尾指针指向这个新节点,增加节点结束
24    lastWaiter = node;
25    /*
26    留神,这里不必像CLH队列中的enq办法一样,如果插入失败就会自旋直到插入胜利为止
27    因为此时还没有开释独占锁
28     */
29    return node;
30  }
31
32  /**
33   * 第12行代码处:
34   * 删除条件队列当中所有不是CONDITION状态的节点
35   */
36  private void unlinkCancelledWaiters() {
37    Node t = firstWaiter;
38    /*
39    在上面的每次循环中,trail指向的是从头到循环的节点为止,最初一个是CONDITION状态的节点
40    这样做是因为要剔除队列两头不是CONDITION的节点,就须要保留上一个是CONDITION节点的指针,
41    而后间接trail.nextWaiter = next就能够断开了
42     */
43    Node trail = null;
44    while (t != null) {
45        Node next = t.nextWaiter;
46        if (t.waitStatus != Node.CONDITION) {
47            t.nextWaiter = null;
48            if (trail == null)
49                firstWaiter = next;
50            else
51                trail.nextWaiter = next;
52            if (next == null)
53                lastWaiter = trail;
54        } else
55            trail = t;
56        t = next;
57    }
58  }

6 fullyRelease办法

开释锁资源,包含可重入锁的所有锁资源:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  final int fullyRelease(Node node) {
 5    boolean failed = true;
 6    try {
 7        int savedState = getState();
 8        /*
 9        开释锁资源。留神这里是开释所有的锁,包含可重入锁有屡次加锁的话,会一次性全副开释。因为在上一行
10        代码savedState存的是所有的锁资源,而这里就是开释这些所有的资源,这也就是办法名中“fully”的含意
11         */
12        if (release(savedState)) {
13            failed = false;
14            return savedState;
15        } else {
16            /*
17            开释失败就抛异样,也就是说没有开释洁净,可能是在并发的情景下state被批改了的起因,
18            也可能是其余起因。留神如果在这里抛出异样了那么会走第166行代码
19             */
20            throw new IllegalMonitorStateException();
21        }
22    } finally {
23        /*
24        如果开释锁失败,就把节点置为CANCELLED状态。比拟精妙的一点是,在之前addConditionWaiter办法中的第10行代码处,
25        判断条件队列中是否有不是CONDITION的节点时,只须要判断最初一个节点的状态是否是CONDITION就行了
26        按常理来说,是须要遍历整个队列能力晓得的。然而条件队列每次增加新节点都是插在尾处,而如果开释锁失败,
27        会将这个新增加的、在队列尾巴的新节点置为CANCELLED状态。而之前的CONDITION节点必然都是在队头
28        因为如果此时再有新的节点入队的话,会首先在addConditionWaiter办法中的第12行代码处将所有不是CONDITION的节点都剔除了
29        也就是说无论什么状况下,如果队列中有不是CONDITION的节点,那它肯定在队尾,所以只须要判断它就能够了
30         */
31        if (failed)
32            node.waitStatus = Node.CANCELLED;
33    }
34  }

7 isOnSyncQueue办法

判断节点是否在CLH队列中,以此来判断唤醒时signal办法是否实现。当然,在transferAfterCancelledWait办法中也会调用到本办法:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   * 判断节点是否在CLH队列中
 4   */
 5  final boolean isOnSyncQueue(Node node) {
 6    /*
 7    如果以后节点的状态是CONDITION或者节点没有prev指针(prev指针只在CLH队列中的节点有,
 8    尾插法保障prev指针肯定有)的话,就返回false
 9     */
10    if (node.waitStatus == Node.CONDITION || node.prev == null)
11        return false;
12    //如果以后节点有next指针(next指针只在CLH队列中的节点有,条件队列中的节点是nextWaiter)的话,就返回true
13    if (node.next != null)
14        return true;
15    //如果下面无奈疾速判断的话,就只能从CLH队列中进行遍历,一个一个地去进行判断了
16    return findNodeFromTail(node);
17  }
18
19  /**
20   * 遍历判断以后节点是否在CLH队列其中
21   */
22  private boolean findNodeFromTail(Node node) {
23    Node t = tail;
24    for (; ; ) {
25        if (t == node)
26            return true;
27        if (t == null)
28            return false;
29        t = t.prev;
30    }
31  }

8 checkInterruptWhileWaiting办法

判断唤醒时属于的状态(0 / THROW_IE / REINTERRUPT):

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   * 如果以后线程没有被中断过,则返回0
 4   * 如果以后线程被中断时没有被signal过,则返回THROW_IE
 5   * 如果以后线程被中断时曾经signal过了,则返回REINTERRUPT
 6   */
 7  private int checkInterruptWhileWaiting(Node node) {
 8    return Thread.interrupted() ?
 9            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
10            0;
11  }
12
13  /**
14   * 本办法是用来判断以后线程被中断时有没有产生过signal,以此来辨别出THROW_IE和REINTERRUPT。判断的根据是:
15   * 如果产生过signal,则以后节点的状态曾经不是CONDITION了,并且在CLH队列中也能找到该节点。详见transferForSignal办法
16   * <p>
17   * THROW_IE:示意在线程中断产生时还没有调用过signal办法,这个时候咱们将这个节点放进CLH队列中去抢资源,
18   * 直到抢到锁资源后,再把这个节点从CLH队列和条件队列中都删除掉,最初再抛出InterruptedException
19   * <p>
20   * REINTERRUPT:示意在线程中断产生时曾经调用过signal办法了,这个时候发不产生中断实际上曾经没有意义了,
21   * 因为该节点此时曾经被放进到了CLH队列中。而且在signal办法中曾经将这个节点从条件队列中剔除掉了
22   * 此时咱们将这个节点放进CLH队列中去抢资源,直到抢到锁资源后(抢到资源的同时就会将这个节点从CLH队列中删除),
23   * 再次中断以后线程即可,并不会抛出InterruptedException
24   */
25  final boolean transferAfterCancelledWait(Node node) {
26    //判断一下以后的节点状态是否是CONDITION
27    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
28        /*
29        如果CAS胜利了就示意以后节点是CONDITION状态,此时就意味着interruptMode为THROW_IE
30        而后会进行CLH队列入队,随后进行抢锁资源的操作
31         */
32        enq(node);
33        return true;
34    }
35    /*
36    如果CAS失败了的话就意味着以后节点曾经不是CONDITION状态了,阐明此时曾经调用过signal办法了,
37    然而因为之前曾经开释锁资源了,signal办法中的transferForSignal办法将节点状态改为CONDITION
38    和将节点入CLH队列的这两个操作不是原子操作,所以可能存在并发的问题。也就是说可能会存在将节点状态改为CONDITION后,
39    然而还没入CLH队列这个工夫点。上面的代码思考的就是这种场景。这个时候只须要一直让渡以后线程资源,
40    期待signal办法将节点增加CLH队列结束后即可
41     */
42    while (!isOnSyncQueue(node))
43        Thread.yield();
44    return false;
45  }

9 reportInterruptAfterWait办法

中断唤醒最初的解决:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  private void reportInterruptAfterWait(int interruptMode)
 5        throws InterruptedException {
 6    if (interruptMode == THROW_IE)
 7        //如果是THROW_IE最终就会抛出InterruptedException异样
 8        throw new InterruptedException();
 9    else if (interruptMode == REINTERRUPT)
10        //如果是REINTERRUPT就仅仅是“中断”以后线程而已(只是设置中断标记位为true)
11        selfInterrupt();
12  }

10 enqueue办法

ArrayBlockingQueue的入队逻辑:

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  private void enqueue(E x) {
 5    final Object[] items = this.items;
 6    //插入数据
 7    items[putIndex] = x;
 8    //putIndex记录的是下次插入的地位。如果putIndex曾经是最初一个了,从新复位为0,意味着数据可能会被笼罩
 9    if (++putIndex == items.length)
10        putIndex = 0;
11    //以后数组中的数量+1
12    count++;
13    /*
14    如果notEmpty条件队列不为空的话,唤醒notEmpty条件队列中的第一个节点去CLH队列当中去排队抢资源
15    如果notEmpty里没有节点的话,阐明此时数组没空。signal办法将不会有任何作用,因为此时没有阻塞住的take线程
16     */
17    notEmpty.signal();
18  }

11 signal办法

查看是否须要唤醒条件队列中的节点,须要就进行唤醒(将节点从条件队列中转移到CLH队列中):

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  public final void signal() {
 5    //如果以后线程不是加锁时候的线程,就抛出异样
 6    if (!isHeldExclusively())
 7        throw new IllegalMonitorStateException();
 8    Node first = firstWaiter;
 9    if (first != null)
10        //如果notEmpty条件队列中有节点的话,就告诉去CLH队列中排队抢资源
11        doSignal(first);
12  }
13
14  private void doSignal(Node first) {
15    do {
16        if ((firstWaiter = first.nextWaiter) == null)
17            //等于null意味着循环到此时条件队列曾经空了,那么把lastWaiter也置为null
18            lastWaiter = null;
19        //断开notEmpty条件队列中以后节点的nextWaiter指针,也就相当于剔除以后节点,期待GC
20        first.nextWaiter = null;
21    } while (!transferForSignal(first) &&
22            //如果以后节点曾经不是CONDITION状态的话(就阐明以后节点曾经生效了),就抉择下一个节点尝试放进CLH队列中
23            (first = firstWaiter) != null);
24  }
25
26  /**
27   * 将notEmpty条件队列中的节点从条件队列挪动到CLH队列当中
28   * 第21行代码处:
29   */
30  final boolean transferForSignal(Node node) {
31    /*
32    如果notEmpty条件队列中的节点曾经不是CONDITION状态的时候,就间接返回false,
33    跳过该节点,相当于把该节点剔除出条件队列
34     */
35    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
36        return false;
37
38    //走到这里阐明该节点的状态曾经被批改成了初始状态0。把其退出到CLH队列尾部,并返回前一个节点
39    Node p = enq(node);
40    int ws = p.waitStatus;
41    /*
42    再来温习一下,SIGNAL状态示意以后节点是阻塞状态的话,上一个节点就是SIGNAL。notEmpty条件队列中的
43    节点此时还是处于阻塞状态,所以此时将这个节点挪动到CLH队列后就须要将前一个节点的状态改为SIGNAL
44    如果CAS批改失败了的话,就将这个节点所在的线程唤醒去竞争锁资源,终局必定是没抢到(因为锁资源是
45    以后线程所持有着),所以会在acquireQueued办法中持续被阻塞住的,而且在这其中会再次修改前一个节点
46    的SIGNAL状态(必然是要批改胜利的,如果批改不胜利,就会始终在acquireQueued办法中循环去CAS批改)
47    当然如果前一个节点是CANCELLED状态的话,也去唤醒这个节点。这样acquireQueued办法中有机会去剔除掉
48    这些CANCELLED节点,相当于做了次清理工作
49    须要提一下的是,该处是唤醒被阻塞住的take线程(之前数组始终是空的,当初增加了一个节点
50    后数组就不为空了,所以须要唤醒之前被阻塞住的一个拿取线程。假如这个被唤醒的线程是线程2,执行唤醒动作
51    的是线程1)。如后面所说,线程2会进入到acquireQueued办法中再次被阻塞住。直到线程1走到put办法中的
52    最初一步unlock解锁的时候会被再次唤醒(也不肯定就是这次会被唤醒,也有可能唤醒的是其余的线程(如果说
53    是线程3)。但只有线程3最初执行unlock办法的时候,就会持续去唤醒,相当于把这个唤醒的动作给传递上来了
54    那么线程2最终就会有机会被唤醒(等到它变成CLH队列中的第一个节点的时候))
55     */
56    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
57        LockSupport.unpark(node.thread);
58    return true;
59  }

12 take办法

ArrayBlockingQueue的take办法:

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  public E take() throws InterruptedException {
 5    final ReentrantLock lock = this.lock;
 6    //响应中断模式下的加锁
 7    lock.lockInterruptibly();
 8    try {
 9        while (count == 0)
10            //如果数组为空的话,就在notEmpty中入队一个新节点,并阻塞以后线程
11            notEmpty.await();
12        //删除数组元素并唤醒notFull
13        return dequeue();
14    } finally {
15        //解锁
16        lock.unlock();
17    }
18  }
19
20  /**
21   * 第13行代码处:
22   */
23  private E dequeue() {
24    final Object[] items = this.items;
25    //记录旧值并最终返回进来
26    @SuppressWarnings("unchecked")
27    E x = (E) items[takeIndex];
28    //将数组元素清空
29    items[takeIndex] = null;
30    //takeIndex记录的是下次拿取的地位。如果takeIndex曾经是最初一个了,从新复位为0
31    if (++takeIndex == items.length)
32        takeIndex = 0;
33    //以后数组中的数量-1
34    count--;
35    //elementDequeued办法在数组中移除数据时会被调用,以保障Itrs迭代器和队列数据的一致性
36    if (itrs != null)
37        itrs.elementDequeued();
38    /*
39    如果notFull条件队列不为空的话,唤醒notFull条件队列中的第一个节点去CLH队列当中去排队抢资源
40    如果notFull里没有节点的话,阐明此时数组没满。signal办法将不会有任何作用,因为此时没有阻塞住的put线程
41     */
42    notFull.signal();
43    return x;
44  }

更多内容请关注微信公众号:奇客工夫

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理