关于java:通俗易懂的JUC源码剖析PriorityBlockingQueue

48次阅读

共计 4367 个字符,预计需要花费 11 分钟才能阅读完成。

前言

PriorityBlockingQueue 是 BlockingQueue 接口的实现类,它是一种优先级阻塞队列,每次出队都返回优先级最高或最低的元素,其外部是用均衡二叉树堆实现的。这里的优先级指的是元素类必须实现 Comparable 接口,而后用 compareTo() 办法比拟元素的优先级大小,当然也可指定自定义的比拟器 comparator。

实现原理

先来看看它的重要属性:

// 队列默认容量为 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 队列最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 寄存元素的数组
private transient Object[] queue;
// 队列长度
private transient int size;
// 自定义比拟器
private transient Comparator<? super E> comparator;
// 操作元素数组的互斥锁
private final ReentrantLock lock;
// 数组非空条件
private final Condition notEmpty;
// 数组扩容操作的自璇锁,1 示意正在扩容,0 示意没有在扩容
private transient volatile int allocationSpinLock;
// 优先级队列
private PriorityQueue<E> q;

再来看它的几个构造函数:

public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
 Comparator<? super E> comparator) {if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

再来看重要办法:
put():

public void put(E e) {offer(e); // never need to block
}

offer():

public boolean offer(E e) {if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 队列以后长度 >= 队列容量时,进行扩容
    while ((n = size) >= (cap = (array = queue).length))
           tryGrow(array, cap);
    try {
         Comparator<? super E> cmp = comparator;
         // 未指定比拟器时,则应用默认的 compareTo() 来计算插入元素的地位
         if (cmp == null)
             siftUpComparable(n, e, array);
         else 
             // 指定了时,则应用指定的比拟器计算地位
             siftUpUsingComparator(n, e, array, cmp);
         size = n + 1;
         // 唤醒某个期待在 notEmpty 条件的线程
         notEmpty.signal();} finally {lock.unlock();
    }
    return true;
}

其中 tryGrow() 办法如下:

private void tryGrow(Object[] array, int oldCap) {
    // 先开释操作数组的互斥锁,去尝试获取扩容的自璇锁
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 尝试获取扩容的锁
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
 0, 1)) {
        try {
            // 计算扩容后的新容量
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 新容量超出最大容量时,则取最大容量
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                // 旧容量加 1 依然溢出时,抛内存溢出异样
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
     } finally {allocationSpinLock = 0;}
    }
    // 其余线程抢到了扩容锁并正在扩容时,以后线程则让出 CPU 调度权
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 获取操作数组的互斥锁
    lock.lock();
    // 扩容操作胜利时,将旧数组元素拷贝到扩容后的新数组
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

siftUpComparable() 办法如下:

private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

siftUpUsingComparator() 办法如下:

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;
 Object e = array[parent];
 if (cmp.compare(x, (T) e) >= 0)
            break;
 array[k] = e;
 k = parent;
 }
    array[k] = x;
}

take():

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
         // 队列元素为空时,阻塞期待
         while ((result = dequeue()) == null)
             notEmpty.await();} finally {lock.unlock();
    }
    return result;
}

poll():

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 非阻塞,队列为空时返回 null
        return dequeue();} finally {lock.unlock();
    }
}

其中 dequeue() 办法如下:

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else 
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

其中 siftDownComparable() 办法如下:

private static <T> void siftDownComparable(int k, T x, Object[] array,
 int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1; // loop while a non-leaf
        while (k < half) {int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                    break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

siftDownUsingComparator() 办法如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 int n,
 Comparator<? super T> cmp) {if (n > 0) {
        int half = n >>> 1;
        while (k < half) {int child = (k << 1) + 1;
             Object c = array[child];
             int right = child + 1;
             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                            c = array[child = right];
             if (cmp.compare(x, (T) c) <= 0)
                            break;
             array[k] = c;
             k = child;
       }
       array[k] = x;
    }
}

先睡了,今天再剖析优先级的具体代码。晚安全世界!

正文完
 0