关于java:AQSJava-中悲观锁的底层实现机制

25次阅读

共计 13023 个字符,预计需要花费 33 分钟才能阅读完成。

介绍 AQS

AQS(AbstractQueuedSynchronizer)是 Java 并发包中,实现各种同步组件的根底。比方

  • 各种锁:ReentrantLock、ReadWriteLock、StampedLock
  • 各种线程同步工具类:CountDownLatch、CyclicBarrier、Semaphore
  • 线程池中的 Worker

Lock 接口的实现根本都是通过聚合了一个 AQS 的子类来实现线程访问控制的。


Doug Lea 已经介绍过 AQS 的设计初衷。从原理上,一种同步组件往往是能够利用其余的组件实现的,例如能够应用 Semaphore 实现互斥锁。然而,对某种同步组件的偏向,会导致简单、艰涩的实现逻辑,所以,他抉择了将根底的同步相干操作形象在 AbstractQueuedSynchronizer 中,利用 AQS 为咱们构建同步组件提供了范本。

如何应用 AQS

利用 AQS 实现一个同步组件,咱们至多要实现两类根本的办法,别离是:

  • 获取资源,须要实现 tryAcquire(int arg) 办法
  • 开释资源,须要实现 tryRelease(int arg) 办法

如果须要共享式获取 / 开释资源,须要实现对应的 tryAcquireShared(int arg)、tryReleaseShared(int arg)


AQS 应用的是模板办法设计模式。AQS 办法的修饰符很有法则,其中,应用 protected 润饰的办法为形象办法,通常须要子类去实现,从而实现不同的同步组件;应用 public 润饰的办法根本能够认为是模板办法,不倡议子类间接笼罩。

通过调用 AQS 的 acquire(int arg) 办法能够获取资源,该办法会调用 protected 润饰的 tryAcquire(int arg) 办法,因而咱们须要在 AQS 的子类中实现 tryAcquire(int arg),tryAcquire(int arg) 办法的作用是:获取资源。

以后线程获取资源并执行了相应逻辑之后,就须要开释资源,使得后续节点可能持续获取资源。通过调用 AQS 的 release(int arg) 办法能够开释资源,该办法会调用 protected 润饰的 tryRelease(int arg) 办法,因而咱们须要在 AQS 的子类中实现 tryRelease(int arg),tryRelease(int arg) 办法的作用是:开释资源。

AQS 的实现原理

从实现角度剖析 AQS 是如何实现线程访问控制。

AQS 的实现原理能够从 同步阻塞队列、获取资源时的执行流程、开释资源时的执行流程 这 3 个方面介绍。

同步阻塞队列

AQS 依赖外部的同步阻塞队列(一个 FIFO 双向队列)来实现资源的治理。

同步阻塞队列的工作机制

  • 节点:同步阻塞队列中的节点(Node)用来保留获取资源失败的线程援用、期待状态以及前驱和后继节点,没有胜利获取资源的线程将会成为节点退出同步阻塞队列的尾部,同时会阻塞以后线程(Java 线程处于 WAITING 状态,开释 CPU 的使用权)。
  • 首节点:同步阻塞队列遵循 FIFO(先进先出),首节点是获取资源胜利的节点,首节点的线程在开释资源时,将会唤醒后继节点,使其再次尝试获取资源,而后继节点将会在获取资源胜利时将本人设置为首节点。
static final class Node {
    /**
     * Marker to indicate a node is waiting in shared mode
     */
    static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
    /**
     * Marker to indicate a node is waiting in exclusive mode
     */
    static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;

    /**
     * waitStatus value to indicate thread has cancelled
     */
    static final int CANCELLED = 1;
    /**
     * waitStatus value to indicate successor's thread needs unparking
     */
    static final int SIGNAL = -1;
    /**
     * waitStatus value to indicate thread is waiting on condition
     */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    // 期待状态
    volatile int waitStatus;

    // 前驱节点
    volatile AbstractQueuedSynchronizer.Node prev;

    // 后继节点
    volatile AbstractQueuedSynchronizer.Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    // 条件期待队列的后继节点
    AbstractQueuedSynchronizer.Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {return nextWaiter == SHARED;}

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
        AbstractQueuedSynchronizer.Node p = prev;
        if (p == null) throw new NullPointerException();
        else return p;
    }

    Node() {    // Used to establish initial head or SHARED marker}

    Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

期待状态
在节点中用 volatile int waitStatus 属性示意节点的期待状态。
节点有如下几种期待状态:

  • CANCELLED,值为 1,因为在同步阻塞队列中期待的线程期待超时或者被中断,须要从同步阻塞队列中勾销期待,节点进人该状态将不会变动
  • SIGNAL,值为 -1,后继节点的线程处于期待状态,而以后节点的线程如果开释了同步状态或者被勾销,将会告诉后继节点,使后继节点的线程得以运行
  • CONDITION,值为 -2,节点在条件期待队列中,节点线程期待在 Condition 上,当其余线程对 Condition 调用了 signal() 办法后,该节点将会从条件期待队列转移到同步阻塞队列中,退出到对同步状态的获取中
  • PROPAGATE,值为 -3,示意下一次共享式同步状态获取将会无条件地被流传上来
  • INITIAL,值为 0,初始状态

获取资源、开释资源的执行流程,论断后行:

  • 在获取资源时,获取资源失败的线程都会被退出到同步阻塞队列中,并在队列中进行自旋;移出队列(或进行自旋)的条件是前驱节点为头节点且胜利获取了资源。
  • 在开释资源时,AQS 调用 tryRelease(int arg) 办法开释资源,而后唤醒头节点的后继节点。

获取资源

上面来介绍获取资源时的执行流程。

调用 AQS 的 acquire(int arg) 办法能够获取资源。

acquire(int arg) 办法是独占式获取资源,它调用流程如下图所示。

用文字描述 acquire(int arg) 办法的调用流程:首先调用自定义 AQS 实现的 tryAcquire(int arg) 办法,该办法的作用是尝试获取资源:

  • 如果获取资源胜利,则间接从 acquire(int arg) 办法返回
  • 如果获取资源失败,则结构节点,并将该节点退出到同步阻塞队列的尾部,最初调用 acquireQueued(Node node,int arg) 办法,使得该节点以“死循环”的形式尝试获取资源。只有以后节点的前驱节点是头节点,能力尝试获取资源。
    • 如果以后节点的前驱节点是头节点,并且获取资源胜利,则设置以后节点为头节点,并从 acquireQueued(Node node,int arg) 办法返回
    • 如果以后节点的前驱节点不是头节点 或者 获取资源失败,则阻塞以后线程,线程被唤醒后继续执行该循环操作

acquireQueued(Node node,int arg) 办法的调用过程也被称为“自旋过程”。

自旋是什么意思是呢?我的了解就是:自旋就是一个死循环,循环执行某个操作序列,直到满足某个条件才退出循环。


/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

acquire(int arg) 的次要逻辑是:

首先调用自定义 AQS 实现的 tryAcquire(int arg) 办法,该办法保障线程平安的获取资源:

  • 如果获取资源胜利,则间接从 acquire(int arg) 办法返回
  • 如果获取资源失败,则结构同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程胜利获取资源)并通过 addWaiter(Node node) 办法将该节点退出到同步阻塞队列的尾部,最初调用 acquireQueued(Node node,int arg) 办法,使得该节点以“死循环”的形式获取资源。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒次要依附 前驱节点的出队 或 阻塞线程被中断 来实现。
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

在 acquireQueued(final Node node,int arg) 办法中,以后线程在“死循环”中尝试获取资源,而只有前驱节点是头节点才可能尝试获取资源,这是为什么?起因有两个,如下。

  • 第一,头节点是胜利获取到资源的节点,而头节点的线程开释了资源之后,将会唤醒其后继节点,后继节点的线程被唤醒后须要查看本人的前驱节点是否是头节点。
  • 第二,保护同步阻塞队列的 FIFO 准则。

开释资源

以后线程获取资源并执行了相应逻辑之后,就须要开释资源,使得后续节点可能持续获取资源。

上面来介绍开释资源时的执行流程。

通过调用 AQS 的 release(int arg) 办法能够开释资源,该办法在开释资源之后,会唤醒头节点的后继节点,进而使后继节点从新尝试获取资源。


/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release(int arg) 办法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node) 办法应用 LockSupport#unpark() 办法来唤醒处于期待状态的线程。

共享式 获取 & 开释 资源

下面讲的是独占式获取 / 开释 资源。

共享式获取与独占式获取最次要的区别在于:同一时刻是否有多个线程同时获取到资源。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作可能同时进行。写操作要求对资源的独占式拜访,而读操作能够是共享式拜访。

  • 共享式拜访资源时,其余共享式的拜访均被容许,独占式拜访被阻塞
  • 独占式拜访资源时,同一时刻其余拜访均被阻塞

共享式获取资源

调用 AQS 的 acquireShared(int arg) 办法能够共享式地获取资源。

在 acquireShared(int arg) 办法中,AQS 调用 tryAcquireShared(int arg) 办法尝试获取资源,tryAcquireShared(int arg) 办法返回值为 int 类型,当返回值 >= 0 时,示意可能获取到资源。

能够看到,在 doAcquireShared(int arg) 办法的自旋过程中,如果以后节点的前驱为头节点时,能力尝试获取资源,如果获取资源胜利(返回值 >= 0),则设置以后节点为头节点,并从自旋过程中退出。

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {final Node p = node.predecessor();
            if (p == head) {int r = tryAcquireShared(arg);
                if (r >= 0) {setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

共享式开释资源
调用 releaseShared(int arg) 办法能够开释资源。该办法在开释资源之后,会唤醒头节点的后继节点,进而使后继节点从新尝试获取资源。

对于可能反对多个线程同时拜访的并发组件(比方 Semaphore),它和独占式次要区别在于 tryReleaseShared(int arg) 办法必须确保资源平安开释,因为开释资源的操作会同时来自多个线程。确保资源平安开释个别是通过循环和 CAS 来保障的。

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
        return true;
    }
    return false;
}

独占式超时获取资源

调用 AQS 的 doAcquireNanos(int arg,long nanosTimeout) 办法能够超时获取资源,即在指定的时间段内获取资源,如果获取资源胜利则返回 true,否则返回 false。

该办法提供了传统 Java 同步操作(比方 synchronized 关键字)所不具备的个性。


在剖析该办法的实现前,先介绍一下响应中断的获取资源过程。

  • 在 Java 5 之前,当一个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作,此时该线程的中断标记位会被批改,但线程依旧会阻塞在 synchronized 上,期待着获取锁。
  • 在 Java 5 中,AQS 提供了 acquireInterruptibly(int arg) 办法,这个办法在期待获取资源时,如果以后线程被中断,会立即返回,并抛出 InterruptedException。

acquire(int arg) 办法对中断不敏感,也就是因为线程获取资源失败后进入同步阻塞队列中,后续对线程进行中断操作时,线程不会从同步阻塞队列中移出。

超时获取资源过程能够被视作响应中断获取资源过程的“增强版”,doAcquireNanos(int arg,long nanosTimeout) 办法在反对响应中断的根底上,减少了超时获取的个性。

针对超时获取,次要须要计算出须要睡眠的工夫距离 nanosTimeout,为了避免过早告诉,nanosTimeout 计算公式为:nanosTimeout -= now – lastTime,其中 now 为以后唤醒工夫,lastTime 为上次唤醒工夫,如果 nanosTimeout 大于 0 则示意超时工夫未到,须要持续睡眠 nanosTimeout 纳秒,反之,示意曾经超时。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

/**
 * Acquires in exclusive timed mode.
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

该办法在自旋过程中,当节点的前驱节点为头节点时尝试获取资源,如果胜利获取资源则从该办法返回,这个过程和独占式同步获取的过程相似,然而在获取资源失败的解决上有所不同。

如果以后线程获取资源失败,则判断是否超时(nanosTimeout 小于等于 0 示意曾经超时),如果没有超时,则从新计算超时距离 nanosTimeout,而后使以后线程期待 nanosTimeout 纳秒(当已到设置的超时工夫,该线程会从 LockSupport.parkNanos(Object blocker,long nanos)办法返回)。

如果 nanosTimeout 小于等于 spinForTimeoutThreshold(1000 纳秒)时,将不会使该线程进行超时期待,而是进入疾速的自旋过程。起因在于,十分短的超时期待无奈做到非常准确,如果这时再进行超时期待,相同会让 nanosTimeout 的超时从整体上体现得反而不准确。因而,在超时十分短的场景下,AQS 会进入无条件的疾速自旋。

独占式超时获取资源的流程如下所示。

从图中能够看出,独占式超时获取资源 doAcquireNanos(int arg,long nanosTimeout) 和独占式获取资源 acquire(int args)在流程上十分类似,其次要区别在于:未获取到资源时的解决逻辑。

acquire(int args) 在未获取到资源时,将会使以后线程始终处于期待状态,而 doAcquireNanos(int arg,long nanosTimeout) 会使以后线程期待 nanosTimeout 纳秒,如果以后线程在 nanosTimeout 纳秒内没有获取到资源,将会从期待逻辑中主动返回。

Condition 的实现原理

技术是为了解决问题而生的,通过 Condition 咱们能够实现期待 / 告诉性能。

ConditionObject 是 AQS 的外部类。每个 Condition 对象都蕴含着一个条件期待队列,这个条件期待队列是 Condition 对象实现期待 / 告诉性能的要害。

上面咱们剖析 Condition 的实现原理,次要包含:条件期待队列、期待 和 告诉。

上面提到的 Condition 如果不加阐明均指的是 ConditionObject。

条件期待队列

Condition 依赖外部的条件期待队列(一个 FIFO 双向队列)来实现期待 / 告诉性能。

条件期待队列的工作机制

  • 节点:条件期待队列中的每个节点(Node)都蕴含一个线程援用,该线程就是在 Condition 对象上期待的线程,如果一个线程调用了 Condition.await()办法,那么该线程将会开释资源、结构成为节点退出条件期待队列的尾部,同时线程状态变为期待状态。

事实上,条件期待队列中的节点定义复用了 AQS 节点的定义,也就是说,同步阻塞队列和条件期待队列中节点类型都是 AQS 的动态外部类 AbstractQueuedSynchronizer.Node。

在 Object 的监视器模型上,一个对象领有一个同步阻塞队列和一个条件期待队列,而并发包中的 Lock(更确切地说是 AQS)领有一个同步阻塞队列和多个条件期待队列。

期待

上面来介绍让线程期待的执行流程。

调用 Condition 的 await() 办法(或者以 await 结尾的办法),将会使以后线程开释资源、结构成为节点退出条件期待队列的尾部,同时线程状态变为期待状态。

如果从队列(同步阻塞队列和条件期待队列)的角度看 await()办法,当调用 await() 办法时,相当于同步阻塞队列的首节点(获取到锁的节点)挪动到 Condition 的条件期待队列中。并且同步阻塞队列的首节点并不会间接退出条件期待队列,而是通过 addConditionWaiter() 办法把以后线程结构成一个新的节点,将其退出条件期待队列中。

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

告诉

上面来介绍唤醒期待线程的执行流程。

调用 Condition 的 signal() 办法,将会唤醒在条件期待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将以后节点从条件期待队列挪动到同步阻塞队列中。

条件期待队列中的节点被唤醒后,被唤醒的线程以“死循环”的形式尝试获取资源。胜利获取资源之后,被唤醒的线程将从先前调用的 await() 办法返回。

如果被唤醒的线程不是通过其余线程调用 Condition.signal() 办法唤醒,而是对期待线程进行中断,则会抛出 InterruptedException。

被唤醒的线程,将从 await() 办法中的 while 循环中退出(isOnSyncQueue(Node node) 办法返回 true,节点曾经在同步阻塞队列中),进而调用 AQS 的 acquireQueued() 办法以“死循环”的形式尝试获取资源。胜利获取资源之后,被唤醒的线程将从先前调用的 await() 办法返回。


Condition 的 signalAll() 办法,相当于对条件期待队列中的每个节点均执行一次 signal() 办法,成果就是将条件期待队列中所有节点全副挪动到同步阻塞队列中,并唤醒每个节点的线程。

尽管是把每个节点的线程都唤醒了,这些线程须要尝试获取资源,然而只有一个线程可能胜利获取资源,而后从 await() 办法返回;其余获取资源失败的线程又都会被退出到同步阻塞队列中,并在队列中进行自旋;移出队列(或进行自旋)的条件是前驱节点为头节点且胜利获取了资源。

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

参考资料

《Java 并发编程艺术》第 5 章:Java 中的锁

正文完
 0