关于java:JDK成长记19ReenranctLock2加锁入队的AQS底层原理

46次阅读

共计 11611 个字符,预计需要花费 30 分钟才能阅读完成。

上一节,你应该学到了 ReentrantLock 底层基于 AQS 的 3 个小组件 state、owner、queue。并且理解了下一个线程 1 进行加锁批改 owner 和 state 的过程。还记得么?加锁胜利后,如下图所示的状态:

首次加锁的时候,只应用到了 owner 和 state 这两个小组件,并没有波及到期待队列。所以这一节,咱们持续看一下,如果有下一个线程—线程 2,这个哥们过去加锁会是如何的?

间接从 JDK 源码层面了解 AQS 的另一个线程也来加锁的入队逻辑

<div class=”output_wrapper” id=”output_wrapper_id” style=”width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: ‘Helvetica Neue’, Helvetica, ‘Hiragino Sans GB’, ‘Microsoft YaHei’, Arial, sans-serif;”><h3 id=”hdddd” style=”width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;”><span style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”> 间接从 JDK 源码层面了解 AQS 的另一个线程也来加锁的入队逻辑 </span></h3></div>

当线程 2 这个哥们进行加锁的时候,假如线程 1 还没有开释锁,也就是基于下面的图的状态,线程 2 进行加锁。同样会走到如下 lock 办法的代码:

 static final class NonfairSync extends Sync {final void lock() {if (compareAndSetState(0, 1))
       setExclusiveOwnerThread(Thread.currentThread());
     else
       acquire(1);
   }


   protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
   }
  }

如果线程 2 进行 lock,当执行 compareAndSetState(0,1)的时候,因为 state 此时曾经是 1 了,必定会 CAS 操作失败,计入 else 逻辑,在 NonFairSync 的父类 AQS 中能够找到如下代码:

public final void acquire(int arg) {if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();}



// 接着又会调用 NonFairSync 实现的 tryAcquire:protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
  }



final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
        return true;
      }
    }

    else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
  }

这个下面的 tryAcquire 办法理论调用了一个 nonfairTryAcquire,从名字上看,叫做非偏心获取的一个办法。(前面讲非偏心锁的会讲到)。

然而当你看过这个办法的脉络你会发现,state 是 1,第一个 if 不满足,owner 是线程 1,以后是线程 2,第二个 if 也不满足,后果间接返回了 false。

所以到这里你会发现线程 2 加锁,截止到当初,会执行到如下图所示步骤 3 所示:

接着因为 tryAcquire 返回 false,会进入 && 前面的办法调用 addWaiter(Node.EXCLUSIVE)。

public final void acquire(int arg) {if (!tryAcquire(arg) &&
   acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
   selfInterrupt();}

addWaiter 从名字上,你能够连蒙带猜下,其实这个办法的意思就是增加到期待队列的进行期待的意思。让咱们来看下:

  private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
   // Try the fast path of enq; backup to full enq on failure
   Node pred = tail;
   if (pred != null) {
     node.prev = pred;
     if (compareAndSetTail(pred, node)) {
       pred.next = node;
       return node;
     }
   }

   enq(node);
   return node;
  }

首先传入的是一个 Node mode,就是Node.EXCLUSIVE, 从名字上看就是一个独占 Node 的意思。

你能够这个 Node.EXCLUSIVE 看下:

static final class Node {
  /** Marker to indicate a node is waiting in shared mode */
  static final Node SHARED = new Node();
  /** Marker to indicate a node is waiting in exclusive mode */
  static final Node EXCLUSIVE = null;


  /** waitStatus value to indicate thread has cancelled */
  static final int CANCELLED = 1;

  /** waitStatus value to indicate successor's thread needs unparking */
  static final int SIGNAL  = -1;

  /** waitStatus value to indicate thread is waiting on condition */
  static final int CONDITION = -2;

  /**
   * waitStatus value to indicate the next acquireShared should
   * unconditionally propagate
   */
  static final int PROPAGATE = -3;

}

果然,你能够看到 Node 中有一堆动态变量,通过 null,空 Node、1、1、-2、- 3 示意一些 Node 的角色类型。

接着往下看 addWaitder 办法:

  private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
     // 省略前面代码
   return node;

  }

这个 new Node 又做了什么?能够看下 Node 的构造方法和成员变量:

volatile Node prev;

volatile Node next;

volatile int waitStatus;

volatile Thread thread;

Node nextWaiter;



Node(Thread thread, Node mode) {   // Used by addWaiter
 this.nextWaiter = mode;
 this.thread = thread;

}

除了 next、prev、thread 示意双向链表的前后指针和对应的数据元素之外,还有两个变量 nextWaiter 和 waitStatus。能够从名字上猜出来,示意期待节点和期待状态的意思。

这里传入了 thread= 线程 2,mode= EXCLUSIVE = null。其实 nextWaiter 这里更像是个标记,示意独占类型的 Node。或者说是线程 2 正在期待的是一个独占锁。创立的 node 如下图所示:

接着 addwaiter 创立实现节点 node 后,继续执行代码 pred 指针指向 tail,然而默认 tail 是 null,所以间接调用 enq(node)办法,看样子是要进行入队。enqueue 的意思。代码如下:

 private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }

  enq(node);
  return node;

 }

执行到这里就会失去如下后果:

AQS 的实质:为啥叫做异步队列同步器?

<div class=”output_wrapper” id=”output_wrapper_id” style=”width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: ‘Helvetica Neue’, Helvetica, ‘Hiragino Sans GB’, ‘Microsoft YaHei’, Arial, sans-serif;”><h3 id=”hdddd” style=”width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;”><span style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”>AQS 的实质:为啥叫做异步队列同步器?</span></h3></div>

接着咱们须要剖析下 enq(node)这个入队办法了:

private Node enq(final Node node) {for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
       tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
 }

从脉络上看,是一个经典 for 循环 +CAS 自旋操作。你能够跟着看下代码执行的思路:

1)第一次 for 循环

首先 t 指向 tail,tail 因为是 null,t 刚开始必定是 null,进入第一个 if。

接着通过 CAS 操作compareAndSetHead,将 head 指向了新建的一个 Node,胜利后将 tail 指向了 head。

private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

所以会失去如下图所示后果:

2)第二次 for 循环

  private Node enq(final Node node) {for (;;) {
     Node t = tail;
     if (t == null) { // Must initialize
       if (compareAndSetHead(new Node()))
         tail = head;
     } else {
       node.prev = t;
       if (compareAndSetTail(t, node)) {
         t.next = node;
         return t;
       }
     }
   }
  }

此时 ReentrantLock 的 tail 和 head 曾经指向了空的 new Node()。

接着还是 t =tail, t 此时不为空了。走到了 else 逻辑,应用入参 node 节点的 prev 指向了 t 所指向的空 Node。

之后通过CAS 操作 compareAndSetTail, 将 tail 指向到入参 node 节点。

  private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  }

最初通过 t 的 next 也指向了入参 node 节点。

也就是如下图所示:

从上图,咱们就可以看进去,线程 2 的 node 和空 node 连接起来,造成了一个双向链表。之前学习 LinkedList 你应该曾经晓得,双向链表也能够当做队列应用。所以这里你能够当做将 node 进行了入队操作。

这个其实就是 AQS 的实质,期待队列组件的作用。

当线程 2 进行了入队期待,这里你能够简化一下流程图,你能够失去如下的图:

加锁失败的时候如何借助 AQS 异步入队阻塞期待?

<div class=”output_wrapper” id=”output_wrapper_id” style=”width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: ‘Helvetica Neue’, Helvetica, ‘Hiragino Sans GB’, ‘Microsoft YaHei’, Arial, sans-serif;”><h3 id=”hdddd” style=”width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;”><span style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”> 加锁失败的时候如何借助 AQS 异步入队阻塞期待?</span></h3></div>

入队后,接着就完结了么?不是,还须要批改下线程 2 的状态,将他进行挂起,既然曾经排上队了,就不要占用 CPU 资源了,是不是?

咱们看下是如何做的:

  public final void acquire(int arg) {if (!tryAcquire(arg) &&
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();}

之前咱们执行完了 addWaiter, 返回的节点是 node,也就是线程 2 对应的期待节点,arg 是 1。接着进入了 acquireQueued 这个办法:

final boolean acquireQueued(final Node node, int arg) {
   boolean failed = true;
   try {
     boolean interrupted = false;
     for (;;) {final Node p = node.predecessor();
       if (p == head && tryAcquire(arg)) {setHead(node);
         p.next = null; // help GC
         failed = false;
         return interrupted;
       }

       if (shouldParkAfterFailedAcquire(p, node) &&
         parkAndCheckInterrupt())
         interrupted = true;
     }
   } finally {if (failed)
       cancelAcquire(node);
   }
  }

目前期待队列状况如下:

这个办法外围脉络,是一个有限 for 循环,当中有两个 if。

接着咱们看下细节:

1)第一次 For 循环:

首先上来应用一个辅助指针 p,指向了 node 节点的前一个节点,node.predecessor 其实就是 p =node.prev。代码如下:

final Node predecessor() throws NullPointerException {
     Node p = prev;
     if (p == null)
       throw new NullPointerException();
     else
       return p;
   }

因为 head 等于 p, 就还是尝试获取一次锁,tryAcquire(arg)。这里假如线程 1 还没有开释锁,tryAcquire(arg)必定还是会失败返回 false,所以第一个 if 不成立。(如果获取胜利,这个 if 其实会将线程 2 移出队列的)

接着执行第二个 if 判断,先进行了 shouldParkAfterFailedAcquire 办法调用,第一个参数传入 p,就是空 Node, 第二参数传入 node,就是线程 2 对应的 node。

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   int ws = pred.waitStatus;
   if (ws == Node.SIGNAL)
     /*
      * This node has already set status asking a release
      * to signal it, so it can safely park.
      */
     return true;

   if (ws > 0) {
     /*
        * Predecessor was cancelled. Skip over predecessors and
      * indicate retry.
      */
     do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
     pred.next = node;
   } else {
     /*
      * waitStatus must be 0 or PROPAGATE. Indicate that we
      * need a signal, but don't park yet. Caller will need to
      * retry to make sure it cannot acquire before parking.
      */
     compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
   }
   return false;
  }

第一个参数传入 p,就是空 Node, 第二参数传入 node,就是线程 2 对应的 node。

它们两个节点的 waitStatus 都是 0。所以通过下面代码,会执行到最初一个 else。

会通过 CAS 操作,将空 Node 的 waitStatus 状态 (ws) 从 0 改为 Node.SIGNAL(-1)。如下图所示:

接着 shouldParkAfterFailedAcquire 就间接返回 false,第一个条件 false。就会间接进行下一次 for 循环了。

 final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }

      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {if (failed)
      cancelAcquire(node);
  }
 }

2)第二次 For 循环

假如线程 1 还是没有开释锁,下面的 for 循环还是会进入如下办法,然而其实的 pred 也就是空 Node 的 watiStatus 曾经被改成 SIGNAL(-1), 所以之里会返回 true。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   int ws = pred.waitStatus;
   if (ws == Node.SIGNAL)
     /*
      * This node has already set status asking a release
      * to signal it, so it can safely park.
      */
     return true;

   if (ws > 0) {
     /*
        * Predecessor was cancelled. Skip over predecessors and
      * indicate retry.
      */
     do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
       pred.next = node;
     } else {

     /*
      * waitStatus must be 0 or PROPAGATE. Indicate that we
      * need a signal, but don't park yet. Caller will need to
      * retry to make sure it cannot acquire before parking.
      */
     compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
   }
   return false;

  }

接着上面这个 if 第一个条件是 ture 会判断第二条件 parkAndCheckInterrupt

  if (shouldParkAfterFailedAcquire(p, node) &&
         parkAndCheckInterrupt())
         interrupted = true;

parkAndCheckInterrupt 这个办法从名字看叫做挂起并且查看线程是否被打断。代码如下

  private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
   return Thread.interrupted();}

能够看到他外围调用了一个工具类 LockSupport.park(this);

 public static void park(Object blocker) {Thread t = Thread.currentThread();
  setBlocker(t, blocker);
  UNSAFE.park(false, 0L);
  setBlocker(t, null);
 }

这个底层是通过 UNSAFE 的 C ++ 代码实现的,咱们就不去看了。你只有晓得,这个 park 操作会将线程挂起,进入期待状态就能够了。还记得之前将线程的状态图么?

park 操作会将线程挂起,进入 Waiting 期待状态。也就是说线程 2 加锁失败最终就是入队并且期待。

明天这一节,到这里就把 AQS 中入队的逻辑给大家讲清楚了。线程获取锁失败如何入队?如何挂起的?置信你都很分明了。你能够本人用第三个线程尝试加锁失败彻底图解 AQS 队列期待机制试试。最初学完,如果你能够画出这个图,就说嘛你真正明确了 AQS 的基本原理了。

小结 & 思考

<div class=”output_wrapper” id=”output_wrapper_id” style=”width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: ‘Helvetica Neue’, Helvetica, ‘Hiragino Sans GB’, ‘Microsoft YaHei’, Arial, sans-serif;”><h3 id=”hdddd” style=”width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;”><span style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”> 小结 & 思考 </span></h3></div>

尽管这个入队逻辑看着比较复杂,但其实大家能够形象出这个队列的设计是基于:CAS 操作 +Node 状态 + 线程标记管制就能够了。

能够多思考下要害思维和关键点,不必太纠结细节。比方多思考下为啥设计了状态,是为了独自应用 Condition 吗?还是。。。。

这些思考才是最重要的!

下一节,咱们看下如果线程 1 开释了锁,如何唤醒队列中元素的。唤醒的时候如果有本地线程来加锁,还能插队!?所以下一节也会给大家介绍下什么是偏心和非偏心锁。

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0