本文首发于 vivo互联网技术 微信公众号
链接:https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug
作者:vivo 游戏技术团队
一、概述
ConcurrentHashMap (以下简称C13Map) 是并发编程出场率最高的数据结构之一,大量的并发CASE背地都有C13Map的反对,同时也是JUC包中代码量最大的组件(6000多行),自JDK8开始Oracle对其进行了大量优化工作。
本文从 HashMap 的基础知识开始,尝试逐个剖析C13Map中各个组件的实现和安全性保障。
二、HashMap基础知识
剖析C13MAP前,须要理解以下的HashMap常识或者约定:
- 哈希表的长度永远都是2的幂次方,起因是hashcode%tableSize==hashcode&(tableSize-1),也就是哈希槽位的确定能够用一次与运算来代替取余运算。
- 会对hashcode调用若干次扰动函数,将高16位与低16位做异或运算,因为高16位的随机性更强。
- 当表中的元素总数超过tableSize * 0.75时,哈希表会产生扩容操作,每次扩容的tableSize是原先的两倍。
- 下文提到的槽位(bucket)、哈希分桶、BIN均示意同一个概念,即哈希table上的某一列。
- 旧表在做搬运时i槽位的node能够依据其哈希值的第tableSize位的bit决定在新表上的槽位是i还是i+tableSize。
- 每个槽位上有可能会呈现哈希抵触,在未达到某个阈值时它是一个链表构造,达到阈值后会降级到红黑树结构。
- HashMap自身并非为多线程环境设计,永远不要尝试在并发环境下间接应用HashMap,C13Map不存在这个平安问题。
三、C13Map的字段定义
C13Map的字段定义
//最大容量private static final int MAXIMUM_CAPACITY = 1 << 30; //默认初始容量private static final int DEFAULT_CAPACITY = 16; //数组的最大容量,避免抛出OOMstatic final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大并行度,仅用于兼容JDK1.7以前版本private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //扩容因子private static final float LOAD_FACTOR = 0.75f; //链表转红黑树的阈值static final int TREEIFY_THRESHOLD = 8; //红黑树进化阈值static final int UNTREEIFY_THRESHOLD = 6; //链表转红黑树的最小总量static final int MIN_TREEIFY_CAPACITY = 64; //扩容搬运时批量搬运的最小槽位数private static final int MIN_TRANSFER_STRIDE = 16; //以后待扩容table的邮戳位,通常是高16位private static final int RESIZE_STAMP_BITS = 16; //同时搬运的线程数自增的最大值private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //搬运线程数的标识位,通常是低16位private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; static final int MOVED = -1; // 阐明是forwardingNodestatic final int TREEBIN = -2; // 红黑树static final int RESERVED = -3; // 原子计算的占位Nodestatic final int HASH_BITS = 0x7fffffff; // 保障hashcode扰动计算结果为负数 //以后哈希表transient volatile Node<K,V>[] table; //下一个哈希表private transient volatile Node<K,V>[] nextTable; //计数的基准值private transient volatile long baseCount; //控制变量,不同场景有不同用处,参考下文private transient volatile int sizeCtl; //并发搬运过程中CAS获取区段的下限值private transient volatile int transferIndex; //计数cell初始化或者扩容时基于此字段应用自旋锁private transient volatile int cellsBusy; //减速多核CPU计数的cell数组private transient volatile CounterCell[] counterCells;
四、平安操作Node<K,V>数组
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);}static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);}static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);}
对Node<K,V>[] 上任意一个index的读取和写入都应用了Unsafe辅助类,table自身是volatile类型的并不能保障其下的每个元素的内存语义也是volatile类型;
须要借助于Unsafe来保障Node<K,V>[]元素的“读/写/CAS”操作在多核并发环境下的原子或者可见性。
五、读操作get为什么是线程平安的
首先须要明确的是,C13Map的读操作个别是不加锁的(TreeBin的读写锁除外),而读操作与写操作有可能并行;能够保障的是,因为C13Map的写操作都要获取bin头部的syncronized互斥锁,能保障最多只有一个线程在做更新,这其实是一个单线程写、多线程读的并发安全性的问题。
C13Map的get办法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //执行扰动函数 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}
1、如果以后哈希表table为null
哈希表未初始化或者正在初始化未实现,间接返回null;尽管line5和line18之间其它线程可能经验了千山万水,至多在判断tab==null的工夫点key必定是不存在的,返回null合乎某一时刻的客观事实。
2、如果读取的bin头节点为null
阐明该槽位尚未有节点,间接返回null。
3、如果读取的bin是一个链表
阐明头节点是个一般Node。
(1)如果正在产生链表向红黑树的treeify工作,因为treeify自身并不毁坏旧的链表bin的构造,只是在全副treeify实现后将头节点一次性替换为新创建的TreeBin,能够释怀读取。
(2)如果正在产生resize且以后bin正在被transfer,因为transfer自身并不毁坏旧的链表bin的构造,只是在全副transfer实现后将头节点一次性替换为ForwardingNode,能够释怀读取。
(3)如果其它线程正在操作链表,在以后线程遍历链表的任意一个工夫点,都有可能同时在产生add/replace/remove操作。
- 如果是add操作,因为链表的节点新增从JDK8当前都采纳了后入式,无非是多遍历或者少遍历一个tailNode。
- 如果是remove操作,存在遍历到某个Node时,正好有其它线程将其remove,导致其孤立于整个链表之外;但因为其next援用未产生变更,整个链表并没有断开,还是能够照常遍历链表直到tailNode。
- 如果是replace操作,链表的构造未变,只是某个Node的value产生了变动,没有平安问题。
论断:对于链表这种线性数据结构,单线程写且插入操作保障是后入式的前提下,并发读取是平安的;不会存在误读、链表断开导致的漏读、读到环状链表等问题。
4、如果读取的bin是一个红黑树
阐明头节点是个TreeBin节点。
(1)如果正在产生红黑树向链表的untreeify操作,因为untreeify自身并不毁坏旧的红黑树结构,只是在全副untreeify实现后将头节点一次性替换为新创建的一般Node,能够释怀读取。
(2)如果正在产生resize且以后bin正在被transfer,因为transfer自身并不毁坏旧的红黑树结构,只是在全副transfer实现后将头节点一次性替换为ForwardingNode,能够释怀读取。
(3)如果其余线程在操作红黑树,在以后线程遍历红黑树的任意一个工夫点,都可能有单个的其它线程产生add/replace/remove/红黑树的翻转等操作,参考上面的红黑树的读写锁实现。
TreeBin中的读写锁实现
TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock private final void lockRoot() { //如果一次性获取写锁失败,进入contendedLock循环体,循环获取写锁或者休眠期待 if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER)) contendedLock(); // offload to separate method } private final void unlockRoot() { lockState = 0; } //对红黑树加互斥锁,也就是写锁 private final void contendedLock() { boolean waiting = false; for (int s;;) { //如果lockState除了第二位外其它位上都为0,示意红黑树以后既没有上读锁,又没有上写锁,仅有可能存在waiter,能够尝试间接获取写锁 if (((s = lockState) & ~WAITER) == 0) { if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) { if (waiting) waiter = null; return; } } //如果lockState第二位是0,示意以后没有线程在期待写锁 else if ((s & WAITER) == 0) { //将lockState的第二位设置为1,相当于打上了waiter的标记,示意有线程在期待写锁 if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) { waiting = true; waiter = Thread.currentThread(); } } //休眠以后线程 else if (waiting) LockSupport.park(this); } } //查找红黑树中的某个节点 final Node<K,V> find(int h, Object k) { if (k != null) { for (Node<K,V> e = first; e != null; ) { int s; K ek; //如果以后有waiter或者有写锁,走线性检索,因为红黑树尽管代替了链表,但其外部仍然保留了链表的构造,尽管链表的查问性能个别,但依据先前的剖析其读取的安全性有保障。 //发现有写锁改走线性检索,是为了防止期待写锁开释花去太久工夫; 而发现有waiter改走线性检索,是为了防止读锁叠加的太多,导致写锁线程须要期待太长的工夫; 实质上都是为了缩小读写碰撞 //线性遍历的过程中,每遍历到下一个节点都做一次判断,一旦发现锁竞争的可能性缩小就改走tree检索以进步性能 if (((s = lockState) & (WAITER|WRITER)) != 0) { if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; e = e.next; } //对红黑树加共享锁,也就是读锁,CAS一次性减少4,也就是减少的只是3~32位 else if (U.compareAndSetInt(this, LOCKSTATE, s, s + READER)) { TreeNode<K,V> r, p; try { p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); } finally { Thread w; //开释读锁,如果开释结束且有waiter,则将其唤醒 if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) LockSupport.unpark(w); } return p; } } } return null; } //更新红黑树中的某个节点 final TreeNode<K,V> putTreeVal(int h, K k, V v) { Class<?> kc = null; boolean searched = false; for (TreeNode<K,V> p = root;;) { int dir, ph; K pk; //...省略解决红黑树数据结构的代码若干 else { //写操作前加互斥锁 lockRoot(); try { root = balanceInsertion(root, x); } finally { //开释互斥锁 unlockRoot(); } } break; } } assert checkInvariants(root); return null; }}
红黑树内置了一套读写锁的逻辑,其外部定义了32位的int型变量lockState,第1位是写锁标记位,第2位是写锁期待标记位,从3~32位则是共享锁标记位。
读写操作是互斥的,容许多个线程同时读取,但不容许读写操作并行,同一时刻只容许一个线程进行写操作;这样任意工夫点读取的都是一个非法的红黑树,整体上是平安的。
有的同学会产生纳闷,写锁开释时为何没有将waiter唤醒的操作呢?是否有可能A线程进入了期待区,B线程获取了写锁,开释写锁时仅做了lockState=0的操作。
那么A线程是否就没有机会被唤醒了,只有期待下一个读锁开释时的唤醒了呢 ?
显然这种状况违反常理,C13Map不会呈现这样的疏漏,再进一步察看,红黑树的变更操作的外围,也就是在putValue/replaceNode那一层,都是对BIN的头节点加了synchornized互斥锁的,同一时刻只能有一个写线程进入TreeBin的办法范畴内,当写线程发现以后waiter不为空,其实此waiter只能是以后线程本人,能够释怀的获取写锁,不必放心无奈被唤醒的问题。
TreeBin在find读操作检索时,在linearSearch(线性检索)和treeSearch(树检索)间做了折衷,前者性能差但并发平安,后者性能佳但要做并发管制,可能导致锁竞争;设计者应用线性检索来尽量避免读写碰撞导致的锁竞争,但评估到race condition已隐没时,又立刻趋向于改用树检索来进步性能,在平安和性能之间做到了极佳的均衡。具体的折衷策略请参考find办法及正文。
因为有线性检索这样一个抄底计划,以及入口处bin头节点的synchornized机制,保障了进入到TreeBin整体代码块的写线程只有一个;TreeBin中读写锁的整体设计与ReentrantReadWriteLock相比还是简略了不少,比方并未定义用于寄存待唤醒线程的threadQueue,以及读线程仅会自旋而不会阻塞等等, 能够看做是特定条件下ReadWriteLock的简化版本。
5、如果读取的bin是一个ForwardingNode
阐明以后bin已迁徙,调用其find办法到nextTable读取数据。
forwardingNode的find办法
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null); this.nextTable = tab; } //递归检索哈希表链 Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
ForwardingNode中保留了nextTable的援用,会转向下一个哈希表进行检索,但并不能保障nextTable就肯定是currentTable,因为在高并发插入的状况下,极短时间内就能够导致哈希表的屡次扩容,内存中极有可能驻留一条哈希表链,彼此以bin的头节点上的ForwardingNode相连,线程刚读取时拿到的是table1,遍历时却有可能经验了哈希表的链条。
eh<0有三种状况:
- 如果是ForwardingNode持续遍历下一个哈希表。
- 如果是TreeBin,调用其find办法进入TreeBin读写锁的保护区读取数据。
- 如果是ReserveNode,阐明以后有compute计算中,整条bin还是一个空构造,间接返回null。
6、如果读取的bin是一个ReserveNode
ReserveNode用于compute/computeIfAbsent原子计算的办法,在BIN的头节点为null且计算尚未实现时,先在bin的头节点打上一个ReserveNode的占位标记。
读操作发现ReserveNode间接返回null,写操作会因为抢夺ReserveNode的互斥锁而进入阻塞态,在compute实现后被唤醒后循环重试。
六、写操作putValue/replaceNode为什么是线程平安的
典型的编程范式如下:
C13Map的putValue办法
Node<K,V>[] tab = table; //将堆中的table变量赋给线程堆栈中的局部变量Node f = tabAt(tab, i );if(f==null){ //以后槽位没有头节点,间接CAS写入 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break;}else if(f.hash == MOVED){ //退出帮助搬运行列 helpTransfer(tab,f);}//不是forwardingNodeelse if(f.hash != MOVED){ //先锁住I槽位上的头节点 synchronized (f) { //再doubleCheck看此槽位上的头节点是否还是f if (tabAt(tab, i) == f) { ...各种写操作 } }}
1、以后槽位如果头节点为null时,间接CAS写入
有人兴许会质疑,如果写入时resize操作已实现,产生了table向nextTable的转变,是否会存在写入的是旧表的bin导致数据失落的可能 ?
这种可能性是不存在的,因为一个table在resize实现后所有的BIN都会被打上ForwardingNode的标记,能够形象的了解为所有槽位上都插满了红旗,而此处在CAS时的compare的变量null,可能保障至多在CAS原子操作产生的工夫点table并未产生变更。
2、以后槽位如果头节点不为null
这里采纳了一个小技巧:先锁住I槽位上的头节点,进入同步代码块后,再doubleCheck看此槽位上的头节点是否有变动。
进入同步块后还须要doubleCheck的起因:尽管一开始获取到的头节点f并非ForwardingNode,但在获取到f的同步锁之前,可能有其它线程提前获取了f的同步锁并实现了transfer工作,并将I槽位上的头节点标记为ForwardingNode,此时的f就成了一个过期的bin的头节点。
然而因为标记操作与transfer作为一个整体在同步的代码块中执行,如果doubleCheck的后果是此槽位上的头节点还是f,则表明至多在以后工夫点该槽位还没有被transfer到新表(如果以后有transfer in progress的话),能够释怀的对该bin进行put/remove/replace等写操作。
只有未产生transfer或者treeify操作,链表的新增操作都是采取后入式,头节点一旦确定不会轻易扭转,这种后入式的更新形式保障了锁定头节点就等于锁住了整个bin。
如果不作doubleCheck判断,则有可能以后槽位已被transfer,写入的还是旧表的BIN,从而导致写入数据的失落;也有可能在获取到f的同步锁之前,其它线程对该BIN做了treeify操作,并将头节点替换成了TreeBin, 导致写入的是旧的链表,而非新的红黑树;
3、doubleCheck是否有ABA问题
兴许有人会质疑,如果有其它线程提前对以后bin进行了的remove/put的操作,引入了新的头节点,并且恰好产生了JVM的内存开释和重新分配,导致新的Node的援用地址恰好跟旧的雷同,也就是存在所谓的ABA问题。
这个能够通过反证法来颠覆,在带有GC机制的语言环境下通常不会产生ABA问题,因为以后线程蕴含了对头节点f的援用,以后线程并未沦亡,不可能存在f节点的内存被GC回收的可能性。
还有人会质疑,如果在写入过程中主哈希表产生了变动,是否可能写入的是旧表的bin导致数据失落,这个也能够通过反证法来颠覆,因为table向nextTable的转化(也就是将resize后的新哈希表正式commit)只有在所有的槽位都曾经transfer胜利后才会进行,只有有一个bin未transfer胜利,则阐明以后的table未发生变化,在以后的工夫点能够释怀的向table的bin内写入数据。
4、如何操作才平安
能够总结出法则,在对table的槽位胜利进行了CAS操作且compare值为null,或者对槽位的非forwardingNode的头节点加锁后,doubleCheck头节点未发生变化,对bin的写操作都是平安的。
七、原子计算相干办法
原子计算次要包含:computeIfAbsent、computeIfPresent、compute、merge四个办法。
1、几个办法的比拟
次要区别如下:
(1)computeIfAbsent只会在判断到key不存在时才会插入,判空与插入是一个原子操作,提供的FunctionalInterface是一个二元的Function, 承受key参数,返回value后果;如果计算结果为null则不做插入。
(2)computeIfPresent只会在判读单到Key非空时才会做更新,判断非空与插入是一个原子操作,提供的FunctionalInterface是一个三元的BiFunction,承受key,value两个参数,返回新的value后果;如果新的value为null则删除key对应节点。
(3)compute则不加key是否存在的限度,提供的FunctionalInterface是一个三元的BiFunction,承受key,value两个参数,返回新的value后果;如果旧的value不存在则以null代替进行计算;如果新的value为null则保障key对应节点不会存在。
(4)merge不加key是否存在的限度,提供的FunctionalInterface是一个三元的BiFunction,承受oldValue, newVALUE两个参数,返回merge后的value;如果旧的value不存在,间接以newVALUE作为最终后果,存在则返回merge后的后果;如果最终后果为null,则保障key对应节点不会存在。
2、何时会应用ReserveNode占位
如果指标bin的头节点为null,须要写入的话有两种伎俩:一种是生成好新的节点r后应用casTabAt(tab, i, null, r)原子操作,因为compare的值为null能够保障并发的平安;
另外一种形式是创立一个占位的ReserveNode,锁住该节点并将其CAS设置到bin的头节点,再进行进一步的原子计算操作;这两种方法都有可能在CAS的时候失败,须要自旋重复尝试。
(1)为什么只有computeIfAbsent/compute办法应用占位符的形式
computeIfPresent只有在BIN构造非空的状况下才会开展原子计算,天然不存在须要ReserveNode占位的状况;锁住已有的头节点即可。
computeIfAbsent/compute办法在BIN构造为空时,须要开展Function或者BiFunction的运算,这个操作是内部引入的须要耗时多久无奈精确评估;这种状况下如果采纳先计算,再casTabAt(tab, i, null, r)的形式,如果有其它线程提前更新了这个BIN,那么就须要从新锁定新退出的头节点,并反复一次原子计算(C13Map无奈帮你缓存上次计算的后果,因为计算的入参有可能会变动),这个开销是比拟大的。
而应用ReserveNode占位的形式无需等到原子计算出后果,能够第一工夫先抢占BIN的所有权,使其余并发的写线程阻塞。
(2)merge办法为何不须要占位
起因是如果BIN构造为空时,依据merge的解决策略,老的value为空则间接应用新的value代替,这样就省去了BiFunction中新老value进行merge的计算,这个耗费简直是没有的;因而能够应用casTabAt(tab, i, null, r)的形式间接批改,防止了应用ReserveNode占位,锁定该占位ReserveNode后再进行CAS批改的两次CAS无谓的开销。
C13Map的compute办法
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { if (key == null || remappingFunction == null) throw new nullPointerException(); int h = spread(key.hashCode()); V val = null; int delta = 0; int binCount = 0; for (Node<K, V>[] tab = table; ; ) { Node<K, V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & h)) == null) { //创立占位Node Node<K, V> r = new ReservationNode<K, V>(); //先锁定该占位Node synchronized (r) { //将其设置到BIN的头节点 if (casTabAt(tab, i, null, r)) { binCount = 1; Node<K, V> node = null; try { //开始原子计算 if ((val = remappingFunction.apply(key, null)) != null) { delta = 1; node = new Node<K, V>(h, key, val, null); } } finally { //设置计算后的最终节点 setTabAt(tab, i, node); } } } if (binCount != 0) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { //此处省略对一般链表的变更操作 } else if (f instanceof TreeBin) { //此处省略对红黑树的变更操作 } } } } } if (delta != 0) addCount((long) delta, binCount); return val;}
3、如何保障原子性
computeIfAbsent/computeIfPresent中判空与计算是原子操作,根据上述剖析次要是通过casTabAt(tab, i, null, r)原子操作,或者应用ReserveNode占位并锁定的形式,或者锁住bin的头节点的形式来实现的。
也就是说整个bin始终处于锁定状态,在获取到指标KEY的value是否为空当前,其它线程无奈变更指标KEY的值,判空与计算天然是原子的。
而casTabAt(tab, i, null, r)是由硬件层面的原子指令来保障的,可能保障同一个内存区域在compare和set操作之间不会有任何其它指令对其进行变更。
八、resize过程中的并发transfer
C13Map中总共有三处中央会触发transfer办法的调用,别离是addCount、tryPresize、helpTransfer三个函数。
- addCount用于写操作实现后测验元素数量,如果超过了sizeCtl中的阈值,则触发resize扩容和旧表向新表的transfer。
- tryPresize是putAll一次性插入一个汇合前的自检,如果汇合数目较大,则事后触发一次resize扩容和旧表向新表的transfer。
- helpTransfer是写操作过程中发现bin的头节点是ForwardingNode, 则调用helpTransfer退出帮助搬运的行列。
1、开始transfer前的查看工作
以addCount中的查看逻辑为例:
addCount中的transfer查看
Node<K, V>[] tab, nt;int n, sc;//以后的tableSize曾经超过sizeCtl阈值,且小于最大值while (s >= (long) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); //曾经在搬运中 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //搬运线程数加一 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //尚未搬运,以后线程是本次resize工作的第一个线程,设置初始值为2,十分奇妙的设计 transfer(tab, null); s = sumCount();}
多处利用了对变量sizeCtl的CAS操作,sizeCtl是一个全局控制变量。
参考下此变量的定义:private transient volatile int sizeCtl;
- 初始值是0示意哈希表尚未初始化
- 如果是-1示意正在初始化,只容许一个线程进入初始化代码块
- 初始化或者reSize胜利后,sizeCtl=loadFactor * tableSize也就是触发再次扩容的阈值,是一个正整数
- 在扩容过程中,sizeCtrl是一个负整数,其高16位是与以后的tableSize关联的邮戳resizeStamp,其低16位是以后从事搬运工作的线程数加1
在办法的循环体中每次都将table、sizeCtrl、nextTable赋给局部变量以保障读到的是以后的最新值,且保障逻辑计算过程中变量的稳固。
如果sizeCtrl中高16位的邮戳与以后tableSize不匹配,或者搬运线程数达到了最大值,或者所有搬运的线程都曾经退出(只有在遍历完所有槽位后才会退出,否则会始终循环),或者nextTable曾经被清空,跳过搬运操作。
如果满足搬运条件,则对sizeCtrl做CAS操作,sizeCtrl>=0时设置初始线程数为2,sizeCtrl<0时将其值加1,CAS胜利后开始搬运操作,失败则进入下一次循环从新判断。
首个线程设置初始值为2的起因是:线程退出时会通过CAS操作将参加搬运的总线程数-1,如果初始值依照惯例做法设置成1,那么减1后就会变为0。
此时其它线程发现线程数为0时,无奈辨别是没有任何线程做过搬运,还是有线程做完搬运但都退出了,也就无奈判断要不要退出搬运的行列。
值得注意的是,代码中的“sc == rs + 1 || sc == rs + MAX_RESIZERS“是JDK8中的显著的BUG,少了rs无符号左移16位的操作;JDK12曾经修复了此问题。
2、并发搬运过程和退出机制
C13Map的transfer办法
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) { int n = tab.length, stride; //一次搬运多少个槽位 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null) { try { //首个搬运线程,负责初始化nextTable Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //初始化以后搬运索引 transferIndex = n; } int nextn = nextTab.length; //公共的forwardingNode ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab); boolean advance = true; boolean finishing = false; // 保障提交nextTable之前已遍历旧表的所有槽位 for (int i = 0, bound = 0; ; ) { Node<K, V> f; int fh; //循环CAS获取下一个搬运区段 while (advance) { int nextIndex, nextBound; //搬运已完结,或者以后区段尚未实现,退出循环体;最初一次抄底扫描时,仅辅助做i减一的运算 if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //并非最初一个退出的线程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; //异样奇妙的设计,最初一个线程推出前将i回退到最高位,等于是强制做最初一次的全表扫描;程序间接执行后续的else if代码,看有没有哪个槽位漏掉了,或者说是否全副是forwardingNode标记; //能够视为抄底逻辑,尽管检测到漏掉槽位的概率根本是0 i = n; } } else if ((f = tabAt(tab, i)) == null) //空槽位间接打上forwardingNode标记,CAS失败下一次循环持续搬运该槽位,胜利则进入下一个槽位 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; //最初一次抄底遍历时,失常状况下所有的槽位应该都被打上forwardingNode标记 else { //锁定头节点 synchronized (f) { if (tabAt(tab, i) == f) { Node<K, V> ln, hn; if (fh >= 0) { //......此处省略链表搬运代码:职责是将链表拆成两份,搬运到nextTable的i和i+n槽位 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //设置旧表对应槽位的头节点为forwardingNode setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //......此处省略红黑树搬运代码:职责是将红黑树拆成两份,搬运到nextTable的i和i+n槽位,如果满足红黑树的进化条件,顺便将其进化为链表 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //设置旧表对应槽位的头节点为forwardingNode setTabAt(tab, i, fwd); advance = true; } } } } }}
多个线程并发搬运时,如果是首个搬运线程,负责nextTable的初始化工作;而后借助于全局的transferIndex变量从以后table的n-1槽位开始顺次向低位扫描搬运,通过对transferIndex的CAS操作一次获取一个区段(默认是16),当transferIndex达到最低位时,不再可能获取到新的区段,线程开始退出,退出时会在sizeCtl上将总的线程数减一,最初一个退出的线程将扫描坐标i回退到最高位,强制做一次抄底的全局扫描。
3、transfer过程中的读写安全性剖析
(1)首先是transfer过程中是否有可能全局的哈希表table产生屡次resize,或者说存在过期的危险?
察看nextTable提交到table的代码,发现只有在所有线程均搬运结束退出后才会commit,所以凡是有一个线程在transfer代码块中,table都不可能被替换;所以不存在table过期的危险。
(2)有并发的写操作时,是否存在平安危险?
因为transfer操作与写操作都要竞争bin的头节点的syncronized锁,两者是互斥串行的;当写线程失去锁后,还要做doubleCheck,发现不是一开始的头节点时什么事件都不会做,发现是forwardingNode,就会退出搬运行列直到新表被提交,而后去间接操作新表。
nextTable的提交总是在所有的槽位都曾经搬运结束,插上ForwardingNode的标识之后的,因而只有新表已提交,旧表必然无奈写入;这样就可能无效的防止数据写入旧表。
推理:获取到bin头节点的同步锁开始写操作----------> transfer必然未实现--------->新表必然未提交-------→写入的必然是以后表。
也就说永远不可能存在新旧两张表同时被写入的状况,table被写入时nextTable永远都只能被读取。
(3)有并发的读操作时,是否存在平安危险?
transfer操作并不毁坏旧的bin构造,如果尚未开始搬运,将会照常遍历旧的BIN构造;如果已搬运结束,会调用到forwadingNode的find办法到新表中递归查问,参考上文中的forwadingNode介绍。
九、Traverser遍历器
因为iterator或containsValue等通用API的存在,以及某些业务场景的确须要遍历整个Map,设计一种平安且有性能保障的遍历机制显得天经地义。
C13Map遍历器实现的难点在于读操作与transfer可能并行,在扫描各个bin时如果遇到forwadingNode该如何解决的问题。
因为并发transfer机制的存在,在某个槽位上遇到了forwadingNode,仅表明以后槽位已被搬运,并不能代表其后的槽位肯定被搬运或者尚未被搬运;也就是说其后的若干槽位是一个不可控的状态。
解决办法是引入了相似于办法调用堆栈的机制,在跳转到nextTable时记录下以后table和曾经到达的槽位并进行入栈操作,而后开始遍历下一个table的i和i+n槽位,如果遇到forwadingNode再一次入栈,周而复始周而复始;
每次如果i+n槽位如果到了右半段快要溢出的话就会遵循原来的入栈规定进行出栈,也就是回到上一个上下文节点,最终会回到初始的table也就是initialTable中的节点。
C13Map的Traverser组件
static class Traverser<K,V> { Node<K,V>[] tab; // current table; updated if resized Node<K,V> next; // the next entry to use TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes int index; // index of bin to use next int baseIndex; // current index of initial table int baseLimit; // index bound for initial table final int baseSize; // initial table size Traverser(Node<K,V>[] tab, int size, int index, int limit) { this.tab = tab; this.baseSize = size; this.baseIndex = this.index = index; this.baseLimit = limit; this.next = null; } /** * 返回下一个节点 */ final Node<K,V> advance() { Node<K,V> e; if ((e = next) != null) e = e.next; for (;;) { Node<K,V>[] t; int i, n; // 局部变量保障稳定性 if (e != null) return next = e; if (baseIndex >= baseLimit || (t = tab) == null || (n = t.length) <= (i = index) || i < 0) return next = null; if ((e = tabAt(t, i)) != null && e.hash < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; e = null; pushState(t, i, n); continue; } else if (e instanceof TreeBin) e = ((TreeBin<K,V>)e).first; else e = null; } //以后如果有跳转堆栈间接回放 if (stack != null) recoverState(n); //没有跳转堆栈阐明曾经到initalTable else if ((index = i + baseSize) >= n) index = ++baseIndex; // visit upper slots if present } } /** * 遇到ForwardingNode时保留以后上下文 */ private void pushState(Node<K,V>[] t, int i, int n) { TableStack<K,V> s = spare; // reuse if possible if (s != null) spare = s.next; else s = new TableStack<K,V>(); s.tab = t; s.length = n; s.index = i; s.next = stack; stack = s; } /** * 弹出上下文 * */ private void recoverState(int n) { TableStack<K,V> s; int len; //如果以后有堆栈,且index曾经达到右半段后溢出以后table,阐明该回去了 //如果index还在左半段,则只辅助做index+=s.length操作 while ((s = stack) != null && (index += (len = s.length)) >= n) { n = len; index = s.index; tab = s.tab; s.tab = null; TableStack<K,V> next = s.next; s.next = spare; // save for reuse stack = next; spare = s; } //曾经到initialTable,索引自增 if (s == null && (index += baseSize) >= n) index = ++baseIndex; }}
假如在整个遍历过程中初始表initalTable=table1,遍历到完结时最大的表为table5,也就是在遍历过程中经验了四次扩容,属于一边遍历一边扩容的最简单场景;
那么整个遍历过程就是一个以初始化表initalTable为基准表,以下一张表的i和i+n槽位为forwadingNode的跳转指标,相似于粒子裂变个别的从最低表向最高表喷射的过程;
traverser并不能保障肯定遍历某张表的所有的槽位,但如果假如低阶表的某个槽位在最高阶表总是有相应的投影,比方table1的一个节点在table5中就会对应16个投影;
traverser可能保障一次遍历的所有槽位在最高阶表上的投影,能够布满整张最高阶表,而不会有任何脱漏。
十、并发计数
与HashMap中间接定义了size字段相似,获取元素的totalCount在C13MAP中必定不会去遍历残缺的数据结构;那样元素较多时性能会十分差,C13MAP设计了CounterCell[]数组来解决并发计数的问题。
CounterCell[]机制并不理睬新旧table的更迭,不论是操作的新表还是旧表,对于计数而言没有实质的差别,CounterCell[]只关注总量的减少或缩小。
1、从LongAdder到CounterCell内存对齐
C13MAP借鉴了JUC中LongAdder和Striped64的计数机制,有大量代码与LongAdder和Striped64是反复的,其核心思想是多核环境下对于64位long型数据的计数操作,尽管借助于volatile和CAS操作可能保障并发的安全性,然而因为多核操作的是同一内存区域,而每个CPU又有本人的本地cache,例如LV1 Cache,LVL2 Cache,寄存器等。
因为内存一致性协定MESI的存在,会导致本地Cache的频繁刷新影响性能,一个比拟好的解决思路是每个CPU只操作固定的一块内存对齐区域,最终采纳求和的形式来计数。
这种形式能进步性能,然而并非所有场景都实用,因为其最终的value是求和估算进去的,CounterCell累加求和的过程并非原子,不能代表某个时刻的精准value,所以像compareAndSet这样的原子操作就无奈反对。
2、CounterCell[] 、cellBusy、baseCount的作用
CounterCell[]中寄存2的指数幂个CounterCell,并发操作期间有可能会扩容,每次扩容都是原有size的两倍,一旦超过了CPU的核数即不再扩容,因为CPU的总数通常也是2的指数幂,所以其size往往等于CPU的核数CounterCell[]初始化、扩容、填充元素时,借助cellBusy其进行spinLock管制baseCount是根底数据。
在并发量不那么大,CAS没有呈现失败时间接基于baseCount变量做计数;一旦呈现CAS失败,阐明有并发抵触,就开始思考CounterCell[]的初始化或者扩容操作,但在初始化未实现时,还是会将其视为抄底计划进行计数。
所以最终的技术总和=baseCount+所有CounterCell中的value。
C13Map的addCount办法
private final void addCount(long x, int check) { CounterCell[] cs; long b, s; //初始时总是间接对baseCount计数,直到呈现第一次失败,或者曾经有现成的CounterCell[]数组可用 if ((cs = counterCells) != null || !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell c; long v; int m; //是否存在竞态,为true时示意无竞态 boolean uncontended = true; if (cs == null || (m = cs.length - 1) < 0 || //学生成随机数再对CounterCell[]数组size求余,也就是随机调配到其中某个槽位 (c = cs[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { //该槽位尚未初始化或者CAS操作又呈现竞态 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //检测元素总数是否超过sizeCtl阈值 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT; if (sc < 0) { if (sc == rs + MAX_RESIZERS || sc == rs + 1 || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2)) transfer(tab, null); s = sumCount(); } }}
其中ThreadLocalRandom是线程上下文内的随机数生成器,能够不受其它线程的影响,进步随机数生成的性能;总是在CAS失败当前,也就是明确感知到存在多线程的竞争的前提下,才会对CounterCell[]进行初始化或者扩容操作。
C13Map的fullAddCount办法
//残缺的计数,与LongAdder的代码根本雷同private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // 是否有新的抵触 for (;;) { CounterCell[] cs; CounterCell c; int n; long v; if ((cs = counterCells) != null && (n = cs.length) > 0) { if ((c = cs[(n - 1) & h]) == null) { //随机匹配的槽位尚未有CounterCell元素则初始化之 if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) wasUncontended = true; //fullAddCount前曾经存在cas失败但并不立刻扩容,从新生成一个随机数进行CAS重试 else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x)) break; else if (counterCells != cs || n >= NCPU) collide = false; // 超过CPU的最大核数,或者检测到counterCells已扩容,都将抵触状态置为无 else if (!collide) collide = true; // 以上的若干条件都不满足,能够断定必然有抵触,再生成一个随机数试探一下 else if (cellsBusy == 0 && U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == cs) //对counterCells进行doubleCheck counterCells = Arrays.copyOf(cs, n << 1); //扩容,容量翻倍 } finally { cellsBusy = 0; } collide = false; continue; // 对性的counterCell[]进行重试CAS操作 } h = ThreadLocalRandom.advanceProbe(h); //以旧的随机数为基数生成一个新的随机数 } else if (cellsBusy == 0 && counterCells == cs && U.compareAndSetInt(this, CELLSBUSY, 0, 1)) { //第一次初始化工作,初始的数组大小为2 boolean init = false; try { // Initialize table if (counterCells == cs) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } //初始化过程中其它线程的抄底计划 else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x)) break; }}
循环生成新的随机数匹配到新的槽位进行CAS的计数操作,呈现CAS失败后并不急于扩容;而是总是在间断呈现CAS失败的状况才会尝试扩容。
CounterCell[]的整体计划绝对独立,与C13Map的关系并不大,能够视为一种成熟的高性能技术计划在各个场景应用。
十一、与stream相似的bulk操作反对
1、bulkTask类的子类
所有的批量工作执行类均为bulkTask的子类, bulkTask内置了与traverser相似的实现,用以反对对C13Map的遍历;同时它也是ForkJoinTask的子类,反对以fork/join的形式来实现各种批量工作的执行。
因为ForkJoinTask并非本文的重点,这里仅列出几种有代表性的批量办法,以及相应的的task实现。
2、几种有代表性的批量办法
C13Map的批量工作
//将所有的entry依照transformer函数进行二元计算,再对所有生成的后果执行action一元函数public <U> void forEach(long parallelismThreshold, BiFunction<? super K, ? super V, ? extends U> transformer, Consumer<? super U> action); //对所有的entry执行searchFunction二元计算,一旦发现任意一个计算结果不为null,即全盘返回public <U> U search(long parallelismThreshold, BiFunction<? super K, ? super V, ? extends U> searchFunction); //对所有的entry执行transformer二元计算,再对所有的后果执行reducer收敛函数public <U> U reduce(long parallelismThreshold, BiFunction<? super K, ? super V, ? extends U> transformer, BiFunction<? super U, ? super U, ? extends U> reducer) //对所有的entry中的value执行transformer二元计算,再对所有的后果执行reducer收敛函数public <U> U reduceValues(long parallelismThreshold, Function<? super V, ? extends U> transformer, BiFunction<? super U, ? super U, ? extends U> reducer)
以上所有的批量办法都有惟一与其对应的批量task执行类,背地均是基于fork/join思维实现。
3、批量task的实现
以2中列出的reduce办法所对应的MapReduceMappingsTask为例,无关fork/join中的实现细节不属于本文的领域,不做具体探讨。
C13Map的MapReduceMappingsTask
static final class MapReduceMappingsTask<K,V,U> extends BulkTask<K,V,U> { final BiFunction<? super K, ? super V, ? extends U> transformer; final BiFunction<? super U, ? super U, ? extends U> reducer; U result; MapReduceMappingsTask<K,V,U> rights, nextRight; MapReduceMappingsTask (BulkTask<K,V,?> p, int b, int i, int f, Node<K,V>[] t, MapReduceMappingsTask<K,V,U> nextRight, BiFunction<? super K, ? super V, ? extends U> transformer, BiFunction<? super U, ? super U, ? extends U> reducer) { super(p, b, i, f, t); this.nextRight = nextRight; this.transformer = transformer; this.reducer = reducer; } public final U getRawResult() { return result; } public final void compute() { final BiFunction<? super K, ? super V, ? extends U> transformer; final BiFunction<? super U, ? super U, ? extends U> reducer; if ((transformer = this.transformer) != null && (reducer = this.reducer) != null) { for (int i = baseIndex, f, h; batch > 0 && (h = ((f = baseLimit) + i) >>> 1) > i;) { addToPendingCount(1); //裂变出新的fork-join工作 (rights = new MapReduceMappingsTask<K,V,U> (this, batch >>>= 1, baseLimit = h, f, tab, rights, transformer, reducer)).fork(); } U r = null; //遍历本batch元素 for (Node<K,V> p; (p = advance()) != null; ) { U u; //对本batch做reduce收敛操作 if ((u = transformer.apply(p.key, p.val)) != null) r = (r == null) ? u : reducer.apply(r, u); } //对本人和本人fork出的子工作做reducer收敛操作 result = r; CountedCompleter<?> c; for (c = firstComplete(); c != null; c = c.nextComplete()) { @SuppressWarnings("unchecked") MapReduceMappingsTask<K,V,U> t = (MapReduceMappingsTask<K,V,U>)c, s = t.rights; while (s != null) { U tr, sr; if ((sr = s.result) != null) t.result = (((tr = t.result) == null) ? sr : reducer.apply(tr, sr)); s = t.rights = s.nextRight; } } } }}
十二、小结
自JDK8开始C13Map摒弃了JDK7中的Segment段实现计划,将锁的粒度细化到了每个bin上,锁的粒度更小并发能力更强。用syncronized关键字代替原先的ReentrantLock互斥锁,因JDK8中对syncronized做了大量优化,能够达到比ReentrantLock更优的性能。
引入并发transfer的机制反对多线程搬运,写操作和transfer操作在不同bin上可并行。引入ForwardingNode反对读操作和transfer并行,并进一步反对transfer过程有可能存在的哈希表链的遍历。引入ReserveNode在compute原子计算可能耗时较长的状况下领先占位,防止反复计算。
引入红黑树来优化哈希抵触时的检索性能,其外部实现了轻量级的读写锁保障读写平安,在线性检索和tree检索之间做了智能切换,达到了性能与平安的极佳的均衡。引入CounterCell机制优化多核场景的计数,解决内存伪共享问题。
引入 ForkJoinTask的子类优化bulk计算时的性能。整个C13Map的实现过程大量应用volatile保障可见,应用CAS保障原子,是一种部分无锁的lockFree dataStructure的榜样实现。
与HashMap的单线程读写操作不同的是,HashMap读到的数据在下一次写操作间是始终稳固的,在多个写操作之间是一个稳固的snapshot,而C13Map因为并发线程的存在,数据瞬息万变,读到的永远只是某个工夫点的正确数据,写入胜利也只是在某个工夫点保障写入是平安的,因而C13Map通常只谈平安而不谈实时,这极大进步了编程的难度,也是单线程和并发数据结构之间的显著差别。