前言

LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。

实现原理

先来看要害属性:

// 队列容量,最大为Integer.MAX_VALUEprivate final int capacity;// 队列长度private final AtomicInteger count = new AtomicInteger();// 头结点transient Node<E> head;// 尾结点private transient Node<E> last;// 移除操作的锁,take/poll办法用到private final ReentrantLock takeLock = new ReentrantLock();// 移除操作须要期待的条件notEmpty,与takeLock绑定private final Condition notEmpty = takeLock.newCondition();// 入队操作的锁,put/offer办法用到private final ReentrantLock putLock = new ReentrantLock();// 入队操作须要期待的条件notFull,与putLock绑定private final Condition notFull = putLock.newCondition();

能够看到,LinkedBlockingQueue外部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个期待条件:notEmpty和notFull。takeLock管制同一时刻只有一个线程从队列头部获取/移除元素,putLock管制同一时刻只有一个线程在队列尾部增加元素。

再来看要害办法:

1.无参构造函数

public LinkedBlockingQueue() {    // 调用有参构造函数,初始化容量capacity为int最大值    this(Integer.MAX_VALUE);}

2.有参构造函数

public LinkedBlockingQueue(int capacity) {    // 容量不能小于0,留神也不能等于0,这点与惯例的汇合不同    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;    // 初始化头结点和尾结点为哑节点    last = head = new Node<E>(null);}

3.put()操作

public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();     int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;    putLock.lockInterruptibly();    try {        while (count.get() == capacity) {            notFull.await();        }        enqueue(node);        c = count.getAndIncrement();        if (c + 1 < capacity)            notFull.signal();    } finally {        putLock.unlock();    }    if (c == 0)        signalNotEmpty();}

4.put()操作

public boolean offer(E e) {    if (e == null) throw new NullPointerException();    final AtomicInteger count = this.count;    if (count.get() == capacity)           return false;    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    putLock.lock();    try {        if (count.get() < capacity) {            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        }    } finally {        putLock.unlock();    }    if (c == 0)        signalNotEmpty();    return c >= 0;}

5.take()操作

public E take() throws InterruptedException {    E x;    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();    try {           while (count.get() == 0) {               notEmpty.await();           }           x = dequeue();           c = count.getAndDecrement();           if (c > 1)               notEmpty.signal();     } finally {          takeLock.unlock();     }     if (c == capacity)         signalNotFull();     return x;}

6.poll()操作

public E poll() {    final AtomicInteger count = this.count;    if (count.get() == 0)        return null;    E x = null;    int c = -1;    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();    try {         if (count.get() > 0) {             x = dequeue();             c = count.getAndDecrement();             if (c > 1)                notEmpty.signal();         }    } finally {         takeLock.unlock();    }    if (c == capacity)        signalNotFull();    return x;}

今天更新,先睡啦,晚安全世界!