一、ReentrantLock与AQS简介
在Java5.0之前,在协调对共享对象的拜访时能够应用的机制只有synchronized和volatile。Java5.0减少了一种新的机制:ReentrantLock。ReentrantLock并不是一种代替内置加锁的办法,而是作为一种可抉择的高级性能。ReentrantLock实现了Lock接口,提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁办法都是显式的。
咱们根本不会间接应用AQS,AQS是一个构建锁和同步器的框架,许多同步器都能够通过AQS很容易高效的结构进去,根本可能满足绝大多数状况的需要。不仅ReentrantLock,Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask也是基于AQS构建的。AQS解决了实现同步器的大量细节,期待线程采纳FIFO队列操作程序;还负责管理同步器类中的状态 ,能够通过getState,setState以及compareAndSetState办法来操作。
二、ReentrantLock应用示例
public class ReentrantLockTest1 { public static ReentrantLock lock = new ReentrantLock(); int count = 0; public void run (){ lock.lock();//加锁 try { for(int i = 0; i < 1000; i++){ count++; } } finally { lock.unlock();//开释锁 } }}
以上代码通过lock来实现 count++的原子操作,lock.lock()用来获取锁,lock.unlock()用来开释锁。那么多线程下,如何保障同步操作?如何开释锁?如何判断没有竞争到锁的线程处于期待状态?什么时候唤醒期待线程?
三、ReentrantLock同步类实现(以下为外围代码摘录)
ReentrantLock是独占锁,有偏心锁和非偏心锁的策略实现。贴上ReentrantLock的源码来看下,具体看正文吧。
package java.util.concurrent.locks;import java.util.concurrent.TimeUnit;import java.util.Collection; public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync; //同步器提供所有的实现机制,属于外部类,供外部调用 /** * 自定义一个同步器类,从它派生一个偏心和非偏心版本。其中state为持有锁的次数。 */ abstract static class Sync extends AbstractQueuedSynchronizer {//这就是传说的AQS,上面会具体提到 private static final long serialVersionUID = -5179523762034025860L; abstract void lock();//供外部类实现 /** * 获取非偏心锁,供tryAcquire调用 * 如果锁没有被其余线程持有,则获取锁并立刻返回将锁同步状态state设置为1。 * 如果以后线程持有锁,则同步状态state +1且立刻返回。 * 如果锁由另一个线程持有,则以后线程成为禁用线程调度目标和处于休眠状态,直到取得锁,此时锁持有计数设置为1 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//没有线程持有锁时 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current);//设置为独占资源 return true; } } else if (current == getExclusiveOwnerThread()) {//以后线程持有锁时 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc);//设置同步状态值state return true; } return false; } /** * Sync重写AQS tryRelease办法 * 如果以后线程是锁的持有者,则同步状态state -1,如果state=0,锁开释; * 如果以后线程不是锁的持有者,抛出异样IllegalMonitorStateException */ protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //以后线程是否独占资源 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } ...... } //-----以下办法的实现都依赖继承于同步器Sync的实现!----------- /** * 非偏心锁对象 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; //如果锁没有被其余线程持有,则获取锁并立刻返回将锁同步状态state设置为1。 final void lock() { if (compareAndSetState(0, 1)) //state为0才设置为1,不可重入! setExclusiveOwnerThread(Thread.currentThread());//设置为以后线程独占资源 else acquire(1); } //非偏心锁版本下重写AQS的tryAcquire protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 偏心锁对象 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * 偏心锁版本下重写AQS的tryAcquire */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } public ReentrantLock() { sync = new NonfairSync(); } /** * 创立ReentrantLock非偏心或偏心锁实例 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * 获取锁办法,供内部服务调用 */ public void lock() { sync.lock(); } /** * 尝试开释锁,供内部服务调用 */ public void unlock() { sync.release(1); } /** * 是否有线程持有该锁,有返回true,无则返回false,供内部服务调用 */ public boolean isLocked() { return sync.isLocked(); } ...... }
ReentrantLock的lock、unlock、isLocked办法在实现时要依赖自定义的Sync外部类(Sync依赖于AQS),Sync类提供了根本的同步器机制的实现,比方加锁办法nonfairTryAcquire和解锁办法tryRelease,还派生出两个偏心策略的类如FairSync和NonfairSync。在sync通过实现state的值来判断是否能够容许获取和开释,根本通过getState,setState以及compareAndSetState来操作的同步器的状态。至于线程的排队、期待、唤醒等,AQS都曾经实现好了。
通过这大节咱们晓得同步器是基于AQS构建的,实现了AQS最根本的操作包含各种模式的获取操作和开释操作。依据同步器不同,AQS获取操作能够是一种独占操作(ReentrantLock),也能够是非独占操作(Semaphore、CountDownLatch)。如果同步器反对独占获取操作,那么须要实现一些爱护办法:tryAcquire、tryRelease和isHeldExclusively等,而对于共享获取的同步器,则应该实现tryAcquireShared和tryReleaseShared等办法。那么AQS如何做进行线程排队、期待、唤醒呢?
四、AQS源码解析
AQS解决了实现同步器的大量细节,期待线程采纳FIFO双向队列操作程序。本节摘录AQS独占模式下的两个入口办法来开展:acquire(获取锁)和release(开释锁)
1.节点node
AQS外部保护了一个FIFO双向期待队列构造,head是队列头节点,tail是队列尾节点。双向链表构造里有两个指针,pre指针指向后任节点,next指针指向后继节点。当线程竞争失败时会创立一个包含此线程的Node节点退出期待队列尾部。期待队列如下图:
2.加锁
4.2.1 acquire办法
acquire是独占模式下获取锁的入口办法,acquire源码如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire办法流程:
- 1、tryAcquire在第3节提到,如果没有线程持有锁或者是以后线程持有锁时,会获取资源间接返回;
- 2、如果锁被其余线程持有,会调用addWaiter办法结构Node节点退出期待队列尾部;
- 3、acquireQueued办法来以独占的不可中断的形式获取曾经在队列中的线程,如果自旋过程被中断过,返回true,否则返回false;
- 4、如果自旋过程中被中断过,acquireQueued办法不会停止,会在最终获取线程后,再调用selfInterrput(),中断以后线程;
- 5、如果在获取队列中的线程的过程中出现异常且获取锁失败时,会勾销线程获取操作(把该线程节点的waitStatus标记为CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)。
上面具体挖下acquire办法里的几个办法:
4.2.1.1 addWaiter
//为以后线程以给定模式创立排队节点。Node.EXCLUSIVE为独占模式, Node.SHARED共享模式private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 间接入队尾,这种状况表明队列中已有期待的节点 Node pred = tail; if (pred != null) { node.prev = pred;//把以后节点后任节点指向队尾节点 if (compareAndSetTail(pred, node)) {//cas增加到同步队列 pred.next = node;//把旧队尾的后置节点指向以后节点,此时以后节点变成新的队尾节点 return node; } } enq(node);//tail为null时,走enq办法初始化队列(刚开始)或者增加到队尾 return node; }
4.2.1.2 enq
private Node enq(final Node node) { for (;;) {//保持一直的自旋,直到退出队列 Node t = tail; if (t == null) { //刚开始队列为空,设置新节点为head节点,并把tail节点也指向它 if (compareAndSetHead(new Node())) tail = head; } else {//失常退出队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
4.2.1.3 acquireQueued
通过addWaiter设置好队列后,下一步就看线程如何获取锁了。acquireQueued办法就是以独占的不可中断的形式获取曾经在队列中的线程。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//标记是否胜利获取锁 try { boolean interrupted = false;//标记是否中断过 for (;;) {//自旋取得锁 final Node p = node.predecessor();//获取后任节点 if (p == head && tryAcquire(arg)) {//节点为head时才有资格去获取锁 setHead(node);//旧head节点曾经拿到锁,把以后节点高为新head p.next = null; // 不便原head节点GC(setHead中曾经把node.prev=null,这标记着没有指针指向head了,而head也没有后继指针了,能够回收了) failed = false;//胜利取得锁 return interrupted; } //如果没有拿到锁,依据后任节点waitStatus状态判断是否须要挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//执行挂起,判断以后线程是否有中断标记 interrupted = true; } } finally {//如果在获取队列中的线程的过程中出现异常且锁获取失败,会勾销线程获取操作(把该线程节点的waitStatus标记为CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点) if (failed) cancelAcquire(node); } }
以上代码显示出,只有head节点有资格去获取锁,获取锁胜利后会把以后节点设置为新head,同时让GC回收旧的head节点对象 。如果在获取队列中的线程的过程中出现异常且锁获取失败,会勾销线程获取操作(把该线程节点的waitStatus标记为CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)。那如果不是头节点或者别的线程依然没有开释锁怎么办呢?还需看shouldParkAfterFailedAcquire办法和parkAndCheckIntrrupt办法。
4.2.1.4 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//拿到后任节点状态 if (ws == Node.SIGNAL)//如果是SIGNAL状态,能够失常阻塞了,因为以后节点晓得会被失常告诉/唤醒 return true; if (ws > 0) {//后任状态为CANCELLED时(只有CANCELLED状态>0) do {//有限次循环,直到找到失常的后任节点状态后跳出循环 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果后任节点为0 or PROPAGATE状态,设置后任为SIGNAL状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
整个流程中,如果后任结点的状态不是SIGNAL,那么必须保障后任为SIGNAL,以便本人被失常唤醒。
4.2.1.5 parkAndCheckInterrupt
LockSupport.part(this)将线程挂起到waittng状态,真正进入阻塞状态,它须要uppark()、interrupt()办法来中断它,以此实例FIFO队列阻塞操作
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//进入阻塞状态 return Thread.interrupted();//以后线程是否被中断,Thread.interrupted()会革除以后线程的中断标记位(如果以后线程已中断,第一次调用这个办法的返回值是true,第二次调用这个办法的返回值为false) }
看完shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),再答复下面acquireQueued办法起初提出的问题(如果不是头节点或者别的线程依然没有开释锁怎么办呢?):没有取得锁的线程会去查看下本人的后任节点是否是SIGNAL状态,在保障后任节点为SIGNAL状态下,本人真正进入到waiting状态,筹备被唤醒。唤醒的办法能够是uppark()、interrupt(),唤醒后再次尝试获取锁,以此类推,直到获取锁为止。获取锁胜利后会把以后节点设置为新head,同时让GC回收旧的head节点对象。在此过程中还会判断线程有没有被中断过,如果被中断过一次,会在acquire办法中自我中断(锁开释时会中断线程,中断的操作比较复杂,请另行参考线程中断的文章)。
3.开释锁
4.3.1 release办法
下面摸索了如何加锁及队列阻塞的操作,上面再看看如何开释锁。如上第二节中锁开释调用unlock(),这个办法会进入AQS的release办法。AQS release是独占模式下开释锁的入口办法,release源码如下:
public final boolean release(int arg) { if (tryRelease(arg)) {//如果state=0时,能够开释锁 Node h = head; if (h != null && h.waitStatus != 0)//waitStatus为0状况,只有在新建节点时才会初始化为0,而新建的节点没有后继节点,不须要执行唤醒 unparkSuccessor(h);//唤醒期待队列里的下个线程 return true; } return false;}
release办法流程:
- 1、tryRelease在第3节提到,如果以后线程是锁的持有者,则同步状态state -1,如果state=0,锁才会开释;如果以后线程不是锁的持有者,抛出异样IllegalMonitorStateException
- 2、unparkSuccessor办法用于唤醒期待队列中下一个线程。
- 3、为什么h.waitStatus !=0时才会执行唤醒unparkSuccessor办法呢?head节点的waitStatus=0会怎么样?waitStatus为0,只有在addWaiter办法中新建节点时才会初始化为0,其余节点曾经在shouldParkAfterFailedAcquire办法里调整过了(参考4.2.1.4,那时head的waitStatus=SIGNAL)。而新建的节点没有后继节点,也就不须要唤醒了。
4.3.1.1 unparkSuccessor办法
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//ws小于0,将设置头节点watistatus为0 Node s = node.next;//唤醒对象为后继节点 if (s == null || s.waitStatus > 0) {//后继节点为空或勾销场景 s = null; for (Node t = tail; t != null && t != node; t = t.prev)//从队尾向前找waitstatus <= 0的节点 if (t.waitStatus <= 0)//留神此处循环没有return,会始终往前找,找到离head最近的节点 s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒后继节点 }
unparkSuccessor首先会唤醒head的下一个节点,如果将被唤醒节点不为空,则间接通过unpart()办法来开释挂起的线程;如果head后继节点为空,则从后向前找,最初找到离head最近的节点。再联合acquireQueued()办法进入if (p == head && tryAcquire(arg))的判断来获取锁。因为拿不到锁的线程都会自旋,所以不必放心进不了这个判断if (p == head && tryAcquire(arg)),直到拿到锁完结。
到这里可能会有疑难,为什么后继节点要从队尾开始向前找呢,从前向后找不是更快吗?也就是说为什么循环中是t=tail & t=t.prev(反向),而不是t=head & t=t.next(正向)?返回到新建期待节点addWaiter办法看到,compareAndSetTail办法把新建的node增加到同步队列前曾经设置了节点的prev,而后任节点的next指向新节点是在增加到队列之后。这就阐明在unparkSuccessor办法来唤醒时,可能队尾节点还没来得及执行pred.next=node;这句话,t.next正向找后继节点就会漏掉这个新加的节点,用t.prev反向找更牢靠。
if (pred != null) { node.prev = pred;//把以后节点后任节点指向队尾节点 if (compareAndSetTail(pred, node)) {//cas增加到同步队列 pred.next = node;//把旧队尾的后置节点指向以后节点,此时以后节点变成新的队尾节点 return node; } }
五、ReentrantLock独占模式整体流程图
以上时整个独占模式下锁获取和开释的流程图,供本人参考。
总结
1、本文通过源码追踪的形式解析了本人对ReentrantLock和AQS的了解。ReentrantLock是AQS的实现。AQS设计节点waitStatus、队列状态status等状态属性,保护FIFO队列对线程节点进行阻塞以及独占锁策略的设计等诸多细节。
2、ReetrantLock通过自定义同步器并通过state的值来判断是否能够容许获取锁、开释锁或进入队列。
3、AQS队列中又通过waitStatus状态判断是否线程进入阻塞状态和唤醒线程。
4、AQS保护了一个同步队列,没有获取锁的线程会进行同步队列且会不停的自旋直到该线程的节点成为头部节点且取得了锁才进行自旋。
5、开释锁时会唤醒头节点的后继节点。
最初
欢送关注公众号:前程有光,支付一线大厂Java面试题总结+各知识点学习思维导+一份300页pdf文档的Java外围知识点总结!