前言
在理论开发中,有时会遇到这样的场景:主工作须要期待若干子工作实现后,再进行后续的操作。这时能够用join或者本文的CountDownLatch实现。它们的区别在于CountDownLatch更加灵便。比方,子工作的工作分为两个阶段,主工作只需子工作实现第一个阶段即可开始主工作,无需等第二个阶段实现。这种场景join就无奈做到,CountDownLatch就能够实现。上面是实例代码。
import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); Worker worker1 = new Worker("worker1", countDownLatch); Worker worker2 = new Worker("worker2", countDownLatch); worker1.start(); worker2.start(); System.out.println("main task wait for work1 and work2 finish their stage 1"); countDownLatch.await(); System.out.println("main task begin to work"); Thread.sleep(3000); System.out.println("main task finished"); } static class Worker extends Thread { private final CountDownLatch count; public Worker(String name, CountDownLatch count) { super.setName(name); this.count = count; } @Override public void run() { try { Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " stage 1 finished"); count.countDown(); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " stage 2 finished"); } catch (InterruptedException e) { // ignore } } }}
运行后果如下:
主线程期待work1和work2实现它们的第一个阶段工作后,就开始工作,无需期待第二个阶段也实现。而join只能期待子线程整个run()执行结束能力往后执行,因而CountDownLatch更加灵便。
实现原理
从CountDownLatch的命名可猜想,它外部应该用了一个计数器,每当子线程调用countDown()办法时,计数器就减1,减到0时,主线程就会从调用await()阻塞处昏迷返回。
先来看看构造方法:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);}
其中Sync是它的外部类,实现了AQS接口。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // 计数器为0,则获取锁胜利,能够从await()返回 // 否则须要期待 return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; // 计数器减1 int nextc = c-1; if (compareAndSetState(c, nextc)) // 减到0时会unpark唤醒阻塞在await()的线程 return nextc == 0; } }}
能够看到,它是一个共享锁实现,多个线程通过Sync来同步计数器count的值。
再来看罕用的await()和countDown()办法:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);}
await()调用的是AQS中的模板办法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 调用子类Sync的tryAcquireShared办法,如果共享式获取锁失败,doAcquireSharedInterruptibly外面会让以后线程在队列里阻塞期待获取锁。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}
public void countDown() { sync.releaseShared(1);}
countDown调用的也是AQS中的模板办法:
public final boolean releaseShared(int arg) { // 调用子类Sync的tryReleaseShared()共享式地开释锁, // 计数器减为0时,doReleaseShared外面会唤醒期待在await()办法处的线程。 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
参考资料:
《Java并发编程之美》