前言

Java并发包(JUC:java.util.concurrent)中提供了很多并发工具,这其中,很多咱们耳熟能详的并发工具,ReentrantLock、Semaphore,它们的实现都用到了一个独特的基类--AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步器的框架,应用AQS能简略且高效地结构出利用宽泛的大量的同步器,比方咱们提到的ReentrantLock,Semaphore,其余的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,咱们本人也能利用AQS十分轻松容易地结构出合乎咱们本人需要的同步器。

AQS负责管理同步器类中的状态,他治理了一个整数状态信息,能够通过getState、setState、compareAndSetState等类型办法来进行操作。ReentrantLock示意所有线程曾经反复获取所的次数,Semaphore示意残余许可数量。

ReentrantLock

1.1 ReentrantLock概览

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock 类实现了 Lock ,它领有与 synchronized 雷同的并发性和内存语义,然而增加了相似锁投票、定时锁等待和可中断锁等待的一些个性。此外,它还提供了在强烈争用状况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 能够花更少的时候来调度线程,把更多工夫用在执行线程上。)
ReentrantLock锁在同一个工夫点只能被一个线程锁持有。

可重入:ReentrantLock锁,能够被单个线程屡次获取。

ReentrantLock分为“偏心锁”和“非偏心锁”。它们的区别体现在获取锁的机制上是否偏心。ReentrantLock在同一个工夫点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须期待);ReentraantLock是通过一个FIFO(先进先出)的期待队列来治理获取该锁所有线程的。在“偏心锁”的机制下,线程顺次排队获取锁;而“非偏心锁”在锁是可获取状态时,不论本人是不是在队列的结尾都会尝试获取锁而不须要马上排队(获取不到锁再排队)。

通常咱们会将ReentrantLock和synchronized做比拟,两者孰优孰劣,其实在java官网对synchronized进行了诸多优化之后(偏差锁、轻量锁、自适应自旋、锁打消、锁粗化...),两者的性能差距并不大,只是在某些方面存在差别而已,在此用一张表格做一个简略的比照:

// **************************Synchronized的应用形式**************************// 1.用于代码块synchronized (this) {}// 2.用于对象synchronized (object) {}// 3.用于办法public synchronized void test () {}// 4.可重入for (int i = 0; i < 100; i++) {    synchronized (this) {}}// **************************ReentrantLock的应用形式**************************public void test () throw Exception {    // 1.初始化抉择偏心锁、非偏心锁    ReentrantLock lock = new ReentrantLock(true);    // 2.可用于代码块    lock.lock();    try {        try {            // 3.反对多种加锁形式,比拟灵便; 具备可重入个性            if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }        } finally {            // 4.手动开释锁            lock.unlock()        }    } finally {        lock.unlock();    }}

1.2 ReentrantLock与AQS

下面咱们对ReentrantLock有了一个大略的理解,接下来能够来看看ReentrantLock和AQS之间到底是什么关系呢,其实ReentrantLock的底层就是应用的AQS实现的,咱们一起来看看源码:

/**ReentrantLock 实现了Lock类 其中有一个字段,Sync **/public class ReentrantLock implements Lock, java.io.Serializable {    private static final long serialVersionUID = 7373984872572414699L;        private final Sync sync;    /**     * Sync就是继承了AQS的抽象类,此锁的同步控制根底。     * 上面将Sync细分为偏心锁和非偏心锁。     * 应用AQS的state来示意锁的重入次数。     */    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        /**         * 形象的lock办法,别离给偏心锁和非偏心锁具体实现。         */        abstract void lock();        ...    }    /**     * 非偏心锁     */    static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        ...    }    /**     * 偏心锁     */    static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;                final void lock() {            acquire(1);        }        ...    }    /**     * ReentrantLock的默认构造方法,默认创立非偏心锁     */    public ReentrantLock() {        sync = new NonfairSync();    }    /**     * 通过传入boolean参数来决定创立偏心锁还是非偏心锁。     */    public ReentrantLock(boolean fair) {        sync = fair ? new FairSync() : new NonfairSync();    }

ReentrantLock中的偏心锁和非偏心锁实现形式差异不大,差异在于偏心锁判断是否间接进入队列,先看看非偏心锁的加锁流程源码:

static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        /**加锁办法**/        final void lock() {            //若通过CAS设置变量State(同步状态)胜利,也就是获取锁胜利,则将以后线程设置为独占线程。            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else            //若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入Acquire办法进行后续解决。                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }

再看下偏心锁源码中获锁的形式:

static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;        final void lock() {            acquire(1);        }        ...    }

通过查看偏心锁和非偏心锁获取锁的源码得悉,都会通过lock()办法区上锁,然而lock()办法到底是怎么上锁的呢?带着这样的疑难,咱们持续跟踪一下会发现,这个办法属于FairSync和NonfairSync的父类AQS(AbstractQueuedSynchronizer)的外围办法。
在理解AQS之前,咱们带着几个疑难:

  • 下面提到的通过CAS尝试扭转state的状态来示意获取锁胜利或者失败,如果获取失败了,调用acquire()办法,是怎么实现的呢?
  • 获取不到锁,这个线程是在干嘛呢?一直尝试锁?还是挂起期待唤醒?

上面咱们会对AQS以及ReentrantLock和AQS的关联做具体介绍。

AQS

理解AQS之前,先看看AQS的整体框架:

  • 上图中有色彩的为Method,无色彩的为Attribution。
  • AQS框架共分为五层,自上而下由浅入深,从AQS对外裸露的API到底层根底数据。
  • 当有自定义同步器接入时,只需重写第一层所须要的局部办法即可,不须要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先通过第一层的API进入AQS外部办法,而后通过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的期待队列解决,而这些解决形式均依赖于第五层的根底数据提供层。

AQS概述

AQS保护一个用Volatile的int类型state的共享资源,通过内置的FIFO来实现获取资源线程的排队工作。(这个内置的同步队列称为"CLH"队列)。该队列由一个一个的Node结点组成,每个Node结点保护一个prev援用和next援用,别离指向本人的前驱和后继结点。AQS保护两个指针,别离指向队列头部head和尾部tail。

其实就是个变体双端双向链表
当线程获取资源失败(比方tryAcquire时试图设置state状态失败),会被结构成一个结点退出CLH队列中,同时以后线程会被阻塞在队列中(通过LockSupport.park实现,其实是期待态)。当持有同步状态的线程开释同步状态时,会唤醒后继结点,而后此结点线程持续退出到对同步状态的抢夺中。

Node节点

static final class Node {        /** waitStatus值,示意线程已被勾销(期待超时或者被中断)*/        static final int CANCELLED =  1;        /** waitStatus值,示意后继线程须要被唤醒(unpaking)*/        static final int SIGNAL    = -1;        /**waitStatus值,示意结点线程期待在condition上,当被signal后,会从期待队列转移到同步到队列中 */        static final int CONDITION = -2;       /** waitStatus值,示意下一次共享式同步状态会被无条件地流传上来        static final int PROPAGATE = -3;        /** 期待状态,初始为0 */        volatile int waitStatus;        /**以后结点的前一个结点 */        volatile Node prev;        /** 以后结点的后一个结点 */        volatile Node next;        /** 与以后结点关联的排队中的线程 */        volatile Thread thread;        /** ...... */    }

同步状态State

/**     * The synchronization state.     */    private volatile int state;        protected final int getState() {        return state;    }        protected final void setState(int newState) {        state = newState;    }        protected final boolean compareAndSetState(int expect, int update) {        // See below for intrinsics setup to support this        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);    }

通过AQS中state的源码能够看出,对于state的办法都是final润饰的,阐明子类中无奈重写它们。咱们能够通过批改State字段示意的同步状态来实现多线程的独占模式和共享模式(加锁过程)
一般来说,自定义同步器要么是独占形式,要么是共享形式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也反对自定义同步器同时实现独占和共享两种形式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。

独占模式(通过ReentrantLock了解AQS)

获取同步状态--acquire()

public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }
  • tryAcquire:首先,调用tryAcquire办法,若返回true,意味着获取同步状态胜利,前面的逻辑不再执行;若返回false,也就是获取同步状态失败,进入acquireQueued步骤;
  • acquireQueued:此时,获取同步状态失败,结构独占式同步结点,通过addWatiter将此结点增加到同步队列的尾部(此时可能会有多个线程结点试图退出同步队列尾部,须要以线程平安的形式增加);
  • selfInterrupt:该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。

以非偏心锁为例,能够看到获取锁的大体流程

final boolean nonfairTryAcquire(int acquires) {            //获取以后线程            final Thread current = Thread.currentThread();            //获取state的值            int c = getState();            //如果state为0,阐明以后没有其余线程占用共享资源,能够尝试获取锁            if (c == 0) {                //用CAS批改state的值                if (compareAndSetState(0, acquires)) {                    //批改胜利,获取锁胜利,解决以后线程                    setExclusiveOwnerThread(current);                    //返回获取锁胜利                    return true;                }            }            //如果state不为0,判断占用锁的线程是否是以后线程            else if (current == getExclusiveOwnerThread()) {                //如果是以后线程,对state做增量递增                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                    //设置state                setState(nextc);                //返回获取锁胜利(可重入的原理)                return true;            }            return false;        }
  • 获取以后线程;
  • 获取state的值;
  • 如果state为0,阐明以后没有其余线程占用共享资源,能够尝试获取锁;
  • 用CAS批改state的值;批改胜利,获取锁胜利,解决以后线程;
  • 返回获取锁胜利;
  • 如果state不为0,判断占用锁的线程是否是以后线程;
  • 如果是以后线程,对state做增量递增;
  • 返回获取锁胜利(可重入的原理)。

值的留神得中央是,偏心锁中多了一部判断,再来看看偏心锁获取锁的大略流程:

protected final boolean tryAcquire(int acquires) {            //获取以后线程;            final Thread current = Thread.currentThread();            //获取state的值;            int c = getState();            //如果state为0,阐明以后没有其余线程占用共享资源,能够尝试获取锁            if (c == 0) {                //判断以后期待队列中是否存在无效节点的办法,                //如果返回False,阐明以后线程能够争取共享资源;                //如果返回True,阐明队列中存在无效节点,以后线程必须退出到期待队列中。                //用CAS批改state的值;批改胜利,获取锁胜利,解决以后线程;                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            //如果state不为0,判断占用锁的线程是否是以后线程;            else if (current == getExclusiveOwnerThread()) {                //如果是以后线程,对state做增量递增;                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                    //返回获取锁胜利(可重入的原理)。                setState(nextc);                return true;            }            return false;        }
/**判断以后期待队列中是否存在无效节点的办法**/public final boolean hasQueuedPredecessors() {        // The correctness of this depends on head being initialized 其正确性取决于是否初始化头结点        // before tail and on head.next being accurate if the current 如果以后节点在头结点和尾结点之间        // thread is first in queue.  线程在队列中第一个        // 获取以后尾结点        Node t = tail; // Read fields in reverse initialization order        // 获取以后尾结点        Node h = head;        Node s;        return h != t &&            ((s = h.next) == null || s.thread != Thread.currentThread());    }
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当h != t时:     1. 如果(s = h.next)==null,期待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,       此时队列中有元素,须要返回True(这块具体见下边addWaiter()曾经enq()代码剖析)。    2. 如果(s = h.next) != null,阐明此时队列中至多有一个无效节点,    3. 如果此时s.thread ==Thread.currentThread(),阐明期待队列的第一个无效节点中的线程与以后线程雷同,       那么以后线程是能够获取资源的;    4. 如果s.thread != Thread.currentThread(),阐明期待队列的第一个无效节点线程与以后线程不同,以后线程必须退出进期待队列。

进入队列 addWaiter()

当尝试获取锁胜利,则间接完结并返回;
当尝试获取锁失败,则进入下一步:线程退出期待队列addWaiter()

private Node addWaiter(Node mode) {        //用以后线程和传入的锁模式新建一个Node        Node node = new Node(Thread.currentThread(), mode);        //先获取到以后的尾结点,将尾结点指向到Node pred        Node pred = tail;        //如果获取到的尾结点不为空        if (pred != null) {            //将新建节点的前置节点设置为下面获取到的tail(tail节点的后间接点此时为null)            node.prev = pred;            //利用CAS批改尾结点的值(将新建的node节点设置为tail尾结点)            if (compareAndSetTail(pred, node)) {                //如果设置胜利了,将之前获取到的pred(批改之前的尾结点)的后置节点批改为以后新建的node节点                pred.next = node;                //返回批改胜利的新的节点(此时此新建的这个node节点为新的tail节点)                return node;            }        }        //如果获取到的tail节点为null,(阐明期待队列中没有元素),        //或者以后Pred指针和Tail指向的地位不同(阐明被别的线程曾经批改),        //enq()中解决了以上两种状况        enq(node);        return node;    }

如果没有被初始化,须要进行初始化一个头结点进去。但请留神,初始化的头结点并不是以后线程节点,而是调用了无参构造函数的节点。如果经验了初始化或者并发导致队列中有元素,则与之前的办法雷同。其实,addWaiter就是一个在双端链表增加尾节点的操作,须要留神的是,双端链表的头结点是一个无参构造函数的头结点。

private Node enq(final Node node) {        for (;;) {            //获取以后的尾结点指向到t            Node t = tail;            //如果此时尾结点为空            if (t == null) { // Must initialize  必须要初始化一个头结点                //利用CAS初始化一个头部(调用无参结构,头肩点外部都为null)                if (compareAndSetHead(new Node()))                    //将尾结点设置为头节点(此时就一个节拍板结点和尾结点是一个)                    tail = head;            } else {                //如果尾结点不为null,将新建的节点node的前置节点设置为t                node.prev = t;                //通过CAS批改尾结点,将尾结点指向为新的node,                //这个办法次要是对tailOffset和Expect进行比拟,如果tailOffset的Node和Expect的Node地址是雷同的,                //那么设置Tail的值为Update的值。                if (compareAndSetTail(t, node)) {                    //批改胜利,将之前的尾结点(当初的倒数第二个节点)的后置节点设置为新的尾结点(node)                    //返回倒数第二个节点                    t.next = node;                    return t;                }            }        }    }

何时出列 acquireQueued()

通过下面的addWaiter()办法咱们能够晓得,通过将一个新建的Node节点放入到队列里等待获取锁的资格,那么进入队列之后,怎么能力获取到锁呢?接下来咱们来看看线程怎么获取锁。
总的来说,一个线程获取锁失败了,被放入期待队列,acquireQueued会把放入队列中的线程一直去获取锁,直到获取胜利或者不再须要获取(中断)。

final boolean acquireQueued(final Node node, int arg) {        // 标记是否胜利拿到资源        boolean failed = true;        try {            //标记是否被中断过            boolean interrupted = false;            //开始自旋            for (;;) {                //获取以后节点的前置节点                final Node p = node.predecessor();                //如果前置节点是头节点,阐明以后节点是第一个无效节点(头节点是虚节点),开始尝试获取锁                if (p == head && tryAcquire(arg)) {                    //获取到锁,头指针挪动到以后node                    //setHead办法是把以后节点置为虚节点,但并没有批改waitStatus,因为它是始终须要用的数据。                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                                // 阐明是p不为头结点 或者 p为头节点且以后没有获取到锁(可能是非偏心锁被抢占了),                //这个时候就要判断以后node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),避免有限循环浪费资源。                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }
/**判断以后node是否要被阻塞**/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        //获取前置节点的阻塞状态        int ws = pred.waitStatus;        //如果 Node.SIGNAL -1 状态,阐明前置节点的状态为唤醒状态        if (ws == Node.SIGNAL)            //能够间接park(间接阻塞)            return true;           //waitStatus>0是勾销状态 如果前置节点是勾销状态        if (ws > 0) {            // 循环向前查找勾销节点,把勾销节点从队列中剔除            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {             // 设置前置节点期待状态为SIGNAL            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }

CANCELLED状态节点生成 cancelAcquire()

private void cancelAcquire(Node node) {  // 将有效节点过滤    if (node == null)        return;  // 设置该节点不关联任何线程,也就是虚节点    node.thread = null;    Node pred = node.prev;  // 通过前驱节点,跳过勾销状态的node    while (pred.waitStatus > 0)        node.prev = pred = pred.prev;  // 获取过滤后的前驱节点的后继节点    Node predNext = pred.next;  // 把以后node的状态设置为CANCELLED    node.waitStatus = Node.CANCELLED;  // 如果以后节点是尾节点,将从后往前的第一个非勾销状态的节点设置为尾节点  // 更新失败的话,则进入else,如果更新胜利,将tail的后继节点设置为null    if (node == tail && compareAndSetTail(node, pred)) {        compareAndSetNext(pred, predNext, null);    } else {        int ws;    // 如果以后节点不是head的后继节点,1:判断以后节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否胜利    // 如果1和2中有一个为true,再判断以后节点的线程是否为null    // 如果上述条件都满足,把以后节点的前驱节点的后继指针指向以后节点的后继节点        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {            Node next = node.next;            if (next != null && next.waitStatus <= 0)                compareAndSetNext(pred, predNext, next);        } else {      // 如果以后节点是head的后继节点,或者上述条件不满足,那就唤醒以后节点的后继节点            unparkSuccessor(node);        }        node.next = node; // help GC    }}

以上从ReentrantLock的获取锁,上锁来对AQS的获取锁过程做了介绍,后续会持续将AQS开释锁的原理做介绍,未完待续...

参考

  • 从ReentrantLock的实现看AQS的原理及利用
  • Java并发包基石-AQS详解