突击并发编程 JUC 系列演示代码地址:
https://github.com/mtcarpenter/JavaTutorial
什么是阻塞队列
阻塞队列(BlockingQueue)是一个反对两个附加操作的队列。这两个附加的操作反对阻塞的插入和移除办法。
- 反对阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 反对阻塞的移除办法:意思是在队列为空时,获取元素的线程会期待队列变为非空。
阻塞队列罕用于生产者和消费者的场景,生产者是向队列里增加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来寄存元素、消费者用来获取元素的容器。
插入和移除操作的 4 种解决形式
办法 / 解决形式 | 抛出异样 | 返回非凡值 | 始终阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除办法 | remove() | poll() | take() | poll(time,unit) |
查看办法 | element() | peek() | 不可用 | 不可用 |
- 抛出异样:当队列满时,如果再往队列里插入元素,会抛出
IllegalStateException
(”Queue full”)异样。当队列空时,从队列里获取元素会抛出NoSuchElementException
异样。 - 返回非凡值:当往队列插入元素时,会返回元素是否插入胜利,胜利返回 true。如果是移除办法,则是从队列里取出一个元素,如果没有则返回 null。
- 始终阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会始终阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里 take 元素,队列会阻塞住消费者线程,直到队列不为空。
- 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的工夫,生产者线程就会退出。
如果是无界阻塞队列,队列不可能会呈现满的状况,所以应用 put 或 offer 办法永远不会被阻塞,而且应用 offer 办法时,该办法永远返回 true。
ArrayBlockingQueue
ArrayBlockingQueue
是一个用数组实现的有界阻塞队列。此队列依照先进先出(FIFO)的准则对元素进行排序。
默认状况下不保障线程偏心的拜访队列,所谓偏心拜访队列是指阻塞的线程,能够依照阻塞的先后顺序拜访队列,即先阻塞线程先拜访队列。非公平性是对先期待的线程是非偏心的,当队列可用时,阻塞的线程都能够抢夺拜访队列的资格,有可能先阻塞的线程最初才拜访队列。为了保障公平性,通常会升高吞吐量。
阻塞式写办法
在 ArrayBlockingQueue
中提供了两个阻塞式写办法,别离如下(在该队列中,无论是阻塞式写办法还是非阻塞式写办法,都不容许写入 null)。
void put(E e)
:向队列的尾部插入新的数据,当队列已满时调用该办法的线程会进入阻塞,直到有其余线程对该线程执行了中断操作,或者队列中的元素被其余线程生产。boolean offer(E e, long timeout, TimeUnit unit)
:向队列尾部写入新的数据,当队列已满时执行该办法的线程在指定的工夫单位内将进入阻塞,直到到了指定的超时工夫后,或者在此期间有其余线程对队列数据进行了生产。
put() 办法示例
public class ArrayBlockingQueueExample1 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
try {queue.put("class 1");
queue.put("class 2");
queue.put("class 3");
// 超过指定得容量以后线程阻塞
queue.put("class 4");
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
非阻塞式写办法
当队列已满时写入数据,如果不想使得以后线程进入阻塞,那么就能够应用非阻塞式的写操作方法。
boolean add(E e)
:向队列尾部写入新的数据,当队列已满时不会进入阻塞,然而该办法会抛出队列已满的异样。boolean offer(E e)
:向队列尾部写入新的数据,当队列已满时不会进入阻塞,并且会立刻返回 false。
add() 办法示例
public class ArrayBlockingQueueExample2 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
queue.add("class 1");
queue.add("class 2");
queue.add("class 3");
// 超过指定容量 抛出异样
queue.add("class 4");
}
}
// 抛出异样
阻塞式读办法
E take()
:从队列头部获取数据,并且该数据会从队列头部移除,当队列为空时执行 take 办法的线程将进入阻塞,直到有其余线程写入新的数据,或者以后线程被执行了中断操作。E poll(long timeout, TimeUnit unit)
:从队列头部获取数据并且该数据会从队列头部移除,如果队列中没有任何元素时则执行该办法,以后线程会阻塞指定的工夫,直到在此期间有新的数据写入,或者阻塞的以后线程被其余线程中断,当线程因为超时退出阻塞时,返回值为 null。
take() 办法示例
public class ArrayBlockingQueueExample3 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
queue.add("class 1");
queue.add("class 2");
queue.add("class 3");
try {
// 取出对头元素
System.out.println(queue.take());
} catch (InterruptedException e) {e.printStackTrace();
}
// 队列大小
System.out.println(queue.size());
}
}
//class 1
// 2
非阻塞式读办法
E poll()
:从队列头部获取数据并且该数据会从队列头部移除,当队列为空时,该办法不会使得以后线程进入阻塞,而是返回 null 值。E peek()
:当队列为空时,该办法不会使得以后线程进入阻塞,而是返回 null 值。
public class ArrayBlockingQueueExample4 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 队列无元素 间接返回 null
System.out.println(queue.poll());
System.out.println(queue.peek());
}
}
// null
// null
局部源码
public void put(E e) throws InterruptedException {
// 查看元素
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lockInterruptibly();
try {
// 元素满 始终阻塞,队列非满时,被唤醒
while (count == items.length)
notFull.await();
// 入队
enqueue(e);
} finally {lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lockInterruptibly();
try {
// 队列为空 期待
while (count == 0)
notEmpty.await();
// 出队
return dequeue();} finally {lock.unlock();
}
}
LinkedBlockingQueue
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE
。此队列依照先进先出的准则对元素进行排序。
PriorityBlockingQueue
PriorityBlockingQueue
是一个反对优先级的无界阻塞队列。默认状况下元素采取天然程序升序排列。也能够自定义类实现 compareTo()
办法来指定元素排序规定,或者初始化 PriorityBlockingQueue
时,指定结构参数 Comparator 来对元素进行排序。须要留神的是不能保障同优先级元素的程序。
public class PriorityBlockingQueueExample1 {public static void main(String[] args) {PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue();
queue.offer(1);
queue.offer(12);
queue.offer(21);
queue.offer(6);
// 外部排序
System.out.println(queue.poll()); // 1
System.out.println(queue.poll()); // 6
System.out.println(queue.poll()); // 12
System.out.println(queue.poll()); //21
}
}
DelayQueue
DelayQueue
是一个反对延时获取元素的无界阻塞队列。队列应用 PriorityQueue
来实现。队列中的元素必须实现 Delayed
接口,在创立元素时能够指定多久能力从队列中获取以后元素。只有在提早期满时能力从队列中提取元素。
DelayQueue
十分有用,能够将 DelayQueue
使用在以下利用场景。
- 缓存零碎的设计:能够用
DelayQueue
保留缓存元素的有效期,应用一个线程循环查问DelayQueue
,一旦能从DelayQueue
中获取元素时,示意缓存有效期到了。 - 定时任务调度:应用
DelayQueue
保留当天将会执行的工作和执行工夫,一旦从DelayQueue
中获取到工作就开始执行,比方TimerQueue
就是应用DelayQueue
实现的。
DelayQueue
队列的元素必须实现 Delayed
接口。咱们能够参考 ScheduledThreadPoolExecutor
里ScheduledFutureTask
类的实现。
public class DelayQueueExample1 {public static void main(String[] args) throws InterruptedException {DelayQueue<DelayedEntry> queue = new DelayQueue<>();
// 延期 3 秒 解决
queue.put(new DelayedEntry("A", 30000L));
// 延期 10 秒解决
queue.add(new DelayedEntry("B", 10000L));
// 延期 20 秒解决
queue.add(new DelayedEntry("C", 20000L));
int size = queue.size();
System.out.println("以后工夫是:" + LocalDateTime.now());
// 从延时队列中获取元素,将输入 A,B,C
for (int i = 0; i < size; i++) {System.out.println(queue.take() + "------" + LocalDateTime.now());
}
}
}
/**
* 继承 Delayed 接口
*/
class DelayedEntry implements Delayed {
/**
* 元素数据内容
*/
private final String value;
/**
* 用于计算生效工夫
*/
private final long exeTime;
DelayedEntry(String value, long exeTime) {
this.value = value;
this.exeTime = exeTime + System.currentTimeMillis();}
@Override
public long getDelay(TimeUnit unit) {return exeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {DelayedEntry t = (DelayedEntry) o;
if (this.exeTime < t.exeTime) {return -1;} else if (this.exeTime > t.exeTime) {return 1;} else {return 0;}
}
@Override
public String toString() {
return "DelayedEntry{" +
"value=" + value +
", exeTime=" + exeTime +
'}';
}
}
// 以后工夫是:2020-10-15T16:26:37.167
//DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117
// DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105
//DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104
SynchronousQueue
SynchronousQueue
是一个不存储元素的阻塞队列。每一个 put 操作必须期待一个 take 操作,否则不能持续增加元素。
它反对偏心拜访队列。默认状况下线程采纳非公平性策略拜访队列。应用以下构造方法能够创立公平性拜访的SynchronousQueue
,如果设置为 true,则期待的线程会采纳先进先出的程序拜访队列。
LinkedTransferQueue
LinkedTransferQueue
是一个由链表构造组成的无界阻塞 TransferQueue
队列。绝对于其余阻塞队列,LinkedTransferQueue
多了 tryTransfer
和transfer
办法。
- transfer 办法
如果以后有消费者正在期待接管元素(消费者应用
take()
办法或带工夫限度的 poll()办法时),transfer
办法能够把生产者传入的元素立即transfer
(传输)给消费者。如果没有消费者在期待接管元素,transfer 办法会将元素寄存在队列的 tail 节点,并等到该元素被消费者生产了才返回。transfer
办法的要害代码如下 - tryTransfer 办法
tryTransfer
办法是用来试探生产者传入的元素是否能间接传给消费者。如果没有消费者期待接管元素,则返回 false。和transfer
办法的区别是tryTransfer
办法无论消费者是否接管,办法立刻返回,而transfer
办法是必须等到消费者生产了才返回。
对于带有工夫限度的tryTransfer(E e,long timeout,TimeUnit unit)
办法,试图把生产者传入的元素间接传给消费者,然而如果没有消费者生产该元素则期待指定的工夫再返回,如果超时还没生产元素,则返回 false,如果在超时工夫内生产了元素,则返回 true。
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表构造组成的双向阻塞队列。所谓双向队列指的是能够从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就缩小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque
多了 addFirst
、addLast
、offerFirst
、offerLast
、peekFirst
和peekLast
等办法,以 First
单词结尾的办法,示意插入、获取(peek)或移除双端队列的第一个元素。以 Last
单词结尾的办法,示意插入、获取或移除双端队列的最初一个元素。另外,插入方法 add
等同于 addLast
,移除办法remove
等效于 removeFirst
。然而take
办法却等同于 takeFirst
,不晓得是不是JDK
的 bug,应用时还是用带有 First
和Last
后缀的办法更分明。
在初始化LinkedBlockingDeque
时能够设置容量避免其适度收缩。另外,双向阻塞队列能够使用在“工作窃取”模式中。
欢送关注公众号 山间木匠, 我是小春哥,从事 Java 后端开发,会一点前端、通过继续输入系列技术文章以文会友,如果本文能为您提供帮忙,欢送大家关注、点赞、分享反对,_咱们下期再见!_