关于后端:10分钟从实现和使用场景聊聊并发包下的阻塞队列

4次阅读

共计 12187 个字符,预计需要花费 31 分钟才能阅读完成。

上篇文章 12 分钟从 Executor 自顶向下彻底搞懂线程池中咱们聊到线程池,而线程池中蕴含阻塞队列

这篇文章咱们次要聊聊并发包下的阻塞队列

阻塞队列

什么是队列?

队列的实现能够是数组、也能够是链表,能够实现先进先出的程序队列,也能够实现先进后出的栈队列

那什么是阻塞队列?

在经典的生产者 / 消费者模型中,生产者们将生产的元素放入队列,而消费者们从队列获取元素生产

当队列已满,咱们会手动阻塞生产者,直到消费者生产再来手动唤醒生产者

当队列为空,咱们会手动阻塞消费者,直到生产者生产再来手动唤醒消费者

在这个过程中因为应用的是一般队列,阻塞与唤醒咱们须要手动操作,保障同步机制

阻塞队列在队列的根底上提供期待 / 告诉性能,用于线程间的通信,防止线程竞争死锁

生产者能够看成往线程池增加工作的用户线程,而消费者则是线程池中的工作线程

当阻塞队列为空时阻塞工作线程获取工作,当阻塞队列已满时阻塞用户线程向队列中增加工作(创立非核心线程、回绝策略)

API

阻塞队列提供一下四种增加、删除元素的 API,咱们罕用阻塞期待 / 超时阻塞期待的 API

办法名 抛出异样 返回 true/false 阻塞期待 超时阻塞期待
增加 add(Object) offer(Object) put(Object) offer(Object,long,TimeUnit)
删除 remove() poll() take() poll(long,TimeUnit)
  1. 抛出异样: 队满 add 抛出异样IllegalStateExceptio;队空 remove 抛出异样NoSuchElementException
  2. 返回值: 队满 offer 返回 false,队空 poll 返回 null
  3. 阻塞期待: 队满时 put 会阻塞线程 或 队空时 take 会阻塞线程
  4. 超时阻塞期待: 在阻塞期待、返回 true/false 的根底上减少超时期待(期待肯定工夫就退出期待)
阻塞队列的偏心与不偏心

什么是阻塞队列的偏心与不偏心?

当阻塞队列已满时,如果是偏心的,那么阻塞的线程依据先后顺序从阻塞队列中获取元素,不偏心则反之

实际上阻塞队列的偏心与不偏心,要看实现阻塞队列的锁是否偏心

阻塞队列个别默认应用不偏心锁

ArrayBlockingQueue

从名称看就能够晓得它是数组实现的,咱们先来看看它有哪些重要字段

 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
         implements BlockingQueue<E>, java.io.Serializable {
 ​
     // 存储元素的数组
     final Object[] items;
 ​
     // 记录元素出队的下标
     int takeIndex;
 ​
     // 记录元素入队的下标
     int putIndex;
 ​
     // 队列中元素数量
     int count;​
     // 应用的锁
     final ReentrantLock lock;
 ​
     // 出队的期待队列,作用于消费者
     private final Condition notEmpty;
 ​
     // 入队的期待队列,作用于生产者
     private final Condition notFull;
     
 }

看完关键字段,咱们能够晓得:ArrayBlockingQueue由数组实现、应用并发包下的可重入锁、同时用两个期待队列作用生产者和消费者

为什么出队、入队要应用两个下标记录?

实际上它是一个环形数组,在初始化后就不扭转大小,后续查看源码天然能明确它是环形数组

在结构器中、初始化数组容量,同时应用非偏心锁

     public ArrayBlockingQueue(int capacity) {this(capacity, false);
     }
 ​
     public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
             throw new IllegalArgumentException();
         this.items = new Object[capacity];
         // 锁是否为偏心锁
         lock = new ReentrantLock(fair);
         notEmpty = lock.newCondition();
         notFull =  lock.newCondition();}

ArrayBlockingQueue 的公平性是由 ReentrantLock 来实现的

咱们来看看入队办法,入队办法都大同小异,咱们本文都查看反对超时、响应中断的办法

     public boolean offer(E e, long timeout, TimeUnit unit)
         throws InterruptedException {
         // 查看空指针
         checkNotNull(e);
         // 获取超时纳秒
         long nanos = unit.toNanos(timeout);
         final ReentrantLock lock = this.lock;
         // 加锁
         lock.lockInterruptibly();
         try {
             // 如果队列已满
             while (count == items.length) {
                 // 超时则返回入队失败,否则生产者期待对应工夫
                 if (nanos <= 0)
                     return false;
                 nanos = notFull.awaitNanos(nanos);
             }
             // 入队
             enqueue(e);
             return true;
         } finally {
             // 解锁
             lock.unlock();}
     }

间接应用可重入锁保障同步,如果队列已满,在此期间判断是否超时,超时就返回,未超时期待;未满则执行入队办法

     private void enqueue(E x) {
         // 队列数组
         final Object[] items = this.items;
         // 往入队下标增加值
         items[putIndex] = x;
         // 自增入队下标 如果已满则定位到 0 成环
         if (++putIndex == items.length)
             putIndex = 0;
         // 统计数量减少
         count++;
         // 唤醒消费者
         notEmpty.signal();}

在入队中,次要是增加元素、批改下次增加的下标、统计队列中的元素和唤醒消费者,到这以及能够阐明它的实现是环形数组

ArrayBlockingQueue由环形数组实现的阻塞队列,固定容量不反对动静扩容,应用非偏心的 ReertrantLock 保障入队、出队操作的原子性,应用两个期待队列存储期待的生产者、消费者,实用于在并发量不大的场景

LinkedBlockingQueue

LinkedBlockingQueue从名称上来看,就是应用链表实现的,咱们来看看它的关键字段

 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
         implements BlockingQueue<E>, java.io.Serializable {
     // 节点
     static class Node<E> {
         // 存储元素
         E item;
 ​
         // 下一个节点
         Node<E> next;
         
         //...
     }
 ​
     // 容量下限
     private final int capacity;
 ​
     // 队列元素数量
     private final AtomicInteger count = new AtomicInteger();
 ​
     // 头节点
     transient Node<E> head;
 ​
     // 尾节点
     private transient Node<E> last;
 ​
     // 出队的锁
     private final ReentrantLock takeLock = new ReentrantLock();
 ​
     // 出队的期待队列
     private final Condition notEmpty = takeLock.newCondition();
 ​
     // 入队的锁
     private final ReentrantLock putLock = new ReentrantLock();
 ​
     // 入队的期待队列
     private final Condition notFull = putLock.newCondition();}

从字段中,咱们能够晓得它应用单向链表的节点、且用首尾节点记录队列的头尾,并且它应用两把锁、两个期待队列作用于队头、尾,与 ArrayBlockingQueue 相比可能减少并发性能

有个奇怪的中央:都应用锁了,为什么记录元素数量 count 却应用原子类呢?

这是因为两把锁,作用于入队与出队的操作,入队与出队也可能并发执行,同时批改 count,因而要应用原子类保障批改数量的原子性

在初始化时须要设置容量大小,否则会设置成无界的阻塞队列(容量是 int 的最大值)

当生产速度小于生产速度时,阻塞队列中会沉积工作,进而导致容易产生 OOM

     public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
     }
 ​
     public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
         this.capacity = capacity;
         last = head = new Node<E>(null);
     }

来看看入队操作

     public boolean offer(E e, long timeout, TimeUnit unit)
         throws InterruptedException {
 ​
         if (e == null) throw new NullPointerException();
         long nanos = unit.toNanos(timeout);
         int c = -1;
         final ReentrantLock putLock = this.putLock;
         final AtomicInteger count = this.count;
         // 加锁
         putLock.lockInterruptibly();
         try {
             // 队列已满,超时返回,不超时期待
             while (count.get() == capacity) {if (nanos <= 0)
                     return false;
                 nanos = notFull.awaitNanos(nanos);
             }
             // 入队
             enqueue(new Node<E>(e));
             // 先获取再自增 c 中存储的是旧值
             c = count.getAndIncrement();
             // 如果数量没满 唤醒生产者
             if (c + 1 < capacity)
                 notFull.signal();} finally {
             // 解锁
             putLock.unlock();}
         // 如果旧值为 0 阐明该入队操作前是空队列,唤醒消费者来生产
         if (c == 0)
             signalNotEmpty();
         return true;
     }

入队操作相似,只不过在此期间如果数量没满唤醒生产者生产,队列为空唤醒消费者来生产,从而减少并发性能

入队只是扭转指向关系

     // 增加节点到开端
     private void enqueue(Node<E> node) {last = last.next = node;}

唤醒消费者前要先获取锁

     private void signalNotEmpty() {
         final ReentrantLock takeLock = this.takeLock;
         takeLock.lock();
         try {notEmpty.signal();
         } finally {takeLock.unlock();
         }
     }

出队操作也相似

     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
         E x = null;
         int c = -1;
         long nanos = unit.toNanos(timeout);
         final AtomicInteger count = this.count;
         final ReentrantLock takeLock = this.takeLock;
         takeLock.lockInterruptibly();
         try {
             // 队列为空 超时返回空,否则期待
             while (count.get() == 0) {if (nanos <= 0)
                     return null;
                 nanos = notEmpty.awaitNanos(nanos);
             }
             // 出队
             x = dequeue();
             c = count.getAndDecrement();
             // 队列中除了以后线程获取的工作外还有工作就去唤醒消费者生产
             if (c > 1)
                 notEmpty.signal();} finally {takeLock.unlock();
         }
         // 原来队列已满就去唤醒生产者 生产
         if (c == capacity)
             signalNotFull();
         return x;
     }

LinkedBlockingQueueArrayBlockingQueue 的出队、入队实现相似

只不过 LinkedBlockingQueue 入队、出队获取 / 开释的锁不同,并且在此过程中不同状况回去唤醒其余的生产者、消费者从而进一步晋升并发性能

LinkedBlockingQueue 由单向链表实现的阻塞队列,记录首尾节点;默认是无界、非偏心的阻塞队列(初始化时要设置容量否则可能 OOM),应用两把锁、两个期待队列,别离操作入队、出队的生产者、消费者,在入队、出队操作期间不同状况还会去唤醒生产者、消费者,从而进一步晋升并发性能,实用于并发量大的场景

LinkedBlockingDeque

LinkedBlockingDeque实现与 LinkedBlockQueue 相似,在 LinkedBlockQueue 的根底上反对从队头、队尾进行增加、删除的操作

它是一个双向链表,带有一系列 First、Last 的办法,比方:offerLastpollFirst

因为 LinkedBlockingDeque 双向,罕用其来 实现工作窃取算法,从而缩小线程的竞争

什么是工作窃取算法?

比方多线程解决多个阻塞队列的工作(一一对应),每个线程从队头获取工作解决,当 A 线程解决完它负责的阻塞队列所有工作时,它再从队尾窃取其余阻塞队列的工作,这样就不会产生竞争,除非队列中只剩一个工作,才会产生竞争

ForkJoin框架就应用其来充当阻塞队列,咱们后文再聊这个框架

PriorityBlockingQueue

PriorityBlockingQueue 是优先级排序的无界阻塞队列,阻塞队列依照优先级进行排序

应用堆排序,具体排序算法由 ComparableComparator实现比拟规定

  1. 默认:泛型中的对象须要实现 Comparable 比拟规定,依据 compareTo 办法规定排序
  2. 结构器中指定比拟器Comparator 依据比拟器规定排序
     @Test
     public void testPriorityBlockingQeque() {
         // 默认应用 Integer 实现 Comparable 的升序
         PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(6);
         queue.offer(99);
         queue.offer(1099);
         queue.offer(299);
         queue.offer(992);
         queue.offer(99288);
         queue.offer(995);
         //99 299 992 995 1099 99288
         while (!queue.isEmpty()){System.out.print(" "+queue.poll());
         }
 ​
         System.out.println();
         // 指定 Comparator 降序
         queue = new PriorityBlockingQueue<>(6, (o1, o2) -> o2-o1);
         queue.offer(99);
         queue.offer(1099);
         queue.offer(299);
         queue.offer(992);
         queue.offer(99288);
         queue.offer(995);
         //99288 1099 995 992 299 99
         while (!queue.isEmpty()){System.out.print(" "+queue.poll());
         }
     }

实用于须要依据优先级排序解决的场景

DelayQueue

Delay 是一个延时获取元素的无界阻塞队列,延时最长排在队尾

Delay 队列元素实现 Delayed 接口通过 getDelay 获取延时工夫

 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
     implements BlockingQueue<E> { }
 ​
 public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
 }

DelayQueue 利用场景

  1. 缓存零碎的设计:DelayQueue 寄存缓存有效期,当能够获取到元素时,阐明缓存过期
  2. 定时任务调度:将定时工作的工夫设置为延时工夫,一旦能够获取到工作就开始执行

以定时线程池 ScheduledThreadPoolExecutor 的定时工作 ScheduledFutureTask 为例,它实现 Delayed 获取提早执行的工夫

  1. 创建对象时, 初始化数据

             ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);
                 //time 记录以后对象提早到什么时候能够应用, 单位是纳秒
                 this.time = ns;
                 this.period = period;
                 //sequenceNumber 记录元素在队列中先后顺序  sequencer 原子自增
                 //AtomicLong sequencer = new AtomicLong();
                 this.sequenceNumber = sequencer.getAndIncrement();}
  2. 实现 Delayed 接口的 getDelay 办法

     public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);
     }
  3. Delay 接口继承了 Comparable 接口,目标是要实现 compareTo 办法来持续排序

             public int compareTo(Delayed other) {if (other == this) // compare zero if same object
                     return 0;
                 if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                     long diff = time - x.time;
                     if (diff < 0)
                         return -1;
                     else if (diff > 0)
                         return 1;
                     else if (sequenceNumber < x.sequenceNumber)
                         return -1;
                     else
                         return 1;
                 }
                 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
                 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
             }

SynchronousQueue

SynchronousQueue 是一个默认下反对非偏心不存储元素的阻塞队列

每个 put 操作要期待一个 take 操作, 否则不能持续增加元素会阻塞

应用偏心锁

     @Test
     public void testSynchronousQueue() throws InterruptedException {final SynchronousQueue<Integer> queue = new SynchronousQueue(true);
         new Thread(() -> {
             try {queue.put(1);
                 queue.put(2);
             } catch (InterruptedException e) {e.printStackTrace();
             }
         }, "put12 线程").start();
 ​
         new Thread(() -> {
             try {queue.put(3);
                 queue.put(4);
             } catch (InterruptedException e) {e.printStackTrace();
             }
         }, "put34 线程").start();
 ​
         TimeUnit.SECONDS.sleep(1);
         System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
         TimeUnit.SECONDS.sleep(1);
         System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
         TimeUnit.SECONDS.sleep(1);
         System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
         TimeUnit.SECONDS.sleep(1);
         System.out.println(Thread.currentThread().getName() + "拿出" + queue.take());
     }
 // 后果 因为应用偏心锁 1 在 2 前,3 在 4 前
 //main 拿出 1
 //main 拿出 3
 //main 拿出 2
 //main 拿出 4 

SynchronousQueue 队列自身不存储元素,负责把生产者的数据传递给消费者,适宜传递性的场景

在该场景下吞吐量会比 ArrayBlockingQueue,LinkedBlockingQueue 高

LinkedTransferQueue

LinkedTransferQueue 是一个链表组成的无界阻塞队列,领有 transfer()tryTransfer()办法

transfer()

如果有消费者在期待接管元素,transfer(e)会把元素 e 传输给消费者

如果没有消费者在期待接管元素,transfer(e)会将元素 e 寄存在队尾,直到有消费者获取了才返回

     @Test
     public void testTransfer() throws InterruptedException {LinkedTransferQueue queue = new LinkedTransferQueue();
         new Thread(()->{
             try {
                 // 阻塞直到被获取
                 queue.transfer(1);
                 // 生产者放入的 1 被取走了
                 System.out.println(Thread.currentThread().getName()+"放入的 1 被取走了");
             } catch (InterruptedException e) {e.printStackTrace();
             }
         },"生产者").start();
 ​
         TimeUnit.SECONDS.sleep(3);
         //main 取出队列中的元素
         System.out.println(Thread.currentThread().getName()+"取出队列中的元素");
         queue.poll();}

tryTransfer()无论消费者是否生产都间接返回

     @Test
     public void testTryTransfer() throws InterruptedException {LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
         //false
         System.out.println(queue.tryTransfer(1));
         //null
         System.out.println(queue.poll());
 ​
         new Thread(()->{
             try {
                 // 消费者取出 2
                 System.out.println(Thread.currentThread().getName()+"取出"+queue.poll(2, TimeUnit.SECONDS));
             } catch (InterruptedException e) {e.printStackTrace();
             }
         },"消费者").start();
         TimeUnit.SECONDS.sleep(1);
         //true
         System.out.println(queue.tryTransfer(2));
     }

tryTransfer(long,TimeUnit) 在超时工夫内消费者生产元素返回 true,反之返回 false

总结

ArrayBlockingQueue 由环形数组实现,固定容量无奈扩容,应用非偏心的可重入锁锁、两个期待队列操作入队、出队操作,适宜并发小的场景

LinkedBlockingQueue 由单向链表实现,默认无界,应用两个可重入锁、两个期待队列进行入队、出队操作,并在此期间可能唤醒生产者或消费者线程,以此进步并发性能

LinkedBlockingDeque 由双向链表实现,在 LinkedBlockingQueue 的根底上,可能在队头、队尾都进行增加、删除操作,实用工作窃取算法 1

PriorityBlockingQueue 由堆排序实现的优先级队列,具体排序算法由 Comparable、Comparator 来实现,实用于须要依据优先级排序解决工作的场景

DelayQueue 是一个延时队列,队列中存储的元素须要实现 Delayed 接口来获取延时工夫,实用于缓存生效、定时工作的场景

SynchronousQueue 不存储元素,只将生产者生产的元素传递给消费者,实用于传递性的场景,比方不同线程间传递数据

LinkedTransgerQueue 是传输形的阻塞队列,实用于单个元素传递的场景

在应用无界的阻塞队列时,须要设置容量,防止存储工作太多导致 OOM

最初(不要白嫖,一键三连求求拉~)

本篇文章被支出专栏 由点到线,由线到面,深入浅出构建 Java 并发编程常识体系,感兴趣的同学能够继续关注喔

本篇文章笔记以及案例被支出 gitee-StudyJava、github-StudyJava 感兴趣的同学能够 stat 下继续关注喔 \~

案例地址:

Gitee-JavaConcurrentProgramming/src/main/java/E_BlockQueue

Github-JavaConcurrentProgramming/src/main/java/E_BlockQueue

有什么问题能够在评论区交换,如果感觉菜菜写的不错,能够点赞、关注、珍藏反对一下~

关注菜菜,分享更多干货,公众号:菜菜的后端私房菜

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0