前言

PriorityBlockingQueue是BlockingQueue接口的实现类,它是一种优先级阻塞队列,每次出队都返回优先级最高或最低的元素,其外部是用均衡二叉树堆实现的。这里的优先级指的是元素类必须实现Comparable接口,而后用compareTo()办法比拟元素的优先级大小,当然也可指定自定义的比拟器comparator。

实现原理

先来看看它的重要属性:

// 队列默认容量为11private static final int DEFAULT_INITIAL_CAPACITY = 11;// 队列最大容量private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;// 寄存元素的数组private transient Object[] queue;// 队列长度private transient int size;// 自定义比拟器private transient Comparator<? super E> comparator;// 操作元素数组的互斥锁private final ReentrantLock lock;// 数组非空条件private final Condition notEmpty;// 数组扩容操作的自璇锁,1示意正在扩容,0示意没有在扩容private transient volatile int allocationSpinLock;// 优先级队列private PriorityQueue<E> q;

再来看它的几个构造函数:

public PriorityBlockingQueue() {    this(DEFAULT_INITIAL_CAPACITY, null);}
public PriorityBlockingQueue(int initialCapacity) {    this(initialCapacity, null);}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {    if (initialCapacity < 1)        throw new IllegalArgumentException();    this.lock = new ReentrantLock();    this.notEmpty = lock.newCondition();    this.comparator = comparator;    this.queue = new Object[initialCapacity];}

再来看重要办法:
put():

public void put(E e) {    offer(e); // never need to block}

offer():

public boolean offer(E e) {    if (e == null)        throw new NullPointerException();    final ReentrantLock lock = this.lock;    lock.lock();    int n, cap;    Object[] array;    // 队列以后长度>=队列容量时,进行扩容    while ((n = size) >= (cap = (array = queue).length))           tryGrow(array, cap);    try {         Comparator<? super E> cmp = comparator;         // 未指定比拟器时,则应用默认的compareTo()来计算插入元素的地位         if (cmp == null)             siftUpComparable(n, e, array);         else              // 指定了时,则应用指定的比拟器计算地位             siftUpUsingComparator(n, e, array, cmp);         size = n + 1;         // 唤醒某个期待在notEmpty条件的线程         notEmpty.signal();    } finally {         lock.unlock();    }    return true;}

其中tryGrow()办法如下:

private void tryGrow(Object[] array, int oldCap) {    // 先开释操作数组的互斥锁,去尝试获取扩容的自璇锁    lock.unlock(); // must release and then re-acquire main lock    Object[] newArray = null;    // 尝试获取扩容的锁    if (allocationSpinLock == 0 &&        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {        try {            // 计算扩容后的新容量            int newCap = oldCap + ((oldCap < 64) ?                                   (oldCap + 2) : // grow faster if small                                   (oldCap >> 1));            // 新容量超出最大容量时,则取最大容量            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow                // 旧容量加1依然溢出时,抛内存溢出异样                int minCap = oldCap + 1;                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)                        throw new OutOfMemoryError();                newCap = MAX_ARRAY_SIZE;            }                        if (newCap > oldCap && queue == array)                newArray = new Object[newCap];     } finally {            allocationSpinLock = 0;     }    }    // 其余线程抢到了扩容锁并正在扩容时,以后线程则让出CPU调度权    if (newArray == null) // back off if another thread is allocating        Thread.yield();    // 获取操作数组的互斥锁    lock.lock();    // 扩容操作胜利时,将旧数组元素拷贝到扩容后的新数组    if (newArray != null && queue == array) {        queue = newArray;        System.arraycopy(array, 0, newArray, 0, oldCap);    }}

siftUpComparable()办法如下:

private static <T> void siftUpComparable(int k, T x, Object[] array) {    Comparable<? super T> key = (Comparable<? super T>) x;    while (k > 0) {        int parent = (k - 1) >>> 1;        Object e = array[parent];        if (key.compareTo((T) e) >= 0)            break;        array[k] = e;        k = parent;    }    array[k] = key;}

siftUpUsingComparator()办法如下:

private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {    while (k > 0) {        int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0)            break; array[k] = e; k = parent; }    array[k] = x;}

take():

public E take() throws InterruptedException {    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    E result;    try {         // 队列元素为空时,阻塞期待         while ( (result = dequeue()) == null)             notEmpty.await();    } finally {         lock.unlock();    }    return result;}

poll():

public E poll() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 非阻塞,队列为空时返回null        return dequeue();    } finally {        lock.unlock();    }}

其中dequeue()办法如下:

private E dequeue() {    int n = size - 1;    if (n < 0)        return null;    else {        Object[] array = queue;        E result = (E) array[0];        E x = (E) array[n];        array[n] = null;        Comparator<? super E> cmp = comparator;        if (cmp == null)            siftDownComparable(0, x, array, n);        else             siftDownUsingComparator(0, x, array, n, cmp);        size = n;        return result;    }}

其中siftDownComparable()办法如下:

private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {    if (n > 0) {        Comparable<? super T> key = (Comparable<? super T>)x;        int half = n >>> 1; // loop while a non-leaf        while (k < half) {            int child = (k << 1) + 1; // assume left child is least            Object c = array[child];            int right = child + 1;            if (right < n &&                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)                    c = array[child = right];            if (key.compareTo((T) c) <= 0)                    break;            array[k] = c;            k = child;        }        array[k] = key;    }}

siftDownUsingComparator()办法如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) {    if (n > 0) {        int half = n >>> 1;        while (k < half) {             int child = (k << 1) + 1;             Object c = array[child];             int right = child + 1;             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)                            c = array[child = right];             if (cmp.compare(x, (T) c) <= 0)                            break;             array[k] = c;             k = child;       }       array[k] = x;    }}

先睡了,今天再剖析优先级的具体代码。晚安全世界!