共计 4358 个字符,预计需要花费 11 分钟才能阅读完成。
概述
CountDownLatch 容许一个或多个线程期待直到在其余线程中执行的一组操作实现的同步辅助。简略来说,就是 CountDownLatch 外部保护了一个计数器,每个线程实现本人的操作之后都会将计数器减一,而后会在计数器的值变为 0 之前始终阻塞,直到计数器的值变为 0.
简略应用
这个例子次要演示了,如何利用 CountDownLatch 去协调多个线程同时开始运行。这个时候的 CountDownLatch 中的计数器的事实含意是期待创立的线程个数,每个线程在开始工作之前都会调用 await() 办法阻塞,直到所有线程都创立好,每当一个线程创立好后,都会提交调用 countDown() 办法将计数器的值减一 (代表待创立的线程数减一)。
public static void main(String[] args) {Test countDownLatchTest=new Test();
countDownLatchTest.runThread();}
// 计数器为 10,代表有 10 个线程期待创立
CountDownLatch countDownLatch=new CountDownLatch(10);
/**
* 创立一个线程
* @return
*/
private Thread createThread(int i){Thread thread=new Thread(new Runnable() {
@Override
public void run() {
try {
// 在此期待,直到计数器变为 0
countDownLatch.await();
System.out.println("thread"+Thread.currentThread().getName()+"筹备结束"+System.currentTimeMillis());
}catch (InterruptedException e){e.printStackTrace();
}
}
});
thread.setName("thread-"+i);
return thread;
}
public void runThread(){ExecutorService executorService= Executors.newFixedThreadPool(10);
try {for(int i=0;i<10;i++){Thread.sleep(100);
executorService.submit(createThread(i));
// 一个线程创立好了,待创立的线程数减一
countDownLatch.countDown();}
}catch (InterruptedException e){e.printStackTrace();
}
}
上面咱们就以这个例子,来解释源码:
源码剖析
继承体系
从锁的分类上来讲,CountDownLatch 其实是一个”共享锁“。还有一个须要留神的是 CountDownLath 是响应中断的,如果线程在对锁进行操作的期间产生了中断,会间接抛出 InterruptedException。
源码剖析
计数器的实质是什么?
方才咱们也提到了,CountDownLatch 中一个十分重要的货色就是计数器。那么咱们首先须要剖析的就是源码中哪个局部充当了计数器的角色。
咱们通过构造方法来查看:
咱们的代码CountDownLatch countDownLatch=new CountDownLatch(10);
背地实际上是调用了上面这个办法:
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
而这个 Sync 的实例化又做了什么工作呢?
Sync(int count) {setState(count); // 就是批改了 AQS 中的 state 值
}
当初曾经解决了咱们的第一个问题,实际上 AQS 中的 state 充当了计数器。
await 办法
- await 办法实际上是调用了 sync 的一个办法
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
- sync 的
void acquireSharedInterruptibly(int arg)
的实现如下
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted())
// 如果线程中断了,则抛异样。// 证实了之前所说的 CountDownLatch 是会响应中断的
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 如果没有中断,就会调用
tryAcquireShared(arg)
它的实现十分的简略,如果 state 为 0,就返回 1,否则返回 – 1
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
- 如果 state 不为 0,就会返回 – 1,if 条件成立,就会调用
doAcquireSharedInterruptibly(arg)
这个办法的实现,略微简单一点,但这个办法也不生疏了,它的性能就是把该线程退出期待队列中并阻塞,然而在入队之后,不肯定会立刻 park 阻塞,它会判断本人是否是第二个节点,如果是就会再次尝试获取。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {for (;;) {final Node p = node.predecessor(); // 获取以后节点的前驱节点
if (p == head) {// 前一个节点是头节点
int r = tryAcquireShared(arg); // 去看一看 state 是否为 0,步骤 3 剖析过
if (r >= 0) {
// 如果 state 目前为 0,就出队
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 进入阻塞队列阻塞,如果产生中断,则抛异样
throw new InterruptedException();}
} finally {if (failed)
cancelAcquire(node);
}
CountDownLatch 的 await 办法比其它几个锁的实现简略得多。不过须要留神的一点就是 CountDownLatch 是会响应中断的,这一点在源码中也有多处体现。
countDown 办法
- countDown 办法实际上是调用 sync 中的一个办法
public void countDown() {sync.releaseShared(1);
}
boolean releaseShared(int arg)
的具体实现如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(arg)
办法的具体实现如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {// 自旋
int c = getState();
if (c == 0)// 计数器曾经都是 0 了,当然会开释失败咯
return false;
int nextc = c-1;// 开释后,计数器减一
if (compareAndSetState(c, nextc))//CAS 批改计数器
return nextc == 0;
}
}
这个办法就是去尝试间接批改 state 的值。如果 state 的批改胜利,且批改后的 state 值为 0,就会返回 true。就会执行
doReleaseShared();
办法。
doReleaseShared();
的实现如下,它的作用就是 state 为 0 的时候,去唤醒期待队列中的线程。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
当初根本源码曾经剖析结束了,只有了解了 AQS 和 CountDownLatch 的计数器到底是什么,就可能很好的了解 CountDownLatch 的原理了。
关注微信公众号:【入门小站】, 解锁更多知识点