共计 3642 个字符,预计需要花费 10 分钟才能阅读完成。
CyclicBarrier
CyclicBarrier 也是一个同步辅助类,它允许一组线程相互等待直到到达某个工作屏障点,通过他可以完成多线程之间的相互等待。每个线程都就绪之后才能执行后面的操作。和 CountLatch 有相似的地方都是通过计数器来实现的。当某个线程执行了 await()方法后就进入等待状态,计数器进行加 1 操作,当增加后的值达到我们设定的值后,线程被唤醒,继续执行后续操作。CyclicBarrier 是可重用的计数器,CyclicBarrier 的使用场景和 CountDownLatch 的使用场景很相似,可以用于多线程计算数据最后总计结果。
CyclicBarrier 和 CountDownLatch 的使用区别:
- CountDownLatch 的计数器只能使用一次,CyclicBarrier 可以用 reset 方法重置。
- CountDownLatch 是一个线程等待其他线程完成某个操作后才能继续执行。也就是一个或个多线程等待其他的关系,而 CyclicBarrier 是实现了多个线程之间的相互等待,所有线程都满足了条件之后才能继续使用。
演示代码
@Slf4j
public class CyclicBarrierExample1 {private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {race(threadNum);
} catch (Exception e) {log.error("exception", e);
}
});
}
executor.shutdown();}
private static void race(int threadNum) throws Exception {Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
输出结果如下:
20:43:46.324 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 0 is ready
20:43:47.322 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 1 is ready
20:43:48.323 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 2 is ready
20:43:49.323 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 3 is ready
20:43:50.325 [pool-1-thread-5] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 4 is ready
20:43:50.325 [pool-1-thread-5] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 4 continue
20:43:50.325 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 0 continue
20:43:50.325 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 1 continue
20:43:50.325 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 3 continue
20:43:50.325 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 2 continue
20:43:51.325 [pool-1-thread-6] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 5 is ready
20:43:52.326 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 6 is ready
20:43:53.326 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 7 is ready
20:43:54.326 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 8 is ready
20:43:55.327 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 9 is ready
20:43:55.327 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 9 continue
20:43:55.327 [pool-1-thread-6] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 5 continue
20:43:55.327 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 6 continue
20:43:55.327 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 7 continue
20:43:55.327 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 8 continue
我们定义了 private static CyclicBarrier barrier = new CyclicBarrier(5),每次调用 await 后加 1 知道等于 5 就一起执行接下来的操作。
我们在创建 CyclicBarrier 的时候是可以传入一段 runable 的,下面看一下代码
@Slf4j
public class CyclicBarrierExample3 {private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {log.info("callback is running");
});
public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {race(threadNum);
} catch (Exception e) {log.error("exception", e);
}
});
}
executor.shutdown();}
private static void race(int threadNum) throws Exception {Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
在线程到达 = 执行屏障时,线程会优先执行这个 runable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {log.info("callback is running");
});
正文完