前言

上一节看了基于数据的有界阻塞队列 ArrayBlockingQueue 的源码,通过浏览源码理解到在 ArrayBlockingQueue 中入队列和出队列操作都是用了 ReentrantLock 来保障线程平安。上面咱们看另一种有界阻塞队列:LinkedBlockingQueue。

公众号:『 刘志航 』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

一个基于链接节点的,可选绑定的 BlockingQueue 阻塞队列。

对元素 FIFO(先进先出)进行排序。队列的头部是已在队列中停留最长工夫的元素。队列的尾部是最短时间呈现在队列中的元素。将新元素插入队列的尾部,并检索队列操作获取队列结尾的元素。

基于连表的队列通常具备比基于数组的队列有更高的吞吐量,然而大多数并发应用程序中的可预测性较差。

根本应用

public class LinkedBlockingQueueTest {    private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(10);    public static void main(String[] args) throws InterruptedException {        // 入队列        QUEUE.put("put 入队列, 队列满则会阻塞期待");        QUEUE.add("add 入队列, 队列满则会抛出异样");        QUEUE.offer("offer 入队列, 队列满会返回 false");        // 出队列        // 队列空返回 null        String poll = QUEUE.poll();        // 队列空会阻塞期待        String take = QUEUE.take();        // 仅仅看一下最早入队列的元素        String peek = QUEUE.peek();    }}

问题疑难

  1. LinkedBlockingQueue 的实现原理是什么?
  2. LinkedBlockingQueue 和 ArrayBlockingQueue 的区别是什么?

源码剖析

根本构造

参数介绍

static class Node<E> {        E item;    /**    * One of:    * - 真正的后继节点    * - 有值,示意后继者是head.next    * - null,示意没有后继(这是最初一个节点)    */    Node<E> next;    Node(E x) { item = x; }}

首先在 LinkedBlockingQueue 中有一个动态外部类 Node<E> 反对泛型,上面看下其余字段:

/** 初始容量,如果没有,则为Integer.MAX_VALUE */private final int capacity;/** 以后元素数 */private final AtomicInteger count = new AtomicInteger();/*** 链表头* 不变的是: head.item == null*/transient Node<E> head;/*** 链表尾* 不变的是: last.next == null*/private transient Node<E> last;/** 执行 take, poll 等操作须要获取到 takeLock */private final ReentrantLock takeLock = new ReentrantLock();/** 期待执行 take 操作的线程,会放入这个条件队列 */private final Condition notEmpty = takeLock.newCondition();/** 执行 put, offer 等操作须要获取到 putLock */private final ReentrantLock putLock = new ReentrantLock();/** 期待执行 put 操作的线程,会被放入这个条件队列 */private final Condition notFull = putLock.newCondition();

构造函数

public LinkedBlockingQueue() {    this(Integer.MAX_VALUE);}// 创立时指定容量public LinkedBlockingQueue(int capacity) {    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;    last = head = new Node<E>(null);}

通过构造函数能够看出,在初始化 LinkedBlockingQueue 时,如果不传入容量则会默认指定 Integer.MAX_VALUE。

增加元素

add 办法是间接调用的父类 AbstractQueue 的办法,外部调用的 LinkedBlockingQueue 本人实现的 offer 办法

public boolean add(E e) {    if (offer(e))        return true;    else        throw new IllegalStateException("Queue full");}

次要浏览的还是 LinkedBlockingQueue 的 put 和 offer 办法:

public void put(E e) throws InterruptedException {    // 插入元    if (e == null) throw new NullPointerException();    // Note: 所有put / take / etc中的约定是预设本地变量    // 放弃计数为负示意失败,除非置位。    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;    putLock.lockInterruptibly();    try {               // 如果曾经到最大容量,则期待         while (count.get() == capacity) {            notFull.await();        }        enqueue(node);        // 总数进行减少, 返回的是先前的容量        c = count.getAndIncrement();        // 判断是否须要唤醒入队列阻塞的线程        if (c + 1 < capacity)            notFull.signal();    } finally {        putLock.unlock();    }    if (c == 0)        // 唤醒因调用 notEmpty 的 await 办法而被阻塞的线程        signalNotEmpty();}
public boolean offer(E e) {    // 为空抛出异样    if (e == null) throw new NullPointerException();    final AtomicInteger count = this.count;    // 如果曾经到最大容量,返回 false    if (count.get() == capacity)        return false;    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    putLock.lock();    try {        if (count.get() < capacity) {            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        }    } finally {        putLock.unlock();    }    if (c == 0)        signalNotEmpty();    return c >= 0;}

通过下面两段代码能够看出 put 和 offer 的最大区别在于是否阻塞。 put 办法当队列达到指定容量时,会阻塞,期待有元素出队列。而 offer 办法会间接返回 false。

同时两个办法操作元素入队列都是调用的 enqueue(node) 办法,上面一起看下 enqueue 办法。

private void enqueue(Node<E> node) {    // assert putLock.isHeldByCurrentThread();    // assert last.next == null;    last = last.next = node;}

在 enqueue 办法中,间接指定以后尾节点的 next 为传入的元素即可。

获取元素

public E poll() {    final AtomicInteger count = this.count;    // 队列为空返回 null    if (count.get() == 0)        return null;    E x = null;    int c = -1;    // 加锁    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try {        if (count.get() > 0) {            x = dequeue();            // 缩小队列元素计数,返回的是旧值            c = count.getAndDecrement();            if (c > 1)            // 旧值大于 1 ,就是以后大于 0            // 唤醒调用 notEmpty.await 期待的线程                notEmpty.signal();        }    } finally {        takeLock.unlock();    }    if (c == capacity)        // 如果旧值等于 capacity 阐明以后空了一个地位        signalNotFull();    return x;}
public E take() throws InterruptedException {    E x;    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();    try {        // 阻塞期待        while (count.get() == 0) {            notEmpty.await();        }        x = dequeue();        c = count.getAndDecrement();        if (c > 1)            notEmpty.signal();    } finally {        takeLock.unlock();    }    if (c == capacity)        signalNotFull();    return x;}

通过下面代码能够看出 poll 和 take 办法逻辑大致相同。区别就是在以后队列为空时的解决逻辑。poll 在以后队列为空时返回 null,take 会阻塞期待,晓得以后队列中有元素。

poll 和 take 都试用 dequeue() 办法从队列中获取元素。

private E dequeue() {    // assert takeLock.isHeldByCurrentThread();    // assert head.item == null;    Node<E> h = head;    Node<E> first = h.next;    h.next = h; // help GC    head = first;    E x = first.item;    first.item = null;    return x;}

dequeue() 办法逻辑就是获取头节点,并将 head 指向下一个节点。

查看元素

public E peek() {    if (count.get() == 0)        return null;    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try {        Node<E> first = head.next;        if (first == null)            return null;        else            return first.item;    } finally {        takeLock.unlock();    }}

peek() 办法比较简单,间接获取 head 的元素值即可。

总结

Q&A

Q: LinkedBlockingQueue 的实现原理?

A: LinkedBlockingQueue 是基于链表实现的,外部应用 ReentrantLock 互斥锁,避免并发搁置元素或者取出元素的抵触问题。

  1. take、poll、peek 等从队列中获取元素的操作共用 takeLock 锁。
  2. add、put、offer 等向队列中增加元素的操作独特 putLock 锁。
  3. notEmpty 和 notFull 是 Condition 类型,在 take 和 put 操作时,如果如果队列为空或者队列已满,会调用相应的 await 将线程放入条件队列。

Q: 入队列和出队列办法之间的区别是什么?

办法作用
add增加元素,队列满了,增加失败抛出异样
offer增加元素, 队列满了,增加失败,返回 false
put增加元素,队列满了,阻塞期待
poll弹出元素,队列为空则返回 null
take弹出元素,队列为空则期待队列中有元素
peek查看队列中放入最早的一个元素

结束语

LinkedBlockingQueue 应用和 ArrayBlockingQueue 并没有什么区别,外部实现都是应用的 ReentrantLock,能够对照着浏览。同时 Condition 这块也须要着重理解一下。