java中队列的介绍和使用

36次阅读

共计 4260 个字符,预计需要花费 11 分钟才能阅读完成。

Queue:基本上,一个队列就是一个先入先出(FIFO)的数据结构
Queue 接口与 List、Set 同一级别,都是继承了 Collection 接口。LinkedList 实现了 Deque 接 口。
Queue 的实现
1、没有实现的阻塞接口的 LinkedList:实现了 java.util.Queue 接口和 java.util.AbstractQueue 接口
内置的不阻塞队列:PriorityQueue 和 ConcurrentLinkedQueue
PriorityQueue 和 ConcurrentLinkedQueue 类在 Collection Framework 中加入两个具体集合实现。
PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。
ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小,

 ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

2、实现阻塞接口的:
java.util.concurrent 中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。

 五个队列所提供的各有不同:

* ArrayBlockingQueue:一个由数组支持的有界队列。
* LinkedBlockingQueue:一个由链接节点支持的可选有界队列。
* PriorityBlockingQueue:一个由优先级堆支持的无界优先级队列。
* DelayQueue:一个由优先级堆支持的、基于时间的调度队列。
* SynchronousQueue:一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。

下表显示了 jdk1.5 中的阻塞队列的操作:

add 增加一个元索 如果队列已满,则抛出一个 IIIegaISlabEepeplian 异常
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个 NoSuchElementException 异常
element 返回队列头部的元素 如果队列为空,则抛出一个 NoSuchElementException 异常
offer 添加一个元素并返回 true 如果队列已满,则返回 false
poll 移除并返问队列头部的元素 如果队列为空,则返回 null
peek 返回队列头部的元素 如果队列为空,则返回 null
put 添加一个元素 如果队列满,则阻塞
take 移除并返回队列头部的元素 如果队列为空,则阻塞

remove、element、offer、poll、peek 其实是属于 Queue 接口。

阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee 和 element 操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用 offer、poll、peek 方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。

注意:poll 和 peek 方法出错进返回 null。因此,向队列中插入 null 值是不合法的

最后,我们有阻塞操作 put 和 take。put 方法在队列满时阻塞,take 方法在队列空时阻塞。

LinkedBlockingQueue 的容量是没有上限的(说的不准确,在不指定时容量为 Integer.MAX_VALUE,不要然的话在 put 时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。

ArrayBlockingQueue 在构造时需要指定容量,并可以选择是否需要公平性,如果公平参数被设置 true,等待时间最长的线程会优先得到处理(其实就是通过将 ReentrantLock 设置为 true 来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。

PriorityBlockingQueue 是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue 是对 PriorityQueue 的再次包装,是基于堆数据结构的,而 PriorityQueue 是没有容量限制的,与 ArrayList 一样,所以在优先阻塞 队列上 put 时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作 take 就会阻塞,所以它的检索操作 take 是受阻的。另外,往入该队列中的元 素要具有比较能力。

DelayQueue(基于 PriorityQueue 来实现的)是一个存放 Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll 就以移除这个元素了。此队列不允许使用 null 元素。

一个例子:
[Java] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.yao;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueTest {
/**
定义装苹果的篮子
*/
public static class Basket{
// 篮子,能够容纳 3 个苹果
BlockingQueue<String> basket = new ArrayBlockingQueue<String>(3);

// 生产苹果,放入篮子
public void produce() throws InterruptedException{
// put 方法放入一个苹果,若 basket 满了,等到 basket 有位置
basket.put(“An apple”);
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException{
// get 方法取出一个苹果,若 basket 为空,等到 basket 有苹果为止
String apple = basket.take();
return apple;
}

public int getAppleNumber(){
return basket.size();
}

}
// 测试方法
public static void testBasket() {
// 建立一个装苹果的篮子
final Basket basket = new Basket();
// 定义苹果生产者
class Producer implements Runnable {
public void run() {

try {while (true) {
  // 生产苹果
  System.out.println("生产者准备生产苹果:"
    + System.currentTimeMillis());
  basket.produce();
  System.out.println("生产者生产苹果完毕:"
    + System.currentTimeMillis());
  System.out.println("生产完后有苹果:"+basket.getAppleNumber()+"个");
  // 休眠 300ms
  Thread.sleep(300);
 }
} catch (InterruptedException ex) {}

}
}
// 定义苹果消费者
class Consumer implements Runnable {
public void run() {

try {while (true) {
  // 消费苹果
  System.out.println("消费者准备消费苹果:"
    + System.currentTimeMillis());
  basket.consume();
  System.out.println("消费者消费苹果完毕:"
    + System.currentTimeMillis());
  System.out.println("消费完后有苹果:"+basket.getAppleNumber()+"个");
  // 休眠 1000ms
  Thread.sleep(1000);
 }
} catch (InterruptedException ex) {}

}
}

ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer();
Consumer consumer = new Consumer();
service.submit(producer);
service.submit(consumer);
// 程序运行 10s 后,所有任务停止
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
service.shutdownNow();
}
public static void main(String[] args) {
BlockingQueueTest.testBasket();
}
}

正文完
 0