上一节,你应该学到了ReentrantLock底层基于AQS的3个小组件 state、owner、queue。并且理解了下一个线程1进行加锁批改owner和state的过程。还记得么?加锁胜利后,如下图所示的状态:
首次加锁的时候,只应用到了owner和state这两个小组件,并没有波及到期待队列。所以这一节,咱们持续看一下,如果有下一个线程—线程2,这个哥们过去加锁会是如何的?
间接从JDK源码层面了解AQS的另一个线程也来加锁的入队逻辑
<div class="output_wrapper" id="output_wrapper_id" style="width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;"><h3 id="hdddd" style="width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;"><span style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">间接从JDK源码层面了解AQS的另一个线程也来加锁的入队逻辑</span></h3></div>
当线程2这个哥们进行加锁的时候,假如线程1还没有开释锁,也就是基于下面的图的状态,线程2进行加锁。同样会走到如下lock办法的代码:
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
如果线程2进行lock,当执行compareAndSetState(0,1)的时候,因为state此时曾经是1了,必定会CAS操作失败,计入else逻辑,在NonFairSync的父类AQS中能够找到如下代码:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }// 接着又会调用NonFairSync实现的tryAcquire: protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
这个下面的tryAcquire办法理论调用了一个nonfairTryAcquire,从名字上看,叫做非偏心获取的一个办法。(前面讲非偏心锁的会讲到)。
然而当你看过这个办法的脉络你会发现,state是1,第一个if不满足,owner是线程1,以后是线程2,第二个if也不满足,后果间接返回了false。
所以到这里你会发现线程2加锁,截止到当初,会执行到如下图所示步骤3所示:
接着因为tryAcquire返回false,会进入&&前面的办法调用addWaiter(Node.EXCLUSIVE)。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
addWaiter从名字上,你能够连蒙带猜下,其实这个办法的意思就是增加到期待队列的进行期待的意思。让咱们来看下:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
首先传入的是一个Node mode,就是Node.EXCLUSIVE,从名字上看就是一个独占Node的意思。
你能够这个Node.EXCLUSIVE看下:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;}
果然,你能够看到Node中有一堆动态变量,通过null,空Node、1、1、-2、-3示意一些Node的角色类型。
接着往下看addWaitder办法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //省略前面代码 return node; }
这个new Node又做了什么?能够看下Node的构造方法和成员变量:
volatile Node prev;volatile Node next;volatile int waitStatus;volatile Thread thread;Node nextWaiter;Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread;}
除了next、prev、thread示意双向链表的前后指针和对应的数据元素之外,还有两个变量nextWaiter和waitStatus。能够从名字上猜出来,示意期待节点和期待状态的意思。
这里传入了thread=线程2,mode= EXCLUSIVE = null 。其实nextWaiter这里更像是个标记,示意独占类型的Node。或者说是线程2正在期待的是一个独占锁。创立的node如下图所示:
接着addwaiter创立实现节点node后,继续执行代码pred指针指向tail,然而默认tail是null,所以间接调用enq(node)办法,看样子是要进行入队。enqueue的意思。 代码如下:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
执行到这里就会失去如下后果:
AQS的实质:为啥叫做异步队列同步器?
<div class="output_wrapper" id="output_wrapper_id" style="width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;"><h3 id="hdddd" style="width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;"><span style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">AQS的实质:为啥叫做异步队列同步器?</span></h3></div>
接着咱们须要剖析下enq(node)这个入队办法了:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
从脉络上看,是一个经典for循环+CAS 自旋操作。你能够跟着看下代码执行的思路:
1)第一次for循环
首先t指向tail,tail因为是null,t刚开始必定是null,进入第一个if。
接着通过CAS操作compareAndSetHead,将head指向了新建的一个Node,胜利后将tail指向了head。
private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update);}
所以会失去如下图所示后果:
2)第二次for循环
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
此时ReentrantLock的tail和head曾经指向了空的new Node()。
接着还是t=tail, t此时不为空了。走到了else逻辑,应用入参node节点的prev指向了t所指向的空Node。
之后通过CAS操作compareAndSetTail,将tail指向到入参node节点。
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
最初通过t的next也指向了入参node节点。
也就是如下图所示:
从上图,咱们就可以看进去,线程2的node和空node连接起来,造成了一个双向链表。之前学习LinkedList你应该曾经晓得,双向链表也能够当做队列应用。所以这里你能够当做将node进行了入队操作。
这个其实就是AQS的实质,期待队列组件的作用。
当线程2进行了入队期待,这里你能够简化一下流程图,你能够失去如下的图:
加锁失败的时候如何借助AQS异步入队阻塞期待?
<div class="output_wrapper" id="output_wrapper_id" style="width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;"><h3 id="hdddd" style="width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;"><span style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">加锁失败的时候如何借助AQS异步入队阻塞期待?</span></h3></div>
入队后,接着就完结了么?不是,还须要批改下线程2的状态,将他进行挂起,既然曾经排上队了,就不要占用CPU资源了,是不是?
咱们看下是如何做的:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
之前咱们执行完了addWaiter,返回的节点是node,也就是线程2对应的期待节点,arg是1。接着进入了acquireQueued这个办法:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
目前期待队列状况如下:
这个办法外围脉络,是一个有限for循环,当中有两个if。
接着咱们看下细节:
1)第一次For循环:
首先上来应用一个辅助指针p,指向了node节点的前一个节点,node.predecessor其实就是p=node.prev。代码如下:
final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
因为head等于p,就还是尝试获取一次锁,tryAcquire(arg)。这里假如线程1还没有开释锁,tryAcquire(arg)必定还是会失败返回false,所以第一个if不成立。(如果获取胜利,这个if其实会将线程2移出队列的)
接着执行第二个if判断,先进行了shouldParkAfterFailedAcquire办法调用,第一个参数传入p,就是空Node,第二参数传入node,就是线程2对应的node。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
第一个参数传入p,就是空Node,第二参数传入node,就是线程2对应的node。
它们两个节点的waitStatus都是0。所以通过下面代码,会执行到最初一个else。
会通过CAS操作,将空Node的waitStatus状态(ws)从0改为Node.SIGNAL(-1)。如下图所示:
接着shouldParkAfterFailedAcquire就间接返回false,第一个条件false。就会间接进行下一次for循环了。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
2)第二次For循环
假如线程1还是没有开释锁,下面的for循环还是会进入如下办法,然而其实的pred也就是空Node的watiStatus曾经被改成SIGNAL(-1),所以之里会返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
接着上面这个if第一个条件是ture会判断第二条件parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
parkAndCheckInterrupt这个办法从名字看叫做挂起并且查看线程是否被打断。代码如下
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
能够看到他外围调用了一个工具类LockSupport.park(this);
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
这个底层是通过UNSAFE的C++代码实现的,咱们就不去看了。你只有晓得,这个park操作会将线程挂起,进入期待状态就能够了。还记得之前将线程的状态图么?
park操作会将线程挂起,进入Waiting期待状态。也就是说线程2加锁失败最终就是入队并且期待。
明天这一节,到这里就把AQS中入队的逻辑给大家讲清楚了。线程获取锁失败如何入队?如何挂起的?置信你都很分明了。你能够本人用第三个线程尝试加锁失败彻底图解AQS队列期待机制试试。最初学完,如果你能够画出这个图,就说嘛你真正明确了AQS的基本原理了。
小结&思考
<div class="output_wrapper" id="output_wrapper_id" style="width:fit-content;font-size: 16px; color: rgb(62, 62, 62); line-height: 1.6; word-spacing: 0px; letter-spacing: 0px; font-family: 'Helvetica Neue', Helvetica, 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;"><h3 id="hdddd" style="width:fit-content;line-height: inherit; margin: 1.5em 0px; font-weight: bold; font-size: 1.3em; margin-bottom: 2em; margin-right: 5px; padding: 8px 15px; letter-spacing: 2px; background-image: linear-gradient(to right bottom, rgb(43,48,70), rgb(43,48,70)); background-color: rgb(63, 81, 181); color: rgb(255, 255, 255); border-left: 10px solid rgb(255,204,0); border-radius: 5px; text-shadow: rgb(102, 102, 102) 1px 1px 1px; box-shadow: rgb(102, 102, 102) 1px 1px 2px;"><span style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">小结&思考</span></h3></div>
尽管这个入队逻辑看着比较复杂,但其实大家能够形象出这个队列的设计是基于:CAS操作+Node状态+线程标记管制就能够了。
能够多思考下要害思维和关键点,不必太纠结细节。比方多思考下为啥设计了状态,是为了独自应用Condition吗?还是。。。。
这些思考才是最重要的!
下一节,咱们看下如果线程1开释了锁,如何唤醒队列中元素的。唤醒的时候如果有本地线程来加锁,还能插队!?所以下一节也会给大家介绍下什么是偏心和非偏心锁。
本文由博客一文多发平台 OpenWrite 公布!