共计 14053 个字符,预计需要花费 36 分钟才能阅读完成。
上一篇文章已经就 ConcurrentHashMap 进行了部分说明,介绍了其中涉及的常量和变量的含义,有些部分需要结合方法源码来理解,今天这篇文章就继续讲解并发 ConcurrentHashMap
前言
本文主要介绍 ConcurrentHashMap 中的一些重要方法,结合上篇文章中的讲解部分进行更进一步的介绍
回顾下上篇文章,我们应该已经知道 ConcurrentHashMap 的整体结构和 HashMap 基本一致,不同的是处理多线程并发下保证操作的正确性,ConcurrentHashMap 通过 CAS 和 synchronized 进行并发控制,当然,这种情况下各种处理都会变的更为复杂,下面我们就通过方法来深入理解 ConcurrentHashMap 的操作
重要方法
在一些方法中展示了各个变量以及常量的使用,能让我们更好的理解其中的操作
tabAt/casTabAt/setTabAt
下列方法用于读写 table 数组,使用 Unsafe 提供的更新获取 volatile 变量,CAS 更新数组元素等操作
// 读取 table[i]
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// CAS 更新 table[i]
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 插入 table[i]
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
size
size 方法返回了一个不精确的值,在多线程环境下,返回一个不精确的值,通过 sumCount 迭代 counterCells 统计 sum 值。
public int size() {long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
这里很多人可能会问,为什么需要叠加 counterCells 数组的值呢?
其实这和 ConcurrentHashMap 特点有关,多线程环境下,同时插入值,执行 CAS 操作,执行成功的更新了 baseCount,而执行失败的则将值放入到了 counterCells 数组中,可以查阅 CounterCell 内部类源码,只有一个 long 类型变量,每次进行插入或者删除时调用 addCount 通过 CAS 操作更新 baseCount,失败时执行 fullAddCount 方法,初始化 counterCells 数组,并将 1(相当于插入或删除一个元素)插入到 CounterCell 类中,这样尽可能保证了 Map 长度的正确性,这里理解流程即可,不深入,addCount 部分有具体操作可查看
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) {value = x;}
}
get
参考 HashMap,类似操作流程,需要注意的也就是在 eh < 0 处,如果是特殊节点,比如 TreeBin 或者 ForwardingNode 节点,则调用其具体类实现的 find 方法完成遍历查询,内部类解释可以参考我的上一篇文章
- 计算 hash 值
- 判断 table 是否为空,不为空,找到对应 hash 桶根节点判断
- 非根节点继续遍历树或者链表,存在对应值则返回对应值,否则返回 null
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算 key 的 hash 值
int h = spread(key.hashCode());
// table 非空并且对应的 hash 桶根节点不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 查找节点为根节点
if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 树节点或者扩容中(FN 节点)else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 链表遍历查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
// 参考 HashMap 的 hash 方法,不同之处在于和 HASH_BITS 进行了一次与操作,最高位变为了 0,即为正数,因为前一篇文章也已经说过负数 hash 值有特殊意义
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}
containsValue
这里通过静态内部类实现 Traverser 来遍历数组,具体的内部实现查看上篇文章里中的内部类说明,advance 相当于查找到下一个非空节点
public boolean containsValue(Object value) {if (value == null)
throw new NullPointerException();
Node<K,V>[] t;
if ((t = table) != null) {Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
for (Node<K,V> p; (p = it.advance()) != null; ) {
V v;
if ((v = p.val) == value || (v != null && value.equals(v)))
return true;
}
}
return false;
}
遍历时遇见特殊节点的处理上一篇文章中已经画图说明,如下:
putVal
putVal 整体同 HashMap 的 putVal 操作,操作流程上基本类似,只是在多线程操作下需要正确的处理插入值操作,同时如果发现有线程在进行扩容操作时,需帮助扩容,然后再进行插入值的流程操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 空值判断
if (key == null || value == null) throw new NullPointerException();
// hash 值计算,保证了 hash 值为正数
int hash = spread(key.hashCode());
// 当前 bin 中元素的个数,判断是否树化处理
int binCount = 0;
// 无限循环直到被正确处理
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 空表进行初始化操作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 计算出的 hash 桶位置链表头节点无值则通过 CAS 插入值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果 hash 桶链表头节点为 MOVED 状态,即说明有线程在进行扩容操作,则通过 helpTransfer 帮助扩容操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// hash 桶链表头节点加锁,在多线程环境下其他线程不能同时操作当前相同的头节点代码块
synchronized (f) {if (tabAt(tab, i) == f) {
// 正常链表插入操作
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key 和 hash 值相同则进行替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 没匹配到则直接插入到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树插入操作
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 达到树化阈值,则可能进行树化操作
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// size+ 1 操作
addCount(1L, binCount);
return null;
}
resizeStamp
前一篇文章在对 sizeCtl 说明时在多个线程帮忙扩容时其值小于 0 时做过一些说明,在源码中涉及到了下面这个方法,先理解清楚这个方法比较重要
这里的参数每次传入的都是当前数组的长度,也就是说每次这里生成的数都与当时扩容时的数组长度有关,Integer.numberOfLeadingZeros(n),返回二进制表示,前面有多少个连续的 0,RESIZE_STAMP_BITS 固定为 16,没发现有提供方法来修改这个变量,位或运算得到一个值,这个值表示了与扩容时数组的长度相关,这里需记得是左移了(RESIZE_STAMP_BITS – 1),因为后边代码中我们需要反向操作右移来重新获取
这里通过这个方法与数组长度关联,同时 sizeCtl 也会与之关联,同时也记录当前扩容中的线程数,故 sizeCtl 在扩容中同时兼顾了两种作用,一是判断是否是在同一个批次的扩容中(都是从 16 扩容到 32),同时判断当前扩容中参与的线程数来确定是否结束和初始化操作
/**
* Returns the stamp bits for resizing a table of size n.
* Must be negative when shifted left by RESIZE_STAMP_SHIFT.
*/
static final int resizeStamp(int n) {return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
addCount
整体来看主要进行了两部分内容,一是更新 baseCount,二是检查是否进行扩容操作。其实这个方法里还是相当复杂的,涉及到了线程私有的伪随机数生成器 ThreadLocalRandom,并发效率更高的 LongAdder,不过初学者可以不用研究那么深入,这里不详细说明,大概了解就好
private final void addCount(long x, int check) {CounterCell[] as; long b, s;
// 通过 CAS 更新 baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 更新 baseCount 失败
CounterCell a; long v; int m;
boolean uncontended = true;
// 相当于每个线程的 probe 就是它在 CounterCell 数组中的 hash code,用来定位 counterCells 数组
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 更新 cellvalue 失败则执行 fullAddCount,具体不看了,比较复杂,不停尝试更新计数
// 源码注释上也写了类似 LongAdder
fullAddCount(x, uncontended);
return;
}
// 执行到此说明更新计数器成功, 判断是否退出,为什么是 1 其实还是有点困惑
if (check <= 1)
return;
s = sumCount();}
// check 大于 0 代表着对应 hash 桶下的节点数,检查是否扩容
// 满足条件帮助扩容,不满足退出
if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;
// 注意,这里条件中 3 个变量赋值同时 while 判断
// sc = sizeCtl,tab = table,n = tab.length
// 在并发操作中可能会出现变量错误的情况造成扩容处理出错,通过 resizeStamp 保证扩容时版本一致
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// resizeStamp 根据 n 返回一个扩容版本戳,保证唯一性,上边一个方法我已经说明了
int rs = resizeStamp(n);
// 说明有别的线程在扩容
if (sc < 0) {
// 判断是否帮助扩容,满足条件,不帮助扩容,这里会分析下,看下面的分析部分
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 帮助扩容,线程数 +1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 无线程帮助扩容,当前线程尝试成为第一个扩容的线程
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();}
}
}
在上面这段代码中,不帮助扩容的条件中有些地方让人困惑
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
(sc >>> RESIZE_STAMP_SHIFT) != rs
首先需要明白上边整个扩容中的第一个线程会通过 U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2) 设置 sizeCtl,之后扩容线程增加则通过 U.compareAndSwapInt(this, SIZECTL, sc, sc + 1) 更新
sc 右移 RESIZE_STAMP_SHIFT(由于 RESIZE_STAMP_BITS 不提供修改方法,RESIZE_STAMP_SHIFT 也只能取到 16),第一个条件为什么是这个?需要结合扩容代码来看,首个线程抢到扩容任务时需先创建 nextTable,设置 transferIndex,在执行之前需要将 sizeCtl 更新,即 U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2),代码存在于 addCount 和 tryPresize 方法中,sizeCtl 在每次扩容时会更新成 (rs << RESIZE_STAMP_SHIFT) + 2。判断条件里 sc 无符号右移,如果是相同的一次扩容过程,则与 rs 相等是肯定的,rs 是由 resizeStamp 根据长度 n 计算得来,其实最终这里比较的也就是 table 的长度,防止多次扩容下错误的帮助了扩容
另外在已经有线程扩容的情况下增加扩容线程会会更新 sizeCtl,U.compareAndSwapInt(this, SIZECTL, sc, sc + 1) 看出在首次更新的基础上加 1 即可,扩容线程完成自己的任务同理减 1,结合上边对 resizeStamp 的说明应该算很清楚了
以上部分也证实了上篇文章中 sizeCtl 注释是不正确的
sc == rs + 1 || sc == rs + MAX_RESIZERS
这个条件是有问题的,sc 小于 0,rs 大于 0,两个条件一直为 false,没有 true 的可能,从这个条件上看,应该是判断扩容完毕和扩容线程数达到最大时不能帮助扩容。
我们想一下,第一次线程扩容时已经将 sc 更新成 (rs << RESIZE_STAMP_SHIFT) + 2,这里判断的话需要改为 sc == (rs << RESIZE_STAMP_SHIFT) +1 才对,不能将 sc 右移,右移将会导致低 16 位记录的线程数数据丢失,最大线程数判断同理,应改为 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
我在 Oracle 官网 bug 库里看到已经提到了这个问题:https://bugs.java.com/bugdata…
(nt = nextTable) == null
此时状态可能表明扩容已经结束或者第一个线程在扩容中,不能帮助扩容
transferIndex <= 0
transfer 任务已经被分配完毕,不能分配任务给当前线程,不能帮助扩容,帮助扩容部分下面会说到
helpTransfer
如果正在进行扩容操作,则帮助扩容
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;
// 判断是否为 ForwardingNode 并且 nextTable 是否已经被创建
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据长度获取扩容戳
int rs = resizeStamp(tab.length);
// 再次验证是否正在扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 几个条件上边已经解释过了,满足不帮助扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// sizeCtl 加 1,表示当前线程加入扩容,多了一个线程帮忙
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
tryPresize
预先扩容,putAll 和 treeifyBin 中使用到,不满足 table 容量时,进行一次扩容操作
/**
* Tries to presize table to accommodate the given number of elements.
*
* @param size number of elements (doesn't need to be perfectly accurate)
*/
private final void tryPresize(int size) {
// 判断长度是否超过最大值,超过则赋值为最大值,正常则通过 tableSizeFor 计算扩容后的长度
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 未初始化或扩容完成才能执行本次扩容操作
while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;
// table 未初始化
if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;
// 置为 - 1 表示数组初始化,前一篇文章已经说明
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {if (table == tab) {@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
// 此时相当于阈值
sizeCtl = sc;
}
}
}
// 已经初始化,扩容长度小于阈值或者大于最大值,不进行扩容操作
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 再次验证 table 未改变
else if (tab == table) {int rs = resizeStamp(n);
// 同上边代码部分,判断是否帮助扩容
if (sc < 0) {Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
transfer
table 迁移操作, 通过 transferIndex 来完成任务的分配,之前文章变量中也提及了 MIN_TRANSFER_STRIDE(最小步长),对每个扩容线程申请迁移的 hash 桶数量做了限制,每次需要扩容线程执行完毕已经领取完的 hash 桶迁移任务才可以继续领取任务帮助迁移,最后一个迁移线程在迁移完毕后会进行检查
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 设置步长,即每个迁移任务迁移多少个 hash 桶,默认最小迁移步长 16
// 即每个扩容线程最小迁移 16 个 hash 桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab 未初始化,则进行初始化操作,这里不需要 CAS,调用的地方已经做了控制,保证只有一个线程能执行
if (nextTab == null) { // initiating
try {@SuppressWarnings("unchecked")
// 新数组长度扩容为原有数组的 2 倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 内存溢出时不能继续扩容
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 设置 ForwardingNode 节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 线程可以继续分配迁移任务的标识
boolean advance = true;
// 设置结束标识
boolean finishing = false; // to ensure sweep before committing nextTab
// i 表示数组下标,bound 表示迁移任务的最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// advance 为 false 则表明当前线程分配的迁移任务未完成或已经扩容完毕
while (advance) {
int nextIndex, nextBound;
// --i 大于等于 bound 则表明本次分配的迁移任务还未完成,将 advance 置为 false
// 表明不能继续分配迁移任务
if (--i >= bound || finishing)
advance = false;
// 设置 nextIndex
// 如果小于等于 0 则表示迁移 hash 桶已被分配完毕,不用继续,将 advance 置为 false
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 设置迁移任务区间 bound 到 i
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 上边计算了区间和任务状态
// i < 0 上边代码已经说明,transfer 任务已经执行完毕,退出
// i >= n 这里 n 表示的是传入的 tab 数组长度,而 i 有可能因为 transferIndex 改变而改变
// 比如连续扩容从 16 扩容到 32,然后又从 32 扩容到 64,此时这个条件是可能成立的,这里的 i 有可能在 32 到 64 之间,大于 n 的 32
// 不在一个扩容维度内,需退出。最后一个条件没看明白是什么情况出现这种状态
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 扩容迁移完毕设置 table 和 sizeCtl
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 线程数减 1,表明当前线程退出
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断当前线程是否为最后一个扩容线程,不是,则退出,条件可以看上边的说明,已经讲解过
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 确定当前线程为最后一个扩容线程,则需要进行检查工作
// 检查所有的旧数组 hash 桶是否被正确的迁移
finishing = advance = true;
i = n; // recheck before commit
}
}
// i 处的 hash 桶为 null 则直接放置 ForwardingNode 节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// i 处的 hash 桶为 ForwardingNode 节点
else if ((fh = f.hash) == MOVED)
// 最后的线程执行检查
advance = true; // already processed
else {synchronized (f) {
// 再次验证 hash 桶头节点为 f
if (tabAt(tab, i) == f) {
// 进行迁移任务, 类似 HashMap,分高位和低位,不明白的可以看我 HashMap 的文章
Node<K,V> ln, hn;
if (fh >= 0) {
// 正常链表操作
// runBit 表明首节点的位置,0 则表示在低位,非 0 表示在高位
int runBit = fh & n;
Node<K,V> lastRun = f;
// 找到尾部最后一个高低位不同的节点,之后的节点不需要进行操作,直接进行复用
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 低位 lastRun 在下面循环时使用
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 高位 lastRun 在下面循环时使用
else {
hn = lastRun;
ln = null;
}
// 确定 lastRun 为了提高效率,复用原有链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 低位链表
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 高位链表
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 新数组上设置低位链表
setTabAt(nextTab, i, ln);
// 新数组上设置高位链表
setTabAt(nextTab, i + n, hn);
// 旧数组 i 处设置为 ForwardingNode 节点
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树通过 TreeBin 操作
else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
// 同样划分为高低位进行处理,通过链表来操作
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 判断是低位还是高位然后修改链表关系
if ((h & n) == 0) {if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 同链表类似,判断下是否需转成链表,通过 TreeBin 将高低位链表构建成红黑树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
迁移任务是从数组尾部向头部进行,这样做的目的应该是与迭代正向操作相反来减少冲突,当迭代时是已经迁移好的 hash 桶,迁移时是已经迭代完毕的 hash 桶
clear
清空操作,比较简单
public void clear() {
// 删除节点数记录最后需要更新
long delta = 0L; // negative number of deletions
// 数组下标
int i = 0;
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
// hash 桶首节点为 null 表明不需要执行
if (f == null)
++i;
// 扩容中帮助扩容然后重新开始循环清空操作
else if ((fh = f.hash) == MOVED) {tab = helpTransfer(tab, f);
i = 0; // restart
}
// 正常链表或 TreeBin 节点
else {synchronized (f) {if (tabAt(tab, i) == f) {
Node<K,V> p = (fh >= 0 ? f :
(f instanceof TreeBin) ?
((TreeBin<K,V>)f).first : null);
// 获取 hash 桶的节点数
while (p != null) {
--delta;
p = p.next;
}
// 将 hash 桶置 null
setTabAt(tab, i++, null);
}
}
}
}
// 更新数组长度
if (delta != 0L)
addCount(delta, -1);
}
总结
本文紧接上一篇文章讲解了 ConcurrentHashMap 的重要的方法,对于一些变量和常量结合方法进行了更多的解释说明,本身而言还是比较复杂,其中部分笔者也不能完全理解,不过整体的流程有了一个更清晰的认知,重点需要理解的在下面几点:
- 涉及到 Map 长度的计算:通过 counterCells 完成以及通过 addCount 进行长度的更新
- 扩容操作:sizeCtl 的设置以及更新和各种情况下对应的含义
- 迁移操作:迁移步长,线程检查
- 节点类型:几种节点类型的不同处理方式
当然,有些条件可能比较复杂,难以理解,只能尽力多看多想,希望对各位有所帮助
以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢