一、简介

什么是阻塞队列?咱们都晓得队列具备先进先出的特点,那么有阻塞个性(即队列满了阻塞生产者,队列空了阻塞消费者)的队列,称为阻塞队列

阻塞队列被广泛应用于生产者-消费者模式中,在理论开发中,咱们常常应用LinkedBlockingQueue来作为阻塞队列,而为什么应用LinkedBlockingQueue而不应用ArrayBlockingQueue,上面会通过对LinkedBlockingQueue的源码解读来解答这个问题。

二、LinkedBlockingQueue的根本应用

2.1 根本构造

LinkedBlockingQueue是基于单向链表的队列,结构图如下

2.2 罕用api

BlockingQueue接口提供了许多办法,罕用的如下:

办法\解决形式抛出异样返回非凡值始终阻塞超时退出
插入add(e)offer(e)put(e)put(e, time, unit)
移除remove(e)poll()take()poll(time, unit)
查看element()peek()--

能够看到,插入、移除元素的时候都各有4种办法,每种办法有对应的解决形式(抛出异样、返回非凡值、始终阻塞、超时退出

2.3 简略示例

上面是一个模仿生产者-消费者的简略示例,生产者线程定时向阻塞队列中插入音讯,消费者线程一直从阻塞队列中生产音讯

public class LinkedBlockingQueueMain {    private final static BlockingQueue<String> queue = new LinkedBlockingQueue<>();    public static void main(String[] args) {        new Thread(new Producer()).start();        new Thread(new Consumer()).start();    }    static class Producer implements Runnable {        @Override        public void run() {            int count = 0;            while (true) {                try {                    queue.put("message" + count++);                    TimeUnit.MILLISECONDS.sleep(500);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }    static class Consumer implements Runnable {        @Override        public void run() {            while (true) {                try {                    String msg = queue.take();                    System.out.println("生产到音讯:" + msg);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }}

三、源码解读

3.1 要害属性

  • int capacity:队列容量
  • AtomicInteger count:以后队列所蕴含的元素个数
  • Node<E> head:头节点
  • Node<E> last:尾节点
  • ReentrantLock takeLocktake锁
  • Condition notEmpty队列不空(条件)
  • ReentrantLock putLockput锁
  • Condition notFull队列不满(条件)

能够看到这里有两把锁,一把是移除元素用的take锁,另一把是插入元素用的put锁,这是与ArrayBlockingQueue的重要区别

3.2 插入元素

这里介绍插入元素的put办法,如果队列满了,则阻塞生产者,直到队列中有足够的地位来寄存元素。大抵步骤如下:

  1. putLock加锁
  2. 判断队列元素个数是否曾经达到容量大小,如果是则调用notFull条件的await办法,使以后生产者线程进入期待状态,直到被唤醒。否则间接下一步
  3. 将以后元素插入队尾
  4. 判断队列如果还没满,则会唤醒在期待notFull条件的某个生产者线程
  5. putLock解锁
  6. 如果本来队列为空,则唤醒在期待notEmpty条件的某个消费者线程

源码如下:

public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;        // putLock加锁    putLock.lockInterruptibly();    try {        // 判断队列元素个数是否曾经达到容量大小,是则调用notFull条件的await办法,使以后生产者线程进入期待状态        while (count.get() == capacity) {            notFull.await();        }        // 当还有闲暇地位或以后线程被唤醒,将以后元素插入队尾        enqueue(node);        c = count.getAndIncrement();        // 判断队列如果还没满,则会唤醒在期待notFull条件的某个生产者线程        if (c + 1 < capacity)            notFull.signal();    } finally {        // putLock解锁        putLock.unlock();    }    // 如果c等于0,阐明队列中本来没有元素,则唤醒在期待notEmpty条件的某个消费者线程    if (c == 0)        signalNotEmpty();}

3.3 移除元素

这里介绍移除元素的take办法,如果队列为空,则阻塞消费者,直到队列中有元素可供生产。大抵步骤如下:

  1. takeLock加锁
  2. 判断队列是否为空,如果是则调用notEmpty条件的await办法,使以后消费者线程进入期待状态,直到被唤醒。否则间接下一步
  3. 移出队头元素
  4. 判断队列如果还有元素,则会唤醒在期待notEmpty条件的某个消费者线程
  5. takeLock解锁
  6. 如果本来队列是满的,则唤醒在期待notFull条件的某个生产者线程

源码如下:

public E take() throws InterruptedException {    E x;    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    // takeLock加锁    takeLock.lockInterruptibly();    try {        // 当队列为空,则调用notEmpty条件的await办法,使以后消费者线程进入期待状态        while (count.get() == 0) {            notEmpty.await();        }        // 当队列元素不为空或以后消费者线程被唤醒,将队头元素出队        x = dequeue();        c = count.getAndDecrement();        // 如果移除队头元素后队列中还有其余元素,则会唤醒在期待notEmpty条件的某个消费者线程        if (c > 1)            notEmpty.signal();    } finally {        // takeLock解锁        takeLock.unlock();    }    // 如果原来队列是满的,则唤醒在期待notFull条件的某个生产者线程    if (c == capacity)        signalNotFull();    return x;}

四、总结

阻塞队列在并发编程中利用十分宽泛,罕用于生产者-消费者模式,在队列满或者空的状况下,能够阻塞生产者或者消费者,直到队列不满或者不空。

理论开发过程中个别应用LinkedBlockingQueue而不是ArrayBlockingQueue,因为其外部领有两把锁putLock和takeLock,意味着在生产者在生产数据的同时,消费者能够生产数据,而ArrayBlockingQueue外部只有一把锁,生产者在生产数据的同时,消费者无奈生产数据。