关于java:通俗易懂的JUC源码剖析ArrayBlockingQueue

44次阅读

共计 1975 个字符,预计需要花费 5 分钟才能阅读完成。

前言

ArrayBlockingQueue 也是 BlockingQueue 接口的实现类,从它的命名就能猜出来,它底层是用数组实现的,不同于 LinkedBlockingQueue 的链表构造。

实现原理

首先来看它的要害属性:

// 寄存元素的数组
final Object[] items;
// 记录下次 take 操作的数组地位
int takeIndex;
// 记录下次 put 操作的数组地位
int putIndex;
// 数组长度
int count;
// 操作数组的锁
final ReentrantLock lock;
// 数组非空条件
private final Condition notEmpty;
// 数组未满条件
private final Condition notFull;

再来看它的重要办法:
offer()

public boolean offer(E e) {checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
         // 队列满了,则返回 false
         if (count == items.length)
             return false;
         else {
             // 队列没满,则入队
             enqueue(e);
             return true; 
         }
    } finally {lock.unlock();
    }
}

put()

public void put(E e) throws InterruptedException {checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 数组满了则阻塞期待,用 while 不必 if,是为了防止虚伪唤醒
        while (count == items.length)
            notFull.await();
        // 队列未满时入队    
        enqueue(e);
    } finally {lock.unlock();
    }
}

poll()

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 数组为空返回 null,否则返回移除的元素
        return (count == 0) ? null : dequeue();} finally {lock.unlock();
    }
}

take()

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
       // 数组为空则阻塞期待
        while (count == 0)
            notEmpty.await();
        // 数组不为空时返回移除的元素    
        return dequeue();} finally {lock.unlock();
    }
}

与 LinkedBlockingQueue 相似,take()、put() 是阻塞的,poll()、offer() 是非阻塞的。
其中,enqueue()、dequeue() 办法如下:

private void enqueue(E x) {// assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null; 
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
           putIndex = 0;
    count++;
    notEmpty.signal();}
private E dequeue() {// assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null; 
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // 移除元素时,要手动将 takeIndex 地位处的元素置为 null,让它被垃圾回收掉,否则会导致内存透露
    items[takeIndex] = null; // help GC
    if (++takeIndex == items.length)
           takeIndex = 0;
    count--;
    if (itrs != null)
           itrs.elementDequeued();
    notFull.signal();
    return x;
}

因为数组的个性,ArrayBlockingQueue 并没有像 LinkedBlockingQueue 那样应用 2 把锁,而是应用 takeIndex 和 putIndex 记录下一次入队 / 出队操作的数组地位。并且 takeIndex,putIndex,count 三个变量都没有用 volatile 润饰或者 Atomic 原子类,因为他们的批改都在加锁后的环境中进行的,曾经保障了线程平安。

参考资料:
《Java 并发编程之美》

正文完
 0