应用样例
ThreadAThreadBThreadCThreadD - (插队专业户)Lock lock = new ReentrantLock();lock.lock();// ThreadA阻塞在这里lock.newCondition().await();// do business..lock.unlock();// ———— 无牵无挂的分割线 ————lock.lock();// ThreadB进入这里,触发signallock.newCondition().signal();lock.unlock();
1、图解await
条件:
await()在lock()和unlock()之间执行——执行await的办法必然持有锁(owner记录的线程)
步骤A:
await执行时会做三件事:
1.革除state和owner
2.唤醒工作队列的head.next
3.退出并阻塞在期待队列中
步骤B:
head.next节点绑定的线程(图中为ThreadB)被唤醒,试图抢占锁。抢占胜利从工作队列移除(当然ThreadB执行await时,也会反复步骤A)
2、图解signal
signal只做一件事,将期待队列的头节点开释迁徙至工作队列的尾部(尾插,图中绿色线)
ThreadA真正的开释要等unlock()办法
以下用代码具体解释下面的两幅图
一、构造阐明
Condition构造
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
Node构造
class Node { /** 独占 */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** 持有线程 */ volatile Thread thread; /** 期待队列相干 */ volatile int waitStatus; Node nextWaiter; /** 工作队列相干 */ volatile Node prev; volatile Node next;
二、await
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // == 1、创立condition waiter队列(一个新的期待队列),node是尾节点 Node node = addConditionWaiter(); // == 2、state状态清零,解除阻塞状态->此时其它线程能够抢占锁 int savedState = fullyRelease(node); int interruptMode = 0; // == 3.1、不在工作队列中 // 这里的限度:只有在工作队列的node,能力跳出循环,进入3.2逻辑 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // == 3.2、在工作队列中,尝试排队获取锁(未获取到锁则阻塞在工作队列中) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);}
1、创立期待队列
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiterprivate Node addConditionWaiter() { Node t = lastWaiter; // 移除“勾销”状态的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 创立node节点,waitStatus标记为condition->`-2` Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node;}
2.开释lock
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyReleasefinal int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 还原state,exclusiveOwnerThread清空;工作队列中本来被阻塞的第一个线程(ThreadB)开释 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }}
3.1是否在工作队列中
final boolean isOnSyncQueue(Node node) { // -- 期待队列节点,或无前置节点的,肯定不在工作队列,返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // -- 在队列中,返回true if (node.next != null) // If has successor, it must be on queue return true; // -- 从工作队列的尾部查找 return findNodeFromTail(node);}
3.2排队获取
(在ReentrantLock已剖析过,间接粘贴过去)
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueuedfinal boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 循环中 for (;;) { final Node p = node.predecessor(); // ### 前置节点是头节点,有机会尝试获取 //(联合下一个if判断,会自旋两次,也就是说有两次尝试获取机会) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // ### 第1次将waitstatus设置成signal返回false // ### 第2次判断waitstatus==signal返回true if (shouldParkAfterFailedAcquire(p, node) // === 线程阻塞(将来唤醒时,从此处继续执行) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}###private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // -- 第二次调用 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } // -- 第一次调用 else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}===private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 以后线程是否被中断 return Thread.interrupted();}
三、signal
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalpublic final void signal() { // exclusiveOwnerThread持有线程断定 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 唤醒的是期待队列头节点 if (first != null) // == 队列节点转移(期待队列->工作队列) doSignal(first);}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignalprivate void doSignal(Node first) { do { // -- 开释期待队列头节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while ( // -- 节点转移 !transferForSignal(first) && (first = firstWaiter) != null);}final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 尾插工作队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;}