JUCAQS共享式源码分析

6次阅读

共计 5418 个字符,预计需要花费 14 分钟才能阅读完成。

一、写在前面


上篇给大家聊了独占式的源码,具体参见《J.U.C|AQS 独占式源码分析》

这一章我们继续在 AQS 的源码世界中遨游,解读共享式同步状态的获取和释放。

二、什么是共享式


共享式与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。

我们以读写锁为例来看两者,一个线程在对一个资源文件进行读操作时,那么这一时刻对于文件的写操作均被阻塞,而其它线程的读操作可以同时进行。
当写操作要求对资源独占操作,而读操作可以是共享的,两种不同的操作对同一资源进行操作会是什么样的?看下图


共享式访问资源,其他共享时均被允许,而独占式被阻塞。


独占式访问资源时,其它访问均被阻塞。

通过读写锁给大家一起温故下独占式和共享式概念,上一节我们已经聊过独占式,本章我们主要聊共享式。

主要讲解方法

  • protected int tryAcquireShared(int arg);共享式获取同步状态,返回值 >= 0 表示获取成功,反之则失败。
  • protected boolean tryReleaseShared(int arg):共享式释放同步状态。

三、核心方法分析


    • *

3.1 同步状态的获取

public final void acquireShared(int arg)

共享式获取同步状态的顶级入口,如果当前线程未获取到同步状态,将会加入到同步队列中等待,与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。

方法源码

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

方法函数解析

  • tryAcquireShared(arg):获取同步状态,返回值大于等于 0 表示获取成功,否则失败。
  • doAcquireShared(arg):共享式获取共享状态,包含构建节点,加入队列等待,唤醒节点等操作。

源码分析

同步器的 acquireShared 和 doAcquireShared 方法

// 请求共享锁的入口
public final void acquireShared(int arg) {// 当 state != 0 并且 tryAcquireShared(arg) < 0 时才去才获取资源
        if (tryAcquireShared(arg) < 0)
            // 获取锁
            doAcquireShared(arg);
    }
// 以共享不可中断模式获取锁
private void doAcquireShared(int arg) {// 将当前线程一共享方式构建成 node 节点并将其加入到同步队列的尾部。这里 addWaiter(Node.SHARED)操作和独占式基本一样,final Node node = addWaiter(Node.SHARED);
        // 是否成功标记
        boolean failed = true;
        try {
            // 等待过程是否被中断标记
            boolean interrupted = false;
            自旋
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 判断前驱节点是否是 head 节点,也就是看自己是不是老二节点
                if (p == head) {
                    // 如果自己是老二节点,尝试获取资源锁, 返回三种状态
                    // state < 0 : 表示获取资源失败
                    // state = 0: 表示当前正好线程获取到资源,此时不需要进行向后继节点传播。// state > 0: 表示当前线程获取资源锁后,还有多余的资源,需要向后继节点继续传播,获取资源。int r = tryAcquireShared(arg);
                    // 获取资源成功
                    if (r >= 0) {
                        // 当前节点线程获取资源成功后,对后继节点进行逻辑操作
                        setHeadAndPropagate(node, r);
                        // setHeadAndPropagate(node, r) 已经对 node.prev = null, 在这有对 p.next = null; 等待 GC 进行垃圾收集。p.next = null; // help GC
                        // 如果等待过程被中断了,将中断给补上。if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 判断状态,寻找安全点,进入 waiting 状态,等着被 unpark()或 interrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

在 acquireShared(int arg)方法中,同步器调用 tryAcquireShared(arg)方法获取同步状态,返回同步状态有两种。

当同步状态大于等于 0 时:表示可以获取到同步状态,退出自旋,在 doAcquireShared(int arg)方法中可以看到节点获取资源退出自旋的条件就是大于等于 0

小于 0 会加入同步队列中等待被唤醒。

addWaiter 和 enq 方法

// 创建节点,并将节点加入到同步队列尾部中。private Node addWaiter(Node mode) {
        // 以共享方式为线程构建 Node 节点
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速加入到队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS 保证原子操作,将 node 节点加入到队列尾部
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速加入失败,走 enq(node)方法
        enq(node);
        return node;
}
// 以自旋的方式,将 node 节点加入到队列的尾部
private Node enq(final Node node) {
        // 自旋
        for (;;) {
            // 获取尾部节点
            Node t = tail;
            // 如果 tail 节点为空,说明同步队列还没初始化,必须先进行初始化
            if (t == null) { // Must initialize
                // CAS 保证原子操作,新建一个空 node 节点并将其设置为 head 节点
                if (compareAndSetHead(new Node()))
                    // 设置成功并将 tail 也指向该节点
                    tail = head;
            } else {
                // 将 node 节点加入到队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这两个方法和独占式的基本相同,注释中都标明了,在这就不多做解释了。

获取资源成功后对后继节点的操作 setHeadAndPropagate 方法

private void setHeadAndPropagate(Node node, int propagate) {
        // 记录老的 head 节点,以便核对
        Node h = head; // Record old head for check below
        // 将 node 设置成 head 节点
        setHead(node);
        // 这里表示:如果资源足够(propagate > 0)或者旧头节点为空(h == null)或者旧节点的 waitStatus 为 SIGNAL(-1)或者 PROPAGATE(-3)(h.waitStatus < 0)// 或者当前 head 节点不为空或者 waitStatus 为 SIGNAL(-1)或者 PROPAGATE(-3),此时需要继续唤醒后继节点来尝试获取资源。if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 当前 node 节点的后继节点
            Node s = node.next;
            // 如果后节点为空或者属于共享节点
            if (s == null || s.isShared())
                // 继续尝试获取资源
                doReleaseShared();}
    }

首先将当前节点设置为 head 节点 setHead(node),其次根据条件看是否对后继节点继续唤醒。

获取资源失败进行阻塞等待 unpark

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的等待状态
        int ws = pred.waitStatus;
        // 如果等待状态已经为 SIGNAL(表示当前当前节点的后继节点处于等待状态,如果当前节点释放了同步状态或者被中断,则会唤醒后继节点)if (ws == Node.SIGNAL)
            // 直接返回,表示可以安心的去休息了
            return true;
        // 如果前驱的节点的状态 ws > 0(表示该节点已经被取消或者中断,也就是成无效节点,需要从同步队列中取消的)
        if (ws > 0) {
            // 循环往前需寻找,知道寻找到一个有效的安全点(一个等待状态 <= 0 的节点,排在它后面)do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
            // 注意这一波操作后,获奖取消的节点全部变成 GC 可回收的废弃链。pred.next = node;
        } else {
            // 如果前驱正常,那就把前驱的状态设置成 SIGNAL,告诉它获取资源后通知自己一下。有可能失败,人家说不定刚刚释放完呢!compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 private final boolean parkAndCheckInterrupt() {
        // 调用 park 方法使当前节点的线程进入 waiting
        LockSupport.park(this);    
        // 返回线程中断状态
        return Thread.interrupted();}

这两个方法和独占式基本相同。

接着看 doReleaseShared 这个比较复杂

private void doReleaseShared() {
        // 注意, 这里的头结点已经是上面新设定的头结点了, 从这里可以看出, 如果 propagate=0,
        // 不会进入 doReleaseShared 方法里面, 那就有共享式变成了独占式
        for (;;) { // 死循环以防在执行此操作时添加新节点:退出条件 h == head
            Node h = head;
            // 前提条件,当前的头节点不为空,并且不是尾节点
            if (h != null && h != tail) {
                // 当前头节点的等待状态
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 如果当前节点的状态为 SIGNAL,则利用 CAS 将其状态设置为 0(也就是初始状态)// 这里不直接设为 Node.PROPAGATE, 是因为 unparkSuccessor(h)中,如果 ws < 0 会设置为 0,所以 ws 先设置为 0,再设置为 PROPAGATE
                    // 这里需要控制并发,因为入口有 setHeadAndPropagate 跟 release 两个,避免两次 unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases 设置失败,重新循环
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 如果等待状态不为 0 则利用 CAS 将其状态设置为 PROPAGATE,以确保在释放资源时能够继续通知后继节点。else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed 如果 head 期间发生了改变,则需要从新循坏
                break;
        }
    }
private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        // 在此再次判断当前头节点的的状态,如果小于 0 将设置为 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 获取后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            // 如果后继节点为空或者等待状态大于 0 直接放弃。s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 循环从尾部往前寻找下一个等待状态不大于 0 的节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒该节点的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

3.2 共享状态释放
最后一步释放资源就比较简单了。

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
            return true;
        }
        return false;
    }

四、总结


在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程会加入到队列中并进行自旋,出列的(或者停止自旋)的条件时前驱节点为头节点并且成功获取了同步状态。在释放同步状态时,调用 Release 方法释放同步状态,然后唤醒头节点的后继节点。

共享式方式在唤醒后继节点获得资源后会判断当前资源是否还有多余的,如果有会继续唤醒下一个节点。

正文完
 0