Java并发指南7JUC的核心类AQS详解

28次阅读

共计 6872 个字符,预计需要花费 18 分钟才能阅读完成。

一行一行源码分析清楚 AbstractQueuedSynchronizer

转自 https://www.javadoop.com/post…

在分析 Java 并发包 java.util.concurrent 源码的时候,少不了需要了解 AbstractQueuedSynchronizer(以下简写 AQS)这个抽象类,因为它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。

Google 一下 AbstractQueuedSynchronizer,我们可以找到很多关于 AQS 的介绍,但是很多都没有介绍清楚,因为大部分文章没有把其中的一些关键的细节说清楚。

本文将从 ReentrantLock 的公平锁源码出发,分析下 AbstractQueuedSynchronizer 这个类是怎么工作的,希望能给大家提供一些简单的帮助。

申明以下几点:

  1. 本文有点长,但是很简单很简单很简单,主要面向读者对象为并发编程的初学者,或者想要阅读 java 并发包源码的开发者。
  2. 建议在电脑上阅读,如果你想好好地理解所有的细节,而且你从来没看过相关的分析,你可能至少需要 20 分钟仔细看所有的描述,本文后面的 1/3 以上很简单,前面的 1/4 更简单,中间的部分要好好看。
  3. 如果你不知道为什么要看这个,我想告诉你,即使你看懂了所有的细节,你可能也不能把你的业务代码写得更好
  4. 源码环境 JDK1.7,看到不懂或有疑惑的部分,最好能自己打开源码看看。Doug Lea 大神的代码写得真心不错。
  5. 有很多英文注释我没有删除,这样读者可以参考着英文说的来,万一被我忽悠了呢
  6. 本文不分析共享模式,这样可以给读者减少很多负担,只要把独占模式看懂,共享模式读者应该就可以顺着代码看懂了。而且也不分析 condition 部分,所以应该说很容易就可以看懂了。
  7. 本文大量使用我们平时用得最多的 ReentrantLock 的概念,本质上来说是不正确的,读者应该清楚,AQS 不仅仅用来实现锁,只是希望读者可以用锁来联想 AQS 的使用场景,降低读者的阅读压力
  8. ReentrantLock 的公平锁和非公平锁只有一点点区别,没有任何阅读压力
  9. 你需要提前知道什么是 CAS(CompareAndSet)

废话结束,开始。

CLH 队列

此篇博客所有源码均来自 JDK 1.8

AQS 内部维护着一个 FIFO 队列,该队列就是 CLH 同步队列。

CLH 同步队列是一个 FIFO 双向队列,AQS 依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS 则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到 CLH 同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

在 CLH 同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();

    /** 独占 */
    static final Node EXCLUSIVE = null;

    /**
     * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;*/
    static final int CANCELLED =  1;

    /**
     * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
     */
    static final int SIGNAL    = -1;

    /**
     * 节点在等待队列中,节点线程等待在 Condition 上,当其他线程对 Condition 调用了 signal() 后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
     */
    static final int CONDITION = -2;

    /**
     * 表示下一次共享式同步状态获取将会无条件地传播下去
     */
    static final int PROPAGATE = -3;

    /** 等待状态 */
    volatile int waitStatus;

    /** 前驱节点 */
    volatile Node prev;

    /** 后继节点 */
    volatile Node next;

    /** 获取同步状态的线程 */
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {return nextWaiter == SHARED;}

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {}

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

CLH 同步队列结构图如下:

转存失败重新上传取消

入列

学了数据结构的我们,CLH 队列入列是再简单不过了,无非就是 tail 指向新节点、新节点的 prev 指向当前最后的节点,当前最后一个节点的 next 指向当前节点。代码我们可以看看 addWaiter(Node node) 方法:

    private Node addWaiter(Node mode) {
        // 新建 Node
        Node node = new Node(Thread.currentThread(), mode);
        // 快速尝试添加尾节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS 设置尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 多次尝试
        enq(node);
        return node;
    }

addWaiter(Node node) 先通过快速尝试设置尾节点,如果失败,则调用 enq(Node node) 方法设置尾节点

    private Node enq(final Node node) {
        // 多次尝试,直到成功为止
        for (;;) {
            Node t = tail;
            //tail 不存在,设置为首节点
            if (t == null) {if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 设置为尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面代码中,两个方法都是通过一个 CAS 方法 compareAndSetTail(Node expect, Node update) 来设置尾节点,该方法可以确保节点是线程安全添加的。在 enq(Node node) 方法中,AQS 通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

过程图如下:

转存失败重新上传取消

出列

CLH 同步队列遵循 FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head 执行该节点并断开原首节点的 next 和当前节点的 prev 即可,注意在这个过程是不需要使用 CAS 来保证的,因为只有一个线程能够成功获取到同步状态。过程图如下:

转存失败重新上传取消

AQS 结构

先来看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了,毕竟可以猜嘛!

// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个隐视的链表
private transient volatile Node tail;
// 这个是最重要的,不过也是最简单的,代表当前锁的状态,0 代表没有被占用,大于 0 代表有线程持有当前锁
// 之所以说大于 0,而不是等于 1,是因为锁可以重入嘛,每次重入都加上 1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock() 可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; // 继承自 AbstractOwnableSynchronizer

怎么样,看样子应该是很简单的吧,毕竟也就四个属性啊。

AbstractQueuedSynchronizer 的等待队列示意如下所示,注意了,之后分析过程中所说的 queue,也就是阻塞队列不包含 head,不包含 head,不包含 head。

转存失败重新上传取消

等待队列中每个线程被包装成一个 node,数据结构是链表,一起看看源码吧:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个 int 常量是给 waitStatus 用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示当前 node 的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析 condition,所以略过吧,下一篇文章会介绍这个
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 同样的不分析,略过吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值为上面的 1、-1、-2、-3,或者 0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值 大于 0 代表此线程取消了等待,// 也许就是说半天抢不到锁,不抢了,ReentrantLock 是可以指定 timeouot 的。。。volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程本尊
    volatile Thread thread;

}

Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已,大家先要有这个概念在心里。

上面的是基础知识,后面会多次用到,心里要时刻记着它们,心里想着这个结构图就可以了。下面,我们开始说 ReentrantLock 的公平锁。多嘴一下,我说的阻塞队列不包含 head 节点。

转存失败重新上传取消

首先,我们先看下 ReentrantLock 的使用方式。

// 我用个 web 开发中的 service 概念吧
public class OrderService {
    // 使用 static,这样每个线程拿到的是同一把锁,当然,spring mvc 中 service 默认就是单例,别纠结这个
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // 比如我们同一时间,只允许一个线程创建订单
        reentrantLock.lock();
        // 通常,lock 之后紧跟着 try 语句
        try {// 这块代码同一时间只能有一个线程进来 ( 获取到锁的线程),// 其他的线程在 lock() 方法上阻塞,等待获取到锁,再进来
            // 执行代码...
            // 执行代码...
            // 执行代码...
        } finally {
            // 释放锁
            reentrantLock.unlock();}
    }
}

ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。

abstract static class Sync extends AbstractQueuedSynchronizer {

}

Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。

public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}

线程抢锁

很多人肯定开始嫌弃上面废话太多了,下面跟着代码走,我就不废话了。

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
      // 争锁
    final void lock() {acquire(1);
    }
      // 来自父类 AQS,我直接贴过来这边,下面分析的时候同样会这样做,不会给读者带来阅读压力
    // 我们看到,这个方法,如果 tryAcquire(arg) 返回 true, 也就结束了。// 否则,acquireQueued 方法会将线程压到队列中
    public final void acquire(int arg) { // 此时 arg == 1
        // 首先调用 tryAcquire(1) 一下,名字上就知道,这个只是试一试
        // 因为有可能直接就成功了呢,也就不需要进队列排队了,// 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待 (又是挂起,又是等待被唤醒的)
        if (!tryAcquire(arg) &&
            // tryAcquire(arg) 没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();
        }
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // 尝试直接获取锁,返回值是 boolean,代表是否获取到锁
    // 返回 true:1. 没有线程在等待锁;2. 重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
    protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 此时此刻没有线程持有锁
        if (c == 0) {
            // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,// 看看有没有别人在队列中等了半天了
            if (!hasQueuedPredecessors() &&
                // 如果没有线程在等待,那就用 CAS 尝试一下,成功了就获取到锁了,// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
                // 因为刚刚还没人的,我判断过了 

更多内容请关注微信公众号【Java 技术江湖】

一位阿里 Java 工程师的技术小站。作者黄小斜,专注 Java 相关技术:SSM、SpringBoot、MySQL、分布式、中间件、集群、Linux、网络、多线程,偶尔讲点 Docker、ELK,同时也分享技术干货和学习经验,致力于 Java 全栈开发!(关注公众号后回复”Java“即可领取 Java 基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的 Java 学习指南、Java 程序员面试指南等干货资源)

正文完
 0