共计 3356 个字符,预计需要花费 9 分钟才能阅读完成。
本文首发于微信公众号【WriteOnRead】,欢送关注。
1. 概述
CountDownLatch 是并发包中的一个工具类,它的典型利用场景为:一个线程期待几个线程执行,待这几个线程完结后,该线程再继续执行。
简略起见,能够把它了解为一个倒数的计数器:初始值为线程数,每个线程完结时执行减 1 操作,当计数器减到 0 时期待的线程再继续执行。
2. 代码剖析
CountDownLatch 的类签名和次要办法如下:
public class CountDownLatch {}
罕用办法为:await()、await(long, TimeUnit) 和 countDown。其中两个 await 都是让以后线程进入期待状态(获取资源失败);而 countDown 办法是将计数器减去 1,当计数器为 0 的时候,那些处于期待状态的线程会继续执行(获取资源胜利)。
结构器代码如下:
private final Sync sync;
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
结构器(该结构器是惟一的)传入一个正整数,且初始化了 sync 变量,Sync 是外部的一个嵌套类,继承自 AQS。
- await / await(long, TimeUnit)
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
- countDown
public void countDown() {sync.releaseShared(1);
}
其中,acquireSharedInterruptibly、tryAcquireSharedNanos 和 releaseShared 都是 AQS 中「共享模式」的办法,具体代码可参考前文「JDK 源码剖析 -AbstractQueuedSynchronizer(3)」的剖析。
嵌套类 Sync 代码如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 结构器,初始化 AQS 的 state 变量
Sync(int count) {setState(count);
}
int getCount() {return getState();
}
// 尝试获取资源的操作
// 只有当 state 变量为 0 的时候能力获取胜利(返回 1)protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
// 尝试开释资源的操作
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {int c = getState();
if (c == 0)
return false;
// 该操作就是尝试把 state 变量减去 1
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
Sync 继承了 AQS 抽象类,依据 AQS 可知,acquireSharedInterruptibly 和 tryAcquireSharedNanos 办法的实现都调用了 tryAcquireShared。
流程阐明:通常先把 CountDownLatch 的计数器(state)初始化为 N,执行 wait 操作就是尝试以共享模式获取资源,而每次 countDown 操作就是将 N 减去 1,只有当 N 减到 0 的时候,能力获取胜利(tryAcquireShared 办法),而后继续执行。
3. 场景举例
为便于了解该类的用法,举两个简略的例子来阐明它的应用场景。
- 场景 1:一个线程期待多个线程执行完之后再继续执行
public void test() throws InterruptedException {
int count = 5;
// CountDownLatch 的初始化计数器为 5
// 留神线程数和计数器保持一致
CountDownLatch countDownLatch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
int finalI = i;
new Thread(() -> {
try {TimeUnit.SECONDS.sleep(finalI);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "is working ..");
// 每个线程执行完结时执行 countDown
countDownLatch.countDown();}).start();}
// 主线程进入期待状态(尝试获取资源,胜利后能力继续执行)countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "go on ..");
}
/* 输入后果:Thread-0 is working ..
Thread-1 is working ..
Thread-2 is working ..
Thread-3 is working ..
Thread-4 is working ..
main go on ..
*/
- 场景 2:一个线程达到指定条件后,告诉另一个线程
private static volatile List<Integer> list = new ArrayList<>();
private static void test() {CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {if (list.size() != 5) {
try {
// list 的大小为 5 时再继续执行,否则期待
// 期待 state 减到 0
countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "start..");
}).start();
new Thread(() -> {for (int i = 0; i < 10; i++) {list.add(i);
System.out.println(Thread.currentThread().getName() + "add" + i);
if (list.size() == 5) {
// 满足条件时将 state 减 1
countDownLatch.countDown();}
try {TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}).start();}
/* 输入后果:Thread-1 add 0
Thread-1 add 1
Thread-1 add 2
Thread-1 add 3
Thread-1 add 4
Thread-0 start..
Thread-1 add 5
Thread-1 add 6
Thread-1 add 7
Thread-1 add 8
Thread-1 add 9
*/
4. 小结
CountDownLatch 能够了解为一个倒数的计数器,它的典型利用场景就是一个线程期待几个线程执行完结后再继续执行。其外部是基于 AQS 的共享模式实现的。
相干浏览:
JDK 源码剖析 -AbstractQueuedSynchronizer(3)