关于java:并发系列四基于两种案例来认识ReentrantLock源码加锁过程公平锁

6次阅读

共计 8988 个字符,预计需要花费 23 分钟才能阅读完成。

前言

  • 上篇文章咱们证实了 synchronized 关键字的个性:无锁、偏差锁、轻量锁、重 (chong) 偏差、重 (chong) 轻量、分量锁 。能够说 synchronized 是 jvm 层面实现同步的形式。在 jdk 中,存在一个叫java.util.concurrent 的包,简称 JUC,它是一个 jdk 层面的并发包,外面存在了大量与并发编程相干的 api,其中最代表意义的就是atomic 和 lock 两种类别,前者是基于乐观锁 CAS(Compare And Swap) 的实现,后者是基于 AQS(Abstract Queued Synchronizer) 实现。本文将具体解说下 AQS 原理以及依据两个案例来解读 ReentrantLock 源码。
  • 两个案例:

    1. 线程 A 独自加锁
    2. 线程 A 正在持有锁的过程中,线程 t1 来加锁

一、AQS 原理

  • AQS 简称 Abstract Queued Synchronizer,它的外围是基于一个双向链表组成的队列(CLH 队列) + volatile 关键字润饰的 int 类型变量实现的。(对于 volatile 关键字能够参考其余博主的一些总结: 传送门),大抵外围能够以如下图来出现:

    简略总结就是:外部应用双向链表保护了一个队列,其中 Node 数据结构为此队列的基石,外部保护了 prev(指向上一个节点 )、next( 指向下一个节点 )、waitStatus( 以后 node 的状态 )、thread( 以后保护的线程 ) 四个重要的属性。其中 waitStatus 别离有如下取值:
Node 中 waitStatus 具体取值 含意
CANCELLED(1) 中断或勾销,此状态下的节点会从队列中移除
SIGNAL(-1) 此状态下的节点肯定是在队列排队中
CONDITION(-2) 条件阻塞,比如说外部因 Condition 而阻塞的节点
PROPAGATE(-3) 示意下一个 acquireShared 应该无条件流传
0 默认状态

除此之外,队列中还保护了三个属性,head(指向队列中的头节点 )、state( 锁的状态 )、tail( 指向队列中的尾节点)。其中,state 的取值有两种状况,将以如下表展现进去:

AQS 中 state 具体取值 含意
0 示意以后锁没有被线程持有
1 示意以后锁正在被线程持有
大于 1 示意以后锁被线程重入了 (重入锁), 这里要留神:ReentrantLock 重入了几次,就要开释几次锁

二、案例 1:线程 A 独自加锁

  • 代码如下:

    public class SimpleThreadLock {static ReentrantLock lock = new ReentrantLock(true);
    
        public static void main(String[] args) throws InterruptedException {Thread a = new Thread(() -> {
                try {lock.lock();
                    System.out.println("Get lock");
                } catch (Exception e) {e.printStackTrace();
                } finally {lock.unlock();
                }
            }, "线程 a");
    
            a.start();
            a.join();
            System.out.println("end");
        }
    }

    代码也比较简单,就是在主线程中创立了一个线程,并且外部去应用 ReentrantLock 加锁,获取到锁后就打印出 Get lock 这句话,当 t1 线程执行完后再继续执行主线程的逻辑。这里就不一步步演示断点了,间接上源码。

  • 这里先阐明下 ReentrantLock 重载的两个构造方法

    // 默认非偏心锁
    public ReentrantLock() {sync = new NonfairSync();
    }
    
    // 若传入 true 则是偏心锁
    public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

    因为咱们传入了 true 进去,所以此时,它是一把偏心锁。

  • lock.lock()办法,因为咱们指定了应用偏心锁,所以最终会进入 ReentrantLock 外部保护的 FairSync 类的 lock 办法

    // FairSync 类下的 lock 办法
    final void lock() {acquire(1);
    }

    于是,咱们须要找到 acquire 办法,此办法为 AQS(父类 AbstractQueuedSynchronizer)的办法,所以最终会进入如下这么一段代码:

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

    这段代码,看似很精简,然而它做的事真的太多了。稀释的才是精髓呀!好了,咱们不偏题,持续依照咱们的主题:线程 A 独自加锁。不过要持续往下看,还是要加深下 acquire 办法的含意,咱们必须要tryAcquire 办法返回 false,能力持续走 if 条件中前面的逻辑,以及 if 条件外部的逻辑。于是,咱们间接看 tryAcquire 办法源码:

  • tryAcquire 办法

    protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
    }

    tryAcquire 办法是一个 protected 办法,外部间接抛出了一个异样,还记得咱们是从哪个类掉用到父类 AbstractQueuedSynchronizer 的 acquire 办法的?没错,就是 FairSync 类。那么咱们就间接定位到 FairSync 类的 tryAcquire 办法呗。

    protected final boolean tryAcquire(int acquires) {
        // 拿到以后线程,也就是线程 A
        final Thread current = Thread.currentThread();
    
        // 拿到以后 aqs 的 state 变量,咱们没有批改过它,// 默认为 0
        int c = getState();
        if (c == 0) {
            // 进入此逻辑,此逻辑跟 acquire 办法有点相似
            // 必须要 hasQueuedPredecessors()办法返回 false
            // 能力持续往下执行,于是咱们把 hasQueuedPredecessors 的源码也贴出来
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • hasQueuedPredecessors办法源码

    public final boolean hasQueuedPredecessors() {
        // 拿到 aqs 中的 tail
        Node t = tail; 
        // 拿到 aqs 中的 head
        Node h = head;
        Node s;
    
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    此办法涵盖的情景比拟多,然而就以后情景而言,它很容易了解,在以后情景中,咱们压根没操作过 tail 和 head 那么 h 必定等于 t,所以此办法返回 false,返回 false 后,咱们回到 FairSync 类的 tryAcquire 办法,

    protected final boolean tryAcquire(int acquires) {
        // .... 上半局部代码省略
        if (c == 0) {
             // 在以后情景下,hasQueuedPredecessors 返回的是 false
             // 也就是说会持续走 if 前面的逻辑,// if 前面的逻辑就是执行 CAS 操作,// 将 state 属性从 0 设置成 1
             // 因为此时只有一个线程在执行,// 这个 cas 操作肯定是胜利的
             // cas 胜利后就会执行 setExclusiveOwnerThread 代码,这段代码很有用
             // 它是一个赋值的操作,也就是记录
             // 以后领有锁的线程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        }
        // .... 下半局部 else if 逻辑也省略了
        return false;
    }

    通过上述代码中的正文,咱们能够发现,线程 A 加锁胜利后会返回 true,至此,tryAcquire 的返回值为 true。还记的咱们是从哪个办法进来的吗?是的,是从父类 AbstractQueuedSynchronizer 的 acquire 办法进来的,下面总结到了,只有当 tryAcquire 返回 false,才会持续往下执行。至此,线程 A 独自加锁的案例就完结了。通过这么一个单线程加锁的案例,如果你认为 AQS 很简略的话,那就大错特错了,单线程加锁的案例中,咱们仅应用到了 AQS 中的 state 变量,CLH 队列却始终没有波及到,而且从加锁到加锁完结的整个过程,咱们连一个 Node 类型的数据结构都没有看到过。那 Node 类型的数据结构什么时候会被用到呢? 咱们来看下一个案例 线程 A 正在持有锁的过程中,线程 t1 来加锁

三、案例 2:线程 A 正在持有锁的过程中,线程 t1 来加锁

  • 同样的,咱们革新下代码:

    public class TwoThreadLock {static ReentrantLock lock = new ReentrantLock(true);
    
        public static void main(String[] args) throws InterruptedException {new Thread(() -> {
                try {lock.lock();
                    System.out.println("Thread a get lock");
                    TimeUnit.SECONDS.sleep(60);
                } catch (Exception e) {e.printStackTrace();
                } finally {lock.unlock();
                }
            }, "线程 a").start();
    
            Thread t1 = new Thread(() -> {
                try {lock.lock();
                    System.out.println("Thread t1 get lock");
                } catch (Exception e) {e.printStackTrace();
                } finally {lock.unlock();
                }
            }, "线程 t1");
    
            t1.start();
            t1.join();
    
            System.out.println("end");
        }
    }

    上段代码,毫无疑问,线程 t1 在调用 lock.lock()办法时,就阻塞到那里了,要等线程 a 睡 60s 后才会继续执行,那么这外面到底做了哪些事呢?咱们来一起钻研下。

  • 同案例 1,应用的是偏心锁,最终必定会调用到 tryAcquire 办法去,咱们这次就一次性的把 tryAcquire 办法给讲清楚

    protected final boolean tryAcquire(int acquires) {
        // 拿到以后线程,也就是线程 t1
        final Thread current = Thread.currentThread();
    
        // 拿到以后 aqs 的 state 变量,此时的 c 是多少呢?// 没错,是 1,因为锁曾经被线程 A 占有了,此时的
        // state 为 1。于是它会走 else if 逻辑
        int c = getState();
        if (c == 0) {
            // 进入此逻辑,此逻辑跟 acquire 办法有点相似
            // 必须要 hasQueuedPredecessors()办法返回 false
            // 能力持续往下执行,于是咱们把 hasQueuedPredecessors 的源码也贴出来
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 走了 else if 逻辑,它也发现以后持有锁的线程不是本人呀,于是间接 return false
        // 这里顺带解释下这个 else if 的逻辑,这个 else if
        // 就是判断以后调用 lock 办法的线程是不是和以后持有
        // 锁的线程一样,如果是一样的,则将 state + 1 并赋值给 nextc 属性
        // 这就示意了 ReentrantLock 反对重入性
        // 那么什么时候会呈现 nextc 属性小于 0 的状况呢?// nextc 是一个 int 类型,当超过了它的存储返回后
        // 会呈现小于 0 的状况 ===> 也就是说 ReentrantLock
        // 的重入次数最大为反对 int 类型最大值
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    通过上述代码块中的正文可知,线程 t1 的加锁流程并没有这么顺利,在 tryAcquire 办法中返回了 false,那这代表了什么呢?是的,它代表着线程 t1 能够持续走 acquire 前面的逻辑了,咱们持续把 acquire 办法贴出来:

    public final void acquire(int arg) {
        // 在案例 2 的状况下,tryAcquire 办法返回了 false
        // 于是会执行前面的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        // 当 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回了 true 才会执行外部的 selfInterrupt()办法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

    于是,咱们先理解下 addWaiter(Node.EXCLUSIVE) 办法,它的源码如下:

    private Node addWaiter(Node mode) {
        // 此时的 mode 是由上述代码块传入的,// 它的值为 Node.EXCLUSIVE ===> 这是一个空节点,// 值为 null,// 创立了一个 node 节点, 外部保护了以后线程(线程 t1),并且它的 next 节点为 null(有 Node 的构造方法可知)
        Node node = new Node(Thread.currentThread(), mode);
        // 拿到 aqs 队列中的 tail 属性,// 此时必定为 null 啊(aqs 队列都没初始化,哪来的队尾节点)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        
        // 此时 pred 为 null,即不会走下面的 if 逻辑,于是执行 enq 办法,记住:此时传入 enq 办法时的形参为新 new 进去的 Node
        // 外部保护的是以后线程(线程 t1)
        enq(node);
        return node;
    }

    下面代码块的正文也说了,最终会执行到 enq 办法,enq 方干啥的呢?猜一下?是的,它就是初始 aqs 队列的。咱们来看一下它的源码:

    /**
     形参 node 外部保护的线程为 t2, 并且它的 next 属性指向为 null
     */
    private Node enq(final Node node) {
        // 此处写了一个死循环,也就是常说的自旋锁
        for (;;) {
            // 自旋的过程中
            // 第一次自旋://  拿到队尾元素, 此时队列都没有,必定为 null
            //  发现队列中的 tail 指向的是 null,于是初始化 tail 节点,并让 aqs 中的 head 指向了 tail,//  至此,aqs 繁难版本的队列就进去啦,//  head 和 tail 指向同一个 node,并且此 node 外部
            //  保护的 thread、prev、next、waitStatus 全是默认值
            // 因为是 if else 逻辑,所以初始化 tail 属性后,就会进行第二次自旋
            // 第二次自旋:
            //  再次拿到 tail, 因为第一次自旋把 tail 给初始化了,所以此时拿到的 tail 不为 null, 于是走了 else 逻辑
            //  在 else 中,次要操作的是形参 node, 还记得形参 node 是什么吗? ==> 保护以后线程 (线程 t1) 的 node 节点,//  此时会将 node 的上一个节点指向 t 节点
            //  同时进行 cas 操作,将 node 节点变成 tail
            //  当 cas 胜利后,再设置 t 的 next 指向 node
            //  最终返回这个 t.
            //  此时此刻这个 t 是什么样的数据结构呢?//  此时的这个 t 就是队列中的 head 节点了,//  并且它的 next 为 node(保护线程 t1)
            //  所以此时此刻队列中当初有两个元素了
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    代码中的正文形容了 enq 的过程,我专门画了一个图来形容 aqs 队列产生的过程,帮忙了解:

    enq 初始化 aqs 队列的过程后,就执行到了 addWaiter 办法的进口了

    private Node addWaiter(Node mode) {
        // .... 上述代码省略
        // enq 初始化队列后,会将 node 进行返回
        // 这个 node 就是保护线程 t1 的 node,它曾经是
        // 队列中的队列了
        enq(node);
        return node;
    }

    addWaiter 办法执行完了之后,将继续执行 acquire 办法

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

    此时应该接着执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 了,因为 addWaiter 办法曾经执行实现,返回的是领有以后线程的 node,同时它也是以后队列中的队尾。咱们来查看下 acquireQueued 的源码:

    /**
     node 形参为保护以后线程 (t1) 的节点,
     同时 arg 为 1
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 此处又自旋了
            for (;;) {
                // 获取到以后节点的上一个节点,在
                // 以后案例下,它是 head 节点
                final Node p = node.predecessor();
                // 第一次自旋://   做判断,发现上一个节点是 head 节点
                //   于是继续执行加锁办法 tryAcquire
                //   因为在以后案例下,线程 a 睡眠了 60s
                //   必定还是加锁失败的,加锁失败后,//   则走上面的逻辑,这里就是为了以后
                //   节点持续上锁、因为有可能后面的
                //   节点曾经开释锁了,或者说被 park
                //   的线程被 unpark 了,要持续自旋,//   尝试获取锁
                if (p == head && tryAcquire(arg)) {setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 判断以后这个节点是否须要 park
                // 什么是 park?就是应用 unsafe 类来阻塞指定的线程,
                // 在 shouldParkAfterFailedAcquire 办法中
                // 传入的是以后节点和上一个节点,// 大抵逻辑为://   1. 判断以后节点的上一个节点 (即 p) 的 waitStatus 是不是 SIGNAL(-1)状态,如果是则返回 true
                //     SIGNAL 代表什么呢?下面的表格中有说到
                //     SIGNAL 代表这个 Node 是处于排队状态
                //     因而能够得出一个论断:如果上一个节点也处于排队状态
                //     那么我就返回 true,进而执行 parkAndCheckInterrupt 办法,parkAndCheckInterrupt 办法就是让 park 以后线程,让以后线程进入阻塞状态,自旋再此暂停
                //   2. 如果 p 节点的 waitStatus 为正数,即不是中断或者勾销状态
                //      那么它会将 p 的 waitStatus 置为 -1. 并返回 false
                //      进而进入第二次自旋,当进入第二次自旋时,若下面还未获取锁胜利,那么以后线程就会被 park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

    所以,当线程 t2 在执行到此办法时,发现 head 即线程 a 对应的 node 的 waitStatus 为 0,于是会自旋一次将 head 的 waitStatus 置为-1,而后再持续自旋,此时本人尝试加锁又失败了,此时就会进入 park 状态。所以就在acquireQueued 办法处阻塞了,期待线程 a 开释锁后唤醒线程 t1。至此案例 2 的加锁过程也完结了

四、总结

  • 本次只是基于两个简略的案例来意识 ReentrantLock 加锁流程的源码,其中还有很多其余的 case 没有波及到。这两种案例算是意识 ReentrantLock 加锁源码的入门吧。下篇博客将介绍下基于这两种案例的解锁过程。
  • ReentrantLock 加锁流程波及到每个办法的具体步骤可查看在 github 中的总结:传送门
  • I am a slow walker, but I never walk backwards.
正文完
 0