本文首发于微信公众号【WriteOnRead】,欢送关注。
1. 概述
Semaphore 是并发包中的一个工具类,可了解为信号量。通常能够作为限流器应用,即限度拜访某个资源的线程个数,比方用于限度连接池的连接数。
打个艰深的比如,能够把 Semaphore 了解为一辆公交车:车上的座位数(初始的“许可”permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会缩小,当人满的时候不能再持续上车了(获取许可失败);而有人下车(开释许可)后就空出了一些座位,其他人就能够持续上车了。
上面具体分析其代码实现。
2. 代码剖析
Semaphore 的办法如下:
其中次要办法是 acquire() 和 release() 相干的一系列办法,它们的作用相似。咱们先从结构器开始剖析。
- 结构器
private final Sync sync;
// 初始化 Semaphore,传入指定的许可数量,非偏心
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
// 初始化 Semaphore,传入指定的许可数量,指定是否偏心
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
结构器初始化了 Sync 变量,依据传入的 fair 值指定为 FairSync 或 NonFairSync,上面剖析这三个类。
- 外部嵌套类 Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 结构器,将父类 AQS 的 state 变量初始化为给定的 permits
Sync(int permits) {setState(permits);
}
// 非偏心形式尝试获取许可(缩小 state 的值)final int nonfairTryAcquireShared(int acquires) {
// 自旋操作
for (;;) {
// 获取许可值(state),并尝试 CAS 批改为减去后的后果
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 开释许可(减少 state 的值)protected final boolean tryReleaseShared(int releases) {for (;;) {
// 操作与获取相似,不同的在于此处是减少 state 值
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 一些办法未给出...
}
能够看到 Sync 类继承自 AQS,并重写了 AQS 的 tryReleaseShared 办法,其中获取和开释许可别离对应的是对 AQS 中 state 值的减法和加法操作。具体可参考前文对 AQS 共享模式的剖析「JDK 源码剖析 -AbstractQueuedSynchronizer(3)」。
- NonFairSync (非偏心版本实现)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 调用父类 Sync 的结构器来实现
NonfairSync(int permits) {super(permits);
}
// 重写 AQS 的 tryAcquireShared 办法,代码实现在父类 Sync 中
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}
}
- FairSync (偏心版本实现)
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
// 结构器调用父类 Sync 的结构器来实现
FairSync(int permits) {super(permits);
}
// 重写 AQS 的 tryAcquireShared 办法,尝试获取许可(permit)protected int tryAcquireShared(int acquires) {for (;;) {
// 若队列中有其余线程期待,则获取失败(这就是体现“偏心”的中央)if (hasQueuedPredecessors())
return -1;
// 获取以后的许可值
int available = getState();
// 计算残余值
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
PS: 体现“偏心”的中央在于 tryAcquireShared 办法中,偏心的版本会先判断队列中是否有其它线程在期待(hasQueuedPredecessors 办法)。
次要办法的代码实现:
// 获取一个许可(可中断)public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
// 获取一个许可(不响应中断)public void acquireUninterruptibly() {sync.acquireShared(1);
}
// 尝试获取一个许可
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}
// 尝试获取一个许可(有超时期待)public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 开释一个许可
public void release() {sync.releaseShared(1);
}
还有一系列相似的操作,只不过获取 / 开释许可的数量能够指定:
// 获取指定数量的许可(可中断)public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 获取指定数量的许可(不可中断)public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// 尝试获取指定数量的许可
public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
// 尝试获取指定数量的许可(有超时期待)public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// 开释指定数量的许可
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
能够看到,Semaphore 的次要办法都是在嵌套类 FairSync 和 NonFairSync 及其父类 Sync 中实现的,外部嵌套类也是 AQS 的典型用法。
3. 场景举例
为了便于了解 Semaphore 的用法,上面简略举例剖析(仅供参考):
public class SemaphoreTest {public static void main(String[] args) {
// 初始化 Semaphore
// 这里的许可数为 2,即同时最多有 2 个线程能够获取到
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 50; i++) {new Thread(() -> {
try {
// 获取许可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "正在执行..");
// 模仿操作
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {e.printStackTrace();
} finally {
// 开释许可
semaphore.release();}
}).start();}
}
}
/* 执行后果(仅供参考):Thread-0 正在执行..
Thread-1 正在执行..
Thread-2 正在执行..
Thread-3 正在执行..
...
*/
这里把 Semaphore 的初始许可值设为 2,示意最多有两个线程可同时获取到许可(运行程序可发现线程是两两一起执行的)。设置为其余值也是相似的。
比拟非凡的是,如果把 Semaphore 的初始许可值设为 1,能够当做“互斥锁”来应用。
4. 小结
Semaphore 是并发包中的一个工具类,其外部是基于 AQS 共享模式实现的。通常能够作为限流器应用,比方限定连接池等的大小。
相干浏览:
JDK 源码剖析 -AbstractQueuedSynchronizer(3)