前言

AbstractQueuedSynchronizer,即队列同步器,通过继承AbstractQueuedSynchronizer并重写其办法能够实现锁或其它同步组件,本篇文章将对AbstractQueuedSynchronizer的应用和原理进行学习。

参考资料:《Java并发编程的艺术》

注释

一. AbstractQueuedSynchronizer的应用

AbstractQueuedSynchronizer的应用通常如下。

  • 创立AbstractQueuedSynchronizer的子类作为同步组件(例如ReentrantLockCountDownLatch等)的动态外部类并重写AbstractQueuedSynchronizer规定的可重写的办法;
  • 同步组件通过调用AbstractQueuedSynchronizer提供的模板办法来实现同步组件的同步性能。

先对AbstractQueuedSynchronizer的可重写的办法进行阐明。AbstractQueuedSynchronizer是基于模板设计模式来实现锁或同步组件的,AbstractQueuedSynchronizer外部保护着一个字段叫做state,该字段示意同步状态,是一个整型变量,AbstractQueuedSynchronizer规定了若干办法来操作state字段,但AbstractQueuedSynchronizer自身并没有对这些办法进行实现,而是要求AbstractQueuedSynchronizer的子类来实现这些办法,上面看一下这些办法的签名和正文。

办法签名正文
protected boolean tryAcquire(int arg)独占式地获取同步状态。该办法在独占式获取同步状态以前应该判断是否容许独占式获取,如果容许则尝试基于CAS形式来设置同步状态,设置胜利则示意获取同步状态胜利。
protected boolean tryRelease(int arg)独占式地开释同步状态。能够了解为将同步状态还原为获取前的状态。
protected int tryAcquireShared(int arg)共享式地获取同步状态。
protected boolean tryReleaseShared(int arg)共享式地开释同步状态。
protected boolean isHeldExclusively()判断以后线程是否独占以后队列同步器。

实际上,AbstractQueuedSynchronizer规定的这些可重写的办法,均会被AbstractQueuedSynchronizer提供的模板办法所调用,在基于AbstractQueuedSynchronizer实现同步组件时,可依据同步组件的理论性能来重写这些可重写办法,而后再通过调用模板办法来实现同步组件的性能。上面看一下AbstractQueuedSynchronizer提供的模板办法。

办法签名正文
public final void acquire(int arg)独占式获取同步状态,即独占式获取锁。获取胜利则该办法返回,获取失败则以后线程进入同步队列期待。
public final void acquireInterruptibly(int arg) throws InterruptedException独占式获取同步状态,并响应中断。即如果获取同步状态失败,则会进入同步队列期待,此时如果线程被中断,则会退出期待状态并抛出中断异样。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedExceptionacquireInterruptibly(int arg),并在其根底上指定了等待时间,若超时还未获取同步状态则返回false
public final void acquireShared(int arg)共享式获取同步状态,即共享式获取锁。获取胜利则该办法返回,获取失败则以后线程进入同步队列期待,反对同一时刻多个线程获取到同步状态。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException共享式获取同步状态,并响应中断。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedExceptionacquireSharedInterruptibly(int arg),并在其根底上指定了等待时间,若超时还未获取同步状态则返回false
public final boolean release(int arg)独占式地开释同步状态,即独占式地开释锁。胜利开释锁之后会将同步队列中的第一个节点的线程唤醒。
public final boolean releaseShared(int arg)共享式地开释同步状态。
public final Collection<Thread> getQueuedThreads()获取在同步队列上期待地线程。

上面联合《Java并发编程的艺术》中的例子的简化版,来直观的展现应用AbstractQueuedSynchronizer来实现同步组件的不便,如下所示。

public class MyLock {    private final Sync sync = new Sync();    public void lock() {        sync.acquire(1);    }    public void unlock() {        sync.release(1);    }    private static class Sync extends AbstractQueuedSynchronizer {        @Override        protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        @Override        protected boolean tryRelease(int unused) {            if (getState() == 0) {                throw new IllegalMonitorStateException();            }            setExclusiveOwnerThread(null);            setState(0);            return true;        }    }}

上述的MyLock是一个提供了最简略性能的不可重入锁,该锁的Lock()unlock()办法全副是调用的AbstractQueuedSynchronizer提供的模板办法acquire()release(),而后在acquire()release()这两个模板办法中又会调用Sync重写的tryAcquire()tryRelease()办法,最终的成果就是仅仅重写了tryAcquire()tryRelease()办法,便实现了一个具备最简略性能的不可重入锁,而同步的具体实现细节比方同步队列等全都由AbstractQueuedSynchronizer来实现,借助AbstractQueuedSynchronizer,能够极大升高实现一个同步组件的门槛。

二. AbstractQueuedSynchronizer的原理

AbstractQueuedSynchronizer中,保护了一个FIFO的同步队列,当某个线程获取同步状态失败时,该线程会被封装成一个节点Node并退出同步队列中。本大节将联合同步队列和AbstractQueuedSynchronizer源码对AbstractQueuedSynchronizer的原理进行学习。

AbstractQueuedSynchronizer中有两个字段headtail,别离指向同步队列头节点和尾节点,同步队列中的每个节点也有两个字段prevnext,别离指向上一节点和下一节。一个同步队列的根本构造如下所示。

当有新节点入队列时,新节点会退出到队列尾并成为新的尾节点,同时tail字段会指向新的尾节点。节点入队列如下所示。

通常,head字段指向的头节点示意以后胜利获取同步状态的线程所对应的节点,头节点开释同步状态后,会唤醒头节点的下一节点,当下一节点胜利获取同步状态后,会将head字段指向本人从而成为新的头节点,与此同时,新头节点的prev和老头节点的next字段会被置为null,以帮忙垃圾回收。头节点开释同步状态如下所示。

上述为同步队列的根本概览,上面将联合AbstractQueuedSynchronizer所提供的模板办法来对独占式获取开释同步状态共享式获取开释同步状态进行剖析。

1. 独占式获取同步状态

独占式获取同步状态的acquire()办法如下所示。

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

acquire()办法做了如下几件事件:

  • 先调用重写的tryAcquire()办法独占式地获取同步状态,获取胜利则返回,获取失败则执行下一步骤;
  • 调用addWaiter()办法基于以后线程创立一个节点Node并增加到同步队列中;
  • 调用acquireQueued()办法,使刚创立并退出了同步队列的Node进入自旋状态,在自旋状态中,Node会判断本人的上一节点是否是头节点,如果是则尝试获取同步状态,获取胜利则退出自旋状态,如果本人的上一节点不是头节点或者获取同步状态失败,则阻塞本人,直到被上一节点唤醒或者被中断,此时反复后面的自旋过程。

还是从源码动手,剖析创立节点Node,入同步队列和自旋判断并获取同步状态这几个过程。首先是addWaiter()办法,如下所示。

private Node addWaiter(Node mode) {    //基于以后线程创立一个节点    Node node = new Node(Thread.currentThread(), mode);    Node pred = tail;    if (pred != null) {        node.prev = pred;        //先尝试平安地将新创建的节点增加到同步队列尾        //如果增加胜利,则间接返回新创建的节点        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    //如果新创建的节点是入同步队列的第一个节点,或者尝试增加失败    //则调用enq()办法将节点平安地增加到同步队列中    enq(node);    return node;}

addWaiter()中,基于以后线程创立节点Node之后,会立刻尝试一次平安地将新创建的Node增加到队列尾,若失败的话则会调用enq()办法来让节点入队列,enq()的实现如下所示。

private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) {            //node如果是入同步队列的第一个节点,此时t为null            //则先将head平安的指向一个空节点,而后head再赋值给tail            //此时头节点和尾节点均为同一个空节点            if (compareAndSetHead(new Node()))                tail = head;        } else {            //尾节点不为null时,则尝试平安的将node设置为新的尾节点            //如果设置失败,则循环反复尝试设置,直到设置胜利为止            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

enq()办法理论就是在一个死循环中反复地基于CAS形式将新创建的节点入同步队列。胜利入队列后的节点会被传入acquireQueued()办法进入自旋状态,其实现如下所示。

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            //p为以后节点的上一节点            final Node p = node.predecessor();            //判断以后节点的上一节点是否是头节点            //如果是则尝试获取同步状态            if (p == head && tryAcquire(arg)) {                //获取同步状态胜利,则将以后节点置为头节点                setHead(node);                p.next = null;                failed = false;                return interrupted;            }            //先判断以后节点是否应该被阻塞            //如果应该被阻塞则调用parkAndCheckInterrupt()办法进入阻塞状态            //否则自旋从新进行上述步骤            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

acquireQueued()办法中,如果以后节点的上一节点不是头节点或者尝试获取同步状态失败,那么会在shouldParkAfterFailedAcquire()办法中判断以后节点的上一节点状态是否为SIGNAL,如果是则会调用parkAndCheckInterrupt()进入阻塞状态,如果不是则会将以后节点的上一节点状态置为SIGNAL,而后自旋地再执行上述步骤。

独占式获取同步状态还有两个模板办法,别离为acquireInterruptibly()tryAcquireNanos(),其中acquireInterruptibly()能够响应中断,tryAcquireNanos()能够指定等待时间,上面将对这两个办法进行简要剖析。

acquireInterruptibly()的实现如下所示。

public final void acquireInterruptibly(int arg)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (!tryAcquire(arg))        doAcquireInterruptibly(arg);}

acquireInterruptibly()办法中尝试获取同步状态失败时,会调用doAcquireInterruptibly()办法来将以后线程退出同步队列,如下所示。

private void doAcquireInterruptibly(int arg)    throws InterruptedException {    //基于以后线程创立节点并退出同步队列尾    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null;                failed = false;                return;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                //期待过程中被中断则抛出中断异样                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}

doAcquireInterruptibly()的实现根本与acquireQueued()雷同,区别在于,在doAcquireInterruptibly()办法中进入期待状态时被中断的话,会抛出中断异样。

接下来再看一下tryAcquireNanos()办法的实现,如下所示。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    return tryAcquire(arg) ||        doAcquireNanos(arg, nanosTimeout);}

tryAcquireNanos()办法中获取同步状态失败时,会调用doAcquireNanos()办法来将以后线程退出同步队列,如下所示。

private boolean doAcquireNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (nanosTimeout <= 0L)        return false;    //计算期待的最晚工夫点    final long deadline = System.nanoTime() + nanosTimeout;    //基于以后线程创立节点并退出同步队列尾    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null;                failed = false;                return true;            }            //计算残余等待时间            nanosTimeout = deadline - System.nanoTime();            if (nanosTimeout <= 0L)                return false;            //如果以后线程应该被阻塞,并且残余等待时间大于1000纳秒,则应用LockSupport进入期待状态            if (shouldParkAfterFailedAcquire(p, node) &&                nanosTimeout > spinForTimeoutThreshold)                LockSupport.parkNanos(this, nanosTimeout);            if (Thread.interrupted())                //如果以后线程是因为被中断而从期待状态返回,则抛出中断异样                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}

doAcquireNanos()办法的大体实现与acquireQueued()雷同,不同之处在于首先doAcquireNanos()至少只会在同步队列中阻塞nanosTimeout的工夫,超时的话会返回false示意期待获取同步状态失败,其次doAcquireNanos()也是响应中断的。

2. 独占式开释同步状态

AbstractQueuedSynchronizer提供了模板办法release()来实现独占式开释同步状态,如下所示。

public final boolean release(int arg) {    //尝试开释同步状态    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            //开释同步状态之后,唤醒头节点的下一节点            unparkSuccessor(h);        return true;    }    return false;}

以后节点独占式地开释同步状态之后,会调用unparkSuccessor()办法来唤醒下一节点,如下所示。

private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    Node s = node.next;    //如果下一节点为null,或者下一节点状态为敞开,则从尾节点开始向前寻找满足唤醒条件的节点并唤醒    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        LockSupport.unpark(s.thread);}

unparkSuccessor()办法会获取以后节点的下一节点,并判断下一节点是否为null或者状态是否为CANCELLED,如果合乎其中任何一项,表明以后节点的下一节点不满足唤醒条件,那么须要从尾节点开始向前寻找满足唤醒条件的节点并唤醒,而唤醒条件就是节点不为null以及节点状态不为CANCELLED

3. 共享式获取同步状态

共享式获取同步状态的模板办法acquireShared()如下所示。

public final void acquireShared(int arg) {    //获取同步状态失败则返回负值    if (tryAcquireShared(arg) < 0)        //创立节点退出同步队列尾,并进入自旋状态        doAcquireShared(arg);}

tryAcquireShared()办法对其返回值进行了如下规定。

  • 返回负值,示意共享式获取同步状态失败;
  • 返回正值,示意共享式获取同步状态胜利,并且后续共享式获取同步状态还能胜利;
  • 返回0,示意共享式获取同步状态胜利,但后续共享式获取同步状态会失败。

因而如果通过tryAcquireShared()办法共享式获取同步状态失败,则会调用doAcquireShared()办法基于以后线程创立节点增加到同步队列尾,并进入自旋状态,tryAcquireShared()办法如下所示。

private void doAcquireShared(int arg) {    //基于以后线程和Node.SHARED创立一个节点Node,并退出同步队列    //Node.SHARED实际上是一个空Node对象,用于批示以后创立的节点是在共享模式下进行期待    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            //获取以后节点的上一节点并判断是否是头节点            if (p == head) {                //以后节点的上一节点是头节点,则尝试共享式地获取同步状态                int r = tryAcquireShared(arg);                //tryAcquireShared()办法返回非负值,示意获取同步状态胜利                if (r >= 0) {                    //将以后节点设置为头节点,并判断下一节点是否是在共享模式下进行期待,如果是,则唤醒                    setHeadAndPropagate(node, r);                    p.next = null;                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

doAcquireShared()办法中,首先会基于以后线程创立一个节点Node,并会设置节点的nextWaiter字段为Node.SHARED,其中nextWaiter是用于批示以后节点是独占模式下期待的节点还是共享模式下期待的节点。创立好节点并增加到同步队列尾之后,节点就会进入自旋状态,整个自旋流程与独占式获取同步状态大体一致,而区别就在于自旋状态中节点胜利获取到了同步状态之后的操作,这里如果在自旋状态中胜利获取到了同步状态,那么就会对获取到同步状态的节点的下一节点进行判断,如果判断失去下一节点是在共享模式下期待的节点,则须要将其唤醒,整个判断是在setHeadAndPropagate()办法中实现的,如下所示。

private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;    setHead(node);    //propagate为tryAcquireShared()办法的返回值    //propagate为负值示意共享式获取同步状态失败    //propagate为0示意共享式获取同步状态胜利,然而同步状态后续无奈再被获取    //propagate为正值示意共享式获取同步状态失败,同步状态后续还能够被获取    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;        //如果以后节点的下一节点是共享模式下期待的节点,则将其唤醒        if (s == null || s.isShared())            doReleaseShared();    }}

setHeadAndPropagate()办法中会先将共享式获取到同步状态的节点置为头节点,而后如果同步状态后续还能够被获取并且节点的下一节点是在共享模式下期待的节点,那么调用doReleaseShared()办法来唤醒共享模式下期待的节点,如下所示。

private void doReleaseShared() {    for (;;) {        //h示意本次循环执行时的头节点        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            if (ws == Node.SIGNAL) {                //如果头节点状态为SIGNAL,示意头节点的下一节点处于期待状态                //则先将头节点状态置为0,再将下一节点唤醒                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                unparkSuccessor(h);            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;        }        if (h == head)            break;    }}

doReleaseShared()办法会每次获取循环执行时的头节点,因为节点状态为SIGNAL时节点的下一节点处于期待状态,所以如果头节点状态为SIGNAL则会先置头节点状态为0而后唤醒下一节点,下一节点被唤醒后进行自旋时如果共享式地获取到了同步状态,那么又会执行一轮上述的步骤,也就是说,共享模式下期待的节点的唤醒,会产生在同步状态开释后和上一节点获取到同步状态后,与之相比的独占模式下期待的节点的唤醒,只会产生在同步状态开释后。

下面的整个流程,以一个简略的例子加以阐明,下图为一个同步队列的节点示意图。

Node1在自旋过程中判断上一节点是头节点,并且获取同步状态胜利,此时Node1成为头节点,并判断同步状态还能持续获取以及下一节点Node2是在共享模式下期待的节点,所以Node2被唤醒,Node2被唤醒后进入自旋状态,并判断上一节点Node1是头节点,而后获取同步状态并胜利,此时Node2成为头节点,因而,在同步状态能够持续获取以及下一节点是共享模式下期待的节点的状况下,共享模式下期待的节点均会被唤醒,直到同步状态无奈再被获取或者下一节点是独占模式下期待的节点。

共享式获取同步状态还有两个办法acquireSharedInterruptibly()tryAcquireSharedNanos(),相较于acquireShared()办法别离减少了响应中断和指定等待时间的性能,除此之外这两个办法的实现与acquireShared()统一,故这里不再探讨这两个办法了。

4. 共享式开释同步状态

共享式开释同步状态的模板办法releaseShared()如下所示。

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

即调用重写的tryReleaseShared()办法共享式地开释同步状态胜利之后,会调用doReleaseShared()办法,而后面曾经晓得doReleaseShared()办法会每次获取循环执行时的头节点,而后唤醒头节点的下一节点,如果下一节点是共享模式下期待的节点,那么这个唤醒动作会被传递上来,如果下一节点是独占模式下期待的节点,则唤醒的动作只会作用于这个独占模式下期待的节点。

总结

AbstractQueuedSynchronizer提供了用于构建锁或者同步组件的根底框架,通常应用形式为创立一个AbstractQueuedSynchronizer的子类并作为同步组件的动态外部类,而后重写其规定的可重写办法,而后同步组件性能的实现须要通过调用AbstractQueuedSynchronizer提供的模板办法,这些模板办法会调用其规定的可重写办法,最终能够依照咱们的预期来实现同步性能。AbstractQueuedSynchronizer提供的模板办法,次要分为独占式获取同步状态独占式开释同步状态共享式获取同步状态共享式开释同步状态,这些模板办法为咱们提供了具体的同步实现细节比方期待和唤醒,咱们仅需将其规定的可重写办法重写好,便能牢靠地实现一个自定义同步组件。