共计 5051 个字符,预计需要花费 13 分钟才能阅读完成。
前言
咱们晓得,CountDownLatch 的计数器是一次性的,它不能重置。也就是说,当 count 值变为 0 时,再调用 await()办法会立刻返回,不会阻塞。
本文要说的 CyclicBarrier 就是一种能够重置计数器的线程同步工具类。CyclicBarrier 字面意思是“回环屏障”,它能够让一组线程全副达到一个状态后再全副同时往下执行。之所以叫回环是因为当所有线程执行结束,并重置 CyclicBarrier 的状态后它能够被重用。而之所以叫屏障是因为当某个线程调用 await 办法后就会被阻塞,这个阻塞点就称为屏障,等其余所有线程都调用了 await 办法后,这组线程就会一起冲破屏障,并往下执行。
应用场景
两个子工作别离执行本人的工作,等它们都执行完后,主工作汇总子工作的后果,并做一些解决,解决实现后两个子工作又持续做其余事件。示例代码:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
try {System.out.println("main task merge subtask result begin");
// simulate merge work
Thread.sleep(5000);
System.out.println("main task merge subtask result finished");
} catch (InterruptedException e) {// ignore}
});
public static void main(String[] args) {Thread thread1 = new Thread(() -> {
try {Thread.sleep(4000);
System.out.println("thread1 finished its work");
cyclicBarrier.await();
System.out.println("thread1 continue work");
} catch (InterruptedException | BrokenBarrierException e) {// ignore}
});
Thread thread2 = new Thread(() -> {
try {Thread.sleep(5000);
System.out.println("thread2 finished its work");
cyclicBarrier.await();
System.out.println("thread2 continue work");
} catch (InterruptedException | BrokenBarrierException e) {// ignore}
});
thread1.start();
thread2.start();}
}
输入后果:
能够看到,线程 1 和线程 2 调用 await()时,会被阻塞,等主线程工作实现后,线程 1 和线程 2 就会冲破屏障,持续往下执行。这里的主线程合并工作是可选的,也就是说能够间接 new CyclicBarric(int parties),这种状况下就没有达到屏障后的合并工作,会间接在全副线程达到屏障后同时冲破屏障往下执行。能够比喻成举办同学聚会的场景。有 20 集体加入团聚,第 1 集体达到集合地点后要等其他人,第 2 个,第 3 个,… 第 19 集体也须要等,当最初一个人到的时候,全副的 20 集体就能够登程去嗨皮了。
下面介绍的是“屏障”的利用场景,再来看个“回环”的利用场景。
假如一个工作由阶段 1,阶段 2,阶段 3 这三个阶段组成,每个线程都串行的顺次执行阶段 1,2,3。当多个线程执行工作时,必须保障等所有线程都执行完阶段 1 后,能力执行阶段 2,同样地,也必须保障所有线程都执行完阶段 2 后,能力执行阶段 3。示例代码:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo2 {private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) {Thread thread1 = new Thread(() -> {
try {System.out.println("thread1 step 1");
cyclicBarrier.await();
System.out.println("thread1 step 2");
cyclicBarrier.await();
System.out.println("thread1 step 3");
} catch (InterruptedException | BrokenBarrierException e) {// ignore}
});
Thread thread2 = new Thread(() -> {
try {System.out.println("thread2 step 1");
cyclicBarrier.await();
System.out.println("thread2 step 2");
cyclicBarrier.await();
System.out.println("thread2 step 3");
} catch (InterruptedException | BrokenBarrierException e) {// ignore}
});
thread1.start();
thread2.start();}
}
输入后果如下:
能够看到,实现了这种同阶段期待的成果。
实现原理
先来看看重要属性:
private static class Generation {
// 屏障是否被突破
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0 on each generation.
* It is reset to parties on each new generation or when broken.
*/
private int count;
能够看到,CyclicBarrier 里用了独占锁 ReentrantLock 实现多线程间的计数器同步,parties 示意当多少个线程达到屏障后,冲破屏障往下执行,而 count 示意以后还残余多少个线程还未达到屏障,当所有线程都冲破屏障后,它又会在新一轮 (new generation) 被重置为 parties 的值。也就是说,count 和 Generation 是用来实现重置成果的。
再看看构造方法的属性赋值:
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
再来看看要害办法:
await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
// false 示意不设置超时
return dowait(false, 0L);
} catch (TimeoutException toe) {throw new Error(toe); // cannot happen
}
}
dowait()办法代码如下:
// timed:是否超时期待, nanos:超时工夫
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {breakBarrier();
throw new InterruptedException();}
int index = --count;
// 如果 index 为 0,示意所有线程都已达到了屏障,此时去执行初始化时设定的 barrierCommand(如果有的话)if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒其余线程,并重置进行下一轮
nextGeneration();
// 返回
return 0;
} finally {if (!ranAction)
breakBarrier();}
}
// 否则须要等其余线程都达到屏障
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 辨别超时期待与不超时期待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to // "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// g != generation 阐明被唤醒后已重置了轮次,阐明所有线程均已达到线程屏障,能够返回了。if (g != generation)
return index;
// 期待超时,抛出超时异样
if (timed && nanos <= 0L) {breakBarrier();
throw new TimeoutException();}
}
} finally {lock.unlock();
}
}
其中,nextGeneration()办法如下:
private void nextGeneration() {
// signal completion of last generation
// 唤醒期待在 trip 条件 (即屏障) 上的其余所有线程
trip.signalAll();
// set up next generation
// 重置 count 的值为初始值 parties
count = parties;
// 重置以后轮次
generation = new Generation();}
参考资料:
《Java 并发编程之美》