共计 4425 个字符,预计需要花费 12 分钟才能阅读完成。
一、样例和原理
惯例应用
int i = 3;
// == 1. 初始化
CountDownLatch cd = new CountDownLatch(i);
while (i>0){new Thread(()->{
try {TimeUnit.SECONDS.sleep(1L);
System.out.println("Biz-Thread is over");
} catch (InterruptedException e) {e.printStackTrace();
}
// == 3. 计数递加
cd.countDown();}).start();
i--;
}
// == 2. 阻塞
cd.await();
共享模式
构造如图,与 AQS 家族的 ReentrantLock 比照,最大的差异在于——CountDownLatch 是共享模式,ReentrantLock 是独占模式
差别体现在两个层面
一、代码层面
Node 节点:
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 独占 */
static final Node EXCLUSIVE = null;
二、性能层面
共享模式会开释全副的共享节点的绑定线程(head 节点会向下挪动,head=head.next);而独占模式只会开释 head.next 节点绑定的线程
共享模式的个性,在下一章 逆向应用
局部更为清晰
逆向应用
int i = 2;
CountDownLatch cd = new CountDownLatch(1);
while (i>0){new Thread(()->{
try {System.out.println(Thread.currentThread().getName()+"筹备工作实现,期待主业务");
cd.await(); // == 业务线程阻塞在此处
System.out.println(Thread.currentThread().getName()+"业务开始");
TimeUnit.SECONDS.sleep(1L);
System.out.println(Thread.currentThread().getName()+"业务完结");
} catch (InterruptedException e) {e.printStackTrace();
}
}).start();
i--;
}
TimeUnit.SECONDS.sleep(3L);
// == 开释全副的业务线程
cd.countDown();
System.out.println(Thread.currentThread().getName()+"主业务完结");
循环中,图中的三个办法配合,shared 类型的节点会挨个失去开释
(当然 next 的指向也会开释,只是图中未体现)
二、源码剖析
1. 初始化
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
java.util.concurrent.CountDownLatch.Sync#Sync
protected final void setState(int newState) {
// ## 将 state 赋值
state = newState;
}
2.await
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
// -- a. 尝试获取(判断 state 状态)if (tryAcquireShared(arg) < 0)
// -- b. 获取共享锁
doAcquireSharedInterruptibly(arg);
}
a. 尝试获取(判断 state 状态)
protected int tryAcquireShared(int acquires) {
// 例子中 state 是个负数,返回 -1
return (getState() == 0) ? 1 : -1;
}
b-1. 获取共享锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// == 队列构建
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {for (;;) {final Node p = node.predecessor();
// == head.next 尝试获取
if (p == head) {// $$ 1.countdown()办法将 state 计数清 0 时,返回 1; 未清 0,返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
// $$ 3.state 清 0 状况(最初一个 countDown 执行后)// ##### b-2. 头节点从新设置,并开释 shared
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// $$ 2.ReentrantLock 时剖析过这部分,间接附上论断不再开展
// 第 1 次将 waitstatus 设置成 signal 返回 false
// 第 2 次判断 waitstatus==signal 返回 true
if (shouldParkAfterFailedAcquire(p, node)
// === 线程阻塞(唤醒时,从此处继续执行)&& parkAndCheckInterrupt())
throw new InterruptedException();}
} finally {if (failed)
cancelAcquire(node);
}
}
上述代码的这部分(##### b-2. 头节点从新设置,并开释 shared
)须要仔细分析下,
具体见下一章节
b-2. 头节点从新设置,并开释 shared
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// == 头节点挪动(入参 node 此时是 head.next),head=head.next
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null
// 共享节点都会执行上面的开释逻辑
|| s.isShared()){
// ## countDown 也会调用这个办法,此处不做剖析
doReleaseShared();}
}
}
// == 头节点挪动,资源开释
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
3.countDown
public void countDown() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// == a.state 递加
// 递加后 state>0,返回 false
// 递加后 state=0,返回 true(进入 b 逻辑)if (tryReleaseShared(arg)) {
// == b. 开释
doReleaseShared();
return true;
}
return false;
}
a.state 递加
protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();
if (c == 0)
return false;
// cas 形式 -1
int nextc = c-1;
if (compareAndSetState(c, nextc))
// - 1 后 state= 0 则返回 true
return nextc == 0;
}
}
b. 开释
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 以后节点设置为头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// == 开释共享锁
doReleaseShared();}
}
// == 开释共享锁
private void doReleaseShared() {for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// ### cas 将 waitstatus 由 - 1 改成 0 失败,再次循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){continue;}
// ### cas 将 waitstatus 由 - 1 改成 0 胜利,h.next 绑定的线程解除阻塞
unparkSuccessor(h);
}
else if (ws == 0
// 头节点的 waitstatus 由 0 改成 -3
&& !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){continue;}
}
// -- 如果执行过程中头节点未扭转,跳出循环;// -- 如果执行过程中头节点发生变化,再次在循环中执行以上操作
if (h == head)
break;
}
}
重点察看这部分逻辑
### cas 将 waitstatus 由 - 1 改成 0 胜利,h.next 绑定的线程解除阻塞
private void unparkSuccessor(Node node) {
// 确保 waitstatus 由 - 1 改成 0(cas 形式)int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 尾节点或 cancle 节点非凡解决
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// == 解锁 node.next 绑定的线程
if (s != null)
LockSupport.unpark(s.thread);
}
正文完