前言

看完 CountDownLatch 正筹备示意一番,忽然看到了一个 CyclicBarrier —— 回环屏障。沃特?回环还屏障?说比 CountDownLatch 要多一个回环,那咱可得瞧一瞧,看一看了!

公众号:『 刘志航 』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

一个同步辅助,它容许一组线程的所有期待彼此达成独特屏障点。

CyclicBarrier 在波及固定线程数且必须期待彼此的程序十分有用。

该屏障被称为回环屏障 ,因为它在期待的线程被开释后能够被从新利用。

CyclicBarrier 反对一个可选的 Runnable 命令,该命令在阻碍中的最初一个线程达到之后,但在开释任何线程之前,每个屏障点运行一次。

此屏障操作对于在任何一方持续之前更新共享状态很有用。

通过下面的源码正文根本能够得出以下论断:

  1. CyclicBarrier 和 CountDownLatch 相似,但它是一组线程期待,直到在其余线程中执行的一组操作实现为止。
  2. CountDownLatch 是计数递加,完结后再调用 await 或者 countdown 都会立刻返回,然而 CyclicBarrier 能够重置屏障。
  3. CyclicBarrier 还能够传入参数 Runnable ,Runnable 会在开释线程之前执行。

根本应用

既然下面总结了三个论断,上面当然从三个方面演示如何应用的:

- 屏障性能

public class CyclicBarrierTest {    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(11);    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {        ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<>(1024),                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),                new ThreadPoolExecutor.AbortPolicy());        for (int i = 0; i < 10; i++) {            pool.submit(() -> {                try {                    System.out.println(Thread.currentThread().getName() + " 开始执行");                    Thread.sleep(5000);                    System.out.println(Thread.currentThread().getName() + " 执行完结,筹备调用 await");                    CYCLIC_BARRIER.await();                } catch (InterruptedException | BrokenBarrierException e) {                    e.printStackTrace();                }            });        }        System.out.println("主线程执行 —————————————— >>>");        CYCLIC_BARRIER.await();        System.out.println("主线程继续执行 —————————————— >>>");        pool.shutdown();    }}

通过下面代码其实模仿了个相似 CountDownLatch 的性能,让所有线程期待,直到都调用 await 之后,各个线程继续执行,同时主线程也持续往下执行。

不过绝对 CountDownLatch 的指定一个线程或多个期待,直到其余线程执行完结,期待的线程才继续执行来说,CyclicBarrier 相对来说还是逊色。

差异总结如下:

  1. CountDownLatch 是指定期待的线程,其余线程进行 countDown,等计数为 0 时,期待的线程继续执行。
  2. CyclicBarrier 是一组线程调用 await 进行期待,当所有的都进入期待的时候,这一组就会一起冲破屏障继续执行。

- 回环性能

public class CyclicBarrierTest2 {    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5);    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {                ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<>(1024),                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),                new ThreadPoolExecutor.AbortPolicy());        for (int i = 0; i < 5; i++) {            pool.submit(() -> {                try {                    System.out.println(Thread.currentThread().getName() + " 开始执行");                    CYCLIC_BARRIER.await();                    System.out.println(Thread.currentThread().getName() + " 冲破屏障 >>> 1");                    CYCLIC_BARRIER.await();                    System.out.println(Thread.currentThread().getName() + " 冲破屏障 >>>>> 2");                    CYCLIC_BARRIER.await();                } catch (InterruptedException | BrokenBarrierException e) {                    e.printStackTrace();                }            });        }        pool.shutdown();    }}

下面演示的回环的用法。

- 回环 Runnable

这块只须要在申明的 CyclicBarrier 批改为以下即可:

private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5, new Runnable() {    @Override    public void run() {        System.out.println("执行一次 Runnable ");    }});

打印后果如下:

能够看出只是在下一个计数开始之前,先执行 Runnable 。至于是不是在开释屏障之前,那很容易,间接 Debug 走一遭就晓得了!专门录制了个视频:

通过 debug 能够看出Runnable 会在开释线程之前执行

问题疑难?

  1. CyclicBarrier 和 AQS 有什么关系?
  2. CyclicBarrier 的实现原理是什么?
  3. CyclicBarrier 是如何实现回环的?

上面就带着疑难去源码浏览,一探到底!

源码剖析

根本构造

通过 UML 乍一看,CyclicBarrier 和 AQS 并无什么关系,那上面开始从参数结构器await()办法别离看源码。

参数

public class CyclicBarrier {    /**     * 屏障的每次应用都示意为一个生成实例。     * broken 示意屏障是否被突破。     */    private static class Generation {        boolean broken = false;    }    /** 锁 */    private final ReentrantLock lock = new ReentrantLock();    /** 条件期待,直到屏障 */    private final Condition trip = lock.newCondition();    /** 期待计数 */    private final int parties;    /* The command to run when tripped */    private final Runnable barrierCommand;    /** 以后 generation 新创建的*/    private Generation generation = new Generation();    /** 仍在期待的 parties 数量,递加 为 0 会重置 */    private int count; }

通过下面能够看出:

外部应用了一个动态类 Generation ,它有什么性能呢?通过正文理解到,每次应用屏障的时候都会生成,具体有什么用,其实就是用来标示屏障是否被突破。

外部还有一个 parties 示意期待计数,count 示意仍在期待的计数。

那就持续往下看吧!

结构器

public CyclicBarrier(int parties, Runnable barrierAction) {    if (parties <= 0) throw new IllegalArgumentException();    this.parties = parties;    this.count = parties;    this.barrierCommand = barrierAction;}

这里的入参有两个:

  • parties(期待计数):记录多少个线程调用 await 之后,才会一起突破屏障。
  • barrierAction:冲破屏障前执行的行为。
  • 然而会同时对 parties 和 count 赋值为传入的 parties。

单参数结构,其实就是将 barrierAction 赋值为 null。

await() 办法

在示例中用的 await() 办法, 那就从 await() 办法动手:

public int await() throws InterruptedException, BrokenBarrierException {    try {        return dowait(false, 0L);    } catch (TimeoutException toe) {        throw new Error(toe); // cannot happen    }}

await() 才是重头戏, 先来依据源码正文,理解是干嘛的,看看作者怎么讲:

  1. 等到所有各方都在此阻碍上调用await。
  2. 如果以后线程不是最初达到的线程,则出于线程调度目标将其禁用,并使其处于休眠状态,直到产生以下状况之一:

    1. 最初一个线程达到;
    2. 其余一些线程中断以后线程;
    3. 其余一些线程中断其余正在期待的线程之一;
    4. 期待屏障的时候其余线程超时;
    5. 其余一些线程在此屏障上调用 reset。

看到这些,咱们最想看的当然是 2.1 ,期待最初一个线程达到屏障,之后所有的线程一起继续执行。

private int dowait(boolean timed, long nanos)    throws InterruptedException, BrokenBarrierException,            TimeoutException {        // 加锁    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 在这里用到了这个代        final Generation g = generation;        if (g.broken)            throw new BrokenBarrierException();        // 线程终中断标示        if (Thread.interrupted()) {            breakBarrier();            throw new InterruptedException();        }        // 对计数进行递加        int index = --count;        // 如果是 0 则        if (index == 0) {  // tripped            boolean ranAction = false;            try {                final Runnable command = barrierCommand;                // 不是 null 先执行行为                if (command != null)                    // 这里不是新开线程                    command.run();                ranAction = true;                // 下一代                nextGeneration();                return 0;            } finally {                // 工作未胜利时,即 ranAction 还是 false 突破屏障                if (!ranAction)                    breakBarrier();            }        }        // loop until tripped, broken, interrupted, or timed out        // 自旋        for (;;) {            try {                // 没有设置超时工夫                if (!timed)                // 进入期待                    trip.await();                else if (nanos > 0L)                    nanos = trip.awaitNanos(nanos);            } catch (InterruptedException ie) {                if (g == generation && ! g.broken) {                    breakBarrier();                    throw ie;                } else {                    Thread.currentThread().interrupt();                }            }            if (g.broken)                throw new BrokenBarrierException();            // 曾经下一代了            if (g != generation)                return index;            if (timed && nanos <= 0L) {                breakBarrier();                throw new TimeoutException();            }        }    } finally {        lock.unlock();    }}

这一大坨代码,齐全没有看的欲望,间接划过去吧!

所以…… 间接看到了这里吧。

代码还是要浏览的,离开来看(异样流程省略):

  1. 应用了 ReentrantLock 互斥锁,因而对 count、broken 的批改是原子性的。
  2. 对 count 进行 --count 操作,这样就了解为什么说 count 是仍在期待的计数,或者说还有多少能力达到屏障点。
  3. 当 count 为 0 ,示意达到屏障点了

    1. command 不为 null,会先执行 command.run(), 值得注意的是这里并不是新开了个线程。
    2. nextGeneration()开始新的下一代,即重置 count 为 parties。
    3. 在 finally 外面应用 breakBarrier() 突破屏障。
  4. 当 count 不是 0

    1. 自旋,直到是 0.

这前面还有两个办法不能少:

private void nextGeneration() {    // 唤醒线程    trip.signalAll();    // 更新 count 为 parties    count = parties;    // 更新 Generation    generation = new Generation();}
// 突破屏障,并唤醒全副private void breakBarrier() {    generation.broken = true;    count = parties;    trip.signalAll();}

reset()

public void reset() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        breakBarrier();   // break the current generation        nextGeneration(); // start a new generation    } finally {        lock.unlock();    }}

将屏障重置为其初始状态,reset() 办法其实还是调用的 breakBarrier() 和 nextGeneration(),前者时突破以后代,后者是开始新的一轮。

总结

Q: CyclicBarrier 和 AQS 有什么关系?
A: 通过浏览源码,其实发现是应用了 ReentrantLock 互斥锁 以及 Condition 的期待唤醒性能。

Q: CyclicBarrier 的实现原理是什么?
A: 外部含有两个计数,别离是 parties 和 count ,初始是二者相等,当有线程调用 await() 时,count 递加,只有 count 不为 0 , 就会阻塞线程,直到 count 递加为 0 时,此时会所有线程一起开释,同时将 count 重置为 parties。

Q: CyclicBarrier 是如何实现回环的?
A: 应用两个计数,count 递加,当 count 为 0 时,会重置为 parties,从而达到回环成果。

Q: 为什么 count 的 --count 操作没有应用 CAS?
A: 因为曾经 lock.lock() 了,应用了 ReentrantLock 锁可能保障 count 的原子性。

CyclicBarrier 和 CountDownLatch 的区别

  1. 回环:CyclicBarrier 能够回环,从新计数。CountDownLatch 只能一轮。
  2. 计数器:CyclicBarrier 的计数器本人保护递加, CountDownLatch 的计数器保护则是交给使用者。
  3. 阻塞线程:CyclicBarrier 阻塞的是本身,当达到屏障后,所有被阻塞的线程一起开释。CountDownLatch 能够指定阻塞线程。

结束语

本文次要介绍了 CyclicBarrier 的罕用形式,通过源码形式,剖析如何达到屏障以及回环的成果。不对之处,请多斧正。