共计 6455 个字符,预计需要花费 17 分钟才能阅读完成。
ConcurrentHashMap
源码目前在网络上已有泛滥解析。本文章次要关注办法 transfer
,试图认真解析该实现,如有错漏,请斧正。
ConcurrenthashMap
的 transfer
次要是用于扩容重组阶段,当外部数组的容量值超过阈值时,将触发扩容重组,transfer
是该过程的次要实现。
1. 相干概念
2. 解析
1. 相干概念
1.1 ConcurrentHashMap
中,应用一个字段复用了多种性能,如:阈值管制、外部 Node[]
数组状态管制、扩容线程管制 等,该字段就是 sizeCtl
。
/**
* <pre>
* 数组初始化和重组时的控制器
* 为正数时:示意表格正在被初始化或重组(resize)* -1:初始化
* -(1+n):重组的线程数 n,也就是说,在批改时,第一个批改的线程应该是:-2,因为 -1 默认初始化
* 当表格为 null 时,应用初始化时指定的大小,或默认为 0
* 初始化实现后,赋值为下次重组 table 的大小的阈值(默认 1.75 倍)* </pre>
*/
private transient volatile int sizeCtl;
1.2 ConcurrentHashMap
在重组时,做法与 HashMap
相似,然而具体新的数组,则是应用了外部一个数组变量 nextTable
以保障并发管制。其余如:链表的重组、树结构的重组 流程均是大同小异。
1.3 ConcurrentHashMap
的重组采纳了跟分段表相似的思维,实际上是将数组划分为不同的分段区间,如果有线程进入,可获取该区间辅助转换
1.4 transferIndex
是 ConcurrentHashMap
的外部属性,次要是在重组阶段中应用,用来示意还未被转换的数组,区间为:table[0] ~ table[transferIndex-1]
1.5 ConcurrentHashMap
并发转换的过程,借助了 信号量 的概念,只有获取到信号的线程,能力进入辅助转换,而 信号量 则存储在 sizeCtl
,每当一个线程进入获取,则 sizeCtl + 1
(首个线程开启转换则是 sizeCtl + 2
)。次要留神的是,该信号量的初始值为 正数,退出线程将增大 sizeCtl
,直到 sizeCtl
的增大达到 0 时,信号量将用完,默认的与 信号量 相加等于 0 的值是:65535,也就是说,最多容许 65535 条线程参加辅助转换(非固定,可调节)。所以可通过 rs + 1 ~ rs + 65535
的边界管制,来决定线程是否退出辅助转换。让 sizeCtl
成为正数变成信号量的次要代码是:
resizeStamp(n) << RESIZE_STAMP_SHIFT
1.6 ConcurrentHash
的转换过程中,用到的辅助属性有两个:nextTable
,transferIndex
,它们属于线程共享的,所以在对他们进行变更时,都是应用了 “自旋 / 死循环 + CAS”
的形式,实现线程并发平安。
2. 解析
2.1 转换过程 transfer
的每个调用入口,实际上内部都有对 sizeCtl
进行 “自旋 + CAS”
的操作。也就是并发状况下,即便多条线程想要进行扩容,那也只有一条线程可能胜利,另外的线程则进入辅助扩容的过程,扩容办法进入前的判断如下:
// nt -> nextTable
// n -> num,sc -> sizeCtl
Node<K,V>[] tab, nt; int n, sc;
// 以后存储大于 75%,且总大小小于最大容量,须要扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// resizeStamp 纯正只是移位来保障右 16 位为 0,可用来管制作为线程最大数
// 左 16 位理论并没有保留太多信息(因为显著:resizeStamp(4)、resizeStamp(5)、resizeStamp(6)、(7) 是雷同的后果
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
// 限度线程的最大或最小,当达到最大 65535(默认)或 1 条时,则间接跳出
// rs + 1 --> 起码线程数(相当于不正确的状况了,或者是初始化,因为起始时起码是 rs + 2)// rs + MAX_RESIZERS --> 最多线程数
// 或其余状况,则不再辅助转移,如:nextable 已为 null 或 transferIndex <= 0(阐明已完结)// 前两个条件是限度线程数,后两个条件是扩容曾经完结
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果 sc >= 0,阐明是刚开始,因为 sc < 0 时,示意有多少条线程在进行转移是:sc - 1
// 所以这里要 rs + 2
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null);
s = sumCount();}
这里呈现大量的判断比拟,容易造成凌乱,但次要记住:这些判断比拟,在 ConcurrentHashMap
大部分是边界判断。记住这点后可能帮忙了解大部分的判断比拟,比方:sc == rs + MAX_RESIZERS
和 sc == rs + 1
实际上是对线程数的上下界的限度,超过限度,则不进入辅助转换。
2.2 ConcurrentHashMap
是分段进行并发转换,就是一个数组,按 “幅度”
划分,而后相应的线程获取到哪个分组,则负责该分组的转换的实现。那么重组转换的进口在哪里呢?只有当所有线程都执行结束,解决转换的线程的信号量没有被获取了,才退出整个转换过程。默认最小幅度是 16,也就是说线程的起码解决元素个数是 16 个。
// stride 幅度
int n = tab.length, stride;
// 如果 CPU 大于 1,管制起码每个线程的处理量为 16 ==> n / 8 / NCPU
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
// 数组翻倍,为什么要多出一个赋值操作?是因为 new 操作可能异样?貌似也不影响
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 失败,间接减少数组大小,退出
sizeCtl = Integer.MAX_VALUE;
return;
}
// 因为本办法的外层调用都应用了 CAS,所以能够保障此赋值的正确性(多线程状况下)nextTable = nextTab;
// table 大小,最开始的转换范畴是原数组大小
transferIndex = n;
}
2.3 进入转换方法后,首先就是确定线程解决幅度,而后初始化 nextTable
(如果需要的话),并初始化转换过程中须要用到的一些辅助属性,如:transferIndex = n = table.length
。
2.4 接下来,就是一个死循环(假象)。死循环内嵌死循环。第一个死循环应用到了局部参数 i 和 bound
,实际上,在每个线程进入该办法后,都会取得本人这两个局部变量值,而它们的值变动则是在外部循环中开始赋值,一旦赋值胜利,那么第一个死循环就变成了一个 有界的 for 循环
2.5 优先看第二个外部循环,advance
变量管制了该循环。advance
变量次要示意:是否推动到下一个元素。它理论与 i 和 bound
是有逻辑关系的,一旦 i 和 bound
的关系不匹配,那么 advance
也就必须为 false
,不再让线程进行推动,推动的操作是 (--i)
。也就是说,线程进入后,将有三个变量管制其运行,其中 bound, i
是线程解决的数组边界,而 advance
则控制线程在这个边界中进行挪动
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 是否推动到下一个元素,false 则示意还是解决以后元素
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
// f -> findNode;fh -> findNode hash
Node<K,V> f; int fh;
// 死循环次要是为了划分线程解决区间!还有管制元素推动
while (advance) {
int nextIndex, nextBound;
// 死循环标记位,一直死循环执行解决,没有太多意义,纯正依附标记位
// 每一个线程进来,第一个判断都不成立
// 通过 --i 来控制线程解决区间的推动,// 如果 --i > bound 阐明区间范畴超过线程的解决范畴,线程不再该范畴内就行推动,标记位为 false
// 每一次划分完,则 i 实际上是闭区间的尾部,而 bound 则为区间的首部,所以 --i 胜利,进入区间下一个元素解决
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// 赋值 nextIndex
// 小于 0:表已被划分完,不再作划分推动,跳出循环
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// CAS 替换值,将 transferIndex 更新为 transferIndex - stride
// 管制此线程的解决区间为:bound ~ (nextIndex - 1)
// 假设初始表大小为 35,2 个线程进入(其实跟线程数无关,跟 CPU 无关),NCPU = 2,则幅度管制下为 16
// 通过循环,划分下为:// 19 ~ 34
// 3 ~ 18
// 0 ~ 2
// 也就说,transfer 的解决,(单线程)是从尾部到头部(当然总体状况下多线程则取决于线程的执行状况)bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
能够看到,外部死循环的次要作用,其实是为了划分分区(划分幅度为 stride
),也能够意识到,即便是单线程,其执行也是按分区执行,并且执行的分区程序是从尾部到首部。通过 CAS
保障分区的划分的线程平安,失败则从新循环再次操作。
2.6 划分完分区后,剩下的就是线程的处理过程。处理过程包含 2 局部,一部分是一般的元素解决,一部分是边界管制——退出进口。
在每一个元素的处理过程中,线程都会先判断是否达到进口,是则退出?差不离,但退出蕴含两种状况,一种是一般的辅助线程的退出,它只擦本人的屁股,另外一种是整体线程的退出,它除了解决负责本人的退出进口,还要负责将重组后的后果 nextTable 反复赋值给 table,并为 sizeCtl 赋值 1.75 倍的阈值
1 // 如果 i < 0 || i >= n || i + n >= nextn,都属于区间的边界判断
2 // 超过边界则判断是否线程都已执行结束,其实只有首尾区间的线程会触发到这个判断,3 // 其余的线程因为 stride < i < 2stride,所以不会触发此判断
4 if (i < 0 || i >= n || i + n >= nextn) {
5 int sc;
6 // 扩容进口
7 // 只有当 finishing 为 true 时,才真正将 nextTable 赋值给 旧 table 指针
8 // 而 finishing 为 true 的惟一条件,是所有的线程都执行结束
9 if (finishing) {
10 nextTable = null;
11 table = nextTab;
12 // 翻倍减去 0.25,得 1.75 阈值
13 sizeCtl = (n << 1) - (n >>> 1);
14 return;
15 }
16 if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {17 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
18 return;
19 // 只有当所有的线程都执行结束,能力保障 finishing 为 true
20 finishing = advance = true;
21 i = n; // recheck before commit
22 }
23 }
说完了边界进口,剩下的就是一般的操作了,有以下判断:
当线程转换时 旧数组 对应地位上为 null
,则间接 CAS
替换为 ForwardingNode
(其 hash = MOVED
),示意转移过了;此时,当内部有操作 put
刚好命中此地位时,将会进入辅助转换的过程,判断根据就是 if (hash == MOVED)
。也就是说,在重组转换过程中,进行 put
操作,将进入辅助转换过程。
如果 hash 为 MOVED
,则示意该地位已被其余线程转移过,推动到下一个元素
最初,进入与 HashMap
雷同的链表重组和树结构重组的逻辑中,胜利执行后,advance = true
,持续推动解决元素(--i)
。这里比 HashMap
多出一步,就是将旧数组对应地位上的标记为已解决。
1 else if ((f = tabAt(tab, i)) == null)
2 advance = casTabAt(tab, i, null, fwd); // 如果旧表该地位为 null,则标记为已解决
3 else if ((fh = f.hash) == MOVED) // 感觉不太可能遇到,毕竟线程繁多负责本人的区域(?)4 advance = true; // already processed
5 else {
6 // 进入转换
7 synchronized (f) {8 if (tabAt(tab, i) == f) {
9 Node<K,V> ln, hn;
10 // 一般链表的 hash 节点是失常的 hash 码,树节点的 hash 则默认小于 0
11 // 重哈希算法与 HashMap 雷同,都是以 2 的 n 次幂 对应的二进制刚好为 1,12 // 间接挪动高位局部元素
13 if (fh >= 0) {}
14 else if (f instanceof TreeBin) {15 setTabAt(nextTab, i, ln);
16 setTabAt(nextTab, i + n, hn);
17 // 解决实现后,将旧数组的节点标记为已解决(旧数据将没有数据)18 setTabAt(tab, i, fwd);
19 }
20 else if (f instanceof ReservationNode)
至此,整个 ConcurrentHashMap
的转换过程算完了,整个解析感觉还是有理有据,如有谬误,必当改过。
文末,再总结下其中一些比拟容易漠视或难以了解的点:
- 大多数看起来简单凌乱的判断,其实是边界判断
- 整个大办法应用了
死循环 +CAS
的形式管制并发 - 以幅度划分线程解决的数组范畴就是应用 “
死循环 + CAS
“ 实现的 - 实现如果出现异常,导致多线程下,某个线程没有执行进口的逻辑,没有胜利扣减
ConcurrentHashMap
的sizeCtl
的线程数,是否会进入一个谬误状态并无奈退出转换过程(未验证) - 控制线程数量,实际上是复用了
sizeCtl
这个变量,先保留局部信息后左移,并空出右16
位来进行线程量的减少
流程图如下: