HashMap 存在的问题
任何技术的诞生都是有其独特的诞生背景的,HashMap 诞生于分治思维,而 ConcurrentHashMap 则是为了解决 HashMap 中的线程平安问题而生,接下来咱们就一起看一下 HashMap 中存在的线程平安问题。
-
先看下 jdk7 中扩容办法的实现
void resize(int newCapacity) {Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; } Entry[] newTable = new Entry[newCapacity]; // 最终会进入 transfer 办法 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); } 复制代码
resize
办法会调用transfer
办法
《2020 最新 Java 根底精讲视频教程和学习路线!》void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) {while(null != e) { Entry<K,V> next = e.next; if (rehash) {e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } } } 复制代码
如图所示,该 map 在插入第四个元素时会触发扩容
假如有两个线程同时执行到 transfer
办法,线程 2 执行到 Entry<K,V> next = e.next;
之后 cpu 工夫片耗尽,此时变量 e 指向节点 a,变量 next 指向节点 b。线程 1 实现了扩容,变量援用关系如图所示:
线程 2 开始执行,此时对于线程 2 中保留的变量援用关系如图所示:
执行后,变量 e 指向节点 b,因为 e 不是 null,则继续执行循环体,执行后的援用关系。
变量 e 又从新指回节点 a,只能继续执行循环体,这里仔细分析下:1、执行完Entry<K,V> next = e.next;
,目前节点 a 没有 next,所以变量 next 指向 null;2、e.next = newTable[i];
其中 newTable[i] 指向节点 b,那就是把 a 的 next 指向了节点 b,这样 a 和 b 就互相援用了,造成了一个环;3、newTable[i] = e
把节点 a 放到了数组 i 地位;4、e = next;
把变量 e 赋值为 null,因为第一步中变量 next 就是指向 null;
所以最终的援用关系是这样的:
节点 a 和 b 相互援用,造成了一个环,当在数组该地位 get 寻找对应的 key 时,就产生了死循环。
另外,如果线程 2 把 newTable 设置成到外部的 table,节点 c 的数据就丢了,看来还有数据遗失的问题。
-
jdk8 中的数据笼罩问题
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {Node<K,V>[] tab; Node<K,V> p; int n, i; if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; if ((p = tab[i = (n - 1) & hash]) == null) // 如果没有 hash 碰撞则直接插入元素 tab[i] = newNode(hash, key, value, null); else {//...} ++modCount; if (++size > threshold) resize(); afterNodeInsertion(evict); return null; } 复制代码
1、假如两个线程 1、2 都在进行 put 操作,并且 hash 函数计算出的插入下标是雷同的。
2、当线程 1 执行完第六行代码后因为工夫片耗尽导致被挂起,而线程 2 失去工夫片后在该下标处插入了元素,实现了失常的插入。
3、而后线程 1 取得工夫片,因为之前曾经进行了 hash 碰撞的判断,所有此时不会再进行判断,而是间接进行插入。
4、这就导致了线程 2 插入的数据被线程 1 笼罩了,从而线程不平安。
ConcurrentHashMap 实现原理
ConcurrentHashMap 在 jdk7 降级 j 到 dk8 之 后有较大的改变,jdk7 中次要采纳 Segment
分段锁的思维,Segment
继承自ReentrantLock
类,顺次来保障线程平安。限于篇幅起因,本文只探讨 jdk8 中的 ConcurrentHashMap 实现原理。有趣味的同学能够自行钻研 jdk7 中的实现。
jdk8 中的 ConcurrentHashMap 数据结构同 jdk8 中的 HashMap 数据结构一样,都是 数组 + 链表 + 红黑树。摒弃了 jdk7 中的分段锁设计,应用了 Node
+ CAS
+ Synchronized
来保障线程平安。
重要成员变量
table:默认为 null,初始化产生在第一次插入操作,默认大小为 16 的数组,用来存储 Node 节点数据,扩容时大小总是 2 的幂次方。nextTable:默认为 null,扩容时新生成的数组,其大小为原数组的两倍。sizeCtl:默认为 0,用来管制 table 的初始化和扩容操作,具体利用在后续会体现进去。-1 代表 table 正在初始化
-N 示意有 N-1 个线程正在进行扩容操作
其余状况:1、如果 table 未初始化,示意 table 须要初始化的大小。2、如果 table 初始化实现,示意 table 的容量,默认是 table 大小的 0.75 倍,用这个公式算 0.75(n - (n >>> 2))。Node:保留 key,value 及 key 的 hash 值的数据结构。其中 value 和 next 都用 **volatile** 润饰,保障并发的可见性。ForwardingNode:一个非凡的 Node 节点,hash 值为 -1,其中存储 nextTable 的援用。只有 table 产生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在 table 中示意以后节点为 null 或则曾经被挪动。复制代码
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// volatile 润饰的变量能够保障线程可见性,同时也能够禁止指令重排序
// 无关 volatile 原理此处不开展,volatile 的实现原理可自行上网查阅
volatile V val;
volatile Node<K,V> next;
//...
}
复制代码
重要办法实现原理
在探索办法实现之前,咱们先认识一下 Unsafe
和 CAS 思维,ConcurrentHashMap 中大量用到 Unsafe
类和 CAS 思维。
Unsafe
Unsafe 是 jdk 提供的一个间接拜访操作系统资源的工具类(底层 c ++ 实现),它能够间接分配内存,内存复制,copy,提供 cpu 级别的 CAS 乐观锁等操作。它的目标是为了加强 java 语言间接操作底层资源的能力。应用 Unsafe 类最次要的起因是防止应用高速缓冲区中的过期数据。为了不便了解,举个栗子。类 User 有一个成员变量 name。咱们 new 了一个对象 User 后,就晓得了它在内存中的起始值 , 而成员变量 name 在对象中的地位偏移是固定的。这样通过这个起始值和这个偏移量就可能定位到 name 在内存中的具体位置。Unsafe 提供了相应的办法获取动态成员变量,成员变量偏移量的办法,所以咱们能够应用 Unsafe 间接更新内存中 name 的值。复制代码
CAS
CAS 译为 Compare And Swap,它是乐观锁的一种实现。假如内存值为 v,预期值为 e,想要更新成得值为 u,当且仅当内存值 v 等于预期值 e 时,才将 v 更新为 u。CAS 操作不须要加锁,所以比起加锁的形式 CAS 效率更高。复制代码
接下来咱们来看看具体的办法实现。
size 办法
size
办法用于统计 map 中的元素个数,通过源码咱们发现 size
外围是 sumCount
办法,其中变量 baseCount 的值是记录实现元素插入并且胜利更新到 baseCount 上的元素个数,CounterCell 数组是记录实现元素插入然而在 CAS 批改 baseCount 失败时的元素个数,因而 baseCount + CounterCell 数组记录的总数是 map 中的元素个数。
这样设计的起因是高并发状况下大量的 CAS 批改 baseCount 的值是失败的,为了节约性能,CAS 更新 baseCount 失败之后用 CounterCell 数组记录下来,CounterCell 初始化数组长度为 2,高并发状况能够扩容,每个数组节点别离记录落在以后数组的记录数,应用的是 CAS 去操作 value++,最初将所有节点的 value 求和,并加上 baseCount 的值,即为 map 元素个数。
public int size() {long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
复制代码
final long sumCount() {
// CounterCell 数组是批改 baseCount 失败的线程放入的数量
CounterCell[] as = counterCells; CounterCell a;
// baseCount 是批改 baseCount 胜利的线程放入的数量
long sum = baseCount;
if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
复制代码
put 办法
put
办法是向 map 中存入元素,实质上调用了 putVal
,该办法十分外围的办法之一,读者可联合笔者增加的正文加以了解
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果 tab 未初始化,先初始化 tab,此处是懒加载的思维
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果计算出来的 tab 下标地位上没有其余元素,用 CAS 操作建设援用
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果发现以后节点的哈希值是 MOVED,则阐明正处于扩容状态中,以后线程退出扩容大军,帮忙扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 哈希抵触,锁住以后节点
synchronized (f) {if (tabAt(tab, i) == f) {
// fh>= 0 阐明是链表,遍历寻找
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 发现曾经存在雷同的 key,依据 onlyIfAbsent 判断是否笼罩
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 持续向下遍历
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树,调用 putTreeVal 办法,遍历树,此处逻辑不具体开展
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 此时 binCount 示意一个节点对应链表的长度,达到 8 就转换成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 返回旧值
if (oldVal != null)
return oldVal;
break;
}
}
}
// 判断是否须要扩容
addCount(1L, binCount);
return null;
}
复制代码
小结 put
:
- 先校验一个 k 和 v 都不可为空。
- 判断 table 是否为空,如果为空就进入初始化阶段。
- 如果发现插入地位的 bucket 为空,间接把键值对插入到这个桶中作为头节点。
- 如果这个要插入的桶中的 hash 值为 – 1,也就是 MOVED 状态(也就是这个节点是 forwordingNode),那就是阐明有线程正在进行扩容操作,那么以后线程就进入帮助扩容阶段。
- 如果这个节点是一个链表节点,依据 key 匹配后果去决定是插入还是笼罩,插入是用尾插法。如果这个节点是一个红黑树节点,那就须要依照树的插入规定进行插入。
- 插入完结之后判断该链表节点个数是否达到 8,如果是就把链表转化为红黑树存储。
- put 完结之后,须要给 map 已存储的数量 +1,在 addCount 办法中判断是否须要扩容。
总结一下扩容条件:1. 元素个数达到扩容阈值。2. 调用 putAll 办法,但目前容量不足以寄存所有元素时。3. 某条链表长度达到 8,但数组长度却小于 64 时,该逻辑在 treeifyBin 办法中。复制代码
通读了 putVal
之后,咱们比拟关注其中一些办法:
tabAt
办法是通过Unsafe
类依据偏移量间接从内存中获取数据,防止了从高速缓冲区取得了过期数据-
casTabAt
办法次要通过Unsafe
类间接操作内存,通过比拟替换赋值,该操作不必加锁,所以能够进步操作效率@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } 复制代码
-
initTable
办法初始化 map 中的底层数组private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 如果一个线程发现 sizeCtl < 0,意味着另外的线程执行 CAS 操作胜利,以后线程只须要让出 CPU 工夫片 if ((sc = sizeCtl) < 0) // Thread.yield() 办法是让以后线程被动放弃 CPU 的执行权,线程回到就绪状态 Thread.yield(); // lost initialization race; just spin // 通过 CAS 设置 sizeCtl 为 -1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //sc = 0.75 * n,此处的 sc 即为扩容的阈值 sc = n - (n >>> 2); } } finally { // 将 sizeCtl 设置成扩容阈值 sizeCtl = sc; } break; } } return tab; } 复制代码
-
addCount
办法用于判断是否须要扩容private final void addCount(long x, int check) {CounterCell[] as; long b, s; if ((as = counterCells) != null || // 曾经放入到 map 中然而更新 baseCount 失败,放到 CounterCell 数组中 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 如果 CounterCell 数组是空(尚未呈现并发)// 如果随机取余一个数组地位为空 或者 // 批改这个槽位的变量失败(呈现并发了)if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 执行 fullAddCount 办法,将更新 baseCount 失败的元素个数保留到 CounterCell 数组中 fullAddCount(x, uncontended); return; } if (check <= 1) return; // s 是总节点数量;s = sumCount();} // 扩容逻辑 if (check >= 0) {Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 依据 length 失去一个标识 int rs = resizeStamp(n); // 如果正在扩容 if (sc < 0) { // 如果 sc 的低 16 位不等于 标识符(校验异样 sizeCtl 变动了)// 如果 sc == 标识符 + 1(扩容完结了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程完结扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)// 如果 sc == 标识符 + 65535(帮忙线程数曾经达到最大)// 如果 nextTable == null(完结扩容了)// 如果 transferIndex <= 0 (转移状态变动了) // 完结循环 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 如果能够帮忙扩容,那么将 sc 加 1. 示意多了一个线程在帮忙扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 如果不在扩容,将 sc 更新:标识符左移 16 位 而后 + 2. 也就是变成一个正数。高 16 位是标识符,低 16 位初始是 2. else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 更新 sizeCtl 为正数后,开始扩容。transfer(tab, null); s = sumCount();} } } 复制代码
-
- *
get 办法
get
办法逻辑比较简单清晰
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断链表头结点是否匹配 key
if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// eh < 0,如果是红黑树,去红黑树上查找,如果是 ForwardingNode,则会跳到扩容后的 map 上寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
复制代码
helpTransfer 和 transfer 办法
helpTransfer
办法在判断完扩容状态后,实质上还是调用了 transfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;
// 以后节点是 ForwardingNode 并且曾经初始化实现新的数组,帮忙扩容
if (tab != null && (f instanceof ForwardingNode 并且曾经初始化实现新的数组,帮忙扩容) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 扩容实现的标记
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 开始帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
复制代码
transfer
办法逻辑比较复杂,请读者联合正文和配图急躁了解
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// CPU 外围数大于 1,每个线程负责迁徙 16 个 bucket
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化扩容后的数组
if (nextTab == null) { // initiating
try {@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 通过 for 自循环解决每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置 transferIndex 属性值,并初始化 i 和 bound 值,i 指以后解决的槽位序号,bound 指须要解决的槽位边界,先解决槽位 15 的节点;for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// 所有节点都有线程负责或者曾经扩容实现
if (--i >= bound || finishing)
advance = false;
// 将 i 值置 -1
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {// 确定以后线程每次调配的待迁徙桶的范畴为[bound, nextIndex)
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 以后线程本人的活曾经做完或所有线程的活都已做完,第二与第三个条件应该是上面让 "i = n" 后,再次进入循环时要做的边界查看。if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容实现的后续工作
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 采纳 CAS 算法更新 SizeCtl, 设置 finishing 和 advance 标记
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 以后节点为 null,间接标记成 forwordingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 以后节点曾经迁徙过
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 锁住头结点
synchronized (f) {if (tabAt(tab, i) == f) {
// 这里 ln,hn 是高下链表的思维,具体过程下图中会有阐明
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树也是高下链表的思维,具体过程下图中会有阐明
else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
- 多线程开始扩容
- lastrun 节点
- 链表迁徙
- 红黑树迁徙
- 迁徙过程中 get 和 put 的操作的解决
- 并发迁徙
- 迁徙实现
小结 transfer
:
- 依据 CPU 外围数确定每个线程负责的桶数,默认每个线程 16 个桶
- 创立新数组,长度是原来数组的两倍
- 调配好以后线程负责的桶区域 [bound, nextIndex)
- 并发迁徙,依据链表和红黑树执行不同迁徙策略
- 迁徙实现,设置新的数组和新的扩容阈值
-
- *
至此,笔者曾经把 ConcurrentHashMap 几个重要的办法实现介绍完了。剩下的如 remove
、replace
等办法实现都大同小异,读者可自行钻研。
总结
通过以上对 ConcurrentHashMap 的初步探讨,置信读者也会和笔者一样惊叹于 Doug Lea 巨匠的编程程度和技巧。ConcurrentHashMap 类在 jdk8 中源码多达 6300 行,其中使用了大量的多线程与高并发的编程技术,如 volatile、synchronized、CAS、Unsafe、Thread 类相干 API,以及许多精美的算法,如 ConcurrentHashMap 底层数组的长度是 2 的幂次方以便用位运算计算元素下标,同时也不便计算扩容后的元素下标,还有令笔者惊叹的高下链表迁徙操作,诸如此类、不再赘述。
感激 Doug Lea 对于 Java 倒退做出的奉献,也心愿咱们能够向巨匠学习,磨炼本人的编程程度。借用笔者很喜爱的一个程序员大佬的一句话,学习是一条令人时而悲痛欲绝、时而郁郁寡欢的路线。共勉!