共计 3191 个字符,预计需要花费 8 分钟才能阅读完成。
一、简介
什么是阻塞队列?咱们都晓得队列具备先进先出的特点,那么有阻塞个性(即队列满了阻塞生产者,队列空了阻塞消费者)的队列,称为阻塞队列。
阻塞队列被广泛应用于 生产者 - 消费者
模式中,在理论开发中,咱们常常应用 LinkedBlockingQueue 来作为阻塞队列,而为什么应用 LinkedBlockingQueue 而不应用 ArrayBlockingQueue,上面会通过对 LinkedBlockingQueue 的源码解读来解答这个问题。
二、LinkedBlockingQueue 的根本应用
2.1 根本构造
LinkedBlockingQueue 是基于 单向链表 的队列,结构图如下
2.2 罕用 api
BlockingQueue 接口提供了许多办法,罕用的如下:
办法 \ 解决形式 | 抛出异样 | 返回非凡值 | 始终阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | put(e, time, unit) |
移除 | remove(e) | poll() | take() | poll(time, unit) |
查看 | element() | peek() | – | – |
能够看到,插入、移除元素的时候都各有 4 种办法,每种办法有对应的解决形式(抛出异样、返回非凡值、始终阻塞、超时退出
)
2.3 简略示例
上面是一个模仿生产者 - 消费者的简略示例,生产者线程定时向阻塞队列中插入音讯,消费者线程一直从阻塞队列中生产音讯
public class LinkedBlockingQueueMain {private final static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) {new Thread(new Producer()).start();
new Thread(new Consumer()).start();}
static class Producer implements Runnable {
@Override
public void run() {
int count = 0;
while (true) {
try {queue.put("message" + count++);
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {while (true) {
try {String msg = queue.take();
System.out.println("生产到音讯:" + msg);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
}
三、源码解读
3.1 要害属性
- int capacity:队列容量
- AtomicInteger count:以后队列所蕴含的元素个数
- Node<E> head:头节点
- Node<E> last:尾节点
- ReentrantLock takeLock:take 锁
- Condition notEmpty:队列不空(条件)
- ReentrantLock putLock:put 锁
- Condition notFull:队列不满(条件)
能够看到这里有两把锁,一把是移除元素用的 take 锁,另一把是插入元素用的 put 锁,这是与 ArrayBlockingQueue 的重要区别
3.2 插入元素
这里介绍插入元素的 put 办法,如果队列满了,则阻塞生产者,直到队列中有足够的地位来寄存元素
。大抵步骤如下:
- putLock 加锁
- 判断队列元素个数 是否曾经达到容量大小,如果是则调用 notFull 条件的 await 办法,使以后生产者线程进入期待状态,直到被唤醒。否则间接下一步
- 将以后元素插入队尾
- 判断队列如果还没满,则会唤醒在期待 notFull 条件的某个生产者线程
- putLock 解锁
- 如果本来队列为空,则唤醒在期待 notEmpty 条件的某个消费者线程
源码如下:
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// putLock 加锁
putLock.lockInterruptibly();
try {
// 判断队列元素个数是否曾经达到容量大小,是则调用 notFull 条件的 await 办法,使以后生产者线程进入期待状态
while (count.get() == capacity) {notFull.await();
}
// 当还有闲暇地位或以后线程被唤醒,将以后元素插入队尾
enqueue(node);
c = count.getAndIncrement();
// 判断队列如果还没满,则会唤醒在期待 notFull 条件的某个生产者线程
if (c + 1 < capacity)
notFull.signal();} finally {
// putLock 解锁
putLock.unlock();}
// 如果 c 等于 0,阐明队列中本来没有元素,则唤醒在期待 notEmpty 条件的某个消费者线程
if (c == 0)
signalNotEmpty();}
3.3 移除元素
这里介绍移除元素的 take 办法,如果队列为空,则阻塞消费者,直到队列中有元素可供生产
。大抵步骤如下:
- takeLock 加锁
- 判断 队列是否为空,如果是则调用 notEmpty 条件的 await 办法,使以后消费者线程进入期待状态,直到被唤醒。否则间接下一步
- 移出队头元素
- 判断队列如果还有元素,则会唤醒在期待 notEmpty 条件的某个消费者线程
- takeLock 解锁
- 如果本来队列是满的,则唤醒在期待 notFull 条件的某个生产者线程
源码如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// takeLock 加锁
takeLock.lockInterruptibly();
try {
// 当队列为空,则调用 notEmpty 条件的 await 办法,使以后消费者线程进入期待状态
while (count.get() == 0) {notEmpty.await();
}
// 当队列元素不为空或以后消费者线程被唤醒,将队头元素出队
x = dequeue();
c = count.getAndDecrement();
// 如果移除队头元素后队列中还有其余元素,则会唤醒在期待 notEmpty 条件的某个消费者线程
if (c > 1)
notEmpty.signal();} finally {
// takeLock 解锁
takeLock.unlock();}
// 如果原来队列是满的,则唤醒在期待 notFull 条件的某个生产者线程
if (c == capacity)
signalNotFull();
return x;
}
四、总结
阻塞队列在并发编程中利用十分宽泛,罕用于生产者 - 消费者模式,在队列满或者空的状况下,能够阻塞生产者或者消费者,直到队列不满或者不空。
理论开发过程中个别应用 LinkedBlockingQueue 而不是 ArrayBlockingQueue,因为 其外部领有两把锁 putLock 和 takeLock,意味着在生产者在生产数据的同时,消费者能够生产数据
,而 ArrayBlockingQueue 外部只有一把锁,生产者在生产数据的同时,消费者无奈生产数据。