JUCAQS独占式源码分析

35次阅读

共计 5033 个字符,预计需要花费 13 分钟才能阅读完成。

一、写在前面


上篇文章通过 ReentrantLock 的加锁和释放锁过程给大家聊了聊 AQS 架构以及实现原理,具体参见《J.U.C|AQS 的原理》。

理解了原理,我们在来看看再来一步一步的聊聊其源码是如何实现的。

本章给大家聊聊 AQS 中独占式获取和释放共享状态的流程,主要根据 tryAcquire(int arg) — > tryRelease(int arg) 来讲。

二、什么是独占式


AQS 的同步队列提供两种模式即独占式(EXCLUSIVE)和 共享式(SHARED)。

本章我们主要聊独占式:即同一时刻只能有一个线程获取同步状态,其它获取同步状态失败的线程则会加入到同步队列中进行等待。

主要讲解方法:

  • tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false。

有对同步队列不明白的请看《J.U.C| 同步队列(CLH)》

三、核心方法分析


3.1 共享状态的获取

acquire(int arg)

独占式获取同步状态的顶级入口 acquire(int arg) 方法,如果线程获取到共享这状态则直接返回, 否则把当前线程构造成独占式(node.EXCLUSIVE)模式节点并添加到同步队列尾部,直到获取到资源为止,整个过程忽略中断。

方法源码

public final void acquire(int arg) {if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
            selfInterrupt();} 
 }

方法函数:

  • tryAcquire(arg):尝试获取同步状态、获取成功则直接返回。
  • addWaiter(Node.EXCLUSIVE):当同步状态获取失败时,构建一个独占式节点并将其加入到同步队列的尾部。
  • acquireQueued(Node, arg)) : 获取该节点指定数量的资源,通过自旋的方式直到获取成功,返回是该节点线程的中断状态。
  • selfInterrupt():将中断补上(因其获取资源的整个过程是忽略中断的所以最后手动将中断补上)

    源码分析

tryAcquire(arg)

protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
    }

???什么鬼?直接抛出异常?AQS 中对共享状态的获取没有提供具体的实现,等待子类根据自己的场景去实现。有没有人疑惑,那为什么不是 abstract 的尼?因为 AQS 不止是独占模式的锁需要继承它还有别人也需要继承它,总不能让别人也来实现一个无关的方法吧。

addWaiter(Node node)

private Node addWaiter(Node mode) {
// 以给定的模式来构建节点,mode 有两种模式 
//  共享式 SHARED,独占式 EXCLUSIVE;
  Node node = new Node(Thread.currentThread(), mode);
    // 尝试快速将该节点加入到队列的尾部
    Node pred = tail;
     if (pred != null) {
        node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果快速加入失败,则通过 anq 方式入列
        enq(node);
        return node;
    }

addWaiter(Node mode) 方法尝试将当前 Node 节点快速加入到队列的尾部,如果快速加入失败则通过 enq(node) 方法自旋加入。

enq(final Node node)

private Node enq(final Node node) {
// CAS 自旋,直到加入队尾成功        
for (;;) {
    Node t = tail;
        if (t == null) { // 如果队列为空,则必须先初始化 CLH 队列,新建一个空节点标识作为 Hader 节点, 并将 tail 指向它
            if (compareAndSetHead(new Node()))
                tail = head;
            } else {// 正常流程,加入队列尾部
                node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                }
            }
        }
    }

enq(final Node node) 方法通过自旋的方式将当前 Node 节点加入到队列尾部,直到成功为止。

注:在这不管是快速还是自旋的方式将当前 Node 节点加入到队列尾部都是通过 compareAndSetTail(t, node) 来保证线程安全的,这也是典型实现无锁化线程安全的方式,CAS 自旋 volatile 变量。

acquireQueued(final Node, int arg)

final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
        try {
            // 标记等待过程中是否被中断过
            boolean interrupted = false;
            // 自旋
           for (;;) {
            // 获取当前节点的前驱节点
           final Node p = node.predecessor();
           // 如果其前驱节点为 head 节点,说明此节点有资格去获取资源了。(可能是被前驱节点唤醒, 也可能被 interrupted 了的)if (p == head && tryAcquire(arg)) {
            // 拿到资源后将自己设置为 head 节点,setHead(node);
           // 将前驱节点 p.next = nul 在 setHead(node); 中已经将 node.prev = null 设置为空了,方便 GC 回收前驱节点,也相当于出列。p.next = null; // help GC
         failed = false;
         return interrupted;
        }
    // 如果不符合上述条件,说明自己可以休息了,进入 waiting 状态,直到被 unpark()
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
         interrupted = true;
     } finally {if (failed)
            cancelAcquire(node);
     }
}

当前节点的线程在‘死循环’中尝试获取同步状态,前提是只有其前驱节点为 head 节点时才有尝试获取同步状态的资格,否则继续在同步队列中等待被唤醒。

Why?

  • 因为只有 head 是成功获取同步状态的节点,而 head 节点的线程在释放同步状态的同时,会唤醒后继节点,后继节点在被唤醒后检测自己的前驱节点是否是 head 节点,如果是则会通过自旋尝试获取同步状态。
  • 维护 CLH 的 FIFO 原则。该方法中节点自旋获取同步状态。

如下图

shouldParkAfterFailedAcquire(Node pred, Node node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 拿到前驱的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
           // 如果已经告诉过前驱节点,获取到资源后通知自己下,那就可以安心的去休息了。return true;
        if (ws > 0) {
           // 如果前驱节点放弃了,那就循环一直往前找,直到找到一个正常等待状态的节点,排在他后面
            do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        // 如果前驱状态为 0 或者 PROPAGATE 状态,那就把前驱状态设置成 SIGNAL,告诉它获取资源后通知下自己。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

此方法检测自己前驱节点是否时 head 节点,如果是则尝试获取同步状态,不是则再次回到同步队列中找到一个舒适地方(也就是找到一个 waitStatus > 0 的节点,排在他后面继续等待)休息,并告诉前驱节点释放同步状态或者被中断后通知自己下(compareAndSetWaitStatus(pred, ws, Node.SIGNAL))。

注意:在此查找一个舒适区域休息(waitStatus > 0 的节点)时那些不符合条件的节点会形成了一个无效链,等待 GC 回收。

private final boolean parkAndCheckInterrupt() {
        // 调用 park 方法是线程进入 waiting 状态
        LockSupport.park(this);
        // 如果被唤醒查看是不是被中断状态
        return Thread.interrupted();}

最后调用 park 方法使节点中线程进入 wating 状态,等待被 unpark() 唤醒。

小结

  1. 请求线程首先调用 tryAcquire(arg) 方法尝试获取同步状态,成功则直接返回。
  2. 如果失败:
  • 构造一个独占式节点 Node.EXCLUSIVE
  • addWaiter(Node.EXCLUSIVE) 将该节点尝试快速加入到队列尾部,成功则直接返回该节点,失败则调用 enq(final Node node) 方法利用自旋 CAS 将该节点加入到队列尾部。
  1. 调用 acquireQueued(final Node, int arg)方法找到一个舒适的休息区,并通知前驱节点在释放同步状态或者被中断后唤醒自己从新尝试获取同步状态。
  2. 最后如果节点线程在等待时被中断,则将中断补上 selfInterrupt()

到这独占式获取共享状态已经聊完了,下面我们一起来看看释放共享状态的过程。

3.2 共享状态的释放

release(int arg)

独占式释放共享资源的顶级入口 release(int arg) 方法,彻底释放共享状态(state = 0)并唤醒其后继节点来获取共享状态。

方法源码

 public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒 head 节点的后继节点。unparkSuccessor(h);
            return true;
        }
        return false;
    }

源码分析
tryRelease(arg)

protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
    }

tryRelease(int arg) 和 tryAcquire(arg) 语义基本相同,留给子类去实现。

unparkSuccessor(h)

private void unparkSuccessor(Node node) {
        // 获取当前节点的等待状态
        int ws = node.waitStatus;
        
        if (ws < 0)
            // 如果节点状态小于 0(),将其状态设置为 0
            compareAndSetWaitStatus(node, ws, 0);
         // 获取其下一个需要唤醒的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 如果下一个节点为 null, 或者等待状态大于 0(被取消的状态)继续往下查找
            直到等待状态小于等于 0 的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒该节点等待线程
            LockSupport.unpark(s.thread);
    }

小结

  • 首先调用实现者的 tryRelease(),失败则返回 false
  • 成功则找到下一个有效的节点并唤醒它。

到这独占式获取同步和释放同步状态的源码已经分析完了。有没有懵尼?懵了也别怕最后我们再来张流程图帮助大家理解。

结合上面源码分析,应该对 AQS 独占式获取和释放共享状态的源码有所了解了吧。

四、总结


分析了独占式同步状态的获取和释放过程,适当做下总结:在获取同步状态时,同步器维持一个同步队列,获取状态失败的线程都会加入到队列中并在队列中进行自旋,出列(或者停止自旋的)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。

正文完
 0