文章会首发公众号,搜寻关注:Different Java
摘要
- 如何保障线程平安
- JDK 并发包中的同步控制
- JDK 并发包中锁的特点
- 为什么要讲 AQS
- AQS 的外围数据结构
- AQS 排他锁如何申请
- AQS 排他锁如何开释
1. 如何保障线程平安
Java 多线程在对共享资源进行拜访时,如果不加以控制会存在线程平安问题,当咱们应用多线程对共享资源拜访时,通常会线程共享资源的进行拜访线程数的管制:
- 共享锁:咱们会对共享资源分派许可,只有拿到许可的线程才能够拜访到共享资源,否则就须要期待有许可可用 (当然这里也能够放弃期待)。拿到许可的线程在来到共享区间(不再访问共享资源)时必须要偿还许可,这样处于期待的线程才有可能拿到许可执行。(信号量采纳的就是这种机制)
- 排他锁:只有一个线程能够拜访到共享资源,其余线程在未拿到排他锁时只能在共享区间外期待。当持有锁的线程开释锁当前,其余期待的线程才有可能获取到锁执行
2. JDK 并发包中的同步控制
Java 提供了大量的工具让咱们对共享资源进行多线程访问控制,其中很多人最相熟的是 synchronized 关键字,除了该关键字,JDK 的并发包中也提供了大量的类来进行同步控制,次要有:
- ReentrantLock
- Semaphore
- ReadWriteLock
- CountDownLatch
- CyclicBarrier
- LockSupport
- Condition
3. JDK 包中的锁的特点
3.1 ReentrantLock
ReentrantLock 的作用和 synchronized 的作用基本一致,然而 ReentrantLock 应该须要手动 lock(如果锁已被占用则须要期待)和 unlock,并且反对线程中断,反对偏心锁 (依照申请锁的程序取得锁),尝试取得锁的时候能够指定最大等待时间 (如果不指定工夫则不进行期待,拿到锁返回 true,否则返回 false)。
ReentrantLock 搭配 Condition 的 await() 以及 signal() 相当于 synchronized 搭配 Object.wait() 以及 notify()。
3.2 Semaphore
Semaphore(信号量) 容许多个线程同时拜访一个共享资源,在结构信号量时必须指定容许的最大线程数,在应用信号量时,咱们会尝试获取一个许可,如果获取失败,则须要期待,直到有线程开释许可或者以后线程被中断。
3.3 ReadWriteLock
ReadWriteLock(读写锁) 是读写拆散的锁,读与读之间不须要阻塞互斥,但读写、写写之间须要阻塞互斥。ReadWriteLock 中能够获取一个读锁和一个写锁,通过读写锁能够进步读多写少的程序的并发运行效率。
3.4 CountDownLatch
CountDownLatch 是一个倒计数器,在结构 CountDownLatch 时咱们须要传一个数字,通过调用 CountDownLatch 的 await 办法,咱们能够阻塞以后线程,直至倒计数器减到 0,通过 CountDownLatch 的 countdown() 办法,咱们能够将倒计数器的值减 1。
CountDownLatch 的作用就好比乘坐飞机,咱们必须等所有的旅客检票实现当前,而后飞机能力腾飞。
3.4 CyclicBarrier
CyclicBarrier(循环栅栏),循环栅栏的作用相似 CountDownLatch,然而他会在将计数器减到 0 当前从新复原原始计数器的值,循环的概念也就是这么来的,而且循环栅栏反对在计数器减到 0 当前触发一个动作 (Runnable 接口的实现类)。
3.5 LockSupport
LockSupport 是一个线程阻塞工具,机制相似于信号量,通过 park() 能够生产一个许可,如果以后没有许可,线程会被阻塞,unpark() 开释一个许可。LockSupport 中的许可最多只有一个。
LockSupport 不会造成死锁,假如咱们的线程限先执行了 unpark,而后再调用 park 也不会阻塞 (因为 unpark 曾经开释了一个许可可被取得应用)。
Thread.resume() 假如先于 Thread.suspend() 办法先执行,则会导致线程死锁。
4. 为什么要讲 AQS
通过上图咱们能够看出,前文咱们所提到的锁在其外部都有一个 Sync 类 (采纳了组合的形式),而这个 Sync 类都继承自 AbstractQueuedSynchronizer 类。
4.1 AQS 外围数据结构
在 AQS 中与两个外部类:
- ConditionObject:保留着条件变量期待队列(由 Condition.await() 引起的期待),最终实现也是 Node
- Node:同步期待队列的具体实现类 (保留在期待在这个锁上的线程)
4.2 Node
Node 的数据结构如下:
- volatile Node prev:期待链表中的上一个元素
- volatile Node next:期待链表中的下一个元素
- volatile Thread thread:以后线程对象
- Node nextWaiter:下一个期待在条件变量队列中的节点
除了上述 4 个属性外,还有一个重要的属性就是:
- volatile int waitStatus
该属性示意的是节点在队列中的状态,括号中的为状态的 int 值:
- 初始状态(0)
- CANCELLED(1):线程勾销了期待,如果在获得锁的过程中产生了一些异样,则可能呈现勾销的状况,比方期待过程中呈现了中断异样或者 timeout
- SIGNAL(-1):示意节点须要被唤醒
- CONDITION(-2):示意线程期待在条件变量中
- PROPAGATE(-3)
4.3 ConditionObject
因为咱们这篇文章咱们只专一剖析一般的锁,不波及条件变量期待,所以读者敌人们可自行浏览源码。
5. 排他锁申请
上面这张图是 ReentrantLock 应用排他锁的流程图:
上面咱们剖析一下排他锁的应用过程,咱们能够以 ReentrantLock 为切入点,剖析一下 AQS 的实现:
public void lock() {sync.lock();
}
咱们在调用 ReentrantLock 的 lock 办法时,将会调用 Sync 的 lock 办法 (),这里的 Sync 是一个抽象类,咱们这里次要看非偏心锁的实现形式,也就是 NonfairSync 的 lock 办法,如下:
final void lock() {
// 1
if (compareAndSetState(0, 1))
// 2
setExclusiveOwnerThread(Thread.currentThread());
else
// 3
acquire(1);
}
- 通过 CAS 尝试性取得锁(扭转 AQS 中的 state 属性值为 1),如果尝试取得锁胜利,执行第 2 步骤,否则执行第三步骤
- CAS 获取锁胜利当前,设置持有锁的线程为以后线程
- 如果没有获取到锁,则调用 acquire 尝试性取得许可,因为这里是排他锁,有且只有一个线程能够持有该锁
5.1 acquire
acquire 办法是 AQS 中的一个办法,如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
首先调用 tryAcquire 再次尝试获取锁,该办法在 AQS 中的实现是抛出 UnsupportedOperationException 异样,也就是说子类必须要实现这个办法,上面咱们看一下 tryAcquire 在 ReentrantLock$NonfairSync 办法中的实现,该办法最终会调用 ReentrantLock$Sync 中的 nonfairTryAcquire(int acquires) 办法如下:
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里的次要逻辑就是先获取一下锁的状态,如果锁的状态是未占有 (state 值为 0),则再次应用 CAS 尝试去占有锁,如果尝试胜利,则返回 true。
通过 CAS 尝试获取锁失败当前,判断锁是否被以后线程持有,如果是,调用 setState(nextc) 将 AQS 中的 state 的值 +1,而后返回 true(这里解决了重入的问题)。
上述两种状况如果都不成立,则返回 false,示意以后线程获取锁失败。
5.2 acquireQueued 办法
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
当咱们尝试获取锁失败当前,会调用 acquireQueued 该线程增加至期待队列,那么这个期待队列中的 Node 是怎么构建进去的呢?答案就是 addWaiter 办法。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter 办法中首先构建了一个 Node 节点,Node 节点中的线程为以后线程,而后尝试将该节点增加到 AQS 的期待队列中,如果原来期待队列的尾节点不为 NULL 的会采纳 CAS 疾速失败的办法进行增加,如果胜利增加,则返回这个新节点,否则调用 enq 增加新节点到期待队列中,办法如下:
private Node enq(final Node node) {for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 如果队列中还没有节点,CAS 初始化一个空的头结点,并把尾节点设置为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS 设置以后节点 node 为尾节点,并把原来的尾节点的 next 指向以后节点 node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果队列中临时还没有节点的话,就初始化一个空的头节点,并将尾节点设置为头结点,而后将以后节点 node 的前一个节点改成 tail 节点,并通过 CAS 直至胜利将 node 改为 tail 节点,并且原先的 tail 节点的 next 批改为以后节点 note,而后返回以后节点 node 的前置节点(也就是原来的 tail 节点)。
当节点增加胜利当前,咱们会被动调用 acquireQueued 尝试获取锁,办法如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 开启自旋
for (;;) {
// 获取以后节点 node 的前继节点
final Node p = node.predecessor();
// 如果前继节点是头结点(虚节点),那么意味着以后节点是第一个数据节点,那么久尝试获取锁,如果获取锁胜利,将以后节点设置为头结点,setHead 办法会将节点中的线程和 prev 节点都置为空
if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果 node 的前继节点不是头结点或者获取锁失败,那么须要判断以后节点是否须要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取如果产生异样,则勾销获取锁申请
if (failed)
cancelAcquire(node);
}
}
如果判断以后节点线程是否须要挂起,次要依附 shouldParkAfterFailedAcquire 来判断,如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
通过上述源码,咱们能够看到,只有以后节点的前继节点的状态是 SINGAL(待唤醒状态)时,以后节点线程能够被挂起( 这里次要是为了避免有限循环资源节约 )。上述代码还解决前置节点被勾销了非凡状况,将以后节点的前置节点批改为队列中最初一个 waitSatus 不是勾销状态的节点。
将以后节点的线程挂起办法如下,实现采纳的 LockSupport。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
return Thread.interrupted();}
勾销获取所申请的办法是 cancelAcquire,如下:
private void cancelAcquire(Node node) {
// 过滤有效节点
if (node == null)
return;
node.thread = null;
// 跳过所有处于勾销的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 找到最初一个无效节点的后继节点
Node predNext = pred.next;
// 将以后节点的状态设置为勾销
node.waitStatus = Node.CANCELLED;
// 如果以后节点是尾节点,设置尾节点为最初一个无效节点
if (node == tail && compareAndSetTail(node, pred)) {
// 将最初一个无效节点的 next 设置为 null
compareAndSetNext(pred, predNext, null);
} else {
// 如果最初一个无效节点不是头结点并且(最初一个无效节点是 SINGAL 状态或者能够被设置为 SGINAL 状态),并且最初一个无效节点的线程不为 null
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 如果以后节点后的后置节点是无效节点(存在且状态不为勾销状态),设置最初一个无效节点的 next 为以后节点的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 上述条件如果均不成立,唤醒以后节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
6. 排他锁开释
public void unlock() {sync.release(1);
}
ReentrantLock 通过 unlock 办法来解锁,该办法会调用 AQS 中的 release 办法,如下:
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease 是由具体的 AQS 的子类来实现,上面看一下 ReentrantLock 中的 Sync 中的实现,如下:
protected final boolean tryRelease(int releases) {int c = getState() - releases;
// 只有持有锁的线程才能够执行 unlock 操作
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 解锁胜利,设置独占线程为空
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
上面再回到 AQS 的 release 办法,如下:
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果解锁胜利,并且以后节点状态不是初始状态,而后就调用 unparkSuccessor 办法唤醒 head 后继节点的线程,办法如下:
private void unparkSuccessor(Node node) {
// 获取以后节点状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取节点的后继节点
Node s = node.next;
// 如果节点的后继节点为 null 或者曾经被勾销,则从尾节点找到一个未勾销的节点进行唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
解锁的过程还是绝对比较简单,当一个线程在开释 ReentrantLock 的时候,须要从期待队列中唤醒其余须要唤醒的节点。
本期的 Java AQS 介绍到这,我是 shysh95,咱们下期再见!