ReentrantLock,咱们称之为可重入锁。其中依赖了AbstractQueuedSynchronizer类来实现线程的同步。

ReentrantLock中定义了一个Sync的同步类,源码如下:

abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;                 //形象办法        abstract void lock();           //非偏心,尝试获取资源        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }  //独占形式,尝试开释资源,胜利返回true,失败返回false        protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }        protected final boolean isHeldExclusively() {            // While we must in general read state before owner,            // we don't need to do so to check if current thread is owner            return getExclusiveOwnerThread() == Thread.currentThread();        }        final ConditionObject newCondition() {            return new ConditionObject();        }        // Methods relayed from outer class        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        final boolean isLocked() {            return getState() != 0;        }        /**         * Reconstitutes the instance from a stream (that is, deserializes it).         */        private void readObject(java.io.ObjectInputStream s)            throws java.io.IOException, ClassNotFoundException {            s.defaultReadObject();            setState(0); // reset to unlocked state        }    }

Sync类继承了AbstractQueuedSynchronizer,简称AQS。

AQS中提供了两种锁:

  • 独占锁,同一时刻只容许一个线程取得锁

    /**     * 独占形式,尝试获取资源,胜利返回true,失败返回false     * @param arg     * @return     */@Overrideprotected boolean tryAcquire(int arg) {  return super.tryAcquire(arg);}/**     * 独占形式,尝试开释资源,胜利返回true,失败返回false     * @param arg     * @return     */@Overrideprotected boolean tryRelease(int arg) {  return super.tryRelease(arg);}
  • 共享锁,同一时刻容许多个线程同时取得锁
  /**     * 共享形式,尝试开释资源,如果开释后容许唤醒后续期待节点则返回true,否则返回false     * @param arg     * @return     */    @Override    protected boolean tryReleaseShared(int arg) {        return super.tryReleaseShared(arg);    }    /**     * 共享形式。尝试获取资源。正数示意失败;0示意胜利,但没有残余可用资源;负数示意胜利,     且有残余可用资源。     * @param arg     * @return     */    @Override    protected int tryAcquireShared(int arg) {        return super.tryAcquireShared(arg);    }

外围变量

   /**     * 同步状态变量     */    private volatile int state;
  • state > 0:示意有线程曾经抢占到资源,然而并未开释,在重入的状况下state的值可能大于1
  • state = 0:示意以后锁资源处于闲暇状态
//保障多线程竞争下state的原子性 protected final boolean compareAndSetState(int expect, int update) {  // See below for intrinsics setup to support this  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

ReentrantLock源码

  • 当调用ReentrantLock.lock()办法实际上是调用形象动态外部类sync.lock()办法。
public void lock() {  sync.lock();}

syanc有两个具体的实现:

偏心锁,必须依照FIFO的规定来拜访锁资源

  • static final class FairSync extends Sync {    private static final long serialVersionUID = -3000897897090466540L;    final void lock() {        acquire(1);    }    protected final boolean tryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {            if (!hasQueuedPredecessors() &&                compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0)                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }}

    非偏心锁,能够不依照FIFO的规定,间接尝试获取锁资源,默认应用非偏心锁

  •   static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        final void lock() {          //不论以后线程是否排队,间接通过CAS抢占锁资源,如果胜利则示意获取锁,          //否则这调用 acquire(1)执行锁竞争的逻辑;            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else                acquire(1);        }        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }

    acquire(int i)办法源码

/**  通过tryAcquire()办法尝试获取独占锁,如果胜利则返回true,否则返回false。  如果tryAcquire()办法返回false,则阐明以后锁被占用,只能通过addWaiter()办法将以后线程封装成Node并增加到AQS的同步队列中  acquireQueued()办法将Node作为参数,通过自旋去尝试获取锁*/public final void acquire(int arg) {  if (!tryAcquire(arg) &&      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    selfInterrupt();}

CAS实现原理

protected final boolean compareAndSetState(int expect, int update) {  // 通过CAS乐观锁的形式来做比拟并替换,如果以后内存中state的值和预期值expect相等,则更新为update。如果更新胜利则返回true,否则返回false。//这个操作是原子性的,不波及state属性,也不会呈现线程平安问题  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

state属性

state是AQS中的一个属性,它在不同的实现中所表白的含意是不一样的。对重入锁的实现来说,state示意同步状态,它有如下两个含意。

  • 当state=0时,示意无锁状态。
  • 当state>0时,示意曾经有线程取得了锁,也就是说state=1,然而因为ReentrantLock容许重入,所以当同一个线程屡次取得同步锁的时候,state会递增,比方重入5次,那么state=5。而在开释锁的时候,同样须要开释5次,直到state=0其余线程才有资格取得锁。

nonfairTryAcquire()办法源码

final boolean nonfairTryAcquire(int acquires) {  final Thread current = Thread.currentThread();//获取以后线程  int c = getState();//获取state值  if (c == 0) {//等于0示意无锁    if (compareAndSetState(0, acquires)) {//CAS比拟并替换state的值,胜利则示意获取锁      setExclusiveOwnerThread(current);//保留以后获取到线程的锁,下次访问此资源不须要再次竞争锁      return true;    }  }  else if (current == getExclusiveOwnerThread()) {//如果是同一个线程    //则间接减少重入的次数    int nextc = c + acquires;    if (nextc < 0) // overflow      throw new Error("Maximum lock count exceeded");    setState(nextc);    return true;  }  return false;}

nonfairTryAcquire()办法的实现逻辑如下。
判断以后锁的状态,c==0示意无锁,在无锁状态下通过compareAndSetState()办法批改state抢占锁资源。
○ 如果抢占胜利,则返回true。
○ 如果抢占失败,则返回false。
current == getExclusiveOwnerThread(),该判断阐明抢占到锁的线程和以后线程是同一个线程,示意线程重入,因而间接减少重入次数并保留到state字段中

AbstractQueuedSynchronizer.addWaiter(Node mode)

当tryAcquire()办法获取锁失败当前,会先调用addWaiter()办法把以后线程封装成Node退出同步队列中;源码如下

private Node addWaiter(Node mode) {//入参mode示意以后节点的状态,传递的参数是Node.EXCLUSIVE,示意独占状态。  Node node = new Node(Thread.currentThread(), mode);//把获取锁失败的线程封装成Node  Node pred = tail;//tail在AQS中示意队列对尾的,默认为null  if (pred != null) {//在tail不为null的状况下,队列中示意有节点    node.prev = pred;//把以后线程的Node的prev指向tail    if (compareAndSetTail(pred, node)) {//通过CAS把node退出AQS队列中,也就是设置为tail      pred.next = node;//把原来tail节点的next指向以后node      return node;    }  }  enq(node);//当tail=null时,把node增加到同步队列  return node;}

将以后线程封装成Node并进行存储,后续能够间接从节点中失去线程,再通过unpark(thread)办法来唤醒。
通过pred!=null判断以后链表是否曾经实现初始化,如果曾经实现初始化,则通过compareAndSetTail操作把以后线程的Node设置为tail节点,并建设双向关联。
如果链表还没初始化或者CAS增加失败(存在线程竞争),则调用enq()办法来实现增加操作。

enq()办法

private Node enq(final Node node) {  for (;;) {    Node t = tail;    if (t == null) { // 如果为null则调用CAS初始化。直到胜利初始化      if (compareAndSetHead(new Node()))        tail = head;    } else {      node.prev = t;      if (compareAndSetTail(t, node)) {        t.next = node;        return t;      }    }  }}

该办法采纳了自旋锁来实现同步队列的初始化,并把以后节点增加到了同步队列中。AQS的整体构造如图:

 ReentrantLock开释锁源码剖析

public void unlock() {  sync.release(1);}public final boolean release(int arg) {  if (tryRelease(arg)) {//开释胜利    Node h = head;//获取到AQS中的head节点    if (h != null && h.waitStatus != 0)      //如果head不为null且状态不等于0,则调用 unparkSuccessor(h)办法唤醒后续节点      unparkSuccessor(h);    return true;  }  return false;}

tryRelease(int releases) 通过批改state值来开释锁

protected final boolean tryRelease(int releases) {  int c = getState() - releases;//减去开释的次数  if (Thread.currentThread() != getExclusiveOwnerThread())    throw new IllegalMonitorStateException();  boolean free = false;  if (c == 0) {    free = true;    setExclusiveOwnerThread(null);//  }  setState(c);  return free;}

独占锁在加锁时状态会加1,在开释锁时状态回减1,同一个锁可重入后,可能会递增,呈现2,3,4,5这些值,只有调用unlock()办法的次数与调用lock()办法的次数相等,才会将ExclusiveOwnerThread线程设置为空,示意锁开释结束

unparkSuccessor(Node node)唤醒同步队列中的线程

private void unparkSuccessor(Node node) {  int ws = node.waitStatus;//获取head节点的状态  if (ws < 0)    compareAndSetWaitStatus(node, ws, 0);//设置节点状态为0  Node s = node.next;//失去head节点的下一个节点  if (s == null || s.waitStatus > 0) {    //如果下一个节点为null或者status>0,则cancelled状态    //通过从尾部节点开始扫描,找到间隔head最近的一个waitStatus<=0的节点    s = null;    for (Node t = tail; t != null && t != node; t = t.prev)      if (t.waitStatus <= 0)        s = t;  }  if (s != null)//如果next节点不为空,则间接唤醒这个线程    LockSupport.unpark(s.thread);}

unparkSuccessor()办法次要有两个逻辑。

  • 判断以后节点的状态,如果节点状态已生效,则从tail节点开始扫描,找到离head最近且状态为SIGNAL的节点。
  • 通过LockSupport.unpark()办法唤醒该节点。

    为什么要从tail开始往前扫描?
    这和enq()办法有关系,在enq()办法的逻辑中,把一个新节点增加到链表中的逻辑如下。
    将新节点的prev指向tail。
    通过CAS将tail设置为新节点,因为CAS是原子操作,所以可能保障线程的安全性。
    t.next=node,目标是设置原tail的next节点指向新节点。
    如果在CAS操作之后、t.next=node操作之前,存在其余线程调用unlock()办法从head开始往后遍历,因为t.next=node还没执行,所以链表的关系还没有建设残缺,就会导致遍历到t节点的时候被中断。而如果从tail往前遍历,就肯定不会呈现这个问题。

 开释锁的线程继续执行

回到AQS中的acquireQueued()办法,本来未抢占到锁的线程被阻塞在该办法中,当被阻塞的线程被唤醒后,持续从阻塞的地位开始执行,代码如下。

final boolean acquireQueued(final Node node, int arg) {  boolean failed = true;  try {    boolean interrupted = false;    for (;;) {      final Node p = node.predecessor();//返回上一个节点      if (p == head && tryAcquire(arg)) {//再次抢占锁资源        setHead(node);        p.next = null; // help GC        failed = false;        return interrupted;      }      if (shouldParkAfterFailedAcquire(p, node) &&          parkAndCheckInterrupt())//唤醒,进入下一次循环        interrupted = true;    }  } finally {    if (failed)      cancelAcquire(node);  }}