共计 4689 个字符,预计需要花费 12 分钟才能阅读完成。
应用样例
ThreadA
ThreadB
ThreadC
ThreadD - (插队专业户)
Lock lock = new ReentrantLock();
lock.lock();
// ThreadA 阻塞在这里
lock.newCondition().await();
// do business..
lock.unlock();
// ———— 无牵无挂的分割线 ————
lock.lock();
// ThreadB 进入这里,触发 signal
lock.newCondition().signal();
lock.unlock();
1、图解 await
条件:
await() 在 lock() 和 unlock() 之间执行——执行 await 的办法必然持有锁(owner 记录的线程)
步骤 A:
await 执行时会做三件事:
1. 革除 state 和 owner
2. 唤醒工作队列的 head.next
3. 退出并阻塞在期待队列中
步骤 B:
head.next 节点绑定的线程(图中为 ThreadB)被唤醒,试图抢占锁。抢占胜利从工作队列移除(当然 ThreadB 执行 await 时,也会反复步骤 A)
2、图解 signal
signal 只做一件事,将期待队列的头节点开释迁徙至工作队列的尾部(尾插,图中绿色线)
ThreadA 真正的开释要等 unlock() 办法
以下用代码具体解释下面的两幅图
一、构造阐明
Condition 构造
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
Node 构造
class Node {
/** 独占 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** 持有线程 */
volatile Thread thread;
/** 期待队列相干 */
volatile int waitStatus;
Node nextWaiter;
/** 工作队列相干 */
volatile Node prev;
volatile Node next;
二、await
public final void await() throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
// == 1、创立 condition waiter 队列 (一个新的期待队列),node 是尾节点
Node node = addConditionWaiter();
// == 2、state 状态清零,解除阻塞状态 -> 此时其它线程能够抢占锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// == 3.1、不在工作队列中
// 这里的限度:只有在工作队列的 node,能力跳出循环,进入 3.2 逻辑
while (!isOnSyncQueue(node)) {LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// == 3.2、在工作队列中,尝试排队获取锁(未获取到锁则阻塞在工作队列中)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
1、创立期待队列
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// 移除“勾销”状态的节点
if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
t = lastWaiter;
}
// 创立 node 节点,waitStatus 标记为 condition->`-2`
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
2. 开释 lock
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {int savedState = getState();
// 还原 state,exclusiveOwnerThread 清空;工作队列中本来被阻塞的第一个线程(ThreadB)开释
if (release(savedState)) {
failed = false;
return savedState;
} else {throw new IllegalMonitorStateException();
}
} finally {if (failed)
node.waitStatus = Node.CANCELLED;
}
}
3.1 是否在工作队列中
final boolean isOnSyncQueue(Node node) {
// -- 期待队列节点,或无前置节点的,肯定不在工作队列,返回 false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// -- 在队列中,返回 true
if (node.next != null) // If has successor, it must be on queue
return true;
// -- 从工作队列的尾部查找
return findNodeFromTail(node);
}
3.2 排队获取
(在 ReentrantLock 已剖析过,间接粘贴过去)
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 循环中
for (;;) {final Node p = node.predecessor();
// ### 前置节点是头节点,有机会尝试获取
//(联合下一个 if 判断,会自旋两次,也就是说有两次尝试获取机会)if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// ### 第 1 次将 waitstatus 设置成 signal 返回 false
// ### 第 2 次判断 waitstatus==signal 返回 true
if (shouldParkAfterFailedAcquire(p, node)
// === 线程阻塞(将来唤醒时,从此处继续执行)&& parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
###
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// -- 第二次调用
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
}
// -- 第一次调用
else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
===
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
// 以后线程是否被中断
return Thread.interrupted();}
三、signal
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
// exclusiveOwnerThread 持有线程断定
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 唤醒的是期待队列头节点
if (first != null)
// == 队列节点转移(期待队列 -> 工作队列)doSignal(first);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) {
do {
// -- 开释期待队列头节点
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (
// -- 节点转移
!transferForSignal(first)
&& (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 尾插工作队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
正文完