从0到1实现自己的阻塞队列上

41次阅读

共计 13273 个字符,预计需要花费 34 分钟才能阅读完成。

阻塞队列不止是一道热门的面试题,同时也是许多并发处理模型的基础,比如常用的线程池类 ThreadPoolExecutor 内部就使用了阻塞队列来保存等待被处理的任务。而且在大多数经典的多线程编程资料中,阻塞队列都是其中非常重要的一个实践案例。甚至可以说只有自己动手实现了一个阻塞队列才能真正掌握多线程相关的 API。

在这篇文章中,我们会从一个最简单的原型开始一步一步完善为一个类似于 JDK 中阻塞队列实现的真正实用的阻塞队列。在这个过程中,我们会一路涉及 synchronized 关键字、条件变量、显式锁 ReentrantLock 等等多线程编程的关键技术,最终掌握 Java 多线程编程的完整理论和实践知识。

阅读本文需要了解基本的多线程编程概念与互斥锁的使用,还不了解的读者可以参考一下这篇文章《多线程中那些看不见的陷阱》中到 ReentrantLock 部分为止的内容。

什么是阻塞队列?

阻塞队列 是这样的一种数据结构,它是一个队列(类似于一个 List),可以存放 0 到 N 个元素。我们可以对这个队列执行插入或弹出元素操作,弹出元素操作就是获取队列中的第一个元素,并且将其从队列中移除;而插入操作就是将元素添加到队列的末尾。当队列中没有元素时,对这个队列的弹出操作将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操作就会被阻塞,直到有元素被弹出后才会被唤醒。

在线程池中,往往就会用阻塞队列来保存那些暂时没有空闲线程可以直接执行的任务,等到线程空闲之后再从阻塞队列中弹出任务来执行。一旦队列为空,那么线程就会被阻塞,直到有新任务被插入为止。

一个最简单的版本

代码实现

我们先来实现一个最简单的队列,在这个队列中我们不会添加任何线程同步措施,而只是实现了最基本的队列与阻塞特性。那么首先,一个队列可以存放一定量的元素,而且可以执行插入元素和弹出元素的操作。然后因为这个队列还是一个阻塞队列,那么在队列为空时,弹出元素的操作将会被阻塞,直到队列中被插入新的元素可供弹出为止;而在队列已满的情况下,插入元素的操作将会被阻塞,直到队列中有元素被弹出为止。

下面我们会将这个最初的阻塞队列实现类拆解为独立的几块分别讲解和实现,到最后就能拼装出一个完整的阻塞队列类了。为了在阻塞队列中保存元素,我们首先要定义一个数组来保存元素,也就是下面代码中的 items 字段了,这是一个 Object 数组,所以可以保存任意类型的对象。在最后的构造器中,会传入一个 capacity 参数来指定 items 数组的大小,这个值也就是我们的阻塞队列的大小了。

takeIndexputIndex就是我们插入和弹出元素的下标位置了,为什么要分别用两个整型来保存这样的位置呢?因为阻塞队列在使用的过程中会不断地被插入和弹出元素,所以可以认为元素在数组中是像贪吃蛇一样一步一步往前移动的,每次弹出的都是队列中的第一个元素,而插入的元素则会被添加到队列的末尾。当下标到达末尾时会被设置为 0,从数组的第一个下标位置重新开始向后增长,形成一个不断循环的过程。

那么如果队列中存储的个数超过 items 数组的长度时,新插入的元素岂不是会覆盖队列开头还没有被弹出的元素了吗?这时我们的最后一个字段 count 就能派上用场了,当 count 等于 items.length 时,插入操作就会被阻塞,直到队列中有元素被弹出时为止。那么这种阻塞是如何实现的呢?我们接下来来看一下 put() 方法如何实现。

    /** 存放元素的数组 */
    private final Object[] items;
    
    /** 弹出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;
    
    /** 队列中的元素总数 */
    private int count;
    
    /**
     * 指定队列大小的构造器
     *
     * @param capacity  队列大小
     */
    public BlockingQueue(int capacity) {if (capacity <= 0)
            throw new IllegalArgumentException();
        // putIndex, takeIndex 和 count 都会被默认初始化为 0
        items = new Object[capacity];
    }

下面是 put()take()方法的实现,put()方法向队列末尾添加新元素,而 take() 方法从队列中弹出最前面的一个元素,我们首先来看一下我们目前最关心的 put() 方法。在 put() 方法的开头,我们可以看到有一个判断 count 是否达到了 items.length(队列大小)的 if 语句,如果 count 不等于 items.length,那么就表示队列还没有满,随后就直接调用了 enqueue 方法对元素进行了入队。enqueue方法的实现会在稍后介绍,这里我们只需要知道这个入队方法会将元素放入到队列中并对 count 加 1 就可以了。在成功插入元素之后我们就会通过 break 语句跳出最外层的无限 while 循环,从方法中返回。

但是如果这时候队列已满,那么 count 的值就会等于 items.length,这将会导致我们调用 Thread.sleep(200L) 使当前线程休眠 200 毫秒。当线程从休眠中恢复时,又会进入下一次循环,重新判断条件 count != items.length。也就是说,如果队列没有弹出元素使我们可以完成插入操作,那么线程就会一直处于“判断 -> 休眠”的循环而无法从put() 方法中返回,也就是进入了“阻塞”状态。

随后的 take() 方法也是一样的道理,只有在队列不为空的情况下才能顺利弹出元素完成任务并返回,如果队列一直为空,调用线程就会在循环中一直等待,直到队列中有元素插入为止。

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {while (true) {
            // 直到队列未满时才执行入队操作并跳出循环
            if (count != items.length) {
                // 执行入队操作,将对象 e 实际放入队列中
                enqueue(e);
                break;
            }

            // 队列已满的情况下休眠 200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {while (true) {
            // 直到队列非空时才继续执行后续的出队操作并返回弹出的元素
            if (count != 0) {
                // 执行出队操作,将队列中的第一个元素弹出
                return dequeue();}

            // 队列为空的情况下休眠 200ms
            Thread.sleep(200L);
        }
    }

在上面的 put()take()方法中分别调用了入队方法 enqueue 和出队方法 dequeue,那么这两个方法到底需要如何实现呢?下面是这两个方法的源代码,我们可以看到,在入队方法enqueue() 中,总共有三步操作:

  1. 首先把指定的对象 e 保存到 items[putIndex] 中,putIndex指示的就是我们插入元素的位置。
  2. 之后,我们会将 putIndex 向后移一位,来确定下一次插入元素的下标位置,如果已经到了队列末尾我们就会把 putIndex 设置为 0,回到队列的开头。
  3. 最后,入队操作会将 count 值加 1,让 count 值和队列中的元素个数一致。

而出队方法 dequeue 中执行的操作则与入队方法 enqueue 相反。

    /**
     * 入队操作
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象 e 放入 putIndex 指向的位置
        items[putIndex] = e;

        // putIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增加元素总数
        count++;
    }

    /**
     * 出队操作
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出 takeIndex 指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 减少元素总数
        count--;

        // 返回之前代码中取出的元素 e
        return e;
    }

到这里我们就可以将这个三个模块拼接为一个完整的阻塞队列类 BlockingQueue 了。完整的代码如下,大家可以拷贝到 IDE 中,或者自己重新实现一遍,然后我们就可以开始上手用一用我们刚刚完成的阻塞队列了。

public class BlockingQueue {

    /** 存放元素的数组 */
    private final Object[] items;

    /** 弹出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;

    /** 队列中的元素总数 */
    private int count;

    /**
     * 指定队列大小的构造器
     *
     * @param capacity  队列大小
     */
    public BlockingQueue(int capacity) {if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * 入队操作
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象 e 放入 putIndex 指向的位置
        items[putIndex] = e;

        // putIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增加元素总数
        count++;
    }

    /**
     * 出队操作
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出 takeIndex 指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex 向后移一位,如果已到末尾则返回队列开头(位置 0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 减少元素总数
        count--;

        // 返回之前代码中取出的元素 e
        return e;
    }

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {while (true) {
            // 直到队列未满时才执行入队操作并跳出循环
            if (count != items.length) {
                // 执行入队操作,将对象 e 实际放入队列中
                enqueue(e);
                break;
            }

            // 队列已满的情况下休眠 200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {while (true) {
            // 直到队列非空时才继续执行后续的出队操作并返回弹出的元素
            if (count != 0) {
                // 执行出队操作,将队列中的第一个元素弹出
                return dequeue();}

            // 队列为空的情况下休眠 200ms
            Thread.sleep(200L);
        }
    }

}

测验阻塞队列实现

既然已经有了阻塞队列的实现,那么我们就写一个测试程序来测试一下吧。下面是一个对阻塞队列进行并发的插入和弹出操作的测试程序,在这个程序中,会创建 2 个生产者线程向阻塞队列中插入数字 0~19;同时也会创建 2 个消费者线程从阻塞队列中弹出 20 个数字,并打印这些数字。而且在程序中也统计了整个程序的耗时,会在所有子线程执行完成之后打印出程序的总耗时。

这里我们期望这个测验程序能够以任意顺序输出 0~19 这 20 个数字,然后打印出程序的总耗时,那么实际执行情况会如何呢?

public class BlockingQueueTest {public static void main(String[] args) throws Exception {

        // 创建一个大小为 2 的阻塞队列
        final BlockingQueue q = new BlockingQueue(2);

        // 创建 2 个线程
        final int threads = 2;
        // 每个线程执行 10 次
        final int times = 10;

        // 线程列表,用于等待所有线程完成
        List<Thread> threadList = new ArrayList<>(threads * 2);
        long startTime = System.currentTimeMillis();

        // 创建 2 个生产者线程,向队列中并发放入数字 0 到 19,每个线程放入 10 个数字
        for (int i = 0; i < threads; ++i) {
            final int offset = i * times;
            Thread producer = new Thread(() -> {
                try {for (int j = 0; j < times; ++j) {q.put(new Integer(offset + j));
                    }
                } catch (Exception e) {e.printStackTrace();
                }
            });

            threadList.add(producer);
            producer.start();}

        // 创建 2 个消费者线程,从队列中弹出 20 次数字并打印弹出的数字
        for (int i = 0; i < threads; ++i) {Thread consumer = new Thread(() -> {
                try {for (int j = 0; j < times; ++j) {Integer element = (Integer) q.take();
                        System.out.println(element);
                    }
                } catch (Exception e) {e.printStackTrace();
                }
            });

            threadList.add(consumer);
            consumer.start();}

        // 等待所有线程执行完成
        for (Thread thread : threadList) {thread.join();
        }

        // 打印运行耗时
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }
}

在我的电脑上运行这段程序的输出为:

0
1
2
3
4
5
null
10
8
7
14
9
16
15
18
17
null

不仅是打印出了很多个null,而且打印出 17 行之后就不再打印更多数据,而且程序也就一直没有打印总耗时并结束了。为什么会发生这种情况呢?

原因就是在我们实现的这个阻塞队列中完全没有线程同步机制,所以同时并发进行的 4 个线程(2 个生产者和 2 个消费者)会同时执行阻塞队列的 put()take()方法。这就可能会导致各种各样并发执行顺序导致的问题,比如两个生产者同时对阻塞队列进行插入操作,有可能就会在 putIndex 没更新的情况下对同一下标位置又插入了一次数据,导致了数据还没被消费就被覆盖了;而两个消费者也可能会在 takeIndex 没更新的情况下又获取了一次已经被清空的位置,导致打印出了null。最后因为这些原因都有可能会导致消费者线程最后还没有弹出 20 个数字 count 就已经为 0 了,这时消费者线程就会一直处于阻塞状态无法退出了。

那么我们应该如何给阻塞队列加上线程同步措施,使它的运行不会发生错误呢?

一个线程安全的版本

使用互斥锁来保护队列操作

之前碰到的并发问题的核心就是多个线程同时对阻塞队列进行插入或弹出操作,那么我们有没有办法让同一时间只能有一个线程对阻塞队列进行操作呢?

也许很多读者已经想到了,我们最常用的一种并发控制方式就是 synchronized 关键字。通过 synchronized,我们可以让一段代码同一时间只能有一个线程进入;如果在同一个对象上通过synchronized 加锁,那么 put()take()两个方法可以做到同一时间只能有一个线程调用两个方法中的任意一个。比如如果有一个线程调用了 put() 方法插入元素,那么其他线程再调用 put() 方法或者 take() 就都会被阻塞直到前一个线程完成对 put() 方法的调用了。

在这里,我们只修改 put()take()方法,把这两个方法中对 enqueuedequeue的调用都包装到一个 synchronized (this) {...} 的语句块中,保证了同一时间只能有一个线程进入这两个语句块中的任意一个。如果对 synchronized 之类的线程同步机制还不熟悉的读者,建议先看一下这篇介绍多线程同步机制的文章《多线程中那些看不见的陷阱》再继续阅读之后的内容,相信会有事半功倍的效果。

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {while (true) {synchronized (this) {
                // 直到队列未满时才执行入队操作并跳出循环
                if (count != items.length) {
                    // 执行入队操作,将对象 e 实际放入队列中
                    enqueue(e);
                    break;
                }
            }

            // 队列已满的情况下休眠 200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {while (true) {synchronized (this) {
                // 直到队列非空时才继续执行后续的出队操作并返回弹出的元素
                if (count != 0) {
                    // 执行出队操作,将队列中的第一个元素弹出
                    return dequeue();}
            }

            // 队列为空的情况下休眠 200ms
            Thread.sleep(200L);
        }
    }

再次测试

我们再来试一试这个新的阻塞队列实现,在我的电脑上测试程序的输出如下:

0
1
2
3
10
11
4
5
6
12
13
14
15
7
8
9
16
17
18
19
总耗时:1.81s

这下看起来结果就对了,而且多跑了几次也都能稳定输出所有 0~19 的 20 个数字。看起来非常棒,我们成功了,来给自己鼓个掌吧!

但是仔细那么一看,好像最后的耗时是不是有一些高了?虽然“1.81 秒”也不是太长的时间,但是好像一般计算机程序做这么一点事情只要一眨眼的功夫就能完成才对呀。为什么这个阻塞队列会这么慢呢?

一个更快的阻塞队列

让我们先来诊断一下之前的阻塞队列中到底是什么导致了效率的降低,因为 put()take()方法是阻塞队列的核心,所以我们自然从这两个方法看起。在这两个方法里,我们都看到了同一段代码 Thread.sleep(200L),这段代码会让put()take()方法分别在队列已满和队列为空的情况下进入一次固定的 200 毫秒的休眠,防止线程占用过多的 CPU 资源。但是如果队列在这 200 毫秒里发生了变化,那么线程也还是在休眠状态无法马上对变化做出响应。比如如果一个调用 put() 方法的线程因为队列已满而进入了 200 毫秒的休眠,那么即使队列已经被消费者线程清空了,它也仍然会忠实地等到 200 毫秒之后才会重新尝试向队列中插入元素,中间的这些时间就都被浪费了。

但是如果我们去掉这段休眠的代码,又会导致 CPU 的使用率过高的问题。那么有没有一种方法可以平衡两者的利弊,同时得到两种情况的好处又没有各自的缺点呢?

使用条件变量优化阻塞唤醒

为了完成上面这个困难的任务,既要马儿跑又要马儿不吃草。那么我们就需要有一种方法,既让线程进入休眠状态不再占用 CPU,但是在队列发生改变时又能及时地被唤醒来重试之前的操作了。既然用了对象锁synchronized,那么我们就找找有没有与之相搭配的同步机制可以实现我们的目标。

Object 类,也就是所有 Java 类的基类里,我们找到了三个有意思的方法 Object.wait()Object.notify()Object.notifyAll()。这三个方法是需要搭配在一起使用的,其功能与操作系统层面的 条件变量 类似。条件变量是这样的一种线程同步工具:

  1. 每个条件变量都会有一个对应的互斥锁,要调用条件变量的 wait() 方法,首先需要持有条件变量对应的这个互斥锁。之后,在调用条件变量的 wait() 方法时,首先会释放已持有的这个互斥锁,然后当前线程进入休眠状态,等待被 Object.notify() 或者 Object.notifyAll() 方法唤醒;
  2. 调用 Object.notify() 或者 Object.notifyAll() 方法可以唤醒因为 Object.wait() 进入休眠状态的线程,区别是 Object.notify() 方法只会唤醒一个线程,而 Object.notifyAll() 会唤醒所有线程。

因为我们之前的代码中通过 synchronized 获取了对应于 this 引用 的对象锁,所以自然也就要用 this.wait()this.notify()this.notifyAll() 方法来使用与这个对象锁对应的条件变量了。下面是使用条件变量改造后的 put()take()方法。还是和之前一样,我们首先以 put() 方法为例分析具体的改动。首先,我们去掉了最外层的 while 循环,然后我们把 Thread.sleep 替换为了 this.wait(),以此在队列已满时进入休眠状态,等待队列中的元素被弹出后再继续。在队列满足条件,入队操作成功后,我们通过调用this.notifyAll() 唤醒了可能在等待队列非空条件的调用 take() 的线程。take()方法的实现与 put() 也基本类似,只是操作相反。

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {synchronized (this) {if (count == items.length) {
                // 队列已满时进入休眠
                this.wait();}

            // 执行入队操作,将对象 e 实际放入队列中
            enqueue(e);

            // 唤醒所有休眠等待的进程
            this.notifyAll();}
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {synchronized (this) {if (count == 0) {
                // 队列为空时进入休眠
                this.wait();}

            // 执行出队操作,将队列中的第一个元素弹出
            Object e = dequeue();

            // 唤醒所有休眠等待的进程
            this.notifyAll();

            return e;
        }
    }

但是我们在测试程序运行之后发现结果好像又出现了问题,在我电脑上的输出如下:

0
19
null
null
null
null
null
null
null
null
null
18
null
null
null
null
null
null
null
null
总耗时:0.10s

虽然我们解决了耗时问题,现在的耗时已经只有 0.10s 了,但是结果中又出现了大量的null,我们的阻塞队列好像又出现了正确性问题。那么问题出在哪呢?建议读者可以先自己尝试分析一下,这样有助于大家积累解决多线程并发问题的能力。

while 循环判断条件是否满足

经过分析,我们看到,在调用 this.wait() 后,如果线程被 this.notifyAll() 方法唤醒,那么就会直接开始直接入队 / 出队操作,而不会再次检查 count 的值是否满足条件。而在我们的程序中,当队列为空时,可能会有很多消费者线程在等待插入元素。此时,如果有一个生产者线程插入了一个元素并调用了 this.notifyAll(),则所有消费者线程都会被唤醒,然后依次执行出队操作,那么第一个消费者线程之后的所有线程拿到的都将是 null 值。而且同时,在这种情况下,每一个执行完出队操作的消费者线程也同样会调用this.notifyAll() 方法,这样即使队列中已经没有元素了,后续进入等待的消费者线程仍然会被自己的同类所唤醒,消费根本不存在的元素,最终只能返回null

所以要解决这个问题,核心就是在线程从 this.wait() 中被唤醒时也仍然要重新检查一遍 count 值是否满足要求,如果 count 不满足要求,那么当前线程仍然调用 this.wait() 回到等待状态当中去继续休眠。而我们是没办法预知程序在第几次判断条件时可以得到满足条件的 count 值从而继续执行的,所以我们必须让程序循环执行“判断条件 -> 不满足条件继续休眠”这样的流程,直到 count 满足条件为止。那么我们就可以使用一个 while 循环来包裹 this.wait() 调用和对 count 的条件判断,以此达到这个目的。

下面是具体的实现代码,我们在其中把 count 条件(队列未满 / 非空)作为 while 条件,然后在 count 值还不满足要求的情况下调用 this.wait() 方法使当前线程进入等待状态继续休眠。

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {synchronized (this) {while (count == items.length) {
                // 队列已满时进入休眠
                this.wait();}

            // 执行入队操作,将对象 e 实际放入队列中
            enqueue(e);

            // 唤醒所有休眠等待的进程
            this.notifyAll();}
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {synchronized (this) {while (count == 0) {
                // 队列为空时进入休眠
                this.wait();}

            // 执行出队操作,将队列中的第一个元素弹出
            Object e = dequeue();

            // 唤醒所有休眠等待的进程
            this.notifyAll();

            return e;
        }
    }

再次运行我们的测试程序,在我的电脑上得到了如下的输出:

0
10
1
2
11
12
13
3
4
14
5
6
15
16
7
17
8
18
9
19
总耗时:0.11s

耗时只有 0.11s,而且结果也是正确的,看来我们得到了一个又快又好的阻塞队列实现。这是一个里程碑式的版本,我们实现了一个真正可以在程序代码中使用的阻塞队列,到这里可以说你已经学会了如何实现一个阻塞队列了,让我们为自己鼓个掌吧。

当时进度条出卖了我,这篇文章还有不少内容。既然我们已经学会如何实现一个真正可用的阻塞队列了,我们为什么还要继续看这么多内容呢?别慌,虽然我们已经实现了一个真正可用的版本,但是如果我们更进一步的话就可以实现一个 JDK 级别的高强度版本了,这听起来是不是非常的诱人?让我们继续我们的旅程吧。

一个更安全的版本

我们之前的版本中使用这些同步机制:synchronized (this)this.wait()this.notifyAll(),这些同步机制都和当前对象 this 有关。因为 synchronized (obj) 可以使用任意对象对应的对象锁,而 Object.wati()Object.notifyAll()方法又都是 public 方法。也就是说不止在阻塞队列类内部可以使用这个阻塞队列对象的对象锁及其对应的条件变量,在外部的代码中也可以任意地获取阻塞队列对象上的对象锁和对应的条件变量,那么就有可能发生外部代码滥用阻塞队列对象上的对象锁导致阻塞队列性能下降甚至是发生死锁的情况。那我们有没有什么办法可以让阻塞队列在这方面变得更安全呢?

使用显式锁

最直接的方式当然是请出 JDK 在 1.5 之后引入的代替 synchronized 关键字的显式锁 ReentrantLock 类了。ReentrantLock类是一个可重入互斥锁,互斥指的是和 synchronized 一样,同一时间只能有一个线程持有锁,其他获取锁的线程都必须等待持有锁的线程释放该锁。而可重入指的就是同一个线程可以重复获取同一个锁,如果在获取锁时这个锁已经被当前线程所持有了,那么这个获取锁的操作仍然会直接成功。

一般我们使用 ReentrantLock 的方法如下:

lock.lock();
try {做一些操作}
finally {lock.unlock();
}

上面的 lock 变量就是一个 ReentrantLock 类型的对象。在这段代码中,释放锁的操作 lock.unlock() 被放在了 finally 块中,这是为了保证线程在获取到锁之后,不论出现异常或者什么特殊情况都能保证正确地释放互斥锁。如果不这么做就可能会导致持有锁的线程异常退出后仍然持有该锁,其他需要获取同一个锁的线程就永远运行不了。

那么在我们的阻塞队列中应该如何用 ReentrantLock 类来改写呢?

首先,我们显然要为我们的阻塞队列类添加一个实例变量 lock 来保存用于在不同线程间实现互斥访问的 ReentrantLock 锁。然后我们要将原来的 synchronized(this) {...} 格式的代码修改为上面使用 ReentrantLock 进行互斥访问保护的实现形式,也就是 lock.lock(); try {...} finally {lock.unlock();} 这样的形式。

但是原来与 synchronized 所加的对象锁相对应的条件变量使用方法 this.wait()this.notifyAll()应该如何修改呢?ReentrantLock已经为你做好了准备,我们可以直接调用 lock.newCondition() 方法来创建一个与互斥锁 lock 相对应的条件变量。然后为了在不同线程中都能访问到这个条件变量,我们同样要新增一个实例变量 condition 来保存这个新创建的条件变量对象。然后我们原来使用的 this.wait() 就需要修改为 condition.await(),而this.notifyAll() 就修改为了condition.signalAll()

    /** 显式锁 */
    private final ReentrantLock lock = new ReentrantLock();

    /** 锁对应的条件变量 */
    private final Condition condition = lock.newCondition();
    
    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {lock.lockInterruptibly();
        try {while (count == items.length) {
                // 队列已满时进入休眠
                // 使用与显式锁对应的条件变量
                condition.await();}

            // 执行入队操作,将对象 e 实际放入队列中
            enqueue(e);

            // 通过条件变量唤醒休眠线程
            condition.signalAll();} finally {lock.unlock();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public Object take() throws InterruptedException {lock.lockInterruptibly();
        try {while (count == 0) {
                // 队列为空时进入休眠
                // 使用与显式锁对应的条件变量
                condition.await();}

            // 执行出队操作,将队列中的第一个元素弹出
            Object e = dequeue();

            // 通过条件变量唤醒休眠线程
            condition.signalAll();

            return e;
        } finally {lock.unlock();
        }
    }

到这里,我们就完成了使用显式锁 ReentrantLock 所需要做的所有改动了。整个过程中并不涉及任何逻辑的变更,我们只是把 synchronized (this) {...} 修改为了 lock.lock() try {...} finally {lock.unlock();},把this.wait() 修改为了 condition.await(),把this.notifyAll() 修改为了 condition.signalAll()。就这样,我们的锁和条件变量因为是private 字段,所以外部的代码就完全无法访问了,这让我们的阻塞队列变得更加安全,是时候可以提供给其他人使用了。

但是这个版本的阻塞队列仍然还有很大的优化空间,继续阅读下一篇文章,相信你就可以实现出 JDK 级别的阻塞队列了。

正文完
 0