在上一篇文章《从 0 到 1 实现自己的阻塞队列(上)》中,我们已经实现了一个可以使用的阻塞队列版本。在这篇文章中,我们可以继续我们的冒险之旅,将我们的阻塞队列提升到接近 JDK 版本的水平上。
更进一步优化效率
我们一直使用的都是 Object.notifyAll()
或者 condition.signalAll()
这样会唤醒所有线程的方法,那么如果只有一个线程能够顺利执行,但是其他线程都要再次回到等待状态继续休眠,那不是非常的浪费吗?比如如果有 N 个消费者线程在等待队列中出现元素,那么当一个元素被插入以后所有 N 个消费者线程都被全部唤醒,最后也只会有一个消费者线程能够真正拿到元素并执行完成,其他线程不是都被白白唤醒了吗?我们为什么不用只会唤醒一个线程的 Object.notify()
和condition.signal()
方法呢?
拆分条件变量
在阻塞队列中,我们可以使用 Object.notify()
或者 condition.signal()
这样只唤醒一个线程的方法,但是会有一些前提条件:
- 首先,在一个条件变量上等待的线程必须是同一类型的。比如一个条件变量上只有消费者线程在等待,另一个条件变量上只有生产者线程在等待。这么做的目的就是防止发生我们在插入时想唤醒的是消费者线程,但是唤醒了一个生产者线程,这个生产者线程又因为队列已满又进入了等待状态,这样我们需要唤醒的消费者线程就永远不会被唤醒了。
- 另外还有一点就是这个条件变量上等待的线程只能互斥执行,如果 N 个生产者线程可以同时执行,我们也就不需要一个一个唤醒了,这样反而会让效率降低。当然,在我们的阻塞队列当中,不管是插入还是弹出操作同一时间都只能有一个线程在执行,所以自然就满足这个要求了。
所以,我们只需要满足第一个要求让不同类型的线程在不同的条件变量上等待就可以了。那么具体要怎么做呢?
首先,我们自然是要把原来的一个条件变量 condition
给拆分成两个实例变量 notFull
和notEmpty
,这两个条件变量虽然对应于同一互斥锁,但是两个条件变量的等待和唤醒操作是完全隔离的。这两个条件变量分别代表队列未满和队列非空两个条件,消费者线程因为是被队列为空的情况所阻塞的,所以就应该等待队列非空条件得到满足;而生产者线程因为是被队列已满的情况所阻塞的,自然就要等待队列未满条件的成立。
/** 队列未满的条件变量 */
private final Condition notFull = lock.newCondition();
/** 队列非空的条件变量 */
private final Condition notEmpty = lock.newCondition();
所以在 put()
和take()
方法中,我们就需要把 take()
方法中原来的 condition.await()
修改为等待队列非空条件,即 notEmpty.await()
;而put()
方法中的 condition.await()
自然是要修改为等待队列未满条件成立,即 notFull.await()
。既然我们把等待条件变量的语句都改了,那么唤醒的语句也要做同样的修改,put()
操作要唤醒等待的消费者线程,所以是 notEmpty.signal()
;take()
操作要唤醒的生产者线程,所以是notFull.signal()
。修改完成后的代码如下,大家可以参考一下:
/**
* 将指定元素插入队列
*
* @param e 待插入的对象
*/
public void put(Object e) throws InterruptedException {lock.lockInterruptibly();
try {while (count == items.length) {
// 队列已满时进入休眠
// 等待队列未满条件得到满足
notFull.await();}
// 执行入队操作,将对象 e 实际放入队列中
enqueue(e);
// 插入元素后唤醒一个等待队列非空条件成立的线程
notEmpty.signal();} finally {lock.unlock();
}
}
/**
* 从队列中弹出一个元素
*
* @return 被弹出的元素
*/
public Object take() throws InterruptedException {lock.lockInterruptibly();
try {while (count == 0) {
// 队列为空时进入休眠
// 等待队列非空条件得到满足
notEmpty.await();}
// 执行出队操作,将队列中的第一个元素弹出
Object e = dequeue();
// 弹出元素后唤醒一个等待队列未满条件成立的线程
notFull.signal();
return e;
} finally {lock.unlock();
}
}
验证程序的效率
既然我们对阻塞队列做了效率上的改进,那么就让我们来实际检验一下吧。我们还是之前已经提供的检验程序,但是不同的是,为了明显地看出效率上的变化,我们需要修改一下程序中的两个变量。首先,我们需要把检验程序中运行的线程数 threads
增加到 400,然后我们需要把每个线程执行的次数改为 100 次,就像下面这样:
// 创建 400 个线程
final int threads = 400;
// 每个线程执行 100 次
final int times = 100;
最后我们分别使用改进前和改进后的版本来执行这个这个阻塞队列,在我的电脑上,改进前的版本耗时为 7.80s,改进后的版本耗时为 1.35s。看起来我们对阻塞队列的效率做了一个非常大的提升,非常好,那我们还有没有办法再加快一点呢?
还能不能更快?
在上面的阻塞队列实现中,我们主要使用的就是 put()
和take()
两个操作。而因为有互斥锁 ReentrantLock
的保护,所以这两个方法在同一时间只能有一个线程调用。也就是说,生产者线程在操作队列时同样会阻塞消费者线程。不过从我们的代码中看,实际上 put()
方法和 take()
方法之间需要有互斥锁保护的共享数据访问只发生在入队操作 enqueue
方法和出队操作 dequeue
方法之中。在这两个方法里,对于 putIndex
和takeIndex
的访问是完全隔离的,enqueue
只使用 putIndex
,而dequeue
只使用 takeIndex
,那么线程间的竞争性数据就只剩下 count 了。这样的话,如果我们能解决 count 的更新问题是不是就可以把锁lock
拆分为两个互斥锁,分别让生产者线程和消费者线程使用了呢?这样的话生产者线程在操作时就只会阻塞生产者线程而不会阻塞消费者线程了,消费者线程也是一样的道理。
拆分锁
这时候就要请出我们很熟悉的一种同步工具 CAS
了,CAS
是一个原子操作,它会接收两个参数,一个是当前值,一个是目标值,如果当前值已经发生了改变,那么就会返回失败,而如果当前值没有变化,就会将这个变量修改为目标值。在 Java 中,我们一般会通过 java.util.concurrent
中的 AtomicInteger
来执行 CAS 操作。在 AtomicInteger
类上有原子性的增加与减少方法,每次调用都可以保证对指定的对象进行增加或减少,并且即使有多个线程同时执行这些操作,它们的结果也仍然是正确的。
首先,为了保证入队和出队操作之间的互斥特性移除后两个方法能够并发执行,那么我们就要保证对 count 的更新是线程安全的。因此,我们首先需要把实例变量 count
的类型从 int
修改为 AtomicInteger
,而AtomicInteger
类就提供了我们需要的原子性的增加与减少接口。
/** 队列中的元素总数 */
private AtomicInteger count = new AtomicInteger(0);
然后对应地,我们需要将入队方法中的 count++
和出队方法中的 count--
分别改为 Atomic
原子性的加 1 方法 getAndIncrement
与减 1 方法getAndDecrement
。
/**
* 入队操作
*
* @param e 待插入的对象
*/
private void enqueue(Object e) {
// 将对象 e 放入 putIndex 指向的位置
items[putIndex] = e;
// putIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
if (++putIndex == items.length)
putIndex = 0;
// 增加元素总数
count.getAndIncrement();}
/**
* 出队操作
*
* @return 被弹出的元素
*/
private Object dequeue() {
// 取出 takeIndex 指向位置中的元素
// 并将该位置清空
Object e = items[takeIndex];
items[takeIndex] = null;
// takeIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
if (++takeIndex == items.length)
takeIndex = 0;
// 减少元素总数
count.getAndDecrement();
// 返回之前代码中取出的元素 e
return e;
}
到这里,我们就已经解决了 put()
和take()
方法之间的数据竞争问题,两个方法现在就可以分别用两个锁来控制了。虽然相同类型的线程仍然是互斥的,例如生产者和生产者之间同一时间只能有一个生产者线程在操作队列。但是在生产者线程和消费者线程之间将不用再继续互斥,一个生产者线程和一个消费者线程可以在同一时间操作同一阻塞队列了。所以,我们在这里可以将互斥锁 lock
拆为两个,分别保证生产者线程和消费者线程的互斥性,我们将它们命名为插入锁 putLock
和弹出锁 takeLock
。同时,原来的条件变量也要分别对应于不同的互斥锁了,notFull
要对应于 putLock
,因为插入元素的生产者线程需要等待队列未满条件,那么notEmpyt
自然就要对应于 takeLock
了。
/** 插入锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 队列未满的条件变量 */
private final Condition notFull = putLock.newCondition();
/** 弹出锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 队列非空的条件变量 */
private final Condition notEmpty = takeLock.newCondition();
最后我们要对 put()
和take()
方法中的 signal()
调用做出一些调整。因为在上文中提到的,在使用条件变量时一定要先持有条件变量所对应的互斥锁,而在 put()
和take()
方法中,使用 signal()
方法唤醒的都是另一种类型的线程,例如生产者线程唤醒消费者,消费者线程唤醒生产者。这样我们调用 signal()
方法的条件变量就和 try 语句中持有的锁不一致了,所以我们必须将直接的 xxx.signal()
调用替换为一个私有方法调用。而在私有方法中,我们会先获取与条件变量对应的锁,然后再调用条件变量的 signal()
方法。比如在下面的 signalNotEmpty()
方法中,我们就要先获取 takeLock
才能调用 notEmpty.signal()
;而在signalNotFull()
方法中,我们就要先获取 putLock
才能调用notFull.signal()
。
/**
* 唤醒等待队列非空条件的线程
*/
private void signalNotEmpty() {
// 为了唤醒等待队列非空条件的线程,需要先获取对应的 takeLock
takeLock.lock();
try {
// 唤醒一个等待非空条件的线程
notEmpty.signal();} finally {takeLock.unlock();
}
}
/**
* 唤醒等待队列未满条件的线程
*/
private void signalNotFull() {
// 为了唤醒等待队列未满条件的线程,需要先获取对应的 putLock
putLock.lock();
try {
// 唤醒一个等待队列未满条件的线程
notFull.signal();} finally {putLock.unlock();
}
}
解决死锁问题
但直接把 notFull.signal()
换成 signalNotFull()
,把notEmpty.signal()
换成 signalNotEmpty()
还不够,因为在我们的代码中,原来的 notFull.signal()
和notEmpty.signal()
都是在持有锁的 try 语句块当中的。一旦我们做了调用私有方法的替换,那么 put()
和take()
方法就会以相反的顺序同时获取 putLock
和takeLock
两个锁。有一些读者可能已经意识到这样会产生死锁问题了,那么我们应该怎么解决它呢?
最好的方法就是不要同时加两个锁,我们完全可以在释放前一个之后再使用 signal()
方法来唤醒另一种类型的线程。就像下面的 put()
与take()
方法中所做的一样,我们可以在执行完入队操作之后就释放插入锁 putLock
,然后才运行signalNotEmpty()
方法去获取 takeLock
并调用与其对应的条件变量 notEmpty
的signal()
方法,在 take()
方法中也是一样的道理。
/**
* 将指定元素插入队列
*
* @param e 待插入的对象
*/
public void put(Object e) throws InterruptedException {putLock.lockInterruptibly();
try {while (count.get() == items.length) {
// 队列已满时进入休眠
// 等待队列未满条件得到满足
notFull.await();}
// 执行入队操作,将对象 e 实际放入队列中
enqueue(e);
} finally {putLock.unlock();
}
// 唤醒等待队列非空条件的线程
// 为了防止死锁,不能在释放 putLock 之前获取 takeLock
signalNotEmpty();}
/**
* 从队列中弹出一个元素
*
* @return 被弹出的元素
*/
public Object take() throws InterruptedException {
Object e;
takeLock.lockInterruptibly();
try {while (count.get() == 0) {
// 队列为空时进入休眠
// 等待队列非空条件得到满足
notEmpty.await();}
// 执行出队操作,将队列中的第一个元素弹出
e = dequeue();} finally {takeLock.unlock();
}
// 唤醒等待队列未满条件的线程
// 为了防止死锁,不能在释放 takeLock 之前获取 putLock
signalNotFull();
return e;
}
到了这里我们就顺利地把原来单一的一个 lock
锁拆分为了插入锁 putLock
和takeLock
,这样生产者线程和消费者线程就可以同时运行了。
最后的细节优化——优化唤醒其他线程的效率
啊?我们的阻塞队列到了这里还能再继续优化吗?其实我们做的优化已经足够多了,基本上影响比较大的优化我们都做了,但是还有一些细节是可以最后完善一下的。比如说如果队列并没有为空或者已满时,我们插入或者弹出了元素其实都是不需要唤醒任何线程的,多余的唤醒操作需要先获取 ReentrantLock
锁才能调用对应的条件变量的 signal()
方法,而获取锁是一个成本比较大的操作。所以我们最好是能在队列真的为空或者已满以后,成功插入或弹出元素时,再去获取锁并唤醒等待的线程。
也就是说我们会将 signalNotEmpty();
修改为 if (c == 0) signalNotEmpty();
,而把signalNotFull();
修改为if (c == items.length) signalNotFull();
,也就是只有在必要的时候才去唤醒另一种类型的线程。但是这种修改又会引入另外一种问题,例如有 N 个消费者线程在等待队列非空,这时有两个生产者线程插入了两个元素,但是这两个插入操作是连续发生的,也就是说只有第一个生产者线程在插入元素之后调用了signalNotEmpty()
,第二个线程看到队列原本是非空的就不会调用唤醒方法。在这种情况下,实际就只有一个消费者线程被唤醒了,而实际上队列中还有一个元素可供消费。那么我们如何解决这个问题呢?
比较简单的一种方法就是,生产者线程和消费者线程不止会唤醒另一种类型的线程,而且也会唤醒同类型的线程。比如在生产者线程中如果插入元素之后发现队列还未满,那么就可以调用 notFull.signal()
方法来唤醒其他可能存在的等待状态的生产者线程,对于消费者线程所使用的 take()
方法也是类似的处理方式。相对来说 signal 方法较低,而互斥锁的 lock 方法成本较高,而且会影响到另一种类型线程的运行。所以通过这种方式尽可能地少调用 signalNotEmpty()
和signalNotFull()
方法会是一种还不错的优化手段。
优化后的 put()
和take()
方法如下:
/**
* 将指定元素插入队列
*
* @param e 待插入的对象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {while (count.get() == items.length) {
// 队列已满时进入休眠
// 等待队列未满条件得到满足
notFull.await();}
// 执行入队操作,将对象 e 实际放入队列中
enqueue(e);
// 增加元素总数
c = count.getAndIncrement();
// 如果在插入后队列仍然没满,则唤醒其他等待插入的线程
if (c + 1 < items.length)
notFull.signal();} finally {putLock.unlock();
}
// 如果插入之前队列为空,才唤醒等待弹出元素的线程
// 为了防止死锁,不能在释放 putLock 之前获取 takeLock
if (c == 0)
signalNotEmpty();}
/**
* 从队列中弹出一个元素
*
* @return 被弹出的元素
*/
public Object take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lockInterruptibly();
try {while (count.get() == 0) {
// 队列为空时进入休眠
// 等待队列非空条件得到满足
notEmpty.await();}
// 执行出队操作,将队列中的第一个元素弹出
e = dequeue();
// 减少元素总数
c = count.getAndDecrement();
// 如果队列在弹出一个元素后仍然非空,则唤醒其他等待队列非空的线程
if (c - 1 > 0)
notEmpty.signal();} finally {takeLock.unlock();
}
// 只有在弹出之前队列已满的情况下才唤醒等待插入元素的线程
// 为了防止死锁,不能在释放 takeLock 之前获取 putLock
if (c == items.length)
signalNotFull();
return e;
}
成品出炉!
恭喜大家,经过一番漫长的探索,我们终于彻底完成了我们的阻塞队列实现之旅。如果你能坚持到这里,我相信你已经对多线程编程的实践方法有了非常深刻的理解。最后让我们来看一看我们最终完成的成品代码——一个完整的阻塞队列实现吧。
完整的阻塞队列代码
public class BlockingQueue {
/** 存放元素的数组 */
private final Object[] items;
/** 弹出元素的位置 */
private int takeIndex;
/** 插入元素的位置 */
private int putIndex;
/** 队列中的元素总数 */
private AtomicInteger count = new AtomicInteger(0);
/** 插入锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 队列未满的条件变量 */
private final Condition notFull = putLock.newCondition();
/** 弹出锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 队列非空的条件变量 */
private final Condition notEmpty = takeLock.newCondition();
/**
* 指定队列大小的构造器
*
* @param capacity 队列大小
*/
public BlockingQueue(int capacity) {if (capacity <= 0)
throw new IllegalArgumentException();
items = new Object[capacity];
}
/**
* 入队操作
*
* @param e 待插入的对象
*/
private void enqueue(Object e) {
// 将对象 e 放入 putIndex 指向的位置
items[putIndex] = e;
// putIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
if (++putIndex == items.length)
putIndex = 0;
}
/**
* 出队操作
*
* @return 被弹出的元素
*/
private Object dequeue() {
// 取出 takeIndex 指向位置中的元素
// 并将该位置清空
Object e = items[takeIndex];
items[takeIndex] = null;
// takeIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
if (++takeIndex == items.length)
takeIndex = 0;
// 返回之前代码中取出的元素 e
return e;
}
/**
* 将指定元素插入队列
*
* @param e 待插入的对象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {while (count.get() == items.length) {
// 队列已满时进入休眠
// 等待队列未满条件得到满足
notFull.await();}
// 执行入队操作,将对象 e 实际放入队列中
enqueue(e);
// 增加元素总数
c = count.getAndIncrement();
// 如果在插入后队列仍然没满,则唤醒其他等待插入的线程
if (c + 1 < items.length)
notFull.signal();} finally {putLock.unlock();
}
// 如果插入之前队列为空,才唤醒等待弹出元素的线程
// 为了防止死锁,不能在释放 putLock 之前获取 takeLock
if (c == 0)
signalNotEmpty();}
/**
* 唤醒等待队列非空条件的线程
*/
private void signalNotEmpty() {
// 为了唤醒等待队列非空条件的线程,需要先获取对应的 takeLock
takeLock.lock();
try {
// 唤醒一个等待非空条件的线程
notEmpty.signal();} finally {takeLock.unlock();
}
}
/**
* 从队列中弹出一个元素
*
* @return 被弹出的元素
*/
public Object take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lockInterruptibly();
try {while (count.get() == 0) {
// 队列为空时进入休眠
// 等待队列非空条件得到满足
notEmpty.await();}
// 执行出队操作,将队列中的第一个元素弹出
e = dequeue();
// 减少元素总数
c = count.getAndDecrement();
// 如果队列在弹出一个元素后仍然非空,则唤醒其他等待队列非空的线程
if (c - 1 > 0)
notEmpty.signal();} finally {takeLock.unlock();
}
// 只有在弹出之前队列已满的情况下才唤醒等待插入元素的线程
// 为了防止死锁,不能在释放 takeLock 之前获取 putLock
if (c == items.length)
signalNotFull();
return e;
}
/**
* 唤醒等待队列未满条件的线程
*/
private void signalNotFull() {
// 为了唤醒等待队列未满条件的线程,需要先获取对应的 putLock
putLock.lock();
try {
// 唤醒一个等待队列未满条件的线程
notFull.signal();} finally {putLock.unlock();
}
}
}
有兴趣的读者可以把我们完成的这个阻塞队列类和 JDK 中的 java.util.concurrent.LinkedBlockingQueue
类做一个比较,相信大家可以发现这两个类非常的相似,这足以说明我们费劲千辛万苦实现的这个阻塞队列类已经非常接近 JDK 中的阻塞队列类的质量了。
总结
恭喜大家终于完整地读完了这篇文章!在这篇文章中,我们从一个最简单的阻塞队列版本开始,一路解决了各种问题,最终得到了一个完整、高质量的阻塞队列实现。我们一起来回忆一下我们解决的问题吧。从最简单的阻塞队列开始,我们首先用互斥锁 synchronized
关键字解决了并发控制问题,保证了队列在多线程访问情况下的正确性。然后我们用条件变量 Object.wati()
、Object.notifyAll()
解决了休眠唤醒问题,使队列的效率得到了飞跃性地提高。为了保障队列的安全性,不让外部代码可以访问到我们所使用的对象锁和条件变量,所以我们使用了显式锁 ReentrantLock
,并通过锁对象lock
的newCondition()
方法创建了与其相对应的条件变量对象。最后,我们对队列中的条件变量和互斥锁分别做了拆分,使队列的效率得到了进一步的提高。当然,最后我们还加上了一点对唤醒操作的有条件调用优化,使整个阻塞队列的实现变得更加完善。