关于java:CountDownLatch与CyclicBarrier的基本使用

19次阅读

共计 7898 个字符,预计需要花费 20 分钟才能阅读完成。

1 概述

CountDownLatch以及 CyclicBarrier 都是 Java 外面的同步工具之一,本文介绍了两者的基本原理以及根本应用办法。

2 CountDownLatch

CountDownLatch是一个同步工具类,常见的应用场景包含:

  • 容许一个或多个线程期待一系列的其余线程完结
  • 在串行化工作中须要进行并行化解决,并期待所有并行化工作完结,串行化工作能力持续进行

比方思考这样一个场景,在一个电商网站中,用户点击了首页,须要一部分的商品,同时显示它们的价格,那么,调用的流程应该是:

  • 获取商品
  • 计算售价
  • 返回所有商品的最终售价

解决这样的问题能够应用串行化或并行化操作,串行化就是逐个计算商品的售价,并返回,并行化就是获取商品后,并行计算每一个商品的售价,最初返回,显然后一种计划要比前一种要好,那么这时候就能够用上 CountDownLatch 了。

一份简略的模仿代码如下:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException{List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        // 计数器大小为商品列表的长度
        final CountDownLatch latch = new CountDownLatch(list.size());
        // 线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{System.out.println("Product"+p.id+"start calculate price");
            try{
                // 随机休眠模仿业务操作耗时
                TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product"+p.id+"calculate price completed");
            }catch (InterruptedException e){e.printStackTrace();
            }finally {
                // 每实现计算一个商品,将计数器减 1,留神须要放在 finally 中
                latch.countDown();}
        }));
        // 主线程阻塞直到所有的计数器为 0,也就是期待所有的子工作计算价格结束
        latch.await();
        System.out.println("All of prices calculate finished");
        // 手动终止,不然不会完结运行
        executor.shutdown();}

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {this.id = id;}

        public int getId() {return id;}

        public double getPrice() {return price;}

        public void setPrice(double price) {this.price = price;}
    }
}

输入:

代码比较简单,要害中央用上了正文,能够看到代码执行程序如下:

  • 创立多个工作计算商品的价格
  • 主线程阻塞
  • 计算实现后,将计数器减 1
  • 当计数器为 0 时,主线程退出阻塞状态

值得注意的是计数器减 1 的操作须要放在 finally 中,因为有可能会出现异常,如果出现异常导致计数器不能缩小,那么主线程会始终阻塞。

另外,CountDownLatch还有一个 await(long timeout,TimeUnit unit) 办法,是带有超时参数的,也就是说,如果在超时工夫内,计数器的值还是大于 0(还有工作没执行实现),会使得以后线程退出阻塞状态。

3 CyclicBarrier

CyclicBarrierCountDownLatch 有很多相似的中央,也是一个同步工具类,容许多个线程在执行完相应的操作之后彼此期待达到同一个 barrier point(屏障点)。CyclicBarrier 也适宜某个串行化的工作被拆分为多个并行化工作,这点与 CountDownLatch 相似,然而 CyclicBarrier 具备的一个更弱小的性能是,CyclicBarrier能够被重复使用。

3.1 期待实现

先简略说一下 CyclicBarrier 的实现原理:

  • 初始化 CyclicBarrier,传入一个int 参数,示意分片(parites),通常意义上来说分片数就是工作的数量
  • 同时串行化执行多个工作
  • 工作执行实现后,调用await(),期待其余线程也达到barrier point
  • 当所有线程达到后,持续以串行化形式运行工作

常见的应用办法是设置分片数为工作数 +1,这样,能够在主线程中执行 await(),期待所有子工作实现。比方上面是应用CyclicBarrier 实现同样性能的模仿代码:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException,BrokenBarrierException{List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{System.out.println("Product"+p.id+"start calculate price");
            try{TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product"+p.id+"calculate price completed");
            }catch (InterruptedException e){e.printStackTrace();
            }finally {
                try{barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished");
        executor.shutdown();}

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {this.id = id;}

        public int getId() {return id;}

        public double getPrice() {return price;}

        public void setPrice(double price) {this.price = price;}
    }
}

输入雷同,代码大部分类似,不同的中央有:

  • latch.countDown()替换成了barrier.await()
  • latch.await()替换成了barrier.await()
  • 线程池的外围线程数替换成了10

await()办法会期待所有的线程达到barrier point,下面代码执行流程简述如下:

  • 初始化CyclicBarrier,分片数为 11(子线程数 +1)
  • 主线程调用await(),期待子线程执行实现
  • 子线程各自进行商品价格的计算,计算实现后,调用await(),期待其余线程也达到barrier point
  • 当所有子线程计算实现后,因为没有后续操作,所以子线程运行完结,同时因为主线程还有后续操作,会先输入提示信息再终止线程池

留神一个很大的不同就是这里的线程池外围线程数目改成了 10,那么,为什么须要 10?

因为如果是设置一个小于 10 的外围线程个数,因为线程池是会先创立外围线程来执行工作,外围线程满了之后,放进工作队列中,而假如只有 5 个外围线程,那么:

  • 5 个线程进行计算价格
  • 另外 5 个工作放在工作队列中

这样的话,会呈现死锁,因为计算中的线程须要队列中的工作达到 barrier point 能力完结,而队列中的工作须要外围线程计算结束后,能力调度进去计算,这样死锁就呈现了。

3.2 重复使用

CyclicBarrierCountDownLatch 的一个最大不同是,CyclicBarrier能够被重复使用,原理上来说,await()会将外部计数器减 1,当计数器减为 0 时,会主动进行计数器(分片数)重置。比方,在下面的代码中,因为遇上促销流动,须要对商品的价格再次进行计算:

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;

public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException,BrokenBarrierException{List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
        final CyclicBarrier barrier = new CyclicBarrier(11);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        list.forEach(p-> executor.execute(()->{System.out.println("Product"+p.id+"start calculate price.");
            try{TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product"+p.id+"calculate price completed.");
            }catch (InterruptedException e){e.printStackTrace();
            }finally {
                try{barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished.");
        
        // 复制的一段雷同代码
        list.forEach(p-> executor.execute(()->{System.out.println("Product"+p.id+"start calculate price again.");
            try{TimeUnit.SECONDS.sleep(current().nextInt(10));
                p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
                System.out.println("Product"+p.id+"calculate price completed.");
            }catch (InterruptedException e){e.printStackTrace();
            }finally {
                try{barrier.await();
                }catch (InterruptedException | BrokenBarrierException e){e.printStackTrace();
                }
            }
        }));
        barrier.await();
        System.out.println("All of prices calculate finished again.");
        executor.shutdown();}

    private static class Price{
        private final int id;
        private double price;

        public Price(int id) {this.id = id;}

        public int getId() {return id;}

        public double getPrice() {return price;}

        public void setPrice(double price) {this.price = price;}
    }
}

将计算价格的代码复制一遍,其中没有手动批改计数器,只是调用await(),输入如下:

能够看到,并没有对 CycliBarrier 进行相似 reset 之类的操作,然而仍然能按失常逻辑运行,这是因为 await() 外部会保护一个计数器,当计数器为 0 的时候,会主动进行重置,上面是 await()OpenJDK 11下的源码:

public int await() throws InterruptedException, BrokenBarrierException {
    try {return this.dowait(false, 0L);
    } catch (TimeoutException var2) {throw new Error(var2);
    }
}
    
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    ReentrantLock lock = this.lock;
    lock.lock();

    byte var9;
    try {
        //...
        int index = --this.count;
        if (index != 0) {
            // 计数器不为 0 的状况
            //....
        }

        boolean ranAction = false;

        try {
            Runnable command = this.barrierCommand;
            if (command != null) {command.run();
            }

            ranAction = true;
            
            this.nextGeneration();
            var9 = 0;
        } finally {if (!ranAction) {this.breakBarrier();
            }

        }
    } finally {lock.unlock();
    }

    return var9;
}

private void nextGeneration() {this.trip.signalAll();
    this.count = this.parties;
    this.generation = new CyclicBarrier.Generation();}

当计数器为 0 时,会生成新的 Generation,并将var9 置为 0,最初返回 var9(在这个办法中var9 只有一处赋值,就是代码中的var9=0,能够了解成间接返回 0)。

3.3 CyclicBarrier其余的一些罕用办法

  • CyclicBarrier(int parties,Runnable barrierAction):结构的时候传入一个 Runnable,示意所有线程达到barrier point 时,会调用该Runnable
  • await(long timeout,TimeUnit unit):与无参的 await() 相似,底层调用的是雷同的doWait(),不过减少了超时性能
  • isBroken():返回 broken 状态,某个线程因为执行 await 而进入阻塞,此时如果执行了中断操作(比方 interrupt),那么isBroken() 会返回 true。须要留神,处于broken 状态的 CyclicBarrier 不能被间接应用,须要调用 reset() 进行重置

4 总结

上面是 CountDownLatchCyclicBarrier的一些简略比拟,相同点如下:

  • 都是 java.util.concurrent 包下的线程同步工具类
  • 都能够用于“主线程阻塞始终期待,直到子工作实现,主线程才继续执行”的状况

不同点:

  • CountDownLatchawait() 办法会期待计数器归 0,而 CyclicBarrierawait()会期待其余线程达到barrier point
  • CyclicBarrier外部的计数器是能够被重置的,然而 CountDownLatch 不能够
  • CyclicBarrier是由 LockCondition实现的,而 CountDownLatch 是由同步控制器 AQS 实现的
  • 结构时 CyclicBarrier 不容许 parties 为 0,而 CountDownLatch 容许 count 为 0

正文完
 0