前言

咱们晓得,CountDownLatch的计数器是一次性的,它不能重置。也就是说,当count值变为0时,再调用await()办法会立刻返回,不会阻塞。
本文要说的CyclicBarrier就是一种能够重置计数器的线程同步工具类。CyclicBarrier字面意思是“回环屏障”,它能够让一组线程全副达到一个状态后再全副同时往下执行。之所以叫回环是因为当所有线程执行结束,并重置CyclicBarrier的状态后它能够被重用。而之所以叫屏障是因为当某个线程调用await办法后就会被阻塞,这个阻塞点就称为屏障,等其余所有线程都调用了await办法后,这组线程就会一起冲破屏障,并往下执行。

应用场景

两个子工作别离执行本人的工作,等它们都执行完后,主工作汇总子工作的后果,并做一些解决,解决实现后两个子工作又持续做其余事件。示例代码:

import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {        try {            System.out.println("main task merge subtask result begin");            // simulate merge work            Thread.sleep(5000);            System.out.println("main task merge subtask result finished");        } catch (InterruptedException e) {            // ignore        }    });    public static void main(String[] args) {        Thread thread1 = new Thread(() -> {            try {                Thread.sleep(4000);                System.out.println("thread1 finished its work");                cyclicBarrier.await();                System.out.println("thread1 continue work");            } catch (InterruptedException | BrokenBarrierException e) {                // ignore            }        });        Thread thread2 = new Thread(() -> {            try {                Thread.sleep(5000);                System.out.println("thread2 finished its work");                cyclicBarrier.await();                System.out.println("thread2 continue work");            } catch (InterruptedException | BrokenBarrierException e) {                // ignore            }        });        thread1.start();        thread2.start();    }}

输入后果:

能够看到,线程1和线程2调用await()时,会被阻塞,等主线程工作实现后,线程1和线程2就会冲破屏障,持续往下执行。这里的主线程合并工作是可选的,也就是说能够间接new CyclicBarric(int parties),这种状况下就没有达到屏障后的合并工作,会间接在全副线程达到屏障后同时冲破屏障往下执行。能够比喻成举办同学聚会的场景。有20集体加入团聚,第1集体达到集合地点后要等其他人,第2个,第3个,...第19集体也须要等,当最初一个人到的时候,全副的20集体就能够登程去嗨皮了。

下面介绍的是“屏障”的利用场景,再来看个“回环”的利用场景。

假如一个工作由阶段1,阶段2,阶段3这三个阶段组成,每个线程都串行的顺次执行阶段1,2,3。当多个线程执行工作时,必须保障等所有线程都执行完阶段1后,能力执行阶段2,同样地,也必须保障所有线程都执行完阶段2后,能力执行阶段3。示例代码:

import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo2 {    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);    public static void main(String[] args) {        Thread thread1 = new Thread(() -> {            try {                System.out.println("thread1 step 1");                cyclicBarrier.await();                System.out.println("thread1 step 2");                cyclicBarrier.await();                System.out.println("thread1 step 3");            } catch (InterruptedException | BrokenBarrierException e) {                // ignore            }        });        Thread thread2 = new Thread(() -> {            try {                System.out.println("thread2 step 1");                cyclicBarrier.await();                System.out.println("thread2 step 2");                cyclicBarrier.await();                System.out.println("thread2 step 3");            } catch (InterruptedException | BrokenBarrierException e) {                // ignore            }        });        thread1.start();        thread2.start(); }}

输入后果如下:

能够看到,实现了这种同阶段期待的成果。

实现原理

先来看看重要属性:

private static class Generation {    // 屏障是否被突破    boolean broken = false;}/** The lock for guarding barrier entry */private final ReentrantLock lock = new ReentrantLock();/** Condition to wait on until tripped */private final Condition trip = lock.newCondition();/** The number of parties */private final int parties;/* The command to run when tripped */private final Runnable barrierCommand;/** The current generation */private Generation generation = new Generation();/** * Number of parties still waiting. Counts down from parties to 0 on each generation. * It is reset to parties on each new generation or when broken.  */private int count;

能够看到,CyclicBarrier里用了独占锁ReentrantLock实现多线程间的计数器同步,parties示意当多少个线程达到屏障后,冲破屏障往下执行,而count示意以后还残余多少个线程还未达到屏障,当所有线程都冲破屏障后,它又会在新一轮(new generation)被重置为parties的值。也就是说,count和Generation是用来实现重置成果的。

再看看构造方法的属性赋值:

public CyclicBarrier(int parties, Runnable barrierAction) {    if (parties <= 0) throw new IllegalArgumentException();    this.parties = parties;    this.count = parties;    this.barrierCommand = barrierAction;}

再来看看要害办法:
await()

public int await() throws InterruptedException, BrokenBarrierException {    try {        // false示意不设置超时        return dowait(false, 0L);    } catch (TimeoutException toe) {        throw new Error(toe); // cannot happen    }}

dowait()办法代码如下:

// timed:是否超时期待, nanos:超时工夫private int dowait(boolean timed, long nanos)    throws InterruptedException, BrokenBarrierException, TimeoutException {    final ReentrantLock lock = this.lock;    lock.lock();    try {        final Generation g = generation;        if (g.broken)               throw new BrokenBarrierException();        if (Thread.interrupted()) {            breakBarrier();            throw new InterruptedException();        }        int index = --count;        // 如果index为0,示意所有线程都已达到了屏障,此时去执行初始化时设定的barrierCommand(如果有的话)        if (index == 0) {  // tripped           boolean ranAction = false;           try {               final Runnable command = barrierCommand;               if (command != null)                   command.run();               ranAction = true;               // 唤醒其余线程,并重置进行下一轮               nextGeneration();               // 返回               return 0;           } finally {               if (!ranAction)                   breakBarrier();           }        }        // 否则须要等其余线程都达到屏障        // loop until tripped, broken, interrupted, or timed out        for (;;) {             try {                 // 辨别超时期待与不超时期待                 if (!timed)                    trip.await();                 else if (nanos > 0L)                    nanos = trip.awaitNanos(nanos);             } catch (InterruptedException ie) {                   if (g == generation && ! g.broken) {                       breakBarrier();                       throw ie;                   } else {                    // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                   }            }            if (g.broken)                throw new BrokenBarrierException();            // g != generation 阐明被唤醒后已重置了轮次,阐明所有线程均已达到线程屏障,能够返回了。            if (g != generation)                return index;            // 期待超时,抛出超时异样                if (timed && nanos <= 0L) {                breakBarrier();                throw new TimeoutException();            }        }    } finally {        lock.unlock();    }}

其中,nextGeneration()办法如下:

private void nextGeneration() {    // signal completion of last generation    // 唤醒期待在trip条件(即屏障)上的其余所有线程    trip.signalAll();    // set up next generation    // 重置count的值为初始值parties    count = parties;    // 重置以后轮次    generation = new Generation();}

参考资料:
《Java并发编程之美》