本篇出处ConcurrentHashMap源码解析,引入请注明出处
如果想更具体理解ConcurrentHashMap 外部工作原来能够去看下我以前写过一篇HashMap源码解析理解下hash 算法原理、数组扩容、红黑树转换等等。
initTable
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) //这时曾经有其余线程获取到执行权,沉睡一会 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //初始化数组 table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
这个是初始化map数组,外围是sizeCtl 这个变量:一个应用volatile润饰共享变量,作用通过替换比拟竞争获取初始化或者扩容数组执行权。当线程发现sizeCtl小于0的时候,线程就会让出执行权,当线程胜利竞争设置-1 就相当于获取到执行权,所以就能够去初始化数组了。当为负数时,保留下一个要调整Map大小的元素计数值
阐明下Unsafe 替换比拟办法
/** * 通过对象属性的值,批改前、更新后统一,才是更新胜利 * @param o 须要被批改的属性的对象 * @param l 对象Field的指针地址,能够了解成属性值援用 * @param i 批改前的值 * @param i1 批改后的值 * @return */ public final native boolean compareAndSwapInt(java.lang.Object o, long l, int i, int i1);
put办法解析
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //ConcurrentHashMap 不容许key value为空,因为在并发状况下不能通过获取get(key) 判断key存不存在 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; K fk; V fv; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通过计算hash失去数组下标,为空通过替换比拟设置进去,这时不须要加锁的 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) //这种状况阐明数组正在扩容,须要对链表和黑红树进行迁徙 tab = helpTransfer(tab, f); else if (onlyIfAbsent // check first node without acquiring lock && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) && (fv = f.val) != null) return fv; else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //头节点没有扭转,阐明获取锁过程,没有线程扩容 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
简略说下put 业务逻辑:
- 先判断table数组是否为空,不加锁,调用initTable 进行初始化
- 计算key 哈希值,通过hash计算出数组中下标,如果下标刚好为空,通过替换比拟设置进去。多个线程设置通过数组下标时,当设置失败时会不满足下标为空状况,进入获取头节点锁
- 此时节点有值,并且hash 值等于-1,则阐明数组正在扩容,调用helpTransfer 办法多线程进行copy数组
- 只锁住链表或红黑树根节点,先判断根节点是否等于获取锁的对象,因为有可能获取到锁对象曾经因为数组扩容进行偏移了 ,如果曾经进行偏移还在根节点进行插入会导致hash计算错误,导致get 获取不到值。这个是安全检查很重要的。
为什么要判断fh >= 0 次要是因为ConcurrentHashMap Node hash 有非凡意义
- int MOVED = -1; 正在进行数组扩容,筹备迁徙
- int TREEBIN = -2 红黑树根节点
- int RESERVED = -3; 长期保留节点
- 大于0就晓得这是一个链表,从头开始遍历到最初插入,Java8 链表改成尾插入有一个益处就是遍历完链表后就晓得链表长度了,binCount就是链表长度。每次遍历节点都会比拟key相等,笼罩旧值,否则在链表最初插入。
- 红黑树逻辑插入和链表差不多,通过putTreeVal 遍历并且插入节点,只有找到雷同key节点才会返回节点对象,如果是在红黑树下新增只会返回null。
- binCount是链表长度,如果长度大于链表长度阈值(默认8)转换成红黑树。因为下面遍历时遇到雷同key值,都会将节点赋值给oldVal,如果不为空则是笼罩,不须要执行累加,间接返回就能够了。
- addCount只有就是对map size进行累加,因为在并发状况下不能间接加锁住size办法进行加一,这违反设定的粗细粒锁的设定。理论状况比拟复制,数组扩容的逻辑也是在这个办法中实现的,上面具体分析下。
size() 办法
在剖析map size之前,看讲下size办法如何获取长度,有利于addCount 解说。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] cs = counterCells; long sum = baseCount; if (cs != null) { for (CounterCell c : cs) if (c != null) sum += c.value; } return sum; }
size长度次要是通过baseCount + CounterCell数组累加起来的。baseCount 只有当一个线程在应用map时,才会应用baseCount来记录size大小,当呈现多个线程线程同时对map操作时,就会放弃应用baseCount,而将每个线程操作数量 放入CounterCell数组中。能够了解CounterCell只是一个计算盒子,通过一些算法将不同线程操作数量放入到指定index地位,能够隔离线程对同个数竞争批改,有点相似hash 计算下标放入数组形式,这样能够缩小抵触次数。
addCount剖析
private final void addCount(long x, int check) { CounterCell[] cs; long b, s; if ((cs = counterCells) != null || !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell c; long v; int m; boolean uncontended = true; // 此时cs 没有初始化,或者 cs刚刚初始化,长度还是0,通过线程随机数获取下标刚好为空 if (cs == null || (m = cs.length - 1) < 0 || (c = cs[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { fullAddCount(x, uncontended); //将累加值增加到数组中 return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { //于于0不会判断数组扩容状况 Node<K,V>[] tab, nt; int n, sc; // 只有满足size长度等于或大于sizeCtl 扩容阈值长度,才会进行上面逻辑 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT; if (sc < 0) { //以后tab数组来扩容,通过竞争去争夺扩容权 if (sc == rs + MAX_RESIZERS || sc == rs + 1 || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2)) transfer(tab, null); s = sumCount(); } } }
先从if判断开始,如果countCells 这个数组不空,第二个条件不必判断,进入countCells设置累加。第一个条件不成立就会执行 baseCount累加,累加胜利条件不成立,不进入if逻辑。
countCells 初始化、x值增加到数组上,或者多线程对同一个数组下标累加的管制都须要fullAddCount去实现的。
下面说过sizeCtl 为正整数就是数组容量扩容阈值,while 下先判断map size 是否达到或者超过阈值,符合条件就会
进入循环了进行数组扩容。当sizeCtl > 0 时则是多个线程在对数组扩容,这时须要竞争获取执行权。
具体分析下扩容条件如何成立的,首先直到
resizeStamp(n) << RESIZE_STAMP_SHIFT;
这个办法是将数组n,低位补零左移16为,相当于讲数组长度保留到高16位中。每一个线程参加并发就会在sizeCtl 上+1 ,低16位这样就是用来保留参加数组扩容数量。
sc > 0
- 这时还没有线程对tab数组进行扩容
sz < 0
- 曾经有线程在扩容,将sizeCtl+1并调用transfer()让以后线程参加扩容。
上面剖析下while 外面的判断
- sc == rs + MAX_RESIZERS 以后线程数曾经达到最大下限了,再有新的线程进来扩容就没有意义了
- sc == rs + 1 只有rs + 1胜利会在调用transfer,然而当初rs值曾经被其余线程批改了,累加曾经失败了,没有必要在执行一次替换比拟了。
- (nt = nextTable) == null 此时扩容曾经实现了,新的线程不用进入扩容办法了
- transferIndex <= 0 transferIndex tab数组没有调配的迁徙的数量,此时为0示意扩容线程曾经达到最大数量了,不用再应用新的线程进入了
细讲下fullAddCount 办法
ThreadLocalRandom.getProbe()能够简略了解成每一个线程hash值,然而这个值不是固定的,在找不到数组slot是,能够从新计算。
private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { //初始化 ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { CounterCell[] cs; CounterCell c; int n; long v; if ((cs = counterCells) != null && (n = cs.length) > 0) { if ((c = cs[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { //竞争胜利了能够对数组批改 boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // 这时CAS曾经失败了,持续自旋期待扩容 wasUncontended = true; // Continue after rehash else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x)) //累加胜利,退出自旋 break; //当数组长度超过CPU个数了,这时就不应该扩容了,而是持续自旋直到累加胜利 else if (counterCells != cs || n >= NCPU) collide = false; // At max size or stale else if (!collide) //如果竞争持续失败了,不扩容的话会持续自旋 collide = true; else if (cellsBusy == 0 && //走到这里阐明第一次竞争失败了,有抵触间接对数据扩容 U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == cs) // Expand table unless stale counterCells = Arrays.copyOf(cs, n << 1); } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //从新生成probe ,因为此时是CAS插入失败。 更换其余的插槽试试 h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == cs && U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {//初始化数组 boolean init = false; try { // Initialize table if (counterCells == cs) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // 这时候counterCells 数组没有初始化,有没有多个线程竞争,能够应用baseCount 进行累加 else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
cellsBusy 这个属性跟咱们之前讲过sizeCtl 性能很相识的,都是来管制数组批改权限的。当cellsBusy =0时,counterCells数组能够被插入、扩容的 ,线程只有将cellsBusy设置成1就能够获取批改权限,改完后将cellsBusy变成0就能够了。
看过LongAdder源码的同学是不是感觉很相熟啊,不能说很相识,只是说是截然不同,就连变量名都是复制过去的。其实这个我违心讲这个细节的起因,波及了LongAdder设计思维,value值拆散成一个数组,当多线程拜访时,通过hash算法映射到其中的一个数字进行计数
transfer
接着看下map在多线程下如何进行扩容的
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //这里是计算一个线程最大要迁徙数组个数,相当于将数组分拆成这么多局部,每个线程最大能够解决 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; //数组下标挪动标记 boolean finishing = false; //一个线程负责数组上元素是否曾经全副迁徙实现 //i 示意上一次调配残余数量 bound数组当初剩下数量 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; //这里只是为了i-1,或者就是所有数组元素曾经全副迁徙实现了 if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) {//数组曾经全副被标注完了 i = -1; advance = false; } //这里给进来线程调配迁徙数量,调配胜利就会跳出wile循环到上面去执行copy else if (U.compareAndSetInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 这三个条件任意一个成立都阐明旧数据曾经齐全迁徙实现 了 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //在addCount 时 sc +2 就会进入扩容,当初再减回去 如果不相等则示意当初还要线程在解决数据扩容,否则将finish改成true示意扩容曾经完结 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //上面进入数组元素迁徙,每解决完一个将advance 改成true 从新进入while else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //表明这个节点曾经有其余线程在解决了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //要迁徙这个元素,间接锁住 synchronized (f) { if (tabAt(tab, i) == f) { //双重查看 Node<K,V> ln, hn; //将链表分拆成两个 ln示意低位 hn高位 if (fh >= 0) { //下面说过,只有hash大于0就是链表 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { //这里先找出最初一个元素 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //将链表拆成两个,别离连起来了 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 标记旧数组,此下标不能够应用了 setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //红黑树原来和下面差不多了 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //遍历红黑树依然应用链表下标去做,找到最初一个 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //新产生红黑树链表不满足阈值,转换成链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } } } }
ForwardingNode 这个是一个非凡节点,在数组迁徙过程中,曾经被迁徙过来节点会被标注成这个对象。避免在迁徙数组过程中插入到旧数组中。外部封装一个find办法,能够去新数组中查找key value,上面get办法会用到的。
如何反对并发状况下,调配各个线程工作的。每一个线程进入都会在双层循环的while中调配到工作。刚进来时后面两个if、else if不会满足条件,然而会讲transferIndex从新赋值给nextIndex,此时nextIndex还是数组长度来的,当开始竞争批改transferIndex 时,设置胜利的线程,调配到了数组下标 tab.leng -1 到 tab.length -1 - stride 这个范畴的下标,从数组最初往前进行元素调配,并且将迁徙实现index设置bound,i为数组copy的下标,当下标曾经迁徙到下个线程开始地位时,就会跳出while循环了,否则每次进入while就为了让下标 -1。竞争失败的线程会从新进入whle循环,持续去从transferIndex获取调配数量。当transferIndex不为空时,示意数组依然有任务分配,持续去竞争获取。
if (i < 0 || i >= n || i + n >= nextn) { 当i < 0 成立时示意以后线程曾经做完copy工作了, i >= n || i + n >= nextn都是跟上面i = n 相干,对以后数组进行边界查看。咱们下面说过sizeCtl低16位表白以后线程扩容数量,让一个线程实现工作后,在sizeCtl -1 。sizeCtl -2 不相等次要和扩容时sizeCtl +2 调用transfer 向对应,不相等阐明此时依然有其余线程正在copy中,扩容没有实现的,曾经实现线程主动退出扩容办法。让最初一个线程将实现收尾工作,将nextTab 变成tab。
get办法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { //数组元素就是以后key if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) //这里是红黑树或者ForwardingNode ,应用外部封装办法去查问 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get办法没有应用锁,反对最大并发读取,而且不会呈现平安问题。当eh < 0 ,这时是红黑树的话,调用红黑树遍历办法去操作节点。如果此时ForwardingNode,尽管这个节点的数据曾经被迁徙到新数组去了,然而外部封装find办法,去新的数组中查找。无论是哪种节点都能够找到对应key value。
删除办法
public V remove(Object key) { return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); //计算hash值 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) //找不到对应key 节点,退出循环 break; else if ((fh = f.hash) == MOVED) //参考下面put 此时数组在扩容,线程帮忙进入扩容 tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { //锁住数组下标节点 if (tabAt(tab, i) == f) { if (fh >= 0) { //此时是链表构造 validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { //曾经在链表找到对应元素了 V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) //替换旧值 e.val = value; //前一个节点不为空,代替的值为空,则删除以后节点,扭转前后指引就行了 else if (pred != null) pred.next = e.next; else //后面节点为空,头节点就是要被删除的 setTabAt(tab, i, e.next); // } break; } pred = e; if ((e = e.next) == null) //遍历到最初一个,退出循环 break; } } else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { //应用红黑树办法去查找 V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) //从黑红树删除节点 setTabAt(tab, i, untreeify(t.first)); } } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } if (validated) { if (oldVal != null) { //旧值不存在,才会真正去删除节点数据 if (value == null) addCount(-1L, -1); //size -1 return oldVal; } break; } } } return null; }
材料援用
https://xilidou.com/2018/11/27/LongAdder/
https://cloud.tencent.com/developer/article/1497528
https://blog.csdn.net/zzu_seu/article/details/106698150