乐趣区

关于javascript:AQS-源码流程分析

导读

咱们日常开发中,常常会碰到并发的场景,在 Java 中语言体系里,咱们会想到 ReentrantLock、CountDownLatch、Semaphore 等工具,但你是否分明它们外部的实现原理?这些工具都很相似,底层都是基于 AbstractQueuedSynchronizer(AQS)来实现的。明天咱们就来一起学习 AQS 外部原理。俗话说知己知彼屡战屡败,如果咱们理解了其中的原理,用该类工具开发就能够做到事倍功半。

文|夏福利 网易智企资深开发工程师

一、AQS 执行框架

下图是 AQS 大体执行框架:

通过下面这张图可能理解到 AQS 大略的执行过程。ReentrantLock、CountDownLatch、Semaphore 都是在这个流程上封装的。

拿 ReentrantLock 来说,ReentrantLock 是独占锁, 能够是偏心锁,也能够是非偏心锁, 通过这张图能够很好了解了。

ReentrantLock 是独占锁体现在同一时刻只有一个线程可能尝试获取资源胜利, 其余获取失败的都会退出阻塞队列的队尾进行排队。

偏心锁就是线程严格依照阻塞队列的排列程序获取资源,先到先得,不得插队。如下图所示:

而非偏心锁就可能存在插队的可能。 例如,如果下面头节点被唤醒,正筹备尝试获取资源,这时来了一个线程也尝试获取资源,有可能新来的线程获取资源胜利,而头结点获取资源失败。这就是非偏心锁。

在 ReentrantLock 源码中能够看到非偏心锁会尝试获取资源时, 不会思考阻塞队列是否为空, 如果可能获取资源胜利则间接占用了资源。获取失败才会退出阻塞队列。代码如下:

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

而对于偏心锁尝试获取资源时,会判断阻塞队列是否为空(和非偏心锁要害差异所在),如下:

static final class FairSync extends Sync {    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {        acquire(1);    }
    /**     * Fair version of tryAcquire.  Don't grant access unless     * recursive call or no waiters or is first.     */    @ReservedStackAccess    protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {if (!hasQueuedPredecessors() &&                compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);                return true;            }        }        else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;            if (nextc < 0)                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }}

二、AQS 实现原理详解

AQS 从名字上就晓得是抽象类,通过模板办法定义了下面那张图的流程。对于“资源占用”和“资源开释”的定义,则是交给具体的子类去定义去实现。

对于共享锁,子类须要实现如下两个办法:

protected int tryAcquireShared(int arg);protected boolean tryReleaseShared(int arg);
  • tryAcquireShared:共享形式。arg 为获取锁的次数, 尝试获取资源;返回值为:

    正数示意失败;

    0 示意胜利,但没有残余可用资源;

    负数示意胜利,且有残余资源;

<!—->

  • tryReleaseShared:共享形式。arg 为开释锁的次数, 尝试开释资源,如果开释后容许唤醒后续期待结点返回 True,否则返回 False;

而对于独占锁,子类须要实现三个办法:

protected boolean tryAcquire(int arg)protected boolean tryRelease(int arg)protected boolean isHeldExclusively()
  • isHeldExclusively: 该线程是否正在独占资源。只有用到 Condition 才须要去实现它;
  • tryAcquire: 独占形式。arg 为获取锁的次数,尝试获取资源,胜利则返回 True,失败则返回 False;
  • tryRelease: 独占形式。arg 为开释锁的次数,尝试开释资源,胜利则返回 True,失败则返回 False;

获取资源

首先,咱们看一下独占锁获取资源的过程。

在 AQS 中,独占锁的获取资源的外围代码如下:

public final void acquire(int arg) {// 当 tryAcquire 返回 true 就阐明获取到锁了,间接完结。// 反之,返回 false 的话,就须要执行前面的办法。if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

如果子类的 tryAcquire 返回 true, 则示意获取锁胜利,间接完结。

只有子类的 tryAcquire 办法返回 false,那么就阐明获取锁失败,就须要将本人退出队列。

private Node addWaiter(Node mode) {// 创立一个独占类型的节点    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    // 如果 tail 节点不是 null,就将新节点的 pred 节点设置为 tail 节点。// 并且将新节点设置成 tail 节点。if (pred != null) {node.prev = pred;        if (compareAndSetTail(pred, node)) {pred.next = node;            return node;}    }    // 如果 tail 节点是  null,或者 CAS 设置 tail 失败。// 在 enq 办法中解决    enq(node);    return node;}

如果 tail 节点为 null, 或者 CAS 设置 tail 失败,则通过自旋的形式退出尾结点。

private Node enq(final Node node) {for (;;) {Node t = tail;        // 如果 tail 是 null,就创立一个虚构节点,同时指向 head 和 tail,称为 初始化。if (t == null) {// Must initialize            if (compareAndSetHead(new Node()))                tail = head;        } else {// 如果不是 null            // 和 上个办法逻辑一样,将新节点追加到 tail 节点前面,并更新队列的 tail 为新节点。// 只不过这里是死循环的,失败了还能够再来。node.prev = t;            if (compareAndSetTail(t, node)) {t.next = node;                return t;}        }    }}

enq 办法的逻辑是什么呢?当 tail 是 null(没有初始化队列),就须要初始化队列了。CAS 设置 tail 失败,也会走这里,须要在 enq 办法中循环设置 tail。直到胜利。

下面的过程用一张图示意如下:

将本人退出到阻塞队列后(留神 addWaiter 办法返回的是以后节点 ),执行 acquireQueued() 办法,将以后节点对应的线程挂起,源码如下:

// 这里返回的节点是新创建的节点,arg 是申请的数量 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;    try {        boolean interrupted = false;        for (;;) {// 找上一个节点            final Node p = node.predecessor();            // 如果上一个节点是 head,就尝试获取锁            // 如果 获取胜利,就将以后节点设置为 head,留神 head 节点是永远不会唤醒的。if (p == head && tryAcquire(arg)) {setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 在获取锁失败后,就须要阻塞了。// shouldParkAfterFailedAcquire ---> 查看上一个节点的状态,如果是 SIGNAL 就阻塞,否则就改成 SIGNAL。if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {if (failed)            cancelAcquire(node);    }}

这个办法有两个逻辑:

  • 如何将本人挂起?
  • 被唤醒之后做什么?

先答复第二个问题:被唤醒之后做什么?

尝试拿锁,胜利之后,将本人设置为 head,断开和 next 的连贯。

再看第一个问题:如何将本人挂起?

具体逻辑在 shouldParkAfterFailedAcquire 办法中:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;    //  如果他的上一个节点的 ws 是 SIGNAL,他就须要阻塞。if (ws == Node.SIGNAL)        // 阻塞        return true;    // 后任被勾销。跳过后任并重试。if (ws > 0) {do {            // 将后任的后任 赋值给 以后的后任            node.prev = pred = pred.prev;} while (pred.waitStatus > 0);        // 将后任的后任的 next 赋值为 以后节点        pred.next = node;    } else {// 如果没有勾销 || 0 || CONDITION || PROPAGATE,那么就将后任的 ws 设置成 SIGNAL.        // 为什么必须是 SIGNAL 呢?// 答:心愿本人的上一个节点在开释锁的时候,告诉本人(让本人获取锁)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    // 重来    return false;}

该办法的次要逻辑就是将前置节点的状态批改成 SIGNAL,通知他:你开释锁的时候记得唤醒我。其中如果前置节点被勾销了,就跳过他。那么在前置节点开释锁的时候,必定会唤醒这个节点。

下面是独占锁获取资源过程,共享锁获取资源的过程相似,会有略微的不同,外围代码如下:

private void doAcquireShared(int arg) {// 将本人退出阻塞队列        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {boolean interrupted = false;            for (;;) {// 将本人挂起前,尝试再次获取锁,如果获取胜利,则将本人设置为头结点,并告诉唤醒下一节点                final Node p = node.predecessor();                if (p == head) {int r = tryAcquireShared(arg);                    if (r >= 0) {// 这里是和独占锁有区别的中央。这里岂但会将本人设置为头结点,而且会唤醒下一个节点,通过这种形式将所有期待共享锁的节点唤醒                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;}                }                                // 获取锁失败,则挂起。同独占锁逻辑                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {if (failed)                cancelAcquire(node);        }    }

这个办法同样是蕴含两个逻辑:

  • 如何将本人挂起?
  • 被唤醒之后做什么?

如何将本人挂起和独占锁没有区别。 唤醒之后做什么是和独占锁区别的要害: 如果以后节点唤醒后获取到了锁后,会唤醒下一个节点。下一个节点唤醒后会持续唤醒下下一个节点,从而将所有期待共享锁的线程唤醒。外围代码如下:

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below        // 将本人设置为头结点        setHead(node);        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {Node s = node.next;            if (s == null || s.isShared())                // 唤醒下一个节点                doReleaseShared();}    }

开释资源

下面讲了获取资源的逻辑,那如何开释资源呢?

同样,还是先看一下独占锁的开释逻辑:

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;        // 所有的节点在将本人挂起之前,都会将前置节点设置成 SIGNAL,心愿前置节点开释的时候,唤醒本人。// 如果前置节点是 0,阐明前置节点曾经开释过了。不能反复开释了,前面将会看到开释后会将 ws 批改成 0.        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

从这个办法的判断就能够看出,head 必须不等于 0。为什么呢?下面获取资源过程中说到:当一个节点尝试挂起本人之前,都会将前置节点设置成 SIGNAL -1,就算是第一个退出队列的节点,在获取锁失败后,也会将初始化节点设置的 ws 设置成 SIGNAL。

而这个判断也是避免多线程反复开释,那么在开释锁之后,必定会将 ws 状态设置成 0。避免反复操作。代码如下:

private void unparkSuccessor(Node node) {int ws = node.waitStatus;    if (ws < 0)        // 将 head 节点的 ws 改成 0,革除信号。示意,他曾经开释过了。不能反复开释。compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;    // 如果 next 是 null,或者 next 被勾销了。就从 tail 开始向上找节点。if (s == null || s.waitStatus > 0) {s = null;        // 从尾部开始,向前寻找最靠前的那个未被勾销的节点        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    // 唤醒这个节点。if (s != null)        LockSupport.unpark(s.thread);}

唤醒之后的逻辑是什么样子的还记得吗?在下面解说资源获取时有讲到:

线程唤醒后尝试拿锁,拿锁胜利则设置本人为 head,断开后任 head 和本人的连贯。

final boolean acquireQueued(final Node node, long arg) {boolean failed = true;    try {        boolean interrupted = false;        // 唤醒之后再次进行 for 循环,尝试获取锁,获取胜利则将本人设置为头结点        for (;;) {final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    }

再来看一下共享锁的开释逻辑,代码如下:

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();            return true;        }        return false;    }

doReleaseShared() 代码如下:

private void doReleaseShared() {        for (;;) {Node h = head;            if (h != null && h != tail) {int ws = h.waitStatus;                if (ws == Node.SIGNAL) {// 将 head 节点的 ws 改成 0,革除信号。示意,他曾经开释过了。不能反复开释。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                                            // 唤醒下一个节点                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

同样,唤醒后做什么呢?

线程唤醒后尝试拿锁,拿锁胜利则设置本人为 head,断开后任 head 和本人的连贯,并唤醒下一个节点,代码如下:

private void doAcquireShared(long arg) {final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {boolean interrupted = false;        // 唤醒后再次 for 循环,尝试获取锁,获取锁胜利,则设置本人为头结点,// 并唤醒下一个结点,从而始终流传上来,将所有期待共享锁的线程唤醒        for (;;) {final Node p = node.predecessor();            if (p == head) {long r = tryAcquireShared(arg);                if (r >= 0) {setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;}            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    }}

为了证实共享锁唤醒时是一个接一个被唤醒,咱们用一个 demo 来验证下,示例代码如下:

CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread t1 = new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(1);                countDownLatch.await();                System.out.println("线程 1 被唤醒了");            } catch (InterruptedException e) {e.printStackTrace();            }        });
        Thread t2 = new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(2);                countDownLatch.await();                System.out.println("线程 2 被唤醒了");            } catch (InterruptedException e) {e.printStackTrace();            }        });
        Thread t3 = new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(3);                countDownLatch.await();                System.out.println("线程 3 被唤醒了");            } catch (InterruptedException e) {e.printStackTrace();            }        });
        t1.start();        t2.start();        t3.start();
        TimeUnit.SECONDS.sleep(4);        countDownLatch.countDown();}

下面示例代码中,线程 1、2、3 按程序退出阻塞队列,当主线程调用 countDown() 时,此时会唤醒线程 1,线程 1 唤醒后会唤醒线程 2,线程 2 会唤醒线程 3。从运行后果能够看出的确如此:

 线程 1 被唤醒了
线程 2 被唤醒了
线程 3 被唤醒了 

从而证实了下面的推断。

三、总结

独占锁和共享锁在获取资源失败时,都会将本人退出阻塞队列的尾部,并将前一节点的 ws 设置为 SINGAL,通知他:开释锁的时候记得唤醒我。

独占锁和共享锁的不同之处在于: 节点被唤醒后,独占锁线程不会唤醒下一个节点 (要唤醒必须被动开释锁,比方应用 ReentrantLock 最初要调用 release() 办法被动开释锁)。

而对于共享锁来说,只有一个节点被唤醒了,那就会持续唤醒下一个节点,下一个节点又会去唤醒下下一个节点,从而将所有期待共享锁的线程唤醒。

作者介绍

夏福利,网易智企资深开发工程师,次要负责网易七鱼在线智能客服的研发

退出移动版