无界阻塞队列 PriorityBlockingQueue
介绍
PriorityBlockingQueue 是一个带有优先级的无界阻塞队列,每次出队返回的都是优先级最高或者最低的元素。在外部是应用均衡二叉树堆实现,所以遍历元素不保障有序
。
默认应用对象的 compareTo 办法进行比拟,如果须要自定义比拟规定能够自定义 comparators。
该类图能够看到,PriorityBlockingQueue 外部有一个数组 queue,用来寄存队列元素;size 用来寄存元素个数;allocationSpinLock 是个自旋锁,应用CAS
操作来保障同时只有一个线程来进行扩容队列,状态只有 0 和 1,0
示意以后没有进行扩容,1
示意正在扩容。因为是优先级队列,所以有一个比拟器 comparator 用来比拟大小,另外还有 lock 独占锁,notEmpty 条件变量来实现 take 办法的阻塞,因为是无界队列所以没有 notFull 条件变量,所以 put 是非阻塞的。
//二叉树最小堆的实现private transient Object[] queue;private transient int size;private transient volatile int allocationSpinLock;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;
在构造函数中,默认队列容量为11
,默认比拟器为 null,也就是默认应用元素的 compareTo 办法来确定优先级,所以队列元素必须实现 Comparable 接口。
private static final int DEFAULT_INITIAL_CAPACITY = 11;public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null);}public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null);}public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity];}
offer 操作
offer 操作的作用是在队列中插入一个元素,因为是无界队列,所以始终返回 true。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //1. 如果以后元素个数 >= 队列容量 则扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { //2. 默认比拟器为null Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else //3. 自定义比拟器 siftUpUsingComparator(n, e, array, cmp); //4. 队列元素数量减少1,并唤醒notEmpty条件队列中的一个阻塞线程 size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true;}
如上代码并不简单,咱们次要看看如何进行扩容和在外部建堆。
咱们先看扩容逻辑:
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; //1. CAS胜利则扩容 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //oldCap<64则扩容执行oldCap+2,否则扩容50%,并且最大值为MAX_ARRAY_SIZE int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } //2. 第一个线程CAS胜利后,第二线程进入这段代码,而后第二个线程让出CPU,尽量让第一个线程获取到锁,但得不到保障 if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); }}
tryGrow 的作用就是扩容,然而为什么要在扩容前开释锁,而后应用 CAS 管制只有一个线程能够扩容胜利?
其实不开释锁也是 ok 的,也就是在扩容期间始终持有该锁,然而扩容须要工夫,这段时间内占用锁的话那么其余线程在这个时候就不能进行出队和入队操作,升高了并发性。所以为了进步性能,应用 CAS 来管制只有一个线程能够进行扩容,并且在扩容前开释锁,进而让其余线程能够进行入队和出队操作。
扩容线程扩容结束后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有应用 UNSAFE 办法的 CAS 进行设置是因为同时只可能有一个线程获取到该锁,并且 allocationSpinLock 被润饰为了 volatile 的。
咱们接着看建堆算法:
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; // 队列元素个数 > 0 则判断插入地位,否则间接入队 while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key;}
相熟二叉堆的话,该段代码并不简单,咱们看下图具体构造:
首先咱们看parent = (k - 1) >>> 1
,首先 k - 1 就是拿到以后真正的下标的地位,随后 >>> 1
拿到父节点的地位,该图咱们得悉,k = 7
,执行(k - 1) >>> 1
之后失去的parent = 3
,依据下标咱们晓得是元素 6。
PriorityQueue 是一个齐全二叉树,且不容许呈现 null 节点,其父节点都比叶子节点小,这个是堆排序中的最小堆
。二叉树存入数组的形式很简略,就是从上到下,从左到右。齐全二叉树能够和数组中的地位一一对应:
- 左叶子节点 = 父节点下标 * 2 + 1
- 右叶子节点 = 父节点下标 * 2 + 2
- 父节点 = (叶子节点 - 1) / 2
实际上就是将要插入的元素 x 和它的父节点元素 6 做比照,如果比父节点大就始终向上挪动。
poll 操作
poll 操作的作用是获取队列外部堆树的根节点元素,如果队列为空,则返回 null。
public E poll() { final ReentrantLock lock = this.lock; //获取独占锁 lock.lock(); try { return dequeue(); } finally { //开释独占锁 lock.unlock(); }}
咱们次要看一下 dequeue 办法。
private E dequeue() { int n = size - 1; //队列为空,返回null if (n < 0) return null; else { Object[] array = queue; //1.获取头部元素 E result = (E) array[0]; //2. 获取队尾元素,并赋值为null E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null)//3. siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; //4. return result; }}
该办法如果队列为空则间接返回 null,否则执行代码(1)获取数组第一个元素作为返回值寄存到变量 Result 中,这里须要留神,数组外面的第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。而后代码(2)获取队列尾部元素并存放到变量 x 中,且置空尾部节点,而后执行代码(3)将变量 x 插入到数组下标为 0 的地位,之后从新调整堆为最大或者最小堆,而后返回。这里重要的是,去掉堆的根节点后,如何应用剩下的节点从新调整一个最大或者最小堆。上面咱们看下 siftDownComparable 的实现。
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; }}
因为队列数组第 0 个元素为根,因而出队时要移除它。这时数组就不再是最小的堆了,所以须要调整堆。具体是从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会找本人左右子树外面那个最小值,这是一个递归过程,直到叶子节点完结递归。
假如目前队列内容如下图:
上图中树根的 leftChildVal = 4; rightChildVal = 6;因为4 < 6
,所以c = 4
。而后因为11 > 4
,也就是key > c
,所以应用元素 4 笼罩树根节点的值。
而后树根的左子树树根的左右孩子节点中的 leftChildVal = 8; rightChildVal = 10;因为8 < 10
,所以c = 8
。而后因为11 > 8
,也就是 key > c
,所以元素 8 作为树根左子树的根节点,当初树的形态如下图第三步所示。这时候判断是否k < half
,后果为 false,所以退出循环。而后把x = 11
的元素设置到数组下标为3
的中央,这时候堆树如下图第四步所示,至此调整堆结束。
put 操作
put 操作外部调用的是 offer 操作,因为是无界队列,所以不须要阻塞。
public void put(E e) { offer(e); // never need to block}
take 操作
take 操作的作用是获取队列外部堆树的根节点元素,如果队列为空则阻塞。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result;}
size 操作
获取队列元素个数。如下代码在返回 size 前加了锁,以保障在调用 size 办法时不会有其余线程进行入队和出队操作。另外,因为 size 变量没有被润饰为 volatie 的,所以这里加锁也保障了在多线程下 size 变量的内存可见性。
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); }}
总结
PriorityBlockingQueue 相似于 ArrayBlockingQueue,在外部应用一个独占锁来管制同时只有一个线程能够进行入队和出队操作。另外,PriorityBlockingQueue 只应用了一个 notEmpty 条件变量而没有应用 notFull,因为是无界队列,执行 put 操作时永远不会处于 await 状态,所以也不须要被唤醒。而 take 办法是阻塞办法,并且是可被中断的。当须要寄存有优先级的元素时该队列比拟有用。