关于java:万字长文解析ReentrantLock源码

27次阅读

共计 12157 个字符,预计需要花费 31 分钟才能阅读完成。

ReentrantLock 特色

特点:

1. 可重入
2. 偏心 / 非偏心
3. 可中断
4. 反对条件期待
5. 可设置锁超时

罕用 API

应用例子:

public class ReentrantLockTest {static ReentrantLock lock = new ReentrantLock(true);

    static class ClientThread extends Thread {
        @Override
        public void run() {System.out.println(Thread.currentThread() + "开始尝试获取锁");
            lock.lock();
            try {System.out.println(Thread.currentThread() + "胜利获取锁");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {e.printStackTrace();
            } finally {lock.unlock();
                System.out.println(Thread.currentThread() + "实现开释锁");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {ClientThread t1 = new ClientThread();
        ClientThread t2 = new ClientThread();
        ClientThread t3 = new ClientThread();
        t1.start();
        t2.start();
        t3.start();

        TimeUnit.SECONDS.sleep(10);
    }
}

源码剖析

获取锁

如果我应用上面的代码进行获取就行:

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

ReentrantLock 默认调用的就是非偏心锁 调用栈:java.util.concurrent.locks.ReentrantLock#lock

  • java.util.concurrent.locks.ReentrantLock.NonfairSync#lock

    final void lock() {
      // 间接尝试加锁
      if (compareAndSetState(0, 1))
          setExclusiveOwnerThread(Thread.currentThread());
      else
          // 如果获取锁失败进入 AQS acquire 逻辑
          acquire(1);
    }

    如果 compareAndSetState(0, 1) 可能间接执行胜利,那么将间接完结办法的执行。如果失败,那么就会调用 acquire 办法如下:

    public final void acquire(int arg) {// tryAcquire(arg) 尝试获取锁
      // acquireQueued 获取锁失败进行期待队列
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();}

    咱们先看 tryAcquire 办法:java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire

  • java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

他会间接调用到 nonfairTryAcquire 非偏心锁的加锁逻辑 外面有两个逻辑:

  • 如果以后状态无锁,间接尝试加锁,加锁胜利返回 true
  • 如果以后时锁重入,那么间接批改 AQS 状态共享变量值 state 等于 c + acquires, 加锁胜利返回 ture
  • 如果都不满足,那么返回加锁失败返回 false

    // 非偏心锁的逻辑
    // 如何了解插队, 这里的插队是以后队列中被唤醒的线程, 和以后退出的线程都能够被执行
    // 如果以后退出线程比队列中唤醒的线程先获取到锁, 就是插队景象
    final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
      int c = getState();
      // 无锁状态, 尝试竞争
      if (c == 0) {if (compareAndSetState(0, acquires)) { // 是否获取到锁
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      // 以后线程持有锁, state 计数 +1
      else if (current == getExclusiveOwnerThread()) { // 判断是否是重入
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          setState(nextc);
          return true;
      }
      return false;
    }

    如果 tryAcquire 调用实现后是获取锁胜利 acquire 办法执行完结,最初代表 lock 办法执行完结。

获取锁失败进入同步队列

如果获取锁失败,那么就会执行 acquire 代码前面段 if 逻辑的执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这里其实能够分为两个办法来看

  • addWaiter(Node.EXCLUSIVE)
  • acquireQueued(xxx, arg)

依照执行程序,咱们先看 addWaiter(Node.EXCLUSIVE) 这里次要是入队的逻辑。addWaiter: java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

private Node addWaiter(Node mode) {
    // 将以后线程转换为 AQS Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 尝试间接退出到尾节点
    Node pred = tail;
    if (pred != null) {
        // 以后节点的前驱节点指向尾节点
        node.prev = pred;
        // cas 批改 tail 节点,如果胜利返回 node 
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果失败,调用 enq
    enq(node);
    return node;
}

enq 是将以后节点插入队列,必要的时候会进行初始化

// 将节点插入队列,必要时进行初始化。private Node enq(final Node node) {for (;;) {
        Node t = tail;
        // 如果没有尾节点,那么须要进行初始化
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } 
        // 如果有尾节点 / 其实就是有头节点 / 曾经被初始化,通过 CAS 入队
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

后面咱们看看完了,以后获取锁的线程当获取锁失败的时候,胜利进入 AQS 队列,接下来咱们持续看 acquireQueued 又做了什么呢?

  • 如果是队列头节点,会再次尝试获取锁
  • 如果批改 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node 状态位

    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
          // 是否中断
          boolean interrupted = false;
          for (;;) {
              // 获取 node 的前驱节点
              final Node p = node.predecessor();
              // 如果是头节点,再次尝试获取锁
              if (p == head && tryAcquire(arg)) {
                  // 将 node 设置为 头节点
                  setHead(node);
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
              }
              // 判断是否须要进行阻塞以后线程
              if (shouldParkAfterFailedAcquire(p, node) &&
                  // 阻塞线程
                  parkAndCheckInterrupt())
                  interrupted = true;
          }
      } finally {
          // 是否失败
          if (failed)
              // 如果失败,勾销获取锁
              cancelAcquire(node);
      }
    }

    下面咱们能够看到,for (;;) 中有两个判断

  • 如果是头节点,就调用 tryAcquire 尝试获取锁(之前咱们曾经剖析过 tryAcquire 了,咱们次要看前面个 if )
  • 如果不是就进入 shouldParkAfterFailedAcquire 办法

在调用 acquireQueued 这个过程中可能调用屡次 shouldParkAfterFailedAcquire 办法。shouldParkAfterFailedAcquire 会执行一下几个操作。

  • 能够用来批改以后节点的状态,
  • 和对链表上有效的节点出队

    /** 
     * 当获取锁失败后, 查看更新新节点状态如果是须要阻塞返回, true
     * <p>
     * 一个前继节点 waitStatus = 0, 第一次将持续设置为 SIGNAL, 通知以后线程筹备进入阻塞, 此时仍旧获取不到, 以后线程进入阻塞
     *
     * @param pred 前继节点
     * @param node 以后节点
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus; // 前继节点的状态, 第一次进入的话, 肯定是 0
      if (ws == Node.SIGNAL)
          return true;
      if (ws > 0) {
          do {
              // 出队, 剔除有效的节点
              node.prev = pred = pred.prev;
          } while (pred.waitStatus > 0);
          pred.next = node;
      } else {
          // 第一次进来, pred.waitStatus = 0 执行这个分支
          // 将前继节点的状态批改为 SIGNAL, 示意 pred.next 节点须要被唤醒 (此时筹备进入阻塞, 然而还未被阻塞, 再次获取锁失败之后才会被阻塞)
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
    }

    当 Node 被批改 Node.SIGNAL 状态后,第一个 if 返回 true , 咱们再次回到 acquireQueued 办法,就会执行 parkAndCheckInterrupt 办法,就是将以后的线程 park 而后返回以后线程的中断状态。

    private final boolean parkAndCheckInterrupt() {
      // 阻塞线程
      LockSupport.park(this);
      // 返回线程中断状态
      return Thread.interrupted();}

    留神:这里线程 park 过后,其实获取锁就完结了前半段的操作,实现同步队列的入队,并且进入期待。咱们就须要期待解锁唤醒。

开释锁

开释锁的代码如下:

lock.unlock();

开释锁做了什么呢?

  • 开释以后锁的状态
  • 在 AQS 队列中去唤醒排队的头节点

调用栈如下:java.util.concurrent.locks.ReentrantLock#unlock

  • java.util.concurrent.locks.AbstractQueuedSynchronizer#release

咱们能够从 release 办法开始

// 解锁
public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        // 判断是否有须要唤醒的线程
        if (h != null && h.waitStatus != 0) //waitStatus 的值为 0, 只有当后继存在节点才会被设置为该值不为 0, 此时须要唤醒后继线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

开释锁,次要是调用 tryRelease, 首先就是思考之前的重入问题,间接对 state 进行 -1 , 而后如果 c == 0 示意以后线程不再持有锁,咱们就能够批改 ownerThread == null . 这个时候,最初批改 state 为新值。

// tryRelease 
protected final boolean tryRelease(int releases) {int c = getState() - releases;
    // 判断是否是以后线程持有锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 如果 state == 0 示意以后线程不在占有该锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

开释锁胜利后,再次回到 release 办法,会再次判断,如果 AQS 队列不为空,那么就进行排队线程唤醒。次要是调用 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

// 唤醒队列中的线程
private void unparkSuccessor(Node node) {
    // 将以后节点状态批改为 0  
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 反向查找
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒队列中的节点
        LockSupport.unpark(s.thread);
}

其实这里最要害的就是 LockSupport.unpark(s.thread); 这里就会回到 acquireQueued,执行唤醒后强锁的逻辑,仍然在 acquireQueued 外面。

开释锁后唤醒期待节点

以后节点被唤醒逻辑,首先会在 shouldParkAfterFailedAcquire 办法中出队,而后尝试加锁如果加锁胜利就返回 true.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
        * This node has already set status asking a release
        * to signal it, so it can safely park.
        */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // 出队的逻辑
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

再次竞争锁,次要是在 acquireQueued 办法中调用 tryAcquire 办法进行获取锁。如果获取锁失败,就又再次获取锁,如果获取锁胜利返回。

测试和实际

反对锁中断

如果通过 lock_.lockInterruptibly(); 形式加锁,如果以后线程呈现中断过后,会抛出 _java.lang.InterruptedException 线程中断异样,所以 ReentrantLock 反对可中断。相干源码:

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // LockSupport.park 会革除中断信号
    LockSupport.park(this);
    return Thread.interrupted();}


// 
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {for (;;) {final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 抛出中断异样
                throw new InterruptedException();}
    } finally {if (failed)
            cancelAcquire(node);
    }
}

试验代码:

public class ReentrantLockTest {static ReentrantLock lock = new ReentrantLock(true);

    static class ClientThread implements Runnable {

        @SneakyThrows
        @Override
        public void run() {System.out.println(Thread.currentThread() + "开始尝试获取锁");
            lock.lockInterruptibly();
            try {System.out.println(Thread.currentThread() + "胜利获取锁");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {e.printStackTrace();
            } finally {lock.unlock();
                System.out.println(Thread.currentThread() + "实现开释锁");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new ClientThread(), "t1");
        Thread t2 = new Thread(new ClientThread(), "t2");
        Thread t3 = new Thread(new ClientThread(), "t3");
        t1.start();
        t2.start();
        // 锁中断
        //lock.lockInterruptibly();
        TimeUnit.SECONDS.sleep(1);
        t3.start();
        TimeUnit.SECONDS.sleep(1);
        t3.interrupt();



        TimeUnit.SECONDS.sleep(10);
    }
}

输入后果:

Thread[t1,5,main] 开始尝试获取锁
Thread[t2,5,main] 开始尝试获取锁
Thread[t1,5,main] 胜利获取锁
Thread[t3,5,main] 开始尝试获取锁
Exception in thread "t3" java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at io.zhengsh.juc._1lock.reentrantlock.ReentrantLockTest$ClientThread.run(ReentrantLockTest.java:18)
    at java.lang.Thread.run(Thread.java:748)
Thread[t1,5,main] 实现开释锁
Thread[t2,5,main] 胜利获取锁
Thread[t2,5,main] 实现开释锁 

获取锁设置超时

lock.tryLock(2, TimeUnit.SECONDS) 能够反对设置获取锁的超时工夫,能够无效的防止线程饥饿问题 测试代码:

public class ReentrantLockTryTest {static ReentrantLock lock = new ReentrantLock(true);

    static class ClientThread implements Runnable {

        @Override
        public void run() {System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t 开始尝试获取锁");
            try {if (lock.tryLock(2, TimeUnit.SECONDS)) {System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t 获取锁胜利");
                    TimeUnit.SECONDS.sleep(5);
                } else {System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t 获取锁失败");
                }
            } catch (InterruptedException e) {e.printStackTrace();
            } finally {if (lock.isHeldByCurrentThread() && lock.isLocked()) {lock.unlock();
                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t 实现开释锁");
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new ClientThread(), "t1");
        Thread t2 = new Thread(new ClientThread(), "t2");
        Thread t3 = new Thread(new ClientThread(), "t3");
        t1.start();
        t2.start();
        t3.start();
        //t1.interrupt();

        TimeUnit.SECONDS.sleep(20);
    }
}

输入后果

Thread[t1,5,main]    1653540581    开始尝试获取锁
Thread[t3,5,main]    1653540581    开始尝试获取锁
Thread[t2,5,main]    1653540581    开始尝试获取锁
Thread[t1,5,main]    1653540581    获取锁胜利
Thread[t3,5,main]    1653540583    获取锁失败
Thread[t2,5,main]    1653540583    获取锁失败
Thread[t1,5,main]    1653540586    实现开释锁 

条件期待队列应用

Condition 是在 java 1.5 中才呈现的,它用来代替传统的 Object 的 wait()、notify() 实现线程间的合作,相比应用 Object 的 wait()、notify(),应用 Condition 的 await()、signal() 这种形式实现线程间合作更加平安和高效。因而通常来说比拟举荐应用 Condition,阻塞队列实际上是应用了 Condition 来模仿线程间合作。Condition 是个接口,根本的办法就是 await() 和 signal() 办法;Condition 依赖于 Lock 接口,生成一个 Condition 的根本代码是 lock.newCondition() 调用 Condition 的 await() 和 signal() 办法,都必须在 lock 爱护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才能够应用:

  • Conditon 中的 await() 对应 Object 的 wait();
  • Condition 中的 signal() 对应 Object 的 notify();
  • Condition 中的 signalAll() 对应 Object 的 notifyAll()。

测试场景:上面一个场景,须要 ABC3 个线程,A 线程打印 1 次,而后是 B 线程打印 2 次,再是 C 线程打印 3 次,线程交替打印。ABC 线程须要交替执行,咱们须要管制,线程的执行先后顺序 咱们能够应用多条件 Condition 来管制,每一个线程领有一个 condition 对象,调用各种的 await 办法,能够使线程期待,而后让别的线程调用这个 condition 对象的 signal 办法,唤醒线程。代码如下:

public class ReentrantLockConditionTest {

    private int data = 1;
    private Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();


    public void printA() {lock.lock();
        try {while (data != 1) {condition1.await();
            }
            // 打印 5 次
            for (int i = 0; i < 5; i++) {System.out.println(Thread.currentThread().getName() + "->" + data);
            }
            data = 2;
            // 告诉 B 线程
            condition2.signal();} catch (Exception e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }

    public void printB() {lock.lock();
        try {while (data != 2) {condition2.await();
            }
            // 打印 10 次
            for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName() + "->" + data);
            }
            data = 3;
            // 告诉 C
            condition3.signal();} catch (Exception e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }

    public void printC() {lock.lock();
        try {while (data != 3) {condition3.await();
            }
            // 打印 15 次
            for (int i = 0; i < 15; i++) {System.out.println(Thread.currentThread().getName() + "->" + data);
            }
            data = 1;
            // 告诉 A
            condition1.signal();} catch (Exception e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {ReentrantLockConditionTest conditionTest = new ReentrantLockConditionTest();
        // A,B,C 交替执行
        new Thread(conditionTest::printA, "A").start();
        new Thread(conditionTest::printB, "B").start();
        new Thread(conditionTest::printC, "C").start();}

}

输入后果如下:

A ->1
B ->2
B ->2
C ->3
C ->3
C ->3

正文完
 0