关于并发:近万字图文并茂详解AQS加锁流程

48次阅读

共计 8290 个字符,预计需要花费 21 分钟才能阅读完成。

靓仔靓女们好,咱们又见面了,我是 公众号:java 小杰要加油 ,现就职于京东,致力于分享 java 相干常识,包含但不限于并发、多线程、锁、mysql 以及 京东面试真题

AQS 介绍

  • AQS 全称是AbstractQueuedSynchronizer,是一个形象队列同步器,JUC 并发包中的大部分的并发工具类,都是基于 AQS 实现的,所以了解了 AQS 就算是四舍五入把握了 JUC 了(好一个四舍五入学习法)那么 AQS 到底有什么神奇之处呢?有什么特点呢?让咱们明天就来拔光它,一探到底!

    • state:代表被抢占的锁的状态
    • 队列:没有抢到锁的线程会包装成一个 node 节点寄存到一个双向链表中

AQS 大略长这样,如图所示:

你说我轻易画的,我可不是轻易画的啊,我是有 bear 而来, 来看下 AQS 根本属性的代码

那么这个 Node 节点又蕴含什么呢?来吧,展现。

那么咱们就能够把这个队列变的更具体一点

怎么忽然进去个 exclusiveOwnerThread?还是保留以后取得锁的线程,哪里来的呢
还记得咱们 AQS 一开始继承了一个类吗

这个 exclusiveOwnerThread 就是它外面的属性

再次回顾总结一下,AQS 属性如下:

  1. state:代表被抢占的锁的状态
  2. exclusiveOwnerThread:以后取得锁的线程
  3. 队列:没有抢到锁的线程会包装成一个 node 节点寄存到一个双向链表中

    • Node 节点:

       * thread:  以后 node 节点包装的线程
       * waitStatus:以后节点的状态
       * pre: 以后节点的前驱节点
       * next: 以后节点的后继节点
       * nextWaiter: 示意以后节点对锁的模式,独占锁的话就是 null, 共享锁为 Node()
       

好了,咱们对 AQS 大略是什么货色什么构造长什么样子有了个分明的认知,上面咱们间接上硬菜,从源码角度剖析下,AQS 加锁,它这个构造到底是怎么变动的呢?

注:以下剖析的都是独占模式下的加锁

  • 独占模式 : 锁只容许一个线程取得 NODE.EXCLUSIVE
  • 共享模式:锁容许多个线程取得 NODE.SHARED

AQS 加锁源码——acquire

 public final void acquire(int arg) {if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

乍一看这是什么啊,没关系,咱们能够把它画成流程图不便咱们了解,流程图如下

上面咱们来一个一个剖析,图文并茂,来吧宝贝儿。

AQS 加锁源码——tryAcquire

 protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
    }

这是什么状况?怎么间接抛出了异样?其实这是由 AQS 子类重写的办法,就相似 lock 锁,由子类定义尝试获取锁的具体逻辑

咱们平时应用 lock 锁时往往如下(若不想看 lock 锁怎么实现的能够间接跳转到下一节

ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try{//todo}finally {lock.unlock();
        }

咱们看下 lock.lock() 源码

    public void lock() {sync.lock();
    }

这个 sync 又是什么呢,咱们来看下 lock 类的总体属性就好了

所以咱们来看下 默认非偏心锁的加锁实现

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        
        final void lock() {
            // 将 state 状态从 0 设为 1 CAS 形式
            if (compareAndSetState(0, 1))
                // 如果设定胜利的话,则将以后线程(就是本人)设为占有锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 设置失败的话,就以后线程没有抢到锁,而后进行 AQS 父类的这个办法
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            // 调用非偏心锁的办法
            return nonfairTryAcquire(acquires);
        }
    }

当初压力又来到了 nonfairTryAcquire(acquires) 这里

 final boolean nonfairTryAcquire(int acquires) {
            // 取得以后线程
            final Thread current = Thread.currentThread();
            // 取得以后锁的状态
            int c = getState();
            // 如果锁的状态是 0 的话,就表明还没有线程获取到这个锁
            if (c == 0) {
                // 进行 CAS 操作,将锁的状态改为 acquires,因为是可重入锁,所以这个数字可能是 >0 的数字
                if (compareAndSetState(0, acquires)) {
                    // 将以后持有锁的线程设为本人
                    setExclusiveOwnerThread(current);
                    // 返回 获取锁胜利
                    return true;
                }
            }// 如果以后锁的状态不是 0,判断以后获取锁的线程是不是本人,如果是的话
            else if (current == getExclusiveOwnerThread()) {
                // 则重入数加 acquires(这里 acquires 是 1)1->2  3->4 这样
                int nextc = c + acquires;
                if (nextc < 0) // overflow   异样检测
                    throw new Error("Maximum lock count exceeded");
                // 将锁的状态设为以后值
                setState(nextc);
                // 返回获取锁胜利
                return true;
            }
            // 以后获取锁的线程不是本人,获取锁失败,返回
            return false;
        }

由此可见,回到方才的问题,AQS 中的 tryAcquire 是由子类实现具体逻辑的

AQS 加锁源码——addWaiter

如果咱们获取锁失败的话,就要把以后线程包装成一个 Node 节点,那么具体是怎么包装的呢,也须要化妆师经纪人吗?
咱们来看下源码就晓得了
addWaiter(Node.EXCLUSIVE), arg) 这就代表增加的是独占模式的节点

 private Node addWaiter(Node mode) {
        // 将以后线程包装成一个 Node 节点
        Node node = new Node(Thread.currentThread(), mode);
        // 申明一个 pred 指针指向尾节点
        Node pred = tail;
        // 尾节点不为空
        if (pred != null) {
            // 将以后节点的前置指针指向 pred
            node.prev = pred;
            //CAS 操作将以后节点设为尾节点,tail 指向以后节点
            if (compareAndSetTail(pred, node)) {
                //pred 下一节点指针指向以后节点
                pred.next = node;
                // 返回以后节点(此时以后节点就曾经是尾节点)return node;
            }
        }
        // 如果尾节点为空或者 CAS 操作失败
        enq(node);
        return node;
    }

其中 node 的构造函数是这样的

 Node(Thread thread, Node mode) {     // Used by addWaiter
      this.nextWaiter = mode;
      this.thread = thread;
  }

咱们能够通过图解的办法来更直观的来看下 addWaiter 做了什么

由图可知,如果已经尾节点不为空的时候,node 节点会退出到队列开端,那么如果已经尾节点为空或者 CAS 失败调用
enq(node); 会怎么样呢?

AQS 加锁源码——enq

 private Node enq(final Node node) {
        // 死循环,直到有返回值
        for (;;) {
            // 申明一个 t 的指针指向 tail
            Node t = tail;
            // 如果尾巴节点为空
            if (t == null) { // Must initialize
                // 则 CAS 设置一个节点为头节点(头节点并没有包装线程!)这也是提早初始化头节点
                if (compareAndSetHead(new Node()))
                    // 将尾指针指向头节点
                    tail = head;
            } else {  // 如果尾节点不为空,则阐明这是 CAS 失败
              // 将 node 节点前驱节点指向 t
                node.prev = t;
                // 持续 CAS 操作将本人设为尾节点
                if (compareAndSetTail(t, node)) {
                    // 将 t 的 next 指针指向本人(此时本人真的是尾节点了)t.next = node;
                    // 返回本人节点的前置节点, 队列的倒数第二个
                    return t;
                }
            }
        }
    }
  • 队列中的 头节点 ,是 提早初始化 的,加锁时用到的时候才去输入话,并不是一开始就有这个头节点的
  • 头节点并不保留任何线程

end 尾分叉

     // 将 node 节点前驱节点指向 t              
      node.prev = t;                    1
      // 持续 CAS 操作将本人设为尾节点
      if (compareAndSetTail(t, node)) { 2
          // 将 t 的 next 指针指向本人(此时本人真的是尾节点了)t.next = node;                3
          // 返回本人节点的前置节点, 队列的倒数第二个
          return t;
      }

咱们留神到,enq函数有下面三行代码,3 是在 2 执行胜利后才会执行的,因为咱们这个代码无时无刻都在并发执行,存在一种可能就是

1 执行胜利,2 执行失败(cas 并发操作),3 没有执行,所以就只有一个线程 1,2,3 都执行胜利,其余线程 1 执行胜利,2,3 没有执行胜利,呈现尾分叉状况,如图所示

这些分叉失败的节点,在当前的循环中他们还会执行 1,直总会指向新的尾节点,1,2,3 这么执行,早晚会入队

AQS 加锁源码——acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        // 是否有异样产生
        boolean failed = true;
        try {
            // 中断标记
            boolean interrupted = false;
            // 开始自旋,要么以后线程尝试去拿到锁,要么抛出异样
            for (;;) {
                // 取得以后节点的上一个节点
                final Node p = node.predecessor();
                // 判断这个节点是不是头节点,如果是头节点则获取锁
                if (p == head && tryAcquire(arg)) {
                    // 将此节点设为头节点
                    setHead(node);
                    // 原来的头节点出队
                    p.next = null; // help GC
                    failed = false;
                    // 返回是否中断
                    return interrupted;
                }
                // 阐明 p 不是头节点
                // 或者
                // p 是头节点然而获取锁失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 中断标记设为 true
                    interrupted = true;
            }
        } finally {
           // 如果有异样产生的话
            if (failed)
            // 勾销以后线程竞争锁,将以后 node 节点状态设置为 cancel
                cancelAcquire(node);
        }
    }

其中有一行代码是setHead(node);

 private void setHead(Node node) {
        head = node;
        node.thread = null;  // 将 head 节点的线程置为空
        node.prev = null;
    }
  • 为什么要将头节点的线程置为空呢,是因为在 tryAcquire(arg)中就曾经记录了以后获取锁的线程了,在记录就多此一举了,咱们看前文中提到的 nonfairTryAcquire(acquires) 其中有一段代码
   if (compareAndSetState(0, acquires)) {
          // 将以后持有锁的线程设为本人   
          setExclusiveOwnerThread(current);  
          // 返回 获取锁胜利
          return true;
      }

可见 setExclusiveOwnerThread(current); 就曾经记录了取得锁的线程了

咱们 acquireQueued 返回值是中断标记,true 示意中断过,false 示意没有中断过,还记得咱们一开始吗,回到最后的终点

public final void acquire(int arg) {if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

如果返回了 true, 代表此线程有中断过,那么调用 selfInterrupt(); 办法,将以后线程中断一下

 static void selfInterrupt() {Thread.currentThread().interrupt();}

AQS 加锁源码——shouldParkAfterFailedAcquire

程序运行到这里就阐明

 // 阐明 p 不是头节点
// 或者
// p 是头节点然而获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    // 中断标记设为 true
    interrupted = true;
}

咱们来剖析下 shouldParkAfterFailedAcquire(p, node) 的源码外面到底做了什么?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取以后节点的前置节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
           // 如果是 SIGNAL(-1)状态间接返回 true, 代表此节点能够挂起
            // 因为前置节点状态为 SIGNAL 在适当状态 会唤醒后继节点
            return true;
        if (ws > 0) {
            // 如果是 cancelled
            do {
                // 则从后往前依此跳过 cancelled 状态的节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 将找到的符合标准的节点的后置节点指向以后节点
            pred.next = node;
        } else {
            // 否则将前置节点期待状态设置为 SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

其中的 node.prev = pred = pred.prev; 能够看成

pred = pred.prev;

node.prev = pred;

可见一顿操作后,队列中跳过了节点状态为 cancelled 的节点

AQS 加锁源码——parkAndCheckInterrupt

shouldParkAfterFailedAcquire 返回 true 时就代表容许以后线程挂起而后就执行 parkAndCheckInterrupt()这个函数

 private final boolean parkAndCheckInterrupt() {
        // 挂起以后线程   线程卡在这里不再下执行,直到 unpark 唤醒
        LockSupport.park(this);
        return Thread.interrupted();}

所以以后线程就被挂起啦

AQS 加锁源码——cancelAcquire

咱们还记得前文中提到 acquireQueued 中的一段代码

  try { } finally {if (failed)
                cancelAcquire(node);
        }

这是抛出异样时解决节点的代码,上面来看下源代码

private void cancelAcquire(Node node) {
        // 过滤掉有效节点
        if (node == null)
            return;
        // 以后节点线程置为空
        node.thread = null;
        // 获取以后节点的前一个节点
        Node pred = node.prev;
        // 跳过勾销的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 记录过滤后的节点的后置节点
        Node predNext = pred.next;
        // 将以后节点状态改为 CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 如果以后节点是 tail 尾节点 则将从后往前找到第一个非勾销状态的节点设为 tail 尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
            // 如果设置胜利,则 tail 节点前面的节点会被设置为 null
            compareAndSetNext(pred, predNext, null);
        } else {

            int ws;
            // 如果以后节点不是首节点的后置节点
            if (pred != head &&  // 并且
                    // 如果前置节点的状态是 SIGNAL
                ((ws = pred.waitStatus) == Node.SIGNAL || // 或者
                        // 状态小于 0 并且设置状态为 SIGNAL 胜利
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    // 并且前置节点线程不为 null 时
                pred.thread != null) {
                // 记录下以后节点的后置节点
                Node next = node.next;
                // 如果后置节点不为空 并且后置节点的状态小于 0
                if (next != null && next.waitStatus <= 0)
                    // 把以后节点的前驱节点的后继指针指向以后节点的后继节点
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 唤醒以后节点的下一个节点
                unparkSuccessor(node);
            }
            // 将以后节点下一节点指向本人
            node.next = node; // help GC
        }
    }

看起来太简单了,不过没关系,咱们能够拆开看,其中有这一段代码

       // 以后节点线程置为空
        node.thread = null;
        // 获取以后节点的前一个节点
        Node pred = node.prev;
        // 跳过勾销的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 记录过滤后的节点的后置节点
        Node predNext = pred.next;
        // 将以后节点状态改为 CANCELLED
        node.waitStatus = Node.CANCELLED;

如图所示

通过 while 循环从后往前找到 signal 状态的节点,跳过两头 cancelled 状态的节点,同时将以后节点状态改为 CANCELLED

咱们能够把这简单的判断条件转换成图来直观的看一下

  • 以后节点是尾节点时, 队列变成这样

  • 以后节点是 head 后继节点

  • 以后节点不是尾节点也不是头节点的后继节点(队列中的某个一般节点)

总结

太不容易了家人们,终于到了这里,咱们再来 总结一下整体的流程

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

1. 基于 AQS 实现的子类去实现 tryAcquire 尝试获取锁

2. 如果获取锁失败,则把以后节点通过 addWaiter 办法包装成 node 节点插入队列

  • 如果尾节点为空或者 CAS 操作失败则调用 enq 办法保障胜利插入到队列,若节点为空则初始化头节点

3.acquireQueued办法,入队后的节点持续获取锁(此节点的前置节点是头节点)或者挂起

    • shouldParkAfterFailedAcquire判断节点是否应该挂起

      • 如果以后节点的前置节点是 signal 状态,则返回 true, 能够挂起
      • 如果以后节点的前置节点是 cancelled,则队列会从以后节点的前一个节点开始从后向前遍历跳过 cacelled 状态的节点,将以后节点和非 cacelled 状态的节点连接起来,返回 false, 不能够挂起
      • 否则将前置节点期待状态设置为 SIGNAL,返回 false, 不能够挂起
    • parkAndCheckInterrupt挂起以后线程
    • cancelAcquire将以后节点状态改为 cancelld
    1. selfInterrupt(); 设置中断标记,将中断补上

    往期精彩举荐

    • 京东这道面试题你会吗?
    • ?线程池为什么能够复用,我是蒙圈了。。。
    • 学会了 volatile,你变心了,我看到了
    • mysql 能够靠索引,而我只能靠打工,加油,打工人!

    絮絮叨叨

    如果大家感觉这篇文章对本人有一点点帮忙的话

    若文章有误欢送指出 ,靓仔靓女,咱们下篇文章见, 扫一扫,关注我,开启咱们的故事

    正文完
     0