CountDownLatch
CountDownLatch实用于在多线程的场景须要期待所有子线程全副执行结束之后再做操作的场景。
举个例子,早上部门散会,有人在上厕所,这时候须要期待所有人从厕所回来之后能力开始会议。
public class CountDownLatchTest { private static int num = 3; private static CountDownLatch countDownLatch = new CountDownLatch(num); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception{ executorService.submit(() -> { System.out.println("A在上厕所"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("A上完了"); } }); executorService.submit(()->{ System.out.println("B在上厕所"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("B上完了"); } }); executorService.submit(()->{ System.out.println("C在上厕所"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("C上完了"); } }); System.out.println("期待所有人从厕所回来散会..."); countDownLatch.await(); System.out.println("所有人都好了,开始散会..."); executorService.shutdown(); }}
代码执行后果:
A在上厕所B在上厕所期待所有人从厕所回来散会...C在上厕所B上完了C上完了A上完了所有人都好了,开始散会...
初始化一个CountDownLatch实例传参3,因为咱们有3个子线程,每次子线程执行结束之后调用countDown()办法给计数器-1,主线程调用await()办法后会被阻塞,直到最初计数器变为0,await()办法返回,执行结束。他和join()办法的区别就是join会阻塞子线程直到运行完结,而CountDownLatch能够在任何时候让await()返回,而且用ExecutorService没法用join了,相比起来,CountDownLatch更灵便。
CountDownLatch基于AQS实现,volatile变量state维持倒数状态,多线程共享变量可见。
- CountDownLatch通过构造函数初始化传入参数理论为AQS的state变量赋值,维持计数器倒数状态
- 当主线程调用await()办法时,以后线程会被阻塞,当state不为0时进入AQS阻塞队列期待。
- 其余线程调用countDown()时,state值原子性递加,当state值为0的时候,唤醒所有调用await()办法阻塞的线程
CyclicBarrier
CyclicBarrier叫做回环屏障,它的作用是让一组线程全副达到一个状态之后再全副同时执行,而且他有一个特点就是所有线程执行结束之后是能够重用的。
public class CyclicBarrierTest { private static int num = 3; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> { System.out.println("所有人都好了,开始散会..."); System.out.println("-------------------"); }); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception{ executorService.submit(() -> { System.out.println("A在上厕所"); try { Thread.sleep(4000); System.out.println("A上完了"); cyclicBarrier.await(); System.out.println("会议完结,A退出"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("B在上厕所"); try { Thread.sleep(2000); System.out.println("B上完了"); cyclicBarrier.await(); System.out.println("会议完结,B退出"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("C在上厕所"); try { Thread.sleep(3000); System.out.println("C上完了"); cyclicBarrier.await(); System.out.println("会议完结,C退出"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.shutdown(); }}
输入后果为:
A在上厕所B在上厕所C在上厕所B上完了C上完了A上完了所有人都好了,开始散会...-------------------会议完结,A退出会议完结,B退出会议完结,C退出
从后果来看和CountDownLatch十分类似,初始化传入3个线程和一个工作,线程调用await()之后进入阻塞,计数器-1,当计数器为0时,就去执行CyclicBarrier中构造函数的工作,当工作执行结束后,唤醒所有阻塞中的线程。这验证了CyclicBarrier让一组线程全副达到一个状态之后再全副同时执行的成果。
再举个例子来验证CyclicBarrier可重用的成果。
public class CyclicBarrierTest2 { private static int num = 3; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> { System.out.println("-------------------"); }); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception { executorService.submit(() -> { System.out.println("A在上厕所"); try { Thread.sleep(4000); System.out.println("A上完了"); cyclicBarrier.await(); System.out.println("会议完结,A退出,开始撸代码"); cyclicBarrier.await(); System.out.println("C工作完结,上班回家"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } finally { } }); executorService.submit(() -> { System.out.println("B在上厕所"); try { Thread.sleep(2000); System.out.println("B上完了"); cyclicBarrier.await(); System.out.println("会议完结,B退出,开始摸鱼"); cyclicBarrier.await(); System.out.println("B摸鱼完结,上班回家"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } finally { } }); executorService.submit(() -> { System.out.println("C在上厕所"); try { Thread.sleep(3000); System.out.println("C上完了"); cyclicBarrier.await(); System.out.println("会议完结,C退出,开始摸鱼"); cyclicBarrier.await(); System.out.println("C摸鱼完结,上班回家"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } finally { } }); executorService.shutdown(); }}
输入后果:
A在上厕所B在上厕所C在上厕所B上完了C上完了A上完了-------------------会议完结,A退出,开始撸代码会议完结,B退出,开始摸鱼会议完结,C退出,开始摸鱼-------------------C摸鱼完结,上班回家C工作完结,上班回家B摸鱼完结,上班回家-------------------
从后果来看,每个子线程调用await()计数器减为0之后才开始持续一起往下执行,会议完结之后一起进入摸鱼状态,最初一天完结一起上班,这就是可重用。
CyclicBarrier还是基于AQS实现的,外部保护parties记录总线程数,count用于计数,最开始count=parties,调用await()之后count原子递加,当count为0之后,再次将parties赋值给count,这就是复用的原理。
- 当子线程调用await()办法时,获取独占锁,同时对count递加,进入阻塞队列,而后开释锁
- 当第一个线程被阻塞同时开释锁之后,其余子线程竞争获取锁,操作同1
- 直到最初count为0,执行CyclicBarrier构造函数中的工作,执行结束之后子线程持续向下执行
Semaphore
Semaphore叫做信号量,和后面两个不同的是,他的计数器是递增的。
public class SemaphoreTest { private static int num = 3; private static int initNum = 0; private static Semaphore semaphore = new Semaphore(initNum); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception{ executorService.submit(() -> { System.out.println("A在上厕所"); try { Thread.sleep(4000); semaphore.release(); System.out.println("A上完了"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("B在上厕所"); try { Thread.sleep(2000); semaphore.release(); System.out.println("B上完了"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("C在上厕所"); try { Thread.sleep(3000); semaphore.release(); System.out.println("C上完了"); } catch (Exception e) { e.printStackTrace(); }finally { } }); System.out.println("期待所有人从厕所回来散会..."); semaphore.acquire(num); System.out.println("所有人都好了,开始散会..."); executorService.shutdown(); }}
输入后果为:
A在上厕所B在上厕所期待所有人从厕所回来散会...C在上厕所B上完了C上完了A上完了所有人都好了,开始散会...
略微和前两个有点区别,构造函数传入的初始值为0,当子线程调用release()办法时,计数器递增,主线程acquire()传参为3则阐明主线程始终阻塞,直到计数器为3才会返回。
Semaphore还还还是基于AQS实现的,同时获取信号量有偏心和非偏心两种策略
- 主线程调用acquire()办法时,用以后信号量值-须要获取的值,如果小于0,则进入同步阻塞队列,大于0则通过CAS设置以后信号量为残余值,同时返回残余值
- 子线程调用release()给以后信号量值计数器+1(减少的值数量由传参决定),同时不停的尝试因为调用acquire()进入阻塞的线程
总结
CountDownLatch通过计数器提供了比join更灵便的多线程管制形式,CyclicBarrier也能够达到CountDownLatch的成果,而且有可复用的特点,Semaphore则是采纳信号量递增的形式,开始的时候并不需要关注须要同步的线程个数,并且提供获取信号的偏心和非偏心策略。