Java开发中,咱们的应用程序常常会应用多线程进步程序的运行效率,多线程状况下拜访线程共享变量可能会带来并发问题,此时就须要并发锁解决并发问题。Java提供了两种类型的并发管制机制:synchonrized关键字和AQS框架,二者各有劣势,不过在加锁解锁场景比拟灵便的状况下,咱们往往会采纳AQS框架来解决并发问题。本文会对Java中的AQS框架的构造和源码进行简略介绍。本文大多数内容参考了这篇博客

<!--more-->

AQS构造

AQS的全称是AbstractQueuedSynchronizer(形象的队列式的同步器),AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如罕用的ReentrantLock/Semaphore/CountDownLatch等。

如下图所示,AQS次要蕴含两局部内容:共享资源和期待队列。AQS底层曾经对这两局部内容提供了很多办法。

  1. 共享资源:共享资源是一个volatile的int类型变量。
  2. 期待队列:期待队列是一个线程平安的队列,当线程拿不到锁时,会被park并放入队列。
  3. 新线程:非偏心状况下,新线程会先尝试间接获取资源,获取不到才进入队列。

核心思想

同步器的外围办法是acquire和release操作,其背地的思维也比拟简洁明确。

acquire操作是这样的:

// acquire操作while (以后同步器的状态不容许获取操作) {    如果以后线程不在队列中,则将其插入队列    阻塞以后线程}如果线程位于队列中,则将其移出队列

release操作是这样的:

更新同步器的状态if (新的状态容许某个被阻塞的线程获取胜利)    解除队列中一个或多个线程的阻塞状态

从这两个操作中的思维中咱们能够提取出三大要害操作:同步器的状态变更、线程阻塞和开释、插入和移出队列。所以为了实现这两个操作,须要协调三大要害操作引申进去的三个根本组件:

  • 同步器状态的原子性治理;
  • 线程阻塞与解除阻塞;
  • 队列的治理;

同步器状态的原子性治理

AQS类应用单个int(32位)来保留同步状态,并暴露出getState、setState以及compareAndSet操作来读取和更新这个同步状态。其中属性state被申明为volatile,并且通过应用CAS指令来实现compareAndSetState,使得当且仅当同步状态领有一个统一的期望值的时候,才会被原子地设置成新值,这样就达到了同步状态的原子性治理,确保了同步状态的原子性、可见性和有序性。

线程阻塞与解除阻塞

直到JSR166,阻塞线程和解除线程阻塞都是基于Java的内置管程,没有其它非基于Java内置管程的API能够用来达到阻塞线程和解除线程阻塞。惟一能够抉择的是Thread.suspend和Thread.resume,然而它们都有无奈解决的竞态问题,所以也没法用,目前该办法根本已被摈弃。具体不能用的起因能够官网给出的回答。

j.u.c.locks包提供了LockSupport类来解决这个问题。办法LockSupport.park阻塞以后线程直到有个LockSupport.unpark办法被调用。unpark的调用是没有被计数的,因而在一个park调用前屡次调用unpark办法只会解除一个park操作。另外,它们作用于每个线程而不是每个同步器。一个线程在一个新的同步器上调用park操作可能会立刻返回,因为在此之前能够有多余的unpark操作。然而,在短少一个unpark操作时,下一次调用park就会阻塞。尽管能够显式地勾销多余的unpark调用,但并不值得这样做。在须要的时候屡次调用park会更高效。park办法同样反对可选的绝对或相对的超时设置,以及与JVM的Thread.interrupt联合 ,可通过中断来unpark一个线程。

队列的治理

整个框架的外围就是如何治理线程阻塞队列,该队列是严格的FIFO队列,因而不反对线程优先级的同步。同步队列的最佳抉择是本身没有应用底层锁来结构的非阻塞数据结构,业界次要有两种抉择,一种是MCS锁,另一种是CLH锁。其中CLH个别用于自旋,然而相比MCS,CLH更容易实现勾销和超时,所以同步队列抉择了CLH作为实现的根底。

CLH队列理论并不那么像队列,它的出队和入队与理论的业务应用场景密切相关。它是一个链表队列,通过AQS的两个字段head(头节点)和tail(尾节点)来存取,这两个字段是volatile类型,初始化的时候都指向了一个空节点。

条件队列

上一节的队列其实是AQS的同步队列,这一节的队列是条件队列,队列的治理除了有同步队列,还有条件队列。AQS只有一个同步队列,然而能够有多个条件队列。AQS框架提供了一个ConditionObject类,给保护独占同步的类以及实现Lock接口的类应用。

ConditionObject类实现了Condition接口,Condition接口提供了相似Object管程式的办法,如await、signal和signalAll操作,还扩大了带有超时、检测和监控的办法。ConditionObject类无效地将条件与其它同步操作联合到了一起。该类只反对Java格调的管程拜访规定,这些规定中,当且仅当以后线程持有锁且要操作的条件(condition)属于该锁时,条件操作才是非法的。这样,一个ConditionObject关联到一个ReentrantLock上就体现的跟内置的管程(通过Object.wait等)一样了。两者的不同仅仅在于办法的名称、额定的性能以及用户能够为每个锁申明多个条件。

ConditionObject类和AQS共用了外部节点,有本人独自的条件队列。signal操作是通过将节点从条件队列转移到同步队列中来实现的,没有必要在须要唤醒的线程从新获取到锁之前将其唤醒。

源码剖析

咱们次要通过独占式同步状态的获取和开释、共享式同步状态的获取和开释来看下AQS是如何实现的。

独占式同步状态的获取

独占式同步状态调用的办法是acquire,代码如下:

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

上述代码次要实现了同步状态获取、节点结构、退出同步队列以及在同步队列中自旋期待的相干工作,其次要逻辑是:首先调用子类实现的tryAcquire办法,该办法保障线程平安的获取同步状态,如果同步状态获取失败,则结构独占式同步节点(同一时刻只能有一个线程胜利获取同步状态)并通过addWaiter办法将该节点退出到同步队列的尾部,最初调用acquireQueued办法,使得该节点以自旋的形式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒次要依附前驱节点的出队或阻塞线程被中断来实现。

上面来首先来看下节点结构和退出同步队列是如何实现的。代码如下:

private Node addWaiter(Node mode) {        // 以后线程结构成Node节点        Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure        // 尝试疾速在尾节点后新增节点 晋升算法效率 先将尾节点指向pred        Node pred = tail;        if (pred != null) {            //尾节点不为空  以后线程节点的前驱节点指向尾节点            node.prev = pred;            //并发解决 尾节点有可能曾经不是之前的节点 所以须要CAS更新            if (compareAndSetTail(pred, node)) {                //CAS更新胜利 以后线程为尾节点 原先尾节点的后续节点就是以后节点                pred.next = node;                return node;            }        }        //第一个入队的节点或者是尾节点后续节点新增失败时进入enq        enq(node);        return node;    }private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) { // Must initialize                //尾节点为空  第一次入队  设置头尾节点统一 同步队列的初始化                if (compareAndSetHead(new Node()))                    tail = head;            } else {                //所有的线程节点在结构实现第一个节点后 顺次退出到同步队列中                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }

节点进入同步队列之后,就进入了一个自旋的过程,每个线程节点都在自省地察看,当条件满足,获取到了同步状态,就能够从这个自旋过程中退出,否则仍旧留在这个自旋过程中并会阻塞节点的线程,代码如下:

final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                //获取以后线程节点的前驱节点                final Node p = node.predecessor();                //前驱节点为头节点且胜利获取同步状态                if (p == head && tryAcquire(arg)) {                    //设置以后节点为头节点                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                //是否阻塞                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

再来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt是怎么来阻塞以后线程的,代码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        //前驱节点的状态决定后续节点的行为     int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            /*前驱节点为-1 后续节点能够被阻塞             * This node has already set status asking a release             * to signal it, so it can safely park.             */            return true;        if (ws > 0) {            /*             * Predecessor was cancelled. Skip over predecessors and             * indicate retry.             */            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            /*前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞             * waitStatus must be 0 or PROPAGATE.  Indicate that we             * need a signal, but don't park yet.  Caller will need to             * retry to make sure it cannot acquire before parking.             */            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }private final boolean parkAndCheckInterrupt() {        //阻塞线程        LockSupport.park(this);        return Thread.interrupted();    }

独占式同步获取锁示例图

独占式同步状态的开释

当同步状态获取胜利之后,以后线程从acquire办法返回,对于锁这种并发组件而言,就意味着以后线程获取了锁。有获取同步状态的办法,就存在其对应的开释办法,该办法为release,当初来看下这个办法的实现,代码如下:

public final boolean release(int arg) {        if (tryRelease(arg)) {//同步状态开释胜利            Node h = head;            if (h != null && h.waitStatus != 0)                //间接开释头节点                unparkSuccessor(h);            return true;        }        return false;    }private void unparkSuccessor(Node node) {        /*         * If status is negative (i.e., possibly needing signal) try         * to clear in anticipation of signalling.  It is OK if this         * fails or if status is changed by waiting thread.         */        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        /*寻找符合条件的后续节点         * Thread to unpark is held in successor, which is normally         * just the next node.  But if cancelled or apparently null,         * traverse backwards from tail to find the actual         * non-cancelled successor.         */        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        if (s != null)            //唤醒后续节点            LockSupport.unpark(s.thread);    }

独占式开释是非常简单而且明确的。

总结下独占式同步状态的获取和开释:在获取同步状态时,同步器保护一个同步队列,获取状态失败的线程都会被退出到队列中并在队列中进行自旋;移出队列的条件是前驱节点为头节点且胜利获取了同步状态。在开释同步状态时,同步器调用tryRelease办法开释同步状态,而后唤醒头节点的后继节点。

共享式同步状态的获取

共享式同步状态调用的办法是acquireShared,代码如下:

public final void acquireShared(int arg) {        //获取同步状态的返回值大于等于0时示意能够获取同步状态        //小于0时示意能够获取不到同步状态  须要进入队列期待        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }private void doAcquireShared(int arg) {        //和独占式一样的入队操作        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            boolean interrupted = false;            //自旋            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        //前驱结点为头节点且胜利获取同步状态 可退出自旋                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }private void setHeadAndPropagate(Node node, int propagate) {        Node h = head; // Record old head for check below        //退出自旋的节点变成首节点        setHead(node);               if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            if (s == null || s.isShared())                doReleaseShared();        }    }

与独占式一样,共享式获取也须要开释同步状态,通过调用releaseShared办法能够开释同步状态,代码如下:

public final boolean releaseShared(int arg) {        //开释同步状态        if (tryReleaseShared(arg)) {            //唤醒后续期待的节点            doReleaseShared();            return true;        }        return false;    }private void doReleaseShared() {        /*         * Ensure that a release propagates, even if there are other         * in-progress acquires/releases.  This proceeds in the usual         * way of trying to unparkSuccessor of head if it needs         * signal. But if it does not, status is set to PROPAGATE to         * ensure that upon release, propagation continues.         * Additionally, we must loop in case a new node is added         * while we are doing this. Also, unlike other uses of         * unparkSuccessor, we need to know if CAS to reset status         * fails, if so rechecking.         */        //自旋    for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    //唤醒后续节点            unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

unparkSuccessor办法和独占式是一样的。

AQS的利用

AQS被大量的利用在了同步工具上。

ReentrantLock:ReentrantLock类应用AQS同步状态来保留锁反复持有的次数。当锁被一个线程获取时,ReentrantLock也会记录下以后取得锁的线程标识,以便查看是否是反复获取,以及当谬误的线程试图进行解锁操作时检测是否存在非法状态异样。ReentrantLock也应用了AQS提供的ConditionObject,还向外裸露了其它监控和监测相干的办法。

ReentrantReadWriteLock:ReentrantReadWriteLock类应用AQS同步状态中的16位来保留写锁持有的次数,剩下的16位用来保留读锁的持有次数。WriteLock的构建形式同ReentrantLock。ReadLock则通过应用acquireShared办法来反对同时容许多个读线程。

Semaphore:Semaphore类(信号量)应用AQS同步状态来保留信号量的以后计数。它外面定义的acquireShared办法会缩小计数,或当计数为非正值时阻塞线程;tryRelease办法会减少计数,在计数为正值时还要解除线程的阻塞。

CountDownLatch:CountDownLatch类应用AQS同步状态来示意计数。当该计数为0时,所有的acquire操作(对应到CountDownLatch中就是await办法)能力通过。

FutureTask:FutureTask类应用AQS同步状态来示意某个异步计算工作的运行状态(初始化、运行中、被勾销和实现)。设置(FutureTask的set办法)或勾销(FutureTask的cancel办法)一个FutureTask时会调用AQS的release操作,期待计算结果的线程的阻塞解除是通过AQS的acquire操作实现的。

SynchronousQueues:SynchronousQueues类应用了外部的期待节点,这些节点能够用于协调生产者和消费者。同时,它应用AQS同步状态来管制当某个消费者生产以后一项时,容许一个生产者持续生产,反之亦然。

除了这些j.u.c提供的工具,还能够基于AQS自定义合乎本人需要的同步器。

我是御狐神,欢送大家关注我的微信公众号

本文最先公布至微信公众号,版权所有,禁止转载!