前言

在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。上面来看看信号量的外部原理。

公众号:『 刘志航 』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

一个计数信号量。 从概念上讲,信号量保护了一组许可。 如果有必要,在许可可用之前调用 acquire 办法会被阻塞,直到许可证可用。 调用 release 办法会减少了一个许可证,从而开释被阻塞的线程。

  1. 申明时指定初始许可数量。
  2. 调用 acquire(int permits) 办法,指定指标许可数量。
  3. 调用 release(int permits) 办法,公布指定的许可数量。

在许可数量没有达到指定指标数量时,调用 acquire 办法的线程会被阻塞。

根本应用

public class SemaphoreTest1 {    private static final Semaphore SEMAPHORE = new Semaphore(0);    public static void main(String[] args) throws InterruptedException {        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<>(1024),                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),                new ThreadPoolExecutor.AbortPolicy());        for (int i = 0; i < 5; i++) {            pool.submit(() -> {                try {                    Thread.sleep(1000 + new Random().nextInt(1000));                } catch (InterruptedException ignored) {                }                System.out.println("以后线程: " + Thread.currentThread().getName() + " 公布一个许可");                SEMAPHORE.release(1);            });        }        System.out.println("-----> 这里是主线程");        SEMAPHORE.acquire(5);        System.out.println("-----> 主线程执行结束");        pool.shutdown();    }}
-----> 这里是主线程以后线程: Thread-pool-2 公布一个许可以后线程: Thread-pool-4 公布一个许可以后线程: Thread-pool-1 公布一个许可以后线程: Thread-pool-0 公布一个许可以后线程: Thread-pool-3 公布一个许可-----> 主线程执行结束

下面这个办法也是模仿了相似 CountDownLatch 的用法, 在子线程执行结束之后,主线程继续执行。只不过 Semaphore 和 CountDownLatch 区别最大的是:

Semaphore 是从指定数值开始减少,直到达到许可数量,而后被阻塞线程开始继续执行。

CountDownLatch 是从指定数量的线程开始缩小,直到为 0 时,被阻塞的线程开始继续执行。

当然这只是最简略的用法,除此让主线程期待,同样也能够让其余线程期待,而后再开始执行。

问题疑难

  1. Semaphore 和 AQS 有什么关系?
  2. Semaphore 和 CountDownLatch 有什么区别?

源码剖析

根本构造

通过类图能够看出在 Semaphore 外面有一个动态外部类 Sync 继承了 AQS,同时为了辨别偏心和非偏心的状况,Sync 别离有两个子类:NonfairSync 、FairSync。

上面依据案例别离从构造函数、acquire()、release() 动手,从而理解外部实现原理。

初始化

public Semaphore(int permits) {    sync = new NonfairSync(permits);}

初始化默认非偏心锁, 同时须要传入指定许可数, 能够看到这块代码是调用的 AQS 的 setState(permits) 办法。代码如下:

static final class NonfairSync extends Sync {    private static final long serialVersionUID = -2694183684443567898L;    NonfairSync(int permits) {        super(permits);    }}abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 1192457210091910933L;        Sync(int permits) {            setState(permits);        } }

setState 办法其实就是对 AQS 的 state 进行赋值。

补充

  1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程取得锁,大于等于 1 曾经有线程取得锁,大于 1 阐明该取得锁的线程多次重入。
  2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算能够获取读写锁的理论值。
  3. 而在这里 (CountDownLatch)则代表门闩或者说计数的值。

如果对 state 有所忘记,能够浏览后面的 AQS 、CAS 相干代码。 state 在这里代表的是信号量的许可数量。

acquire()

public void acquire() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}public void acquire(int permits) throws InterruptedException {    if (permits < 0) throw new IllegalArgumentException();    sync.acquireSharedInterruptibly(permits);}

acquire() 和 acquire(int permits) 调用的都是 sync.acquireSharedInterruptibly(permits) 办法,只不过一个反对传递参数,一个默认为 1。

acquireSharedInterruptibly 办法,其实就是 Sync 继承自 AQS 的。

这块能够浏览 AQS 的文章,这里简略介绍下:

private void doAcquireSharedInterruptibly(int arg)    throws InterruptedException {    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                throw new InterruptedException();        }    } finally {        if (failed)            cancelAcquire(node);    }}
  1. 在失败后会应用 doAcquireSharedInterruptibly(arg); 一直获取资源;
  2. final Node node = addWaiter(Node.SHARED); 会创立节点以共享模式放到队列里;
  3. 在循环中一直判断前一个节点,如果是 head,则尝试获取共享资源;
  4. 在共享模式下获取到资源后会应用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。

tryAcquireShared 是须要子类实现,也就是在 Semaphore.Sync 的实现类中实现了,这里以 FairSync 做解说:

static final class FairSync extends Sync {    private static final long serialVersionUID = 2014338818796000944L;    FairSync(int permits) {        super(permits);    }    protected int tryAcquireShared(int acquires) {        for (;;) {            // 如果后面有节点,则间接返回 -1 示意失败            if (hasQueuedPredecessors())                return -1;            // 获取以后信号量            int available = getState();            // 获取以后残余量            int remaining = available - acquires;            // 如果小于 0 或者 CAS 设置信号量胜利 则间接返回            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }}

而这段代码的含意:

  1. 如果后面有节点,则间接阻塞;
  2. 如果以后残余信号量小于 0 ,则返回负值,间接阻塞;
  3. 如果以后残余量大于等于 0 ,会 CAS 更新信号量,并返回非正数。

这块数值的含意,在 AQS 中定义了,含意如下:

  1. 小于 0: 示意失败;
  2. 等于 0: 示意共享模式获取资源胜利,但后续的节点不能以共享模式获取胜利;
  3. 大于 0: 示意共享模式获取资源胜利,后续节点在共享模式获取也可能会胜利,在这种状况下,后续期待线程必须查看可用性。

release()

public void release() {    sync.releaseShared(1);}public void release(int permits) {    if (permits < 0) throw new IllegalArgumentException();    sync.releaseShared(permits);}

公布许可证的给定数量,该数量减少可用的许可数量。 看其外部调用的是 Sync 的 releaseShared, 其实就是 AQS 的对应办法:

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

如果实现tryReleaseShared返回true,以共享模式开释资源。 其中的 tryReleaseShared 局部由 Semaphore.Sync 中实现,逻辑如下:

protected final boolean tryReleaseShared(int releases) {    for (;;) {        // 获取以后 state        int current = getState();        // 对 state 进行减少        int next = current + releases;        if (next < current) // overflow            throw new Error("Maximum permit count exceeded");        // 应用 CAS 赋值        if (compareAndSetState(current, next))            return true;    }}

通过下面代码能够看出,在 Semaphore 的 release 办法中次要就是对 state 进行减少,减少胜利后会调用 AQS 的 doReleaseShared 办法唤醒头节点。

总结

Q&A

Q: 既然 Semaphore 也是基于 AQS, 那在 Semaphore 中 state 的含意代表什么?
A: 在 Semaphore 中 state 代表许可数量,acquire 办法当许可小于指定数量会阻塞线程,release 办法减少许可当许可减少胜利则唤醒阻塞节点。

Q: Semaphore 基于 AQS 具体是怎么实现的呢?
A:

  1. 初始设置 state 的初始值,即初始许可数量。
  2. acquire 办法设置指标数量,当指标数量大于以后数量时,会阻塞线程并将其放到阻塞队列中。此处基于 AQS 实现。
  3. release 对 state 进行减少,胜利后会调用 AQS 的 doReleaseShared 唤醒头结点。同样是基于 AQS 实现。

Q: Semaphore 和 CountDownLatch 有什么区别?
A: Semaphore 的计数器是递减的,而 CountDownLatch 是递加的。相同点就是计数器都不能够重置。

结束语

在浏览 Semaphore 源码过程中,发现其次要性能都是基于 AQS 实现的,能够回顾浏览 AQS 的相干笔记。同样 Semaphore 也反对偏心和非偏心模式,这块就须要小伙伴本人去浏览啦。