接上篇,持续剖析桶为链表状况下的数据迁徙。
扩容迁徙
首先 tab 的 size 始终是 2 的 n 次幂
,转换成二进制就是100..00
的模式
而落点桶的计算公式为:plot = (n-1)&hash
-> 11..11&hash
那么 hash 的第 n 个地位(runBit=hash&n)是 0 还是 1,决定了迁徙后的落点桶:
- 如果 runBit=0,那么新 tab 的落点为
plot
- 如果 runBit=1,那么新 tab 的落点为
plot+n
基于上述剖析,迁徙时会对桶中的链条从新组装(当然会先对头节点对象做 syn 加锁)
- runBit= 0 的组成 低位链条 ln,runBit= 1 的组成 高位链条 hn;并最终放入新 table 中(上图红色局部)
- 旧 tab 的迁徙桶会指向 Forwarding Node 节点(fwd,上图紫色局部)
- 随着 syn 锁开释,旧链 node1->node2->node3 会被 GC 回收(上图绿色局部)
给出 transfer() 办法
的源码:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// ## 迁徙分片,最小 MIN_TRANSFER_STRIDE(16)if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 创立新 tab
if (nextTab == null) { // initiating
try {@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// transferIndex- 传输地位,初始化为旧数组大小
// stride- 迁徙分片
// nextn - 新数组的大小
int nextn = nextTab.length;
// A node inserted at head of bins during transfer operations.
// 迁徙过程中的非凡标记,在迁徙桶的头节点;它的 hash 值固定为 MOVED=-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 迁徙正在执行的标记
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// i- 正在迁徙桶的索引;boud- 线程本次迁徙范畴的最小索引
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// ****** A、参数设置阶段 ******
while (advance) {
int nextIndex, nextBound;
// -- 2. 迁徙后续桶时 `--i`,证实迁徙方向:从右往左
if (--i >= bound || finishing)
advance = false;
// $$ 下一个迁徙地位 nextIndex,i<bound 时会执行(将 i =-1,设定了一个区间完结标记)else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// -- 1. 线程迁徙它的第一个桶时赋值:// 如果旧 tab 的 length 是 32
// 第一个线程:transferIndex=nextBound=16=bound、nextIndex=32(迁徙范畴,16-31)// 第二个线程:transferIndex=nextBound=0=bound、nextIndex=16(迁徙范畴,0-15)else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// ****** A、参数设置阶段 ******
// ****** B、完结断定阶段 ******
// $$ 满足 i =- 1 区间完结标记
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// tab 替换,新加载因子设置
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 完结条件
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// ****** B、完结断定阶段 ******
// ****** C、迁徙阶段 ******
// 迁徙桶为空,cas 形式搁置 fwd 标记节点
// f- 正在迁徙的节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 此桶的头节点是 fw,示意正在迁徙
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// ### 对正在迁徙的节点 f 加锁
synchronized (f) {if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// -- 链状况
if (fh >= 0) {
// runBit 取决于 fh 的第 n 地位,要么是 0,要么是 1
// 0 放在低位链 ln,1 放在高位链 hn
int runBit = fh & n;
// 高位或位置链的最初一个节点
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
// f 的下一个节点 p 的 hash 值的 n 地位。同样的,要么是 0,要么是 1
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 高下位拆分双链
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 头插法组装链条
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// -- 树状况
else if (f instanceof TreeBin) {// ...}
}
}
}
// ****** C、迁徙阶段 ******
}
}
元素计数
执行 put 办法后,会对元素个数进行统计
统计形式如图:
- 先尝试 简略模式,以 cas 形式对 baseCount 进行批改,批改胜利则完结
- 简略模式失败(比方竞争较多的状况),会尝试 简单模式,创立 counterCells 数组,并以此进行统计
统计形式依然通过 cas 的形式批改随机一个数组元素的 value 值,最终以 sum 所有数组元素 value 的形式计算出以后 map 的元素个数。
实现层面,会通过 cellsBusy 变量来管制对 counterCells 数组的操作(如 counterCells 扩容、桶初始化等)。
给出 fullAddCount() 办法
的源码:
private final void fullAddCount(long x, boolean wasUncontended) {
// ThreadLocalRandom 产生的随机数(多线程条件下的随机工具)int h;
// ThreadLocalRandom,须要初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {CounterCell[] as; CounterCell a; int n; long v;
// == 一、counterCells 不为空的
if ((as = counterCells) != null && (n = as.length) > 0) {
// ## 1. 随机抉择的桶没有元素,须要新建
if ((a = as[(n - 1) & h]) == null) {
// -- cellsBusy- 标识位,用于初始化或扩容
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
// cas 批改标识位胜利的,作桶位的初始化操作
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {rs[j] = r;
created = true;
}
}
// -- 创立后,重置标识位
finally {cellsBusy = 0;}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// ## 2. 随机抉择的桶有元素(a-CounterCell 对象),间接对 a 的 value 值做 cas 累加操作
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// ## 3. 步骤 2 失败,做 counterCells 的 2 倍扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {cellsBusy = 0;}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
// == 二、counterCells 为空,初始化操作
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {cellsBusy = 0;}
if (init)
break;
}
// == 三、兜底计划,仍应用 baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
get 办法
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// tab 存在,且依据 hash 计算出落点桶有元素
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// -- 比拟后 key 相等,间接返回 val
if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// -- 树或迁徙中(fwd)
else if (eh < 0){return (p = e.find(h, key)) != null ? p.val : null;
}
// -- 链:遍历链,找到就返回;找不到返回 null
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
次要关注一个问题:如果 要获取的元素正在迁徙中,源码中是如何解决的呢?
间接查看 fwd 的 find()
办法:
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// 无 tab,或落点桶无元素,返回 null
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
// -- 2. 新 table 上找到元素,返回
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
// -- 1. 迁徙中,去新表查问
if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
// 树上查找
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
tableSizeFor 办法
最初察看一个有意思的办法
// 保障失去的是一个大于入参 c 的最小 2 的 n 次幂数
// 参考:https://www.cnblogs.com/xiyixiaodao/p/14483876.html
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}