一、样例和原理
惯例应用
int i = 3;// == 1.初始化CountDownLatch cd = new CountDownLatch(i);while (i>0){ new Thread(()->{ try { TimeUnit.SECONDS.sleep(1L); System.out.println("Biz-Thread is over"); } catch (InterruptedException e) { e.printStackTrace(); } // == 3.计数递加 cd.countDown(); }).start(); i--;}// == 2.阻塞cd.await();
共享模式
构造如图,与AQS家族的ReentrantLock比照,最大的差异在于——CountDownLatch是共享模式,ReentrantLock是独占模式
差别体现在两个层面
一、代码层面
Node节点:
static final class Node {
/** 共享 */static final Node SHARED = new Node();/** 独占 */static final Node EXCLUSIVE = null;
二、性能层面
共享模式会开释全副的共享节点的绑定线程(head节点会向下挪动,head=head.next);而独占模式只会开释head.next节点绑定的线程
共享模式的个性,在下一章逆向应用
局部更为清晰
逆向应用
int i = 2;CountDownLatch cd = new CountDownLatch(1);while (i>0){ new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" 筹备工作实现,期待主业务"); cd.await(); // == 业务线程阻塞在此处 System.out.println(Thread.currentThread().getName()+" 业务开始"); TimeUnit.SECONDS.sleep(1L); System.out.println(Thread.currentThread().getName()+" 业务完结"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); i--;}TimeUnit.SECONDS.sleep(3L);// == 开释全副的业务线程cd.countDown();System.out.println(Thread.currentThread().getName()+" 主业务完结");
循环中,图中的三个办法配合,shared类型的节点会挨个失去开释
(当然next的指向也会开释,只是图中未体现)
二、源码剖析
1.初始化
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);}java.util.concurrent.CountDownLatch.Sync#Syncprotected final void setState(int newState) { // ## 将state赋值 state = newState;}
2.await
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // -- a.尝试获取(判断state状态) if (tryAcquireShared(arg) < 0) // -- b.获取共享锁 doAcquireSharedInterruptibly(arg);}
a.尝试获取(判断state状态)
protected int tryAcquireShared(int acquires) { // 例子中state是个负数,返回-1 return (getState() == 0) ? 1 : -1;}
b-1.获取共享锁
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // == 队列构建 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // == head.next尝试获取 if (p == head) { // $$ 1.countdown()办法将state计数清0时,返回1;未清0,返回-1 int r = tryAcquireShared(arg); if (r >= 0) { // $$ 3.state清0状况(最初一个countDown执行后) // ##### b-2.头节点从新设置,并开释shared setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // $$ 2.ReentrantLock时剖析过这部分,间接附上论断不再开展 // 第1次将waitstatus设置成signal返回false // 第2次判断waitstatus==signal返回true if (shouldParkAfterFailedAcquire(p, node) // === 线程阻塞(唤醒时,从此处继续执行) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
上述代码的这部分(##### b-2.头节点从新设置,并开释shared
)须要仔细分析下,
具体见下一章节
b-2.头节点从新设置,并开释shared
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // == 头节点挪动(入参node此时是head.next),head=head.next setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null // 共享节点都会执行上面的开释逻辑 || s.isShared()){ // ## countDown也会调用这个办法,此处不做剖析 doReleaseShared(); } }}// == 头节点挪动,资源开释private void setHead(Node node) { head = node; node.thread = null; node.prev = null;}
3.countDown
public void countDown() { sync.releaseShared(1);}public final boolean releaseShared(int arg) { // == a.state递加 // 递加后state>0,返回false // 递加后state=0,返回true(进入b逻辑) if (tryReleaseShared(arg)) { // == b.开释 doReleaseShared(); return true; } return false;}
a.state递加
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; // cas形式-1 int nextc = c-1; if (compareAndSetState(c, nextc)) // -1后state=0则返回true return nextc == 0; }}
b.开释
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 以后节点设置为头节点 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // == 开释共享锁 doReleaseShared(); }}// == 开释共享锁private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // ### cas将waitstatus由-1改成0失败,再次循环 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){ continue; } // ### cas将waitstatus由-1改成0胜利,h.next绑定的线程解除阻塞 unparkSuccessor(h); } else if (ws == 0 // 头节点的waitstatus由0改成-3 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){ continue; } } // -- 如果执行过程中头节点未扭转,跳出循环; // -- 如果执行过程中头节点发生变化,再次在循环中执行以上操作 if (h == head) break; }}
重点察看这部分逻辑
### cas将waitstatus由-1改成0胜利,h.next绑定的线程解除阻塞private void unparkSuccessor(Node node) { // 确保waitstatus由-1改成0(cas形式) int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 尾节点或cancle节点非凡解决 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // == 解锁node.next绑定的线程 if (s != null) LockSupport.unpark(s.thread);}