关于java:AQS-都看完了Condition-原理可不能少

7次阅读

共计 8170 个字符,预计需要花费 21 分钟才能阅读完成。

前言

在介绍 AQS 时,其中有一个外部类叫做 ConditionObject,过后并没有进行介绍,并且在后续浏览源码时,会发现很多中央用到了 Condition,这时就会很惊讶,这个 Condition 到底有什么作用?那明天就通过浏览 Condition 源码,从而弄清楚 Condition 到底是做什么的?当然浏览这篇文章的时候心愿你曾经浏览了 AQS、ReentrantLock 以及 LockSupport 的相干文章或者有肯定的理解(当然小伙伴也能够间接跳到文末看总结)。

公众号:『刘志航』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

Object 的监视器办法:wait、notify、notifyAll 应该都不生疏,在多线程应用场景下,必须先应用 synchronized 获取到锁,而后才能够调用 Object 的 wait、notify。

Condition 的应用,相当于用 Lock 替换了 synchronized,而后用 Condition 替换 Object 的监视器办法。

Conditions(也称为条件队列或条件变量)为一种线程提供了一种暂停执行(期待),直到另一线程告诉被阻塞的线程,某些状态条件当初可能为真。

因为拜访到此共享状态信息产生在不同的线程中,因而必须对其进行爱护,所以会应用某种模式的锁。期待条件提供的要害属性是它以原子地开释了关联的锁,并且挂起以后线程,就像 Object.wait 一样。

Condition 实例实质上要绑定到锁。为了取得 Condition 实例,个别应用 Lock 实例的 newCondition() 办法。

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

根本应用

class BoundedBuffer {final Lock lock = new ReentrantLock();
    // condition 实例依赖于 lock 实例
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];

    int putPtr, takePtr, count;

    public void put(Object x) throws InterruptedException {lock.lock();
        try {
            //  put 时判断是否曾经满了
            // 则线程在 notFull 条件上排队阻塞
            while (count == items.length) {notFull.await();
            }
            items[putPtr] = x;
            if (++putPtr == items.length) {putPtr = 0;}
            ++count;
            // put 胜利之后,队列中有元素
            // 唤醒在 notEmpty 条件上排队阻塞的线程
            notEmpty.signal();} finally {lock.unlock();
        }
    }

    public Object take() throws InterruptedException {lock.lock();
        try {
            // take 时,发现为空
            // 则线程在 notEmpty 的条件上排队阻塞
            while (count == 0) {notEmpty.await();
            }
            Object x = items[takePtr];
            if (++takePtr == items.length) {takePtr = 0;}
            --count;
            // take 胜利,队列不可能是满的
            // 唤醒在 notFull 条件上排队阻塞的线程
            notFull.signal();
            return x;
        } finally {lock.unlock();
        }
    }
}

下面是官网文档的一个例子,实现了一个简略的 BlockingQueue,看懂这里,会发现在 同步队列 中很多中央都是用的这个逻辑。必要的代码阐明都曾经在代码中进行正文。

问题疑难

  1. Condition 和 AQS 有什么关系?
  2. Condition 的实现原理是什么?
  3. Condition 的期待队列和 AQS 的同步队列有什么区别和分割?

源码剖析

根本构造

通过 UML 能够看出,Condition 只是一个抽象类,它的次要实现逻辑是在 AQS 的外部类 ConditionObject 实现的。上面次要从 await 和 signal 两个办法动手,从源码理解 ConditionObject。

创立 Condition

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

个别应用 lock.newCondition() 创立条件变量。

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    public Condition newCondition() {return sync.newCondition();
    }
    // Sync 集成 AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {final ConditionObject newCondition() {return new ConditionObject();
        }
    }
}

这里应用的是 ReentrantLock 的源码,外面调用的 sync.newCondition(),Sync 继承 AQS,其实就是创立了一个 AQS 外部类的 ConditionObject 的实例。

这里须要留神的是 lock 每调用一次 lock.newCondition() 都会有一个新的 ConditionObject 实例生成,就是说一个 lock 能够创立多个 Condition 实例。

Condition 参数

/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最初一个节点 */
private transient Node lastWaiter;

await 办法

await 办法,会造成以后线程在期待,直到收到信号或被中断。

与此 Condition 相关联的锁被原子开释,并且出于线程调度目标,以后线程被禁用,并且处于休眠状态,直到产生以下四种状况之一:

  1. 其余一些线程调用此 Condition 的 signal 办法,而以后线程恰好被抉择为要唤醒的线程;
  2. 其余一些线程调用此 Condition 的 signalAll 办法;
  3. 其余一些线程中断以后线程,并反对中断线程挂起;
  4. 产生虚伪唤醒。

在所有状况下,在此办法能够返回之前,以后线程必须从新获取与此条件关联的锁。当线程返回时,能够保障放弃此锁。

当初来看 AQS 外部的实现逻辑:

public final void await() throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 增加到条件队列尾部(期待队列)// 外部会创立 Node.CONDITION 类型的 Node
    Node node = addConditionWaiter();
    // 开释以后线程获取的锁(通过操作 state 的值)// 开释了锁就会被阻塞挂起
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 节点曾经不在同步队列中,则调用 park 让其在期待队列中挂着
    while (!isOnSyncQueue(node)) {
        // 调用 park 阻塞挂起以后线程
        LockSupport.park(this);
        // 阐明 signal 被调用了或者线程被中断,校验下唤醒起因
        // 如果因为终端被唤醒,则跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // while 循环完结,线程开始抢锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 对立解决中断的
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

await 办法步骤如下:

  1. 创立 Node.CONDITION 类型的 Node 并增加到条件队列(ConditionQueue)的尾部;
  2. 开释以后线程获取的锁(通过操作 state 的值)
  3. 判断以后线程是否在同步队列(SyncQueue)中,不在的话会应用 park 挂起。
  4. 循环完结之后,阐明曾经曾经在同步队列(SyncQueue)中了,前面期待获取到锁,继续执行即可。

在这里肯定要把条件队列和同步队列进行辨别分明!!

条件队列 / 期待队列:即 Condition 的队列
同步队列:AQS 的队列。

上面对 await 外面重要办法进行浏览:

  • addConditionWaiter() 办法
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 判断尾节点状态,如果被勾销,则革除所有被勾销的节点
    if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创立新节点,类型为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 将新节点放到期待队列尾部
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter 办法能够看出,只是创立一个类型为 Node.CONDITION 的节点并放到条件队列尾部。同时通过这段代码还能够得出其余论断:

  1. 条件队列外部的 Node,只用到了 thread、waitStatus、nextWaiter 属性;
  2. 条件队列是单向队列。

作为比照,这里把条件队列和同步队列做出比照:

AQS 同步队列如下:

再来看下 Condition 的条件队列

waitStatus 在 AQS 中曾经进行了介绍:

  1. 默认状态为 0;
  2. waitStatus > 0 (CANCELLED 1) 阐明该节点超时或者中断了,须要从队列中移除;
  3. waitStatus = -1 SIGNAL 以后线程的前一个节点的状态为 SIGNAL,则以后线程须要阻塞(unpark);
  4. waitStatus = -2 CONDITION -2:该节点目前在条件队列;
  5. waitStatus = -3 PROPAGATE -3:releaseShared 应该被流传到其余节点,在共享锁模式下应用。
  • fullyRelease 办法(AQS)
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取以后节点的 state
        int savedState = getState();
        // 开释锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {throw new IllegalMonitorStateException();
        }
    } finally {if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

fullyRelease 办法是由 AQS 提供的,首先获取以后的 state,而后调用 release 办法进行开释锁。

public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release 办法在 AQS 中做了具体的介绍。它的次要作用就是开释锁,并且须要留神的是:

  1. fullyRelease 会一次性开释所有的锁,所以说不论重入多少次,在这里都会全副开释的。
  2. 这里会抛出异样,次要是在开释锁失败时,这时就会在 finally 外面将节点状态置为 Node.CANCELLED。
  • isOnSyncQueue(node)

通过下面的流程,节点曾经放到了 条件队列 并且开释了持有的 ,而后就会挂起阻塞,直到 signal 唤醒。然而在挂起时要保障节点曾经不在同步队列(SyncQueue)中了才能够挂起。

final boolean isOnSyncQueue(Node node) {
    // 以后节点是条件队列节点,或者上一个节点是空
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;

    return findNodeFromTail(node);
}
// 从尾部开始遍历
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

如果一个节点(总是一个最后搁置在条件队列中的节点)当初正等待在同步队列上从新获取,则返回 true。

这段代码的次要作用判断节点是不是在同步队列中,如果不在同步队列中,前面才会调用 park 进行阻塞以后线程。这里就会有一个疑难:AQS 的同步队列和 Condition 的条件队列应该是无关的,这里为什么会要保障节点不在同步队列之后才能够进行阻塞?因为 signal 或者 signalAll 唤醒节点之后,节点就会被放到同步队列中。

线程到这里曾经被阻塞了,当有其余线程调用 signal 或者 signalAll 时,会唤醒以后线程。

而后会验证是否因中断唤醒以后线程,这里假如没有产生中断。那 while 循环的 isOnSyncQueue(Node node) 必然会返回 true,示意以后节点曾经在同步队列中了。

后续会调用 acquireQueued(node, savedState) 进行获取锁。

final boolean acquireQueued(final Node node, int arg) {
    // 是否拿到资源
    boolean failed = true;
    try {
        // 中断状态
        boolean interrupted = false;
        // 有限循环
        for (;;) {
            // 以后节点之前的节点
            final Node p = node.predecessor();
            // 前一个节点是头节点,阐明以后节点是 头节点的 next 即实在的第一个数据节点(因为 head 是虚构节点)// 而后再尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 获取胜利之后 将头指针指向以后节点
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p 不是头节点,或者 头节点未能获取到资源(非偏心状况下被别的节点抢占)// 判断 node 是否要被阻塞,获取不到锁就会始终阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

这里就是 AQS 的逻辑了,同样能够浏览 AQS 的相干介绍。

  1. 一直获取本节点的上一个节点是否为 head,因为 head 是虚构节点,如果以后节点的上一个节点是 head 节点,则以后节点为 第一个数据节点 >
  2. 第一个数据节点一直的去获取资源,获取胜利,则将 head 指向以后节点;
  3. 以后节点不是头节点,或者 tryAcquire(arg) 失败(失败可能是非偏心锁)。这时候须要判断前一个节点状态决定 以后节点是否要被阻塞(前一个节点状态是否为 SIGNAL)。

值得注意的是,当节点放到 AQS 的同步队列时,也是进行争抢资源,同时设置 savedState 的值,这个值则是代表当初开释锁的时候开释了多少重入次数。

总体流程画图如下:

signal

public final void signal() {
    // 是否为以后持有线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // firstWaiter 头节点指向条件队列头的下一个节点
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将原来的头节点和同步队列断开
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
 
    // 判断节点是否曾经在之前被勾销了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 调用 enq 增加到 同步队列的尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    // node 的上一个节点 批改为 SIGNAL 这样后续就能够唤醒本人了
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

enq 同样能够浏览 AQS 的代码

private Node enq(final Node node) {for (;;) {
        Node t = tail;
        // 尾节点为空 须要初始化头节点,此时头尾节点是一个
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空 循环赋值
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

通过 enq 办法将节点放到 AQS 的同步队列之后,要将 node 的前一个节点的 waitStatus 设置为 Node.SIGNAL。signalAll 的代码也是相似。

总结

Q&A

Q: Condition 和 AQS 有什么关系?

A: Condition 是基于 AQS 实现的,Condition 的实现类 ConditionObject 是 AQS 的一个外部类,在外面共用了一部分 AQS 的逻辑。

Q: Condition 的实现原理是什么?

A: Condition 外部保护一个条件队列,在获取锁的状况下,线程调用 await,线程会被搁置在条件队列中并被阻塞。直到调用 signal、signalAll 唤醒线程,尔后线程唤醒,会放入到 AQS 的同步队列,参加争抢锁资源。

Q: Condition 的期待队列和 AQS 的同步队列有什么区别和分割?
A: Condition 的期待队列是单向链表,AQS 的是双向链表。二者之间并没有什么明确的分割。仅仅在节点从阻塞状态被唤醒后,会从期待队列挪到同步队列中。

结束语

本文次要是浏览 Condition 的相干代码,不过省略了线程中断等逻辑。有趣味的小伙伴。能够更深刻的钻研相干的源码。

正文完
 0