关于程序员:并发专栏队列同步器-AQS-以及-Reentrantlock-应用

1次阅读

共计 23394 个字符,预计需要花费 59 分钟才能阅读完成。

队列同步器 AQS 以及 Reentrantlock 利用

Java 中的大部分同步类都是基于 AbstractQueuedSynchronizer(简称为 AQS)实现的。

ReentrantLockReentrantReadWriteLockSemaphore(信号量)CountDownLatch偏心锁 非偏心锁

ThreadPoolExecutor 都和 AQS 有间接关系,所以理解 AQS 的形象实现,并在此基础上联合上述各类的实现细节,很快就能够把 JUC 一网打尽,不至于查看源码时一头雾水,失落主线。

是什么

AQS 是 AbstractQueuedSynchronizer 的简称,翻译成中文就是 形象队列同步器,这三个单词离开来看:

  • Abstract(形象):也就是说,AQS 是一个抽象类,只实现一些次要的逻辑,有些办法推延到子类实现
  • Queued(队列):队列有啥特色呢?先进先出(FIFO)对吧?也就是说,AQS 是用先进先出队列来存储数据的
  • Synchronizer(同步器):即 AQS 是 实现同步性能的

以上概括一下,AQS 是一个用来构建锁和同步器的框架,应用 AQS 能简略而又高效地结构出同步器。

程序员么,看原理类内容,不看源码“下饭”,就不叫深刻学习,所以,适当关上下源码联合着看,很容易了解了。

一、框架结构

首先,咱们通过上面的架构图来整体理解一下 AQS 框架

  • 上图中有色彩的为 Method,无色彩的为 Attribution。
  • 总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外裸露的 API 到底层根底数据。
  • 当有自定义同步器接入时,只需重写第一层所须要的局部办法即可,不须要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先通过第一层的 API 进入 AQS 外部办法,而后通过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的期待队列解决,而这些解决形式均依赖于第五层的根底数据提供层。

第一层

如果你当初关上 IDE,你会发现咱们常常应用的 ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch,都是【聚合】了一个【队列同步器】的子类实现线程访问控制的,也就是咱们说的第一层,API 层。

为什么要聚合一个同步器的子类呢,这其实就是一个典型的模板办法模式的长处:

  • 咱们应用的锁是面向使用者的,它定义了使用者与锁交互的接口,暗藏了实现的细节,咱们就像范式那样间接应用就能够了,很简略
  • 而同步器面向的是锁的实现,比方 Doug Lea 大神,或者咱们业务自定义的同步器,它简化了锁的实现形式,屏蔽了同步状态治理,线程排队,期待 / 唤醒等底层操作

这能够让咱们应用起来更加不便,因为咱们绝大多数都是在应用锁,实现锁之后,其外围就是要使用方便。

同步器的设计是就基于模板办法模式的,如果须要自定义同步器个别的形式是这样(模板办法模式很经典的一个利用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的办法。(这些重写办法很简略,无非是对于共享资源 state 的获取和开释)
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板办法,而这些模板办法又会调用使用者重写的办法。

这和咱们以往通过实现接口的形式有很大区别,这是模板办法模式很经典的一个使用,上面简略的给大家介绍一下模板办法模式,模板办法模式是一个很容易了解的设计模式之一。

模板办法模式是基于”继承“的,次要是为了在不扭转模板构造的前提下在子类中从新定义模板中的内容以实现复用代码。

模板办法模式,都有两类办法,子类可重写的办法和模板类提供的模板办法,那 AQS 中必定也有这两类办法,其实就是咱们说的第一层 API 层中的所有办法,咱们来看看

  1. 哪些是自定义同步器可重写的办法?
  2. 哪些是形象同步器提供的模版办法?
同步器可重写的办法

同步器提供的可重写办法只有 5 个,这大大不便了锁的使用者:

按理说,须要重写的办法也应该有 abstract 来润饰的,为什么这里没有?起因其实很简略,下面的办法我曾经用色彩辨别成了两类:

  • 独占式:一个工夫点只能执行一个线程
  • 共享式:一个工夫点可多个线程同时执行

表格办法形容中所说的 同步状态 就是上文提到的有 volatile 润饰的 state,所以咱们在 重写 下面几个办法时,还能够通过同步器提供的上面三个办法(AQS 提供的)来获取或批改同步状态:

而独占式和共享式操作 state 变量的区别也就很简略了,咱们能够通过批改 State 字段示意的同步状态来实现多线程的独占模式和共享模式(加锁过程)

略微具体点步骤如下:

同步器提供的模版办法

下面咱们将同步器的实现办法分为独占式和共享式两类,模版办法其实除了提供以上两类模版办法之外,只是多了 响应中断 超时限度 的模版办法供 Lock 应用,来看一下

先不必记上述办法的性能,目前你只须要理解个大略性能就好。另外,置信你也留神到了:

下面的办法都有 final 关键字润饰,阐明子类不能重写这个办法

看到这你兴许有点乱了,咱们略微演绎一下:

程序员还是看代码心里虚浮一点,咱们再来用代码阐明一下下面的关系(留神代码中的正文,以下的代码并不是很谨严,只是为了简略阐明上图的代码实现):

/**
 * 自定义互斥锁
 */
public class MyMutex implements Lock {

    // 动态外部类 - 自定义同步器
    private static class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            // 调用 AQS 提供的办法,通过 CAS 保障原子性
            if (compareAndSetState(0, arg)){
                // 咱们实现的是互斥锁,所以标记获取到同步状态(更新 state 胜利)的线程,// 次要为了判断是否可重入(一会儿会阐明)setExclusiveOwnerThread(Thread.currentThread());
                // 获取同步状态胜利,返回 true
                return true;
            }
            // 获取同步状态失败,返回 false
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 未领有锁却让开释,会抛出 IMSE
            if (getState() == 0){throw new IllegalMonitorStateException();
            }
            // 能够开释,清空排它线程标记
            setExclusiveOwnerThread(null);
            // 设置同步状态为 0,示意开释锁
            setState(0);
            return true;
        }

        // 是否独占式持有
        @Override
        protected boolean isHeldExclusively() {return getState() == 1;
        }

        // 后续会用到,次要用于期待 / 告诉机制,每个 condition 都有一个与之对应的条件期待队列,在锁模型中阐明过
        Condition newCondition() {return new ConditionObject();
        }
    }

    // 聚合自定义同步器
    private final MySync sync = new MySync();


    @Override
    public void lock() {
        // 阻塞式的获取锁,调用同步器模版办法独占式,获取同步状态
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // 调用同步器模版办法可中断式获取同步状态
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        // 调用本人重写的办法,非阻塞式的获取同步状态
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // 调用同步器模版办法,可响应中断和超时工夫限度
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        // 开释锁
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        // 应用自定义的条件
        return sync.newCondition();}
}

再关上 IDE,看看 ReentrantLock ReentrantReadWriteLock Semaphore(信号量) CountDownLatch 的实现,会发现,他们都是依照这个构造实现的,是不感觉会了一个,剩下的几个常见的也差不多了。

接着咱们就来看一看 AQS 的模版办法到底是怎么实现锁的

二、AQS 实现剖析 | 原理

AQS 核心思想是,如果被申请的共享资源闲暇,那么就将以后申请资源的线程设置为无效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就须要肯定的阻塞期待唤醒机制来保障锁调配。这个机制次要用的是 CLH 队列的变体实现的,将临时获取不到锁的线程退出到队列中。

CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是 CLH 变体的虚构双向队列(FIFO),AQS 是通过将每条申请共享资源的线程封装成一个节点来实现锁的调配。

次要原理图如下:

AQS 应用一个 volatile 的 int 类型的成员变量来示意同步状态,通过内置的 FIFO 队列来实现资源获取的排队工作,通过 CAS 实现对 state 值的批改。

队列中每个排队的个体就是一个 Node,所以咱们来看一下 Node 的构造

AQS 数据结构

Node 节点

AQS 外部保护了一个同步队列,用于治理同步状态。

  • 当线程获取同步状态失败时,就会将以后线程以及期待状态等信息结构成一个 Node 节点,将其退出到同步队列中尾部,阻塞该线程
  • 当同步状态被开释时,会唤醒同步队列中“首节点”的线程获取同步状态

为了将上述步骤弄清楚,咱们须要来看一看 Node 构造(如果你能关上 IDE 一起看那是极好的)

解释一下几个办法和属性值的含意:

办法和属性值 含意
waitStatus 以后节点在队列中的状态
thread 示意处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出 NullPointerException
nextWaiter 指向下一个处于 CONDITION 状态的节点(因为本篇文章不讲述 Condition Queue 队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含意
SHARED 示意线程以共享的模式期待锁
EXCLUSIVE 示意线程正在以独占的形式期待锁

waitStatus 有上面几个枚举值:

枚举 含意
0 当一个 Node 被初始化的时候的默认值
CANCELLED 为 1,示意线程获取锁的申请曾经勾销了
CONDITION 为 -2,示意节点在期待队列中,节点线程期待唤醒
PROPAGATE 为 -3,以后线程处在 SHARED 状况下,该字段才会应用
SIGNAL 为 -1,示意线程曾经筹备好了,就等资源开释了

乍一看有点芜杂,咱们还是将其归类阐明一下:

下面这几个状态阐明有个印象就好,有了 Node 的构造阐明铺垫,你也就能设想同步队列的根本构造了:

一般来说,自定义同步器要么是独占形式,要么是共享形式,它们也只需实现 tryAcquire-tryReleasetryAcquireShared-tryReleaseShared 中的一种即可。AQS 也反对自定义同步器同时实现独占和共享两种形式,如 ReentrantReadWriteLock。ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。

前置常识根本铺垫结束,咱们来看一看独占式获取同步状态的整个过程

独占式获取同步状态

故事要从范式 lock.lock() 开始,,或者能够联合着 ReentrantLock 来看,也能够(先不要在意偏心锁和非偏心锁,他们在底层是雷同的)

public void lock() {
    // 阻塞式的获取锁,调用同步器模版办法,获取同步状态
    sync.acquire(1);
}

进入 AQS 的模版办法 acquire()

public final void acquire(int arg) {
  // 调用自定义同步器重写的 tryAcquire 办法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

首先,会通过 API 层实现的 tryAcquire() 办法,尝试非阻塞的获取同步状态,如果该办法返回了 True,则阐明以后线程获取锁胜利,如果获取失败(tryAcquire 返回 false),则会调用 addWaiter 办法结构 Node 节点(Node.EXCLUSIVE 独占式)并平安的(CAS)退出到同步队列【尾部】

private Node addWaiter(Node mode) {
      // 结构 Node 节点,蕴含以后线程信息以及节点模式【独占 / 共享】Node node = new Node(Thread.currentThread(), mode);
      // 新建变量 pred 将指针指向 tail 指向的节点
    Node pred = tail;
      // 如果尾节点不为空
    if (pred != null) {
          // 新退出的节点前驱节点指向尾节点
        node.prev = pred;

          // 因为如果多个线程同时获取同步状态失败都会执行这段代码
        // 所以,通过 CAS 形式确保安全的设置以后节点为最新的尾节点
        if (compareAndSetTail(pred, node)) {
              // 已经的尾节点的后继节点指向以后节点
            pred.next = node;
              // 返回新构建的节点
            return node;
        }
    }
      // 尾节点为空,阐明以后节点是第一个被退出到同步队列中的节点
      // 须要一个入队操作
    enq(node);
    return node;
}

// 如果 Pred 指针是 Null(阐明期待队列中没有元素),或者以后 Pred 指针和 Tail 指向的地位不同(阐明被别的线程曾经批改),就须要看一下 Enq 的办法
private Node enq(final Node node) {
      // 通过“死循环”确保节点被正确增加,最终将其设置为尾节点之后才会返回,这里应用 CAS 的理由和下面一样
    for (;;) {
        Node t = tail;
          // 第一次循环,如果尾节点为 null
        if (t == null) { // Must initialize
              // 构建一个哨兵节点,并将头部指针指向它
            if (compareAndSetHead(new Node()))
                  // 尾部指针同样指向哨兵节点
                tail = head;
        } else {
              // 第二次循环,将新节点的前驱节点指向 t
            node.prev = t;
              // 将新节点退出到队列尾节点
            if (compareAndSetTail(t, node)) {
                  // 前驱节点的后继节点指向以后新节点,实现双向队列
                t.next = node;
                return t;
            }
        }
    }
}

如果没有被初始化,须要进行初始化一个头结点进去(正文中的哨兵结点)。但请留神,初始化的头结点并不是以后线程节点,而是调用了无参构造函数的节点。如果经验了初始化或者并发导致队列中有元素,则与之前的办法雷同。其实,addWaiter 就是一个在双端链表增加尾节点的操作,须要留神的是,双端链表的头结点是一个无参构造函数的头结点。

总结一下,线程获取锁的时候,过程大体如下:

  1. 当没有线程获取到锁时,线程 1 获取锁胜利。
  2. 线程 2 申请锁,然而锁被线程 1 占有。
  3. 如果再有线程要获取锁,顺次在队列中往后排队即可。

上边解释了 addWaiter 办法,这个办法其实就是把对应的线程以 Node 的数据结构模式退出到双端队列里,返回的是一个蕴含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued 办法中。acquireQueued 办法能够对排队中的线程进行“获锁”操作。

总的来说,一个线程获取锁失败了,被放入期待队列,acquireQueued 会把放入队列中的线程一直去获取锁,直到获取胜利或者不再须要获取(中断)。

上面咱们从“何时出队列?”和“如何出队列?”两个方向来剖析一下 acquireQueued 源码:

final boolean acquireQueued(final Node node, int arg) {
    // 标记是否胜利拿到资源
    boolean failed = true;
    try {
        // 标记期待过程中是否中断过
        boolean interrupted = false;
          // "死循环",自旋,要么获取锁,要么中断
        for (;;) {
              // 获取以后节点的前驱节点
            final Node p = node.predecessor();
              // 如果 p 是头结点,阐明以后节点在实在数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)// 这就是为什么有个空的头结点
            if (p == head && tryAcquire(arg)) {
                  // 获取锁胜利,头指针挪动到以后 node
                setHead(node);
                  // 将哨兵节点的后继节点置为空,不便 GC
                p.next = null; // help GC
                failed = false;
                  // 返回中断标识
                return interrupted;
            }
              // 以后节点的前驱节点不是头节点
              //【或者】以后节点的前驱节点是头节点但获取同步状态失败
            // 阐明 p 为头节点且以后没有获取到锁(可能是非偏心锁被抢占了)或者是 p 不为头结点,这个时候就要判断以后 node 是否要被阻塞(被阻塞条件:前驱节点的 waitStatus 为 -1),避免有限循环浪费资源。具体两个办法上面细细剖析
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

注:setHead 办法是把以后节点置为虚节点,但并没有批改 waitStatus,因为它是始终须要用的数据。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

获取同步状态胜利会返回能够了解了,然而如果失败就会始终陷入到“死循环”中浪费资源吗?很显然不是,shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt() 就会将线程获取同步状态失败的线程挂起,咱们持续向下看

// 靠前驱节点判断以后线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      // 获取前驱节点的状态
    int ws = pred.waitStatus;
      // 如果是 SIGNAL 状态,即期待被占用的资源开释,间接返回 true
      // 筹备持续调用 parkAndCheckInterrupt 办法
    // 阐明头结点处于唤醒状态
    if (ws == Node.SIGNAL)
        return true;
      // ws 大于 0 阐明是 CANCELLED 状态,勾销状态
     /*
     * 如果前驱放弃了,那就始终往前找,直到找到最近一个失常期待的状态,并排在它的后边。* 留神:那些放弃的结点,因为被本人“加塞”到它们前边,它们相当于造成一个无援用链,稍后就会被保安大叔赶走了(GC 回收)!*/
    if (ws > 0) {
        // 循环判断前驱节点的前驱节点是否也为 CANCELLED 状态,疏忽该状态的节点,从新连贯队列
        do {
            // 循环向前查找勾销节点,把勾销节点从队列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
          // 将以后节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

到这里你兴许有个问题:

这个中央设置前驱节点为 SIGNAL 状态到底有什么作用?

保留这个问题,咱们陆续揭晓

如果前驱节点的 waitStatus 是 SIGNAL 状态,即 shouldParkAfterFailedAcquire 办法会返回 true,程序会持续向下执行 parkAndCheckInterrupt 办法,parkAndCheckInterrupt 次要用于挂起以后线程,阻塞调用栈,返回以后线程的中断状态

private final boolean parkAndCheckInterrupt() {// 线程挂起,程序不会持续向下执行 调用 park()使线程进入 waiting 状态
    LockSupport.park(this);
      // 依据 park 办法 API 形容,程序在下述三种状况会持续向下执行
      //     1. 被 unpark 
      //     2. 被中断(interrupt)
      //     3. 其余不合逻辑的返回才会持续向下执行
      
      // 因上述三种状况程序执行至此,返回以后线程的中断状态,并清空中断状态
      // 如果因为被中断,该办法会返回 true
    return Thread.interrupted();}

上述办法的流程图如下:

从上图能够看出,跳出以后循环的条件是当“前置节点是头结点,且以后线程获取锁胜利”。为了避免因死循环导致 CPU 资源被节约,咱们会判断前置节点的状态来决定是否要将以后线程挂起,具体挂起流程用流程图示意如下(shouldParkAfterFailedAcquire 流程):

被唤醒的程序会继续执行 acquireQueued 办法里的循环,如果获取同步状态胜利,则会返回 interrupted = true 的后果

程序持续向调用栈下层返回,最终回到 AQS 的模版办法 acquire

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

你兴许会有纳闷:

程序曾经胜利获取到同步状态并返回了,怎么会有个自我中断呢?

static void selfInterrupt() {Thread.currentThread().interrupt();}

到这里对于获取同步状态咱们还脱漏了一条线,acquireQueued 的 finally 代码块如果你认真看你兴许马上就会有纳闷:

到底什么状况才会执行 if(failed) 外面的代码?

if (failed)
  cancelAcquire(node);

这段代码被执行的条件是 failed 为 true,失常状况下,如果跳出循环,failed 的值为 false,如果不能跳出循环貌似怎么也不能执行到这里,所以只有不失常的状况才会执行到这里,也就是会产生异样,才会执行到此处

查看 try 代码块,只有两个办法会抛出异样:

  • node.processor() 办法
  • 本人重写的 tryAcquire() 办法

先看前者:

很显然,这里抛出的异样不是重点,那就以 ReentrantLock 重写的 tryAcquire() 办法为例

另外,下面剖析 shouldParkAfterFailedAcquire 办法还对 CANCELLED 的状态进行了判断,那么

什么时候会生成勾销状态的节点呢?

acquireQueued 办法中的 finally 代码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 标记是否胜利拿到资源
    try {
    ...
        for (;;) {final Node p = node.predecessor(); // 拿到前驱
            // 如果前驱是 head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大开释完资源唤醒本人的,当然也可能被 interrupt 了)。if (p == head && tryAcquire(arg)) {
                ...
                failed = false;
        ...
            }
            ...
    } finally {if (failed)
            cancelAcquire(node);
        }
}

答案就在 cancelAcquire 办法中,咱们来看看 cancelAcquire 到底怎么设置 / 解决 CANNELLED 的

private void cancelAcquire(Node node) {
       // 疏忽有效节点
       if (node == null)
           return;
       // 将关联的线程信息清空
       node.thread = null;

       // 跳过同样是勾销状态的前驱节点
       Node pred = node.prev;
       while (pred.waitStatus > 0)
           node.prev = pred = pred.prev;

       // 跳出下面循环后找到前驱无效节点,并获取该无效节点的后继节点
       Node predNext = pred.next;

       // 将以后节点的状态置为 CANCELLED
       node.waitStatus = Node.CANCELLED;

       // 如果以后节点处在尾节点,间接从队列中删除本人就好
    // 更新失败的话,则进入 else,如果更新胜利,将 tail 的后继节点设置为 null
       if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
       } else {
           int ws;
             // 1. 如果以后节点的无效前驱节点不是头节点,也就是说以后节点不是头节点的后继节点
           if (pred != head &&
               // 2. 判断以后节点无效前驱节点的状态是否为 SIGNAL
               ((ws = pred.waitStatus) == Node.SIGNAL ||
                // 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
               // 判断以后节点无效前驱节点的线程信息是否为空
               pred.thread != null) {
                 // 上述条件满足
               Node next = node.next;
                 // 将以后节点无效前驱节点的后继节点指针指向以后节点的后继节点
               if (next != null && next.waitStatus <= 0)
                   compareAndSetNext(pred, predNext, next);
           } else {
                 // 如果以后节点的前驱节点是头节点,或者上述其余条件不满足,就唤醒以后节点的后继节点
               unparkSuccessor(node);
           }
                    
           node.next = node; // help GC
       }

看到这个正文你可能有些乱了,其外围目标就是从期待队列中移除 CANCELLED 的节点,并从新拼接整个队列,

以后的流程:

  • 获取以后节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就始终往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 Pred 节点和以后 Node 关联,将以后 Node 设置为 CANCELLED。
  • 依据以后节点的地位,思考以下三种状况:

    (1) 以后节点是尾节点。

    (2) 以后节点是 Head 的后继节点。

    (3) 以后节点不是 Head 的后继节点,也不是尾节点。

根据上述第二条,咱们来剖析每一种状况的流程。

以后节点是尾节点。

以后节点是 Head 的后继节点。

以后节点不是 Head 的后继节点,也不是尾节点。

通过下面的流程,咱们对于 CANCELLED 节点状态的产生和变动曾经有了大抵的理解,然而为什么所有的变动都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么状况下会对 Prev 指针进行操作?

执行 cancelAcquire 的时候,以后节点的前置节点可能曾经从队列中进来了(曾经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 办法了),如果此时批改 Prev 指针,有可能会导致 Prev 指向另一个曾经移除队列的 Node,因而这块变动 Prev 指针不平安。shouldParkAfterFailedAcquire 办法中,会执行上面的代码,其实就是在解决 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的状况下才会执行,进入该办法后,阐明共享资源已被获取,以后节点之前的节点都不会呈现变动,因而这个时候变更 Prev 指针比拟平安。

do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);

至此,获取同步状态的过程就完结了,咱们简略的用流程图阐明一下整个过程

获取锁的过程就这样的完结了,先暂停几分钟整顿一下本人的思路。咱们下面还没有阐明 SIGNAL 的作用,SIGNAL 状态信号到底是干什么用的?这就波及到锁的开释了,咱们来持续理解,整体思路和锁的获取是一样的,然而开释过程就绝对简略很多了

独占式开释同步状态

故事要从 unlock() 办法说起

public void unlock() {
    // 开释锁
    sync.release(1);
}

调用 AQS 模版办法 release,进入该办法

public final boolean release(int arg) {
      // 调用自定义同步器重写的 tryRelease 办法尝试开释同步状态
    if (tryRelease(arg)) {
          // 开释胜利,获取头节点
        Node h = head;
          // 存在头节点,并且 waitStatus 不是初始状态
          // 通过获取的过程咱们曾经剖析了,在获取的过程中会将 waitStatus 的值从初始状态更新成 SIGNAL 状态
        if (h != null && h.waitStatus != 0)
              // 解除线程挂起状态
            unparkSuccessor(h);
        return true;
    }
    return false;
}

查看 unparkSuccessor 办法,理论是要唤醒头节点的后继节点

private void unparkSuccessor(Node node) {      
      // 获取头节点的 waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
          // 清空头节点的 waitStatus 值,即置为 0
        compareAndSetWaitStatus(node, ws, 0);
  
      // 获取头节点的后继节点
    Node s = node.next;
      // 判断以后节点的后继节点是否是勾销状态,如果是,须要移除,从新连贯队列
    if (s == null || s.waitStatus > 0) {
        s = null;
          // 从尾节点向前查找,找到队列第一个 waitStatus 状态小于 0 的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
              // 如果是独占式,这里小于 0,其实就是 SIGNAL
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
          // 解除线程挂起状态
        LockSupport.unpark(s.thread);
}

有同学可能有疑难:

为什么这个中央是从队列尾部向前查找不是 CANCELLED 的节点?

起因有两个:

第一,先回看节点退出队列的情景:

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

节点入队并不是原子操作,代码第 6、7 行

node.prev = pred; 
compareAndSetTail(pred, node)

这两个中央能够看作是尾节点入队的原子操作,如果此时代码还没执行到 pred.next = node; 这时又凑巧执行了 unparkSuccessor 办法,就没方法从前往后找了,因为后继指针还没有连接起来,所以须要从后往前找

第二点起因,在下面图解产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因而这也是必须要从后往前遍历才可能遍历齐全部的 Node

同步状态至此就曾经胜利开释了,之前获取同步状态被挂起的线程就会被唤醒,持续从上面代码第 3 行返回执行:

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    return Thread.interrupted();}

持续返回下层调用栈,从上面代码 15 行开始执行,从新执行循环,再次尝试获取同步状态

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

到这里,对于独占式获取 / 开释锁的流程曾经闭环了,然而对于 AQS 的另外两个模版办法还没有介绍

  • 响应中断
  • 超时限度

独占式响应中断获取同步状态

故事要从 lock.lockInterruptibly() 办法说起

public void lockInterruptibly() throws InterruptedException {
    // 调用同步器模版办法可中断式获取同步状态
    sync.acquireInterruptibly(1);
}

有了后面的了解,了解独占式可响应中断的获取同步状态形式,真是一眼就能明确了:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
      // 尝试非阻塞式获取同步状态失败,如果没有获取到同步状态,执行代码 7 行
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

持续查看 doAcquireInterruptibly 办法:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                  // 获取中断信号后,不再返回 interrupted = true 的值,而是间接抛出 InterruptedException 
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

没想到 JDK 外部也有如此相近的代码,可响应中断获取锁没什么深奥的,就是被中断抛出 InterruptedException 异样(代码第 17 行),这样就逐层返回下层调用栈捕捉该异样进行下一步操作了

趁热打铁,来看看另外一个模版办法:

独占式超时限度获取同步状态

这个很好了解,就是给定一个时限,在该时间段内获取到同步状态,就返回 true,否则,返回 false。好比线程给本人定了一个闹钟,闹铃一响,线程就本人返回了,这就不会使本人是阻塞状态了

既然波及到超时限度,其外围逻辑必定是计算工夫距离,因为在超时工夫内,必定是屡次尝试获取锁的,每次获取锁必定有工夫耗费,所以计算工夫距离的逻辑就像咱们在程序打印程序耗时 log 那么简略

nanosTimeout = deadline – System.nanoTime()

故事要从 lock.tryLock(time, unit) 办法说起

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    // 调用同步器模版办法,可响应中断和超时工夫限度
    return sync.tryAcquireNanos(1, unit.toNanos(time));
}

来看 tryAcquireNanos 办法

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

是不是和下面 acquireInterruptibly 办法长相很具体了,持续查看来 doAcquireNanos 办法,看程序, 该办法也是 throws InterruptedException,咱们在中断文章中说过,办法标记上有 throws InterruptedException 阐明该办法也是能够响应中断的,所以你能够了解超时限度是 acquireInterruptibly 办法的加强版,具备超时和非阻塞管制的双保险

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
      // 超时工夫内,为获取到同步状态,间接返回 false
    if (nanosTimeout <= 0L)
        return false;
      // 计算超时截止工夫
    final long deadline = System.nanoTime() + nanosTimeout;
      // 以独占形式退出到同步队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
              // 计算新的超时工夫
            nanosTimeout = deadline - System.nanoTime();
              // 如果超时,间接返回 false
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                    // 判断是最新超时工夫是否大于阈值 1000    
                nanosTimeout > spinForTimeoutThreshold)
                  // 挂起线程 nanosTimeout 长时间,工夫到,主动返回
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

下面的办法应该不是很难懂,然而又同学可能在第 27 行上有所困惑

为什么 nanosTimeout 和 自旋超时阈值 1000 进行比拟?

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices
 * to improve responsiveness with very short timeouts.
 */
static final long spinForTimeoutThreshold = 1000L;

其实 doc 说的很分明,说白了,1000 nanoseconds 工夫曾经十分十分短暂了,没必要再执行挂起和唤醒操作了,不如间接以后线程间接进入下一次循环

到这里,咱们自定义的 MyMutex 只差 Condition 没有阐明了,不晓得你累了吗?我还在保持

Condition

下面曾经介绍了 AQS 所提供的外围性能,当然它还有很多其余的个性,这里咱们来持续说下 Condition 这个组件。Condition 是在 Java 1.5 中才呈现的,它用来代替传统的 Objectwait()notify() 实现线程间的合作,相比应用 Objectwait()notify(),应用 Condition 中的 await()signal() 这种形式实现线程间合作更加平安和高效。因而通常来说比拟举荐应用 Condition

其中 AbstractQueueSynchronizer 中实现了 Condition 中的办法,次要对外提供 awaite(Object.wait())signal(Object.notify()) 调用。

如果过后你了解了这个模型,再看 Condition 的实现,基本就不是问题了,首先 Condition 还是一个接口,必定也是须要有实现类的

那故事就从 lock.newCondition 说起吧

public Condition newCondition() {
    // 应用自定义的条件
    return sync.newCondition();}

自定义同步器重封装了该办法:

Condition newCondition() {return new ConditionObject();
}

ConditionObject 就是 Condition 的实现类,该类就定义在了 AQS 中,只有两个成员变量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

所以,咱们只须要来看一下 ConditionObject 实现的 await / signal 办法来应用这两个成员变量就能够了

public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
      // 同样构建 Node 节点,并退出到期待队列中
    Node node = addConditionWaiter();
      // 开释同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
          // 挂起以后线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

这里留神用词,在介绍获取同步状态时,addWaiter 是退出到【同步队列】,就是上图说的入口期待队列,这里说的是【期待队列】,所以 addConditionWaiter 必定是构建了一个本人的队列:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    
    if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
        t = lastWaiter;
    }
      // 新构建的节点的 waitStatus 是 CONDITION,留神不是 0 或 SIGNAL 了
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
      // 构建单向同步队列
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

这里有敌人可能会有疑难:

为什么这里是单向队列,也没有应用 CAS 来保障退出队列的安全性呢?

因为 await 是 Lock 范式 try 中应用的,阐明曾经获取到锁了,所以就没必要应用 CAS 了,至于是单向,因为这里还不波及到竞争锁,只是做一个条件期待队列

在 Lock 中能够定义多个条件,每个条件都会对应一个 条件期待队列,所以将上图丰盛阐明一下就变成了这个样子:

线程曾经按相应的条件退出到了条件期待队列中,那如何再尝试获取锁呢?signal / signalAll 办法就曾经排上用场了

public final void signal() {if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

Signal 办法通过调用 doSignal 办法,只唤醒条件期待队列中的第一个节点

private void doSignal(Node first) {
    do {if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
          // 调用该办法,将条件期待队列的线程节点挪动到同步队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

持续看 transferForSignal 办法

final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

       // 从新进行入队操作
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
          // 唤醒同步队列中该线程
        LockSupport.unpark(node.thread);
    return true;
}

所以咱们再用图解一下唤醒的整个过程

到这里,了解 signalAll 就非常简单了,只不过循环判断是否还有 nextWaiter,如果有就像 signal 操作一样,将其从条件期待队列中移到同步队列中

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

不知你还是否记得,我在并发编程之期待告诉机制 中还说过一句话

没有非凡起因尽量用 signalAll 办法

什么时候能够用 signal 办法也在其中做了阐明,请大家自行查看吧

LockSupport

从下面我能够看到,当须要阻塞或者唤醒一个线程的时候,AQS 都是应用 LockSupport 这个工具类来实现的。

LockSupport 是用来创立锁和其余同步类的根本线程阻塞原语

每个应用 LockSupport 的线程都会与一个许可关联,如果该许可可用,并且可在过程中应用,则调用 park()将会立刻返回,否则可能阻塞。如果许可尚不可用,则能够调用 unpark 使其可用。然而留神许可不可重入,也就是说只能调用一次 park()办法,否则会始终阻塞。

LockSupport 定义了一系列以 park 结尾的办法来阻塞以后线程,unpark(Thread thread)办法来唤醒一个被阻塞的线程。如下:

park(Object blocker) 办法的 blocker 参数,次要是用来标识以后线程在期待的对象,该对象次要用于问题排查和系统监控。

park 办法和 unpark(Thread thread)都是成对呈现的,同时 unpark 必须要在 park 执行之后执行,当然并不是说没有不调用 unpark 线程就会始终阻塞,park 有一个办法,它带了工夫戳(parkNanos(long nanos):为了线程调度禁用以后线程,最多期待指定的等待时间,除非许可可用)。

park() 办法的源码如下:

public static void park() {UNSAFE.park(false, 0L);
}

unpark(Thread thread)办法源码如下:

public static void unpark(Thread thread) {if (thread != null)
        UNSAFE.unpark(thread);
}

从下面能够看出,其外部的实现都是通过 UNSAFE(sun.misc.Unsafe UNSAFE)来实现的,其定义如下:

public native void park(boolean var1, long var2);
public native void unpark(Object var1);

两个都是 native 本地办法。Unsafe 是一个比拟危险的类,次要是用于执行低级别、不平安的办法汇合。只管这个类和所有的办法都是公开的(public),然而这个类的应用依然受限,你无奈在本人的 java 程序中间接应用该类,因为只有授信的代码能力取得该类的实例。

这里我还要多说一个细节,从条件期待队列移到同步队列是有时间差的,所以应用 await() 办法也是范式的,同样在该文章中做了解释

有时间差,就会有偏心和不偏心的问题,想要全面理解这个问题,咱们就要走近 ReentrantLock 中来看了,除了理解偏心 / 不偏心问题,查看 ReentrantLock 的利用还是要反过来验证它应用的 AQS 的,咱们持续吧

三、ReentrantLock 是如何利用的 AQS

独占式的典型利用就是 ReentrantLock 了,咱们来看看它是如何重写这个办法的

乍一看挺奇怪的,怎么外面自定义了三个同步器:其实 NonfairSync,FairSync 只是对 Sync 做了进一步划分:

从名称上你应该也晓得了,这就是你听到过的 偏心锁 / 非偏心锁

何为偏心锁 / 非偏心锁?

生存中,排队讲求先来后到视为偏心。程序中的公平性也是合乎申请锁的相对工夫的,其实就是 FIFO,否则视为不偏心

咱们来比照一下 ReentrantLock 是如何实现偏心锁和非偏心锁的

其实没什么大不了,偏心锁就是判断同步队列是否还有先驱节点的存在,只有没有先驱节点能力获取锁;而非偏心锁是不论这个事的,能获取到同步状态就能够,就这么简略,那问题来了:

为什么会有偏心锁 / 非偏心锁的设计?

思考这个问题,咱们需从新回顾下面的锁获取实现图了,其实下面我曾经走漏了一点

次要有两点起因:

起因一:

复原挂起的线程到真正锁的获取还是有时间差的,从人类的角度来看这个工夫微不足道,然而从 CPU 的角度来看,这个时间差存在的还是很显著的。所以非偏心锁能更充沛的利用 CPU 的工夫片,尽量减少 CPU 闲暇状态工夫

起因二:

不知你是否还记得我在 面试问,创立多少个线程适合?文章中重复提到过,应用多线程很重要的考量点是线程切换的开销,设想一下,如果采纳非偏心锁,当一个线程申请锁获取同步状态,而后开释同步状态,因为不须要思考是否还有前驱节点,所以刚开释锁的线程在此刻再次获取同步状态的几率就变得十分大,所以就缩小了线程的开销

置信到这里,你也就明确了,为什么 ReentrantLock 默认结构器用的是非偏心锁同步器

public ReentrantLock() {sync = new NonfairSync();
}

看到这里,感觉非偏心锁 perfect,非也,有得必有失

应用非偏心锁会有什么问题?

偏心锁保障了排队的公平性,非偏心锁霸气的漠视这个规定,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 “饥饿”

如何抉择偏心锁 / 非偏心锁?

置信到这里,答案曾经在你心中了,如果为了更高的吞吐量,很显然非偏心锁是比拟适合的,因为节俭很多线程切换工夫,吞吐量天然就下来了,否则那就用偏心锁还大家一个偏心

咱们还差最初一个环节,真的要挺住

可重入锁

到这里,咱们还没剖析 ReentrantLock 的名字,JDK 起名这么有考究,必定有其含意,直译过去【可重入锁】

为什么要反对锁的重入?

试想,如果是一个有 synchronized 润饰的递归调用办法,程序第二次进入被本人阻塞了岂不是很大的笑话,所以 synchronized 是反对锁的重入的

Lock 是新轮子,天然也要反对这个性能,其实现也很简略,请查看偏心锁和非偏心锁比照图,其中有一段代码:

// 判断以后线程是否和已占用锁的线程是同一个
else if (current == getExclusiveOwnerThread())

认真看代码,你兴许发现,我后面的一个阐明是谬误的,我要从新解释一下

重入的线程会始终将 state + 1,开释锁会 state – 1 直至等于 0,下面这样写也是想帮忙大家疾速的辨别

总结

本文是一个长文,阐明了为什么要造 Lock 新轮子,如何规范的应用 Lock,AQS 是什么,是如何实现锁的,联合 ReentrantLock 反推 AQS 中的一些利用以及其独有的一些个性

独占式获取锁就这样介绍完了,咱们还差 AQS 共享式 xxxShared 没有剖析,联合共享式,接下来咱们来浏览一下 Semaphore,ReentrantReadWriteLock 和 CountLatch 等

另外也欢送大家的留言,如有谬误之处还请指出,我的手酸了,眼睛干了,我去筹备撸下一篇…..

灵魂诘问

  1. 为什么更改 state 有 setState() , compareAndSetState() 两种形式,感觉后者更平安,然而锁的眼帘中有好多中央都应用了 setState(),平安吗?
  2. 上面代码是一个转账程序,是否存在死锁或者锁的其余问题呢?

    class Account {
      private int balance;
      private final Lock lock
              = new ReentrantLock();
      // 转账
      void transfer(Account tar, int amt){while (true) {if(this.lock.tryLock()) {
            try {if (tar.lock.tryLock()) {
                try {
                  this.balance -= amt;
                  tar.balance += amt;
                } finally {tar.lock.unlock();
                }
              }//if
            } finally {this.lock.unlock();
            }
          }//if
        }//while
      }//transfer
    }

AQS 核心思想是,如果被申请的共享资源闲暇,则将以后申请资源的线程设置为无效的工作线程,并且将共享资源设置为锁定状态。如果被申请的共享资源被占用,那么就须要一套线程阻塞期待以及被唤醒时锁调配的机制,这个机制 AQS 是用 CLH 队列锁实现的,行将临时获取不到锁的线程退出到队列中。

参考与起源

  1. Java 并发实战
  2. Java 并发编程的艺术
  3. https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
  4. https://github.com/Snailclimb/JavaGuide/blob/master/docs/java/Multithread/AQS.md
  5. https://www.javadoop.com/post/AbstractQueuedSynchronizer-2
  6. https://www.cnblogs.com/waterystone/p/4920797.html
  7. https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160….
  8. https://dayarch.top/p/java-aqs-and-reentrantlock.html
  9. https://ifeve.com/java%E5%B9%B6%E5%8F%91%E4%B9%8Baqs%E8%AF%A6…
  10. https://www.cnblogs.com/liqiangchn/p/11960944.html

本文由 mdnice 多平台公布

正文完
 0