1 AQS 概述

AQS 的全称为(AbstractQueuedSynchronizer),中文即“队列同步器”,这个类放在 java.util.concurrent.locks 包上面。

AQS是用来构建锁或者其余同步组件的根底框架,它应用了一个int成员变量示意同步状态,通过内置的FIFO队列来实现资源获取线程的排队工作。应用 AQS 能简略且高效地结构出利用宽泛的大量的同步器,比方上篇文章写的ReentrantLock与ReentrantReadWriteLock。除此之外,AQS还能结构出Semaphore,FutureTask(jdk1.7) 等同步器。

2 AQS 原理

2.1 同步队列

AQS 是依赖 CLH 队列锁来实现同步状态的治理。以后线程获取同步状态失败时,同步器会将以后线程以及期待状态等信息构建为一个节点(Node)并将其退出同步队列,同步会阻塞以后线程,当同步状态开释时,会将首节点中的线程唤醒,使其再次尝试获取同步状态。

CLH(Craig,Landin,and Hagersten)队列是一个虚构的双向队列(FIFO双向队列)(虚构的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条申请共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的调配

同步队列中的节点(Node)用来保留获取同步状态失败的线程援用、期待状态以及前驱和后继节点信息。

属性类型与名称形容
int waitStatus期待状态(如CANCELLED=1、SIGNAL=-1、CONDITION=-2、PROPAGATE=-3、INITIAL=0)
Node prev前驱节点(当节点退出同步队列时被设置,在尾部增加)
Node next后继节点
Thread thread以后获取同步状态的线程

节点源码如下:

static final class Node {    // 示意该节点期待模式为共享式,通常记录于nextWaiter,    // 通过判断nextWaiter的值能够判断以后结点是否处于共享模式    static final Node SHARED = new Node();    // 示意节点处于独占式模式,与SHARED绝对    static final Node EXCLUSIVE = null;    // waitStatus的不同状态    // 以后结点是因为超时或者中断勾销的,进入该状态后将无奈复原    static final int CANCELLED =  1;    // 以后结点的后继结点是(或者将要)由park导致阻塞的,当结点被开释或者勾销时,须要通过unpark唤醒后继结点    static final int SIGNAL    = -1;    // 表明结点在期待队列中,结点线程期待在Condition上    // 当其余线程对Condition调用了signal()办法时,会将其退出到同步队列中       static final int CONDITION = -2;    // 下一次共享式同步状态的获取将会无条件地向后继结点流传    static final int PROPAGATE = -3;    volatile int waitStatus;    // 记录前驱结点    volatile Node prev;    // 记录后继结点    volatile Node next;    // 记录以后的线程    volatile Thread thread;    // 用于记录共享模式(SHARED), 也能够用来记录CONDITION队列    Node nextWaiter;    // 通过nextWaiter的记录值判断以后结点的模式是否为共享模式    final boolean isShared() {    return nextWaiter == SHARED;}    // 获取以后结点的前置结点    final Node predecessor() throws NullPointerException { ... }    // 用于初始化时创立head结点或者创立SHARED结点    Node() {}    // 在addWaiter办法中应用,用于创立一个新的结点    Node(Thread thread, Node mode) {             this.nextWaiter = mode;        this.thread = thread;    }    // 在CONDITION队列中应用该构造函数新建结点    Node(Thread thread, int waitStatus) {         this.waitStatus = waitStatus;        this.thread = thread;    }}// 记录头结点private transient volatile Node head;// 记录尾结点private transient volatile Node tail;

节点是形成同步队列的根底,同步器领有首节点(Head)和尾节点(Tail),没有胜利获取同步状态的线程将会成为节点退出该队列的尾部。同步器提供了一个基于CAS的设置尾节点的办法:compareAndSetTail(Node expect, Node update),它须要传递以后线程“认为”的尾节点和以后节点,只有设置胜利后,以后节点才正式与之前的尾节点建设关联。

首节点是获取同步状态胜利的节点,首节点的线程在开释同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态胜利时将本人设置为首节点。设置首节点是通过获取同步状态胜利的线程来实现的,不须要应用CAS来保障,只需将首节点设置成为原首节点的后继节点并断开原首节点的next援用即可。

2.2 同步状态

2.2.1 独占式(EXCLUSIVE)

独占式(EXCLUSIVE)获取需重写tryAcquiretryRelease办法,并拜访acquirerelease办法实现相应的性能。

    public final void acquire(int arg) {        // 如果线程间接获取胜利,或者再尝试获取胜利后都是间接工作,        // 如果是从阻塞状态中唤醒开始工作的线程,将以后的线程中断                if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }    // 封装线程,新建结点并退出到同步队列中    private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        Node pred = tail;        // 尝试入队, 胜利返回        if (pred != null) {            node.prev = pred;            // CAS操作设置队尾            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        // 通过CAS操作自旋实现node入队操作        enq(node);        return node;    }    // 在同步队列中期待获取同步状态    final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            // 自旋            for (;;) {                final Node p = node.predecessor();                // 前驱节点是否为头节点&&tryAcquire获取同步状态                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null;                    failed = false;                    return interrupted;                }                // 获取不到同步状态,将前置结点标为SIGNAL状态并且通过park操作将Node封装的线程阻塞                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                // 如果获取失败,将node标记为CANCELLED                cancelAcquire(node);        }    }

独占式获取同步状态流程:

通过调用同步器的release(int arg)办法能够开释同步状态,该办法在开释了同步状态之后,会唤醒其后继节点(进而使后继节点从新尝试获取同步状态)。

public final boolean release(int arg) {    // 首先尝试开释并更新同步状态    if (tryRelease(arg)) {        Node h = head;        // 查看是否须要唤醒后置结点        if (h != null && h.waitStatus != 0)            // 唤醒后置结点            unparkSuccessor(h);        return true;    }    return false;}// 唤醒后继结点private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    // 通过CAS操作将waitStatus更新为0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    Node s = node.next;    // 查看后置结点,若为空或者状态为CANCELLED,找到后置非CANCELLED结点    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    // 唤醒后继结点    if (s != null)        LockSupport.unpark(s.thread);}

2.2.2 共享式(SHARED)

共享式获取与独占式获取最次要的区别在于同一时刻是否有多个线程同时获取到同步状态

共享式(SHARED)获取需重写tryAcquireSharedtryReleaseShared办法,并拜访acquireSharedreleaseShared办法实现相应的性能。与独占式绝对,共享式反对多个线程同时获取到同步状态并进行工作,如 Semaphore、CountDownLatch、 CyclicBarrier等。ReentrantReadWriteLock 能够看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁容许多个线程同时对某一资源进行读。

public final void acquireShared(int arg) {    // 尝试共享式获取同步状态,如果胜利获取则能够继续执行,否则执行doAcquireShared    if (tryAcquireShared(arg) < 0)        // 以共享式不停得尝试获取同步状态        doAcquireShared(arg);}private void doAcquireShared(int arg) {    // 向同步队列中新增一个共享式的结点    final Node node = addWaiter(Node.SHARED);    // 标记获取失败状态    boolean failed = true;    try {        // 标记中断状态(若在该过程中被中断是不会响应的,须要手动中断)        boolean interrupted = false;        // 自旋        for (;;) {            // 获取前置结点            final Node p = node.predecessor();            // 若前置结点为头结点            if (p == head) {                // 尝试获取同步状态                int r = tryAcquireShared(arg);                // 若获取到同步状态。                if (r >= 0) {                    // 此时,以后结点存储的线程复原执行,须要将以后结点设置为头结点并且向后流传,                    // 告诉合乎唤醒条件的结点一起复原执行                    setHeadAndPropagate(node, r);                    p.next = null;                    // 须要中断,中断以后线程                    if (interrupted)                        selfInterrupt();                    // 获取胜利                    failed = false;                    return;                }            }            // 获取同步状态失败,须要进入阻塞状态            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        // 获取失败,CANCELL node        if (failed)            cancelAcquire(node);    }}// 将node设置为同步队列的头结点,并且向后告诉以后结点的后置结点,实现流传private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;     setHead(node);    // 向后流传    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;        if(s == null || s.isShared())            doReleaseShared();    }}

与独占式一样,共享式获取也须要开释同步状态,通过调用releaseShared(intarg)办法能够开释同步状态,开释同步状态胜利后,会唤醒后置结点,并且保障流传性。

public final boolean releaseShared(int arg) {    // 尝试开释同步状态    if (tryReleaseShared(arg)) {        // 胜利后唤醒后置结点        doReleaseShared();        return true;    }    return false;}// 唤醒后置结点private void doReleaseShared() {    // 循环的目标是为了避免新结点在该过程中进入同步队列产生的影响,同时要保障CAS操作的实现    for (;;) {        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            if (ws == Node.SIGNAL) {                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;                                unparkSuccessor(h);            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                        }        if (h == head)                               break;    }}

2.2.3 超时获取形式

通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)办法能够超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该办法提供了传统Java同步操作(比方synchronized关键字)所不具备的个性。

private boolean doAcquireNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (nanosTimeout <= 0L)        return false;    // 计算超时的工夫=以后虚拟机的工夫+设置的超时工夫    final long deadline = System.nanoTime() + nanosTimeout;    // 调用addWaiter将以后线程封装成独占模式的节点,并且退出到同步队列尾部。    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {        // 自旋        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                // 如果以后节点的前驱节点为头结点,则让以后节点去尝试获取锁。                setHead(node);                p.next = null;                 failed = false;                return true;            }            // 如果以后节点的前驱节点不是头结点,或以后节点获取锁失败,            // 则再次判断以后线程是否曾经超时。            nanosTimeout = deadline - System.nanoTime();            if (nanosTimeout <= 0L)                return false;            // 调用shouldParkAfterFailedAcquire办法,通知以后节点的前驱节点,马上进入            // 期待状态了,即做好进入期待状态前的筹备。            if (shouldParkAfterFailedAcquire(p, node) &&                nanosTimeout > spinForTimeoutThreshold)                // 调用LockSupport.parkNanos办法,将以后线程设置成超时期待的状态。                LockSupport.parkNanos(this, nanosTimeout);            if (Thread.interrupted())                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}

由下面代码可知,超时获取也是调用addWaiter将以后线程封装成独占模式的节点,并且退出到同步队列尾部。

超时获取与独占式获取同步状态区别在于获取同步状态失败后的解决。如果以后线程获取同步状态失败,则判断是否超时(nanosTimeout小于等于0示意曾经超时);如果没有超时,从新计算超时距离nanosTimeout,而后使以后线程期待nanosTimeout纳秒(当已到设置的超时工夫,该线程会从LockSupport.parkNanos(Object blocker, long nanos)办法返回)。

独占式超时获取同步状态流程:

2.3 模板办法

AQS 应用一个 int 成员变量来示意同步状态,通过内置的 FIFO 队列来实现获取资源线程的排队工作。AQS 应用 CAS 对该同步状态进行原子操作实现对其值的批改。

private volatile int state;// 共享变量,应用volatile润饰保障线程可见性

同步状态state通过 protected 类型的getStatesetStatecompareAndSetState办法进行操作

// 返回同步状态的以后值protected final int getState() {        return state;}// 设置同步状态的值protected final void setState(int newState) {        state = newState;}// CAS更新同步状态,该办法可能保障状态设置的原子性protected final boolean compareAndSetState(int expect, int update) {        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

同步器的设计是基于模板办法模式的,也就是说,使用者须要继承同步器并重写指定的办法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板办法,而这些模板办法将会调用使用者重写的办法。

自定义同步器时须要重写上面几个 AQS 提供的模板办法:

isHeldExclusively()// 该线程是否正在独占资源。只有用到condition才须要去实现它。tryAcquire(int)// 独占形式。尝试获取资源,胜利则返回true,失败则返回false。tryRelease(int)// 独占形式。尝试开释资源,胜利则返回true,失败则返回false。tryAcquireShared(int)// 共享形式。尝试获取资源。正数示意失败;0示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。tryReleaseShared(int)// 共享形式。尝试开释资源,胜利则返回true,失败则返回false。

同步器提供的模板办法基本上分为3类:独占式获取与开释同步状态、共享式获取与开释同步状态和查问同步队列中的期待线程状况。

一般来说,自定义同步器要么是独占办法,要么是共享形式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。

以 ReentrantLock 为例,state 初始化为 0,示意未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。尔后,其余线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即开释锁)为止,其它线程才有机会获取该锁。当然,开释锁之前,A 线程本人是能够反复获取此锁的(state 会累加),这就是可重入的概念。但要留神,获取多少次就要开释如许次,这样能力保障 state 是能回到零态的。

再以 CountDownLatch 以例,工作分为 N 个子线程去执行,state 也初始化为 N(留神 N 要与线程个数统一)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS(Compare and Swap)减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,而后主调用线程就会从 await()函数返回,持续后续动作。

但 AQS 也反对自定义同步器同时实现独占和共享两种形式,如ReentrantReadWriteLock

上面就来学习几个罕用的并发同步工具。

3 Semaphore(信号量)

Semaphore(信号量)用来管制 同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。synchronized 和 ReentrantLock 都是一次只容许一个线程拜访某个资源,而Semaphore(信号量)能够指定多个线程同时拜访某个资源

以停车场为例。假如一个停车场只有10个车位,这时如果同时来了15辆车,则只容许其中10辆不受妨碍的进入。剩下的5辆车则必须在入口期待,尔后来的车也都不得不在入口处期待。这时,如果有5辆车来到停车场,放入5辆;如果又来到2辆,则又能够放入2辆,如此往返。

在这个停车场零碎中,车位即是共享资源,每辆车就好比一个线程,信号量就是空车位的数目。

Semaphore中蕴含了一个实现了AQS的同步器Sync,以及它的两个子类FairSync和NonFairSync。查看Semaphore类构造:

可见Semaphore也是辨别偏心模式和非偏心模式的。

  • 偏心模式: 调用 acquire 的程序就是获取许可证的程序,遵循 FIFO。
  • 非偏心模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

   public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }

这两个构造方法,都必须提供许可的数量,第二个构造方法能够指定是偏心模式还是非偏心模式,默认非偏心模式。

Semaphore实现原理这里就不剖析了,能够参考死磕 java同步系列之Semaphore源码解析这篇文章。

须要明确的是,Semaphore也是共享锁的一种实现。它默认结构AQS的state为permits。当执行工作的线程数量超出permits,那么多余的线程将会被放入阻塞队列Park,并自旋判断state是否大于0。只有当state大于0的时候,阻塞的线程能力继续执行,此时先前执行工作的线程继续执行release办法,release办法使得state的变量会加1,那么自旋的线程便会判断胜利。如此,每次只有最多不超过permits数量的线程能自旋胜利,便限度了执行工作线程的数量。

Semaphore罕用于做流量管制,特地是专用资源无限的利用场景。

罕用办法形容
acquire()/acquire(int permits)获取许可证。获取许可失败,会进入AQS的队列中排队。
tryAcquire()/tryAcquire(int permits)获取许可证。获取许可失败,间接返回false。
tryAcquire(long timeout, TimeUnit unit)/
tryAcquire(int permits, long timeout, TimeUnit unit)
超时期待获取许可证。
release()偿还许可证。
intavailablePermits()返回此信号量中以后可用的许可证数。
intgetQueueLength()返回正在期待获取许可证的线程数。
booleanhasQueuedThreads()是否有线程正在期待获取许可证。
void reducePermits(int reduction)缩小reduction个许可证,是个protected办法。
Collection getQueuedThreads()返回所有期待获取许可证的线程汇合,是个protected办法。

应用示例:

public class SemaphoreTest {    private static final int THREAD_COUNT = 50;    public static void main(String[] args) throws InterruptedException {        // 创立一个具备固定线程数量的线程池对象        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);        // 一次只能容许执行的线程数量        final Semaphore semaphore = new Semaphore(10);        for (int i = 0; i < THREAD_COUNT; i++) {            final int threadNum = i;            threadPool.execute(() -> {                try {                    semaphore.acquire();// 获取1个许可,所以可运行线程数量为10/1=10                    test(threadNum);                    semaphore.release();// 开释1个许可,所以可运行线程数量为10/1=10                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        threadPool.shutdown();    }    public static void test(int threadNum) throws InterruptedException {        Thread.sleep(1000);// 模仿申请的耗时操作        System.out.println("threadNum:" + threadNum);        Thread.sleep(1000);// 模仿申请的耗时操作    }}

在代码中,尽管有50个线程在执行,然而只容许10个并发执行。Semaphore的构造方法Semaphore(int permits)承受一个整型的数字,示意可用的许可证数量。Semaphore(10)示意容许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简略,首先线程应用Semaphore的acquire()办法获取一个许可证,应用完之后调用release()办法偿还许可证。

除了 acquire办法之外,另一个比拟罕用的与之对应的办法是tryAcquire办法,该办法如果获取不到许可就立刻返回 false。

4 CountDownLatch (倒计时器)

4.1 概述

在日常开发中常常会遇到须要在主线程中开启多个线程去并行执行工作,并且主线程须要期待所有子线程执行结束后再进行汇总的场景。jdk 1.5之前个别都应用线程的join()办法来实现这一点,然而join办法不够灵便,难以满足不同场景的须要,所以jdk 1.5之后concurrent包提供了CountDownLatch这个类。

CountDownLatch是一种同步辅助工具,它容许一个或多个线程期待其余线程实现操作

CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程实现了本人的工作后,计数器的值就相应得减1。当计数器达到0时,示意所有的线程都已实现工作,而后在闭锁上期待的线程就能够复原执行工作。

CountDownLatch的办法:

办法形容
await()调用该办法的线程等到构造方法传入的 N 减到 0 的时候,能力持续往下执行。
await(long timeout, TimeUnit unit)调用该办法的线程等到指定的 timeout 工夫后,不论 N 是否减至为 0,都会持续往下执行。
countDown()使 CountDownLatch 初始值 N 减 1。
getCount()获取以后 CountDownLatch 保护的值,也就是AQS的state的值。

CountDownLatch的实现原理,能够查看 【JUC】JDK1.8源码剖析之CountDownLatch(五)一文。

依据源码剖析可知,CountDownLatch是AQS中共享锁的一种实现。AbstractQueuedSynchronizer中保护了一个volatile类型的整数state,volatile能够保障多线程环境下该变量的批改对每个线程都可见,并且因为该属性为整型,因此对该变量的批改也是原子的。

CountDownLatch默认结构 AQS 的 state 值为 count。创立一个CountDownLatch对象时,所传入的整数N就会赋值给state属性。

当调用countDown()办法时,其实是调用了tryReleaseShared办法以CAS的操作来对state减1;而调用await()办法时,以后线程就会判断state属性是否为0。如果为0,阻塞线程被唤醒持续往下执行;如果不为0,则使以后线程放入阻塞队列Park,直至最初一个线程调用了countDown()办法使得state == 0,再唤醒在await()办法中期待的线程。

特地留神的是

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 应用结束后,它不能再次被应用。如果须要能重置计数,能够应用CyclicBarrier

4.2 利用场景

CountDownLatch次要利用场景:

  1. 实现最大的并行性:同时启动多个线程,实现最大水平的并行性。例如110跨栏较量中,所有运动员筹备好起跑姿态,进入到准备状态,期待裁判一声枪响。裁判开了枪,所有运动员才能够开跑。
  2. 开始执行前期待N个线程实现各自工作:例如一群学生在教室考试,学生们都实现了作答,老师才能够进行收卷操作。

案例:

public class CountDownLatchTest {    private static final int THREAD_COUNT = 30;    public static void main(String[] args) throws InterruptedException {        ExecutorService threadPool = Executors.newFixedThreadPool(10);        final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);        for (int i = 0; i < THREAD_COUNT; i++) {            final int threadNum = i;            threadPool.execute(() -> {                try {                    Thread.sleep(1000);// 模仿申请的耗时操作                    System.out.println("子线程:" + threadNum);                    Thread.sleep(1000);// 模仿申请的耗时操作                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    countDownLatch.countDown();// 示意一个申请曾经被实现                }            });        }        System.out.println("主线程启动...");        countDownLatch.await();        threadPool.shutdown();        System.out.println("子线程执行结束...");        System.out.println("主线程执行结束...");    }}

下面的代码中,咱们定义了申请的数量为30,当这 30 个申请被解决实现之后,才会打印子线程执行结束

主线程在启动其余线程后调用 CountDownLatch.await() 办法进入阻塞状态,直到其余线程实现各自的工作才被唤醒。

开启的30个线程必须援用闭锁对象,因为他们须要告诉 CountDownLatch 对象,他们曾经实现了各自的工作。这种告诉机制是通过 CountDownLatch.countDown()办法来实现的;每调用一次这个办法,在构造函数中初始化的 count 值就减 1。所以当30个线程都调用了这个办法后,count 的值才等于0,而后主线程就能通过 await()办法,继续执行本人的工作。

5 CyclicBarrier(循环栅栏)

5.1 概述

CyclicBarrier的字面意思是可循环应用(Cyclic)的屏障(Barrier)。它要做的事件是,让一组线程达到一个屏障(也能够叫同步点)时被阻塞,直到最初一个线程达到屏障时,屏障才会开门,所有被屏障拦挡的线程才会持续运行。CyclicBarrier 的性能和利用场景与CountDownLatch都十分相似。

CyclicBarrier罕用办法:

罕用办法形容
await()在所有线程都曾经在此 barrier上并调用 await 办法之前,将始终期待。
await(long timeout, TimeUnit unit)所有线程都曾经在此屏障上调用 await 办法之前将始终期待,或者超出了指定的等待时间。
getNumberWaiting()返回以后在屏障处期待的线程数目。
getParties()返回要求启动此 barrier 的线程数目。
isBroken()查问此屏障是否处于损坏状态。
reset()将屏障重置为其初始状态。

5.2 源码剖析

构造函数:

// 创立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于期待状态时启动,但它不会在启动 barrier 时执行预约义的操作。public CyclicBarrier(int parties) {    this(parties, null);}// 创立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于期待状态时启动,并在启动 barrier 时执行给定的屏障操作。// 该操作由最初一个进入 barrier 的线程执行。public CyclicBarrier(int parties, Runnable barrierAction) {    if (parties <= 0) throw new IllegalArgumentException();    this.parties = parties;    this.count = parties;    this.barrierCommand = barrierAction;}

其中,parties 就示意屏障拦挡的线程数量

CyclicBarrier 的最重要的办法就是 await 办法,await() 办法就像建立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会关上,线程才得以通过执行

    public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe);        }    }

当调用 await() 办法时,实际上调用的是dowait(false, 0L)办法。查看dowait(boolean timed, long nanos)

    // 当线程数量或者申请数量达到 count 时 await 之后的办法才会被执行。    private int count;    private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        final ReentrantLock lock = this.lock;        // 获取”独占锁“        lock.lock();        try {            // 保留“以后的generation”            final Generation g = generation;            // 如果以后代损坏,抛出异样            if (g.broken)                throw new BrokenBarrierException();            // 如果线程中断,抛出异样            if (Thread.interrupted()) {                // 将损坏状态设置为 true,并唤醒所有阻塞在此栅栏上的线程                breakBarrier();                throw new InterruptedException();            }            // 将“count计数器”-1            int index = --count;            // 当 count== 0,阐明最初一个线程曾经达到栅栏            if (index == 0) {                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    // 执行栅栏工作                    if (command != null)                        command.run();                    ranAction = true;                    // 将 count 重置为 parties 属性的初始化值                    // 唤醒之前期待的线程,并更新generation。                    nextGeneration();                    // 完结,等价于return index                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }            // 以后线程始终阻塞,直到“有parties个线程达到barrier” 或 “以后线程被中断” 或 “超时”这3者条件之一产生            // 以后线程才继续执行。            for (;;) {                try {                    // 如果没有工夫限度,则间接期待,直到被唤醒。                    if (!timed)                        trip.await();                    // 如果有工夫限度,则期待指定工夫再唤醒(超时期待)。                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    // 以后代没有损坏                    if (g == generation && ! g.broken) {                        // 让栅栏生效                        breakBarrier();                        throw ie;                    } else {                        // 下面条件不满足,阐明这个线程不是这代的。                        // 就不会影响以后这代栅栏执行逻辑。中断。                        Thread.currentThread().interrupt();                    }                }                // 如果“以后generation曾经损坏”,则抛出异样。                if (g.broken)                    throw new BrokenBarrierException();                  // 如果“generation曾经换代”,则返回index。                if (g != generation)                    return index;                 // 如果是“超时期待”,并且工夫已到,则通过breakBarrier()终止CyclicBarrier                // 唤醒CyclicBarrier中所有期待线程。                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            // 开释“独占锁(lock)”            lock.unlock();        }    }

generation是CyclicBarrier的一个成员变量:

   /**     * Generation一代的意思。     * CyclicBarrier是能够循环应用的,用它来标记本代和下一代。     * broken:以后代是否损坏的标记。标记有线程产生了中断,或者异样,就是工作没有实现。     */    private static class Generation {        boolean broken = false;    }    // 实现独占锁    private final ReentrantLock lock = new ReentrantLock();    // 实现多个线程之间互相期待告诉,就是满足某些条件之后,线程能力执行,否则就期待    private final Condition trip = lock.newCondition();    // 初始化时屏障数量     private final int parties;    // 当条件满足(即屏障数量为0)之后,会回调这个Runnable    private final Runnable barrierCommand;    //以后代    private Generation generation = new Generation();    // 残余的屏障数量count    private int count;

在CyclicBarrier中,同一批的线程属于同一代,即同一个generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程达到barrier,generation就会被更新换代。

总结

  1. CyclicBarrier 外部通过一个 count 变量作为计数器,cout 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏,那么就将计数器减1。如果 count 值为 0 了,示意这是这一代最初一个线程达到栅栏,就会将代更新并重置计数器,并唤醒所有之前期待在栅栏上的线程。
  2. 如果在期待的过程中,线程中断都也会抛出BrokenBarrierException异样,并且这个异样会流传到其余所有的线程,CyclicBarrier会被损坏。
  3. 如果超出指定的等待时间,以后线程会抛出 TimeoutException 异样,其余线程会抛出BrokenBarrierException异样,CyclicBarrier会被损坏。

5.3 利用场景

CyclicBarrier 能够用于多线程计算数据,最初合并计算结果的利用场景。比方咱们用一个 Excel 保留了用户所有银行流水,每个 Sheet 保留一个帐户近一年的每笔银行流水,当初须要统计用户的日均银行流水,先用多线程解决每个 sheet 里的银行流水,都执行完之后,失去每个 sheet 的日均银行流水,最初,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

示例 1:

public class CyclicBarrierTest1 {    private static final int THREAD_COUNT = 30;    // 须要同步的线程数量    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);    public static void main(String[] args) throws InterruptedException {        // 创立线程池        ExecutorService threadPool = Executors.newFixedThreadPool(10);        for (int i = 0; i < THREAD_COUNT; i++) {            final int threadNum = i;            Thread.sleep(1000);            threadPool.execute(() -> {                System.out.println("childThread:" + threadNum + " is ready");                try {                    // 期待60秒,保障子线程齐全执行完结                    cyclicBarrier.await(60, TimeUnit.SECONDS);                } catch (InterruptedException e) {                    e.printStackTrace();                } catch (BrokenBarrierException e) {                    e.printStackTrace();                } catch (TimeoutException e) {                    e.printStackTrace();                }                System.out.println("childThread:" + threadNum + " is finish");            });        }        threadPool.shutdown();    }}

运行后果如下:

childThread:0 is readychildThread:1 is readychildThread:2 is readychildThread:3 is readychildThread:4 is readychildThread:4 is finishchildThread:0 is finishchildThread:1 is finishchildThread:3 is finishchildThread:2 is finishchildThread:5 is readychildThread:6 is readychildThread:7 is readychildThread:8 is readychildThread:9 is readychildThread:9 is finishchildThread:8 is finish... ...

能够看到当线程数量也就是申请数量达定义的 5 个的时候, await办法之后的办法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程达到屏障时,优先执行barrierAction,不便解决更简单的业务场景。示例代码如下:

public class CyclicBarrierTest2 {    private static final int THREAD_COUNT = 30;    // 须要同步的线程数量    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {        System.out.println("------优先执行------");    });    public static void main(String[] args) throws InterruptedException {        // 创立线程池        ExecutorService threadPool = Executors.newFixedThreadPool(10);        for (int i = 0; i < THREAD_COUNT; i++) {            final int threadNum = i;            Thread.sleep(1000);            threadPool.execute(() -> {                System.out.println("childThread:" + threadNum + " is ready");                try {                    // 期待60秒,保障子线程齐全执行完结                    cyclicBarrier.await(60, TimeUnit.SECONDS);                } catch (InterruptedException e) {                    e.printStackTrace();                } catch (BrokenBarrierException e) {                    e.printStackTrace();                } catch (TimeoutException e) {                    e.printStackTrace();                }                System.out.println("childThread:" + threadNum + " is finish");            });        }        threadPool.shutdown();    }}

运行后果如下:

childThread:0 is readychildThread:1 is readychildThread:2 is readychildThread:3 is readychildThread:4 is ready------优先执行------childThread:4 is finishchildThread:0 is finishchildThread:1 is finishchildThread:3 is finishchildThread:2 is finishchildThread:5 is readychildThread:6 is readychildThread:7 is readychildThread:8 is readychildThread:9 is ready------优先执行------childThread:9 is finishchildThread:6 is finish... ...

5.4 CyclicBarrier和CountDownLatch的区别

  1. CountDownLatch的计数器只能应用一次。而CyclicBarrier的计数器能够应用reset()办法重置,可屡次应用。
  2. 侧重点不同。CountDownLatch多用于某一个线程期待若干个其余线程执行完工作之后,它才执行;而CyclicBarrier个别用于多个线程相互期待至一个同步点,而后这些线程再持续一起执行。
  3. CyclicBarrier还提供其余有用的办法,比方getNumberWaiting办法能够取得Cyclic-Barrier阻塞的线程数量;isBroken()办法用来理解阻塞的线程是否被中断。

参考

JAVA并发编程的艺术

AQS原理学习笔记

【JUC】JDK1.8源码剖析之AbstractQueuedSynchronizer(二)

死磕 java同步系列之Semaphore源码解析

【JUC】JDK1.8源码剖析之CountDownLatch(五)

并发编程之 CyclicBarrier 源码剖析

Java并发之CyclicBarrier