基本概念
阻塞队列(BlockingQueue)是一个反对两个附加操作的队列。这两个附加的操作反对阻塞的插入和移除办法。
1)反对阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2)反对阻塞的移除办法:意思是在队列为空时,获取元素的线程会期待队列变为非空
阻塞队列一共有7种,咱们着重讲一下
ArrayBlockingQueue ,
LinkedBlockingQueue ,
DelayQueue,
SynchronousQueue
这四种阻塞队列
ArrayBlockingQueue
基于数组实现有界的阻塞队列(循环数组)
类的继承
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
次要成员变量
private static final long serialVersionUID = -817911632652898426L;
final Object[] items; //底层存储元素的数组
int takeIndex; //进行取操作时的下标
int putIndex;//进行放操作时的下标
int count;//队列中元素的数量
final ReentrantLock lock;//阻塞时用的锁
private final Condition notEmpty;//满时的condition队列
private final Condition notFull;//空时的condition队列
结构器
参数有容量和全局锁是否是偏心锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
不必确定是否是偏心锁,默认是非偏心锁
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
在第一个结构器的前提下,将整个汇合移入阻塞队列
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
次要办法
put()
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
1.首先判断增加的是否非空,是空的会抛出异样。
2.给put办法上锁
3.当汇合元素数量和汇合的长度相等时,调用put办法的线程将会被放入notFull队列上期待。
4.如果不相等,则enqueue(),也就是让e进入队列。
咱们再看看enqueue()办法(入队办法)
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
其实就是让该元素入队,并且唤醒因为汇合空而期待的线程。
take()办法同理。
LinkedBlockingQueue
LinkedBlockingQueue底层是基于链表实现的,所以其根本成员变量和LinkedList差不多。
类的继承关系
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
结构器
无参结构器,默认容量为最大容量
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
手动设定容量
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
将整个汇合挪入队列中,默认容量同样是最大容量。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
次要成员变量
链表就肯定会有节点
外部节点类
和ArrayBlockingQueue不同的是,它有两个全局锁,一个负责放元素,一个负责取元素。
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
除了节点之外。
private transient Node<E> last;//尾节点
transient Node<E> head;//头节点
private final AtomicInteger count = new AtomicInteger();//计算以后阻塞队列中的元素个数
private final int capacity;//容量
//获取并移除元素时应用的锁,如take, poll, etc
private final ReentrantLock takeLock = new ReentrantLock();
//notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程
private final Condition notEmpty = takeLock.newCondition();
//增加元素时应用的锁如 put, offer, etc
private final ReentrantLock putLock = new ReentrantLock();
//notFull条件对象,当队列数据已满时用于挂起执行增加的线程
private final Condition notFull = putLock.newCondition();
次要办法
put()办法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
根本和ArrayBlockingQueue一样,只是锁的数量不同,导致有一些轻微的区别。
代码示例
public class TestDemo16 {
private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
new Thread("put"){
@Override
public void run() {
//增加元素
for(int i=0; i<10; i++){
System.out.println("put: "+i);
try {
queue.put(i);
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("take"){
@Override
public void run() {
//获取元素
for(int i=0; i<10; i++){
try {
System.out.println("take: "+queue.take());
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
DelayQueue
基于PriorityQueue 延时阻塞队列,DelayQueue中的元素只有当其延时工夫达到,才可能去以后队列中获取到该元素,DelayQueue是一个无界队列。次要用于缓存零碎的设计、定时工作零碎的设计。
实现DelayQueue的三个步骤
第一步:继承Delayed接口
第二步:实现getDelay(TimeUnit unit),该办法返回以后元素还须要延时多长时间,单位是纳秒
第三步:实现compareTo办法来指定元素的程序
例如;
class Test implements Delayed {
private long time; //Test实例延时工夫
public Test(long time, TimeUnit unit){
this.time = System.currentTimeMillis() + unit.toMillis(time);
}
@Override
public long getDelay(TimeUnit unit) {
return this.time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
long diff = this.time - ((Test)o).time;
if(diff <= 0){
return -1;
}else{
return 1;
}
}
}
DelayQueue<Test> queue = new DelayQueue<>();
queue.put(new Test(5, TimeUnit.SECONDS));
queue.put(new Test(10, TimeUnit.SECONDS));
queue.put(new Test(15, TimeUnit.SECONDS));
System.out.println("begin time: "+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
for(int i=0; i<3; i++){
try {
Test test = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("time: "+LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
}
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须期待一个take操作,否则不能持续增加元素。它反对偏心拜访队列。默认状况下线程采纳非公平性策略拜访队列。应用以下构造方法能够创立公平性拜访的SynchronousQueue,如果设置为true,则期待的线程会采纳先进先出的程序拜访队列
SynchronousQueue能够看成是一个传球手,负责把生产者线程解决的数据间接传递给消费者线程。队列自身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue.
public static void main(String[] args) throws InterruptedException {
SynchronousQueue queue=new SynchronousQueue();
LinkedBlockingQueue
new Thread("put"){
@Override
public void run() {
System.out.println("put has started");
for(int i=0;i<5;i++){
System.out.println("put after takeThread");
try {
queue.put((int)((Math.random() * 100) + 1));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("put has ended");
}
}.start();
new Thread("take"){
@Override
public void run() {
System.out.println("take has started");
for(int i=0;i<5;i++){
try {
System.out.println("take from putThread"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("put has ended");
}
}.start();
}
总结
1:ArrayBlockingQueue和LinkedBlockingQueue的区别和分割?
1)数据存储容器不一样,ArrayBlockingQueue采纳数组去存储数据、LinkedBlockingQueue采纳链表去存储数据。
2)ArrayBlockingQueue(循环数组)采纳数组去存储数据,不会产生额定的对象实例; LinkedBlockingQueue采纳链表去存储数据,在插入和删除元素只与一个节点无关,须要去生成一个额定的Node对象,这可能长时间内须要并发解决大批量的数据,对于性能和前期GC会产生影响。
3)ArrayBlockingQueue是有界的,初始化时必须要指定容量;LinkedBlockingQueue默认是无界的,Integer.MAX_VALUE, 当增加速度大于删除速度、有可能造成内存溢出。
4) ArrayBlockingQueue在读和写应用的锁是一样的,即增加操作和删除操作应用的是同一个ReentrantLock,没有实现锁拆散;LinkedBlockingQueue实现了锁拆散,增加的时候采纳putLock、删除的时候采纳takeLock,这样能进步队列的吞吐量。
2:ArrayBlockingQueue能够应用两把锁提高效率吗?
不能,次要起因是ArrayBlockingQueue底层循环数组来存储数据,LinkedBlockingQueue底层 链表来存储数据,链表队列的增加和删除,只是和某一个节点无关,为了避免head和last相互影响,就须要有一个原子性的计数器,每个增加操作先退出队列,计数器+1,这样是为了保障队列在移除的时候, 长度是大于等于计数器的,通过原子性的计数器,使得以后增加和移除互不烦扰。对于循环数据来说,当咱们走到最初一个地位须要返回到第一个地位,这样的操作是无奈原子化,只能应用同一把锁来解决。
发表回复