关于java:JUCCountDownLatch共享节点队列

34次阅读

共计 4425 个字符,预计需要花费 12 分钟才能阅读完成。

一、样例和原理

惯例应用

int i = 3;
// == 1. 初始化
CountDownLatch cd = new CountDownLatch(i);

while (i>0){new Thread(()->{
        try {TimeUnit.SECONDS.sleep(1L);
            System.out.println("Biz-Thread is over");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        
        // == 3. 计数递加
        cd.countDown();}).start();
    i--;
}

// == 2. 阻塞
cd.await();

共享模式


构造如图,与 AQS 家族的 ReentrantLock 比照,最大的差异在于——CountDownLatch 是共享模式,ReentrantLock 是独占模式

差别体现在两个层面
一、代码层面
Node 节点:
static final class Node {

/** 共享 */
static final Node SHARED = new Node();
/** 独占 */
static final Node EXCLUSIVE = null;

二、性能层面

共享模式会开释全副的共享节点的绑定线程(head 节点会向下挪动,head=head.next);而独占模式只会开释 head.next 节点绑定的线程

共享模式的个性,在下一章 逆向应用 局部更为清晰

逆向应用

int i = 2;
CountDownLatch cd = new CountDownLatch(1);

while (i>0){new Thread(()->{
        try {System.out.println(Thread.currentThread().getName()+"筹备工作实现,期待主业务");
            cd.await();  // == 业务线程阻塞在此处
            
            System.out.println(Thread.currentThread().getName()+"业务开始");
            TimeUnit.SECONDS.sleep(1L);
            System.out.println(Thread.currentThread().getName()+"业务完结");
        } catch (InterruptedException e) {e.printStackTrace();
        }

    }).start();
    i--;
}

TimeUnit.SECONDS.sleep(3L);
// == 开释全副的业务线程
cd.countDown();
System.out.println(Thread.currentThread().getName()+"主业务完结");

循环中,图中的三个办法配合,shared 类型的节点会挨个失去开释
(当然 next 的指向也会开释,只是图中未体现)

二、源码剖析

1. 初始化

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

java.util.concurrent.CountDownLatch.Sync#Sync
protected final void setState(int newState) {
    // ## 将 state 赋值
    state = newState;
}

2.await

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
     // -- a. 尝试获取(判断 state 状态)if (tryAcquireShared(arg) < 0)
        // -- b. 获取共享锁
        doAcquireSharedInterruptibly(arg);
}

a. 尝试获取(判断 state 状态)

protected int tryAcquireShared(int acquires) {
    // 例子中 state 是个负数,返回 -1
    return (getState() == 0) ? 1 : -1;
}

b-1. 获取共享锁

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // == 队列构建
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            // == head.next 尝试获取
            if (p == head) {// $$ 1.countdown()办法将 state 计数清 0 时,返回 1; 未清 0,返回 -1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // $$ 3.state 清 0 状况(最初一个 countDown 执行后)// ##### b-2. 头节点从新设置,并开释 shared
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            // $$ 2.ReentrantLock 时剖析过这部分,间接附上论断不再开展
            // 第 1 次将 waitstatus 设置成 signal 返回 false 
            // 第 2 次判断 waitstatus==signal 返回 true
            if (shouldParkAfterFailedAcquire(p, node) 
                    // === 线程阻塞(唤醒时,从此处继续执行)&& parkAndCheckInterrupt())
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

上述代码的这部分(##### b-2. 头节点从新设置,并开释 shared)须要仔细分析下,
具体见下一章节

b-2. 头节点从新设置,并开释 shared

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // == 头节点挪动(入参 node 此时是 head.next),head=head.next
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null 
            // 共享节点都会执行上面的开释逻辑
            || s.isShared()){
             // ## countDown 也会调用这个办法,此处不做剖析
            doReleaseShared();}
    }
}

// == 头节点挪动,资源开释
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

3.countDown

public void countDown() {sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // == a.state 递加
    // 递加后 state>0,返回 false
    // 递加后 state=0,返回 true(进入 b 逻辑)if (tryReleaseShared(arg)) {
        // == b. 开释
        doReleaseShared();
        return true;
    }
    return false;
}

a.state 递加

protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();
        if (c == 0)
            return false;
        // cas 形式 -1
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
             // - 1 后 state= 0 则返回 true
            return nextc == 0;
    }
}

b. 开释

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 以后节点设置为头节点
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // == 开释共享锁
            doReleaseShared();}
}

// == 开释共享锁
private void doReleaseShared() {for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // ### cas 将 waitstatus 由 - 1 改成 0 失败,再次循环
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){continue;}
                // ### cas 将 waitstatus 由 - 1 改成 0 胜利,h.next 绑定的线程解除阻塞
                unparkSuccessor(h);
            }
            
            else if (ws == 0 
                     // 头节点的 waitstatus 由 0 改成 -3
                    && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){continue;}
        }
        
        // -- 如果执行过程中头节点未扭转,跳出循环;// -- 如果执行过程中头节点发生变化,再次在循环中执行以上操作
        if (h == head)                   
            break;
    }
}

重点察看这部分逻辑

### cas 将 waitstatus 由 - 1 改成 0 胜利,h.next 绑定的线程解除阻塞
private void unparkSuccessor(Node node) {
    // 确保 waitstatus 由 - 1 改成 0(cas 形式)int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 尾节点或 cancle 节点非凡解决
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // == 解锁 node.next 绑定的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

正文完
 0