共计 6608 个字符,预计需要花费 17 分钟才能阅读完成。
该系列文章收录在公众号【Ccww 技术博客】,原创技术文章早于博客推出
在上篇《面试:为了进阿里,死磕了 ConcurrentHashMap 源码和面试题(一)》
,钻研了根底原理,以及 ConcurrentHashMap 数据 put 的流程等线程平安的,来回顾一下面试的问题点:
-
ConcurrentHashMap 的实现原理
- ConcurrentHashMap1.7 和 1.8 的区别?
- ConcurrentHashMap 应用什么技术来保障线程平安
-
ConcurrentHashMap 的 put()办法
- ConcurrentHashmap 不反对 key 或者 value 为 null 的起因?
- put()办法如何实现线程平安呢?
- ConcurrentHashMap 扩容机制
- ConcurrentHashMap 的 get 办法是否要加锁,为什么?
-
其余问题
- 为什么应用 ConcurrentHashMap
- ConcurrentHashMap 迭代器是强一致性还是弱一致性?HashMap 呢?
- JDK1.7 与 JDK1.8 中 ConcurrentHashMap 的区别
那咱们接下持续看看 CurrentHashMap 核心内容,扩容机制。
ConcurrentHashMap 的扩容机制
-
扩容变量
// 新 tab 的 length int nextn = nextTab.length; // 创立一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 首次推动为 true,如果等于 true,阐明须要再次推动一个下标(i--)// 反之,如果是 false,那么就不能推动下标,须要将以后的下标处理完毕能力持续推动 boolean advance = true; // 实现状态,如果是 true,就完结此办法。boolean finishing = false; // to ensure sweep before committing nextTab
-
因为
ConcurrentHashMap
反对多线程扩容,多个线程解决不同的节点,首先先计算出每个线程(CPU)解决的桶数:将 length / 8 而后除以 CPU 外围数。如果失去的后果小于 16,那么就应用 16。(避免出现转移工作不平均的景象)int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE;
-
新的 table 尚未初始化,进行 2 倍扩容
if (nextTab == null) { // initiating try { // 扩容 2 倍 Node<K,V>[] nt = (Node<K,V>\[])new Node<?,?>[n << 1]; // 更新 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME // 扩容失败,sizeCtl 应用 int 最大值。sizeCtl = Integer.MAX_VALUE; return;// 完结 } // 更新成员变量 nextTable = nextTab; // 更新转移下标,就是 老的 tab 的 length transferIndex = n; }
-
在死循环中,每个线程先获得本人须要转移的桶的区间:先获取 CAS 批改 transferIndex,即 length – 区间值,留下残余的区间值供前面的线程应用(i 示意下标,bound 示意以后线程能够解决的以后桶区间最小下标)。
- 判断
--i
是否大于等于bound
,失常状况下,如果大于 bound 不成立,阐明该线程上次支付的工作曾经实现了。那么,须要在上面持续支付工作。 transferIndex
小于等于 0,阐明没有区间了,i 改成 -1,推动状态变成 false,不再推动,示意,扩容完结了,以后线程能够退出了- 第一次进入循环,走上面的 nextIndex 赋值操作(获取最新的转移下标)。其余状况都是:如果能够推动,将 i 减一,而后批改成不可推动。如果 i 对应的桶解决胜利了,改成能够推动。
int nextIndex, nextBound; if (--i >= bound || finishing) // 是为了避免在没有胜利解决一个桶的状况下却进行了推动 advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if ((nextIndex = transferIndex) <= 0) { // 如果小于等于 0,阐明没有区间了,i 改成 -1,// 推动状态变成 false,不再推动,示意,扩容完结了,以后线程能够退出了 // 这个 -1 会在上面的 if 块里判断,从而进入实现状态判断 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 以后线程能够解决的最小以后区间最小下标 bound = nextBound; // 首次对 i 赋值,这个就是以后线程能够解决的以后区间的最大下标 i = nextIndex - 1; advance = false; }
- 判断
-
判断该节点是否须要进行扩容解决
-
是否已实现扩容
finishing
为true
,实现扩容-
如果没实现
- 这个线程完结帮忙扩容了
U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
为 true
- 这个线程完结帮忙扩容了
(f = tabAt(tab, i)) == null
, 获取老 tab i 下标地位的变量,如果是 null,写入 fwd 占位,推动下个下标(fh = f.hash) == MOVED
阐明别的线程曾经解决过了,再次推动一个下标。-
以上状况都不合乎就阐明,这个地位有理论值了,且不是占位符,须要对这个节点
synchronized
上锁,进行数据迁徙if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 如果实现了扩容 nextTable = null;// 删除成员变量 table = nextTab;// 更新 table sizeCtl = (n << 1) - (n >>> 1); // 更新阈值 return;// 完结办法。}// 如果没实现 // 尝试将 sc -1. 示意这个线程完结帮忙扩容了,将 sc 的低 16 位减一。if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 如果 sc - 2 等于标识符左移 16 位,阐明没有线程在帮忙他们扩容了 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)。return;// 不相等,阐明没完结,以后线程完结办法。finishing = advance = true;// 如果相等,扩容完结了,更新 finising 变量 i = n; // 再次循环检查一下整张表 } } // 获取老 tab i 下标地位的变量,如果是 null 就写入 fwd 占位,再次推动一个下标 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果不是 null 且 hash 值是 MOVED, 阐明别的线程曾经解决过了,再次推动一个下标。else if ((fh = f.hash) == MOVED) advance = true; else {// 到这里,阐明这个地位有理论值了,且不是占位符。对这个节点上锁。// 为什么上锁,避免 putVal 的时候向链表插入数据 synchronized (f) {....}
- 扩容时,对该节点
synchronized
加锁, 再进行解决, 判断 i 下标处的桶节点是否和 f 雷同:
-
如果 f 的 hash 值大于 0 , 是链表构造,依据以后节点和首节点的
hash &n
值取于后果不同,进行解决:- 相等为低位节点解决
- 不相等为高位节点解决
if (fh >= 0) { // 获取以后 int runBit = fh & n; // 尾节点,且和头节点的 hash 值取于不相等 Node<K,V> lastRun = f; // 遍历这个桶 for (Node<K,V> p = f.next; p != null; p = p.next) { // 取于桶中每个节点的 hash 值 int b = p.hash & n; // 如果节点的 hash 值和首节点的 hash 值取于后果不同 if (b != runBit) { // 更新 runBit,用于上面判断 lastRun 该赋值给 ln 还是 hn。runBit = b; lastRun = p; } } // 如果最初更新的 runBit 是 0,设置低位节点 if (runBit == 0) { ln = lastRun; hn = null; } else {// 如果最初更新的 runBit 是 1,设置高位节点 hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 如果与运算后果是 0,那么就还在低位 if ((ph & n) == 0) // 如果是 0,那么创立低位节点 ln = new Node<K,V>(ph, pk, pv, ln); else // 1 则创立高位 hn = new Node<K,V>(ph, pk, pv, hn); } // 其实这里相似 hashMap // 设置低位链表放在新链表的 i setTabAt(nextTab, i, ln); // 设置高位链表,在原有长度上加 n setTabAt(nextTab, i + n, hn); // 将旧的链表设置成占位符 setTabAt(tab, i, fwd); // 持续向后推动 advance = true; }
- TreeBin 的 hash 是 -2,是红黑树结构进行解决
else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍历 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // 和链表雷同的判断,与运算 == 0 的放在低位 if ((h & n) == 0) {if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } // 不是 0 的放在高位 else {if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果树的节点数小于等于 6,那么转成链表,反之,创立一个新的树 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位树 setTabAt(nextTab, i, ln); // 高位数 setTabAt(nextTab, i + n, hn); // 旧的设置成占位符 setTabAt(tab, i, fwd); // 持续向后推动 advance = true; }
-
当 ConcurrentHashMap
中元素的数量达到 cap * loadFactor
时,就须要进行扩容。扩容次要通过 transfer()
办法进行,当有线程进行 put
操作时,如果正在进行扩容,能够通过 helpTransfer()
办法退出扩容。也就是说,ConcurrentHashMap 反对多线程扩容,多个线程解决不同的节点,实现形式是,将 Map 表拆分,让每个线程解决本人的区间。如下图:
### ConcurrentHashMap 的 get 办法是否要加锁,为什么?
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 满足条件间接返回对应的值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//e.hash<0,正在扩容
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历以后节点
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
ConcurrentHashMap 的 get 办法就是从 Hash 表中读取数据,并不会与扩容不抵触,因而该办法也不须要同步锁,这样可进步 ConcurrentHashMap 的并发性能。
——
### 总结
#### 为什么应用 ConcurrentHashMap
- HashMap 在多线程中进行 put 办法有可能导致程序死循环,因为多线程可能会导致 HashMap 造成环形链表,(即链表的一个节点的 next 节点永不为 null,就会产生死循环), 会导致 CPU 的利用率靠近 100%,因而并发状况下不能应用 HashMap。
- HashTable 通过应用 synchronized 保障线程平安,但在线程竞争强烈的状况下效率低下。因为当一个线程拜访 HashTable 的同步办法时,其余线程只能阻塞期待占用线程操作结束。
- ConcurrentHashMap 应用分段锁的思维,对于不同的数据段应用不同的锁,能够反对多个线程同时拜访不同的数据段,这样线程之间就不存在锁竞争,从而进步了并发效率。
#### ConcurrentHashMap 迭代器是强一致性还是弱一致性?HashMap 呢?
在迭代时,ConcurrentHashMap 应用了不同于传统汇合的疾速失败迭代器,弱统一迭代器。
在这种迭代形式中,当 iterator 被创立后汇合再产生扭转就不再是抛出 ConcurrentModificationException,取而代之的是在扭转时 new 新的数据从而不影响原有的数据,iterator 实现后再将头指针替换为新的数据,
这样 iterator 线程能够应用原来老的数据,而写线程也能够并发的实现扭转,更重要的,这保障了多个线程并发执行的连续性和扩展性,是性能晋升的要害。
#### JDK1.7 与 JDK1.8 中 ConcurrentHashMap 的区别
其实能够看出 JDK1.8 版本的 ConcurrentHashMap 的数据结构曾经靠近 HashMap,
- 相对而言,ConcurrentHashMap 只是减少了同步的操作来管制并发,从 JDK1.7 版本的 ReentrantLock+Segment+HashEntry,到 JDK1.8 版本中 synchronized+CAS+HashEntry+ 红黑树。
- 数据结构:勾销了 Segment 分段锁的数据结构,取而代之的是数组 + 链表 + 红黑树的构造。
- 保障线程平安机制:JDK1.7 采纳 segment 的分段锁机制实现线程平安,其中 segment 继承自 ReentrantLock。JDK1.8 采纳 CAS+Synchronized 保障线程平安。
- 锁的粒度:原来是对须要进行数据操作的 Segment 加锁,现调整为对每个数组元素加锁(Node)。
- 链表转化为红黑树: 定位结点的 hash 算法简化会带来弊病,Hash 抵触加剧, 因而在链表节点数量大于 8 时,会将链表转化为红黑树进行存储。
- 查问工夫复杂度:从原来的遍历链表 O(n),变成遍历红黑树 O(logN)。
各位看官还能够吗?喜爱的话,动动手指导个????,点个关注呗!!谢谢反对!
欢送关注公众号【Ccww 技术博客】,原创技术文章第一工夫推出