关于java-ee:CountDownLatchCyclicBarrierSemaphoreExchanger-的详细解析

31次阅读

共计 11362 个字符,预计需要花费 29 分钟才能阅读完成。


本文次要介绍和比照咱们罕用的几种并发工具类,次要波及 CountDownLatchCyclicBarrierSemaphoreExchanger 相干的内容,如果对多线程相干内容不相熟,能够看笔者之前的一些文章:

  • 《Java 并发编程 - 线程根底》
  • 《总算把线程六种状态的转换说分明了!》
  • [《[高频面试]解释线程池的各个参数含意》](https://mp.weixin.qq.com/s/mX…
  • 《晓得线程池的四种回绝策略吗?》
  • 《java 中常见的六种线程池详解》
  • 《基于 synchronized 的锁的深度解析》????举荐
  • 《JAVA 中常见的阻塞队列详解》
  • 《优雅敞开线程池的计划》

  • 介绍 CountDownLatchCyclicBarrier 两者的应用与区别,他们都是期待多线程实现,是一种并发流程的管制伎俩,
  • 介绍 SemaphoreExchanger 的应用,semaphore 是信号量,能够用来管制容许的线程数,而 Exchanger 能够用来替换两个线程间的数据。

CountDownLatch

  • CountDownLatchJDK5 之后退出的一种并发流程管制工具,它在 java.util.concurrent 包下
  • CountDownLatch 容许一个或多个线程期待其余线程实现操作,这里须要留神,是能够是一个期待也能够是多个来期待
  • CountDownLatch 的构造函数如下,它承受一个 int 类型的参数作为计数器,即如果你想期待N 个线程实现,那么这里就传入 N
    public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
  • 其中有两个外围的办法 countDownawait,其中 当咱们调用 countDown 办法时相应的 N 的值减 1,而 await 办法则会阻塞以后线程,直到 N 的值变为零。
  • 说起来比拟形象,上面咱们通过理论案例来阐明。

多个线程期待一个线程

  • 在咱们生存中最典型的案例就是体育中的跑步,假如当初咱们要进行一场赛跑,那么所有的选手都须要期待裁判员的起跑命令,这时候,咱们将其抽象化每个选手对应的是一个线程,而裁判员也是一个线程,那么就是多个选手的线程再期待裁判员线程的命令来执行
  • 咱们通过 CountDownLatch 来实现这一案例,那么期待的个数 N 就是下面的裁判线程的个数,即为 1,

    /**
     * @url i-code.onlien
     * 云栖简码
     */
    public static void main(String[] args) throws InterruptedException {
        // 模仿跑步较量,裁判说开始,所有选手开始跑,咱们能够应用 countDownlatch 来实现

        // 这里须要期待裁判说开始,所以时等着一个线程
        CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(() ->{
            try {System.out.println(Thread.currentThread().getName() +"已筹备");
                countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手 1").start();
        new Thread(() ->{
            try {System.out.println(Thread.currentThread().getName() +"已筹备");
                countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"开始跑~~");

        },"选手 2").start();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("裁判:准备~~~");
        countDownLatch.countDown();
        System.out.println("裁判:跑~~~");
    }
  • 运行后果如下:

在上述代码中,咱们首先创立了一个计数为 1 的 CountDownLatch 对象,这代表咱们须要期待的线程数,之后再创立了两个线程,用来代表选手线程,同时在选手的线程中咱们都调用了 await 办法,让线程进入阻塞状态,直到 CountDownLatch 的计数为零后再执行前面的内容,在主线程 main 办法中咱们期待 1 秒后执行 countDown 办法,这个办法就是减一,此时的 N 则为零了,那么选手线程则开始执行前面的内容,整体的输入如上图所示

一个 / 多个线程期待多个线程

  • 同样从咱们生存中的场景来形象,假如公司要组织出游,大巴车接送,当凑够五个人大巴车则发车登程,这里就是大巴车须要期待这五个人全副到齐能力继续执行,咱们形象之后用 CountDownLatch 来实现,那么的计数个数 N 则为 5,因为要期待这五个,通过代码实现如下:

    public static void main(String[] args) throws InterruptedException {
        /**
         * i-code.online
         * 云栖简码 
         */
        // 期待的个数
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName() + "从住所登程...");
                try {TimeUnit.SECONDS.sleep((long) (Math.random()*10));
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "达到目的地 -----");
                countDownLatch.countDown();},"人员 -"+i).start();}

        System.out.println("大巴正在期待人员中.....");
        countDownLatch.await();
        System.out.println("----- 所有人到齐,登程 -----");
    }
  • 下面代码执行后果如下:

从上述代码中咱们能够看到,定义了一个计数为 5 的 countDownLatch,之后通过循环创立五个线程,模仿五个人员,当他们达到指定地点后执行 countDown 办法,对计数减一。主线程相当于是大巴车的线程,执行 await 办法进行阻塞,只有当 N 的值减到 0 后则执行前面的输入

CountDownLatch 次要办法介绍

  • 构造函数:
public CountDownLatch(int count) {};

它的构造函数是传入一个参数,该参数 count 是须要倒数的数值。

  • await():调用 await() 办法的线程开始期待,直到倒数完结,也就是 count 值为 0 的时候才会继续执行。
  • await(long timeout, TimeUnit unit)await() 有一个重载的办法,外面会传入超时参数,这个办法的作用和 await() 相似,然而这里能够设置超时工夫,如果超时就不再期待了。
  • countDown():把数值倒数 1,也就是将 count 值减 1,直到减为 0 时,之前期待的线程会被唤起。

下面的案例介绍了 CountDownLatch 的应用,然而 CountDownLatch 有个特点,那就是不可能重用,比方曾经实现了倒数,那可不可以在下一次持续去从新倒数呢?是能够的,一旦倒数到 0 则完结了,无奈再次设置循环执行,然而咱们理论需要中有很多场景中须要循环来解决,这时候咱们能够应用 CyclicBarrier 来实现

CyclicBarrier

  • CyclicBarrierCountDownLatch 比拟类似,当期待到肯定数量的线程后开始执行某个工作
  • CyclicBarrier 的字面意思是能够循环应用的屏障,它的性能就是让一组线程达到一个屏障(同步点)时被阻塞,直到最初一个线程达到屏障时,屏障才会散会,此时所有被屏障阻塞的线程都将继续执行。如下演示

  • 上图中能够看到,到线程达到屏障后阻塞,直到最初一个也达到后,则全副放行
  • 首先咱们来看下它的构造函数,如下:
    public CyclicBarrier(int parties) {this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • CyclicBarrier(int parties) 构造函数提供了int 类型的参数,代表的是须要拦挡的线程数量,而每个线程通过调用 await 办法来通知 CyclicBarrier 我达到屏障点了,而后阻塞
  • CyclicBarrier(int parties, Runnable barrierAction) 构造函数是为咱们提供的一个高级办法,加了一个 barrierAction 的参数,这是一个 Runnable 类型的,也就是一个线程,它示意当所有线程达到屏障后,悠闲触发 barrierAction 线程执行,再执行各个线程之后的内容

案例

  • 假如你要和你女朋友约会,约定了一个工夫地点,那么不论你们谁先到都会期待另一个到才会登程取约会~ 那么这时候咱们通过CyclicBarrier 的来实现,这里咱们须要来拦挡的线程就是两个。具体实现 如下:
    /*
    CyclicBarrier 与 countDownLatch 比拟类似,也是期待线程实现,不过 countDownLatch 是 await 期待其余的线程通过 countDown 的数量,达到肯定数则执行,而 CyclicBarrier 则是间接看 await 的数量,达到肯定数量间接全副执行,*/
    public static void main(String[] args) {
        // 好比情侣约会,不论谁先到都的等另一个,这里就是两个线程,CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() ->{System.out.println("疾速拾掇,出门~~~");
            try {TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("到了约会地点期待女朋友前来~~");
                cyclicBarrier.await();
                System.out.println("女朋友到来嗨皮登程~~ 约会");
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();
            }

        },"男朋友").start();
        new Thread(() ->{System.out.println("缓缓拾掇,出门~~~");
            try {TimeUnit.MILLISECONDS.sleep(5000);
                System.out.println("到了约会地点期待男朋友前来~~");
                cyclicBarrier.await();
                System.out.println("男朋友到来嗨皮登程~~ 约会");
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();
            }
        },"女朋友").start();}
  • 代码执行后果如下:

下面代码,绝对简略,创立一个拦挡数为 2 的屏障,之后创立两个线程,调用 await 办法,只有当调用两次才会触发前面的流程。

  • 咱们再写一个案例 sh,应用含有Runnable 参数的构造函数;和之前 CountDownLatch 的案例类似,公司组织出游,这时候必定有很多大巴在期待接送,大巴不会等所有的 人都到才登程,而是每坐满一辆车就登程一辆,这种场景咱们就能够应用 CyclicBarrier 来实现,实现如下:

    /*
    CyclicBarrier 是可重复使用到,也就是每当几个满足是不再期待执行,比方公司组织出游,安顿了好多辆大把,每坐满一辆就发车,不再期待,相似这种场景,实现如下:*/

    public static void main(String[] args) {
        // 公司人数
        int peopleNum = 2000;
        // 每二十五个人一辆车,凑够二十五则发车~
        CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
            // 达到 25 人登程
            System.out.println("------------25 人数凑齐登程 ------------");
        });

        for (int j = 1; j <= peopleNum; j++) {new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();}

    }

    static class PeopleTask implements Runnable{

        private String name;
        private  CyclicBarrier cyclicBarrier;
        public PeopleTask(String name,CyclicBarrier cyclicBarrier){
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {System.out.println(name+"从家里登程,正在返回聚合地....");
            try {TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(name+"达到集合地点,期待其他人..");
            try {cyclicBarrier.await();
            } catch (InterruptedException e) {e.printStackTrace();
            } catch (BrokenBarrierException e) {e.printStackTrace();
            }

        }
    }

CyclicBarrier 和 CountDownLatch 的异同

相同点:

  • 都能阻塞一个或一组线程,直到某个预设的条件达成产生,再对立登程

不同点:

  • 可重复性:CountDownLatch 的计数器只能应用一次,到达到 0 后就不能再次应用了,除非新建实例;而 CyclicBarrier 的计数器是能够复用循环的,所以 CyclicBarrier 能够用在更简单的场景,能够随时调用 reset办法来重制拦挡数,如计算产生谬误时能够间接充值计数器,让线程从新执行一次。
  • 作用对象:CyclicBarrier 要等固定数量的线程都达到了屏障地位能力继续执行,而 CountDownLatch 只需期待数字倒数到 0,也就是说 CountDownLatch 作用于事件,但 CyclicBarrier 作用于线程;CountDownLatch 是在调用了 countDown 办法之后把数字倒数减 1,而 CyclicBarrier 是在某线程开始期待后把计数减 1
  • 执行动作:CyclicBarrier 有执行动作 barrierAction,而 CountDownLatch 没这个性能。

Semaphore

  • Semaphore(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源,

  • 从图中能够看出,信号量的一个最次要的作用就是,来管制那些须要限度并发访问量的资源。具体来讲,信号量会保护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证(acquire 办法)。线程能够从信号量中去“获取”一个许可证,一旦线程获取之后,信号量持有的许可证就转移过来了,所以信号量手中残余的许可证要减一。
  • 同理,线程也能够“开释”一个许可证,如果线程开释了许可证(release 办法),这个许可证相当于被归还给信号量了,于是信号量中的许可证的可用数量加一。当信号量领有的许可证数量减到 0 时,如果下个线程还想要取得许可证,那么这个线程就必须期待,直到之前失去许可证的线程开释,它能力获取。因为线程在没有获取到许可证之前不能进一步去拜访被爱护的共享资源,所以这就管制了资源的并发访问量,这就是整体思路。

案例

  • 如咱们平时开发中典型的数据库操作,这是一个密集IO 操作,咱们能够启动很多线程然而数据库的连接池是有限度的,假如咱们设置容许五个链接,如果咱们开启太多线程间接操作则会出现异常,这时候咱们能够通过信号量来管制,让始终最多只有五个线程来获取连贯。代码如下:
    /*
        Semaphore 是信号量,能够用来控制线程的并发数,能够协调各个线程,以达到正当的应用公共资源
     */

    public static void main(String[] args) {
        // 创立 10 个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(100);
        // 设置信号量的值 5,也就是容许五个线程来执行
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 100; i++) {service.submit(() ->{
                try {s.acquire();
                } catch (InterruptedException e) {e.printStackTrace();
                }
                try {System.out.println("数据库耗时操作"+Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(3000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "正在执行....");
                s.release();});
        }

    }

如上代码,创立了一个容量 100 的线程池,模仿咱们程序中大量的线程,增加一百个工作,让线程池执行。创立了一个容量为 5 的信号量,在线程中咱们调用 acquire 来取得信号量的许可,只有取得了能力只能上面的内容不然阻塞。当执行完后开释该许可,通过 release 办法,

  • 通过下面的演示,有没有感觉十分眼生,对,就是和咱们之前接触过的锁很类似,只是锁是只容许一个线程拜访,那咱们能不能将信号量的容量设置为 1 呢?这当然是能够的,当咱们设置为 1 时其实就和咱们的锁的性能是统一的,如下代码:
    private static int count = 0;
    /*
        Semaphore 中如果咱们容许的的许可证数量为 1,那么它的成果与锁类似。*/
    public static void main(String[] args) throws InterruptedException {final ExecutorService service = Executors.newFixedThreadPool(10);

        Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 10000; i++) {service.submit(() ->{
                try {semaphore.acquire();
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行了");
                count ++;
                semaphore.release();});
        }
        service.shutdown();
        TimeUnit.SECONDS.sleep(5);
        System.out.println(count);

    }

其余次要办法介绍

  • public boolean tryAcquire()tryAcquire 和锁的 trylock 思维是统一的,是尝试获取许可证,相当于看看当初有没有闲暇的许可证,如果有就获取,如果当初获取不到也没关系,不用陷入阻塞,能够去做别的事。
  • public boolean tryAcquire(long timeout, TimeUnit unit):是一个重载的办法,它外面传入了超时工夫。比方传入了 3 秒钟,则意味着最多期待 3 秒钟,如果期待期间获取到了许可证,则往下继续执行;如果超时工夫到,仍然获取不到许可证,它就认为获取失败,且返回 false。
  • int availablePermits():返回此信号量中以后可用的许可证数
  • int getQueueLength():返回正在期待许可证的线程数
  • boolean hasQueuedThreads():判断是否有线程正在期待获取许可证
  • void reducePermits(int reduction):缩小 reduction 个许可证,是个 protected 办法
  • Collection<Thread> getQueuedThreads():返回正在期待获取许可证的线程汇合,是个 protected 办法

Exchanger

  • Exchanger(替换者)是一个用于线程间合作的工具类,它次要用于进行线程间数据的替换,它有一个同步点,当两个线程达到同步点时能够将各自的数据传给对方,如果一个线程先达到同步点则会期待另一个达到同步点,达到同步点后调用 exchange 办法能够传递本人的数据并且取得对方的数据。
  • 咱们假如当初须要录入一些重要的账单信息,为了保障筹备,让两个人别离录入,之后再进行比照后是否统一,避免谬误繁盛。上面通过代码来演示:
public class ExchangerTest {

    /*
    Exchanger 替换,用于线程间合作的工具类,能够替换线程间的数据,其提供一个同步点,当线程达到这个同步点后进行数据间的交互,遗传算法能够如此来实现,以及校对工作也能够如此来实现
     */

    public static void main(String[] args) {
        /*
        模仿 两个工作人员录入记录,为了避免谬误,两者录的雷同内容,程序仅从校对,看是否有谬误不统一的
         */

        // 开拓两个容量的线程池
        final ExecutorService service = Executors.newFixedThreadPool(2);

        Exchanger<InfoMsg> exchanger = new Exchanger<>();

        service.submit(() ->{
            // 模仿数据 线程 A 的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程 A";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其余...");
            try {TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            try {final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程 A 替换数据 ====== 失去"+ exchange);
                if (!exchange.equals(infoMsg)){System.out.println("数据不统一~~ 请稽核");
                    return;
                }
            } catch (InterruptedException e) {e.printStackTrace();
            }
        });
        service.submit(() ->{
            // 模仿数据 线程 B 的
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="这是线程 B";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("正在执行其余...");
            try {TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            try {final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("线程 B 替换数据 ====== 失去"+ exchange);
                if (!exchange.equals(infoMsg)){System.out.println("数据不统一~~ 请稽核");
                    return;
                }
            } catch (InterruptedException e) {e.printStackTrace();
            }
        });

        service.shutdown();}

    static class InfoMsg{
        String id;
        String name;
        String message;
        String content;
        String desc;

        @Override
        public String toString() {
            return "InfoMsg{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", message='" + message + '\'' +
                    ", content='" + content + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }

        @Override
        public boolean equals(Object o) {if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            InfoMsg infoMsg = (InfoMsg) o;
            return Objects.equals(id, infoMsg.id) &&
                    Objects.equals(name, infoMsg.name) &&
                    Objects.equals(message, infoMsg.message) &&
                    Objects.equals(content, infoMsg.content) &&
                    Objects.equals(desc, infoMsg.desc);
        }

        @Override
        public int hashCode() {return Objects.hash(id, name, message, content, desc);
        }
    }
}
  • 运行后果如下:

下面代码运行能够看到,当咱们线程 A/B 达到同步点即调用 exchange 后进行数据的替换,拿到对方的数据再与本人的数据比照能够做到稽核 的成果

  • Exchanger 同样能够用于遗传算法中,选出两个对象进行交互两个的数据通过穿插规定失去两个混同的后果。
  • Exchanger 中嗨提供了一个办法 public V exchange(V x, long timeout, TimeUnit unit) 次要是用来避免两个程序中一个始终没有执行 exchange 而导致另一个始终陷入期待状态,这是能够用这个办法,设置超时工夫,超过这个工夫则不再期待。


本文由 AnonyStar 公布, 可转载但需申明原文出处。
欢送关注微信公账号:云栖简码 获取更多优质文章
更多文章关注笔者博客:云栖简码 i-code.online

正文完
 0