AQS

AQS全称 AbstractQueuedSynchronizer,队列同步器,该组件是JUC包下的实现锁和其余同步组件的根底框架。咱们先从JavaDoc看看是如何介绍的。因原文过长,这里间接了解摘取第一段,该段能大抵理解该类的作用。若须要理解具体细节仍须要查看其余正文段落。翻译如下:

提供一个依赖先进先出(FIFO)期待队列实现阻塞锁和相干同步器的根底框架。该类被设计为有用的根底是因为大多数同步器都依赖于一个原子的int值来作为他们的状态。子类必须定义更改此状态的protected办法和状态获取和开释对该子类意味着什么。基于以上办法,该类中的其余办法则提供排队和阻塞的机制。子类还能够保护其余状态字段,然而须要应用 getState(), setState(int)compareAndSetState(int, int)来原子同步的更新int值。

简略总结一下:

  • AQS类

    • 提供FIFO队列及其排队和阻塞机制的办法
    • 提供一个int类型的值state来作为子类同步器的状态
  • 子类

    • 提供更改此状态的protected办法
    • 当理论须要获取或更改状态时须要应用对应办法

    state字段

    /** * The synchronization state. */private volatile int state;//返回同步状态的以后值protected final int getState() {  return state;}//设置以后同步状态protected final void setState(int newState) {  state = newState;}//应用CAS设置以后状态,该办法可能保障状态设置的原子性protected final boolean compareAndSetState(int expect, int update) {  // See below for intrinsics setup to support this  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

    该字段就是作为子类同步器的状态,volidate润饰该字段是为了保障多线程之间的可见性,具体在子类中意味着什么由实现子类决定。所以了解每个同步器类的state 字段都至关重要。比方在ReentrantLock 中该字段就示意以后线程持有了几次锁。

    FIFO期待队列

    从介绍中看出AQS提供了FIFO期待队列,咱们先看看AQS是如何提供FIFO期待队列及其对应的办法。队列中存储着什么呢?AQS类中首先定义了一个Node类作为队列的节点:节点存在2种模式,SHAREDEXCLUSIVE;存在5种状态用来控制线程的唤醒和阻塞:CANCELLEDSIGNALCONDITIONPROPAGATE ,INITAL;并且该节点为双向的,存在前驱节点和后续节点。(其实每一个节点就是一个期待获取锁的线程)。

    public abstract class AbstractQueuedSynchronizer  extends AbstractOwnableSynchronizer  implements java.io.Serializable {  static final class Node {      /** 共享模式 */      static final Node SHARED = new Node();      /** 排他模式 */      static final Node EXCLUSIVE = null;      //节点期待状态      // CANCELLED(1) :因为超时或者中断,节点会被设置为勾销状态,被勾销的节点时不会参加到竞争中的,他会始终放弃勾销状态不会转变为其余状态      // SIGNAL (-1): 后继节点的线程处于期待状态,而以后节点的线程如果开释了同步状态或者被勾销,将会告诉后继节点,使后继节点的线程得以运行      // CONDITION (-2): 节点在期待队列中,节点线程期待在Condition上,当其余线程对Condition调用了signal()后,该节点将会从期待队列中转移到同步队列中,退出到同步状态的获取中      // PROPAGATE (-3): 示意下一次共享式同步状态获取将会无条件地流传上来      volatile int waitStatus;      //前驱节点      volatile Node prev;      //后续节点      volatile Node next;      //获取同步的线程      volatile Thread thread;      //期待队列中的后续节点      Node nextWaiter;      //返回是否为共享      final boolean isShared() {          return nextWaiter == SHARED;      }      //返回前继节点      final Node predecessor() {          Node p = prev;          if (p == null)              throw new NullPointerException();          else              return p;      }      Node() {    // Used to establish initial head or SHARED marker      }      Node(Thread thread, Node mode) {    // Used by addWaiter          this.nextWaiter = mode;          this.thread = thread;      }      Node(Thread thread, int waitStatus) {    // Used by Condition          this.waitStatus = waitStatus;          this.thread = thread;      }  }    //头节点  private transient volatile Node head;  //尾节点  private transient volatile Node tail;}    

    退出队列

    因为是FIFO队列,整个步骤是依据以后线程和传入的节点模式创立新节点并退出队列尾部。这里尾节点应用cas来操作就是保障尾节点是线程平安的。

    private Node addWaiter(Node mode) {  //创立节点  Node node = new Node(Thread.currentThread(), mode);  // Try the fast path of enq; backup to full enq on failure  //获取尾部节点  Node pred = tail;  if (pred != null) {    //若队列中已有节点      //则设置新节点的prev指向以后尾节点      node.prev = pred;      if (compareAndSetTail(pred, node)) { //若cas操作将新节点设置为尾节点          //则将原来的尾节点的next指向新节点          pred.next = node;          return node;      }  }  //循环尝试,直至胜利  enq(node);  return node;}

    enq办法通过一个死循环尝试,直到把新节点入队列。

    private Node enq(final Node node) {  for (;;) {      //获取尾节点      Node t = tail;      if (t == null) { // Must initialize 若为空队列          if (compareAndSetHead(new Node())) //cas设置新节点为头节点              tail = head;    //尾节点也指向头结点      } else {    //非空队列 (操作同上)          node.prev = t;              if (compareAndSetTail(t, node)) {              t.next = node;              return t;          }      }  }}


    该图简略的展现了节点入队的过程。图中数字示意执行程序。红色示意原先队列中的节点,而蓝色示意新增节点。

    出队列

    因为是FIFO队列,首节点在开释同步状态时将会告诉后继节点,当后继节点失去同步状态时,会把本人设置为 首节点。这个过程中不须要是不须要cas操作的,因为只有一个线程,能够取得同步状态。

    /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods.  Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */private void setHead(Node node) {  head = node;  node.thread = null;  node.prev = null;}

    过程图如图所示。

    其余办法

    AQS还定义了其余的办法来作为排队和阻塞的机制。AQS中定义了大量的模板办法来实现同步机制,由子类继承实现。主体上能够分为三大类:独占式获取、开释锁;共享式获取、开释锁;查看同步队列线程状况。

  1. 独占式:同一时刻只有一个线程能够取得锁

    • acquire(int arg):独占式获取同步状态,如果以后线程获取同步状态胜利,则由该办法返回,否则,将会进入同步队列期待,该办法将会调用可重写的tryAcquire(int arg)办法;
    • acquireInterruptibly(int arg):与acquire(int arg)雷同,然而该办法响应中断,以后线程为获取到同步状态而进入到同步队列中,如果以后线程被中断,则该办法会抛出InterruptedException异样并返回;
    • tryAcquire(int arg):独占式获取同步状态,获取同步状态胜利后,其余线程须要期待该线程开释同步状态能力获取同步状态;
    • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果以后线程在nanos工夫内没有获取到同步状态,那么将会返回false,曾经获取则返回true;
    • release(int arg):独占式开释同步状态,该办法会在开释同步状态之后,将同步队列中第一个节点蕴含的线程唤醒;
    • tryRelease(int arg):独占式开释同步状态;
  2. 共享式:同一时刻能够有多个线程取得锁,比方读锁多个线程取得,写锁一个线程取得

    • acquireShared(int arg):共享式获取同步状态,如果以后线程未获取到同步状态,将会进入同步队列期待,与独占式的次要区别是在同一时刻能够有多个线程获取到同步状态;
    • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
    • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则示意获取胜利,否则获取失败;
    • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,减少超时限度;
    • releaseShared(int arg):共享式开释同步状态;
    • tryReleaseShared(int arg):共享式开释同步状态;
  3. 查问同步队列线程状况

    • isHeldExclusively():以后同步器是否在独占式模式下被线程占用,个别该办法示意是否被以后线程所独占;
    • ...

咱们接下来就筛选最根底的获取和开释锁来看看是怎么解决的。

独占式

获取锁

public final void acquire(int arg) {        if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt();}//子类实现protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}final boolean acquireQueued(final Node node, int arg) {    //是否失败    boolean failed = true;    try {        //是否中断        boolean interrupted = false;        //死循环自旋        for (;;) {            //获取前节点            final Node p = node.predecessor();            //若前节点为头节点且获取锁胜利            if (p == head && tryAcquire(arg)) {                //设置头节点                setHead(node);                p.next = null; // help GC                fail = false;                return interrupted;            }            //获取失败,判断是否须要阻塞 后文介绍(线程阻塞)            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

tryAcquire 是交由子类同步器实现的办法,具体就是尝试获取锁,如果胜利则间接返回;否则执行addWaiter(Node.EXCLUSIVE) 该线程退出到FIFO队列尾部;而后执行 acquireQueued 线程会进入一个死循环自旋的过程去获取锁直到退出,并返回是否被中断。

开释锁

public final boolean release(int arg) {    if (tryRelease(arg)) {    //尝试开释锁        Node h = head;        if (h != null && h.waitStatus != 0)            //唤醒后继节点 该办法前面会介绍            unparkSuccessor(h);        return true;    }    return false;}//子类实现protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();}

tryReleasetryAcquire 一样是交由子类同步器实现的办法,具体则是尝试开释锁;如果胜利则调用unparkSuccessor唤醒后继节点。

共享式

获取锁

public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}//由子类实现protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();}private void doAcquireShared(int arg) {    //退出队列尾部    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        //死循环自旋        for (;;) {            //前继节点            final Node p = node.predecessor();            if (p == head) {    //头结点                //尝试获取锁                int r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

tryAcquireShared 则不多作介绍都是由子类实现,这里要留神的是共享式返回值是 int ,只有该值大于等于0则示意获取到了锁,否则通过doAcquireShared退出队列自旋获取锁。   (整个过程相似于独占式获取锁)

开释锁

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}//子类实现protected boolean tryReleaseShared(int arg) {    throw new UnsupportedOperationException();}private void doReleaseShared() {    //死循环 自旋    for (;;) {        //获取头节点        Node h = head;            if (h != null && h != tail) { //不是只有一个节点            int ws = h.waitStatus;    //获取节点状态            if (ws == Node.SIGNAL) {    //唤醒后继节点                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))     //cas开释                    continue;            // loop to recheck cases                unparkSuccessor(h);            }            else if (ws == 0 &&        //初始化状态                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //cas开释                continue;                // loop on failed CAS        }        if (h == head)                   // loop if head changed            break;    }}

可能存在多个线程同时进行开释同步资源(读锁),所以通过CAS操作须要确保同步状态平安胜利开释。

线程阻塞

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {       //前驱节点期待状态    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)    //前驱节点处于处于期待状态 间接返回        return true;    if (ws > 0) {    // 前驱节点被中断或勾销,须要从队列中移除 直到所有中断或勾销的前驱节点全副被移除        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {    // ws < 0 即 -2 -3的状况 CONDITION PROPAGATE        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

该段代码处于循环获取锁的过程中(acquireQueued),在自旋的过程中须要判断以后线程是否须要阻塞。该办法依附前驱节点来判断是否须要阻塞。如果返回true则调用下列办法。

private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}

该办法调用次要应用LockSupport把线程挂起,阻塞住线程的调用栈,同时返回以后线程的中断状态。

线程唤醒

private void unparkSuccessor(Node node) {    //以后节点状态    int ws = node.waitStatus;    if (ws < 0) //节点状态为0 设置为0        compareAndSetWaitStatus(node, ws, 0);    //后继节点    Node s = node.next;    if (s == null || s.waitStatus > 0) { //后继节点为空或被中断勾销        s = null;        //从尾节点向前找到可用节点        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;        }    //唤醒后继节点    if (s != null)        LockSupport.unpark(s.thread);}

当线程被阻塞且前一个线程开释锁后,调用unparkSuccessor办法来唤醒后继节点。从尾节点开始寻找一个最近可用的节点,为什么从最初一个找是因为正着找可能后继节点依然会是为null或勾销。此处尾节点开始找到之后并没有退出循环,而是持续往前找,直到找到间隔开释锁最近的一个节点。

参考链接

【死磕Java并发】—–J.U.C之AQS:AQS简介
【死磕Java并发】—–J.U.C之AQS:CLH同步队列
【死磕Java并发】—–J.U.C之AQS:同步状态的获取与开释
【死磕Java并发】—–J.U.C之AQS:阻塞和唤醒线程