乐趣区

关于java:CyclicBarrier人齐了司机就可以发车了

上一篇咱讲了 CountDownLatch 能够解决多个线程同步的问题,相比于 join 来说它的利用范畴更广,不仅能够利用在线程上,还能够利用在线程池上。然而 CountDownLatch 却是一次性的计数器 ,以王者农药来说,咱们不可能一场团战就决定较量的输赢,所以在某些场景下,咱们是须要重复使用某个期待性能的,这就是咱们明天要介绍的另一个配角——CyclicBarrier。

CyclicBarrier

CyclicBarrier 翻译为中文是循环(Cyclic)栅栏(Barrier)的意思,它的大略含意是实现一个可循环利用的屏障。

CyclicBarrier 作用是让一组线程互相期待,当达到一个共同点时,所有之前期待的线程再继续执行,且 CyclicBarrier 性能可重复使用。

举个栗子

比方磊哥要坐班车回老家,因为中途不容许上、下乘客,所以营运的公司为了收益最大化,就会等人满之后再发车。像这种等人坐满就发一班车的场景,就是 CyclicBarrier 所善于的,因为它能够重复使用(不像 CountDownLatch 那样只能用一次)。

CyclicBarrier VS CountDownLatch

CountDownLatch:一个或者多个线程,期待另外 N 个线程实现某个事件之后能力执行。

CountDownLatch 就像玩王者农药开局的加载一样,所有人要期待其他人都加载 100% 之后能力开始游戏。

CyclicBrrier:N 个线程互相期待,直到有足够数量的线程都达到屏障点之后,之前期待的线程就能够继续执行了。

CyclicBrrier 就像老司机开车一样,如果车上还有空余的座位,那么所有人都得等着,直到座位被坐满之后,老司机才会发车。

CyclicBarrier 应用

import java.util.Date;
import java.util.Random;
import java.util.concurrent.*;

public class CyclicBarrierExample {public static void main(String[] args) {
        // 创立 CyclicBarrier
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {System.out.println("人满了,筹备发车:" + new Date());
            }
        });
        
        // 线程调用的工作
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 生成随机数 1-3
                int randomNumber = new Random().nextInt(3) + 1;
                // 进入工作
                System.out.println(String.format("我是:%s 再走:%d 秒就到车站了,当初工夫:%s",
                        Thread.currentThread().getName(), randomNumber, new Date()));
                try {
                    // 模仿执行
                    TimeUnit.SECONDS.sleep(randomNumber);
                    // 调用 CyclicBarrier
                    cyclicBarrier.await();
                    // 工作执行
                    System.out.println(String.format("线程:%s 上车,工夫:%s",
                            Thread.currentThread().getName(), new Date()));
                } catch (InterruptedException e) {e.printStackTrace();
                } catch (BrokenBarrierException e) {e.printStackTrace();
                }
            }
        };

        // 创立线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        // 执行工作 1
        threadPool.submit(runnable);
        // 执行工作 2
        threadPool.submit(runnable);
        // 执行工作 3
        threadPool.submit(runnable);
        // 执行工作 4
        threadPool.submit(runnable);

        // 期待所有工作执行完终止线程池
        threadPool.shutdown();}
}

以上代码执行后果如下:

从上述后果能够看出:当 CyclicBarrier 的计数器设置为 2 时,线程 2 和 线程 3 都到屏障点之后,老司机才会发第一波车,再 2s 之后,线程 1 和线程 4 也同时进入了屏障点,这时候老司机又能够再发一波车了。

实现原理

咱们先来看下 CyclicBarrier 的类图:

由上图可知 CyclicBarrier 是基于独占锁 ReentrantLock 实现的,其底层也是基于 AQS 的。

在 CyclicBarrier 类的外部有一个计数器 count,当 count 不为 0 时,每个线程在达到屏障点会先调用 await 办法将本人阻塞,此时计数器会减 1,直到计数器减为 0 的时候,所有因调用 await 办法而被阻塞的线程就会被唤醒继续执行。当 count 计数器变成 0 之后,就会进入下一轮阻塞,此时 parties(parties 是在 new CyclicBarrier(parties) 时设置的值)会将它的值赋值给 count 从而实现复用。

罕用办法

CyclicBarrier(parties):初始化互相期待的线程数量的构造方法。

CyclicBarrier(parties,Runnable barrierAction):初始化互相期待的线程数量以及屏障线程的构造方法,当 CyclicBarrier 的计数器变为 0 时,会执行 barrierAction 构造方法。

getParties():获取 CyclicBarrier 关上屏障的线程数量,也称为方数。

getNumberWaiting():获取正在 CyclicBarrier 上期待的线程数量。

await():在 CyclicBarrier 上进行阻塞期待,直到产生以下情景之一:
在 CyclicBarrier 上期待的线程数量达到 parties,则所有线程被开释,继续执行;

  • 以后线程被中断,则抛出 InterruptedException 异样,并进行期待,继续执行;
  • 其余期待的线程被中断,则以后线程抛出 BrokenBarrierException 异样,并进行期待,继续执行;
  • 其余期待的线程超时,则以后线程抛出 BrokenBarrierException 异样,并进行期待,继续执行;
  • 其余线程调用 CyclicBarrier.reset() 办法,则以后线程抛出 BrokenBarrierException 异样,并进行期待,继续执行。

await(timeout,TimeUnit):在 CyclicBarrier 上进行限时的阻塞期待,直到产生以下情景之一:

  • 在 CyclicBarrier 上期待的线程数量达到 parties,则所有线程被开释,继续执行;
  • 以后线程被中断,则抛出 InterruptedException 异样,并进行期待,继续执行;
  • 以后线程期待超时,则抛出 TimeoutException 异样,并进行期待,继续执行;
  • 其余期待的线程被中断,则以后线程抛出 BrokenBarrierException 异样,并进行期待,继续执行;
  • 其余期待的线程超时,则以后线程抛出 BrokenBarrierException 异样,并进行期待,继续执行;
  • 其余线程调用 CyclicBarrier.reset() 办法,则以后线程抛出 BrokenBarrierException 异样,并进行期待,继续执行。

isBroken():获取是否破损标记位 broken 的值,此值有以下几种状况:

  • CyclicBarrier 初始化时,broken=false,示意屏障未破损;
  • 如果正在期待的线程被中断,则 broken=true,示意屏障破损;
  • 如果正在期待的线程超时,则 broken=true,示意屏障破损;
  • 如果有线程调用 CyclicBarrier.reset() 办法,则 broken=false,示意屏障回到未破损状态。

reset():使得 CyclicBarrier 回归初始状态,直观来看它做了两件事:

  • 如果有正在期待的线程,则会抛出 BrokenBarrierException 异样,且这些线程进行期待,继续执行。
  • 将是否破损标记位 broken 置为 false。

总结

CyclicBrrier 是通过独占锁 ReentrantLock 实现计数器的原子性更新的,CyclicBrrier 最罕用的是 await() 办法,应用此办法会将计数器 -1,并判断以后的计数器是否为 0,如果不为 0 就会阻塞期待,并计时器为 0 之后,能力继续执行残余工作。CyclicBrrier 相比于 CountDownLatch 来说,它的劣势在于能够重复使用。

参考 & 鸣谢

blog.csdn.net/qq_39241239/article/details/87030142
blog.csdn.net/zzg1229059735/article/details/61191679
www.cnblogs.com/yaochunhui/p/13494689.html

关注公众号:「Java 中文社群」查看更多精彩内容。

退出移动版