乐趣区

关于java:快来看看AQS-和-CountDownLatch-有怎么样的关系

前言

CountDownLatch 一个同步辅助工具,同样是基于 AQS 实现,本篇文件次要是介绍 CountDownLatch 的应用,以及源码。

公众号:『刘志航』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

一个同步辅助工具,容许一个或多个线程期待,直到在其余线程中执行的一组操作实现为止

一个 CountDownLatch 初始化为给定计数。在 await 办法阻塞,调用 countDown 办法会缩小计数直到达到零,尔后所有期待的线程被开释,任何后续调用 await 都会立刻返回。这是一次性的景象 – 计数不能复位。如果你须要一个版本重置计数,请思考应用 CyclicBarrier。

CountDownLatch 是一种通用的同步工具,可用于多种用处。

  1. 用作一个简略的开 / 关锁存器,或者门:所有线程调用 await 在门口期待,直到被调用 countDown 的线程关上。
  2. 初始化计数为 N,用一个线程期待,直到 N 个线程实现某项操作,或某些动作曾经实现 N 次。

CountDownLatch 一个有用的属性是,它不要求调用 countDown 线程期待计数达到零之前持续,它只是阻止任何线程通过 await,直到所有线程能够通过。

根本应用

在我之前 CAS 那篇文章《从 JUC 源码看 CAS,我做了个笔记 ……》中介绍 CAS 举例时应用了 CountDownLatch,其代码如下:

public class CasTest {private static final CountDownLatch LATCH = new CountDownLatch(10);

    private static int NUM_I = 0;
    private static volatile int NUM_J = 0;
    private static final AtomicInteger NUM_K = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {threadPool.execute(new Runnable() {public void run() {for (int j = 0; j < 10000; j++) {
                        NUM_I++;
                        NUM_J++;
                        NUM_K.incrementAndGet();}
                    LATCH.countDown();}
            });
        }
        LATCH.await();

        System.out.println("NUM_I =" + NUM_I);
        System.out.println("NUM_J =" + NUM_J);
        System.out.println("NUM_K =" + NUM_K.get());
        threadPool.shutdown();}

}

简略介绍下这段代码的次要逻辑及性能:

  1. CountDownLatch 初始化计数为 10。
  2. 开 10 个线程去解决业务逻辑,业务逻辑完结会调用 LATCH.countDown() 对计数进行 -1 操作。
  3. 在 LATCH.await() 处会阻塞期待,直到 LATCH 的值为 0,即 10 个线程业务都解决完结。
  4. 而后主线程继续执行。

问题疑难

  1. CountDownLatch 和 AQS 有什么关系?
  2. CountDownLatch 的实现原理是什么?

源码剖析

根本构造

通过类图能够看出,CountDownLatch 外部存在一个动态类 Sync,而 Sync 继承了 AbstractQueuedSynchronizer。具体外部是如何实现的,则上面通过源码和画图一步一步的进行介绍。

初始化

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

通过初始化结构器能够看出,在 new 创建对象时必须传递一个 int 类型的非正数。实现逻辑能够看出,是创立了一个 Sync 对象。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {setState(count);
    }

    int getCount() {return getState();
    }

}   

之前在介绍 AQS 源码中曾经介绍了 state 的含意,state 在不同子类中代表不同的含意。

  1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程取得锁,大于等于 1 曾经有线程取得锁,大于 1 阐明该取得锁的线程多次重入。
  2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算能够获取读写锁的理论值。
  3. 而在这里(CountDownLatch)则代表门闩或者说计数的值。

countDown

public void countDown() {sync.releaseShared(1);
}

递加锁存器的计数:

  • 如果以后计数大于零,则递加。
  • 如果计数达到零,则开释所有期待的线程。
  • 如果那么以后计数等于零没有任何反馈。

此处调用的是 AQS 的 releaseShard() 办法,开释共享资源。

// AQS 代码
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
        return true;
    }
    return false;
}

在 AQS 开释共享资源办法中 tryReleaseShared(arg) 局部是在 CountDownLatch 的外部类 Sync 中实现的,代码局部如下:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

递加更新 state,如果 state 为 0 则返回 false,否则返回 true。

此时再对照下面 AQS 代码,发现:如果 tryReleaseShared 返回 true,则会唤醒后续节点开始执行操作。所以也就是说,如果 state 不为 0,则不会唤醒后续节点,直到 state 为 0。

await

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

导致以后线程期待,直到锁存器倒计数至零,除非线程被中断。

  • 如果以后计数为零,则此办法立刻返回。
  • 如果以后计数大于零,则以后线程用于线程调度目标,禁用并始终处于休眠状态的两件事件之一产生:

    • 因调用 countDown 办法使计数达到 0;
    • 其余某些线程中断以后线程。
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

AQS 定义了 tryAcquireShared 返回值分为 3 种:

  1. 小于 0: 示意失败;
  2. 等于 0: 示意共享模式获取资源胜利,但后续的节点不能以共享模式获取胜利;
  3. 大于 0: 示意共享模式获取资源胜利,后续节点在共享模式获取也可能会胜利,在这种状况下,后续期待线程必须查看可用性。

其中 tryAcquireShared 同样由 CountDownLatch 的外部类 Sync 中实现,外部逻辑次要是判断 state 的值,进行返回。

在外部实现中返回的值只有 1 和 -1,阐明在 state == 0 时,返回 1,即唤醒后续节点。不等于 0 时,会阻塞。

protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

总结

Q: CountDownLatch 和 AQS 有什么关系?

A: CountDownLatch 是基于 AQS 的共享模式实现的。

Q: CountDownLatch 的实现原理是什么?

A: 能够参考下面的源码解析,进行总结介绍。CountDownLatch 是基于 AQS 共享模式实现的,在初始化时必须传入计数,该计数实际上是 AQS 的 state 值。在 countDown 时对 state 进行递加,在 当 state 为 0 时 会唤醒 AQS 队列中的所有期待的节点(因为是共享模式)。而 await 办法是判断 state 的值,如果不是 0,则所有线程在队列中阻塞,期待唤醒。

Q: state 在代表的含意是什么?
A:

  1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程取得锁,大于等于 1 曾经有线程取得锁,大于 1 阐明该取得锁的线程多次重入。
  2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算能够获取读写锁的理论值。
  3. 而在这里(CountDownLatch)则代表门闩或者说计数的值。

结束语

本文次要介绍了 CyclicBarrier 的罕用形式,通过源码形式,剖析如何达到屏障以及回环的成果。不对之处,请多斧正。

相干材料

  1. Java SE API:https://docs.oracle.com/javas…
退出移动版