关于java:Java并发编程AQS浅析

1次阅读

共计 8075 个字符,预计需要花费 21 分钟才能阅读完成。

AQS

AQS 全称 AbstractQueuedSynchronizer,队列同步器,该组件是 JUC 包下的实现锁和其余同步组件的根底框架。咱们先从 JavaDoc 看看是如何介绍的。因原文过长,这里间接了解摘取第一段,该段能大抵理解该类的作用。若须要理解具体细节仍须要查看其余正文段落。翻译如下:

提供一个依赖先进先出(FIFO)期待队列实现阻塞锁和相干同步器的根底框架。该类被设计为有用的根底是因为大多数同步器都依赖于一个原子的 int 值来作为他们的状态。子类必须定义更改此状态的 protected 办法和状态获取和开释对该子类意味着什么。基于以上办法,该类中的其余办法则提供排队和阻塞的机制。子类还能够保护其余状态字段,然而须要应用 getState(), setState(int)compareAndSetState(int, int)来原子同步的更新 int 值。

简略总结一下:

  • AQS 类

    • 提供 FIFO 队列及其排队和阻塞机制的办法
    • 提供一个 int 类型的值 state 来作为子类同步器的状态
  • 子类

    • 提供更改此状态的 protected 办法
    • 当理论须要获取或更改状态时须要应用对应办法

    state 字段

    /**
     * The synchronization state.
     */
    private volatile int state;
    
    // 返回同步状态的以后值
    protected final int getState() {return state;}
    // 设置以后同步状态
    protected final void setState(int newState) {state = newState;}
    // 应用 CAS 设置以后状态,该办法可能保障状态设置的原子性
    protected final boolean compareAndSetState(int expect, int update) {
      // See below for intrinsics setup to support this
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    该字段就是作为子类同步器的状态,volidate 润饰该字段是为了保障多线程之间的可见性,具体在子类中意味着什么由实现子类决定。所以了解每个同步器类的state 字段都至关重要。比方在ReentrantLock 中该字段就示意以后线程持有了几次锁。

    FIFO 期待队列

    从介绍中看出 AQS 提供了 FIFO 期待队列,咱们先看看 AQS 是如何提供 FIFO 期待队列及其对应的办法。队列中存储着什么呢?AQS 类中首先定义了一个 Node 类作为队列的节点:节点存在 2 种模式,SHAREDEXCLUSIVE;存在 5 种状态用来控制线程的唤醒和阻塞:CANCELLEDSIGNALCONDITIONPROPAGATE ,INITAL;并且该节点为双向的,存在前驱节点和后续节点。( 其实每一个节点就是一个期待获取锁的线程)。

    public abstract class AbstractQueuedSynchronizer
      extends AbstractOwnableSynchronizer
      implements java.io.Serializable {
      static final class Node {
          /** 共享模式 */
          static final Node SHARED = new Node();
          /** 排他模式 */
          static final Node EXCLUSIVE = null;
          // 节点期待状态
          // CANCELLED(1) : 因为超时或者中断,节点会被设置为勾销状态,被勾销的节点时不会参加到竞争中的,他会始终放弃勾销状态不会转变为其余状态
          // SIGNAL (-1): 后继节点的线程处于期待状态,而以后节点的线程如果开释了同步状态或者被勾销,将会告诉后继节点,使后继节点的线程得以运行
          // CONDITION (-2): 节点在期待队列中,节点线程期待在 Condition 上,当其余线程对 Condition 调用了 signal()后,该节点将会从期待队列中转移到同步队列中,退出到同步状态的获取中
          // PROPAGATE (-3): 示意下一次共享式同步状态获取将会无条件地流传上来
          volatile int waitStatus;
          // 前驱节点
          volatile Node prev;
          // 后续节点
          volatile Node next;
          // 获取同步的线程
          volatile Thread thread;
          // 期待队列中的后续节点
          Node nextWaiter;
          // 返回是否为共享
          final boolean isShared() {return nextWaiter == SHARED;}
          // 返回前继节点
          final Node predecessor() {
              Node p = prev;
              if (p == null)
                  throw new NullPointerException();
              else
                  return p;
          }
    
          Node() {    // Used to establish initial head or SHARED marker}
    
          Node(Thread thread, Node mode) {    // Used by addWaiter
              this.nextWaiter = mode;
              this.thread = thread;
          }
    
          Node(Thread thread, int waitStatus) {    // Used by Condition
              this.waitStatus = waitStatus;
              this.thread = thread;
          }
      }
      
      // 头节点
      private transient volatile Node head;
      // 尾节点
      private transient volatile Node tail;
    
    }    

    退出队列

    因为是 FIFO 队列,整个步骤是依据以后线程和传入的节点模式创立新节点并退出队列尾部。这里尾节点应用 cas 来操作就是保障尾节点是线程平安的。

    private Node addWaiter(Node mode) {
      // 创立节点
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      // 获取尾部节点
      Node pred = tail;
      if (pred != null) {    // 若队列中已有节点
          // 则设置新节点的 prev 指向以后尾节点
          node.prev = pred;
          if (compareAndSetTail(pred, node)) { // 若 cas 操作将新节点设置为尾节点
              // 则将原来的尾节点的 next 指向新节点
              pred.next = node;
              return node;
          }
      }
      // 循环尝试,直至胜利
      enq(node);
      return node;
    }

    enq办法通过一个死循环尝试,直到把新节点入队列。

    private Node enq(final Node node) {for (;;) {
          // 获取尾节点
          Node t = tail;
          if (t == null) { // Must initialize 若为空队列
              if (compareAndSetHead(new Node())) //cas 设置新节点为头节点
                  tail = head;    // 尾节点也指向头结点
          } else {    // 非空队列(操作同上)node.prev = t;    
              if (compareAndSetTail(t, node)) {
                  t.next = node;
                  return t;
              }
          }
      }
    }

    该图简略的展现了节点入队的过程。图中数字示意执行程序。红色示意原先队列中的节点,而蓝色示意新增节点。

    出队列

    因为是 FIFO 队列,首节点在开释同步状态时将会告诉后继节点,当后继节点失去同步状态时,会把本人设置为 首节点。这个过程中不须要是不须要 cas 操作的,因为只有一个线程,能够取得同步状态。

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
      head = node;
      node.thread = null;
      node.prev = null;
    }

    过程图如图所示。

    其余办法

    AQS 还定义了其余的办法来作为排队和阻塞的机制。AQS 中定义了大量的模板办法来实现同步机制,由子类继承实现。主体上能够分为三大类:独占式获取、开释锁;共享式获取、开释锁;查看同步队列线程状况。

  1. 独占式:同一时刻只有一个线程能够取得锁

    • acquire(int arg):独占式获取同步状态,如果以后线程获取同步状态胜利,则由该办法返回,否则,将会进入同步队列期待,该办法将会调用可重写的 tryAcquire(int arg)办法;
    • acquireInterruptibly(int arg):与 acquire(int arg)雷同,然而该办法响应中断,以后线程为获取到同步状态而进入到同步队列中,如果以后线程被中断,则该办法会抛出 InterruptedException 异样并返回;
    • tryAcquire(int arg):独占式获取同步状态,获取同步状态胜利后,其余线程须要期待该线程开释同步状态能力获取同步状态;
    • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果以后线程在 nanos 工夫内没有获取到同步状态,那么将会返回 false,曾经获取则返回 true;
    • release(int arg):独占式开释同步状态,该办法会在开释同步状态之后,将同步队列中第一个节点蕴含的线程唤醒;
    • tryRelease(int arg):独占式开释同步状态;
  2. 共享式:同一时刻能够有多个线程取得锁,比方读锁多个线程取得,写锁一个线程取得

    • acquireShared(int arg):共享式获取同步状态,如果以后线程未获取到同步状态,将会进入同步队列期待,与独占式的次要区别是在同一时刻能够有多个线程获取到同步状态;
    • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
    • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于 0 则示意获取胜利,否则获取失败;
    • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,减少超时限度;
    • releaseShared(int arg):共享式开释同步状态;
    • tryReleaseShared(int arg):共享式开释同步状态;
  3. 查问同步队列线程状况

    • isHeldExclusively():以后同步器是否在独占式模式下被线程占用,个别该办法示意是否被以后线程所独占;

咱们接下来就筛选最根底的获取和开释锁来看看是怎么解决的。

独占式

获取锁

public final void acquire(int arg) {if (!tryAcquire(arg) && 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        selfInterrupt();}

// 子类实现
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

final boolean acquireQueued(final Node node, int arg) {
    // 是否失败
    boolean failed = true;
    try {
        // 是否中断
        boolean interrupted = false;
        // 死循环自旋
        for (;;) {
            // 获取前节点
            final Node p = node.predecessor();
            // 若前节点为头节点且获取锁胜利
            if (p == head && tryAcquire(arg)) {
                // 设置头节点
                setHead(node);
                p.next = null; // help GC
                fail = false;
                return interrupted;
            }
            // 获取失败,判断是否须要阻塞 后文介绍(线程阻塞)if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

tryAcquire 是交由子类同步器实现的办法,具体就是尝试获取锁,如果胜利则间接返回;否则执行addWaiter(Node.EXCLUSIVE) 该线程退出到 FIFO 队列尾部;而后执行 acquireQueued 线程会进入一个死循环自旋的过程去获取锁直到退出,并返回是否被中断。

开释锁

public final boolean release(int arg) {if (tryRelease(arg)) {    // 尝试开释锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒后继节点 该办法前面会介绍
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 子类实现
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}

tryReleasetryAcquire 一样是交由子类同步器实现的办法,具体则是尝试开释锁;如果胜利则调用 unparkSuccessor 唤醒后继节点。

共享式

获取锁

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
// 由子类实现
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}


private void doAcquireShared(int arg) {
    // 退出队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 死循环自旋
        for (;;) {
            // 前继节点
            final Node p = node.predecessor();
            if (p == head) {    // 头结点
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

tryAcquireShared 则不多作介绍都是由子类实现,这里要留神的是共享式返回值是 int,只有该值大于等于 0 则示意获取到了锁,否则通过 doAcquireShared 退出队列自旋获取锁。(整个过程相似于独占式获取锁)

开释锁

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
        return true;
    }
    return false;
}
// 子类实现
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}

private void doReleaseShared() {
    // 死循环 自旋
    for (;;) {
        // 获取头节点
        Node h = head;    
        if (h != null && h != tail) { // 不是只有一个节点
            int ws = h.waitStatus;    // 获取节点状态
            if (ws == Node.SIGNAL) {    // 唤醒后继节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))     //cas 开释
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&        // 初始化状态
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //cas 开释
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

可能存在多个线程同时进行开释同步资源(读锁),所以通过 CAS 操作须要确保同步状态平安胜利开释。

线程阻塞

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       // 前驱节点期待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)    // 前驱节点处于处于期待状态 间接返回
        return true;
    if (ws > 0) {    // 前驱节点被中断或勾销,须要从队列中移除 直到所有中断或勾销的前驱节点全副被移除
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {    // ws < 0 即 -2 - 3 的状况 CONDITION PROPAGATE
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

该段代码处于循环获取锁的过程中(acquireQueued),在自旋的过程中须要判断以后线程是否须要阻塞。该办法依附前驱节点来判断是否须要阻塞。如果返回 true 则调用下列办法。

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    return Thread.interrupted();}

该办法调用次要应用 LockSupport 把线程挂起,阻塞住线程的调用栈,同时返回以后线程的中断状态。

线程唤醒

private void unparkSuccessor(Node node) {
    // 以后节点状态
    int ws = node.waitStatus;
    if (ws < 0) // 节点状态为 0 设置为 0
        compareAndSetWaitStatus(node, ws, 0);
    // 后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 后继节点为空或被中断勾销
        s = null;
        // 从尾节点向前找到可用节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;    
    }
    // 唤醒后继节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

当线程被阻塞且前一个线程开释锁后,调用 unparkSuccessor 办法来唤醒后继节点。从尾节点开始寻找一个最近可用的节点,为什么从最初一个找是因为正着找可能后继节点依然会是为 null 或勾销。此处尾节点开始找到之后并没有退出循环,而是持续往前找,直到找到间隔开释锁最近的一个节点。

参考链接

【死磕 Java 并发】—–J.U.C 之 AQS:AQS 简介
【死磕 Java 并发】—–J.U.C 之 AQS:CLH 同步队列
【死磕 Java 并发】—–J.U.C 之 AQS:同步状态的获取与开释
【死磕 Java 并发】—–J.U.C 之 AQS:阻塞和唤醒线程

正文完
 0