乐趣区

关于java:这才是图文并茂我写了1万多字就是为了让你了解AQS是怎么运行的

前言

如果你想深入研究 Java 并发的话,那么 AQS 肯定是绕不开的一块知识点,Java 并发包很多的同步工具类底层都是基于 AQS 来实现的,比方咱们工作中常常用的 Lock 工具 ReentrantLock、栅栏 CountDownLatch、信号量 Semaphore 等,而且对于 AQS 的知识点也是面试中常常考查的内容,所以,无论是为了更好的应用还是为了应酬面试,深刻学习 AQS 都很有必要。

CAS

学习 AQS 之前,咱们有必要理解一个知识点,就是 AQS 底层中大量应用的 CAS,对于 CAS,大家应该都不生疏,如果还有哪位同学不分明的话,能够看看我之前的文章《面试必问系列:乐观锁和乐观锁的那些事儿》,这里不多复述,哈哈,给本人旧文章加了浏览量

AQS 简介

本文配角正式退场。

AQS,全名 AbstractQueuedSynchronizer,是一个抽象类的队列式同步器,它的外部通过保护一个状态 volatile int state(共享资源),一个 FIFO 线程期待队列来实现同步性能。

state 用关键字 volatile 润饰,代表着该共享资源的状态一更改就能被所有线程可见,而 AQS 的加锁形式实质上就是多个线程在竞争 state,当 state 为 0 时代表线程能够竞争锁,不为 0 时代表以后对象锁曾经被占有,其余线程来加锁时则会失败,加锁失败的线程会被放入一个 FIFO 的期待队列中,这些线程会被 UNSAFE.park()操作挂起,期待其余获取锁的线程开释锁才可能被唤醒。

而这个期待队列其实就相当于一个 CLH 队列,用一张原理图来示意大抵如下:

根底定义

AQS 反对两种资源分享的形式:Exclusive(独占,只有一个线程能执行,如 ReentrantLock)和 Share(共享,多个线程可同时执行,如 Semaphore/CountDownLatch)。

自定义的同步器继承 AQS 后,只须要实现共享资源 state 的获取和开释形式即可,其余如线程队列的保护(如获取资源失败入队 / 唤醒出队等)等操作,AQS 在顶层曾经实现了,

AQS 代码外部提供了一系列操作锁和线程队列的办法,次要操作锁的办法蕴含以下几个:

  • compareAndSetState():利用 CAS 的操作来设置 state 的值
  • tryAcquire(int):独占形式获取锁。胜利则返回 true,失败则返回 false。
  • tryRelease(int):独占形式开释锁。胜利则返回 true,失败则返回 false。
  • tryAcquireShared(int):共享形式开释锁。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。
  • tryReleaseShared(int):共享形式开释锁。如果开释后容许唤醒后续期待结点返回 true,否则返回 false。

像 ReentrantLock 就是实现了自定义的 tryAcquire-tryRelease,从而操作 state 的值来实现同步成果。

除此之外,AQS 外部还定义了一个动态类 Node,示意 CLH 队列的每一个结点,该结点的作用是对每一个期待获取资源做了封装,蕴含了须要同步的线程自身、线程期待状态 …..

咱们能够看下该类的一些重点变量:

static final class Node {

    /** 示意共享模式下期待的 Node */
    static final Node SHARED = new Node();
    /** 示意独占模式下期待的 mode */
    static final Node EXCLUSIVE = null;

    /** 上面几个为 waitStatus 的具体值 */
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    
     /** 示意后面的结点 */
    volatile Node prev;
     /** 示意前面的结点 */
    volatile Node next;
     /** 以后结点装载的线程,初始化时被创立,应用后会置空 */
    volatile Thread thread;
     /** 链接到下一个节点的期待条件,用到 Condition 的时候会应用到 */
    Node nextWaiter;

} 

代码外面定义了一个示意以后 Node 结点期待状态的字段 waitStatus,该字段的取值蕴含了 CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这五个值代表了不同的特定场景:

  • CANCELLED:示意以后结点已勾销调度。当 timeout 或被中断(响应中断的状况下),会触发变更为此状态,进入该状态后的结点将不会再变动。
  • SIGNAL:示意后继结点在期待以后结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL(记住这个 - 1 的值,因为前面咱们讲的时候常常会提到)
  • CONDITION:示意结点期待在 Condition 上,当其余线程调用了 Condition 的 signal()办法后,CONDITION 状态的结点将从期待队列转移到同步队列中,期待获取同步锁。(注:Condition 是 AQS 的一个组件,前面会细说)
  • PROPAGATE:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

也就是说,当 waitStatus 为负值示意结点处于无效期待状态,为正值的时候示意结点已被勾销。

在 AQS 外部中还保护了两个 Node 对象 head 和 tail,一开始默认都为 null

private transient volatile Node head;
private transient volatile Node tail; 

讲完了 AQS 的一些根底定义,咱们就能够开始学习同步的具体运行机制了,为了更好的演示,咱们用 ReentrantLock 作为应用入口,一步步跟进源码探索 AQS 底层是如何运作的,这里阐明一下,因为 ReentrantLock 底层调用的 AQS 是独占模式,所以下文解说的 AQS 源码也是针对独占模式的操作

好了,热身正式完结,来吧。

独占模式


加锁过程

咱们都晓得,ReentrantLock 的加锁和解锁办法别离为 lock()和 unLock(),咱们先来看获取锁的办法,

final void lock() {if (compareAndSetState(0, 1))
  setExclusiveOwnerThread(Thread.currentThread());
 else
  acquire(1);
} 

逻辑很简略,线程进来后间接利用 CAS 尝试抢占锁,如果抢占胜利 state 值回被改为 1,且设置对象独占锁线程为以后线程,否则就调用 acquire(1)再次尝试获取锁。

咱们假设有两个线程 A 和 B 同时竞争锁,A 进来先抢占到锁,此时的 AQS 模型图就相似这样:

持续走上面的办法,

public final void acquire(int arg) {if (!tryAcquire(arg) &&
  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  selfInterrupt();} 

acquire 蕴含了几个函数的调用,

tryAcquire:尝试间接获取锁,如果胜利就间接返回;

addWaiter:将该线程退出期待队列 FIFO 的尾部,并标记为独占模式;

acquireQueued:线程阻塞在期待队列中获取锁,始终获取到资源后才返回。如果在整个期待过程中被中断过,则返回 true,否则返回 false。

selfInterrupt:自我中断,就是既拿不到锁,又在期待时被中断了,线程就会进行自我中断 selfInterrupt(),将中断补上。

咱们一个个来看源码,并联合下面的两个线程来做场景剖析。

tryAcquire

不必多说,就是为了再次尝试获取锁

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
   return true;
  }
 }
 else if (current == getExclusiveOwnerThread()) {
  int nextc = c + acquires;
  if (nextc < 0) // overflow
   throw new Error("Maximum lock count exceeded");
  setState(nextc);
  return true;
 }
 return false;
} 

当线程 B 进来后,nonfairTryAcquire 办法首先会获取 state 的值,如果为 0,则失常获取该锁,不为 0 的话判断是否是以后线程占用了,是的话就累加 state 的值,这里的累加也是为了配合开释锁时候的次数,从而实现可重入锁的成果。

当然,因为之前锁曾经被线程 A 霸占了,所以这时候 tryAcquire 会返回 false,持续上面的流程。

addWaiter

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
  node.prev = pred;
  if (compareAndSetTail(pred, node)) {
   pred.next = node;
   return node;
  }
 }
 enq(node);
 return node;
} 

这段代码首先会创立一个和以后线程绑定的 Node 节点,Node 为双向链表。此时期待队列中的 tail 指针为空,间接调用 enq(node)办法将以后线程退出期待队列尾部,而后返回以后结点的前驱结点,

private Node enq(final Node node) {
 // CAS"自旋",直到胜利退出队尾
    for (;;) {
        Node t = tail;
        if (t == null) {
         // 队列为空,初始化一个 Node 结点作为 Head 结点,并将 tail 结点也指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
         // 把以后结点插入队列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
} 

第一遍循环时,tail 指针为空,初始化一个 Node 结点,并把 head 和 tail 结点都指向它,而后第二次循环进来之后,tail 结点不为空了,就将以后的结点退出到 tail 结点前面,也就是这样:

todo 如果此时有另一个线程 C 进来的话,发现锁曾经被 A 拿走了,而后队列里曾经有了线程 B,那么线程 C 就只能乖乖排到线程 B 的前面去,

acquireQueued

接着解读办法,通过 tryAcquire()和 addWaiter(),咱们的线程还是没有拿到资源,并且还被排到了队列的尾部,如果让你来设计的话,这个时候你会怎么解决线程呢?其实答案也很简略,能做的事无非两个:

1、循环让线程再抢资源。但认真一斟酌就晓得不合理,因为如果有多个线程都参加的话,你抢我也抢只会升高零碎性能

2、进入期待状态劳动,直到其余线程彻底开释资源后唤醒本人,本人再拿到资源

毫无疑问,抉择 2 更加靠谱,acquireQueued 办法做的也是这样的解决:

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
  // 标记是否会被中断
  boolean interrupted = false;
  // CAS 自旋
  for (;;) {
   // 获取以后结点的前结点
   final Node p = node.predecessor();
   if (p == head && tryAcquire(arg)) {setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
   }
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {if (failed)
   // 获取锁失败,则将此线程对应的 node 的 waitStatus 改为 CANCEL
   cancelAcquire(node);
 }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL)
  // 前驱结点期待状态为 "SIGNAL",那么本人就能够安心期待被唤醒了
  return true;
 if (ws > 0) {
  /*
   * 前驱结点被勾销了,通过循环始终往前找,直到找到期待状态无效的结点(期待状态值小于等于 0),* 而后排在他们的后边,至于那些被以后 Node 强制 "靠后" 的结点,因为曾经被勾销了,也没有援用链,* 就等着被 GC 了
   */
  do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
  pred.next = node;
 } else {
  // 如果前驱失常,那就把前驱的状态设置成 SIGNAL
  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
 return Thread.interrupted();} 

acquireQueued 办法的流程是这样的:

1、CAS 自旋,先判断以后传入的 Node 的前结点是否为 head 结点,是的话就尝试获取锁,获取锁胜利的话就把以后结点置为 head,之前的 head 置为 null(不便 GC),而后返回

2、如果前驱结点不是 head 或者加锁失败的话,就调用 shouldParkAfterFailedAcquire,将前驱节点的 waitStatus 变为了 SIGNAL=-1,最初执行 parkAndChecknIterrupt 办法,调用 LockSupport.park()挂起以后线程,parkAndCheckInterrupt 在挂起线程后会判断线程是否被中断,如果被中断的话,就会从新跑 acquireQueued 办法的 CAS 自旋操作,直到获取资源。

ps:LockSupport.park 办法会让以后线程进入 waitting 状态,在这种状态下,线程被唤醒的状况有两种,一是被 unpark(),二是被 interrupt(),所以,如果是第二种状况的话,须要返回被中断的标记,而后在 acquire 顶层办法的窗口那里自我中断补上

此时,因为线程 A 还未开释锁,所以线程 B 状态都是被挂起的,

到这里,加锁的流程就剖析完了,其实整体来说也并不简单,而且当你了解了独占模式加锁的过程,前面开释锁和共享模式的运行机制也没什么难懂的了,所以整个加锁的过程还是有必要多消化下的,也是 AQS 的重中之重。

为了不便你们更加清晰了解,我加多一张流程图吧(这个作者也太暖了吧,哈哈)

开释锁

说完了加锁,咱们来看看开释锁是怎么做的,AQS 中开释锁的办法是 release(),当调用该办法时会开释指定量的资源 (也就是锁),如果彻底开释了(即 state=0), 它会唤醒期待队列里的其余线程来获取资源。

还是一步步看源码吧,

public final boolean release(int arg) {if (tryRelease(arg)) {
  Node h = head;
  if (h != null && h.waitStatus != 0)
   unparkSuccessor(h);
  return true;
 }
 return false;
} 

tryRelease

代码上能够看出,外围的逻辑都在 tryRelease 办法中,该办法的作用是开释资源,AQS 里该办法没有具体的实现,须要由自定义的同步器去实现,咱们看下 ReentrantLock 代码中对应办法的源码:

protected final boolean tryRelease(int releases) {int c = getState() - releases;
     if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
     boolean free = false;
     if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
     }
     setState(c);
     return free;
} 

tryRelease 办法会减去 state 对应的值,如果 state 为 0,也就是曾经彻底开释资源,就返回 true,并且把独占的线程置为 null,否则返回 false。

此时 AQS 中的数据就会变成这样:

齐全开释资源后,以后线程要做的就是唤醒 CLH 队列中第一个在期待资源的线程,也就是 head 结点前面的线程,此时调用的办法是 unparkSuccessor(),

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
     // 将 head 结点的状态置为 0
        compareAndSetWaitStatus(node, ws, 0);
 // 找到下一个须要唤醒的结点 s
    Node s = node.next;
    // 如果为空或已勾销
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后向前,直到找到期待状态小于 0 的结点,后面说了,结点 waitStatus 小于 0 时才无效
        for (Node t = tail; t != null && t != node; t = t.prev) 
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到无效的结点,间接唤醒
    if (s != null)
        LockSupport.unpark(s.thread);// 唤醒
} 

办法的逻辑很简略,就是先将 head 的结点状态置为 0,防止上面找结点的时候再找到 head,而后找到队列中最后面的无效结点,而后唤醒,咱们假如这个时候线程 A 曾经开释锁,那么此时队列中排最前边竞争锁的线程 B 就会被唤醒,

而后被唤醒的线程 B 就会尝试用 CAS 获取锁,回到 acquireQueued 办法的逻辑,

for (;;) {
     // 获取以后结点的前结点
     final Node p = node.predecessor();
     if (p == head && tryAcquire(arg)) {setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
     }
     if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
} 

当线程 B 获取锁之后,会把以后结点赋值给 head,而后原先的前驱结点 (也就是原来的 head 结点) 去掉援用链,不便回收,这样一来,线程 B 获取锁的整个过程就实现了,此时 AQS 的数据就会变成这样:

到这里,咱们曾经剖析完了 AQS 独占模式下加锁和开释锁的过程,也就是 tryAccquire->tryRelease 这一链条的逻辑,除此之外,AQS 中还反对共享模式的同步,这种模式下对于锁的操作外围其实就是 tryAcquireShared->tryReleaseShared 这两个办法,咱们能够简略看下

共享模式


获取锁

AQS 中,共享模式获取锁的顶层入口办法是 acquireShared,该办法会获取指定数量的资源,胜利的话就间接返回,失败的话就进入期待队列,直到获取资源,

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
      doAcquireShared(arg);
} 

该办法里蕴含了两个办法的调用,

tryAcquireShared:尝试获取肯定资源的锁,返回的值代表获取锁的状态。

doAcquireShared:进入期待队列,并循环尝试获取锁,直到胜利。

tryAcquireShared

tryAcquireShared 在 AQS 里没有实现,同样由自定义的同步器去实现具体的逻辑,像一些较为常见的并发工具 Semaphore、CountDownLatch 里就有对该办法的自定义实现,尽管实现的逻辑不同,但办法的作用是一样的,就是获取肯定资源的资源,而后依据返回值判断是否还有残余资源,从而决定下一步的操作。

返回值有三种定义:

  • 负值代表获取失败;
  • 0 代表获取胜利,但没有残余的资源,也就是 state 曾经为 0;
  • 正值代表获取胜利,而且 state 还有残余,其余线程能够持续支付

当返回值小于 0 时,证实此次获取肯定数量的锁失败了,而后就会走 doAcquireShared 办法

doAcquireShared

此办法的作用是将以后线程退出期待队列尾部劳动,直到其余线程开释资源唤醒本人,本人胜利拿到相应量的资源后才返回,这是它的源码:

private void doAcquireShared(int arg) {
 // 退出队列尾部
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
  boolean interrupted = false;
  // CAS 自旋
  for (;;) {final Node p = node.predecessor();
   // 判断前驱结点是否是 head
   if (p == head) {
    // 尝试获取肯定数量的锁
    int r = tryAcquireShared(arg);
    if (r >= 0) {
     // 获取锁胜利,而且还有残余资源,就设置以后结点为 head,并持续唤醒下一个线程
     setHeadAndPropagate(node, r);
     // 让前驱结点去掉援用链,不便被 GC
     p.next = null; // help GC
     if (interrupted)
      selfInterrupt();
     failed = false;
     return;
    }
   }
   // 跟独占模式一样,改前驱结点 waitStatus 为 -1,并且以后线程挂起,期待被唤醒
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {if (failed)
   cancelAcquire(node);
 }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // head 指向本人
    setHead(node);
     // 如果还有残余量,持续唤醒下一个街坊线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();}
} 

看到这里,你会不会一点相熟的感觉,这个办法的逻辑怎么跟下面那个 acquireQueued() 那么相似啊?对的,其实两个流程并没有太大的差异。只是 doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完肯定的资源后,发现还有残余的资源,就持续唤醒下一个街坊线程,这才合乎 ” 共享 ” 的思维嘛。

这里咱们能够提出一个疑难,共享模式下,以后线程开释了肯定数量的资源,但这部分资源满足不了下一个期待结点的需要的话,那么会怎么样?

依照失常的思维,共享模式是能够多个线程同时执行的才对,所以,多个线程的状况下,如果老大开释完资源,但这部分资源满足不了老二,但能满足老三,那么老三就能够拿到资源。可事实是,从源码设计中能够看出,如果真的产生了这种状况,老三是拿不到资源的,因为期待队列是按顺序排列的,老二的资源需求量大,会把前面量小的老三以及老四、老五等都给卡住。从这一个角度来看,尽管 AQS 严格保障了程序,但也升高了并发能力

接着往下说吧,唤醒下一个街坊线程的逻辑在 doReleaseShared()中,咱们放到上面的开释锁来解析。

开释锁

共享模式开释锁的顶层办法是 releaseShared,它会开释指定量的资源,如果胜利开释且容许唤醒期待线程,它会唤醒期待队列里的其余线程来获取资源。上面是 releaseShared()的源码:

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
      return true;
     }
     return false;
} 

该办法同样蕴含两局部的逻辑:

tryReleaseShared:开释资源。

doAcquireShared:唤醒后继结点。

跟 tryAcquireShared 办法一样,tryReleaseShared 在 AQS 中没有具体的实现,由子同步器本人去定义,但性能都一样,就是开释肯定数量的资源。

开释完资源后,线程不会马上就出工,而是唤醒期待队列里最前排的期待结点。

doAcquireShared

唤醒后继结点的工作在 doReleaseShared()办法中实现,咱们能够看下它的源码:

private void doReleaseShared() {for (;;) {
      // 获取期待队列中的 head 结点
      Node h = head;
      if (h != null && h != tail) {
       int ws = h.waitStatus;
       // head 结点 waitStatus = -1, 唤醒下一个结点对应的线程
       if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
         continue;            // loop to recheck cases
        // 唤醒后继结点
        unparkSuccessor(h);
       }
       else if (ws == 0 &&
          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
      }
      if (h == head)                   // loop if head changed
       break;
     }
} 

代码没什么特地的,就是如果期待队列 head 结点的 waitStatus 为 - 1 的话,就间接唤醒后继结点,唤醒的办法 unparkSuccessor()在下面曾经讲过了,这里也没必要再复述。

总的来看,AQS 共享模式的运作流程和独占模式很类似,只有把握了独占模式的流程运行,共享模式什么的不就那样吗,没难度。这也是我为什么共享模式解说中不画流程图的起因,没必要嘛。

Condition

介绍完了 AQS 的外围性能,咱们再扩大一个知识点,在 AQS 中,除了提供独占 / 共享模式的加锁 / 解锁性能,它还对外提供了对于 Condition 的一些操作方法。

Condition 是个接口,在 jdk1.5 版本后设计的,根本的办法就是 await()和 signal()办法,性能大略就对应 Object 的 wait()和 notify(),Condition 必须要配合锁一起应用,因为对共享状态变量的拜访产生在多线程环境下。一个 Condition 的实例必须与一个 Lock 绑定,因而 Condition 个别都是作为 Lock 的外部实现,AQS 中就定义了一个类 ConditionObject 来实现了这个接口,

那么它应该怎么用呢?咱们能够简略写个 demo 来看下成果

public class ConditionDemo {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread tA = new Thread(() -> {lock.lock();
            try {System.out.println("线程 A 加锁胜利");
                System.out.println("线程 A 执行 await 被挂起");
                condition.await();
                System.out.println("线程 A 被唤醒胜利");
            } catch (InterruptedException e) {e.printStackTrace();
            } finally {lock.unlock();
                System.out.println("线程 A 开释锁胜利");
            }
        });

        Thread tB = new Thread(() -> {lock.lock();
            try {System.out.println("线程 B 加锁胜利");
                condition.signal();
                System.out.println("线程 B 唤醒线程 A");
            } finally {lock.unlock();
                System.out.println("线程 B 开释锁胜利");
            }
        });
        tA.start();
        tB.start();}
} 

执行 main 函数后后果输入为:

线程 A 加锁胜利
线程 A 执行 await 被挂起
线程 B 加锁胜利
线程 B 唤醒线程 A
线程 B 开释锁胜利
线程 A 被唤醒胜利
线程 A 开释锁胜利

代码执行的后果很容易了解,线程 A 先获取锁,而后调用 await()办法挂起以后线程并开释锁,线程 B 这时候拿到锁,而后调用 signal 唤醒线程 A。

毫无疑问,这两个办法让线程的状态产生了变动,咱们认真来钻研一下,

翻看 AQS 的源码,咱们会发现 Condition 中定义了两个属性 firstWaiter 和 lastWaiter,后面说了,AQS 中蕴含了一个 FIFO 的 CLH 期待队列,每个 Conditon 对象就蕴含这样一个期待队列,而这两个属性别离示意的是期待队列中的首尾结点,

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter; 

留神:Condition 当中的期待队列和 AQS 主体的同步期待队列是离开的,两个队列尽管构造体雷同,然而作用域是离开的

await

先看 await()的源码:

public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    // 将以后线程退出到期待队列中
    Node node = addConditionWaiter();
    // 齐全开释占有的资源,并返回资源数
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循环判断以后结点是不是在 Condition 的队列中,是的话挂起
    while (!isOnSyncQueue(node)) {LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
} 

当一个线程调用 Condition.await()办法,将会以以后线程结构结点,这个结点的 waitStatus 赋值为 Node.CONDITION,也就是 -2,并将结点从尾部退出期待队列,而后尾部结点就会指向这个新增的结点,

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
} 

咱们仍然用下面的 demo 来演示,此时,线程 A 获取锁并调用 Condition.await()办法后,AQS 外部的数据结构会变成这样:

在 Condition 队列中插入对应的结点后,线程 A 会开释所持有的资源,走到 while 循环那层逻辑,

while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

isOnSyncQueue 办法的会判断以后的线程节点是不是在同步队列中,这个时候此结点还在 Condition 队列中,所以该办法返回 false,这样的话循环会始终继续上来,线程被挂起,期待被唤醒,此时,线程 A 的流程临时进行了。

当线程 A 调用 await()办法挂起的时候,线程 B 获取到了线程 A 开释的资源,而后执行 signal()办法:

signal

public final void signal() {if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
} 

先判断以后线程是否为获取锁的线程,如果不是则间接抛出异样。接着调用 doSignal()办法来唤醒线程。

private void doSignal(Node first) {
 // 循环,从队列始终往后找不为空的首结点
    do {if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 // CAS 循环,将结点的 waitStatus 改为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
 // 下面曾经剖析过,此办法会把以后结点退出到期待队列中,并返回前驱结点
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
} 

从 doSignal 的代码中能够看出,这时候程序寻找的是 Condition 期待队列中首结点 firstWaiter 的结点,此时该结点指向的是线程 A 的结点,所以之后的流程作用的都是线程 A 的结点。

这里剖析下 transferForSignal 办法,先通过 CAS 自旋将结点 waitStatus 改为 0,而后就把结点放入到同步队列 (此队列不是 Condition 的期待队列) 中,而后再用 CAS 将同步队列中该结点的前驱结点 waitStatus 改为 Node.SIGNAL,也就是 -1,此时 AQS 的数据结构大略如下 (额 ….. 少画了个箭头,大家就当 head 结点是线程 A 结点的前驱结点就好):

回到 await()办法,当线程 A 的结点被退出同步队列中时,isOnSyncQueue()会返回 true,跳出循环,

while (!isOnSyncQueue(node)) {LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode); 

接着执行 acquireQueued()办法,这里就不必多说了吧,尝试从新获取锁,如果获取锁失败持续会被挂起,直到另外线程开释锁才被唤醒。

所以,当线程 B 开释完锁后,线程 A 被唤醒,持续尝试获取锁,至此流程完结。

对于这整个通信过程,咱们能够画一张流程图展现下:

总结

说完了 Condition 的应用和底层运行机制,咱们再来总结下它跟一般 wait/notify 的比拟,个别这也是问的比拟多的,Condition 大略有以下两点劣势:

  • Condition 须要联合 Lock 进行管制,应用的时候要留神肯定要对应的 unlock(),能够对多个不同条件进行管制,只有 new 多个 Condition 对象就能够为多个线程管制通信,wait/notify 只能和 synchronized 关键字一起应用,并且只能唤醒一个或者全副的期待队列;
  • Condition 有相似于 await 的机制,因而不会产生加锁形式而产生的死锁呈现,同时底层实现的是 park/unpark 的机制,因而也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,然而 wait/notify 会产生先唤醒再挂起的死锁。

最初

对 AQS 的源码剖析到这里就全副完结了,尽管还有很多知识点没解说,比方偏心锁 / 非偏心锁下 AQS 是怎么作用的,篇幅所限,局部知识点没有扩大还请见谅,尽管如此,如果您能看完文章的话,置信对 AQS 也算是有足够的理解了。

回顾本篇文章,咱们不难发现,无论是独占还是共享模式,或者联合是 Condition 工具应用,AQS 实质上的同步性能都是通过对锁和队列中结点的操作来实现的,从设计上讲,AQS 的组成构造并不算简单,底层的运行机制也不会很绕,所以,大家如果看源码的时候感觉有些艰难的话也不必灰心,多看几遍,顺便画个图之类的,理清下流程还是没什么问题的。

当然,本人看得懂是一回事,写进去让他人看懂又是另一回事了,就像这篇文章,我花了好长的工夫来筹备,又是画图又是理流程的,期间还参考了不少网上大神的博文,肝了几天才算是成文了。尽管我晓得本文不算什么高质文,但我也算是费尽心力了,写技术文真是挺累的,大家看的感觉不错的话还请帮忙转发下或点个赞吧!这也是对我最好的激励了


作者:鄙人薛某,一个不拘于技术的互联网人,技术三流,吹水一流,想看更多精彩文章能够关注我的公众号哦~~~

退出移动版