一、基于 AQS 实现的锁
AQS(AbstractQueuedSynchronizer)是 Java 并发包 JUC 中十分重要的一个类,大部分锁都是基于 AQS 实现的,次要实现的类如下:
ReentrantLock
:可重入锁,独占锁,实现了偏心锁和非偏心锁,这个是上篇内容介绍的类,也是最罕用类,通常会和synchronized
作比拟。ReentrantReadWriteLock
:读写锁,可共享也可独占锁,读是共享锁,写是独占锁,也实现了偏心锁和非偏心锁。Semaphore
:信号锁,共享锁,也实现了偏心锁和非偏心锁,次要同于管制流量,比方:数据库连接池给你调配 10 个链接,来一个调配一个,如果 10 个都调配完了且没有开释那就期待开释。CountDownLatch
:闭锁,共享锁,也实现了偏心锁和非偏心锁,Latch 门闩的意思,比方:说四个人一个漂流艇,坐满了就推上水。
二、Semaphore
Semaphore 是什么
下面曾经介绍了Semaphore
,基于 AQS 实现的信号锁,是共享锁,实现了偏心锁和非偏心锁。能够用来管制同时拜访特定资源的线程数,通过协调各个线程以保障正当的应用资源。
Semaphore 应用场景
通常用于资源有明确拜访数量限度的场景,罕用于限流。
比方:数据库连接池,同时进行连贯的线程数量有限度,连贯不能超过肯定的数量,当连贯达到了限度的数量后,前面的线程只能排队期待后面的线程开释了数据库链接能力获取数据库链接。
比方:停车场场景,车位数量无限,同时只能听肯定数量的车,当停满了之后里面的车只能等外面的车进去能力进去停车。
Semaphore 的罕用办法
// 从信号锁获取获取一个锁,在获取到锁之前始终解决阻塞状态,除非线程被中断
acquire();
// 从信号锁获取获取指定数量锁,在获取到锁之前始终解决阻塞状态,除非线程被中断
acquire(int permits);
// 从信号锁获取一个锁,在获取到锁之前线程始终处于阻塞状态(疏忽中断)acquireUninterruptibly();
// 尝试从信号锁获取锁,返回获取胜利或者失败,不会阻塞线程
tryAcquire();
// 尝试从信好锁获取锁,指定获取工夫,在指定工夫内没有获取到则超时返回,不会阻塞线程
tryAcquire(long timeount, TimeUnit unit);
// 开释锁
release();
// 获取期待队列中是否还有期待线程
hadQueuedThreads();
// 获取期待队列里阻塞线程的数量
getQueuedLength();
// 清空锁,返回清空锁的数量
drainPermits();
// 返回可用的锁的数量
availabelPermits();
Semaphore 实现原理
初始化
Semaphore 提供了两个构造方法,默认构造方法创立指定锁数量的非偏心信号锁,另外一个构造方法多了一个指定是偏心锁还是非偏心锁的参数,源码如下:
// 构建指定数量锁的非偏心信号锁
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
// 构建指定数量锁的偏心 / 非偏心信号锁
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取锁过程
acquire
acquire()
是获取锁办法,调用了外部类同步器 Sync
继承了 AQS
理论调用 AQS
的acquireSharedInterruptibly()
外围,源码如下:
public void acquire() throws InterruptedExcetion {sync.acquireSharedInterruptibly();
}
加 JDK 中,与锁相干的办法,Interruptibly
示意可中断,也就是可中断锁。可中断锁的意思是 线程在期待获取锁的过程中是能够被中断的 ,换言之, 线程在期待锁的过程中能够响应中断。
acquireSharedInterruptiby
acquireSharedInterruptibly
办法是获取可中断锁,源码如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted())
// 检测线程的中断状态,如果曾经被中断了,就响应中断,该办法会革除线程中的中断标识
throw new InterruptedException();
// 尝试获取锁,arg 为锁的数量
// 当锁被获取完了之后,则为以后线程创立一个节点退出阻塞队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
acquireSharedInterruptibly
办法首先会判断以后线程的中断状态如果是中断状态则响应中断,抛异样,而后调用 tryAcquireShared()
办法获取锁,如果锁被获取完了就为以后线程创立一个节点退出期待队列。
tryAcquireShared
tryAcquireShared()
是 AQS
定义的一个模版办法,具体由子类实现,Semaphore
也实现了偏心锁和非偏心锁,两种锁大同小异,咱们具体来看一下偏心锁的具体实现,源码如下:
protected int tryAcquireShared(int acquires) {
// 自旋
for (;;) {
// 判断是否有排在本人后面的线程,如果有间接返回 -1,进入阻塞状态
if (hasQueuedPredecessors())
return -1;
// 获取同步状态的值(以后可用锁数量)
int available = getState();
// 残余锁数量,可用的 - 申请的
int remaining = available - acquires;
// 如果残余的锁小于 0, 或者设置胜利就返回,如果设置失败持续循环设置
// 如果残余锁数量小于 0,返回正数,示意获取锁失败
// 如果残余锁数量大于 0,且设置状态胜利,示意获取锁胜利
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
tryAcquireShared()
通过 自旋 +CAS的形式获取锁和保障线程平安。
doAcquireSharedInterruptibly
doAcquireSharedInterruptibly()
办法在偏心锁的时候如果以后线程后面有期待线程或者锁被获取完了之后,以后线程须要进入期待状态时会被调用,用于为以后线程创立节点并退出期待队列,源码如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 为以后线程创立共享模式节点退出队列结尾
final Node node = addWaiter(Node.SHARED);
// 操作失败标记
boolean failed = true;
try {
// 自旋
for (;;) {
// 获取以后节点的前一节点
final Node p = node.predecessor();
if (p == head) {
// 如果前一节点是头节点, 则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果获取胜利,设置头节点和共享模式流传
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
// 如果前一节点不是头节点或者没有获取到锁
// shouldParkAfterFailedAcquire 办法判断以后线程是否须要被阻塞
// parkAndCheckInterrupt 办法用于阻塞线程并检测线程是否被中断,如果被中断抛错
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();}
} finally {if (failed)
// 自旋异样退出 勾销线程获取锁
cancelAcquire(node);
}
}
获取锁过程总结
- 首先调用
Semaphore.acquire()
办法获取令牌,该办法调用内置同步器Sync.acquireSharedInterruptibly()
办法,该同步器继承AQS
理论调用的是AQS
的acquireSharedInterruptibly()
办法。 acquireSharedInterruptibly()
办法首先判断以后线程是否被中断,如果中断了就抛InterruptedException
异样,如果没有被中单,就调用tryAcquireShared()
办法尝试获取锁。tryAcquireShared()
办法AQS
只是定了一个模版办法由子类实现,Semaphore.Sync
同步器提供了两种实现,别离是FairSync
(偏心锁)和NonfairSync
(非偏心锁),这两种锁实现差不多,偏心锁就是多一个hasQueuedPredecessor()
办法判断是否有排在本人后面的线程,如果有则返回 -1。tryAcquireShared()
办法通过自旋或者锁,先获取以后可用锁,减去须要获取的锁获取到残余锁,如果残余锁小于 0 间接返回,示意获取失败,否则通过 CAS 去获取锁,胜利获取返回胜利,失败获取返回失败。tryAcquireShared()
如果获取锁失败,就要调用doAcquireSharedInterruptibly()
办法用于为以后线程创立节点退出期待队列,该办法首先创立共享模式的节点退出队列,而后自旋,判断以后节点是不是头节点,如果是头节点也尝试获取锁,获取胜利的话设置头节点并胜利返回,如果获取失败则会调用shouldParkAfterFailedAcquire()
判断以后线程是否须要期待,如果须要期待而后调用parkAndCheckInterrupt()
办法阻塞线程并判断线程是否中断,如果中断则抛错,如果没有中断就阻塞再通过自旋获取锁。如果自旋异样退出,则调用cancelAcquire()
办法勾销线程获取锁。
开释锁过程
release
Semaphore.relese()
办法用于开释锁,开释一个锁,源码如下:
public void release(){
// 开释一个共享锁
sync.releaseShared(1);
}
release()
办法调用 Semaphore.Sync
同步器的 releaseShared()
办法,该同步器继承与 AQS
理论调用的是 AQS.releaseShared()
办法。
releaseShared
releaseShared()
办法开释指定数量的共享锁,开释胜利之后会唤醒期待队列中的一个线程,源码如下:
public final boolean releaseShared(int arg) {
// 尝试开释锁
if (tryReleaseShared(arg)) {
// 开释胜利,唤醒期待队列中的线程
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
tryReleaseShared()
办法是 AQS
定义的模版办法由子类实现,调用了Semaphore.tryReleaseShared()
,该办法通过自旋 +CAS 开释锁,源码如下:
protected final boolean tryReleaseShared(int releases) {
// 自旋
for(;;){
// 获取以后可用的锁数量
int current = getState();
// 可用的 + 开释的
int next = current + releases;
if (next < current) {throw new Error("Maximum permit count exceeded");
}
// 通过 CAS 批改状态值开释锁
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared
doReleaseShared()
办法用于开释锁胜利之后唤醒期待队列中的线程,源码如下:
private void doReleaseShared() {
// 自旋
for (;;) {
// 将队列头节点赋值与节点 h
Node h = head;
// 如果节点不为 null,且不等于尾节点
if (h != null && h != tail) {
// 失去 h 节点的状态
int ws = h.waitStatus;
// 如果节点状态是 Node.SIGNAL,就要唤醒节点 h 下一节点
if (ws == Node.SIGNAL) {
// 设置节点 h 的状态为勾销状态,如果失败就循环再试一次
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒节点 h 下一节点线程
unparkSuccessor(h);
}
// 如果节点 h 状态为 0,就设置 ws 的状态为 PROPAGATE,阐明下次循环的时候节点 h 应该无条件被流传
// 在 shouldParkAfterFailedAcquire 办法中应用
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果队列中头节点发生变化就持续循环
// 如果没有发生变化就跳出循环
if (h == head)
break;
}
}
开释锁过程总结
- 首先调用
Semaphore.release()
办法开释锁,该办法调用同步器Sync
的releasShared()
办法,因为同步器继承与AQS
所以理论调用的是AQS.releaseShared()
办法。 AQS.releaseShared()
办法首先调用tryReleaseShared()
办法尝试开释锁,该办法是AQS
定义的模版办法,通过子类实现,调用的是Semaphore.tryReleaseShared()
办法。Semaphore.tryReleaseShared()
办法通过自旋 +CAS 开释锁,先获取以后的锁数量加上开释的锁数量,会判断超过判断,而后通过 CAS 批改锁的数量达到开释锁的目标。tryReleaseShared()
开释锁胜利之后,会调用AQS.doReleaseShared()
唤醒期待队列中的线程。AQS.doReleaseShared()
用于开释锁胜利之后唤醒期待队列中的线程,也是通过自旋 +CAS 实现,首先获取头节点 h,先判断节点 h 不为 null 且不是尾节点,失去节点 h 的状态,如果状态是Node.SIGNAL
阐明这个节点曾经要被唤醒应该唤醒下一节点,通过 CAS 操作设置节点 h 状态为勾销,而后调用unparkSuccessor()
办法唤醒下一节点。如果节点 h 的状态为 0,就设置状态为PROPAGATE
阐明下一次应该被无条件流传。如果队列中头节点发生变化就持续循环,没有发生变化就终止循环。
三、应用 Semaphor 实现停车场提示牌性能
每个停车场入口都有一个提示牌,下面显示着停车场残余的车位是多少,当残余车位为 0 时,则不容许车辆进入停车场,直到停车场有车来到停车场,这是提示牌显示新的残余车位数。
业务场景
- 停车场包容量为 10。
- 当一辆车进入停车场后,显示牌的残余车位数响应的减 1.
- 每有一辆车驶出停车场后,显示牌的残余车位数响应的加 1。
- 停车场残余车位有余时,车辆只能在里面期待。
代码实现
public class SemaphoreDemo {
/**
* 停车场同时包容 10 辆车
*/
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
// 模仿 15 辆车同时停车
for (int i = 0; i < 15; i++) {Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {System.out.println("=====" + Thread.currentThread().getName() + "车辆来到停车场");
if (semaphore.availablePermits() == 0) {
// 没有车位了
System.out.println("车位有余,请急躁期待," + Thread.currentThread().getName() + "车辆正在期待");
}
// 获取车位停车
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"胜利进入停车场");
// 模仿车辆在停车场停留的工夫
Thread.sleep(1000);
// 驶出停车场
semaphore.release();
System.out.println(Thread.currentThread().getName()+"驶出停车场");
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
thread.start();}
}
}
下面代码输入如下:
=====Thread- 0 车辆来到停车场
=====Thread- 3 车辆来到停车场
Thread- 3 胜利进入停车场
=====Thread- 1 车辆来到停车场
=====Thread- 2 车辆来到停车场
Thread- 1 胜利进入停车场
=====Thread- 5 车辆来到停车场
=====Thread- 4 车辆来到停车场
Thread- 0 胜利进入停车场
=====Thread- 7 车辆来到停车场
Thread- 4 胜利进入停车场
Thread- 7 胜利进入停车场
Thread- 5 胜利进入停车场
=====Thread- 6 车辆来到停车场
Thread- 2 胜利进入停车场
Thread- 6 胜利进入停车场
=====Thread- 8 车辆来到停车场
Thread- 8 胜利进入停车场
=====Thread-10 车辆来到停车场
Thread-10 胜利进入停车场
=====Thread- 9 车辆来到停车场
=====Thread-14 车辆来到停车场
车位有余,请急躁期待,Thread-14 车辆正在期待
=====Thread-12 车辆来到停车场
车位有余,请急躁期待,Thread-12 车辆正在期待
=====Thread-11 车辆来到停车场
车位有余,请急躁期待,Thread-11 车辆正在期待
=====Thread-13 车辆来到停车场
车位有余,请急躁期待,Thread- 9 车辆正在期待
车位有余,请急躁期待,Thread-13 车辆正在期待
Thread-14 胜利进入停车场
Thread-10 驶出停车场
Thread- 2 驶出停车场
Thread- 6 驶出停车场
Thread-11 胜利进入停车场
Thread- 7 驶出停车场
Thread- 3 驶出停车场
Thread- 9 胜利进入停车场
Thread-13 胜利进入停车场
Thread- 1 驶出停车场
Thread- 0 驶出停车场
Thread- 4 驶出停车场
Thread- 5 驶出停车场
Thread- 8 驶出停车场
Thread-12 胜利进入停车场
Thread-13 驶出停车场
Thread- 9 驶出停车场
Thread-12 驶出停车场
Thread-14 驶出停车场
Thread-11 驶出停车场
从下面输入能够看出,当 10 个车位被停满了之后,再进来的 5 辆车进入期待状态直到有车驶出停车场,而后再停车,达到了咱们预期的成果。