乐趣区

关于java:并发王者课铂金05致胜良器无处不在的阻塞队列究竟是何面目

欢送来到《并发王者课》,本文是该系列文章中的 第 18 篇

在线程的同步中,阻塞队列 是一个绕不过来的话题,它是同步器底层的要害。所以,咱们在本文中将为你介绍阻塞队列的基本原理,以理解它的工作机制和它在 Java 中的实现。本文略微有点长,倡议先理解纲要再细看章节。

一、阻塞队列介绍

在生活中,置信你肯定见过下图的三三两两,也见过其中的秩序井然。凌乱,是失控的开始 。想想看,在没有秩序的状况下,拥挤的人流蜂拥而上非常危险,轻则挤出一身臭汗,重则造成踩踏事变。 而秩序,则让状况免于凌乱,排好队大家都难受

面对人流,咱们通过排队解决凌乱。而面对多线程,咱们也通过队列让线程间免于凌乱,这就是阻塞队列为何而存在。

所谓阻塞队列,你能够了解它是这样的一种队列:

  • 当线程试着往队列里放数据时,如果它曾经满了,那么线程将进入期待
  • 而当线程试着从队列里取数据时,如果它曾经空了,那么线程将进入期待

上面这张图展现了多线程是如何通过阻塞队列进行合作的:

从图中能够看到,对于阻塞队列数据的读写并不局限于单个线程,往往存在多个线程的竞争。

二、实现简略的阻塞队列

接下来咱们先抛开 JUC 中简单的阻塞队列,来设计一个简略的阻塞队列,以理解它的核心思想。

在上面的阻塞队列中,咱们设计一个队列 queue,并通过limit 字段限定它的容量。enqueue()办法用于向队列中放入数据,如果队列已满则期待;而 dequeue() 办法则用于从数据中取出数据,如果队列为空则期待。

public class BlockingQueue {private final List<Object> queue = new LinkedList<>();
    private final int limit;

    public BlockingQueue(int limit) {this.limit = limit;}

    public synchronized void enqueue(Object item) throws InterruptedException {while (this.queue.size() == this.limit) {print("队列已满,期待中...");
            wait();}
        this.queue.add(item);
        if (this.queue.size() == 1) {notifyAll();
        }
        print(item, "曾经放入!");
    }


    public synchronized Object dequeue() throws InterruptedException {while (this.queue.size() == 0) {print("队列空的,期待中...");
            wait();}
        if (this.queue.size() == this.limit) {notifyAll();
        }
        Object item = this.queue.get(0);
        print(item, "曾经拿到!");
        return this.queue.remove(0);
    }

    public static void print(Object... args) {StringBuilder message = new StringBuilder(getThreadName() + ":");
        for (Object arg : args) {message.append(arg);
        }
        System.out.println(message);
    }

    public static String getThreadName() {return Thread.currentThread().getName();}
}

定义 lanLingWang 线程向队列中放入数据,niumo线程从队列中取出数据。

  public static void main(String[] args) {BlockingQueue blockingQueue = new BlockingQueue(1);
    Thread lanLingWang = new Thread(() -> {
      try {String[] items = {"A", "B", "C", "D", "E"};
        for (String item: items) {Thread.sleep(500);
          blockingQueue.enqueue(item);
        }
      } catch (InterruptedException e) {e.printStackTrace();
      }
    });
    lanLingWang.setName("兰陵王");
    Thread niumo = new Thread(() -> {
      try {while (true) {blockingQueue.dequeue();
          Thread.sleep(1000);
        }
      } catch (InterruptedException e) {e.printStackTrace();
      }
    });
    lanLingWang.setName("兰陵王");
    niumo.setName("牛魔王");

    lanLingWang.start();
    niumo.start();}

运行后果如下:

牛魔王: 队列空的,期待中...
兰陵王:A 曾经放入!牛魔王:A 曾经拿到!兰陵王:B 曾经放入!牛魔王:B 曾经拿到!兰陵王:C 曾经放入!兰陵王: 队列已满,期待中...
牛魔王:C 曾经拿到!兰陵王:D 曾经放入!兰陵王: 队列已满,期待中...
牛魔王:D 曾经拿到!兰陵王:E 曾经放入!牛魔王:E 曾经拿到!牛魔王: 队列空的,期待中...

从后果中能够看到,设计的阻塞队列曾经能够无效工作,你能够认真地品一品输入的后果。当然,这个阻塞是极其简略的,在上面一节中,咱们将介绍 Java 中的阻塞队列设计。

三、Java 中的 BlockingQueue

Java 中的阻塞队列有两个外围接口:BlockingQueueBlockingDeque,相干的接口实现设继承关系如下图所示。相比于上一节中咱们自定义的阻塞队列,Java 中的实现要简单很多。不过,你不用为此放心, 了解阻塞队列最重要的是了解它的思维和实现的思路,况且 Java 中的实现其实很有意思,读起来也比拟轻松

从图中能够看出,BlockingQueue 接口继承了 Queue 接口和 Collection 接口,并有 LinkedBlockingQueue 和 ArrayBlockingQueue 两种实现。这里有个有意思的中央,继承 Queue 接口很容易了解,能够为什么要继承 Collection 接口?先卖个关子,你能够思考一会,稍后会给出答案

1. 外围办法

BlockingQueue 中义了对于阻塞队列所须要的一系列办法,它们彼此之间看起来很像,从外表上看不出显著的差异。对于这些办法,你不用死记硬背,下图的表格中将这些办法分为了 A、B、C、D 这四种类型,分类之后再去了解它们会容易很多:

类型 A 抛出异样 B 返回特定值 C 阻塞 D 超时限定
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine Element() peek()

其中局部要害办法的解释如下:

  • add(E e):在不违反容量限度的前提下,向队列中插入数据。如果胜利,返回 true,否则抛出异样
  • offer(E e):在不违反容量限度的前提下,向队列中插入数据。如果胜利,返回true,否则返回false
  • offer(E e, long timeout, TimeUnit unit):如果队列中没有足够的空间,将期待一段时间;
  • put(E e):在不违反容量限度的前提下,向队列中插入数据。如果没有足够的空间,将进入期待
  • poll(long timeout, TimeUnit unit):从队列的头部获取数据,并移除数据。如果没有数据的话,将会期待指定的工夫;
  • take():从队列的头部获取数据并移除。如果没有可用数据,将进入期待

将这些办法填入后面的那张图,它应该长这样:

2. LinkedBlockingQueue

LinkedBlockingQueue 实现了 BlockingQueue 接口,听从先进先出(FIFO)的准则,提供了可选的有界阻塞队列(Optionally Bounded)的能力,并且是线程平安的。

  • 外围数据结构

    • int capacity: 设定队列容量;
    • Node<E> head: 队列的头部元素;
    • Node<E> last: 队列的尾部元素;
    • AtomicInteger count: 队列中元素的总数统计。

LinkedBlockingQueue 的数据结构并不简单,不过须要留神的是,数据结构中并不蕴含 List,仅有 headlast两个 Node,设计上比拟奇妙。

  • 外围结构

    • LinkedBlockingQueue(): 空结构;
    • LinkedBlockingQueue(int capacity): 指定容量结构。
  • 线程安全性

    • ReentrantLock takeLock: 获取元素时的锁;
    • ReentrantLock putLock: 写入元素时的锁。

留神,LinkedBlockingQueue 有两把锁,读取和写入的锁是拆散的!这和上面的 ArrayBlockingQueue 并不相同。

上面截取了 LinkedBlockingQueue 中读写的局部代码,值得你认真品一品。品的时候,要重点关注两把锁的应用和读写时数据结构是如何变动的

  • 队列插入示例代码剖析
 public boolean add(E e) {addLast(e);
        return true;
    }

    public void addLast(E e) {if (!offerLast(e))
            throw new IllegalStateException("Deque full");
    }

    public boolean offerFirst(E e) {if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {return linkFirst(node);
        } finally {lock.unlock();
        }
    }
  • 队列读取示例代码剖析
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {return pollFirst(timeout, unit);
    }
public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            E x;
            while ((x = unlinkFirst()) == null) {if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return x;
        } finally {lock.unlock();
        }
    }

最初说下 LinkedBlockingQueue 为什么要继承 Collection 接口。咱们晓得,Collection 接口有 remove() 这样的移除办法,而这些办法在队列中也是有应用场景的。比方,你把一个数据谬误地放入了队列,或者你须要移除曾经生效的数据,那么 Collection 的一些办法就派上了用场。

3. ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的另外一种实现,它与 LinkedBlockingQueue 在设计指标上的的要害不同,在于它是有界的

  • 外围数据结构

    • Object[] items: 队列元素汇合;
    • int takeIndex: 下次获取数据时的索引地位;
    • int putIndex: 下次写入数据时的索引地位;
    • int count: 队列总量计数。

从数据结构中能够看出,ArrayBlockingQueue 应用的是数组,而数组是有界的。

  • 外围结构

    • ArrayBlockingQueue(int capacity):限定容量的结构;
    • ArrayBlockingQueue(int capacity, boolean fair):限定容量和公平性,默认是不偏心的;
    • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c):带有初始化队列元素的结构。
  • 线程安全性

    • ReentrantLock lock:队列读取和写入的锁。

在读写锁方面,后面曾经说过,LinkedBlockingQueue 和 ArrayBlockingQueue 是不同的,ArrayBlockingQueue 只有一把锁,读写用的都是它。

  • 队列写入示例代码剖析
 public boolean offer(E e) {checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {if (count == items.length)
                return false;
            else {enqueue(e);
                return true;
            }
        } finally {lock.unlock();
        }
    }
    
   private void enqueue(E x) {// assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();}

上面截取了 ArrayBlockingQueue 中读写的局部代码,值得你认真品一品。品的时候,要重点关注读写锁的应用和读写时数据结构是如何变动的

  • 队列读取示例代码剖析
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {while (count == 0) {if (nanos <= 0)
                 return null;
             nanos = notEmpty.awaitNanos(nanos);
         }
         return dequeue();} finally {lock.unlock();
     }
 }

 private E dequeue() {// assert lock.getHoldCount() == 1;
     // assert items[takeIndex] != null;
     final Object[] items = this.items;
     @SuppressWarnings("unchecked")
     E x = (E) items[takeIndex];
     items[takeIndex] = null;
     if (++takeIndex == items.length)
         takeIndex = 0;
     count--;
     if (itrs != null)
         itrs.elementDequeued();
     notFull.signal();
     return x;
 }

四、Java 中的 BlockingDeque

在 Java 中,BlockingDeque 与 BlockingQueue 是一对孪生兄弟似的存在,它们长得切实太像了,不留神的话很容易混同。

然而,BlockingDeque 与 BlockingQueue 外围不同在于,BlockingQueue 只可能从尾部写入、从头部读取,应用上很有限度。而 BlockingDeque 则反对从任意端读写,在读写时能够指定头部和尾部,丰盛了阻塞队列的应用场景

1. 外围办法

相较于 BlockingQueue,BlockingDeque 的办法显然要更丰盛一些,毕竟它反对了双端的读写。然而,丰盛归丰盛,在类型上依然和 BlockingQueue 是统一的,你依然能够参考下面的 A、B、C、D 四种类型来分类了解。为了节约篇幅,咱们这里就不再列举,只选取了其中的局部办法作了解释:

  • add(E e):在不违反容量限度的前提下,在对列的尾部插入数据;
  • addFirst(E e):从头部插入数据,容量不够就抛错;
  • addLast(E e):从尾部插入数据,容量不够就抛错;
  • getFirst():从头部读取数据;
  • getLast():从尾部读取数据,但不会移除数据;
  • offer(E e):写入数据;
  • offerFirst(E e):从头部写入数据。

将 BlockingDeue 放入后面的那张图,就是这样:

2. LinkedBlockingDeue

LinkedBlockingDeue 是 BlockingDeque 的外围实现。

  • 外围数据结构

    • int capacity:容量设置;
    • Node<E> head:队列头部;
    • Node<E> last:队列尾部;
    • int count:队列计数。
  • 外围结构

    • LinkedBlockingDeque():空的结构;
    • LinkedBlockingDeque(int capacity):指定容量的结构;
    • LinkedBlockingDeque(Collection<? extends E> c):结构时初始化队列。
  • 线程安全性

    • ReentrantLock lock:读写锁。留神,读写用的是同一把锁

上面截取了 LinkedBlockingDeue 中读写的局部代码,值得你认真品一品。品的时候,要重点关注读写锁的应用和读写时数据结构是如何变动的

  • 队列插入示例代码剖析
public void addFirst(E e) {if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}
public boolean offerFirst(E e) {if (e == null) throw new NullPointerException();
    Node < E > node = new Node < E > (e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {return linkFirst(node);
    } finally {lock.unlock();
    }
}
  • 队列读取示例代码剖析
public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {return unlinkFirst();
    } finally {lock.unlock();
    }
}

小结

以上就是对于阻塞队列的全部内容,相较于后面的系列文章,这次的内容明显增加了很多。看起来很简略,然而不要小瞧它 。了解阻塞队列,首先要了解它所要解决的问题,以及它的接口设计。 接口的设计往往示意的是它所提供的外围能力,所以了解了接口的设计,就胜利了一半

在 Java 中,从接口层面,阻塞队列分为 BlockingQueue 和 BlockingDeque 的两大类,其次要差别在于双端读写的限度不同。其中,BlockingQueue 有 LinkedBlockingDeue 和 ArrayBlockingQueue 两种要害实现,而 BlockingDeque 则有 LinkedBlockingDeue 实现。

注释到此结束,祝贺你又上了一颗星✨

夫子的试炼

  • 从数据机构、队列的初始化、锁、性能等方面比拟 LinkedBlockingDeue 和 ArrayBlockingQueue 的不同。

延长浏览与参考资料

  • Talk about LinkedBlockingQueue
  • Blocking Queues
  • 《并发王者课》纲要与更新进度总览

对于作者

关注公众号【技术八点半】,及时获取文章更新。传递有品质的技术文章,记录平凡人的成长故事,偶然也聊聊生存和现实。晚上 8:30 推送作者品质原创,早晨 20:30 推送行业深度好文。

如果本文对你有帮忙,欢送 点赞 关注 监督 ,咱们一起 从青铜到王者

退出移动版