共计 11150 个字符,预计需要花费 28 分钟才能阅读完成。
ArrayBlockingQueue
有界的阻塞队列, 外部是一个数组, 有边界的意思是: 容量是无限的, 必须进行初始化, 指定它的容量大小, 以先进先出的形式存储数据, 最新插入的在对尾, 最先移除的对象在头部。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 队列元素 */
final Object[] items;
/** 下一次读取操作的地位, poll, peek or remove */
int takeIndex;
/** 下一次写入操作的地位, offer, or add */
int putIndex;
/** 元素数量 */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
* 它采纳一个 ReentrantLock 和相应的两个 Condition 来实现。*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/** 指定大小 */
public ArrayBlockingQueue(int capacity) {this(capacity, false);
}
/**
* 指定容量大小与指定拜访策略
* @param fair 指定独占锁是偏心锁还是非偏心锁。非偏心锁的吞吐量比拟高,偏心锁能够保障每次都是期待最久的线程获取到锁;*/
public ArrayBlockingQueue(int capacity, boolean fair) {}
/**
* 指定容量大小、指定拜访策略与最后蕴含给定汇合中的元素
* @param c 将此汇合中的元素在构造方法期间就先增加到队列中
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {}}
ArrayBlockingQueue
在生产者放入数据和消费者获取数据, 都是共用一个锁对象, 由此也意味着两者无奈真正并行运行。依照实现原理来剖析,ArrayBlockingQueue
齐全能够采纳拆散锁, 从而实现生产者和消费者操作的齐全并行运行。然而事实上并没有如此, 因为ArrayBlockingQueue
的数据写入曾经足够笨重, 以至于引入独立的锁机制, 除了给代码带来额定的复杂性外, 其在性能上齐全占不到任何便宜。- 通过构造函数得悉, 参数
fair
管制对象外部是否采纳偏心锁, 默认采纳非偏心锁。 items、takeIndex、putIndex、count
等属性并没有应用volatile
润饰, 这是因为拜访这些变量 (通过办法获取) 应用都在锁内, 并不存在可见性问题, 如size()
。- 另外有个独占锁
lock
用来对出入对操作加锁, 这导致同时只有一个线程能够拜访入队出队。
Put
源码剖析
/** 进行入队操作 */
public void put(E e) throws InterruptedException {
// e 为 null,则抛出 NullPointerException 异样
checkNotNull(e);
// 获取独占锁
final ReentrantLock lock = this.lock;
/**
* lockInterruptibly()
* 获取锁定,除非以后线程为 interrupted
* 如果锁没有被另一个线程占用并且立刻返回,则将锁定计数设置为 1。* 如果以后线程曾经保留此锁,则放弃计数将递增 1,该办法立刻返回。* 如果锁被另一个线程放弃,则以后线程将被禁用以进行线程调度,并且处于休眠状态
*
*/
lock.lockInterruptibly();
try {
// 空队列
while (count == items.length)
// 进行条件期待解决
notFull.await();
// 入队操作
enqueue(e);
} finally {
// 开释锁
lock.unlock();}
}
/** 真正的入队 */
private void enqueue(E x) {// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 获取以后元素
final Object[] items = this.items;
// 按下一个插入索引进行元素增加
items[putIndex] = x;
// 计算下一个元素应该寄存的下标,能够了解为循环队列
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤起消费者
notEmpty.signal();}
这里因为在操作共享变量前加了锁, 所以不存在内存不可见问题, 加锁后获取的共享变量都是从主内存中获取的, 而不是在 CPU 缓存或者寄存器外面的值, 开释锁后批改的共享变量值会刷新到主内存。
另外这个队列应用循环数组实现, 所以在计算下一个元素寄存下标时候有些非凡。另外
insert
后调用notEmpty.signal()
; 是为了激活调用notEmpty.await();
阻塞后放入notEmpty
条件队列的线程。
Take
源码剖析
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {while (count == 0)
notEmpty.await();
return dequeue();} finally {lock.unlock();
}
}
private E dequeue() {// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 这里有些非凡
if (itrs != null)
// 放弃队列中的元素和迭代器的元素统一
itrs.elementDequeued();
notFull.signal();
return x;
}
Take
操作和Put
操作很相似
// 该类的迭代器,所有的迭代器共享数据,队列扭转会影响所有的迭代器
transient Itrs itrs = null; // 其寄存了目前所创立的所有迭代器。/**
* 迭代器和它们的队列之间的共享数据,容许队列元素被删除时更新迭代器的批改。*/
class Itrs {void elementDequeued() {// assert lock.getHoldCount() == 1;
if (count == 0)
// 队列中数量为 0 的时候,队列就是空的,会将所有迭代器进行清理并移除
queueIsEmpty();
//takeIndex 的下标是 0,意味着队列从尾中取完了,又回到头部获取
else if (takeIndex == 0)
takeIndexWrapped();}
/**
* 当队列为空的时候做的事件
* 1. 告诉所有迭代器队列曾经为空
* 2. 清空所有的弱援用,并且将迭代器置空
*/
void queueIsEmpty() {}
/**
* 将 takeIndex 包装成 0
* 并且告诉所有的迭代器,并且删除曾经过期的任何对象(集体了解是置空对象)* 也间接的说就是在 Blocking 队列进行出队的时候,进行迭代器中的数据同步,放弃队列中的元素和迭代器的元素是统一的。*/
void takeIndexWrapped() {}
}
Itrs 迭代器创立的机会
// 从这里晓得,在 ArrayBlockingQueue 对象中调用此办法,才会生成这个对象
// 那么就能够了解为,只有并未调用此办法,则 ArrayBlockingQueue 对象中的 Itrs 对象则为空
public Iterator<E> iterator() {return new Itr();
}
private class Itr implements Iterator<E> {Itr() {
// 这里就是生产它的中央
//count 等于 0 的时候,创立的这个迭代器是个无用的迭代器,能够间接移除,进入 detach 模式。// 否则就把以后队列的读取地位给迭代器当做下一个元素,cursor 存储下个元素的地位。if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {itrs = new Itrs(this);
} else {itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
}
}
代码演示
package com.rumenz.task;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @className: BlockingQuqueExample
* @description: TODO 类形容
* @author: mac
* @date: 2021/1/20
**/
public class BlockingQueueExample {
private static volatile Boolean flag=false;
public static void main(String[] args) {BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(()->{
try{blockingQueue.put(1);
Thread.sleep(2000);
blockingQueue.put(3);
flag=true;
}catch (Exception e){e.printStackTrace();
}
});
executorService.execute(()->{
try {while (!flag){Integer i = (Integer) blockingQueue.take();
System.out.println(i);
}
}catch (Exception e){e.printStackTrace();
}
});
executorService.shutdown();}
}
LinkedBlockingQueue
基于链表的阻塞队列, 通
ArrayBlockingQueue
相似, 其外部也保护这一个数据缓冲队列(该队列由一个链表形成), 当生产者往队列放入一个数据时, 队列会从生产者手上获取数据, 并缓存在队列的外部, 而生产者立刻返回, 只有当队列缓冲区达到最大值容量时(LinkedBlockingQueue 能够通过构造函数指定该值), 才会阻塞队列, 直到消费者从队列中生产掉一份数据, 生产者会被唤醒, 反之对于消费者这端的解决也基于同样的原理。
LinkedBlockingQueue
之所以可能高效的解决并发数据, 还因为其对于生产者和消费者端别离采纳了独立的锁来控制数据同步, 这也意味着在高并发的状况下生产者和消费者能够并行的操作队列中的数据, 以调高整个队列的并发能力。如果结构一个
LinkedBlockingQueue
对象, 而没有指定容量大小,LinkedBlockingQueue
会默认一个相似有限大小的容量Integer.MAX_VALUE
, 这样的话, 如果生产者的速度一旦大于消费者的速度, 兴许还没有等到队列满阻塞产生, 零碎内存就有可能曾经被耗费殆尽了。
LinkedBlockingQueue
是一个应用链表实现队列操作的阻塞队列。链表是单向链表, 而不是双向链表。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 队列的容量,指定大小或为默认值 Integer.MAX_VALUE
private final int capacity;
// 元素的数量
private final AtomicInteger count = new AtomicInteger();
// 队列头节点,始终满足 head.item==null
transient Node<E> head;
// 队列的尾节点,始终满足 last.next==null
private transient Node<E> last;
/** Lock held by take, poll, etc */
// 出队的锁:take, poll, peek 等读操作的办法须要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
// 当队列为空时,保留执行出队的线程:如果读操作的时候队列是空的,那么期待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
// 入队的锁:put, offer 等写操作的办法须要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
// 当队列满时,保留执行入队的线程:如果写操作的时候队列是满的,那么期待 notFull 条件
private final Condition notFull = putLock.newCondition();
// 传说中的无界队列
public LinkedBlockingQueue() {}
// 传说中的有界队列
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 传说中的无界队列
public LinkedBlockingQueue(Collection<? extends E> c){}
/**
* 链表节点类
*/
static class Node<E> {
E item;
/**
* One of:
* - 真正的继任者节点
* - 这个节点,意味着继任者是 head.next
* - 空,意味着没有后继者(这是最初一个节点)*/
Node<E> next;
Node(E x) {item = x;}
}
}
通过其构造函数, 得悉其能够当做无界队列也能够当做有界队列来应用。
这里用了两把锁别离是
takeLock
和putLock
, 而Condition
别离是notEmpty
和notFull
, 它们是这样搭配的。
- 如果须要获取 (take) 一个元素, 须要获取
takeLock
锁, 然而获取了锁还不够, 如果队列此时为空, 还须要队列不为空 (notEmpty) 这个条件(Condition)。 - 如果要插入 (put) 一个元素, 须要获取
putLock
锁, 然而获取了锁还不够, 如果队列此时已满, 还是须要队列不满 (notFull) 的这个条件(Condition)。
从下面的构造函数中能够看到, 这里会初始化一个空的头结点, 那么第一个元素入队的时候, 队列中就会有两个元素。读取元素时, 也是获取头结点前面的一个元素。count 的计数值不蕴含这个头结点。
Put 源码剖析
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 将指定元素插入到此队列的尾部,如有必要,则期待空间变得可用。*/
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
// 如果你纠结这里为什么是 -1,能够看看 offer 办法。这就是个标识胜利、失败的标记而已。int c = -1;
// 包装成 node 节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 获取锁定
putLock.lockInterruptibly();
try {
/** 如果队列满,期待 notFull 的条件满足。*/
while (count.get() == capacity) {notFull.await();
}
// 入队
enqueue(node);
// 原子性自增
c = count.getAndIncrement();
// 如果这个元素入队后,还有至多一个槽能够应用,调用 notFull.signal() 唤醒期待线程。// 哪些线程会期待在 notFull 这个 Condition 上呢?if (c + 1 < capacity)
notFull.signal();} finally {
// 解锁
putLock.unlock();}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包含 head 空节点),// 那么所有的读线程都在期待 notEmpty 这个条件,期待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();}
/** 链接节点在队列开端 */
private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
//last.next = node;
//last = node;
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁当前,才能够进行此操作
last = last.next = node;
}
/**
* 期待 PUT 信号
* 仅在 take/poll 中调用
* 也就是说:元素入队后,如果须要,则会调用这个办法唤醒读线程来读
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {notFull.signal();// 唤醒
} finally {putLock.unlock();
}
}
}
Take 源码剖析
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,须要获取到 takeLock 能力进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,期待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {notEmpty.await();
}
//// 出队
x = dequeue();
//count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至多还有一个元素,那么调用 notEmpty.signal() 唤醒其余的读线程
if (c > 1)
notEmpty.signal();} finally {takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
* 出队
*/
private E dequeue() {// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {notFull.signal();
} finally {putLock.unlock();
}
}
}
与 ArrayBlockingQueue
比照
ArrayBlockingQueue 是共享锁,粒度大,入队与出队的时候只能有 1 个被执行,不容许并行执行。LinkedBlockingQueue 是独占锁,入队与出队是能够并行进行的。当然这里说的是读和写进行并行,两者的读读与写写是不能并行的。总结就是 LinkedBlockingQueue 能够并发读写。
ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个显著的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额定的对象实例,而后者则会生成一个额定的 Node 对象。这在长时间内须要高效并发地解决大批量数据的零碎中,其对于 GC 的影响还是存在肯定的区别。
LinkedBlockingQueue
实现一个线程增加文件对象, 四个线程读取文件对象
package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue {static long randomTime() {return (long) (Math.random() * 1000);
}
public static void main(String[] args) {
// 能包容 100 个文件
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
// 线程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("F:\\JavaLib");
// 实现标记
final File exitFile = new File("");
// 读个数
final AtomicInteger rc = new AtomicInteger();
// 写个数
final AtomicInteger wc = new AtomicInteger();
// 读线程
Runnable read = new Runnable() {public void run() {scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {if (file.isDirectory()) {File[] files = file.listFiles(new FileFilter() {public boolean accept(File pathname) {return pathname.isDirectory()
|| pathname.getPath().endsWith(".java");
}
});
for (File one : files)
scanFile(one);
} else {
try {int index = rc.incrementAndGet();
System.out.println("Read0:" + index + " "
+ file.getPath());
queue.put(file);
} catch (InterruptedException e) {}}
}
};
exec.submit(read);
// 四个写线程
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new Runnable() {
String threadName = "Write" + NO;
public void run() {while (true) {
try {Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = queue.take();
// 队列曾经无对象
if (file == exitFile) {
// 再次增加 "标记",以让其余线程失常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ":" + index + " "
+ file.getPath());
} catch (InterruptedException e) {}}
}
};
exec.submit(write);
}
exec.shutdown();}
}
关注微信公众号:【入门小站】, 解锁更多知识点。