BlockingQueue与Condition原理解析

11次阅读

共计 6015 个字符,预计需要花费 16 分钟才能阅读完成。

 我在前段时间写了一篇关于 AQS 源码解析的文章 AbstractQueuedSynchronizer 超详细原理解析,在文章里边我说 JUC 包中的大部分多线程相关的类都和 AQS 相关,今天我们就学习一下依赖于 AQS 来实现的阻塞队列 BlockingQueue 的实现原理。本文中的源码未加说明即来自于以 ArrayBlockingQueue。
阻塞队列
 相信大多数同学在学习线程池时会了解阻塞队列的概念,熟记各种类型的阻塞队列对线程池初始化的影响。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞直到另外一个线程从队列中读取一个元素。阻塞队列一般都是先进先出的, 用来实现生产者和消费者模式。当发生上述两种情况时,阻塞队列有四种不同的处理方式,这四种方式分别为抛出异常,返回特殊值 (null 或在是 false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,到时后还无法执行成功就放弃操作。这些方法都总结在下边这种表中了。

 我们就只分析 put 和 take 方法。
put 和 take 函数
 我们都知道,使用同步队列可以很轻松的实现生产者 - 消费者模式,其实,同步队列就是按照生产者 - 消费者的模式来实现的,我们可以将 put 函数看作生产者的操作,take 是消费者的操作。
 我们首先看一下 ArrayListBlock 的构造函数。它初始化了 put 和 take 函数中使用到的关键成员变量,分别是 ReentrantLock 和 Condition。
public ArrayBlockingQueue(int capacity, boolean fair) {
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
 ReentrantLock 是 AQS 的子类,其 newCondition 函数返回的 Condition 接口实例是定义在 AQS 类内部的 ConditionObject 实现类。它可以直接调用 AQS 相关的函数。

 put 函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 先获得锁
try {
while (count == items.length)
// 如果队列满了,就 NotFull 这个 Condition 对象上进行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 这里可以注意的是 ArrayBlockingList 实际上使用 Array 实现了一个环形数组,
// 当 putIndex 达到最大时,就返回到起点,继续插入,
// 当然,如果此时 0 位置的元素还没有被取走,
// 下次 put 时,就会因为 cout == item.length 未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
// 因为插入了元素,通知等待 notEmpty 事件的线程。
notEmpty.signal();
}
 我们会发现 put 函数使用了 wait/notify 的机制。与一般生产者 - 消费者的实现方式不同,同步队列使用 ReentrantLock 和 Condition 相结合的先获得锁,再等待的机制;而不是 Synchronized 和 Object.wait 的机制。这里的区别我们下一节再详细讲解。看完了生产者相关的 put 函数,我们再来看一下消费者调用的 take 函数。take 函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果队列为空,那么在 notEmpty 对象上等待,
// 当 put 函数调用时,会调用 notEmpty 的 notify 进行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; // 取出 takeIndex 位置的元素
if (++takeIndex == items.length)
// 如果到了尾部,将指针重新调整到头部
takeIndex = 0;
count–;
….
// 通知 notFull 对象上等待的线程
notFull.signal();
return x;
}
await 操作
 我们发现 ArrayBlockingList 并没有使用 Object.wait,而是使用的 Condition.await,这是为什么呢?其中又有哪些原因呢?Condition 对象可以提供和 Object 的 wait 和 notify 一样的行为,但是后者必须先获取 synchronized 这个内置的 monitor 锁,才能调用;而 Condition 则必须先获取 ReentrantLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition 的等待可以中断,这是二者唯一的区别。
 我们先来看一下 Condition 的 wait 函数,wait 函数的流程大致如下图所示。

 wait 函数主要有三个步骤。一是调用 addConditionWaiter 函数,在 condition wait queue 队列中添加一个节点,代表当前线程在等待一个消息。然后调用 fullyRelease 函数,将持有的锁释放掉,调用的是 AQS 的函数,不清楚的同学可以查看本篇开头的介绍的文章。最后一直调用 isOnSyncQueue 函数判断节点是否被转移到 sync queue 队列上,也就是 AQS 中等待获取锁的队列。如果没有,则进入阻塞状态,如果已经在队列上,则调用 acquireQueued 函数重新获取锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 在 condition wait 队列上添加新的节点
Node node = addConditionWaiter();
// 释放当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 由于 node 在之前是添加到 condition wait queue 上的,现在判断这个 node
// 是否被添加到 Sync 的获得锁的等待队列上,Sync 就是 AQS 的子类
//node 在 condition queue 上说明还在等待事件的 notify,
//notify 函数会将 condition queue 上的 node 转化到 Sync 的队列上。
while (!isOnSyncQueue(node)) {
//node 还没有被添加到 Sync Queue 上,说明还在等待事件通知
// 所以调用 park 函数来停止线程执行
LockSupport.park(this);
// 判断是否被中断, 线程从 park 函数返回有两种情况,一种是
// 其他线程调用了 unpark, 另外一种是线程被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 代码执行到这里,已经有其他线程调用 notify 函数,或则被中断,该线程可以继续执行,但是必须先
// 再次获得调用 await 函数时的锁.acquireQueued 函数在 AQS 文章中做了介绍.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
....
}

final int fullyRelease(Node node) {
//AQS 的方法,当前已经在锁中了,所以直接操作
boolean failed = true;
try {
int savedState = getState();
// 获取 state 当前的值,然后保存,以待以后恢复
// release 函数是 AQS 的函数,不清楚的同学请看开头介绍的文章。
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

private int checkInterruptWhileWaiting(Node node) {
// 中断可能发生在两个阶段中,一是在等待 signa 时, 另外一个是在获得 signal 之后
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

final boolean transferAfterCancelledWait(Node node) {
// 这里要和下边的 transferForSignal 对应着看,这是线程中断进入的逻辑.那边是 signal 的逻辑
// 两边可能有并发冲突,但是成功的一方必须调用 enq 来进入 acquire lock queue 中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// 如果失败了,说明 transferForSignal 那边成功了,等待 node 进入 acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

signal 操作
 signal 函数将 condition wait queue 队列中队首的线程节点转移等待获取锁的 sync queue 队列中。这样的话,wait 函数中调用 isOnSyncQueue 函数就会返回 true,导致 wait 函数进入最后一步重新获取锁的状态。
 我们这里来详细解析一下 condition wait queue 和 sync queue 两个队列的设计原理。condition wait queue 是等待消息的队列,因为阻塞队列为空而进入阻塞状态的 take 函数操作就是在等待阻塞队列不为空的消息。而 sync queue 队列则是等待获取锁的队列,take 函数获得了消息,就可以运行了,但是它还必须等待获取锁之后才能真正进行运行状态。
 signal 函数的示意图如下所示。

 signal 函数其实就做了一件事情,就是不断尝试调用 transferForSignal 函数,将 condition wait queue 队首的一个节点转移到 sync queue 队列中,直到转移成功。因为一次转移成功,就代表这个消息被成功通知到了等待消息的节点。
public final void signal() {
if (!isHeldExclusively())
// 如果当前线程没有获得锁,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 将 Condition wait queue 中的第一个 node 转移到 acquire lock queue 中.
doSignal(first);
}

private void doSignal(Node first) {
do {
// 由于生产者的 signal 在有消费者等待的情况下,必须要通知
// 一个消费者,所以这里有一个循环,直到队列为空
// 把 first 这个 node 从 condition queue 中删除掉
//condition queue 的头指针指向 node 的后继节点,如果 node 后续节点为 null, 那么也将尾指针也置为 null
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal 将 node 转而添加到 Sync 的 acquire lock 队列
}

final boolean transferForSignal(Node node) {
// 如果设置失败,说明该 node 已经被取消了, 所以返回 false, 让 doSignal 继续向下通知其他未被取消的 node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将 node 添加到 acquire lock queue 中.
Node p = enq(node);
int ws = p.waitStatus;
// 需要注意的是这里的 node 进行了转化
//ws>0 代表 canceled 的含义所以直接 unpark 线程
// 如果 compareAndSetWaitStatus 失败,所以直接 unpark, 让线程继续执行 await 中的
// 进行 isOnSyncQueue 判断的 while 循环, 然后进入 acquireQueue 函数.
// 这里失败的原因可能是 Lock 其他线程释放掉了锁,同步设置 p 的 waitStatus
// 如果 compareAndSetWaitStatus 成功了呢?那么该 node 就一直在 acquire lock queue 中
// 等待锁被释放掉再次抢夺锁,然后再 unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

后记
 后边一篇文章主要讲解如何自己使用 AQS 来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶。

正文完
 0