关于锁:多线程学习队列同步器

7次阅读

共计 12011 个字符,预计需要花费 31 分钟才能阅读完成。

前言

AbstractQueuedSynchronizer,即 队列同步器 ,通过继承AbstractQueuedSynchronizer 并重写其办法能够实现锁或其它同步组件,本篇文章将对 AbstractQueuedSynchronizer 的应用和原理进行学习。

参考资料:《Java 并发编程的艺术》

注释

一. AbstractQueuedSynchronizer 的应用

AbstractQueuedSynchronizer的应用通常如下。

  • 创立 AbstractQueuedSynchronizer 的子类作为同步组件(例如 ReentrantLockCountDownLatch等)的动态外部类并重写 AbstractQueuedSynchronizer 规定的可重写的办法;
  • 同步组件通过调用 AbstractQueuedSynchronizer 提供的模板办法来实现同步组件的同步性能。

先对 AbstractQueuedSynchronizer 的可重写的办法进行阐明。AbstractQueuedSynchronizer是基于 模板设计模式 来实现锁或同步组件的,AbstractQueuedSynchronizer外部保护着一个字段叫做 state,该字段示意 同步状态 ,是一个整型变量,AbstractQueuedSynchronizer 规定了若干办法来操作 state 字段,但 AbstractQueuedSynchronizer 自身并没有对这些办法进行实现,而是要求 AbstractQueuedSynchronizer 的子类来实现这些办法,上面看一下这些办法的签名和正文。

办法签名 正文
protected boolean tryAcquire(int arg) 独占式地获取同步状态。该办法在独占式获取同步状态以前应该判断是否容许独占式获取,如果容许则尝试基于 CAS 形式来设置同步状态,设置胜利则示意获取同步状态胜利。
protected boolean tryRelease(int arg) 独占式地开释同步状态。能够了解为将同步状态还原为获取前的状态。
protected int tryAcquireShared(int arg) 共享式地获取同步状态。
protected boolean tryReleaseShared(int arg) 共享式地开释同步状态。
protected boolean isHeldExclusively() 判断以后线程是否独占以后队列同步器。

实际上,AbstractQueuedSynchronizer规定的这些可重写的办法,均会被 AbstractQueuedSynchronizer 提供的模板办法所调用,在基于 AbstractQueuedSynchronizer 实现同步组件时,可依据同步组件的理论性能来重写这些可重写办法,而后再通过调用模板办法来实现同步组件的性能。上面看一下 AbstractQueuedSynchronizer 提供的模板办法。

办法签名 正文
public final void acquire(int arg) 独占式获取同步状态,即独占式获取锁。获取胜利则该办法返回,获取失败则以后线程进入 同步队列 期待。
public final void acquireInterruptibly(int arg) throws InterruptedException 独占式获取同步状态,并响应中断。即如果获取同步状态失败,则会进入同步队列期待,此时如果线程被中断,则会退出期待状态并抛出中断异样。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException acquireInterruptibly(int arg),并在其根底上指定了等待时间,若超时还未获取同步状态则返回false
public final void acquireShared(int arg) 共享式获取同步状态,即共享式获取锁。获取胜利则该办法返回,获取失败则以后线程进入同步队列期待,反对同一时刻多个线程获取到同步状态。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException 共享式获取同步状态,并响应中断。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException acquireSharedInterruptibly(int arg),并在其根底上指定了等待时间,若超时还未获取同步状态则返回false
public final boolean release(int arg) 独占式地开释同步状态,即独占式地开释锁。胜利开释锁之后会将同步队列中的第一个节点的线程唤醒。
public final boolean releaseShared(int arg) 共享式地开释同步状态。
public final Collection<Thread> getQueuedThreads() 获取在同步队列上期待地线程。

上面联合 《Java 并发编程的艺术》 中的例子的简化版,来直观的展现应用 AbstractQueuedSynchronizer 来实现同步组件的不便,如下所示。

public class MyLock {private final Sync sync = new Sync();

    public void lock() {sync.acquire(1);
    }

    public void unlock() {sync.release(1);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {if (getState() == 0) {throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

}

上述的 MyLock 是一个提供了最简略性能的不可重入锁,该锁的 Lock()unlock()办法全副是调用的 AbstractQueuedSynchronizer 提供的模板办法 acquire()release(),而后在 acquire()release()这两个模板办法中又会调用 Sync 重写的 tryAcquire()tryRelease()办法,最终的成果就是仅仅重写了 tryAcquire()tryRelease()办法,便实现了一个具备最简略性能的不可重入锁,而同步的具体实现细节比方同步队列等全都由 AbstractQueuedSynchronizer 来实现,借助AbstractQueuedSynchronizer,能够极大升高实现一个同步组件的门槛。

二. AbstractQueuedSynchronizer 的原理

AbstractQueuedSynchronizer 中,保护了一个 FIFO 的同步队列,当某个线程获取同步状态失败时,该线程会被封装成一个节点 Node 并退出同步队列中。本大节将联合同步队列和 AbstractQueuedSynchronizer 源码对 AbstractQueuedSynchronizer 的原理进行学习。

AbstractQueuedSynchronizer 中有两个字段 headtail,别离指向同步队列头节点和尾节点,同步队列中的每个节点也有两个字段 prevnext,别离指向上一节点和下一节。一个同步队列的根本构造如下所示。

当有新节点入队列时,新节点会退出到队列尾并成为新的尾节点,同时 tail 字段会指向新的尾节点。节点入队列如下所示。

通常,head字段指向的头节点示意以后胜利获取同步状态的线程所对应的节点,头节点开释同步状态后,会唤醒头节点的下一节点,当下一节点胜利获取同步状态后,会将 head 字段指向本人从而成为新的头节点,与此同时,新头节点的 prev 和老头节点的 next 字段会被置为null,以帮忙垃圾回收。头节点开释同步状态如下所示。

上述为同步队列的根本概览,上面将联合 AbstractQueuedSynchronizer 所提供的模板办法来对 独占式获取开释同步状态 共享式获取开释同步状态 进行剖析。

1. 独占式获取同步状态

独占式获取同步状态的 acquire() 办法如下所示。

public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();}

acquire()办法做了如下几件事件:

  • 先调用重写的 tryAcquire() 办法独占式地获取同步状态,获取胜利则返回,获取失败则执行下一步骤;
  • 调用 addWaiter() 办法基于以后线程创立一个节点 Node 并增加到同步队列中;
  • 调用 acquireQueued() 办法,使刚创立并退出了同步队列的 Node 进入自旋状态,在自旋状态中,Node会判断本人的上一节点是否是头节点,如果是则尝试获取同步状态,获取胜利则退出自旋状态,如果本人的上一节点不是头节点或者获取同步状态失败,则阻塞本人,直到被上一节点唤醒或者被中断,此时反复后面的自旋过程。

还是从源码动手,剖析创立节点 Node,入同步队列和自旋判断并获取同步状态这几个过程。首先是addWaiter() 办法,如下所示。

private Node addWaiter(Node mode) {
    // 基于以后线程创立一个节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 先尝试平安地将新创建的节点增加到同步队列尾
        // 如果增加胜利,则间接返回新创建的节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果新创建的节点是入同步队列的第一个节点,或者尝试增加失败
    // 则调用 enq()办法将节点平安地增加到同步队列中
    enq(node);
    return node;
}

addWaiter() 中,基于以后线程创立节点 Node 之后,会立刻尝试一次平安地将新创建的 Node 增加到队列尾,若失败的话则会调用 enq() 办法来让节点入队列,enq()的实现如下所示。

private Node enq(final Node node) {for (;;) {
        Node t = tail;
        if (t == null) {
            //node 如果是入同步队列的第一个节点,此时 t 为 null
            // 则先将 head 平安的指向一个空节点,而后 head 再赋值给 tail
            // 此时头节点和尾节点均为同一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 尾节点不为 null 时,则尝试平安的将 node 设置为新的尾节点
            // 如果设置失败,则循环反复尝试设置,直到设置胜利为止
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq()办法理论就是在一个死循环中反复地基于 CAS 形式将新创建的节点入同步队列。胜利入队列后的节点会被传入 acquireQueued() 办法进入自旋状态,其实现如下所示。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // p 为以后节点的上一节点
            final Node p = node.predecessor();
            // 判断以后节点的上一节点是否是头节点
            // 如果是则尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                // 获取同步状态胜利,则将以后节点置为头节点
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 先判断以后节点是否应该被阻塞
            // 如果应该被阻塞则调用 parkAndCheckInterrupt()办法进入阻塞状态
            // 否则自旋从新进行上述步骤
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

acquireQueued()办法中,如果以后节点的上一节点不是头节点或者尝试获取同步状态失败,那么会在 shouldParkAfterFailedAcquire() 办法中判断以后节点的上一节点状态是否为 SIGNAL,如果是则会调用parkAndCheckInterrupt() 进入阻塞状态,如果不是则会将以后节点的上一节点状态置为SIGNAL,而后自旋地再执行上述步骤。

独占式获取同步状态还有两个模板办法,别离为 acquireInterruptibly()tryAcquireNanos(),其中 acquireInterruptibly() 能够响应中断,tryAcquireNanos()能够指定等待时间,上面将对这两个办法进行简要剖析。

acquireInterruptibly()的实现如下所示。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

acquireInterruptibly()办法中尝试获取同步状态失败时,会调用 doAcquireInterruptibly() 办法来将以后线程退出同步队列,如下所示。

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    // 基于以后线程创立节点并退出同步队列尾
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null;
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 期待过程中被中断则抛出中断异样
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

doAcquireInterruptibly()的实现根本与 acquireQueued() 雷同,区别在于,在 doAcquireInterruptibly() 办法中进入期待状态时被中断的话,会抛出中断异样。

接下来再看一下 tryAcquireNanos() 办法的实现,如下所示。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

tryAcquireNanos()办法中获取同步状态失败时,会调用 doAcquireNanos() 办法来将以后线程退出同步队列,如下所示。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {if (nanosTimeout <= 0L)
        return false;
    // 计算期待的最晚工夫点
    final long deadline = System.nanoTime() + nanosTimeout;
    // 基于以后线程创立节点并退出同步队列尾
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null;
                failed = false;
                return true;
            }
            // 计算残余等待时间
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 如果以后线程应该被阻塞,并且残余等待时间大于 1000 纳秒,则应用 LockSupport 进入期待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                // 如果以后线程是因为被中断而从期待状态返回,则抛出中断异样
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

doAcquireNanos()办法的大体实现与 acquireQueued() 雷同,不同之处在于首先 doAcquireNanos() 至少只会在同步队列中阻塞 nanosTimeout 的工夫,超时的话会返回 false 示意期待获取同步状态失败,其次 doAcquireNanos() 也是响应中断的。

2. 独占式开释同步状态

AbstractQueuedSynchronizer提供了模板办法 release() 来实现独占式开释同步状态,如下所示。

public final boolean release(int arg) {
    // 尝试开释同步状态
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 开释同步状态之后,唤醒头节点的下一节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

以后节点独占式地开释同步状态之后,会调用 unparkSuccessor() 办法来唤醒下一节点,如下所示。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 如果下一节点为 null,或者下一节点状态为敞开,则从尾节点开始向前寻找满足唤醒条件的节点并唤醒
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor()办法会获取以后节点的下一节点,并判断下一节点是否为 null 或者状态是否为 CANCELLED,如果合乎其中任何一项,表明以后节点的下一节点不满足唤醒条件,那么须要从尾节点开始向前寻找满足唤醒条件的节点并唤醒,而唤醒条件就是节点不为null 以及节点状态不为CANCELLED

3. 共享式获取同步状态

共享式获取同步状态的模板办法 acquireShared() 如下所示。

public final void acquireShared(int arg) {
    // 获取同步状态失败则返回负值
    if (tryAcquireShared(arg) < 0)
        // 创立节点退出同步队列尾,并进入自旋状态
        doAcquireShared(arg);
}

tryAcquireShared()办法对其返回值进行了如下规定。

  • 返回负值,示意共享式获取同步状态失败;
  • 返回正值,示意共享式获取同步状态胜利,并且后续共享式获取同步状态还能胜利;
  • 返回 0,示意共享式获取同步状态胜利,但后续共享式获取同步状态会失败。

因而如果通过 tryAcquireShared() 办法共享式获取同步状态失败,则会调用 doAcquireShared() 办法基于以后线程创立节点增加到同步队列尾,并进入自旋状态,tryAcquireShared()办法如下所示。

private void doAcquireShared(int arg) {
    // 基于以后线程和 Node.SHARED 创立一个节点 Node,并退出同步队列
    //Node.SHARED 实际上是一个空 Node 对象,用于批示以后创立的节点是在共享模式下进行期待
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {final Node p = node.predecessor();
            // 获取以后节点的上一节点并判断是否是头节点
            if (p == head) {
                // 以后节点的上一节点是头节点,则尝试共享式地获取同步状态
                int r = tryAcquireShared(arg);
                //tryAcquireShared()办法返回非负值,示意获取同步状态胜利
                if (r >= 0) {
                    // 将以后节点设置为头节点,并判断下一节点是否是在共享模式下进行期待,如果是,则唤醒
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared() 办法中,首先会基于以后线程创立一个节点 Node,并会设置节点的nextWaiter 字段为 Node.SHARED,其中nextWaiter 是用于批示以后节点是 独占模式下期待的节点 还是 共享模式下期待的节点 。创立好节点并增加到同步队列尾之后,节点就会进入自旋状态,整个自旋流程与独占式获取同步状态大体一致,而区别就在于自旋状态中节点胜利获取到了同步状态之后的操作,这里如果在自旋状态中胜利获取到了同步状态,那么就会对获取到同步状态的节点的下一节点进行判断,如果判断失去下一节点是在共享模式下期待的节点,则须要将其唤醒,整个判断是在setHeadAndPropagate() 办法中实现的,如下所示。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);

    //propagate 为 tryAcquireShared()办法的返回值
    //propagate 为负值示意共享式获取同步状态失败
    //propagate 为 0 示意共享式获取同步状态胜利,然而同步状态后续无奈再被获取
    //propagate 为正值示意共享式获取同步状态失败,同步状态后续还能够被获取
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果以后节点的下一节点是共享模式下期待的节点,则将其唤醒
        if (s == null || s.isShared())
            doReleaseShared();}
}

setHeadAndPropagate()办法中会先将共享式获取到同步状态的节点置为头节点,而后如果同步状态后续还能够被获取并且节点的下一节点是在共享模式下期待的节点,那么调用 doReleaseShared() 办法来唤醒共享模式下期待的节点,如下所示。

private void doReleaseShared() {for (;;) {
        // h 示意本次循环执行时的头节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果头节点状态为 SIGNAL,示意头节点的下一节点处于期待状态
                // 则先将头节点状态置为 0,再将下一节点唤醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

doReleaseShared()办法会每次获取循环执行时的头节点,因为节点状态为 SIGNAL 时节点的下一节点处于期待状态,所以如果头节点状态为 SIGNAL 则会先置头节点状态为 0 而后唤醒下一节点,下一节点被唤醒后进行自旋时如果共享式地获取到了同步状态,那么又会执行一轮上述的步骤,也就是说,共享模式下期待的节点的唤醒,会产生在同步状态开释后和上一节点获取到同步状态后,与之相比的独占模式下期待的节点的唤醒,只会产生在同步状态开释后。

下面的整个流程,以一个简略的例子加以阐明,下图为一个同步队列的节点示意图。

Node1在自旋过程中判断上一节点是头节点,并且获取同步状态胜利,此时 Node1 成为头节点,并判断同步状态还能持续获取以及下一节点 Node2 是在共享模式下期待的节点,所以 Node2 被唤醒,Node2被唤醒后进入自旋状态,并判断上一节点 Node1 是头节点,而后获取同步状态并胜利,此时 Node2 成为头节点,因而,在同步状态能够持续获取以及下一节点是共享模式下期待的节点的状况下,共享模式下期待的节点均会被唤醒,直到同步状态无奈再被获取或者下一节点是独占模式下期待的节点。

共享式获取同步状态还有两个办法 acquireSharedInterruptibly()tryAcquireSharedNanos(),相较于 acquireShared() 办法别离减少了响应中断和指定等待时间的性能,除此之外这两个办法的实现与 acquireShared() 统一,故这里不再探讨这两个办法了。

4. 共享式开释同步状态

共享式开释同步状态的模板办法 releaseShared() 如下所示。

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
        return true;
    }
    return false;
}

即调用重写的 tryReleaseShared() 办法共享式地开释同步状态胜利之后,会调用 doReleaseShared() 办法,而后面曾经晓得 doReleaseShared() 办法会每次获取循环执行时的头节点,而后唤醒头节点的下一节点,如果下一节点是共享模式下期待的节点,那么这个唤醒动作会被传递上来,如果下一节点是独占模式下期待的节点,则唤醒的动作只会作用于这个独占模式下期待的节点。

总结

AbstractQueuedSynchronizer提供了用于构建锁或者同步组件的根底框架,通常应用形式为创立一个 AbstractQueuedSynchronizer 的子类并作为同步组件的动态外部类,而后重写其规定的可重写办法,而后同步组件性能的实现须要通过调用 AbstractQueuedSynchronizer 提供的模板办法,这些模板办法会调用其规定的可重写办法,最终能够依照咱们的预期来实现同步性能。AbstractQueuedSynchronizer提供的模板办法,次要分为 独占式获取同步状态 独占式开释同步状态 共享式获取同步状态 共享式开释同步状态,这些模板办法为咱们提供了具体的同步实现细节比方期待和唤醒,咱们仅需将其规定的可重写办法重写好,便能牢靠地实现一个自定义同步组件。

正文完
 0