从ReentrantLock到AQS的原理及应用

38次阅读

共计 11222 个字符,预计需要花费 29 分钟才能阅读完成。

前言

Java 并发包(JUC:java.util.concurrent)中提供了很多并发工具,这其中,很多咱们耳熟能详的并发工具,ReentrantLock、Semaphore,它们的实现都用到了一个独特的基类 –AbstractQueuedSynchronizer, 简称 AQS。AQS 是一个用来构建锁和同步器的框架,应用 AQS 能简略且高效地结构出利用宽泛的大量的同步器,比方咱们提到的 ReentrantLock,Semaphore,其余的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。当然,咱们本人也能利用 AQS 十分轻松容易地结构出合乎咱们本人需要的同步器。

AQS 负责管理同步器类中的状态,他治理了一个整数状态信息,能够通过 getState、setState、compareAndSetState 等类型办法来进行操作。ReentrantLock 示意所有线程曾经反复获取所的次数,Semaphore 示意残余许可数量。

ReentrantLock

1.1 ReentrantLock 概览

ReentrantLock 是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock 类实现了 Lock,它领有与 synchronized 雷同的并发性和内存语义,然而增加了相似锁投票、定时锁等待和可中断锁等待的一些个性。此外,它还提供了在强烈争用状况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 能够花更少的时候来调度线程,把更多工夫用在执行线程上。)
ReentrantLock 锁在同一个工夫点只能被一个线程锁持有。

可重入:ReentrantLock 锁,能够被单个线程屡次获取。

ReentrantLock 分为“偏心锁”和“非偏心锁”。它们的区别体现在获取锁的机制上是否偏心。ReentrantLock 在同一个工夫点只能被一个线程获取 (当某线程获取到“锁”时,其它线程就必须期待);ReentraantLock 是通过一个 FIFO(先进先出)的期待队列来治理获取该锁所有线程的。在“偏心锁”的机制下,线程顺次排队获取锁;而“非偏心锁”在锁是可获取状态时,不论本人是不是在队列的结尾都会尝试获取锁而不须要马上排队( 获取不到锁再排队)。

通常咱们会将 ReentrantLock 和 synchronized 做比拟,两者孰优孰劣,其实在 java 官网对 synchronized 进行了诸多优化之后(偏差锁、轻量锁、自适应自旋、锁打消、锁粗化 …),两者的性能差距并不大,只是在某些方面存在差别而已,在此用一张表格做一个简略的比照:

// **************************Synchronized 的应用形式 **************************
// 1. 用于代码块
synchronized (this) {}
// 2. 用于对象
synchronized (object) {}
// 3. 用于办法
public synchronized void test () {}
// 4. 可重入
for (int i = 0; i < 100; i++) {synchronized (this) {}}
// **************************ReentrantLock 的应用形式 **************************
public void test () throw Exception {
    // 1. 初始化抉择偏心锁、非偏心锁
    ReentrantLock lock = new ReentrantLock(true);
    // 2. 可用于代码块
    lock.lock();
    try {
        try {
            // 3. 反对多种加锁形式,比拟灵便; 具备可重入个性
            if(lock.tryLock(100, TimeUnit.MILLISECONDS)){}} finally {
            // 4. 手动开释锁
            lock.unlock()}
    } finally {lock.unlock();
    }
}

1.2 ReentrantLock 与 AQS

下面咱们对 ReentrantLock 有了一个大略的理解,接下来能够来看看 ReentrantLock 和 AQS 之间到底是什么关系呢,其实 ReentrantLock 的底层就是应用的 AQS 实现的,咱们一起来看看源码:

/**ReentrantLock 实现了 Lock 类 其中有一个字段,Sync **/
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    
    private final Sync sync;

    /**
     * Sync 就是继承了 AQS 的抽象类,此锁的同步控制根底。* 上面将 Sync 细分为偏心锁和非偏心锁。* 应用 AQS 的 state 来示意锁的重入次数。*/
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * 形象的 lock 办法,别离给偏心锁和非偏心锁具体实现。*/
        abstract void lock();
        ...
    }

    /**
     * 非偏心锁
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        ...
    }

    /**
     * 偏心锁
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        
        final void lock() {acquire(1);
        }
        ...
    }

    /**
     * ReentrantLock 的默认构造方法,默认创立非偏心锁
     */
    public ReentrantLock() {sync = new NonfairSync();
    }

    /**
     * 通过传入 boolean 参数来决定创立偏心锁还是非偏心锁。*/
    public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

ReentrantLock 中的偏心锁和非偏心锁实现形式差异不大,差异在于偏心锁判断是否间接进入队列,先看看非偏心锁的加锁流程源码:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        /** 加锁办法 **/
        final void lock() {
            // 若通过 CAS 设置变量 State(同步状态)胜利,也就是获取锁胜利,则将以后线程设置为独占线程。if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            // 若通过 CAS 设置变量 State(同步状态)失败,也就是获取锁失败,则进入 Acquire 办法进行后续解决。acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
        }
    }

再看下偏心锁源码中获锁的形式:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {acquire(1);
        }
        ...
    }

通过查看偏心锁和非偏心锁获取锁的源码得悉,都会通过 lock()办法区上锁,然而 lock()办法到底是怎么上锁的呢?带着这样的疑难,咱们持续跟踪一下会发现,这个办法属于 FairSync 和 NonfairSync 的父类 AQS(AbstractQueuedSynchronizer) 的外围办法。
在理解 AQS 之前,咱们带着几个疑难:

  • 下面提到的通过 CAS 尝试扭转 state 的状态来示意获取锁胜利或者失败,如果获取失败了,调用 acquire() 办法,是怎么实现的呢?
  • 获取不到锁,这个线程是在干嘛呢?一直尝试锁?还是挂起期待唤醒?

上面咱们会对 AQS 以及 ReentrantLock 和 AQS 的关联做具体介绍。

AQS

理解 AQS 之前,先看看 AQS 的整体框架:

  • 上图中有色彩的为 Method,无色彩的为 Attribution。
  • AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外裸露的 API 到底层根底数据。
  • 当有自定义同步器接入时,只需重写第一层所须要的局部办法即可,不须要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先通过第一层的 API 进入 AQS 外部办法,而后通过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的期待队列解决,而这些解决形式均依赖于第五层的根底数据提供层。

AQS 概述

AQS 保护一个用 Volatile 的 int 类型 state 的共享资源,通过内置的 FIFO 来实现获取资源线程的排队工作。( 这个内置的同步队列称为 ”CLH” 队列)。该队列由一个一个的 Node 结点组成,每个 Node 结点保护一个 prev 援用和 next 援用,别离指向本人的前驱和后继结点。AQS 保护两个指针,别离指向队列头部 head 和尾部 tail。

其实就是个变体 双端双向链表
当线程获取资源失败(比方 tryAcquire 时试图设置 state 状态失败),会被结构成一个结点退出 CLH 队列中,同时以后线程会被阻塞在队列中(通过 LockSupport.park 实现,其实是期待态)。当持有同步状态的线程开释同步状态时,会唤醒后继结点,而后此结点线程持续退出到对同步状态的抢夺中。

Node 节点

static final class Node {
        /** waitStatus 值,示意线程已被勾销(期待超时或者被中断)*/
        static final int CANCELLED =  1;
        /** waitStatus 值,示意后继线程须要被唤醒(unpaking)*/
        static final int SIGNAL    = -1;
        /**waitStatus 值,示意结点线程期待在 condition 上,当被 signal 后,会从期待队列转移到同步到队列中 */
        static final int CONDITION = -2;
       /** waitStatus 值,示意下一次共享式同步状态会被无条件地流传上来
        static final int PROPAGATE = -3;
        /** 期待状态,初始为 0 */
        volatile int waitStatus;
        /** 以后结点的前一个结点 */
        volatile Node prev;
        /** 以后结点的后一个结点 */
        volatile Node next;
        /** 与以后结点关联的排队中的线程 */
        volatile Thread thread;
        /** ...... */
    }

同步状态 State

/**
     * The synchronization state.
     */
    private volatile int state;
    
    protected final int getState() {return state;}

    
    protected final void setState(int newState) {state = newState;}

    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

通过 AQS 中 state 的源码能够看出,对于 state 的办法都是 final 润饰的,阐明子类中无奈重写它们。咱们能够通过批改 State 字段示意的同步状态来实现多线程的 独占模式和共享模式(加锁过程)
一般来说,自定义同步器要么是独占形式,要么是共享形式,它们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。AQS 也反对自定义同步器同时实现独占和共享两种形式,如 ReentrantReadWriteLock。ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。

独占模式(通过 ReentrantLock 了解 AQS)

获取同步状态 –acquire()

public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}
  • tryAcquire: 首先,调用 tryAcquire 办法,若返回 true,意味着获取同步状态胜利,前面的逻辑不再执行;若返回 false,也就是获取同步状态失败,进入 acquireQueued 步骤;
  • acquireQueued: 此时,获取同步状态失败,结构独占式同步结点,通过 addWatiter 将此结点增加到同步队列的尾部(此时可能会有多个线程结点试图退出同步队列尾部,须要以线程平安的形式增加);
  • selfInterrupt: 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。

以非偏心锁为例,能够看到获取锁的大体流程

final boolean nonfairTryAcquire(int acquires) {
            // 获取以后线程
            final Thread current = Thread.currentThread();
            // 获取 state 的值
            int c = getState();
            // 如果 state 为 0,阐明以后没有其余线程占用共享资源,能够尝试获取锁
            if (c == 0) {
                // 用 CAS 批改 state 的值
                if (compareAndSetState(0, acquires)) {
                    // 批改胜利,获取锁胜利,解决以后线程
                    setExclusiveOwnerThread(current);
                    // 返回获取锁胜利
                    return true;
                }
            }
            // 如果 state 不为 0,判断占用锁的线程是否是以后线程
            else if (current == getExclusiveOwnerThread()) {
                // 如果是以后线程,对 state 做增量递增
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    // 设置 state
                setState(nextc);
                // 返回获取锁胜利(可重入的原理)return true;
            }
            return false;
        }
  • 获取以后线程;
  • 获取 state 的值;
  • 如果 state 为 0,阐明以后没有其余线程占用共享资源,能够尝试获取锁;
  • 用 CAS 批改 state 的值;批改胜利,获取锁胜利,解决以后线程;
  • 返回获取锁胜利;
  • 如果 state 不为 0,判断占用锁的线程是否是以后线程;
  • 如果是以后线程,对 state 做增量递增;
  • 返回获取锁胜利(可重入的原理)。

值的留神得中央是,偏心锁中多了一部判断,再来看看偏心锁获取锁的大略流程:

protected final boolean tryAcquire(int acquires) {
            // 获取以后线程;final Thread current = Thread.currentThread();
            // 获取 state 的值;int c = getState();
            // 如果 state 为 0,阐明以后没有其余线程占用共享资源,能够尝试获取锁
            if (c == 0) {
                // 判断以后期待队列中是否存在无效节点的办法,// 如果返回 False,阐明以后线程能够争取共享资源;// 如果返回 True,阐明队列中存在无效节点,以后线程必须退出到期待队列中。// 用 CAS 批改 state 的值;批改胜利,获取锁胜利,解决以后线程;if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果 state 不为 0,判断占用锁的线程是否是以后线程;else if (current == getExclusiveOwnerThread()) {
                // 如果是以后线程,对 state 做增量递增;int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                    // 返回获取锁胜利(可重入的原理)。setState(nextc);
                return true;
            }
            return false;
        }
/** 判断以后期待队列中是否存在无效节点的办法 **/
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized 其正确性取决于是否初始化头结点
        // before tail and on head.next being accurate if the current 如果以后节点在头结点和尾结点之间
        // thread is first in queue.  线程在队列中第一个
        // 获取以后尾结点
        Node t = tail; // Read fields in reverse initialization order
        // 获取以后尾结点
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当 h != t 时:1. 如果(s = h.next)==null,期待队列正在有线程进行初始化,但只是进行到了 Tail 指向 Head,没有将 Head 指向 Tail,此时队列中有元素,须要返回 True(这块具体见下边 addWaiter()曾经 enq()代码剖析)。2. 如果(s = h.next) != null,阐明此时队列中至多有一个无效节点,3. 如果此时 s.thread ==Thread.currentThread(),阐明期待队列的第一个无效节点中的线程与以后线程雷同,那么以后线程是能够获取资源的;4. 如果 s.thread != Thread.currentThread(),阐明期待队列的第一个无效节点线程与以后线程不同,以后线程必须退出进期待队列。

进入队列 addWaiter()

当尝试获取锁胜利,则间接完结并返回;
当尝试获取锁失败,则进入下一步: 线程退出期待队列addWaiter()

private Node addWaiter(Node mode) {
        // 用以后线程和传入的锁模式新建一个 Node
        Node node = new Node(Thread.currentThread(), mode);
        // 先获取到以后的尾结点,将尾结点指向到 Node pred
        Node pred = tail;
        // 如果获取到的尾结点不为空
        if (pred != null) {// 将新建节点的前置节点设置为下面获取到的 tail(tail 节点的后间接点此时为 null)
            node.prev = pred;
            // 利用 CAS 批改尾结点的值(将新建的 node 节点设置为 tail 尾结点)if (compareAndSetTail(pred, node)) {
                // 如果设置胜利了,将之前获取到的 pred(批改之前的尾结点)的后置节点批改为以后新建的 node 节点
                pred.next = node;
                // 返回批改胜利的新的节点(此时此新建的这个 node 节点为新的 tail 节点)return node;
            }
        }
        // 如果获取到的 tail 节点为 null,(阐明期待队列中没有元素),// 或者以后 Pred 指针和 Tail 指向的地位不同(阐明被别的线程曾经批改),//enq()中解决了以上两种状况
        enq(node);
        return node;
    }

如果没有被初始化,须要进行初始化一个头结点进去。但请留神,初始化的头结点并不是以后线程节点,而是调用了无参构造函数的节点。如果经验了初始化或者并发导致队列中有元素,则与之前的办法雷同。其实,addWaiter 就是一个在双端链表增加尾节点的操作,须要留神的是,双端链表的头结点是一个无参构造函数的头结点。

private Node enq(final Node node) {for (;;) {
            // 获取以后的尾结点指向到 t
            Node t = tail;
            // 如果此时尾结点为空
            if (t == null) { // Must initialize  必须要初始化一个头结点
                // 利用 CAS 初始化一个头部(调用无参结构,头肩点外部都为 null)if (compareAndSetHead(new Node()))
                    // 将尾结点设置为头节点(此时就一个节拍板结点和尾结点是一个)tail = head;
            } else {
                // 如果尾结点不为 null, 将新建的节点 node 的前置节点设置为 t
                node.prev = t;
                // 通过 CAS 批改尾结点,将尾结点指向为新的 node,
                // 这个办法次要是对 tailOffset 和 Expect 进行比拟,如果 tailOffset 的 Node 和 Expect 的 Node 地址是雷同的,// 那么设置 Tail 的值为 Update 的值。if (compareAndSetTail(t, node)) {// 批改胜利,将之前的尾结点(当初的倒数第二个节点)的后置节点设置为新的尾结点(node)
                    // 返回倒数第二个节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

何时出列 acquireQueued()

通过下面的 addWaiter()办法咱们能够晓得,通过将一个新建的 Node 节点放入到队列里等待获取锁的资格,那么进入队列之后,怎么能力获取到锁呢?接下来咱们来看看线程怎么获取锁。
总的来说,一个线程获取锁失败了,被放入期待队列,acquireQueued 会把放入队列中的线程一直去获取锁,直到获取胜利或者不再须要获取(中断)。

final boolean acquireQueued(final Node node, int arg) {
        // 标记是否胜利拿到资源
        boolean failed = true;
        try {
            // 标记是否被中断过
            boolean interrupted = false;
            // 开始自旋
            for (;;) {
                // 获取以后节点的前置节点
                final Node p = node.predecessor();
                // 如果前置节点是头节点,阐明以后节点是第一个无效节点(头节点是虚节点),开始尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取到锁,头指针挪动到以后 node
                    //setHead 办法是把以后节点置为虚节点,但并没有批改 waitStatus,因为它是始终须要用的数据。setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 阐明是 p 不为头结点 或者 p 为头节点且以后没有获取到锁(可能是非偏心锁被抢占了),// 这个时候就要判断以后 node 是否要被阻塞(被阻塞条件:前驱节点的 waitStatus 为 -1),避免有限循环浪费资源。if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }
/** 判断以后 node 是否要被阻塞 **/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前置节点的阻塞状态
        int ws = pred.waitStatus;
        // 如果 Node.SIGNAL -1 状态,阐明前置节点的状态为唤醒状态
        if (ws == Node.SIGNAL)
            // 能够间接 park(间接阻塞)return true;
           //waitStatus>0 是勾销状态 如果前置节点是勾销状态
        if (ws > 0) {
            // 循环向前查找勾销节点,把勾销节点从队列中剔除
            do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            pred.next = node;
        } else {
             // 设置前置节点期待状态为 SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

CANCELLED 状态节点生成 cancelAcquire()

private void cancelAcquire(Node node) {
  // 将有效节点过滤
    if (node == null)
        return;
  // 设置该节点不关联任何线程,也就是虚节点
    node.thread = null;
    Node pred = node.prev;
  // 通过前驱节点,跳过勾销状态的 node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
  // 获取过滤后的前驱节点的后继节点
    Node predNext = pred.next;
  // 把以后 node 的状态设置为 CANCELLED
    node.waitStatus = Node.CANCELLED;
  // 如果以后节点是尾节点,将从后往前的第一个非勾销状态的节点设置为尾节点
  // 更新失败的话,则进入 else,如果更新胜利,将 tail 的后继节点设置为 null
    if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // 如果以后节点不是 head 的后继节点,1: 判断以后节点前驱节点的是否为 SIGNAL,2: 如果不是,则把前驱节点设置为 SINGAL 看是否胜利
    // 如果 1 和 2 中有一个为 true,再判断以后节点的线程是否为 null
    // 如果上述条件都满足,把以后节点的前驱节点的后继指针指向以后节点的后继节点
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
      // 如果以后节点是 head 的后继节点,或者上述条件不满足,那就唤醒以后节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

以上从 ReentrantLock 的获取锁,上锁来对 AQS 的获取锁过程做了介绍,后续会持续将 AQS 开释锁的原理做介绍,未完待续 …

参考

  • 从 ReentrantLock 的实现看 AQS 的原理及利用
  • Java 并发包基石 -AQS 详解

正文完
 0