关于java:JDK源码分析AbstractQueuedSynchronizer上

27次阅读

共计 5990 个字符,预计需要花费 15 分钟才能阅读完成。

一. Java 并发编程的基石

AQS是 Java 并发编程的根底,Java 类库提供的并发工具如 Semaphore, CountDownLatch, CyclicBarrier, ReentrantLock, ReadWriteLock 等等都是建设在 AQS 上的,依照 Doug Lea 的说法,AQS是一个并发根底框架,用户通过继承 AQS 并覆写 tryAcquire()tryRelease()来表白他所心愿的信号量管制形式,其余的细节则齐全由这个并发框架实现。
实践总是拗口的,所以还是间接来看细节好啦。

二. State 的含意

AQS中外围的字段天然是 private volatile int state,在用户层面表白管制state 的形式就是覆写 tryAcquiretryRelease办法。
最天然的想法是将 state 看做残余信号量,那么只有在残余信号量为正的状况下 tryAcquire 能力胜利,大多数并发工具都是这么操作的,比方 Semaphore,也有为了实现非凡的性能而在残余量为 0 的时候tryAcquire 能力胜利,比方 CountDownLatch
也就是说 state 的含意由具体所要实现的性能紧紧关联在一起,用户须要覆写 tryAcquiretryRelease来表白他所心愿进行的 state 管制。

三. 结构一个不可重入锁

我想来想去,AQS的解说着实不适宜自底向上,所以还是用从顶向下的形式解说好了。
Java 类库中的 Semaphore 对于 state 的解释很纯正,就是将它当做残余信号量,tryAcquire示意获取信号量,tryRelease示意开释信号量,这也合乎大家学操作系统课程时所接管的信号量 PV 概念。然而难堪的是,它应用的是 AQSShare模式,这对于初学者的了解切实是不太敌对。
而应用 Exclusive 模式的 ReentrantLock 中解决可重入的代码又太多,覆盖了最外围的代码。所以我决定还是本人结构一个简略的不可重入锁好了,这样的锁天然是不适用于理论业务的,不过很适宜解说:-)
自定义锁命名为 GLock,为了代码简洁也不实现Lock 接口了,不然又要覆写一大堆没用的办法,咱们怎么简洁怎么来。
Java 类库的并发工具应用 AQS 都是通过在外部持有一个继承 AQS 的子类来实现的,这个子类的命名约定俗成都是Sync,咱们也入乡随俗

public class GLock {
    private final Sync sync;
    public GLock() {
        // 设置总信号量为 1
        sync = new Sync(1);
    }
    
    /**
     * 获取锁
     */
    public void lock() {
        // 获取一个信号量
        sync.acquire(1);
    }

    /**
     * 开释锁
     */
    public void unlock() {
        // 开释一个信号量
        sync.release(1);
    }

    static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {setState(permits);
        }
        
        @Override
        protected boolean tryAcquire(int acquires) {int c = getState();
            if (c == 0)
                return false;

            return compareAndSetState(c, c - acquires);
        }

        @Override
        protected boolean tryRelease(int releases) {setState(getState() + releases);
            return true;
        }
    }
}

GLock的总信号量在初始化时设置为 1,并且应用 Exclusive 独占模式操作 AQS
GLock 完满地展现了信号量 PV 的概念——将 AQSstate字段作为残余信号量来对待,tryAcquire- 获取信号量,tryRelease- 开释信号量。

四. 进入 Acquire

假如业务逻辑调用了 GLock.lock() 办法,那么大家都晓得要么是胜利了,能够执行后续的业务逻辑,要么就是被阻塞住了,直到期待一段时间别的线程开释了锁,至于期待多久就天晓得了。
咋看之下,GLock.lock()办法中调用的是 sync.acquire(1),咱们写的tryAcquire 办法用在哪了?咱们来看一下 AQSacquire源码

public final void acquire(int acquires) {if (!tryAcquire(acquires) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), acquires)) 
    {selfInterrupt();
    }
}

是在 acquire 里回调了 tryAcquire 办法。
acquire办法中,存在以下状况

  1. tryAcquire胜利了,那么不须要执行 acquireQueuedselfInterupt(),间接返回,示意胜利获取了信号量,对于 GLock 也即意味着第一次获取锁就胜利了
  2. 如果 tryAcquire 失败了,这种状况须要进入 AQS 中的期待队列,咱们须要先在队列中占个地位 ,这也是addWaiter 办法要做的事件

五. AQS 的期待队列

FIFO队列是 AQS 的外围,其实这是一个双向链表,链表中的结点由 Node 类型示意

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
}

AQS 中蕴含 volatile Node headvolatile Node tail两个字段别离指向表头和表尾
后面说到第一次 tryAcquire 没有胜利时,须要进入期待队列,addWaiter办法的作用就是在表尾插入结点,参数 mode 有两种取值——Shared共享模式和 Exclusive 独占模式,这里应用的是独占模式

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
    enq(node);
    return node;
}
private Node enq(final Node node) {for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

能够看到,当队列为空时,必须先在队列头部插入一个空白结点,也就是说,只有队列被第一次应用,后续所有工夫,队列的头部肯定不为空
咱们假如此时 GLock 的信号量曾经被线程 X 给占用了,而当初有三个线程 T1, T2, T3 都尝试获取信号量,并且三个线程都刚刚完结调用 enq() 办法,那么此时 AQS 的期待队列看起来像是这样

请问在队列中插入结点后线程是立即休眠呢还是有别的操作?咱们持续看 acquireQueued 办法。
咱们来看下 acquireQueued 源码,简洁起见我删除了局部不影响解说的代码

final boolean acquireQueued(final Node node, int acquires) {
    boolean interrupted = false;
    for (; ;) {final Node p = node.predecessor();
        if (p == head && tryAcquire(acquires)) {setHead(node);
            p.next = null; // help GC
            failed = false;
            return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
        {interrupted = true;}
    }
}

咱们发现 acquireQueued() 整体是一个有限循环,不停地去尝试获取信号量,直到获取胜利才 return
然而获取信号量并不是在队列中的任何线程都能够进行的,咱们要特地留神这句代码 if (p == head && tryAcquire()),只有在以后线程所处结点是整个队列第二个结点时(第一个是头结点,它是个哑结点)才可能尝试获取信号量,这阐明, 任意时刻,队列中只有第一个线程有资格获取信号量
在咱们的例子中就是 T1 才有资格获取信号量,假如以后占用信号量的线程 X 的业务逻辑比较复杂,执行工夫较长,那么线程 T1 在执行第二次 tryAcquire() 时仍然没有获取到信号量(第一次是在 acquire() 中),就会进入到 shouldParkAfterFailedAcquire() 办法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        
        pred.next = node;
    } else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这个 pred 前置结点就是头结点,依据结点中 waitStatus 值的不同,这个办法有不同的逻辑,上面咱们来看看 waitStatus 不同值的含意

六. 队列结点中的 waitStatus

waitStatus有如下取值

命名 取值 含意
CANCELLED 1 结点中的线程曾经勾销期待
SIGNAL -1 后继结点要求持有信号量的线程开释信号量时唤醒它
CONDITION -2 这个值只在 Condition 中用到,咱们临时不关怀
PROPAGATE -3 共享模式下,开释信号量的线程要求让期待队列中的线程尽可能地在进入休眠前取得信号量

CONDITION状态咱们不必管,在信号量 PV 操作中不会用到这个状态,所以咱们要钻研的就是默认初始化状态 0, CANCELLED, SIGNALPROPAGATE,进一步地,以后从易于读者了解的角度思考,应用的是AQS 独占模式,PROPAGATE临时也不须要思考
咱们以后在 GLock 中须要思考的有 0, CANCELLEDSIGNAL 三种结点状态。
还有很重要的一点是,头结点的 waitStatus 永远不会是 CANCELLED!,所以咱们最终须要考查的只有 0 和 SIGNAL 两种状态.


桥豆麻袋!你说头结点不会是 CANCELLED,那么为什么会有这样的代码?

if (ws > 0) {
    do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
    pred.next = node;
}

这是因为队列中的线程能够随时退出期待,所以当发现前驱结点的 waitStatusCANCELLED时,这阐明搞不好本人有机会成为队列中第二个结点哦(走狗屎运了,头结点到本节点之间的所有线程全副放弃了),所以始终往前找,直到找到第一个没有退出的结点(可能是头结点,也可能是其余结点,毕竟大家都是正经人,谁会随随便便让他人踩狗屎运呢)
用图谈话就是这样


请读者再翻到上一节去看 shouldParkAfterFailedAcquire() 办法的源码

  1. 假如头结点的 waitStatus 曾经是 SIGNAL 了,那么就会返回 true,对于acquireQueued() 办法中的 shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 这一行代码而言,就会继续执行 parkAndCheckInterrupt() 办法
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    return Thread.interrupted();}

LockSupport.park()办法会让线程休眠,直到被其余线程唤醒

  1. 如果前驱结点曾经退出,则找到第一个没有退出的结点,而后返回false
  2. 前两种状况都不是,那么头结点的 waitStatus 肯定是 0 或者 PROPAGATE(后面说过CONDITION 值不会用在队列中),这两种值的解决办法都一样,将头结点的 waitStatus 设置为SIGNAL,示意本线程心愿取得信号量的那位兄弟在开释信号量时唤醒本人,因为本线程极有可能要进入休眠了

大家能够发现,除了第一种状况会让线程立即休眠外,其余的状况都会导致线程的执行流程又回到 acquireQueued() 中有限循环的结尾
对于状况 2,如果所有前驱线程都退出了,那么以后线程就有资格调用 tryAcquire,如果此时信号量被线程X 开释了,那么就能取得锁了;如果依然获取失败,则再次进入 shouldParkAfterFailedAcquire() 办法,将头结点状态置为SIGNAL,而后跟状况 3 一样,在休眠前执行最初一次tryAcquire

对于状况 3,会最初一次调用 tryAcquire,如果还是无奈失去信号量,就跟状况 1 一样,进入shouldParkAfterFailedAcquire() 间接返回 true,而后间接进入parkAndCheckInterrupt() 办法中进入休眠,期待唤醒

七. 多余的话

尽管咱们当初是以 AQS 的独占模式为主题进行剖析的,然而我还是想顺带提一下无关共享模式的货色。
上一节的状况 3,头结点的 waitStatus 为 0 和 PROPAGATE 时都不会立即休眠,而是会再多尝试一次 tryAcquire,些微不同的是,创立结点时waitStatus 主动初始化为 0,waitStatus不会本人扭转,PROPAGATE须要共享模式下的其余线程来被动设置
其实这就是我之前对 PROPAGATE 含意的解释:共享模式下,开释信号量的线程要求让期待队列中的线程尽可能地在进入休眠前取得信号量
PROPAGATE 这个值所要表白的语义我集体参详了很久很久,尽管源码正文里论述了作者心愿这个值所要实现的指标,然而十分难以了解,通过重复浏览源码我明确了 PROPAGATE 真正的含意——给线程多一次机会尝试 tryAcquire
其实默认状态 0 也实现了同样的性能,但我认为 PROPAGATE 是共享模式中表白心愿升高队列线程进入休眠几率的一种显式语义

八. 线程的呐喊,终于逃离了休眠的天堂

咳咳,题目起过头了。这一节咱们来剖析,在 acquireQueued() 中调用 tryAcquire 时胜利获取了信号量时的后续逻辑。不论是什么状况,取得信号量的后续逻辑都是一样的。

明天先写到这,啃鸡爪去了

正文完
 0