共计 5355 个字符,预计需要花费 14 分钟才能阅读完成。
JDK 数组阻塞队列源码深刻分析
前言
在后面一篇文章从零开始本人入手写阻塞队列当中咱们认真介绍了阻塞队列提供给咱们的性能,以及他的实现原理,并且基于谈到的内容咱们本人实现了一个低配版的 数组阻塞队列 。在这篇文章当中咱们将认真介绍 JDK 具体是如何实现 数组阻塞队列 的。
阻塞队列的性能
而在本篇文章所谈到的阻塞队列当中,是在并发的状况下应用的,下面所谈到的是队列是 并发不平安 的,然而阻塞队列在并发下状况是平安的。阻塞队列的次要的需要如下:
- 队列根底的性能须要有,往队列当中放数据,从队列当中取数据。
- 所有的队列操作都要是 并发平安 的。
- 当队列满了之后再往队列当中放数据的时候,线程须要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程须要被唤醒。
- 当队列空了之后再往队列当中取数据的时候,线程须要被挂起,当有线程往队列当中退出数据的时候被挂起的线程须要被唤醒。
- 在咱们实现的队列当中咱们应用数组去存储数据,因而在构造函数当中须要提供数组的初始大小,设置用多大的数组。
下面就是数组阻塞队列给咱们提供的最外围的性能,其中将线程挂起和唤醒就是阻塞队列的外围,挂起和唤醒体现了“阻塞”这一核心思想。
数组阻塞队列设计
浏览这部分内容你须要相熟可重入锁 ReentrantLock
和条件变量 Condition
的应用。
数组的循环应用
因为咱们是应用数组存储队列当中的数据,从下表为 0 的地位开始,当咱们往队列当中退出一些数据之后,队列的状况可能如下,其中 head 示意队头,tail 示意队尾。
在上图的根底之上咱们再进行四次出队操作,后果如下:
在下面的状态下,咱们持续退出 8 个数据,那么布局状况如下:
咱们晓得上图在退出数据的时候不仅将数组后半局部的空间应用完了,而且能够持续应用前半部分没有应用过的空间,也就是说在队列外部实现了一个循环应用的过程。
字段设计
在 JDK 当中数组阻塞队列的实现是 ArrayBlockingQueue
类,在他的外部是应用数组实现的,咱们当初来看一下它的次要的字段,为了不便浏览将所有的解释阐明都写在的正文当中:
/** The queued items */
final Object[] items; // 这个就是具体存储数据的数组
/** items index for next take, poll, peek or remove */
int takeIndex; // 因为是队列 因而咱们须要晓得下一个出队的数据的下标 这个就是示意下一个将要出队的数据的下标
/** items index for next put, offer, or add */
int putIndex; // 咱们同时也须要下一个入队的数据的下标
/** Number of elements in the queue */
int count; // 统计队列当中一共有多少个数据
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; // 因为阻塞队列是一种能够并发应用的数据结构
/** Condition for waiting takes */
private final Condition notEmpty; // 这个条件变量次要用于唤醒被 take 函数阻塞的线程 也就是从队列当中取数据的线程
/** Condition for waiting puts */
private final Condition notFull; // 这个条件变量次要用于唤醒被 put 函数阻塞的线程 也就是从队列当中放数据的线程
构造函数
构造函数的次要性能是申请指定大小的内存空间,并且对类的成员变量进行赋值操作。
public ArrayBlockingQueue(int capacity) {
// capacity 示意存储数据的数组的长度
this(capacity, false);
}
// fair 这个参数次要是用于阐明 是否应用偏心锁
// 如果为 true 示意应用偏心锁 执行效率低 然而各个线程进入临界区的程序是先来后到的程序 更加偏心
// 如果为 false 示意应用非偏心锁 执行效率更高
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 对变量进行赋值操作
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();}
put 函数
这个函数是阻塞队列对外围的函数之一了,首先咱们须要理解的是,如果一个线程调用了这个函数往队列当中退出数据,如果此时队列曾经满了则线程须要被挂起,如果没有满则须要将数据退出到队列当中,也就是将数据存储到数组当中。留神还有一个很重要的一点是,当咱们往队列当中退出一个数据之后须要发一个信号给其余被 take
函数阻塞的线程,因为这些线程在取数据的时候可能队列当中曾经空了,因而须要将这些线程唤醒。
public void put(E e) throws InterruptedException {checkNotNull(e); // 保障输出的数据不为 null 代码在下方
final ReentrantLock lock = this.lock;
// 进行加锁操作,因为上面是临界区
lock.lockInterruptibly();
try {while (count == items.length) // 如果队列曾经满了 也就是队列当中数据的个数 count == 数组的长度的话 就须要将线程挂起
notFull.await();
// 当队列当中有空间的之后将数据退出到队列当中 这个函数在上面仔细分析 代码在下方
enqueue(e);
} finally {lock.unlock();
}
}
private static void checkNotNull(Object v) {if (v == null)
throw new NullPointerException();}
private void enqueue(E x) {// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 进入这个函数的线程曾经在 put 函数当中加上锁了 因而这里不须要加锁
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // 因为这个数据是循环应用的 因而能够回到下标为 0 的地位
// 因为队列当中的数据能够出队 因而下标为 0 的地位不存在数据能够应用
putIndex = 0;
count++;
// 在这里须要将一个被 take 函数阻塞的线程唤醒 如果调用这个办法的时候没有线程阻塞
// 那么调用这个办法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
notEmpty.signal();}
留神 :这里有一个中央非常容易被疏忽,那就是在将线程挂起的时候应用的是while
循环而不是 if
条件语句,代码:
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {while (count == items.length)
notFull.await();
enqueue(e);
} finally {lock.unlock();
}
这是因为,线程被唤醒之后并不会立马执行,因为线程在调用 await
办法的之后会开释锁🔒,他想再次执行还须要再次取得锁,而后就在他获取锁之前的这段时间外面,可能其余的线程也会从数组当中放数据,因而这个线程执行的时候队列可能还是满的,因而须要再次判断,否则就会笼罩数据,像这种唤醒之后并没有满足线程执行条件的景象叫做 虚伪唤醒 ,因而大家在写程序的时候要分外留神,当须要将线程挂起或者唤醒的之后,最好思考分明,如果不确定能够应用while
代替if
,这样的话更加保险。
take 函数
这个函数次要是从队列当中取数据,然而当队列为空的时候须要将调用这个办法的线程阻塞。当队列当中有数据的时候,就能够从队列当中取出数据,然而有一点很重要的就是当从队列当中取出数据之后,须要调用 signal
办法,用于唤醒被 put 函数阻塞的线程,因为从队列当中取出数据了,队列必定曾经不满了,因而能够唤醒被 put 函数阻塞的线程了。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 因为取数据的代码波及到数据竞争 也就是说多个线程同时竞争 数组数据 items 因而须要用锁爱护起来
lock.lockInterruptibly();
try {
// 当 count == 0 阐明队列当中没有数据
while (count == 0)
notEmpty.await();
// 当队列当中还有数据的时候能够将数据出队
return dequeue();} finally {lock.unlock();
}
}
private E dequeue() {// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 取出数据
E x = (E) items[takeIndex];
items[takeIndex] = null; // 将对应的地位设置为 null 数据就能够被垃圾收集器回收了
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 迭代器也须要出队 如果不了
if (itrs != null)
itrs.elementDequeued();
// 调用 signal 函数 将被 put 函数阻塞的线程唤醒 如果调用这个办法的时候没有线程阻塞
// 那么调用这个办法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
notFull.signal();
return x;
}
同样的情理这里也须要应用 while
循环去进行阻塞,否则可能存在 虚伪唤醒 ,可能队列当中没有数据返回的数据为 null,而且会毁坏队列的构造因为会波及队列的两个端点的值的扭转,也就是takeIndex
和putIndex
的扭转。
offer 函数
这个函数的作用和 put 函数一样,只不过当队列满了的时候,这个函数返回 false,退出数据胜利之后这个函数返回 true,上面的代码就比较简单了。
public boolean offer(E e) {checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列当中的数据个数和数组的长度相等 阐明队列满了 间接返回 false 即可
if (count == items.length)
return false;
else {enqueue(e);
return true;
}
} finally {lock.unlock();
}
}
add 函数
这个函数和下面两个函数的意义也是一样的,只不过当队列满了之后这个函数会抛出异样。
public boolean add(E e) {if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
poll 函数
这个函数和 take 函数的作用差不多,然而这个函数不会阻塞,当队列当中没有数据的时候间接返回 null,有数据的话返回数据。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {return (count == 0) ? null : dequeue();} finally {lock.unlock();
}
}
总结
在本篇文章当中次要介绍了 JDK 外部是如何实现 ArrayBlockingQueue
的,如果你对锁和队列的应用有肯定的理解本篇文章应该还是比拟容易了解的。在实现 ArrayBlockingQueue
当中有以下须要留神的点:
put
函数,如果在往队列当中退出数据的时候队列满了,则须要将线程挂起。在队列当中有空间之后,线程被唤醒继续执行,在往队列当中退出了数据之后,须要调用signal
办法,唤醒被take
函数阻塞的线程。take
函数,如果在往队列当中取出数据的时候队列空了,则须要将线程挂起。在队列当中有数据之后,线程被唤醒继续执行,在从队列当中取出数据之后,须要调用signal
办法,唤醒被put
函数阻塞的线程。- 在调用
await
函数的时候,须要小心 虚伪唤醒 景象。
以上就是本篇文章的所有内容了,我是LeHung,咱们下期再见!!!更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…
关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。