乐趣区

关于aqs:转载浅谈Java的AQS

所谓 AQS,指的是 AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖 FIFO 期待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier 等并发类均是基于 AQS 来实现的,具体用法是通过继承 AQS 实现其模板办法,而后将子类作为同步组件的外部类。

理解一个框架最好的形式是读源码,说干就干。

AQS 是 JDK1.5 之后才呈现的,由赫赫有名的 Doug Lea 李大爷来操刀设计并开发实现,全副源代码(加正文)2315 行,整体难度中等。

* @since 1.5
* @author Doug Lea

根本框架

在浏览源码前,首先论述 AQS 的根本思维及其相干概念。

AQS 根本框架如下图所示:

AQS 保护了一个 volatile 语义 (反对多线程下的可见性) 的共享资源变量 state 和一个 FIFO 线程期待队列(多线程竞争 state 被阻塞时会进入此队列)。

State

首先说一下共享资源变量 state,它是 int 数据类型的,其拜访形式有 3 种:

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)

上述 3 种形式均是原子操作,其中 compareAndSetState()的实现依赖于 Unsafe 的 compareAndSwapInt()办法。

private volatile int state;

// 具备内存读可见性语义
protected final int getState() {return state;}

// 具备内存写可见性语义
protected final void setState(int newState) {state = newState;}

// 具备内存读 / 写可见性语义
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

资源的共享形式分为 2 种:

  • 独占式(Exclusive)

只有单个线程可能胜利获取资源并执行,如 ReentrantLock。

  • 共享式(Shared)

多个线程可胜利获取资源并执行,如 Semaphore/CountDownLatch 等。

AQS 将大部分的同步逻辑均曾经实现好,继承的自定义同步器只须要实现 state 的获取 (acquire) 和开释 (release) 的逻辑代码就能够,次要包含上面办法:

  • tryAcquire(int):独占形式。尝试获取资源,胜利则返回 true,失败则返回 false。
  • tryRelease(int):独占形式。尝试开释资源,胜利则返回 true,失败则返回 false。
  • tryAcquireShared(int):共享形式。尝试获取资源。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。
  • tryReleaseShared(int):共享形式。尝试开释资源,如果开释后容许唤醒后续期待结点返回 true,否则返回 false。
  • isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才须要去实现它。

AQS 须要子类复写的办法均没有申明为 abstract,目标是防止子类须要强制性覆写多个办法,因为个别自定义同步器要么是独占办法,要么是共享方法,只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。

当然,AQS 也反对子类同时实现独占和共享两种模式,如 ReentrantReadWriteLock。

CLH 队列(FIFO)

AQS 是通过外部类 Node 来实现 FIFO 队列的,源代码解析如下:

static final class Node {
    
    // 表明节点在共享模式下期待的标记
    static final Node SHARED = new Node();
    // 表明节点在独占模式下期待的标记
    static final Node EXCLUSIVE = null;

    // 表征期待线程已勾销的
    static final int CANCELLED =  1;
    // 表征须要唤醒后续线程
    static final int SIGNAL    = -1;
    // 表征线程正在期待触发条件(condition)
    static final int CONDITION = -2;
    // 表征下一个 acquireShared 应无条件流传
    static final int PROPAGATE = -3;

    /**
     *   SIGNAL: 以后节点开释 state 或者勾销后,将告诉后续节点竞争 state。*   CANCELLED: 线程因 timeout 和 interrupt 而放弃竞争 state,以后节点将与 state 彻底拜拜
     *   CONDITION: 表征以后节点处于条件队列中,它将不能用作同步队列节点,直到其 waitStatus 被重置为 0
     *   PROPAGATE: 表征下一个 acquireShared 应无条件流传
     *   0: None of the above
     */
    volatile int waitStatus;
    
    // 前继节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 持有的线程
    volatile Thread thread;
    // 链接下一个期待条件触发的节点
    Node nextWaiter;

    // 返回节点是否处于 Shared 状态下
    final boolean isShared() {return nextWaiter == SHARED;}

    // 返回前继节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    // Shared 模式下的 Node 构造函数
    Node() {}

    // 用于 addWaiter
    Node(Thread thread, Node mode) {  
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 用于 Condition
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

能够看到,waitStatus 非负的时候,表征不可用,负数代表处于期待状态,所以 waitStatus 只须要查看其正负符号即可,不必太多关注特定值。

获取资源(独占模式)

acquire(int)

首先解说独占模式 (Exclusive) 下的获取 / 开释资源过程,其入口办法为:

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

tryAcquire(arg)为线程获取资源的办法函数,在 AQS 中定义如下:

protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

很显著,该办法是空办法,且由 protected 润饰,阐明该办法须要由子类即自定义同步器来实现。

acquire()办法至多执行一次 tryAcquire(arg),若返回 true,则 acquire 间接返回,否则进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)办法。

acquireQueued 办法分为 3 个步骤:

  1. addWriter()将以后线程退出到期待队列的尾部,并标记为独占模式;
  2. acquireQueued()使线程在期待队列中获取资源,直到获取到资源返回,若整个期待过程被中断过,则返回 True,否则返回 False。
  3. 如果线程在期待过程中被中断过,则先标记上,待获取到资源后再进行自我中断 selfInterrupt(),将中断响应掉。

上面具体看看过程中波及到的各函数:

tryAcquire(int)

tryAcquire 尝试以独占的模式获取资源,如果获取胜利则返回 True,否则间接返回 False,默认实现是抛出 UnsupportedOperationException,具体实现由自定义扩大了 AQS 的同步器来实现。

addWaiter(Node)

addWaiter 为以后线程以指定模式创立节点,并将其增加到期待队列的尾部,其源码为:

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
    // 尝试将节点疾速插入期待队列,若失败则执行惯例插入(enq 办法)
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 惯例插入
    enq(node);
    return node;
}

再看 enq(node)办法:

private Node enq(final Node node) {for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

能够看到,惯例插入与疾速插入相比,有 2 点不同:

  1. 惯例插入是自旋过程(for(;;)),可能保障节点插入胜利;
  2. 比疾速插入多蕴含了 1 种状况,即以后期待队列为空时,须要初始化队列,行将待插入节点设置为头结点,同时为尾节点(因为只有一个嘛)。

惯例插入与疾速插入均依赖于 CAS,其实现依赖于 unsafe 类,具体代码如下:


private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

unsafe 中的 cas 操作均是 native 办法,由计算机 CPU 的 cmpxchg 指令来保障其原子性。

接着看 acquireQueued()办法:

acquireQueued(Node, int)

相干阐明已在代码中正文:

final boolean acquireQueued(final Node node, int arg) {
    // 标识是否获取资源失败
    boolean failed = true;
    try {
        // 标识以后线程是否被中断过
        boolean interrupted = false;
        // 自旋操作
        for (;;) {
            // 获取以后节点的前继节点
            final Node p = node.predecessor();
            // 如果前继节点为头结点,阐明排队马上排到本人了,能够尝试获取资源,若获取资源胜利,则执行下述操作
            if (p == head && tryAcquire(arg)) {
                // 将以后节点设置为头结点
                setHead(node);
                // 阐明前继节点曾经开释掉资源了,将其 next 置空,以不便虚拟机回收掉该前继节点
                p.next = null; // help GC
                // 标识获取资源胜利
                failed = false;
                // 返回中断标记
                return interrupted;
            }
            // 若前继节点不是头结点,或者获取资源失败,// 则须要通过 shouldParkAfterFailedAcquire 函数
            // 判断是否须要阻塞该节点持有的线程
            // 若 shouldParkAfterFailedAcquire 函数返回 true,// 则继续执行 parkAndCheckInterrupt()函数,// 将该线程阻塞并查看是否能够被中断,若返回 true,则将 interrupted 标记置于 true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 最终获取资源失败,则以后节点放弃获取资源
        if (failed)
            cancelAcquire(node);
    }
}

具体看一下 shouldParkAfterFailedAcquire 函数:

// shouldParkAfterFailedAcquire 是通过前继节点的 waitStatus 值来判断是否阻塞以后节点的线程的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前继节点的 waitStatus 值 ws
    int ws = pred.waitStatus;
    // 如果 ws 的值为 Node.SIGNAL(-1),则间接返回 true
    // 阐明前继节点实现资源的开释或者中断后,会告诉以后节点的,回家等告诉就好了,不必自旋频繁地来打听音讯
    if (ws == Node.SIGNAL)
        return true;
    // 如果前继节点的 ws 值大于 0, 即为 1, 阐明前继节点处于放弃状态(Cancelled)
    // 那就持续往前遍历,直到以后节点的前继节点的 ws 值为 0 或正数
    // 此处代码很要害,节点往前挪动就是通过这里来实现的,直到节点的前继节点满足
    // if (p == head && tryAcquire(arg))条件,acquireQueued 办法才可能跳出自旋过程
    if (ws > 0) {
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前继节点的 ws 值设置为 Node.SIGNAL,以保障下次自旋时,shouldParkAfterFailedAcquire 间接返回 true
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt()函数则简略很多,次要调用 LockSupport 类的 park()办法阻塞以后线程,并返回线程是否被中断过。

private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    return Thread.interrupted();}

至此,独占模式下,线程获取资源 acquire 的代码就跟完了,总结一下过程:

  1. 首先线程通过 tryAcquire(arg)尝试获取共享资源,若获取胜利则间接返回,若不胜利,则将该线程以独占模式增加到期待队列尾部,tryAcquire(arg)由继承 AQS 的自定义同步器来具体实现;
  2. 以后线程退出期待队列后,会通过 acquireQueued 办法基于 CAS 自旋一直尝试获取资源,直至获取到资源;
  3. 若在自旋过程中,线程被中断过,acquireQueued 办法会标记此次中断,并返回 true。
  4. 若 acquireQueued 办法获取到资源后,返回 true,则执行线程自我中断操作 selfInterrupt()。
static void selfInterrupt() {Thread.currentThread().interrupt();}

开释资源(独占模式)

讲完获取资源,对应的讲一下 AQS 的开释资源过程,其入口函数为:

public final boolean release(int arg) {if (tryRelease(arg)) {
        // 获取到期待队列的头结点 h
        Node h = head;
        // 若头结点不为空且其 ws 值非 0,则唤醒 h 的后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

逻辑并不简单,通过 tryRelease(arg)来开释资源,和 tryAcquire 相似,tryRelease 也是有继承 AQS 的自定义同步器来具体实现

tryRelease(int)

该办法尝试开释指定量的资源。

protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}

unparkSuccessor(Node)

该办法次要用于唤醒期待队列中的下一个阻塞线程。

private void unparkSuccessor(Node node) {
    // 获取以后节点的 ws 值
    int ws = node.waitStatus;
    // 将以后节点的 ws 值置 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 若后继节点为 null 或者其 ws 值大于 0(放弃状态),则从期待队列的尾节点从后往前搜寻,// 搜寻到期待队列中最靠前的 ws 值非正且非 null 的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继节点非 null,则唤醒该后继节点持有的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

后继节点的阻塞线程被唤醒后,就进入到 acquireQueued()的 if (p == head && tryAcquire(arg))的判断中,此时被唤醒的线程将尝试获取资源。

当然,如果被唤醒的线程所在节点的前继节点不是头结点,通过 shouldParkAfterFailedAcquire 的调整,也会挪动到期待队列的后面,直到其前继节点为头结点。

解说完独占模式下资源的 acquire/release 过程,上面开始解说共享模式下,线程如何实现资源的获取和共享。

获取资源(共享模式)

了解了独占模式下,资源的获取和开释过程,则共享模式下也就 so easy 了,首先看一下办法入口:

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

执行 tryAcquireShared 办法获取资源,若获取胜利则间接返回,若失败,则进入期待队列,执行自旋获取资源,具体由 doAcquireShared 办法来实现。

tryAcquireShared(int)

同样的,tryAcquireShared(int)由继承 AQS 的自定义同步器来具体实现。

protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

其返回值为负值代表失败;0 代表获取胜利,但无残余资源;正值代表获取胜利且有残余资源,其余线程可去获取。

doAcquireShared(int)

private void doAcquireShared(int arg) {
    // 将线程以共享模式增加到期待队列的尾部
    final Node node = addWaiter(Node.SHARED);
    // 初始化失败标记
    boolean failed = true;
    try {
        // 初始化线程中断标记
        boolean interrupted = false;
        for (;;) {
            // 获取以后节点的前继节点
            final Node p = node.predecessor();
            // 若前继节点为头结点,则执行 tryAcquireShared 获取资源
            if (p == head) {int r = tryAcquireShared(arg);
                // 若获取资源胜利,且有残余资源,将本人设为头结点并唤醒后续的阻塞线程
                if (r >= 0) {setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 如果中断标记位为真,则线程执行自我了断
                    if (interrupted)
                        selfInterrupt();
                    // 表征获取资源胜利
                    failed = false;
                    return;
                }
            }
            // houldParkAfterFailedAcquire(p, node)依据前继节点判断是否阻塞以后节点的线程
            // parkAndCheckInterrupt()阻塞以后线程并查看线程是否被中断过,若被中断过,将 interrupted 置为 true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            // 放弃获取资源
            cancelAcquire(node);
    }
}

能够发现,doAcquireShared 与独占模式下的 acquireQueued 大同小异,次要有 2 点不同:

  1. doAcquireShared 将线程的自我中断操作放在了办法体外部;
  2. 当线程获取到资源后,doAcquireShared 会将以后线程所在的节点设为头结点,若资源有残余则唤醒后续节点,比 acquireQueued 多了个唤醒后续节点的操作。

上述办法体现了共享的实质,即以后线程吃饱了后,若资源有残余,会招呼前面排队的来一起吃,好货色要大家一起分享嘛,哈哈。

上面具体看一下 setHeadAndPropagate(Node, int)函数:

private void setHeadAndPropagate(Node node, int propagate) {
    // 记录原来的头结点,上面过程会用到
    Node h = head; 
    // 设置新的头结点
    setHead(node);
    
    // 如果资源还有残余,则唤醒后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();}
}

能够看到,理论执行唤醒后继节点的办法是 doReleaseShared(),持续追踪:

private void doReleaseShared() {
    // 自旋操作
    for (;;) {
        // 获取期待队列的头结点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; 
                // 唤醒后继节点的线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

开释资源(共享模式)

首先进入到办法入口:

public final boolean releaseShared(int arg) {
    // 尝试开释资源
    if (tryReleaseShared(arg)) {
        // 唤醒后继节点的线程
        doReleaseShared();
        return true;
    }
    return false;
}

同样的,tryReleaseShared(int)由继承 AQS 的自定义同步器来具体实现。

doReleaseShared()上节解说 setHeadAndPropagate 已阐明过,不再赘述。

至此,共享模式下的资源获取 / 开释就解说完了,上面以一个具体场景来概括一下:

整个获取 / 开释资源的过程是通过流传实现的,如最开始有 10 个资源,线程 A、B、C 别离须要 5、4、3 个资源。

  • A 线程获取到 5 个资源,其发现资源还残余 5 个,则唤醒 B 线程;
  • B 线程获取到 4 个资源,其发现资源还残余 1 个,唤醒 C 线程;
  • C 线程尝试取 3 个资源,但发现只有 1 个资源,持续阻塞;
  • A 线程开释 1 个资源,其发现资源还残余 2 个,故唤醒 C 线程;
  • C 线程尝试取 3 个资源,但发现只有 2 个资源,持续阻塞;
  • B 线程开释 2 个资源,其发现资源还残余 4 个,唤醒 C 线程;
  • C 线程获取 3 个资源,其发现资源还剩 1 个,持续唤醒后续期待的 D 线程;
  • ……

总结

本文次要介绍了 AQS 在独占和共享两种模式下,如何进行资源的获取和开释 (tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared),须要留神的是,在 acquire() 和 acquireShared()办法中,线程在阻塞过程中均是疏忽中断的。

AQS 也能够通过 acquireInterruptibly()/acquireSharedInterruptibly()来反对线程在期待过程中响应中断。

篇幅无限,本文就解说到这里。对于 AQS 其余高级个性,感兴趣的读者可跟一下源码。

参考

https://www.cnblogs.com/iou123lg/p/9464385.html
https://www.cnblogs.com/waterystone/p/4920797.html
https://www.jianshu.com/p/0da2939391cf

作者:安中古天乐
链接:https://www.jianshu.com/p/0f8…
起源:简书
著作权归作者所有。商业转载请分割作者取得受权,非商业转载请注明出处。

退出移动版