欢送来到《并发王者课》,本文是该系列文章中的第18篇。
在线程的同步中,阻塞队列是一个绕不过来的话题,它是同步器底层的要害。所以,咱们在本文中将为你介绍阻塞队列的基本原理,以理解它的工作机制和它在Java中的实现。本文略微有点长,倡议先理解纲要再细看章节。
一、阻塞队列介绍
在生活中,置信你肯定见过下图的三三两两,也见过其中的秩序井然。凌乱,是失控的开始。想想看,在没有秩序的状况下,拥挤的人流蜂拥而上非常危险,轻则挤出一身臭汗,重则造成踩踏事变。而秩序,则让状况免于凌乱,排好队大家都难受。
面对人流,咱们通过排队解决凌乱。而面对多线程,咱们也通过队列让线程间免于凌乱,这就是阻塞队列为何而存在。
所谓阻塞队列,你能够了解它是这样的一种队列:
- 当线程试着往队列里放数据时,如果它曾经满了,那么线程将进入期待;
- 而当线程试着从队列里取数据时,如果它曾经空了,那么线程将进入期待。
上面这张图展现了多线程是如何通过阻塞队列进行合作的:
从图中能够看到,对于阻塞队列数据的读写并不局限于单个线程,往往存在多个线程的竞争。
二、实现简略的阻塞队列
接下来咱们先抛开JUC中简单的阻塞队列,来设计一个简略的阻塞队列,以理解它的核心思想。
在上面的阻塞队列中,咱们设计一个队列queue
,并通过limit
字段限定它的容量。enqueue()
办法用于向队列中放入数据,如果队列已满则期待;而dequeue()
办法则用于从数据中取出数据,如果队列为空则期待。
public class BlockingQueue { private final List<Object> queue = new LinkedList<>(); private final int limit; public BlockingQueue(int limit) { this.limit = limit; } public synchronized void enqueue(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { print("队列已满,期待中..."); wait(); } this.queue.add(item); if (this.queue.size() == 1) { notifyAll(); } print(item, "曾经放入!"); } public synchronized Object dequeue() throws InterruptedException { while (this.queue.size() == 0) { print("队列空的,期待中..."); wait(); } if (this.queue.size() == this.limit) { notifyAll(); } Object item = this.queue.get(0); print(item, "曾经拿到!"); return this.queue.remove(0); } public static void print(Object... args) { StringBuilder message = new StringBuilder(getThreadName() + ":"); for (Object arg : args) { message.append(arg); } System.out.println(message); } public static String getThreadName() { return Thread.currentThread().getName(); }}
定义lanLingWang
线程向队列中放入数据,niumo
线程从队列中取出数据。
public static void main(String[] args) { BlockingQueue blockingQueue = new BlockingQueue(1); Thread lanLingWang = new Thread(() -> { try { String[] items = { "A", "B", "C", "D", "E" }; for (String item: items) { Thread.sleep(500); blockingQueue.enqueue(item); } } catch (InterruptedException e) { e.printStackTrace(); } }); lanLingWang.setName("兰陵王"); Thread niumo = new Thread(() -> { try { while (true) { blockingQueue.dequeue(); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); lanLingWang.setName("兰陵王"); niumo.setName("牛魔王"); lanLingWang.start(); niumo.start(); }
运行后果如下:
牛魔王:队列空的,期待中...兰陵王:A曾经放入!牛魔王:A曾经拿到!兰陵王:B曾经放入!牛魔王:B曾经拿到!兰陵王:C曾经放入!兰陵王:队列已满,期待中...牛魔王:C曾经拿到!兰陵王:D曾经放入!兰陵王:队列已满,期待中...牛魔王:D曾经拿到!兰陵王:E曾经放入!牛魔王:E曾经拿到!牛魔王:队列空的,期待中...
从后果中能够看到,设计的阻塞队列曾经能够无效工作,你能够认真地品一品输入的后果。当然,这个阻塞是极其简略的,在上面一节中,咱们将介绍Java中的阻塞队列设计。
三、Java中的BlockingQueue
Java中的阻塞队列有两个外围接口:BlockingQueue和BlockingDeque,相干的接口实现设继承关系如下图所示。相比于上一节中咱们自定义的阻塞队列,Java中的实现要简单很多。不过,你不用为此放心,了解阻塞队列最重要的是了解它的思维和实现的思路,况且Java中的实现其实很有意思,读起来也比拟轻松。
从图中能够看出,BlockingQueue接口继承了Queue接口和Collection接口,并有LinkedBlockingQueue和ArrayBlockingQueue两种实现。这里有个有意思的中央,继承Queue接口很容易了解,能够为什么要继承Collection接口?先卖个关子,你能够思考一会,稍后会给出答案。
1. 外围办法
BlockingQueue中义了对于阻塞队列所须要的一系列办法,它们彼此之间看起来很像,从外表上看不出显著的差异。对于这些办法,你不用死记硬背,下图的表格中将这些办法分为了A、B、C、D这四种类型,分类之后再去了解它们会容易很多:
类型 | A 抛出异样 | B 返回特定值 | C 阻塞 | D 超时限定 |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take( ) | poll(time, unit) |
Examine | Element() | peek() | -- | -- |
其中局部要害办法的解释如下:
add(E e)
:在不违反容量限度的前提下,向队列中插入数据。如果胜利,返回true,否则抛出异样;offer(E e)
:在不违反容量限度的前提下,向队列中插入数据。如果胜利,返回true
,否则返回false
;offer(E e, long timeout, TimeUnit unit)
:如果队列中没有足够的空间,将期待一段时间;put(E e)
:在不违反容量限度的前提下,向队列中插入数据。如果没有足够的空间,将进入期待;poll(long timeout, TimeUnit unit)
:从队列的头部获取数据,并移除数据。如果没有数据的话,将会期待指定的工夫;take()
:从队列的头部获取数据并移除。如果没有可用数据,将进入期待
将这些办法填入后面的那张图,它应该长这样:
2. LinkedBlockingQueue
LinkedBlockingQueue实现了BlockingQueue接口,听从先进先出(FIFO)的准则,提供了可选的有界阻塞队列( Optionally Bounded )的能力,并且是线程平安的。
外围数据结构
int capacity
: 设定队列容量;Node<E> head
: 队列的头部元素;Node<E> last
: 队列的尾部元素;AtomicInteger count
: 队列中元素的总数统计。
LinkedBlockingQueue的数据结构并不简单,不过须要留神的是,数据结构中并不蕴含List,仅有head
和last
两个Node,设计上比拟奇妙。
外围结构
LinkedBlockingQueue()
: 空结构;LinkedBlockingQueue(int capacity)
: 指定容量结构。
线程安全性
ReentrantLock takeLock
: 获取元素时的锁;ReentrantLock putLock
: 写入元素时的锁。
留神,LinkedBlockingQueue有两把锁,读取和写入的锁是拆散的!这和上面的ArrayBlockingQueue并不相同。
上面截取了LinkedBlockingQueue中读写的局部代码,值得你认真品一品。品的时候,要重点关注两把锁的应用和读写时数据结构是如何变动的。
- 队列插入示例代码剖析
public boolean add(E e) { addLast(e); return true; } public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } }
- 队列读取示例代码剖析
public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); }public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } }
最初说下LinkedBlockingQueue为什么要继承Collection接口。咱们晓得,Collection接口有remove()
这样的移除办法,而这些办法在队列中也是有应用场景的。比方,你把一个数据谬误地放入了队列,或者你须要移除曾经生效的数据,那么Collection的一些办法就派上了用场。
3. ArrayBlockingQueue
ArrayBlockingQueue是BlockingQueue接口的另外一种实现,它与LinkedBlockingQueue在设计指标上的的要害不同,在于它是有界的。
外围数据结构
Object[] items
: 队列元素汇合;int takeIndex
: 下次获取数据时的索引地位;int putIndex
: 下次写入数据时的索引地位;int count
: 队列总量计数。
从数据结构中能够看出,ArrayBlockingQueue应用的是数组,而数组是有界的。
外围结构
ArrayBlockingQueue(int capacity)
: 限定容量的结构;ArrayBlockingQueue(int capacity, boolean fair)
: 限定容量和公平性,默认是不偏心的;ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
:带有初始化队列元素的结构。
线程安全性
ReentrantLock lock
:队列读取和写入的锁。
在读写锁方面,后面曾经说过,LinkedBlockingQueue和ArrayBlockingQueue是不同的,ArrayBlockingQueue只有一把锁,读写用的都是它。
- 队列写入示例代码剖析
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
上面截取了ArrayBlockingQueue中读写的局部代码,值得你认真品一品。品的时候,要重点关注读写锁的应用和读写时数据结构是如何变动的。
- 队列读取示例代码剖析
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
四、Java中的BlockingDeque
在Java中,BlockingDeque与BlockingQueue是一对孪生兄弟似的存在,它们长得切实太像了,不留神的话很容易混同。
然而,BlockingDeque与BlockingQueue外围不同在于,BlockingQueue只可能从尾部写入、从头部读取,应用上很有限度。而BlockingDeque则反对从任意端读写,在读写时能够指定头部和尾部,丰盛了阻塞队列的应用场景。
1. 外围办法
相较于BlockingQueue,BlockingDeque的办法显然要更丰盛一些,毕竟它反对了双端的读写。然而,丰盛归丰盛,在类型上依然和BlockingQueue是统一的,你依然能够参考下面的A、B、C、D四种类型来分类了解。为了节约篇幅,咱们这里就不再列举,只选取了其中的局部办法作了解释:
add(E e)
:在不违反容量限度的前提下,在对列的尾部插入数据;addFirst(E e)
:从头部插入数据,容量不够就抛错;addLast(E e)
:从尾部插入数据,容量不够就抛错;getFirst()
:从头部读取数据;getLast()
:从尾部读取数据,但不会移除数据;offer(E e)
:写入数据;offerFirst(E e)
:从头部写入数据。
将BlockingDeue放入后面的那张图,就是这样:
2. LinkedBlockingDeue
LinkedBlockingDeue是BlockingDeque的外围实现。
外围数据结构
int capacity
:容量设置;Node<E> head
:队列头部;Node<E> last
:队列尾部;int count
:队列计数。
外围结构
LinkedBlockingDeque()
: 空的结构;LinkedBlockingDeque(int capacity)
: 指定容量的结构;LinkedBlockingDeque(Collection<? extends E> c)
:结构时初始化队列。
线程安全性
ReentrantLock lock
:读写锁。留神,读写用的是同一把锁。
上面截取了LinkedBlockingDeue中读写的局部代码,值得你认真品一品。品的时候,要重点关注读写锁的应用和读写时数据结构是如何变动的
- 队列插入示例代码剖析
public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full");}public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node < E > node = new Node < E > (e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); }}
- 队列读取示例代码剖析
public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); }}
小结
以上就是对于阻塞队列的全部内容,相较于后面的系列文章,这次的内容明显增加了很多。看起来很简略,然而不要小瞧它。了解阻塞队列,首先要了解它所要解决的问题,以及它的接口设计。接口的设计往往示意的是它所提供的外围能力,所以了解了接口的设计,就胜利了一半。
在Java中,从接口层面,阻塞队列分为BlockingQueue和BlockingDeque的两大类,其次要差别在于双端读写的限度不同。其中,BlockingQueue有LinkedBlockingDeue和ArrayBlockingQueue两种要害实现,而BlockingDeque则有LinkedBlockingDeue实现。
注释到此结束,祝贺你又上了一颗星✨
夫子的试炼
- 从数据机构、队列的初始化、锁、性能等方面比拟LinkedBlockingDeue和ArrayBlockingQueue的不同。
延长浏览与参考资料
- Talk about LinkedBlockingQueue
- Blocking Queues
- 《并发王者课》纲要与更新进度总览
对于作者
关注公众号【技术八点半】,及时获取文章更新。传递有品质的技术文章,记录平凡人的成长故事,偶然也聊聊生存和现实。晚上8:30推送作者品质原创,早晨20:30推送行业深度好文。
如果本文对你有帮忙,欢送点赞、关注、监督,咱们一起从青铜到王者。