关于java:JUC两图理解Condition的等待唤醒逻辑

5次阅读

共计 4689 个字符,预计需要花费 12 分钟才能阅读完成。

应用样例

ThreadA
ThreadB
ThreadC
ThreadD - (插队专业户)

Lock lock = new ReentrantLock();
lock.lock();
// ThreadA 阻塞在这里
lock.newCondition().await();
// do business..
lock.unlock();

// ———— 无牵无挂的分割线 ————

lock.lock();
// ThreadB 进入这里,触发 signal
lock.newCondition().signal();
lock.unlock();

1、图解 await

条件:

await() 在 lock() 和 unlock() 之间执行——执行 await 的办法必然持有锁(owner 记录的线程)

步骤 A:

await 执行时会做三件事:
1. 革除 state 和 owner
2. 唤醒工作队列的 head.next
3. 退出并阻塞在期待队列中

步骤 B:

head.next 节点绑定的线程(图中为 ThreadB)被唤醒,试图抢占锁。抢占胜利从工作队列移除(当然 ThreadB 执行 await 时,也会反复步骤 A)

2、图解 signal

signal 只做一件事,将期待队列的头节点开释迁徙至工作队列的尾部(尾插,图中绿色线)
ThreadA 真正的开释要等 unlock() 办法

以下用代码具体解释下面的两幅图

一、构造阐明

Condition 构造

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

Node 构造

class Node {

    /** 独占 */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    
    /** 持有线程 */
    volatile Thread thread;
    
    /** 期待队列相干 */
    volatile int waitStatus;
    Node nextWaiter;
    
    /** 工作队列相干 */
    volatile Node prev;
    volatile Node next;

二、await

public final void await() throws InterruptedException {if (Thread.interrupted())
        throw new InterruptedException();
    // == 1、创立 condition waiter 队列 (一个新的期待队列),node 是尾节点
    Node node = addConditionWaiter();
    // == 2、state 状态清零,解除阻塞状态 -> 此时其它线程能够抢占锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    
    // == 3.1、不在工作队列中
    // 这里的限度:只有在工作队列的 node,能力跳出循环,进入 3.2 逻辑
   while (!isOnSyncQueue(node)) {LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // == 3.2、在工作队列中,尝试排队获取锁(未获取到锁则阻塞在工作队列中)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

1、创立期待队列

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 移除“勾销”状态的节点
    if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
        t = lastWaiter;
    }
    
    // 创立 node 节点,waitStatus 标记为 condition->`-2`
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

2. 开释 lock

java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {int savedState = getState();
        // 还原 state,exclusiveOwnerThread 清空;工作队列中本来被阻塞的第一个线程(ThreadB)开释
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {throw new IllegalMonitorStateException();
        }
    } finally {if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

3.1 是否在工作队列中

final boolean isOnSyncQueue(Node node) {
    // -- 期待队列节点,或无前置节点的,肯定不在工作队列,返回 false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // -- 在队列中,返回 true
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // -- 从工作队列的尾部查找
    return findNodeFromTail(node);
}

3.2 排队获取

(在 ReentrantLock 已剖析过,间接粘贴过去)

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 循环中
        for (;;) {final Node p = node.predecessor();
            // ### 前置节点是头节点,有机会尝试获取
            //(联合下一个 if 判断,会自旋两次,也就是说有两次尝试获取机会)if (p == head && tryAcquire(arg)) {setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // ### 第 1 次将 waitstatus 设置成 signal 返回 false 
            // ###    第 2 次判断 waitstatus==signal 返回 true
            if (shouldParkAfterFailedAcquire(p, node) 
                    // === 线程阻塞(将来唤醒时,从此处继续执行)&& parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

###
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // -- 第二次调用
    if (ws == Node.SIGNAL)
        return true;
    
    if (ws > 0) {
        do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
        pred.next = node;
    } 
    // -- 第一次调用
    else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

===
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
    // 以后线程是否被中断
    return Thread.interrupted();}

三、signal

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
    // exclusiveOwnerThread 持有线程断定
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 唤醒的是期待队列头节点
    if (first != null)
        // == 队列节点转移(期待队列 -> 工作队列)doSignal(first);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) {
    do {
        // -- 开释期待队列头节点
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (
              // -- 节点转移
              !transferForSignal(first) 
              && (first = firstWaiter) != null);
}


final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 尾插工作队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
正文完
 0