关于java:这才是图文并茂我写了1万多字就是为了让你了解AQS是怎么运行的

31次阅读

共计 16155 个字符,预计需要花费 41 分钟才能阅读完成。

前言

如果你想深入研究 Java 并发的话,那么 AQS 肯定是绕不开的一块知识点,Java 并发包很多的同步工具类底层都是基于 AQS 来实现的,比方咱们工作中常常用的 Lock 工具 ReentrantLock、栅栏 CountDownLatch、信号量 Semaphore 等,而且对于 AQS 的知识点也是面试中常常考查的内容,所以,无论是为了更好的应用还是为了应酬面试,深刻学习 AQS 都很有必要。

CAS

学习 AQS 之前,咱们有必要理解一个知识点,就是 AQS 底层中大量应用的 CAS,对于 CAS,大家应该都不生疏,如果还有哪位同学不分明的话,能够看看我之前的文章《面试必问系列:乐观锁和乐观锁的那些事儿》,这里不多复述,哈哈,给本人旧文章加了浏览量

AQS 简介

本文配角正式退场。

AQS,全名 AbstractQueuedSynchronizer,是一个抽象类的队列式同步器,它的外部通过保护一个状态 volatile int state(共享资源),一个 FIFO 线程期待队列来实现同步性能。

state 用关键字 volatile 润饰,代表着该共享资源的状态一更改就能被所有线程可见,而 AQS 的加锁形式实质上就是多个线程在竞争 state,当 state 为 0 时代表线程能够竞争锁,不为 0 时代表以后对象锁曾经被占有,其余线程来加锁时则会失败,加锁失败的线程会被放入一个 FIFO 的期待队列中,这些线程会被 UNSAFE.park()操作挂起,期待其余获取锁的线程开释锁才可能被唤醒。

而这个期待队列其实就相当于一个 CLH 队列,用一张原理图来示意大抵如下:

根底定义

AQS 反对两种资源分享的形式:Exclusive(独占,只有一个线程能执行,如 ReentrantLock)和 Share(共享,多个线程可同时执行,如 Semaphore/CountDownLatch)。

自定义的同步器继承 AQS 后,只须要实现共享资源 state 的获取和开释形式即可,其余如线程队列的保护(如获取资源失败入队 / 唤醒出队等)等操作,AQS 在顶层曾经实现了,

AQS 代码外部提供了一系列操作锁和线程队列的办法,次要操作锁的办法蕴含以下几个:

  • compareAndSetState():利用 CAS 的操作来设置 state 的值
  • tryAcquire(int):独占形式获取锁。胜利则返回 true,失败则返回 false。
  • tryRelease(int):独占形式开释锁。胜利则返回 true,失败则返回 false。
  • tryAcquireShared(int):共享形式开释锁。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。
  • tryReleaseShared(int):共享形式开释锁。如果开释后容许唤醒后续期待结点返回 true,否则返回 false。

像 ReentrantLock 就是实现了自定义的 tryAcquire-tryRelease,从而操作 state 的值来实现同步成果。

除此之外,AQS 外部还定义了一个动态类 Node,示意 CLH 队列的每一个结点,该结点的作用是对每一个期待获取资源做了封装,蕴含了须要同步的线程自身、线程期待状态 …..

咱们能够看下该类的一些重点变量:

static final class Node {

    /** 示意共享模式下期待的 Node */
    static final Node SHARED = new Node();
    /** 示意独占模式下期待的 mode */
    static final Node EXCLUSIVE = null;

    /** 上面几个为 waitStatus 的具体值 */
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    
     /** 示意后面的结点 */
    volatile Node prev;
     /** 示意前面的结点 */
    volatile Node next;
     /** 以后结点装载的线程,初始化时被创立,应用后会置空 */
    volatile Thread thread;
     /** 链接到下一个节点的期待条件,用到 Condition 的时候会应用到 */
    Node nextWaiter;

} 

代码外面定义了一个示意以后 Node 结点期待状态的字段 waitStatus,该字段的取值蕴含了 CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这五个值代表了不同的特定场景:

  • CANCELLED:示意以后结点已勾销调度。当 timeout 或被中断(响应中断的状况下),会触发变更为此状态,进入该状态后的结点将不会再变动。
  • SIGNAL:示意后继结点在期待以后结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL(记住这个 - 1 的值,因为前面咱们讲的时候常常会提到)
  • CONDITION:示意结点期待在 Condition 上,当其余线程调用了 Condition 的 signal()办法后,CONDITION 状态的结点将从期待队列转移到同步队列中,期待获取同步锁。(注:Condition 是 AQS 的一个组件,前面会细说)
  • PROPAGATE:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

也就是说,当 waitStatus 为负值示意结点处于无效期待状态,为正值的时候示意结点已被勾销。

在 AQS 外部中还保护了两个 Node 对象 head 和 tail,一开始默认都为 null

private transient volatile Node head;
private transient volatile Node tail; 

讲完了 AQS 的一些根底定义,咱们就能够开始学习同步的具体运行机制了,为了更好的演示,咱们用 ReentrantLock 作为应用入口,一步步跟进源码探索 AQS 底层是如何运作的,这里阐明一下,因为 ReentrantLock 底层调用的 AQS 是独占模式,所以下文解说的 AQS 源码也是针对独占模式的操作

好了,热身正式完结,来吧。

独占模式


加锁过程

咱们都晓得,ReentrantLock 的加锁和解锁办法别离为 lock()和 unLock(),咱们先来看获取锁的办法,

final void lock() {if (compareAndSetState(0, 1))
  setExclusiveOwnerThread(Thread.currentThread());
 else
  acquire(1);
} 

逻辑很简略,线程进来后间接利用 CAS 尝试抢占锁,如果抢占胜利 state 值回被改为 1,且设置对象独占锁线程为以后线程,否则就调用 acquire(1)再次尝试获取锁。

咱们假设有两个线程 A 和 B 同时竞争锁,A 进来先抢占到锁,此时的 AQS 模型图就相似这样:

持续走上面的办法,

public final void acquire(int arg) {if (!tryAcquire(arg) &&
  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  selfInterrupt();} 

acquire 蕴含了几个函数的调用,

tryAcquire:尝试间接获取锁,如果胜利就间接返回;

addWaiter:将该线程退出期待队列 FIFO 的尾部,并标记为独占模式;

acquireQueued:线程阻塞在期待队列中获取锁,始终获取到资源后才返回。如果在整个期待过程中被中断过,则返回 true,否则返回 false。

selfInterrupt:自我中断,就是既拿不到锁,又在期待时被中断了,线程就会进行自我中断 selfInterrupt(),将中断补上。

咱们一个个来看源码,并联合下面的两个线程来做场景剖析。

tryAcquire

不必多说,就是为了再次尝试获取锁

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
   return true;
  }
 }
 else if (current == getExclusiveOwnerThread()) {
  int nextc = c + acquires;
  if (nextc < 0) // overflow
   throw new Error("Maximum lock count exceeded");
  setState(nextc);
  return true;
 }
 return false;
} 

当线程 B 进来后,nonfairTryAcquire 办法首先会获取 state 的值,如果为 0,则失常获取该锁,不为 0 的话判断是否是以后线程占用了,是的话就累加 state 的值,这里的累加也是为了配合开释锁时候的次数,从而实现可重入锁的成果。

当然,因为之前锁曾经被线程 A 霸占了,所以这时候 tryAcquire 会返回 false,持续上面的流程。

addWaiter

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
  node.prev = pred;
  if (compareAndSetTail(pred, node)) {
   pred.next = node;
   return node;
  }
 }
 enq(node);
 return node;
} 

这段代码首先会创立一个和以后线程绑定的 Node 节点,Node 为双向链表。此时期待队列中的 tail 指针为空,间接调用 enq(node)办法将以后线程退出期待队列尾部,而后返回以后结点的前驱结点,

private Node enq(final Node node) {
 // CAS"自旋",直到胜利退出队尾
    for (;;) {
        Node t = tail;
        if (t == null) {
         // 队列为空,初始化一个 Node 结点作为 Head 结点,并将 tail 结点也指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
         // 把以后结点插入队列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
} 

第一遍循环时,tail 指针为空,初始化一个 Node 结点,并把 head 和 tail 结点都指向它,而后第二次循环进来之后,tail 结点不为空了,就将以后的结点退出到 tail 结点前面,也就是这样:

todo 如果此时有另一个线程 C 进来的话,发现锁曾经被 A 拿走了,而后队列里曾经有了线程 B,那么线程 C 就只能乖乖排到线程 B 的前面去,

acquireQueued

接着解读办法,通过 tryAcquire()和 addWaiter(),咱们的线程还是没有拿到资源,并且还被排到了队列的尾部,如果让你来设计的话,这个时候你会怎么解决线程呢?其实答案也很简略,能做的事无非两个:

1、循环让线程再抢资源。但认真一斟酌就晓得不合理,因为如果有多个线程都参加的话,你抢我也抢只会升高零碎性能

2、进入期待状态劳动,直到其余线程彻底开释资源后唤醒本人,本人再拿到资源

毫无疑问,抉择 2 更加靠谱,acquireQueued 办法做的也是这样的解决:

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
  // 标记是否会被中断
  boolean interrupted = false;
  // CAS 自旋
  for (;;) {
   // 获取以后结点的前结点
   final Node p = node.predecessor();
   if (p == head && tryAcquire(arg)) {setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
   }
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {if (failed)
   // 获取锁失败,则将此线程对应的 node 的 waitStatus 改为 CANCEL
   cancelAcquire(node);
 }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL)
  // 前驱结点期待状态为 "SIGNAL",那么本人就能够安心期待被唤醒了
  return true;
 if (ws > 0) {
  /*
   * 前驱结点被勾销了,通过循环始终往前找,直到找到期待状态无效的结点(期待状态值小于等于 0),* 而后排在他们的后边,至于那些被以后 Node 强制 "靠后" 的结点,因为曾经被勾销了,也没有援用链,* 就等着被 GC 了
   */
  do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
  pred.next = node;
 } else {
  // 如果前驱失常,那就把前驱的状态设置成 SIGNAL
  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
 return Thread.interrupted();} 

acquireQueued 办法的流程是这样的:

1、CAS 自旋,先判断以后传入的 Node 的前结点是否为 head 结点,是的话就尝试获取锁,获取锁胜利的话就把以后结点置为 head,之前的 head 置为 null(不便 GC),而后返回

2、如果前驱结点不是 head 或者加锁失败的话,就调用 shouldParkAfterFailedAcquire,将前驱节点的 waitStatus 变为了 SIGNAL=-1,最初执行 parkAndChecknIterrupt 办法,调用 LockSupport.park()挂起以后线程,parkAndCheckInterrupt 在挂起线程后会判断线程是否被中断,如果被中断的话,就会从新跑 acquireQueued 办法的 CAS 自旋操作,直到获取资源。

ps:LockSupport.park 办法会让以后线程进入 waitting 状态,在这种状态下,线程被唤醒的状况有两种,一是被 unpark(),二是被 interrupt(),所以,如果是第二种状况的话,须要返回被中断的标记,而后在 acquire 顶层办法的窗口那里自我中断补上

此时,因为线程 A 还未开释锁,所以线程 B 状态都是被挂起的,

到这里,加锁的流程就剖析完了,其实整体来说也并不简单,而且当你了解了独占模式加锁的过程,前面开释锁和共享模式的运行机制也没什么难懂的了,所以整个加锁的过程还是有必要多消化下的,也是 AQS 的重中之重。

为了不便你们更加清晰了解,我加多一张流程图吧(这个作者也太暖了吧,哈哈)

开释锁

说完了加锁,咱们来看看开释锁是怎么做的,AQS 中开释锁的办法是 release(),当调用该办法时会开释指定量的资源 (也就是锁),如果彻底开释了(即 state=0), 它会唤醒期待队列里的其余线程来获取资源。

还是一步步看源码吧,

public final boolean release(int arg) {if (tryRelease(arg)) {
  Node h = head;
  if (h != null && h.waitStatus != 0)
   unparkSuccessor(h);
  return true;
 }
 return false;
} 

tryRelease

代码上能够看出,外围的逻辑都在 tryRelease 办法中,该办法的作用是开释资源,AQS 里该办法没有具体的实现,须要由自定义的同步器去实现,咱们看下 ReentrantLock 代码中对应办法的源码:

protected final boolean tryRelease(int releases) {int c = getState() - releases;
     if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
     boolean free = false;
     if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
     }
     setState(c);
     return free;
} 

tryRelease 办法会减去 state 对应的值,如果 state 为 0,也就是曾经彻底开释资源,就返回 true,并且把独占的线程置为 null,否则返回 false。

此时 AQS 中的数据就会变成这样:

齐全开释资源后,以后线程要做的就是唤醒 CLH 队列中第一个在期待资源的线程,也就是 head 结点前面的线程,此时调用的办法是 unparkSuccessor(),

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
     // 将 head 结点的状态置为 0
        compareAndSetWaitStatus(node, ws, 0);
 // 找到下一个须要唤醒的结点 s
    Node s = node.next;
    // 如果为空或已勾销
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后向前,直到找到期待状态小于 0 的结点,后面说了,结点 waitStatus 小于 0 时才无效
        for (Node t = tail; t != null && t != node; t = t.prev) 
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到无效的结点,间接唤醒
    if (s != null)
        LockSupport.unpark(s.thread);// 唤醒
} 

办法的逻辑很简略,就是先将 head 的结点状态置为 0,防止上面找结点的时候再找到 head,而后找到队列中最后面的无效结点,而后唤醒,咱们假如这个时候线程 A 曾经开释锁,那么此时队列中排最前边竞争锁的线程 B 就会被唤醒,

而后被唤醒的线程 B 就会尝试用 CAS 获取锁,回到 acquireQueued 办法的逻辑,

for (;;) {
     // 获取以后结点的前结点
     final Node p = node.predecessor();
     if (p == head && tryAcquire(arg)) {setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
     }
     if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
} 

当线程 B 获取锁之后,会把以后结点赋值给 head,而后原先的前驱结点 (也就是原来的 head 结点) 去掉援用链,不便回收,这样一来,线程 B 获取锁的整个过程就实现了,此时 AQS 的数据就会变成这样:

到这里,咱们曾经剖析完了 AQS 独占模式下加锁和开释锁的过程,也就是 tryAccquire->tryRelease 这一链条的逻辑,除此之外,AQS 中还反对共享模式的同步,这种模式下对于锁的操作外围其实就是 tryAcquireShared->tryReleaseShared 这两个办法,咱们能够简略看下

共享模式


获取锁

AQS 中,共享模式获取锁的顶层入口办法是 acquireShared,该办法会获取指定数量的资源,胜利的话就间接返回,失败的话就进入期待队列,直到获取资源,

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
      doAcquireShared(arg);
} 

该办法里蕴含了两个办法的调用,

tryAcquireShared:尝试获取肯定资源的锁,返回的值代表获取锁的状态。

doAcquireShared:进入期待队列,并循环尝试获取锁,直到胜利。

tryAcquireShared

tryAcquireShared 在 AQS 里没有实现,同样由自定义的同步器去实现具体的逻辑,像一些较为常见的并发工具 Semaphore、CountDownLatch 里就有对该办法的自定义实现,尽管实现的逻辑不同,但办法的作用是一样的,就是获取肯定资源的资源,而后依据返回值判断是否还有残余资源,从而决定下一步的操作。

返回值有三种定义:

  • 负值代表获取失败;
  • 0 代表获取胜利,但没有残余的资源,也就是 state 曾经为 0;
  • 正值代表获取胜利,而且 state 还有残余,其余线程能够持续支付

当返回值小于 0 时,证实此次获取肯定数量的锁失败了,而后就会走 doAcquireShared 办法

doAcquireShared

此办法的作用是将以后线程退出期待队列尾部劳动,直到其余线程开释资源唤醒本人,本人胜利拿到相应量的资源后才返回,这是它的源码:

private void doAcquireShared(int arg) {
 // 退出队列尾部
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
  boolean interrupted = false;
  // CAS 自旋
  for (;;) {final Node p = node.predecessor();
   // 判断前驱结点是否是 head
   if (p == head) {
    // 尝试获取肯定数量的锁
    int r = tryAcquireShared(arg);
    if (r >= 0) {
     // 获取锁胜利,而且还有残余资源,就设置以后结点为 head,并持续唤醒下一个线程
     setHeadAndPropagate(node, r);
     // 让前驱结点去掉援用链,不便被 GC
     p.next = null; // help GC
     if (interrupted)
      selfInterrupt();
     failed = false;
     return;
    }
   }
   // 跟独占模式一样,改前驱结点 waitStatus 为 -1,并且以后线程挂起,期待被唤醒
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {if (failed)
   cancelAcquire(node);
 }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // head 指向本人
    setHead(node);
     // 如果还有残余量,持续唤醒下一个街坊线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();}
} 

看到这里,你会不会一点相熟的感觉,这个办法的逻辑怎么跟下面那个 acquireQueued() 那么相似啊?对的,其实两个流程并没有太大的差异。只是 doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完肯定的资源后,发现还有残余的资源,就持续唤醒下一个街坊线程,这才合乎 ” 共享 ” 的思维嘛。

这里咱们能够提出一个疑难,共享模式下,以后线程开释了肯定数量的资源,但这部分资源满足不了下一个期待结点的需要的话,那么会怎么样?

依照失常的思维,共享模式是能够多个线程同时执行的才对,所以,多个线程的状况下,如果老大开释完资源,但这部分资源满足不了老二,但能满足老三,那么老三就能够拿到资源。可事实是,从源码设计中能够看出,如果真的产生了这种状况,老三是拿不到资源的,因为期待队列是按顺序排列的,老二的资源需求量大,会把前面量小的老三以及老四、老五等都给卡住。从这一个角度来看,尽管 AQS 严格保障了程序,但也升高了并发能力

接着往下说吧,唤醒下一个街坊线程的逻辑在 doReleaseShared()中,咱们放到上面的开释锁来解析。

开释锁

共享模式开释锁的顶层办法是 releaseShared,它会开释指定量的资源,如果胜利开释且容许唤醒期待线程,它会唤醒期待队列里的其余线程来获取资源。上面是 releaseShared()的源码:

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
      return true;
     }
     return false;
} 

该办法同样蕴含两局部的逻辑:

tryReleaseShared:开释资源。

doAcquireShared:唤醒后继结点。

跟 tryAcquireShared 办法一样,tryReleaseShared 在 AQS 中没有具体的实现,由子同步器本人去定义,但性能都一样,就是开释肯定数量的资源。

开释完资源后,线程不会马上就出工,而是唤醒期待队列里最前排的期待结点。

doAcquireShared

唤醒后继结点的工作在 doReleaseShared()办法中实现,咱们能够看下它的源码:

private void doReleaseShared() {for (;;) {
      // 获取期待队列中的 head 结点
      Node h = head;
      if (h != null && h != tail) {
       int ws = h.waitStatus;
       // head 结点 waitStatus = -1, 唤醒下一个结点对应的线程
       if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
         continue;            // loop to recheck cases
        // 唤醒后继结点
        unparkSuccessor(h);
       }
       else if (ws == 0 &&
          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
      }
      if (h == head)                   // loop if head changed
       break;
     }
} 

代码没什么特地的,就是如果期待队列 head 结点的 waitStatus 为 - 1 的话,就间接唤醒后继结点,唤醒的办法 unparkSuccessor()在下面曾经讲过了,这里也没必要再复述。

总的来看,AQS 共享模式的运作流程和独占模式很类似,只有把握了独占模式的流程运行,共享模式什么的不就那样吗,没难度。这也是我为什么共享模式解说中不画流程图的起因,没必要嘛。

Condition

介绍完了 AQS 的外围性能,咱们再扩大一个知识点,在 AQS 中,除了提供独占 / 共享模式的加锁 / 解锁性能,它还对外提供了对于 Condition 的一些操作方法。

Condition 是个接口,在 jdk1.5 版本后设计的,根本的办法就是 await()和 signal()办法,性能大略就对应 Object 的 wait()和 notify(),Condition 必须要配合锁一起应用,因为对共享状态变量的拜访产生在多线程环境下。一个 Condition 的实例必须与一个 Lock 绑定,因而 Condition 个别都是作为 Lock 的外部实现,AQS 中就定义了一个类 ConditionObject 来实现了这个接口,

那么它应该怎么用呢?咱们能够简略写个 demo 来看下成果

public class ConditionDemo {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread tA = new Thread(() -> {lock.lock();
            try {System.out.println("线程 A 加锁胜利");
                System.out.println("线程 A 执行 await 被挂起");
                condition.await();
                System.out.println("线程 A 被唤醒胜利");
            } catch (InterruptedException e) {e.printStackTrace();
            } finally {lock.unlock();
                System.out.println("线程 A 开释锁胜利");
            }
        });

        Thread tB = new Thread(() -> {lock.lock();
            try {System.out.println("线程 B 加锁胜利");
                condition.signal();
                System.out.println("线程 B 唤醒线程 A");
            } finally {lock.unlock();
                System.out.println("线程 B 开释锁胜利");
            }
        });
        tA.start();
        tB.start();}
} 

执行 main 函数后后果输入为:

线程 A 加锁胜利
线程 A 执行 await 被挂起
线程 B 加锁胜利
线程 B 唤醒线程 A
线程 B 开释锁胜利
线程 A 被唤醒胜利
线程 A 开释锁胜利

代码执行的后果很容易了解,线程 A 先获取锁,而后调用 await()办法挂起以后线程并开释锁,线程 B 这时候拿到锁,而后调用 signal 唤醒线程 A。

毫无疑问,这两个办法让线程的状态产生了变动,咱们认真来钻研一下,

翻看 AQS 的源码,咱们会发现 Condition 中定义了两个属性 firstWaiter 和 lastWaiter,后面说了,AQS 中蕴含了一个 FIFO 的 CLH 期待队列,每个 Conditon 对象就蕴含这样一个期待队列,而这两个属性别离示意的是期待队列中的首尾结点,

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter; 

留神:Condition 当中的期待队列和 AQS 主体的同步期待队列是离开的,两个队列尽管构造体雷同,然而作用域是离开的

await

先看 await()的源码:

public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    // 将以后线程退出到期待队列中
    Node node = addConditionWaiter();
    // 齐全开释占有的资源,并返回资源数
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循环判断以后结点是不是在 Condition 的队列中,是的话挂起
    while (!isOnSyncQueue(node)) {LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
} 

当一个线程调用 Condition.await()办法,将会以以后线程结构结点,这个结点的 waitStatus 赋值为 Node.CONDITION,也就是 -2,并将结点从尾部退出期待队列,而后尾部结点就会指向这个新增的结点,

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
} 

咱们仍然用下面的 demo 来演示,此时,线程 A 获取锁并调用 Condition.await()办法后,AQS 外部的数据结构会变成这样:

在 Condition 队列中插入对应的结点后,线程 A 会开释所持有的资源,走到 while 循环那层逻辑,

while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

isOnSyncQueue 办法的会判断以后的线程节点是不是在同步队列中,这个时候此结点还在 Condition 队列中,所以该办法返回 false,这样的话循环会始终继续上来,线程被挂起,期待被唤醒,此时,线程 A 的流程临时进行了。

当线程 A 调用 await()办法挂起的时候,线程 B 获取到了线程 A 开释的资源,而后执行 signal()办法:

signal

public final void signal() {if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
} 

先判断以后线程是否为获取锁的线程,如果不是则间接抛出异样。接着调用 doSignal()办法来唤醒线程。

private void doSignal(Node first) {
 // 循环,从队列始终往后找不为空的首结点
    do {if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 // CAS 循环,将结点的 waitStatus 改为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
 // 下面曾经剖析过,此办法会把以后结点退出到期待队列中,并返回前驱结点
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
} 

从 doSignal 的代码中能够看出,这时候程序寻找的是 Condition 期待队列中首结点 firstWaiter 的结点,此时该结点指向的是线程 A 的结点,所以之后的流程作用的都是线程 A 的结点。

这里剖析下 transferForSignal 办法,先通过 CAS 自旋将结点 waitStatus 改为 0,而后就把结点放入到同步队列 (此队列不是 Condition 的期待队列) 中,而后再用 CAS 将同步队列中该结点的前驱结点 waitStatus 改为 Node.SIGNAL,也就是 -1,此时 AQS 的数据结构大略如下 (额 ….. 少画了个箭头,大家就当 head 结点是线程 A 结点的前驱结点就好):

回到 await()办法,当线程 A 的结点被退出同步队列中时,isOnSyncQueue()会返回 true,跳出循环,

while (!isOnSyncQueue(node)) {LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode); 

接着执行 acquireQueued()办法,这里就不必多说了吧,尝试从新获取锁,如果获取锁失败持续会被挂起,直到另外线程开释锁才被唤醒。

所以,当线程 B 开释完锁后,线程 A 被唤醒,持续尝试获取锁,至此流程完结。

对于这整个通信过程,咱们能够画一张流程图展现下:

总结

说完了 Condition 的应用和底层运行机制,咱们再来总结下它跟一般 wait/notify 的比拟,个别这也是问的比拟多的,Condition 大略有以下两点劣势:

  • Condition 须要联合 Lock 进行管制,应用的时候要留神肯定要对应的 unlock(),能够对多个不同条件进行管制,只有 new 多个 Condition 对象就能够为多个线程管制通信,wait/notify 只能和 synchronized 关键字一起应用,并且只能唤醒一个或者全副的期待队列;
  • Condition 有相似于 await 的机制,因而不会产生加锁形式而产生的死锁呈现,同时底层实现的是 park/unpark 的机制,因而也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,然而 wait/notify 会产生先唤醒再挂起的死锁。

最初

对 AQS 的源码剖析到这里就全副完结了,尽管还有很多知识点没解说,比方偏心锁 / 非偏心锁下 AQS 是怎么作用的,篇幅所限,局部知识点没有扩大还请见谅,尽管如此,如果您能看完文章的话,置信对 AQS 也算是有足够的理解了。

回顾本篇文章,咱们不难发现,无论是独占还是共享模式,或者联合是 Condition 工具应用,AQS 实质上的同步性能都是通过对锁和队列中结点的操作来实现的,从设计上讲,AQS 的组成构造并不算简单,底层的运行机制也不会很绕,所以,大家如果看源码的时候感觉有些艰难的话也不必灰心,多看几遍,顺便画个图之类的,理清下流程还是没什么问题的。

当然,本人看得懂是一回事,写进去让他人看懂又是另一回事了,就像这篇文章,我花了好长的工夫来筹备,又是画图又是理流程的,期间还参考了不少网上大神的博文,肝了几天才算是成文了。尽管我晓得本文不算什么高质文,但我也算是费尽心力了,写技术文真是挺累的,大家看的感觉不错的话还请帮忙转发下或点个赞吧!这也是对我最好的激励了


作者:鄙人薛某,一个不拘于技术的互联网人,技术三流,吹水一流,想看更多精彩文章能够关注我的公众号哦~~~

正文完
 0