形象同步队列 AQS 解析
AQS——锁的底层反对
AbstractQueuedSynchronizer 形象同步队列简称 AQS,它是实现同步器的根底组件,并发包中的锁底层都是应用 AQS 来实现的,上面看下 AQS 的类图构造。
该图可知,AQS 是一个 FIFO
的双向队列,其外部通过节点 head 和 tail 记录队首和队尾的元素,队列元素类型为Node
。
其中 Node 里的 thread 变量用来寄存进入 AQS 队列里的线程,而 SHARED 用来标记线程是获取共享资源时被阻塞挂起放入 AQS 队列的;EXCLUSIVE 用来标记该线程是获取独占资源时被挂起放入 AQS 队列中;waitStatus 记录以后线程期待状态,能够为CANCELLED(线程勾销)
、SIGNAL(线程须要被唤醒)
、CONDITION(线程在条件队列中期待)
、PROPAGATE(开释共享资源时告诉其余节点)
;prev 记录以后节点的前驱节点,next 则是后驱节点。
在 AQS 中维持了一个繁多的状态信息state
,能够通过 getState、setState、compareAndSetState 函数批改值。
- 对于 ReentrantLock 的实现,state 能够示意以后线程获取锁的次数;
- 对于读写锁 ReentrantReadWriteLock,state 的高 16 位示意读状态,也就是获取该锁的次数,低 16 位示意获取到写锁线程可重入的次数;
- 对于 Semaphore 来说,state 示意以后可用信号的个数;
- 对于 CountDownlatch 来说,state 用来示意计数器以后的值;
AQS 有个外部类 ConditionObject,它用来联合锁实现线程同步。ConditionObject 能够间接拜访 AQS 对象外部的变量,比方 state 状态值和队列。
ConditionObject 是 条件变量 ,每个条件变量对应一个条件队列(单向链表队列), 用来寄存调用条件变量的 await 办法后被阻塞的线程,而 firstWaiter 示意队首元素,lastWaiter 示意队尾元素。
这里咱们先说一下 waitStatus 所示意的几个状态。
- CANCELLED(值为:1):示意以后节点已勾销调度。当 timeout 或被中断(响应中断的状况下),会触发变更为此状态,进入该状态后的节点将不会再变动。
- SIGNAL(值为:-1):示意后继节点在期待以后节点唤醒。后继节点入队时,会将前继节点的状态更新为 SIGNAL。
- CONDITION(值为:-2):示意节点期待在 Condition 上,当其余线程调用了 Condition 的 signal()办法后,CONDITION 状态的节点将 从条件队列转移到同步队列中,期待获取同步锁。
- PROPAGATE(值为:-3):共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。
- 值为:0:新节点入队时的默认状态。
对于 AQS 来说,线程同步的要害就是对状态值 state 进行操作,依据 state 是否属于一个线程,操作 state 的形式分为独占和共享。
独占形式下获取资源通过:void acquire(int arg)
、void acquireInterruptibly(int arg)
。
独占形式下开释资源通过:boolean release(int arg)
。
共享形式下获取资源通过:void acquireShared(int arg)
、void acquireSharedInterruptibly(int arg)
。
共享形式下开释资源通过:boolean releaseShared(int arg)
。
在独占形式中获取资源与具体线程绑定的,也就是说如果一个线程获取到资源就会标记是这个线程获取到了,其余线程再通过操作 state 获取资源就会发现该资源不是本人持有的,随后阻塞。
比方独占锁 ReentrantLock 的实现中:当一个线程获取到了 ReentrantLock 锁,在 AQS 外部首先应用 CAS 操作将 state 值从 0 改为 1,而后设置以后锁的持有者为以后线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从 1 改为 2,也就是设置 可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
而共享形式的获取资源是和具体线程不相干的,当多个线程去申请资源时通过 CAS 获取资源,当一个线程获取到资源后,别的线程再去获取时如果以后资源还能够满足需要的话,则只须要通过 CAS 形式获取即可。
比方 Semaphore 信号量,当一个线程通过 acquire 办法获取信号量时,会首先看以后信号量个数是否满足需要,不满足则将以后线程放入阻塞队列,满足则通过自旋 CAS 获取信号量。
在独占形式中,获取和开释资源流程如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
当一个线程调用了 acquire 办法获取独占资源时,首先应用 tryAcquire 办法尝试获取资源,具体就是设置状态变量 state 的值,胜利即间接返回;失败的话则将以后线程封装为类型为 Node.EXCLUSIVE 的 Node 节点随后插入到 AQS 阻塞队列的尾部,并调用 LockSupport.park(this)挂起本人。
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
当一个线程调用 release 办法时会尝试应用 tryRelease 操作开释资源,这里也是设置状态变量 state 的值,随后调用 LockSupport.unpark(thread) 办法激活 AQS 队列中被阻塞的一个线程。被激活的线程则应用 tryAcquire 尝试,看以后变量 state 值是否还能满足本人的须要,满足则持续向下执行,否则还是被放入队列中挂起。
📢 须要留神:AQS 类并没有提供 tryAcquire、tryRelease 办法,须要由具体子类来实现,依据不同场景应用 CAS 算法尝试批改 state 状态值,并且 state 状态值的增减代表什么意义。
比方继承自 AQS 实现的独占锁 ReentrantLock,当 status 为 0 时示意锁闲暇,为 1 时示意锁曾经被占用。在重写 tryAcquire 时,在外部须要应用 CAS 算法查看以后 state 是否为 0,如果为 0 则应用 CAS 设置为 1,并设置以后锁的持有者为以后线程,而后返回 true,如果 CAS 失败则返回 false。
在共享形式中,获取和开释资源流程如下:
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
当线程调用 acquireShared 获取共享资源时,会首先通过 tryAcquireShared 来尝试获取资源,具体还是设置状态变量 state 的值,胜利间接返回,失败则将以后线程封装为类型 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列尾部,并应用 LockSupport.park(this)挂起本人。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}
return false;
}
当线程调用 releaseShared 时还是通过尝试 tryReleaseShared 办法来开释资源,也是设置状态变量 state 的值,随后应用 LockSupport.unpark(thread)来激活 AQS 阻塞队列中被阻塞的一个线程,被激活的线程应用 tryReleaseShared 办法查看以后 state 是否还满足本人须要,满足则激活线程持续向下执行,否则还是被放入 AQS 队列中并被挂起。
📢 同样须要留神,AQS 类并没有提供可用的 tryAcquireShared、tryReleaseShared 办法,须要子类去实现。
比方继承自 AQS 实现的读写锁 ReentrantReadWriteLock 外面的读锁在重写 tryAcquireShared 时,首先查看写锁是否被其余线程持有,如果是则间接返回 false,否则应用 CAS 递增 state 的高 16 位(在 ReentrantReadWriteLock 中,state 的高 16 位为获取读锁的次数)。
⚠️ 基于 AQS 实现的锁除了须要重写上述介绍的办法外,还须要重写 isHeldExclusively 办法,来判断锁是被以后线程独占还是被共享。
另外咱们发现 acquireInterruptibly(int arg)
、acquireSharedInterruptibly(int arg)
都带有 Interruptibly 关键字。那么带和不带这个关键字有什么区别?
其实不带 Interruptibly 关键字办法示意不对中断进行响应,也就是线程在调用不带 Interruptibly 的办法获取资源或者获取失败被挂起时,其余线程中断该线程,那么该线程不会因为被中断而抛出异样,持续获取资源或被挂起,也就是不对终端进行响应,疏忽中断。
而带 Interruptibly 关键字则是会抛出 InterruptedException 异样并返回。
上面咱们看一下 AQS 如何保护队列,次要查看入队操作。
当一个线程获取锁失败后该线程会被转换为 Node 节点,而后应用 enq(final Node node)办法将该节点插入到 AQS 阻塞队列。
private Node enq(final Node node) {for (;;) {
Node t = tail;//(1)if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//(2)tail = head;
} else {
node.prev = t;//(3)if (compareAndSetTail(t, node)) {//(4)t.next = node;
return t;
}
}
}
}
如上代码,在第一次循环的时候当 AQS 队列状态如图(默认状况)所示,头尾均指向 null;当执行到代码(1)时候,节点 t 指向了尾部节点,队列状态如图步骤(1)所示,这时 t 为 null,执行代码(2)时候应用 CAS 算法设置一个哨兵节点为头节点,如果设置胜利则让尾部节点也指向哨兵节点,这时队列状态如图步骤(2)所示。
接下来咱们还须要插入 node 节点,所以在第二次循环后又执行到代码(1),队列状态如下图步骤(3)所示;而后执行代码(3)设置 node 的前驱节点为尾部节点,队列状态如下图步骤(4)所示;随后通过 CAS 算法来设置 node 节点为尾部节点,CAS 胜利后队列状态如下图步骤(5)所示;随后将原来的尾部节点的后驱节点设置为 node 节点,就实现了双向链表。队列状态如下图步骤(6)所示。
AQS——条件变量的反对
synchronized 和条件变量一样都能够实现线程同步,它们的不同在于 synchronized 同时只能和一个共享变量 notify 或 wait 办法实现同步,而 AQS 的一个锁能够对应多个条件变量。
接下来咱们看一下例子。
public static void main(String[] args) {final ReentrantLock lock = new ReentrantLock();// (1)
final Condition condition = lock.newCondition();// (2)
lock.lock(); // (3)
try {System.out.println("begin wait...");
condition.await(); // (4)
System.out.println("end wait...");
} catch (Exception e) {lock.unlock(); // (5)
}
lock.lock(); // (6)
try {System.out.println("begin signal...");
condition.signal(); // (7)
System.out.println("end signal...");
} catch (Exception e) {lock.unlock(); // (8)
}
}
这段代码首先创立另一个独占锁 ReentrantLock 对象,也是基于 AQS 实现的。
第二步应用创立的 Lock 对象的 newCondition()办法创立了一个 ConditionObject 变量,这个变量就是 Lock 锁对应的一个条件变量。
📢 一个 Lock 对象能够创立多个条件变量。
第三步获取独占锁,随后第四步调用条件变量的 await()办法阻塞挂起以后线程。当其余线程调用了条件变量的 signal()办法时,被阻塞的线程的才会从 await 处返回,须要留神,和调用 Object 的 wait()办法一样,如果没有获取到锁就调用的话,则会抛出 IllegalMonitorStateException 异样。第五步开释获取的锁。
在下面代码中,lock.newCondition()的作用其实是 new 了一个在 AQS 外部申明的 ConditionObject 对象,ConditionObject 是 AQS 的外部类,能够拜访 AQS 外部的变量(例如状态变量 state)和办法。在每个条件变量外部都保护了一个条件队列 (单向链表队列),用来寄存调用条件变量的 await() 办法时被阻塞的线程。留神这个条件队列和 AQS 队列不是一回事。
咱们看一下 await()办法源码:
public final void await() throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
// 创立新的 node 节点,并插入到条件队列开端(1)Node node = addConditionWaiter();
// 开释锁并返回状态位(2)int savedState = fullyRelease(node);
int interruptMode = 0;
// 调用 park 办法阻塞挂起以后线程(3)while (!isOnSyncQueue(node)) {LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//...
}
该办法中,当线程调用条件变量的 await()办法时,在外部会结构一个类型为 Node.CONDITION 的 node 节点,而后将该节点插入到条件队列开端,之后以后线程会开释获取到的锁,也就是操作 state 值,并被阻塞挂起。
public final void signal() {if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
当另外一个线程调用条件变量的 signal 办法时(必须先调用锁的 lock()办法获取锁),在外部会把条件队列外面的一个线程节点从条件队列外面移除并放入 AQS 的阻塞队列外面,而后激活这个线程。
📢 须要留神的是,AQS 只提供了 ConditionObject 的实现,并没有提供 newCondition 函数,须要子类实现。
上面看一下在 await()办法阻塞后,如何放入条件队列的。
private Node addConditionWaiter() {
// 获取尾部节点
Node t = lastWaiter;
// 如果 lastWaiter 不为空,则查看该队列是否有被 Cancel 的节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 遍历条件队列节点,移除已被勾销的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 利用以后线程构建一个代表以后线程的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node; // 没有尾节点就插入到头节点
else
t.nextWaiter = node;// 尾节点的后驱节点等于以后节点
lastWaiter = node; // 尾节点等于以后节点
return node;
}
📢 留神:当多个线程调用 lock.lock()办法时,只有一个线程获取到锁,其余线程就会被转换到 Node 节点插入到对应的 AQS 阻塞队列,并自旋 CAS 尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的 await 办法,则该线程会开释获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列外面。
当另外一个线程调用条件变量的 signal 或者 signalAll 办法时,会把条件队列外面的一个或者全副 Node 节点挪动到 AQS 的阻塞队列外面,期待机会获取锁。
总结:一个锁对应一个 AQS 阻塞队列,对应多个条件变量,每个条件变量都有本人的一个条件队列。
实现自定义独占锁
/**
* @author 神秘杰克
* 公众号: Java 菜鸟程序员
* @date 2022/1/20 实现自定义独占锁
* @Description
*/
public class NonReentrantLock implements Lock, Serializable {
// 自定义实现 AQS
private static class Sync extends AbstractQueuedSynchronizer {
// 是否持有锁
@Override
protected boolean isHeldExclusively() {return getState() == 1;
}
// 如果 state == 0 尝试获取锁
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1;
if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试开释锁 设置 state == 0
@Override
protected boolean tryRelease(int arg) {
assert arg == 1;
if (getState() == 0) {throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供条件变量接口
Condition newCondition() {return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {sync.acquire(1);
}
public boolean isLocked() {return sync.isHeldExclusively();
}
@Override
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {sync.release(1);
}
@Override
public Condition newCondition() {return sync.newCondition();
}
}
依据自定义锁实现生产者消费者
/**
* @author 神秘杰克
* 公众号: Java 菜鸟程序员
* @date 2022/1/20
* @Description 生产者消费者模型
*/
public class LockTest {final static NonReentrantLock lock = new NonReentrantLock();
final static Condition consumerCondition = lock.newCondition();
final static Condition producerCondition = lock.newCondition();
final static Queue<String> QUEUE = new LinkedBlockingQueue<>();
final static int QUEUE_SIZE = 10;
public static void main(String[] args) {LockTest lockTest = new LockTest();
// 启消费者线程
for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 2; j++) {
try {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("消费者生产了" + lockTest.get());
}
}, "consumer_" + i).start();}
// 启动生产者线程
for (int i = 0; i < 2; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {
try {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200));
} catch (InterruptedException e) {e.printStackTrace();
}
lockTest.put("物品 -" + new Random().nextInt(1000));
}
}, "产品 -" + i).start();}
}
private void put(String name) {
// 获取独占锁
lock.lock();
try {
// 如果队列满了, 则期待
while (QUEUE.size() == QUEUE_SIZE) {producerCondition.await();
}
QUEUE.add(name);
System.out.println(Thread.currentThread().getName() + "生产了" + name);
// 唤醒生产线程
consumerCondition.signalAll();} catch (Exception e) {e.printStackTrace();
} finally {lock.unlock();
}
}
private String get() {
String ret = "";
// 获取独占锁
lock.lock();
try {
// 如果队列空了, 则期待
while (QUEUE.size() == 0) {consumerCondition.await();
}
// 生产一个元素
ret = QUEUE.poll();
// 唤醒生产线程
producerCondition.signalAll();} catch (Exception e) {e.printStackTrace();
} finally {lock.unlock();
}
return ret;
}
}