共计 8205 个字符,预计需要花费 21 分钟才能阅读完成。
| 难看请赞,养成习惯
- 你有一个思维,我有一个思维,咱们替换后,一个人就有两个思维
- If you can NOT explain it simply, you do NOT understand it well enough
现陆续将 Demo 代码和技术文章整顿在一起 Github 实际精选,不便大家浏览查看,本文同样收录在此,感觉不错,还请 Star
前言
如果依照用处与个性进行粗略的划分,JUC 包中蕴含的工具大体能够分为 6 类:
- 执行者与线程池
- 并发队列
- 同步工具
- 并发汇合
- 锁
- 原子变量
在并发系列中,次要解说了 执行者与线程池
, 同步工具
, 锁
,在剖析源码时,或多或少的提及到了「队列」,队列在 JUC 中也是多种多样存在,所以本文就以「远看」视角,帮忙大家疾速理解与辨别这些看似「芜杂」的队列
并发队列
Java 并发队列依照实现形式来进行划分能够分为 2 种:
- 阻塞队列
- 非阻塞队列
如果你曾经看完并发系列锁的实现,你曾经可能晓得他们实现的区别:
前者就是基于锁实现的,后者则是基于 CAS 非阻塞算法实现的
常见的队列有上面这几种:
霎时懵逼?看到这个没有兽性的图想间接走人?主观先别急,一会就柳暗花明了
当下你兴许有个问题:
为什么会有这么多种队列的存在?
锁有应答各种情景的锁,队列也天然有应答各种情景的队列了, 是不是也有点繁多职责准则的意思呢?
所以咱们要理解这些队列到底是怎么设计的?以及用在了哪些地方?
先来看下图
如果你在 IDE 中关上以上非阻塞队列和阻塞队列,查看其实现办法,你就会发现,阻塞队列
较非阻塞队列
额定反对两种操作:
- 阻塞的插入
当队列满时,队列会阻塞插入元素的线程,直到队列不满
- 阻塞的移除
当队列为空时,获取元素的线程会阻塞,直到队列变为非空
综合阐明入队 / 出队操作,看似芜杂的办法,用一个表格就能概括了
抛出异样
- 当队列满时,此时如果再向队列中插入元素,会抛出 IllegalStateException(这很好了解)
- 当队列空时,此时如果再从队列中获取元素,会抛出 NoSuchElementException(这也很好了解)
返回非凡值
- 当向队列插入元素时,会返回元素是否插入胜利,胜利则返回 true
- 当从队列移除元素时,如果没有则返回 null
始终阻塞
- 当队列满时,如果 生产者线程 向队列 put 元素,队列会始终阻塞生产者线程,直到队列可用或者响应中断退出
- 当队列为空时,如果 消费者线程 从队列外面 take 元素,队列会阻塞消费者线程,直到队列不为空
对于阻塞,咱们其实早在 并发编程之期待告诉机制 就曾经充分说明过了,你还记得上面这张图吗?原理其实是一样一样滴
超时退出
和锁一样,因为有阻塞,为了灵便应用,就肯定反对超时退出,阻塞工夫达到超时工夫,就会间接返回
至于为啥插入和移除这么多种单词示意模式,我也不晓得,为了不便记忆,只须要记住阻塞的办法模式即可:
单词
put
和take
字母t
首位相连,一个放,一个拿
到这里你应该对 Java 并发队列有了个初步的意识了,原来看似芜杂的办法貌似也有了法则。接下来就到了疯狂串知识点的时刻了,借助前序章节的常识,分分钟就了解全副队列了
ArrayBlockingQueue
之前也说过,JDK 中的命名还是很考究滴,一看这名字,底层就是数组实现了,是否有界,那就看在结构的时候是否须要指定 capacity 值了
填鸭式的阐明也容易忘,这些都是哪看到的呢?在所有队列的 Java docs 的第一段,一句话就概括了该队列的次要个性,所以强烈建议大家本人在看源码时,简略 瞄一眼 docs 结尾,心中就有多半个数了
在讲 Java AQS 队列同步器以及 ReentrantLock 的利用 时咱们介绍了偏心锁与非偏心锁的概念,ArrayBlockingQueue 也有同样的概念,看它的构造方法,就有 ReentrantLock 来辅助实现
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();}
默认状况下,仍旧是不保障线程偏心拜访队列(偏心与否是指阻塞的线程是否依照阻塞的先后顺序拜访队列,先阻塞线拜访,后阻塞后拜访)
到这我也要长期问一个说过屡次的面试送分题了:
为什么默认采纳非偏心锁的形式?它较偏心锁形式有什么益处,又可能带来哪些问题?
晓得了以上内容,联合下面表格中的办法,ArrayBlockingQueue 就能够轻松过关了
和数组绝对的天然是链表了
LinkedBlockingQueue
LinkedBlockingQueue 也算是一个有界阻塞队列,从上面的构造函数中你也能够看出,该队列的默认和最大长度为 Integer.MAX_VALUE,这也就 docs 说 optionally-bounded 的起因了
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
正如 Java 汇合一样,链表模式的队列,其存取效率要比数组模式的队列高。然而在一些并发程序中,数组模式的队列因为具备肯定的可预测性,因而能够在某些场景中取得更高的效率
看到 LinkedBlockingQueue 是不是也有些相熟呢?为什么要应用线程池? 就曾经和它屡次照面了
创立单个线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创立固定个数线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
面试送分题又来了
应用 Executors 创立线程池很简略,为什么大厂严格要求禁用这种创立形式呢?
PriorityBlockingQueue
PriorityBlockingQueue 是一个反对优先级的无界的阻塞队列,默认状况下采纳天然程序升序排列,当然也有非默认状况自定义优先级,须要排序,那天然要用到 Comparator 来定义排序规定了
能够定义优先级,天然也就有相应的限度,以及应用的注意事项
- 依照上图阐明,队列中不容许存在 null 值,也不容许存在不能排序的元素
-
对于排序值雷同的元素,其序列是不保障的,但你能够持续自定义其余能够辨别进去优先级的值,如果你有严格的优先级辨别,倡议有更欠缺的比拟规定,就像 Java docs 这样
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {static final AtomicLong seq = new AtomicLong(0); final long seqNum; final E entry; public FIFOEntry(E entry) {seqNum = seq.getAndIncrement(); this.entry = entry; } public E getEntry() { return entry;} public int compareTo(FIFOEntry<E> other) {int res = entry.compareTo(other.entry); if (res == 0 && other.entry != this.entry) res = (seqNum < other.seqNum ? -1 : 1); return res; } }
- 队列容量是没有下限的,然而如果插入的元素超过负载,有可能会引起 OutOfMemory 异样(这是必定的),这也是为什么咱们通常所说,队列无界,心中有界
-
PriorityBlockingQueue 也有 put 办法,这是一个阻塞的办法,因为它是无界的,天然不会阻塞,所以就有了上面比拟聪慧的做法
public void put(E e) {offer(e); // never need to block 请自行对照下面表格 }
-
能够给定初始容量,这个容量会依照肯定的算法主动裁减
// Default array capacity. private static final int DEFAULT_INITIAL_CAPACITY = 11; public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null); }
这里默认的容量是 11,因为也是基于数组,那面试送分题又来了
你通常是怎么定义容器 / 汇合初始容量的?有哪些根据?
DelayQueue
DelayQueue 是一个反对延时获取元素的无界阻塞队列
- 是否延时必定是和某个工夫 (通常和以后工夫) 进行 比拟
- 比拟过后还要进行排序,所以也是存在肯定的 优先级
看到这兴许感觉这有点和 PriorityBlockingQueue
很像,没错,DelayQueue
的外部也是应用 PriorityQueue
上图绿色框线也通知你,DelayQueue 队列的元素必须要实现 Depayed 接口:
所以从上图能够看出应用 DelayQueue 非常简单,只须要两步:
实现 getDelay() 办法,返回元素要延时多长时间
public long getDelay(TimeUnit unit) {
// 最好采纳纳秒模式,这样更准确
return unit.convert(time - now(), NANOSECONDS);
}
实现 compareTo() 办法,比拟元素程序
public int compareTo(Delayed other) {if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
下面的代码哪来的呢?如果你关上 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了(ScheduledThreadPoolExecutor 外部就是利用 DelayQueue)
所以综合来说,上面两种状况非常适合应用 DelayQueue
- 缓存零碎的设计:用 DelayQueue 保留缓存元素的有效期,应用一个线程循环查问 DelayQueue,如果能从 DelayQueue 中获取元素,阐明缓存有效期到了
- 定时任务调度:用 DelayQueue 保留当天会执行的工作以及工夫,如果能从 DelayQueue 中获取元素,工作就能够开始执行了。比方 TimerQueue 就是这样实现的
SynchronousQueue
这是一个 不存储元素 的阻塞队列,不存储元素还叫队列?
没错,SynchronousQueue 直译过去叫 同步队列,如果在队列外面呆久了应该就算是“异步”了吧
所以应用它,每个 put() 操作必须要期待一个 take() 操作,反之亦然,否则不能持续增加元素
理论中怎么用呢?如果你须要两个线程之间同步共享变量,如果不必 SynchronousQueue 你可能会抉择用 CountDownLatch 来实现,就像这样:
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
sharedState.set(producedElement);
countDownLatch.countDown();};
Runnable consumer = () -> {
try {countDownLatch.await();
Integer consumedElement = sharedState.get();} catch (InterruptedException ex) {ex.printStackTrace();
}
};
这点小事就用计数器来实现,显然很不适合,用 SynchronousQueue 革新一下,感觉霎时就不一样了
ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {queue.put(producedElement);
} catch (InterruptedException ex) {ex.printStackTrace();
}
};
Runnable consumer = () -> {
try {Integer consumedElement = queue.take();
} catch (InterruptedException ex) {ex.printStackTrace();
}
};
其实 Executors.newCachedThreadPool() 办法外面应用的就是 SynchronousQueue
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
看到后面
LinkedBlockingQueue
用在newSingleThreadExecutor
和newFixedThreadPool
上,而newCachedThreadPool
却用SynchronousQueue
,这是为什么呢?
因为单线程池和固定线程池中,线程数量是无限的,因而提交的工作须要在 LinkedBlockingQueue
队列中期待空余的线程;
而缓存线程池中,线程数量简直有限(下限为Integer.MAX_VALUE
),因而提交的工作只须要在SynchronousQueue
队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue
的吞吐量要高于 LinkedBlockingQueue
和 ArrayBlockingQueue
LinkedTransferQueue
简略来说,TransferQueue 提供了一个场合,生产者线程应用 transfer
办法传入一些对象并阻塞,直至这些对象被消费者线程全副取出。
你有没有感觉,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue。
但 LinkedTransferQueue 相比其余阻塞队列多了三个办法
- transfer(E e)
如果以后有消费者正在期待生产元素,transfer 办法就能够间接将生产者传入的元素立即 transfer (传输) 给消费者;如果没有消费者期待生产元素,那么 transfer 办法会把元素放到队列的 tail(尾部)
节点,始终阻塞,直到该元素被消费者生产才返回
- tryTransfer(E e)
tryTransfer,很显然是一种尝试,如果没有消费者期待生产元素,则马上返回 false,程序不会阻塞
- tryTransfer(E e, long timeout, TimeUnit unit)
带有超时限度,尝试将生产者传入的元素 transfer 给消费者,如果超时工夫到,还没有消费者生产元素,则返回 false
你瞧,所有阻塞的办法都是一个套路:
- 阻塞形式
- 带有 try 的非阻塞形式
- 带有 try 和超时工夫的非阻塞形式
看到这你兴许感觉 LinkedTransferQueue 没啥特点,其实它和其余阻塞队列的差异还挺大的:
BlockingQueue 是如果队列满了,线程才会阻塞;然而 TransferQueue 是如果没有生产元素,则会阻塞(transfer 办法)
这也就应了 Doug Lea 说的那句话:
LinkedTransferQueue
is actually a superset ofConcurrentLinkedQueue
,SynchronousQueue
(in“fair”mode), and unboundedLinkedBlockingQueues
. And it’s made better by allowing you to mix and
match those features as well as take advantage of higher-performance i
mplementation techniques.简略翻译:
LinkedTransferQueue
是ConcurrentLinkedQueue
,SynchronousQueue
(在偏心模式下), 无界的LinkedBlockingQueues
等的超集; 容许你混合应用阻塞队列的多种个性所以,在适合的场景中,请尽量应用
LinkedTransferQueue
下面都看的是单向队列 FIFO,接下来咱们看看双向队列
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表构造组成的双向阻塞队列,但凡后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/
, 刚接触它时我认为是这个冰激凌的发音
所谓双向队列值得就是能够从队列的两端插入和移除元素。所以:
双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会缩小一半的竞争
队列有头,有尾,因而它又比其余阻塞队列多了几个非凡的办法
- addFirst
- addLast
- xxxxFirst
- xxxxLast
- … …
这么一看,双向阻塞队列的确很高效,
那双向阻塞队列利用在什么中央了呢?
不晓得你是否听过“工作窃取 ”模式,看似不太厚道的一种办法,实则是高效利用线程的好方法。下一篇文章,咱们就来看看 ForkJoinPool 是如何利用“ 工作窃取”模式的
总结
到这对于 Java 队列(其实次要介绍了阻塞队列)就疾速的辨别完了,将看似芜杂的办法做了分类整理,不便疾速了解其用处,同时也阐明了这些队列的理论用处。置信你带着更高的视角来浏览源码会更加轻松,最初也心愿大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就能够搞定了
参考
- Java 并发编程的艺术
- Java 并发编程之美
- https://zhuanlan.zhihu.com/p/…
日拱一兵 | 原创