共计 22167 个字符,预计需要花费 56 分钟才能阅读完成。
并发编程实际中,ConcurrentHashMap 是一个常常被应用的数据结构,相比于 Hashtable 以及 Collections.synchronizedMap(),ConcurrentHashMap 在线程平安的根底上提供了更好的写并发能力,但同时升高了对读一致性的要求(这点如同 CAP 实践啊 O(∩_∩)O)。ConcurrentHashMap 的设计与实现十分精美,大量的利用了 volatile,final,CAS 等 lock-free 技术来缩小锁竞争对于性能的影响,无论对于 Java 并发编程的学习还是 Java 内存模型的了解,ConcurrentHashMap 的设计以及源码都值得十分认真的浏览与琢磨。
这篇日志记录了本人对 ConcurrentHashMap 的一些总结,因为 JDK6,7,8 中实现都不同,须要离开论述在不同版本中的 ConcurrentHashMap。
本文从源码登程,筛选集体感觉重要的点(会用红色标注)再次进行回顾,以及论述 ConcurrentHashMap 的一些留神点。
1. JDK6 与 JDK7 中的实现
1.1 设计思路
ConcurrentHashMap 采纳了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个 Map 加锁的设计,分段锁大大的进步了高并发环境下的解决能力。但同时,因为不是对整个 Map 加锁,导致一些须要扫描整个 Map 的办法(如 size(), containsValue())须要应用非凡的实现,另外一些办法(如 clear())甚至放弃了对一致性的要求(ConcurrentHashMap 是弱一致性的,具体请查看 ConcurrentHashMap 能齐全代替 HashTable 吗?)。
ConcurrentHashMap 中的分段锁称为 Segment,它即相似于 HashMap(JDK7 与 JDK8 中 HashMap 的实现)的构造,即外部领有一个 Entry 数组,数组中的每个元素又是一个链表;同时又是一个 ReentrantLock(Segment 继承了 ReentrantLock)。ConcurrentHashMap 中的 HashEntry 绝对于 HashMap 中的 Entry 有肯定的差异性:HashEntry 中的 value 以及 next 都被 volatile 润饰,这样在多线程读写过程中可能放弃它们的可见性,代码如下:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
1.2 并发度(Concurrency Level)
并发度能够了解为程序运行时可能同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数,实际上就是 ConcurrentHashMap 中的分段锁个数,即 Segment[]的数组长度。ConcurrentHashMap 默认的并发度为 16,但用户也能够在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap 会应用大于等于该值的最小 2 幂指数作为理论并发度(如果用户设置并发度为 17,理论并发度则为 32)。运行时通过将 key 的高 n 位(n = 32 – segmentShift)和并发度减 1(segmentMask)做位与运算定位到所在的 Segment。segmentShift 与 segmentMask 都是在结构过程中依据 concurrency level 被相应的计算出来。
如果并发度设置的过小,会带来重大的锁竞争问题;如果并发度设置的过大,本来位于同一个 Segment 内的拜访会扩散到不同的 Segment 中,CPU cache 命中率会降落,从而引起程序性能降落。(文档的说法是依据你并发的线程数量决定,太多会导性能升高)
1.3 创立分段锁
和 JDK6 不同,JDK7 中除了第一个 Segment 之外,残余的 Segments 采纳的是提早初始化的机制:每次 put 之前都须要查看 key 对应的 Segment 是否为 null,如果是则调用 ensureSegment()以确保对应的 Segment 被创立。
ensureSegment 可能在并发环境下被调用,但与设想中不同,ensureSegment 并未应用锁来管制竞争,而是应用了 Unsafe 对象的 getObjectVolatile()提供的原子读语义联合 CAS 来确保 Segment 创立的原子性。代码段如下:
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
1.4 put/putIfAbsent/putAll
和 JDK6 一样,ConcurrentHashMap 的 put 办法被代理到了对应的 Segment(定位 Segment 的原理之前曾经形容过)中。与 JDK6 不同的是,JDK7 版本的 ConcurrentHashMap 在取得 Segment 锁的过程中,做了肯定的优化 – 在真正申请锁之前,put 办法会通过 tryLock()办法尝试取得锁,在尝试取得锁的过程中会对对应 hashcode 的链表进行遍历,如果遍历结束依然找不到与 key 雷同的 HashEntry 节点,则为后续的 put 操作提前创立一个 HashEntry。当 tryLock 肯定次数后仍无奈取得锁,则通过 lock 申请锁。
须要留神的是,因为在并发环境下,其余线程的 put,rehash 或者 remove 操作可能会导致链表头结点的变动,因而在过程中须要进行查看,如果头结点发生变化则从新对表进行遍历。而如果其余线程引起了链表中的某个节点被删除,即便该变动因为是非原子写操作(删除节点后链接后续节点调用的是 Unsafe.putOrderedObject(),该办法不提供原子写语义)可能导致以后线程无奈察看到,但因为不影响遍历的正确性所以忽略不计。
之所以在获取锁的过程中对整个链表进行遍历,次要目标是心愿遍历的链表被 CPU cache 所缓存,为后续理论 put 过程中的链表遍历操作晋升性能。
在取得锁之后,Segment 对链表进行遍历,如果某个 HashEntry 节点具备雷同的 key,则更新该 HashEntry 的 value 值,否则新建一个 HashEntry 节点,将它设置为链表的新 head 节点并将原头节点设为新 head 的下一个节点。新建过程中如果节点总数(含新建的 HashEntry)超过 threshold,则调用 rehash()办法对 Segment 进行扩容,最初将新建 HashEntry 写入到数组中。
put 办法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过 Unsafe 的 putOrderedObject()办法来实现,这里并未应用具备原子写语义的 putObjectVolatile()的起因是:JMM 会保障取得锁到开释锁之间所有对象的状态更新都会在锁被开释之后更新到主存,从而保障这些变更对其余线程是可见的。
1.5 rehash
绝对于 HashMap 的 resize,ConcurrentHashMap 的 rehash 原理相似,然而 Doug Lea 为 rehash 做了肯定的优化,防止让所有的节点都进行复制操作:因为扩容是基于 2 的幂指来操作,假如扩容前某 HashEntry 对应到 Segment 中数组的 index 为 i,数组的容量为 capacity,那么扩容后该 HashEntry 对应到新数组中的 index 只可能为 i 或者 i +capacity,因而大多数 HashEntry 节点在扩容前后 index 能够放弃不变。基于此,rehash 办法中会定位第一个后续所有节点在扩容后 index 都放弃不变的节点,而后将这个节点之前的所有节点重排即可。这部分代码如下:
private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
1.6 remove
和 put 相似,remove 在真正取得锁之前,也会对链表进行遍历以进步缓存命中率。
1.7 get 与 containsKey
get 与 containsKey 两个办法简直完全一致:他们都没有应用锁,而是通过 Unsafe 对象的 getObjectVolatile()办法提供的原子读语义,来取得 Segment 以及对应的链表,而后对链表遍历判断是否存在 key 雷同的节点以及取得该节点的 value。但因为遍历过程中其余线程可能对链表构造做了调整,因而 get 和 containsKey 返回的可能是过期的数据,这一点是 ConcurrentHashMap 在弱一致性上的体现。如果要求强一致性,那么必须应用 Collections.synchronizedMap()办法。
1.8 size、containsValue
这些办法都是基于整个 ConcurrentHashMap 来进行操作的,他们的原理也根本相似:首先不加锁循环执行以下操作:循环所有的 Segment(通过 Unsafe 的 getObjectVolatile()以保障原子读语义),取得对应的值以及所有 Segment 的 modcount 之和。如果间断两次所有 Segment 的 modcount 和相等,则过程中没有产生其余线程批改 ConcurrentHashMap 的状况,返回取得的值。
当循环次数超过预约义的值时,这时须要对所有的 Segment 顺次进行加锁,获取返回值后再顺次解锁。值得注意的是,加锁过程中要强制创立所有的 Segment,否则容易呈现其余线程创立 Segment 并进行 put,remove 等操作。代码如下:
for(int j =0; j < segments.length; ++j)
ensureSegment(j).lock();// force creation
一般来说,应该防止在多线程环境下应用 size 和 containsValue 办法。
注 1:modcount 在 put, replace, remove 以及 clear 等办法中都会被批改。
注 2:对于 containsValue 办法来说,如果在循环过程中发现匹配 value 的 HashEntry,则间接返回 true。
最初,与 HashMap 不同的是,ConcurrentHashMap 并不容许 key 或者 value 为 null,依照 Doug Lea 的说法,这么设计的起因是在 ConcurrentHashMap 中,一旦 value 呈现 null,则代表 HashEntry 的 key/value 没有映射实现就被其余线程所见,须要非凡解决。在 JDK6 中,get 办法的实现中就有一段对 HashEntry.value == null 的防御性判断。但 Doug Lea 也抵赖理论运行过程中,这种状况仿佛不可能产生(参考:http://cs.oswego.edu/pipermai…)。
2. JDK8 中的实现
ConcurrentHashMap 在 JDK8 中进行了微小改变,很须要通过源码来再次学习下 Doug Lea 的实现办法。
它摒弃了 Segment(锁段)的概念,而是启用了一种全新的形式实现, 利用 CAS 算法。它沿用了与它同期间的 HashMap 版本的思维,底层仍然由“数组”+ 链表 + 红黑树的形式思维(JDK7 与 JDK8 中 HashMap 的实现),然而为了做到并发,又减少了很多辅助的类,例如 TreeBin,Traverser 等对象外部类。
2.1 重要的属性
首先来看几个重要的属性,与 HashMap 雷同的就不再介绍了,这里重点解释一下 sizeCtl 这个属性。能够说它是 ConcurrentHashMap 中出镜率很高的一个属性,因为它是一个管制标识符,在不同的中央有不同用处,而且它的取值不同,也代表不同的含意。
- 正数代表正在进行初始化或扩容操作
- - 1 代表正在初始化
- -N 示意有 N - 1 个线程正在进行扩容操作
- 负数或 0 代表 hash 表还没有被初始化,这个数值示意初始化或下一次进行扩容的大小,这一点相似于扩容阈值的概念。还前面能够看到,它的值始终是以后 ConcurrentHashMap 容量的 0.75 倍,这与 loadfactor 是对应的。
/**
* 盛装 Node 元素的数组 它的大小是 2 的整数次幂
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
hash 表初始化或扩容时的一个管制位标识量。正数代表正在进行初始化或扩容操作
- 1 代表正在初始化
-N 示意有 N - 1 个线程正在进行扩容操作
负数或 0 代表 hash 表还没有被初始化,这个数值示意初始化或下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
// 以下两个是用来管制扩容的时候 单线程进入的变量
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash 值是 -1,示意这是一个 forwardNode 节点
static final int TREEBIN = -2; // hash 值是 -2 示意这时一个 TreeBin 节点
2.2 重要的类
2.2.1 Node
Node 是最外围的外部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这外面。它与 HashMap 中的定义很类似,然而然而有一些差异它对 value 和 next 属性设置了 volatile 同步锁 (与 JDK7 的 Segment 雷同),它不容许调用 setValue 办法间接扭转 Node 的 value 域,它减少了 find 办法辅助 map.get() 办法。
2.2.2 TreeNode
树节点类,另外一个外围的数据结构。当链表长度过长的时候,会转换为 TreeNode。然而与 HashMap 不雷同的是,它并不是间接转换为红黑树,而是把这些结点包装成 TreeNode 放在 TreeBin 对象中,由 TreeBin 实现对红黑树的包装。而且 TreeNode 在 ConcurrentHashMap 集成自 Node 类,而并非 HashMap 中的集成自 LinkedHashMap.Entry<K,V> 类,也就是说 TreeNode 带有 next 指针,这样做的目标是不便基于 TreeBin 的拜访。
2.2.3 TreeBin
这个类并不负责包装用户的 key、value 信息,而是包装的很多 TreeNode 节点。它代替了 TreeNode 的根节点,也就是说在理论的 ConcurrentHashMap“数组”中,寄存的是 TreeBin 对象,而不是 TreeNode 对象,这是与 HashMap 的区别。另外这个类还带有了读写锁。
这里仅贴出它的构造方法。能够看到在结构 TreeBin 节点时,仅仅指定了它的 hash 值为 TREEBIN 常量,这也就是个标识为。同时也看到咱们相熟的红黑树构造方法
2.2.4 ForwardingNode
一个用于连贯两个 table 的节点类。它蕴含一个 nextTable 指针,用于指向下一张表。而且这个节点的 key value next 指针全副为 null,它的 hash 值为 -1. 这外面定义的 find 的办法是从 nextTable 里进行查问节点,而不是以本身为头节点进行查找。
/**
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
2.3 Unsafe 与 CAS
在 ConcurrentHashMap 中,随处能够看到 U, 大量应用了 U.compareAndSwapXXX 的办法,这个办法是利用一个 CAS 算法实现无锁化的批改值的操作,他能够大大降低锁代理的性能耗费。这个算法的根本思维就是一直地去比拟以后内存中的变量值与你指定的一个变量值是否相等,如果相等,则承受你指定的批改的值,否则回绝你的操作。因为以后线程中的值曾经不是最新的值,你的批改很可能会笼罩掉其余线程批改的后果。这一点与乐观锁,SVN 的思维是比拟相似的。
2.3.1 unsafe 动态块
unsafe 代码块管制了一些属性的批改工作,比方最罕用的 SIZECTL。在这一版本的 concurrentHashMap 中,大量利用来的 CAS 办法进行变量、属性的批改工作。利用 CAS 进行无锁操作,能够大大提高性能。
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {throw new Error(e);
}
}
2.3.2 三个外围办法
ConcurrentHashMap 定义了三个原子操作,用于对指定地位的节点进行操作。正是这些原子操作保障了 ConcurrentHashMap 的线程平安。
// 取得在 i 地位上的 Node 节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// 利用 CAS 算法设置 i 地位上的 Node 节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
// 在 CAS 算法中,会比拟内存中的值与你指定的这个值是否相等,如果相等才承受你的批改,否则回绝你的批改
// 因而以后线程中的值并不是最新的值,这种批改可能会笼罩掉其余线程的批改后果 有点相似于 SVN
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 利用 volatile 办法设置节点地位的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
2.4 初始化办法 initTable
对于 ConcurrentHashMap 来说,调用它的构造方法仅仅是设置了一些参数而已。而整个 table 的初始化是在向 ConcurrentHashMap 中插入元素的时候产生的。如调用 put、computeIfAbsent、compute、merge 等办法的时候,调用机会是查看 table==null。
初始化办法次要利用了要害属性 sizeCtl 如果这个值〈0,示意其余线程正在进行初始化,就放弃这个操作。在这也能够看出 ConcurrentHashMap 的初始化只能由一个线程实现。如果取得了初始化权限,就用 CAS 办法将 sizeCtl 置为 -1,避免其余线程进入。初始化数组后,将 sizeCtl 的值改为 0.75*n。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl 示意有其余线程正在进行初始化操作,把线程挂起。对于 table 的初始化工作,只能有一个线程在进行。if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 利用 CAS 办法把 sizectl 的值置为 -1 示意本线程正在进行初始化
try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);// 相当于 0.75*n 设置一个扩容的阈值
}
} finally {sizeCtl = sc;}
break;
}
}
return tab;
}
2.5 扩容办法 transfer
当 ConcurrentHashMap 容量有余的时候,须要对 table 进行扩容。这个办法的根本思维跟 HashMap 是很像的,然而因为它是反对并发扩容的,所以要简单的多。起因是它反对多线程进行扩容操作,而并没有加锁。我想这样做的目标不仅仅是为了满足 concurrent 的要求,而是心愿利用并发解决去缩小扩容带来的工夫影响。因为在扩容的时候,总是会波及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作可能并发进行,那真真是极好的了。
整个扩容操作分为两个局部
- 第一局部是构建一个 nextTable, 它的容量是原来的两倍,这个操作是单线程实现的。这个单线程的保障是通过 RESIZE_STAMP_SHIFT 这个常量通过一次运算来保障的,这个中央在前面会有提到;
- 第二个局部就是将原来 table 中的元素复制到 nextTable 中,这里容许多线程进行操作。
先来看一下单线程是如何实现的:
它的大体思维就是遍历、复制的过程。首先依据运算失去须要遍历的次数 i,而后利用 tabAt 办法取得 i 地位的元素:
- 如果这个地位为空,就在原 table 中的 i 地位放入 forwardNode 节点,这个也是触发并发扩容的关键点;
- 如果这个地位是 Node 节点(fh>=0),如果它是一个链表的头节点,就结构一个反序链表,把他们别离放在 nextTable 的 i 和 i + n 的地位上
- 如果这个地位是 TreeBin 节点(fh<0),也做一个反序解决,并且判断是否须要 untreefi,把解决的后果别离放在 nextTable 的 i 和 i + n 的地位上
- 遍历过所有的节点当前就实现了复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍,实现扩容。
再看一下多线程是如何实现的:
在代码的 69 行有一个判断,如果遍历到的节点是 forward 节点,就向后持续遍历,再加上给节点上锁的机制,就实现了多线程的管制。多线程遍历节点,解决了一个节点,就把对应点的值 set 为 forward,另一个线程看到 forward,就向后遍历。这样穿插就实现了复制工作。而且还很好的解决了线程平安的问题。这个办法的设计切实是让我膜拜。
/**
* 一个过渡的 table 表 只有在扩容的时候才会应用
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 结构一个 nextTable 对象 它的容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 结构一个连节点指针 用于标记位
boolean advance = true;// 并发扩容的要害属性 如果等于 true 阐明这个节点曾经解决过
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 这个 while 循环体的作用就是在管制 i -- 通过 i -- 能够顺次遍历原 hash 表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 如果所有的节点都曾经实现复制工作 就把 nextTable 赋值给 table 清空长期对象 nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);// 扩容阈值设置为原来容量的 1.5 倍 仍然相当于当初容量的 0.75 倍
return;
}
// 利用 CAS 办法更新这个扩容阈值,在这外面 sizectl 值减一,阐明新退出一个线程参加到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果遍历到的节点为空 则放入 ForwardingNode 指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果遍历到 ForwardingNode 节点 阐明这个点曾经被解决过了 间接跳过 这里是管制并发扩容的外围
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 节点上锁
synchronized (f) {if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 如果 fh>=0 证实这是一个 Node 节点
if (fh >= 0) {
int runBit = fh & n;
// 以下的局部在实现的工作是结构两个链表 一个是原链表 另一个是原链表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在 nextTable 的 i 地位上插入一个链表
setTabAt(nextTab, i, ln);
// 在 nextTable 的 i + n 的地位上插入另一个链表
setTabAt(nextTab, i + n, hn);
// 在 table 的 i 地位上插入 forwardNode 节点 示意曾经解决过该节点
setTabAt(tab, i, fwd);
// 设置 advance 为 true 返回到下面的 while 循环中 就能够执行 i -- 操作
advance = true;
}
// 对 TreeBin 对象进行解决 与下面的过程相似
else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 结构正序和反序两个链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果扩容后曾经不再须要 tree 的构造 反向转换为链表构造
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 在 nextTable 的 i 地位上插入一个链表
setTabAt(nextTab, i, ln);
// 在 nextTable 的 i + n 的地位上插入另一个链表
setTabAt(nextTab, i + n, hn);
// 在 table 的 i 地位上插入 forwardNode 节点 示意曾经解决过该节点
setTabAt(tab, i, fwd);
// 设置 advance 为 true 返回到下面的 while 循环中 就能够执行 i -- 操作
advance = true;
}
}
}
}
}
}
2.6 Put 办法
后面的所有的介绍其实都为这个办法做铺垫。ConcurrentHashMap 最罕用的就是 put 和 get 两个办法。当初来介绍 put 办法,这个 put 办法仍然沿用 HashMap 的 put 办法的思维,依据 hash 值计算这个新插入的点在 table 中的地位 i,如果 i 地位是空的,间接放进去,否则进行判断,如果 i 地位是树节点,依照树的形式插入新的节点,否则把 i 插入到链表的开端。ConcurrentHashMap 中仍然沿用这个思维,有一个最重要的不同点就是 ConcurrentHashMap 不容许 key 或 value 为 null 值。另外因为波及到多线程,put 办法就要简单一点。在多线程中可能有以下两个状况
- 如果一个或多个线程正在对 ConcurrentHashMap 进行扩容操作,以后线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为 transfer 办法中在空结点上插入 forward 节点,如果检测到须要插入的地位被 forward 节点占有,就帮忙进行扩容;
- 如果检测到要插入的节点是非空且不是 forward 节点,就对这个节点加锁,这样就保障了线程平安。只管这个有一些影响效率,然而还是会比 hashTable 的 synchronized 要好得多。
整体流程就是首先定义不容许 key 或 value 为 null 的状况放入 对于每一个放入的值,首先利用 spread 办法对 key 的 hashcode 进行一次 hash 计算,由此来确定这个值在 table 中的地位。
如果这个地位是空的,那么间接放入,而且不须要加锁操作。
如果这个地位存在结点,阐明产生了 hash 碰撞,首先判断这个节点的类型。如果是链表节点(fh>0), 则失去的结点就是 hash 值雷同的节点组成的链表的头节点。须要顺次向后遍历确定这个新退出的值所在位置。如果遇到 hash 值与 key 值都与新退出节点是统一的状况,则只须要更新 value 值即可。否则顺次向后遍历,直到链表尾插入这个结点。如果退出这个节点当前链表长度大于 8,就把这个链表转换成红黑树。如果这个节点的类型曾经是树节点的话,间接调用树节点的插入方法进行插入新的值。
public V put(K key, V value) {return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不容许 key 或 value 为 null
if (key == null || value == null) throw new NullPointerException();
// 计算 hash 值
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环 何时插入胜利 何时跳出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果 table 为空的话,初始化 table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 依据 hash 值计算出在 table 外面的地位
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果这个地位没有值,间接放进去,不须要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 当遇到表连接点时,须要进行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 结点上锁 这里的结点能够了解为 hash 值雷同组成的链表的头结点
synchronized (f) {if (tabAt(tab, i) == f) {
//fh〉0 阐明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
// 在这里遍历链表所有的结点
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果 hash 值和 key 值雷同 则批改对应结点的 value 值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果遍历到了最初一个结点,那么就证实新的节点须要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果这个节点是树节点,就依照树的形式插入值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表长度曾经达到临界值 8 就须要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 将以后 ConcurrentHashMap 的元素数量 +1
addCount(1L, binCount);
return null;
}
咱们能够发现 JDK8 中的实现也是锁拆散的思维,只是锁住的是一个 Node,而不是 JDK7 中的 Segment,而锁住 Node 之前的操作是无锁的并且也是线程平安的,建设在之前提到的 3 个原子操作上。
2.6.1 helpTransfer 办法
这是一个帮助扩容的办法。这个办法被调用的时候,以后 ConcurrentHashMap 肯定曾经有了 nextTable 对象,首先拿到这个 nextTable 对象,调用 transfer 办法。回看下面的 transfer 办法能够看到,当本线程进入扩容办法的时候会间接进入复制阶段。
2.6.2 treeifyBin 办法
这个办法用于将过长的链表转换为 TreeBin 对象。然而他并不是间接转换,而是进行一次容量判断,如果容量没有达到转换的要求,间接进行扩容操作并返回;如果满足条件才链表的构造抓换为 TreeBin,这与 HashMap 不同的是,它并没有把 TreeNode 间接放入红黑树,而是利用了 TreeBin 这个小容器来封装所有的 TreeNode.
2.7 get 办法
get 办法比较简单,给定一个 key 来确定 value 的时候,必须满足两个条件 key 雷同 hash 值雷同,对于节点可能在链表或树上的状况,须要别离去查找。
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算 hash 值
int h = spread(key.hashCode());
// 依据 hash 值确定节点地位
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果搜寻到的节点 key 与传入的 key 雷同且不为 null, 间接返回这个节点
if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果 eh<0 阐明这个节点在树上 间接寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
2.8 Size 相干的办法
对于 ConcurrentHashMap 来说,这个 table 里到底装了多少货色其实是个不确定的数量,因为不可能在调用 size()办法的时候像 GC 的“stop the world”一样让其余线程都停下来让你去统计,因而只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap 也是大费周章才计算出来的。
2.8.1 辅助定义
为了统计元素个数,ConcurrentHashMap 定义了一些变量和一个外部类
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) {value = x;}
}
/******************************************/
/**
* 实际上保留的是 hashmap 中的元素个数 利用 CAS 锁进行更新
但它并不必返回以后 hashmap 的元素个数
*/
private transient volatile long baseCount;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
2.8.2 mappingCount 与 Size 办法
mappingCount 与 size 办法的相似 从 Java 工程师给出的正文来看,应该应用 mappingCount 代替 size 办法 两个办法都没有间接返回 basecount 而是统计一次这个值,而这个值其实也是一个大略的数值,因而可能在统计的时候有其余线程正在执行插入或删除操作。
public int size() {long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum += a.value;// 所有 counter 的值求和
}
}
return sum;
}
2.8.3 addCount 办法
在 put 办法结尾处调用了 addCount 办法,把以后 ConcurrentHashMap 的元素个数 + 1 这个办法一共做了两件事, 更新 baseCount 的值,检测是否进行扩容。
private final void addCount(long x, int check) {CounterCell[] as; long b, s;
// 利用 CAS 办法更新 baseCount 的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();}
// 如果 check 值大于等于 0 则须要测验是否须要进行扩容操作
if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);
//
if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果曾经有其余线程在执行扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 以后线程是惟一的或是第一个发动扩容的线程 此时 nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();}
}
}
总结
JDK6,7 中的 ConcurrentHashmap 次要应用 Segment 来实现减小锁粒度,把 HashMap 宰割成若干个 Segment,在 put 的时候须要锁住 Segment,get 时候不加锁,应用 volatile 来保障可见性,当要统计全局时(比方 size),首先会尝试屡次计算 modcount 来确定,这几次尝试中,是否有其余线程进行了批改操作,如果没有,则间接返回 size。如果有,则须要顺次锁住所有的 Segment 来计算。
jdk7 中 ConcurrentHashmap 中,当长度过长碰撞会很频繁,链表的增改删查操作都会耗费很长的工夫,影响性能, 所以 jdk8 中齐全重写了 concurrentHashmap, 代码量从原来的 1000 多行变成了 6000 多 行,实现上也和原来的分段式存储有很大的区别。
次要设计上的变动有以下几点:
- 不采纳 segment 而采纳 node,锁住 node 来实现减小锁粒度。
- 设计了 MOVED 状态 当 resize 的中过程中 线程 2 还在 put 数据,线程 2 会帮忙 resize。
- 应用 3 个 CAS 操作来确保 node 的一些操作的原子性,这种形式代替了锁。
- sizeCtl 的不同值来代表不同含意,起到了管制的作用。
至于为什么 JDK8 中应用 synchronized 而不是 ReentrantLock,我猜是因为 JDK8 中对 synchronized 有了足够的优化吧。
Reference:
- http://www.jianshu.com/p/4806…
- https://www.zhihu.com/questio…
- http://blog.csdn.net/u0107237…
欢送关注公众号【码农开花】一起学习成长