乐趣区

关于java:ReentrantLock源码学习

ReentrantLock, 咱们称之为可重入锁。其中依赖了 AbstractQueuedSynchronizer 类来实现线程的同步。

ReentrantLock 中定义了一个 Sync 的同步类,源码如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

                 // 形象办法
        abstract void lock();

           // 非偏心,尝试获取资源
        final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  // 独占形式,尝试开释资源,胜利返回 true,失败返回 false
        protected final boolean tryRelease(int releases) {int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}

        final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}

        final boolean isLocked() {return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

Sync 类继承了 AbstractQueuedSynchronizer,简称 AQS。

AQS 中提供了两种锁:

  • 独占锁,同一时刻只容许一个线程取得锁

    /**
         * 独占形式,尝试获取资源,胜利返回 true,失败返回 false
         * @param arg
         * @return
         */
    @Override
    protected boolean tryAcquire(int arg) {return super.tryAcquire(arg);
    }
    
    
    /**
         * 独占形式,尝试开释资源,胜利返回 true,失败返回 false
         * @param arg
         * @return
         */
    @Override
    protected boolean tryRelease(int arg) {return super.tryRelease(arg);
    }
  • 共享锁,同一时刻容许多个线程同时取得锁
  /**
     * 共享形式,尝试开释资源,如果开释后容许唤醒后续期待节点则返回 true,否则返回 false
     * @param arg
     * @return
     */
    @Override
    protected boolean tryReleaseShared(int arg) {return super.tryReleaseShared(arg);
    }

    /**
     * 共享形式。尝试获取资源。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余可用资源。* @param arg
     * @return
     */
    @Override
    protected int tryAcquireShared(int arg) {return super.tryAcquireShared(arg);
    }

外围变量

   /**
     * 同步状态变量
     */
    private volatile int state;
  • state > 0:示意有线程曾经抢占到资源,然而并未开释,在重入的状况下 state 的值可能大于 1
  • state = 0:示意以后锁资源处于闲暇状态
// 保障多线程竞争下 state 的原子性 
protected final boolean compareAndSetState(int expect, int update) {
  // See below for intrinsics setup to support this
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

ReentrantLock 源码

  • 当调用 ReentrantLock.lock() 办法实际上是调用形象动态外部类 sync.lock() 办法。
public void lock() {sync.lock();
}

syanc 有两个具体的实现:

偏心锁,必须依照 FIFO 的规定来拜访锁资源

  • static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        final void lock() {acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    非偏心锁,能够不依照 FIFO 的规定,间接尝试获取锁资源,默认应用非偏心锁

  •   static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
            final void lock() {
              // 不论以后线程是否排队,间接通过 CAS 抢占锁资源,如果胜利则示意获取锁,// 否则这调用 acquire(1) 执行锁竞争的逻辑;
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
            }
        }

    acquire(int i) 办法源码

/**
  通过 tryAcquire() 办法尝试获取独占锁,如果胜利则返回 true,否则返回 false。如果 tryAcquire() 办法返回 false,则阐明以后锁被占用,只能通过 addWaiter() 办法将以后线程封装成 Node 并增加到 AQS 的同步队列中
  acquireQueued() 办法将 Node 作为参数,通过自旋去尝试获取锁
*/
public final void acquire(int arg) {if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();}

CAS 实现原理

protected final boolean compareAndSetState(int expect, int update) {
  // 通过 CAS 乐观锁的形式来做比拟并替换,如果以后内存中 state 的值和预期值 expect 相等,则更新为 update。如果更新胜利则返回 true,否则返回 false。// 这个操作是原子性的,不波及 state 属性,也不会呈现线程平安问题
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

state 属性

state 是 AQS 中的一个属性,它在不同的实现中所表白的含意是不一样的。对重入锁的实现来说,state 示意同步状态,它有如下两个含意。

  • 当 state= 0 时,示意无锁状态。
  • 当 state>0 时,示意曾经有线程取得了锁,也就是说 state=1,然而因为 ReentrantLock 容许重入,所以当同一个线程屡次取得同步锁的时候,state 会递增,比方重入 5 次,那么 state=5。而在开释锁的时候,同样须要开释 5 次,直到 state= 0 其余线程才有资格取得锁。

nonfairTryAcquire() 办法源码

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();// 获取以后线程
  int c = getState();// 获取 state 值
  if (c == 0) {// 等于 0 示意无锁
    if (compareAndSetState(0, acquires)) {//CAS 比拟并替换 state 的值,胜利则示意获取锁
      setExclusiveOwnerThread(current);// 保留以后获取到线程的锁,下次访问此资源不须要再次竞争锁
      return true;
    }
  }
  else if (current == getExclusiveOwnerThread()) {// 如果是同一个线程
    // 则间接减少重入的次数
    int nextc = c + acquires;
    if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
  }
  return false;
}

nonfairTryAcquire() 办法的实现逻辑如下。
判断以后锁的状态,c== 0 示意无锁,在无锁状态下通过 compareAndSetState() 办法批改 state 抢占锁资源。
○ 如果抢占胜利,则返回 true。
○ 如果抢占失败,则返回 false。
current == getExclusiveOwnerThread(),该判断阐明抢占到锁的线程和以后线程是同一个线程,示意线程重入,因而间接减少重入次数并保留到 state 字段中

AbstractQueuedSynchronizer.addWaiter(Node mode)

当 tryAcquire() 办法获取锁失败当前,会先调用 addWaiter() 办法把以后线程封装成 Node 退出同步队列中;源码如下

private Node addWaiter(Node mode) {// 入参 mode 示意以后节点的状态,传递的参数是 Node.EXCLUSIVE,示意独占状态。Node node = new Node(Thread.currentThread(), mode);// 把获取锁失败的线程封装成 Node
  Node pred = tail;//tail 在 AQS 中示意队列对尾的,默认为 null
  if (pred != null) {// 在 tail 不为 null 的状况下,队列中示意有节点
    node.prev = pred;// 把以后线程的 Node 的 prev 指向 tail
    if (compareAndSetTail(pred, node)) {// 通过 CAS 把 node 退出 AQS 队列中,也就是设置为 tail
      pred.next = node;// 把原来 tail 节点的 next 指向以后 node
      return node;
    }
  }
  enq(node);// 当 tail=null 时,把 node 增加到同步队列
  return node;
}

将以后线程封装成 Node 并进行存储,后续能够间接从节点中失去线程,再通过 unpark(thread) 办法来唤醒。
通过 pred!=null 判断以后链表是否曾经实现初始化,如果曾经实现初始化,则通过 compareAndSetTail 操作把以后线程的 Node 设置为 tail 节点,并建设双向关联。
如果链表还没初始化或者 CAS 增加失败(存在线程竞争),则调用 enq() 办法来实现增加操作。

enq() 办法

private Node enq(final Node node) {for (;;) {
    Node t = tail;
    if (t == null) { // 如果为 null 则调用 CAS 初始化。直到胜利初始化
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

该办法采纳了自旋锁来实现同步队列的初始化,并把以后节点增加到了同步队列中。AQS 的整体构造如图:

ReentrantLock 开释锁源码剖析

public void unlock() {sync.release(1);
}

public final boolean release(int arg) {if (tryRelease(arg)) {// 开释胜利
    Node h = head;// 获取到 AQS 中的 head 节点
    if (h != null && h.waitStatus != 0)
      // 如果 head 不为 null 且状态不等于 0,则调用 unparkSuccessor(h) 办法唤醒后续节点
      unparkSuccessor(h);
    return true;
  }
  return false;
}

tryRelease(int releases) 通过批改 state 值来开释锁

protected final boolean tryRelease(int releases) {int c = getState() - releases;// 减去开释的次数
  if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
  boolean free = false;
  if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);//
  }
  setState(c);
  return free;
}

独占锁在加锁时状态会加 1,在开释锁时状态回减 1,同一个锁可重入后,可能会递增,呈现 2,3,4,5 这些值, 只有调用 unlock() 办法的次数与调用 lock() 办法的次数相等 ,才会将 ExclusiveOwnerThread 线程设置为空,示意锁开释结束

unparkSuccessor(Node node) 唤醒同步队列中的线程

private void unparkSuccessor(Node node) {

  int ws = node.waitStatus;// 获取 head 节点的状态
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);// 设置节点状态为 0
  Node s = node.next;// 失去 head 节点的下一个节点
  if (s == null || s.waitStatus > 0) {
    // 如果下一个节点为 null 或者 status>0,则 cancelled 状态
    // 通过从尾部节点开始扫描,找到间隔 head 最近的一个 waitStatus<= 0 的节点
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)// 如果 next 节点不为空,则间接唤醒这个线程
    LockSupport.unpark(s.thread);
}

unparkSuccessor() 办法次要有两个逻辑。

  • 判断以后节点的状态,如果节点状态已生效,则从 tail 节点开始扫描,找到离 head 最近且状态为 SIGNAL 的节点。
  • 通过 LockSupport.unpark() 办法唤醒该节点。

    为什么要从 tail 开始往前扫描?
    这和 enq() 办法有关系,在 enq() 办法的逻辑中,把一个新节点增加到链表中的逻辑如下。
    将新节点的 prev 指向 tail。
    通过 CAS 将 tail 设置为新节点,因为 CAS 是原子操作,所以可能保障线程的安全性。
    t.next=node,目标是设置原 tail 的 next 节点指向新节点。
    如果在 CAS 操作之后、t.next=node 操作之前,存在其余线程调用 unlock() 办法从 head 开始往后遍历,因为 t.next=node 还没执行,所以链表的关系还没有建设残缺,就会导致遍历到 t 节点的时候被中断。而如果从 tail 往前遍历,就肯定不会呈现这个问题。

开释锁的线程继续执行

回到 AQS 中的 acquireQueued() 办法,本来未抢占到锁的线程被阻塞在该办法中,当被阻塞的线程被唤醒后,持续从阻塞的地位开始执行,代码如下。

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {final Node p = node.predecessor();// 返回上一个节点
      if (p == head && tryAcquire(arg)) {// 再次抢占锁资源
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())// 唤醒,进入下一次循环
        interrupted = true;
    }
  } finally {if (failed)
      cancelAcquire(node);
  }
}
退出移动版