AQS
是AbstractQueuedSynchronizer
的简称。
AbstractQueuedSynchronizer 同步状态
AbstractQueuedSynchronizer
外部有一个state
属性,用于批示同步的状态:
private volatile int state;
state
的字段是个int
型的,它的值在AbstractQueuedSynchronizer
中是没有具体的定义的,只有子类继承AbstractQueuedSynchronizer
那么state
才有意义,如在ReentrantLock
中,state=0
示意资源未被锁住,而state>=1
的时候,示意此资源曾经被另外一个线程锁住。
AbstractQueuedSynchronizer
中尽管没有具体获取、批改state
的值,然而它为子类提供一些操作state
的模板办法:
获取状态
protected final int getState() { return state; }
更新状态
protected final void setState(int newState) { state = newState; }
CAS更新状态
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 期待队列
AQS 期待列队是一个双向队列,队列中的成员都有一个prev
和next
成员,别离指向它后面的节点和前面的节点。
队列节点
在AbstractQueuedSynchronizer
外部,期待队列节点由外部动态类Node
示意:
static final class Node { ...}
节点模式
队列中的节点有两种模式:
- 独占节点:同一时刻只能有一个线程拜访资源,如
ReentrantLock
- 共享节点:同一时刻容许多个线程拜访资源,如
Semaphore
节点的状态
期待队列中的节点有五种状态:
- CANCELLED:此节点对应的线程,曾经被勾销
- SIGNAL:此节点的下一个节点须要一个唤醒信号
- CONDITION:以后节点正在条件期待
- PROPAGATE:共享模式下会流传唤醒信号,就是说当一个线程应用共享模式拜访资源时,如果胜利拜访到资源,就会持续唤醒期待队列中的线程。
自定义同步锁
为了便于了解,应用AQS本人实现一个简略的同步锁,感受一下应用AQS实现同步锁是如许的轻松。
上面的代码自定了一个CustomLock
类,继承了AbstractQueuedSynchronizer
,并且还实现了Lock
接口。CustomLock
类是一个简略的可重入锁,类中只须要重写AbstractQueuedSynchronizer
中的tryAcquire
与tryRelease
办法,而后在批改大量的调用就能够实现一个最根本的同步锁。
public class CustomLock extends AbstractQueuedSynchronizer implements Lock { @Override protected boolean tryAcquire(int arg) { int state = getState(); if(state == 0){ if( compareAndSetState(state, arg)){ setExclusiveOwnerThread(Thread.currentThread()); System.out.println("Thread: " + Thread.currentThread().getName() + "拿到了锁"); return true; } }else if(getExclusiveOwnerThread() == Thread.currentThread()){ int nextState = state + arg; setState(nextState); System.out.println("Thread: " + Thread.currentThread().getName() + "重入"); return true; } return false; } @Override protected boolean tryRelease(int arg) { int state = getState() - arg; if(getExclusiveOwnerThread() != Thread.currentThread()){ throw new IllegalMonitorStateException(); } boolean free = false; if(state == 0){ free = true; setExclusiveOwnerThread(null); System.out.println("Thread: " + Thread.currentThread().getName() + "开释了锁"); } setState(state); return free; } @Override public void lock() { acquire(1); } @Override public void unlock() { release(1); } ...}
CustomLock
是实现了Lock
接口,所以要重写lock
和unlock
办法,不过办法的代码很少只须要调用AQS中的acquire
和release
。
而后为了演示AQS的性能写了一个小演示程序,启动两根线程,别离命名为线程A
和线程B
,而后同时启动,调用runInLock
办法,模仿两条线程同时拜访资源的场景:
public class CustomLockSample { public static void main(String[] args) throws InterruptedException { Lock lock = new CustomLock(); new Thread(()->runInLock(lock), "线程A").start(); new Thread(()->runInLock(lock), "线程B").start(); } private static void runInLock(Lock lock){ try { lock.lock(); System.out.println("Hello: " + Thread.currentThread().getName()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } }}
拜访资源(acquire)
在CustomLock的lock办法中,调用了 acquire(1)
,acquire
的代码如下 :
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
- CustomLock.tryAcquire(...):
CustomLock.tryAcquire
判断以后线程是否可能拜访同步资源 - addWaiter(...):将以后线程增加到期待队列的队尾,以后节点为独占模型(Node.EXCLUSIVE)
- acquireQueued(...):如果以后线程可能拜访资源,那么就会放行,如果不能那以后线程就须要阻塞。
- selfInterrupt:设置线程的中断标记
留神: 在acquire办法中,如果tryAcquire(arg)返回true, 就间接执行完了,线程被放行了。所以的前面的办法调用acquireQueued、addWaiter都是tryAcquire(arg)返回false时才会被调用。
tryAcquire 的作用
tryAcquire
在AQS类中是一个间接抛出异样的实现:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
而在咱们自定义的 CustomLock 中,重写了此办法:
@Override protected boolean tryAcquire(int arg) { int state = getState(); if(state == 0){ if( compareAndSetState(state, arg)){ setExclusiveOwnerThread(Thread.currentThread()); System.out.println("Thread: " + Thread.currentThread().getName() + "拿到了锁"); return true; } }else if(getExclusiveOwnerThread() == Thread.currentThread()){ int nextState = state + arg; setState(nextState); System.out.println("Thread: " + Thread.currentThread().getName() + "重入"); return true; } return false; }
tryAcquire
办法返回一个布而值,true
示意以后线程可能拜访资源,false
以后线程不能拜访资源,所以tryAcquire
的作用:决定线程是否可能拜访受爱护的资源。tryAcquire
外面的逻辑在子类能够自由发挥,AQS不关怀这些,只须要晓得能不能拜访受爱护的资源,而后来决定线程是放行还是进行期待队列(阻塞)。
因为是在多线程环境下执行,所以不同的线程执行tryAcquire
时会返回不同的值,假如线程A比线程B要快一步,先达到compareAndSetState
设置state的值成员并胜利,那线程A就会返回true,而 B 因为state的值不为0或者compareAndSetState
执行失败,而返回false。
线程B 抢占锁流程
下面拜访到线程A胜利取得了锁,那线程B就会抢占失败,接着执行前面的办法。
线程的入队
线程的入队是逻辑是在addWaiter
办法中,addWaiter办法的具体逻辑也不须要说太多,如果你晓得链表
的话,就非常容易了解了,最终的后果就是将新线程增加到队尾。AQS的中有两个属性head
、tail
别离指定期待队列的队首和队尾。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
须要留神的是在enq
办法中,初始化队列的时候,会新建一个Node
做为head
和tail
,而后在之后的循环中将参数node
增加到队尾,队列初始化完后,外面会有两个节点,一个是空的结点new Node()
另外一个就是对该当火线程的结点。
因为线程A在tryAcquire
时返回了true
,所以它会被间接放行,那么只有B线程会进入addWaiter
办法,此时的期待队列如下:
留神: 期待队列内的节点都是正在期待资源的线程,如果一个线程间接可能拜访资源,那它压根就不须要进入期待队列,会被放行。
线程B 的阻塞
线程B被增加到期待队列的尾部后,会继续执行acquireQueued
办法,这个办法就是AQS阻塞线程的中央,acquireQueued
办法代码的一些解释:
- 里面是一个
for (;;)
有限循环,这个很重要 - 会从新调用一次
tryAcquire(arg)
判断线程是否可能拜访资源了 node.predecessor()
获取参数node
的前一个节点shouldParkAfterFailedAcquire
判断以后线程获取锁失败后,需不需要阻塞parkAndCheckInterrupt()
应用LockSupport
阻塞以后线程,
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire 判断是否要阻塞
shouldParkAfterFailedAcquire
接管两个参数:前一个节点、以后节点,它会判断前一个节点的waitStatus
属性,如果前一个节点的waitStatus=Node.SIGNAL
就会返回true:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
acquireQueued
办法在循环中会屡次调用shouldParkAfterFailedAcquire
,在期待队列中节点的waitStatus
的属性默认为0,所以第一次执行shouldParkAfterFailedAcquire
会执行:
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
更新完pred.waitStatus
后,节点的状态如下:
而后shouldParkAfterFailedAcquire
返回false,回到acquireQueued
的循环体中,又去抢锁还是失败了,又会执行shouldParkAfterFailedAcquire
,第二次循环时此时的pred.waitStatus
等于Node.SIGNAL
那么就会返回true。
parkAndCheckInterrupt 阻塞线程
这个办法就比拟直观了, 就是将线程的阻塞住:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
为什么是一个for (;;)
有限循环呢
先看一个for (;;)
的退出条件,只有node
的前一个节点是head
并且tryAcquire返回true时才会退出循环,否则的话线程就会被parkAndCheckInterrupt
阻塞。
线程被parkAndCheckInterrupt
阻塞后就不会向上面执行了,然而等到它被唤醒后,它还在for (;;)
体中,而后又会持续先去抢占锁,而后如果还是失败,那又会处于期待状态,所以始终循环上来,就只有两个后果:
- 抢到锁退出循环
- 抢占锁失败,期待下一次唤醒再次抢占锁
线程 A 开释锁
线程A的业务代码执行实现后,会调用CustomLock.unlock
办法,开释锁。unlock办法外部调用的release(1)
:
public void unlock() { release(1); }
release
是AQS类的办法,它跟acquire
相同是开释的意思:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
办法体中的tryRelease
是不是有点眼生,没错,它也是在实现CustomLock
类时重写的办法,首先在tryRelease
中会判断以后线程是不是曾经取得了锁,如果没有就间接抛出异样,否则的话计算state的值,如果state为0的话就能够开释锁了。
protected boolean tryRelease(int arg) { int state = getState() - arg; if(getExclusiveOwnerThread() != Thread.currentThread()){ throw new IllegalMonitorStateException(); } boolean free = false; if(state == 0){ free = true; setExclusiveOwnerThread(null); System.out.println("Thread: " + Thread.currentThread().getName() + "开释了锁"); } setState(state); return free; }
release
办法只做了两件事:
- 调用
tryRelease
判断以后线程开释锁是否胜利 - 如果以后线程锁开释锁胜利,唤醒其余线程(也就是正在期待中的B线程)
tryRelease
返回true后,会执行if外面的代码块:
if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; }
先回顾一下当初的期待队列的样子:
依据下面的图,来走下流程:
- 首先拿到
head
属性的对象,也就是队列的第一个对象 - 判断
head
不等于空,并且waitStatus!=0,很显著当初的waitStatus是等于Node. SIGNAL
的,它的值是-1
所以if (h != null && h.waitStatus != 0)
这个if必定是满足条件的,接着执行unparkSuccessor(h)
:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; ... if (s != null) LockSupport.unpark(s.thread); }
unparkSuccessor
首先将node.waitStatus
设置为0,而后获取node的下一个节点,最初调用LockSupport.unpark(s.thread)
唤醒线程,至此咱们的B线程就被唤醒了。
此时的队列又回到了,线程B刚刚入队的样子:
线程B 唤醒之后
线程A开释锁后,会唤醒线程B,回到线程B的阻塞点,acquireQueued
的for循环中:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
线程唤醒后的第一件事就是,拿到它的上一个节点(以后是head结点),而后应用if判断
if (p == head && tryAcquire(arg))
依据当初期待队列中的节点状态,p == head
是返回true的,而后就是tryAcquire(arg)
了,因为线程A曾经开释了锁,那当初的线程B天然就能获取到锁了,所以tryAcquire(arg)也会返回true。
设置队列头
线路B拿到锁后,会调用setHead(node)
本人设置为队列的头:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null;}
调用setHead(node)
后队列会产生些变动 :
移除上一个节点
setHead(node)
执行完后,接着按上一个节点齐全移除:
p.next = null;
此时的队列:
线程B 开释锁
线程B 开释锁的流程与线程A基本一致,只是以后队列中曾经没有须要唤醒的线程,所以不须要执行代码去唤醒其余线程:
if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; }
h != null && h.waitStatus != 0
这里的h.waitStatus
曾经是0了,不满足条件,不会去唤醒其余线程。
总结
文中通过自定义一个CustomLock
类,而后通过查看AQS源码来学习AQS的局部原理。通过残缺的走完锁的获取、开释两个流程,加深对AQS的了解,心愿对大家有所帮忙。
欢送关注我的公众号:架构文摘,取得独家整顿120G的收费学习资源助力你的架构师学习之路!公众号后盾回复
arch028
获取材料: