前言

  • 上篇文章咱们证实了synchronized关键字的个性:无锁、偏差锁、轻量锁、重(chong)偏差、重(chong)轻量、分量锁。能够说synchronized是jvm层面实现同步的形式。在jdk中,存在一个叫java.util.concurrent的包,简称JUC,它是一个jdk层面的并发包,外面存在了大量与并发编程相干的api,其中最代表意义的就是atomic和lock两种类别,前者是基于乐观锁CAS(Compare And Swap)的实现,后者是基于AQS(Abstract Queued Synchronizer)实现。本文将具体解说下AQS原理以及依据两个案例来解读ReentrantLock源码。
  • 两个案例:

    1.线程A独自加锁
    2.线程A正在持有锁的过程中,线程t1来加锁

一、AQS原理

  • AQS简称Abstract Queued Synchronizer,它的外围是基于一个双向链表组成的队列(CLH队列) + volatile关键字润饰的int类型变量实现的。(对于volatile关键字能够参考其余博主的一些总结: 传送门),大抵外围能够以如下图来出现:

    简略总结就是:外部应用双向链表保护了一个队列,其中Node数据结构为此队列的基石,外部保护了prev(指向上一个节点)、next(指向下一个节点)、waitStatus(以后node的状态)、thread(以后保护的线程)四个重要的属性。其中waitStatus别离有如下取值:
Node中waitStatus具体取值含意
CANCELLED(1)中断或勾销,此状态下的节点会从队列中移除
SIGNAL(-1)此状态下的节点肯定是在队列排队中
CONDITION(-2)条件阻塞,比如说外部因Condition而阻塞的节点
PROPAGATE(-3)示意下一个acquireShared应该无条件流传
0默认状态

除此之外,队列中还保护了三个属性,head(指向队列中的头节点)、state(锁的状态)、tail(指向队列中的尾节点)。其中,state的取值有两种状况,将以如下表展现进去:

AQS中state具体取值含意
0示意以后锁没有被线程持有
1示意以后锁正在被线程持有
大于1示意以后锁被线程重入了(重入锁),这里要留神:ReentrantLock重入了几次,就要开释几次锁

二、案例1:线程A独自加锁

  • 代码如下:

    public class SimpleThreadLock {    static ReentrantLock lock = new ReentrantLock(true);    public static void main(String[] args) throws InterruptedException {        Thread a = new Thread(() -> {            try {                lock.lock();                System.out.println("Get lock");            } catch (Exception e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }, "线程a");        a.start();        a.join();        System.out.println("end");    }}

    代码也比较简单,就是在主线程中创立了一个线程,并且外部去应用ReentrantLock加锁,获取到锁后就打印出Get lock这句话,当t1线程执行完后再继续执行主线程的逻辑。这里就不一步步演示断点了,间接上源码。

  • 这里先阐明下ReentrantLock重载的两个构造方法

    // 默认非偏心锁public ReentrantLock() {    sync = new NonfairSync();}// 若传入true则是偏心锁public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}

    因为咱们传入了true进去,所以此时,它是一把偏心锁。

  • lock.lock()办法,因为咱们指定了应用偏心锁,所以最终会进入ReentrantLock外部保护的FairSync类的lock办法

    // FairSync类下的lock办法final void lock() {    acquire(1);}

    于是,咱们须要找到acquire办法,此办法为AQS(父类AbstractQueuedSynchronizer)的办法,所以最终会进入如下这么一段代码:

    public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

    这段代码,看似很精简,然而它做的事真的太多了。稀释的才是精髓呀!好了,咱们不偏题,持续依照咱们的主题:线程A独自加锁。不过要持续往下看,还是要加深下acquire办法的含意,咱们必须要tryAcquire办法返回false,能力持续走if条件中前面的逻辑,以及if条件外部的逻辑。于是,咱们间接看tryAcquire办法源码:

  • tryAcquire办法

    protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}

    tryAcquire办法是一个protected办法,外部间接抛出了一个异样,还记得咱们是从哪个类掉用到父类AbstractQueuedSynchronizer的acquire办法的?没错,就是FairSync类。那么咱们就间接定位到FairSync类的tryAcquire办法呗。

    protected final boolean tryAcquire(int acquires) {    // 拿到以后线程,也就是线程A    final Thread current = Thread.currentThread();    // 拿到以后aqs的state变量,咱们没有批改过它,    // 默认为0    int c = getState();    if (c == 0) {        // 进入此逻辑,此逻辑跟acquire办法有点相似        // 必须要hasQueuedPredecessors()办法返回false        // 能力持续往下执行,于是咱们把hasQueuedPredecessors的源码也贴出来        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}
  • hasQueuedPredecessors办法源码

    public final boolean hasQueuedPredecessors() {    // 拿到aqs中的tail    Node t = tail;     // 拿到aqs中的head    Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}

    此办法涵盖的情景比拟多,然而就以后情景而言,它很容易了解,在以后情景中,咱们压根没操作过tail和head那么h 必定等于 t,所以此办法返回false,返回false后,咱们回到FairSync类的tryAcquire办法,

    protected final boolean tryAcquire(int acquires) {    // .... 上半局部代码省略    if (c == 0) {         // 在以后情景下,hasQueuedPredecessors返回的是false         // 也就是说会持续走if前面的逻辑,         // if前面的逻辑就是执行CAS操作,         // 将state属性从0设置成1         // 因为此时只有一个线程在执行,         // 这个cas操作肯定是胜利的         // cas胜利后就会执行setExclusiveOwnerThread代码,这段代码很有用         // 它是一个赋值的操作,也就是记录         // 以后领有锁的线程        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    // .... 下半局部else if逻辑也省略了    return false;}

    通过上述代码中的正文,咱们能够发现,线程A加锁胜利后会返回true,至此,tryAcquire的返回值为true。还记的咱们是从哪个办法进来的吗?是的,是从父类AbstractQueuedSynchronizer的acquire办法进来的,下面总结到了,只有当tryAcquire返回false,才会持续往下执行。至此,线程A独自加锁的案例就完结了。通过这么一个单线程加锁的案例,如果你认为AQS很简略的话,那就大错特错了,单线程加锁的案例中,咱们仅应用到了AQS中的state变量,CLH队列却始终没有波及到,而且从加锁到加锁完结的整个过程,咱们连一个Node类型的数据结构都没有看到过。那Node类型的数据结构什么时候会被用到呢?咱们来看下一个案例线程A正在持有锁的过程中,线程t1来加锁

三、案例2:线程A正在持有锁的过程中,线程t1来加锁

  • 同样的,咱们革新下代码:

    public class TwoThreadLock {    static ReentrantLock lock = new ReentrantLock(true);    public static void main(String[] args) throws InterruptedException {        new Thread(() -> {            try {                lock.lock();                System.out.println("Thread a get lock");                TimeUnit.SECONDS.sleep(60);            } catch (Exception e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }, "线程a").start();        Thread t1 = new Thread(() -> {            try {                lock.lock();                System.out.println("Thread t1 get lock");            } catch (Exception e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }, "线程t1");        t1.start();        t1.join();        System.out.println("end");    }}

    上段代码,毫无疑问,线程t1在调用lock.lock()办法时,就阻塞到那里了,要等线程a睡60s后才会继续执行,那么这外面到底做了哪些事呢?咱们来一起钻研下。

  • 同案例1,应用的是偏心锁,最终必定会调用到tryAcquire办法去,咱们这次就一次性的把tryAcquire办法给讲清楚

    protected final boolean tryAcquire(int acquires) {    // 拿到以后线程,也就是线程t1    final Thread current = Thread.currentThread();    // 拿到以后aqs的state变量,此时的c是多少呢?    // 没错,是1,因为锁曾经被线程A占有了,此时的    // state为1。于是它会走else if逻辑    int c = getState();    if (c == 0) {        // 进入此逻辑,此逻辑跟acquire办法有点相似        // 必须要hasQueuedPredecessors()办法返回false        // 能力持续往下执行,于是咱们把hasQueuedPredecessors的源码也贴出来        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    // 走了else if逻辑,它也发现以后持有锁的线程不是本人呀,于是间接return false    // 这里顺带解释下这个else if的逻辑,这个else if    // 就是判断以后调用lock办法的线程是不是和以后持有    // 锁的线程一样,如果是一样的,则将state + 1并赋值给nextc属性    // 这就示意了ReentrantLock反对重入性    // 那么什么时候会呈现nextc属性小于0的状况呢?    // nextc是一个int类型,当超过了它的存储返回后    // 会呈现小于0的状况 ===> 也就是说ReentrantLock    // 的重入次数最大为反对int类型最大值    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

    通过上述代码块中的正文可知,线程t1的加锁流程并没有这么顺利,在tryAcquire办法中返回了false,那这代表了什么呢?是的,它代表着线程t1能够持续走acquire前面的逻辑了,咱们持续把acquire办法贴出来:

    public final void acquire(int arg) {    // 在案例2的状况下,tryAcquire办法返回了false    // 于是会执行前面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)    // 当acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回了true才会执行外部的selfInterrupt()办法    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

    于是,咱们先理解下addWaiter(Node.EXCLUSIVE)办法,它的源码如下:

    private Node addWaiter(Node mode) {    // 此时的mode是由上述代码块传入的,    // 它的值为Node.EXCLUSIVE ===> 这是一个空节点,    // 值为null,    // 创立了一个node节点, 外部保护了以后线程(线程t1),并且它的next节点为null(有Node的构造方法可知)    Node node = new Node(Thread.currentThread(), mode);    // 拿到aqs队列中的tail属性,    // 此时必定为null啊(aqs队列都没初始化,哪来的队尾节点)    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }        // 此时pred为null,即不会走下面的if逻辑,于是执行enq办法,记住:此时传入enq办法时的形参为新new进去的Node    // 外部保护的是以后线程(线程t1)    enq(node);    return node;}

    下面代码块的正文也说了,最终会执行到enq办法,enq方干啥的呢?猜一下?是的,它就是初始aqs队列的。咱们来看一下它的源码:

    /** 形参node外部保护的线程为t2, 并且它的next属性指向为null */private Node enq(final Node node) {    // 此处写了一个死循环,也就是常说的自旋锁    for (;;) {        // 自旋的过程中        // 第一次自旋:        //  拿到队尾元素, 此时队列都没有,必定为null        //  发现队列中的tail指向的是null,于是初始化tail节点,并让aqs中的head指向了tail,        //  至此,aqs繁难版本的队列就进去啦,        //  head和tail指向同一个node,并且此node外部        //  保护的thread、prev、next、waitStatus全是默认值        // 因为是if else逻辑,所以初始化tail属性后,就会进行第二次自旋        // 第二次自旋:        //  再次拿到tail, 因为第一次自旋把tail给初始化了,所以此时拿到的tail不为null, 于是走了else逻辑        //  在else中,次要操作的是形参node, 还记得形参node是什么吗? ==> 保护以后线程(线程t1)的node节点,        //  此时会将node的上一个节点指向t节点        //  同时进行cas操作,将node节点变成tail        //  当cas胜利后,再设置t的next指向node        //  最终返回这个t.        //  此时此刻这个t是什么样的数据结构呢?        //  此时的这个t就是队列中的head节点了,        //  并且它的next为node(保护线程t1)        //  所以此时此刻队列中当初有两个元素了        Node t = tail;        if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

    代码中的正文形容了enq的过程,我专门画了一个图来形容aqs队列产生的过程,帮忙了解:

    enq初始化aqs队列的过程后,就执行到了addWaiter办法的进口了

    private Node addWaiter(Node mode) {    // ....上述代码省略    // enq初始化队列后,会将node进行返回    // 这个node就是保护线程t1的node,它曾经是    // 队列中的队列了    enq(node);    return node;}

    addWaiter办法执行完了之后,将继续执行acquire办法

    public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

    此时应该接着执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)了,因为addWaiter办法曾经执行实现,返回的是领有以后线程的node,同时它也是以后队列中的队尾。咱们来查看下acquireQueued的源码:

    /** node形参为保护以后线程(t1)的节点, 同时arg为1 */final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        // 此处又自旋了        for (;;) {            // 获取到以后节点的上一个节点,在            // 以后案例下,它是head节点            final Node p = node.predecessor();            // 第一次自旋:            //   做判断,发现上一个节点是head节点            //   于是继续执行加锁办法tryAcquire            //   因为在以后案例下,线程a睡眠了60s            //   必定还是加锁失败的,加锁失败后,            //   则走上面的逻辑,这里就是为了以后            //   节点持续上锁、因为有可能后面的            //   节点曾经开释锁了,或者说被park            //   的线程被unpark了,要持续自旋,            //   尝试获取锁            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }                        // 判断以后这个节点是否须要park            // 什么是park?就是应用unsafe类来阻塞指定的线程,            // 在shouldParkAfterFailedAcquire办法中            // 传入的是以后节点和上一个节点,            // 大抵逻辑为:            //   1. 判断以后节点的上一个节点(即p)的waitStatus是不是SIGNAL(-1)状态,如果是则返回true            //     SIGNAL代表什么呢?下面的表格中有说到            //     SIGNAL代表这个Node是处于排队状态            //     因而能够得出一个论断:如果上一个节点也处于排队状态            //     那么我就返回true,进而执行parkAndCheckInterrupt办法,parkAndCheckInterrupt办法就是让park以后线程,让以后线程进入阻塞状态,自旋再此暂停            //   2. 如果p节点的waitStatus为正数,即不是中断或者勾销状态            //      那么它会将p的waitStatus置为-1.并返回false            //      进而进入第二次自旋,当进入第二次自旋时,若下面还未获取锁胜利,那么以后线程就会被park            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

    所以,当线程t2在执行到此办法时,发现head即线程a对应的node的waitStatus为0,于是会自旋一次将head的waitStatus置为-1,而后再持续自旋,此时本人尝试加锁又失败了,此时就会进入park状态。所以就在acquireQueued办法处阻塞了,期待线程a开释锁后唤醒线程t1。至此案例2的加锁过程也完结了

四、总结

  • 本次只是基于两个简略的案例来意识ReentrantLock加锁流程的源码,其中还有很多其余的case没有波及到。这两种案例算是意识ReentrantLock加锁源码的入门吧。下篇博客将介绍下基于这两种案例的解锁过程。
  • ReentrantLock加锁流程波及到每个办法的具体步骤可查看在github中的总结:传送门
  • I am a slow walker, but I never walk backwards.