数据结构
JDK1.7 ConcurrentHashMap基于数组+链表,包含一个Segment数组,每个Segment中又是一个数组+链表的数据结构(相当于一个HashMap),数组和链表存储的是一个个HashEntry对象
// A Segment is a specialized hash table that extends ReentrantLock: holding the
// lock guards all writes to this segment's HashEntry table. (Fields-only excerpt.)
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    // Max number of tryLock() spins in scanAndLockForPut before falling back to
    // a blocking lock(); 1 on uniprocessors, where spinning cannot help.
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    // Per-segment bucket array; volatile so readers observe table replacement after rehash.
    transient volatile HashEntry<K,V>[] table;
    // Number of key-value mappings in this segment.
    transient int count;
    // Structural-modification counter; read by size()/containsValue() to detect
    // concurrent changes between unlocked passes.
    transient int modCount;
    // Resize the table when count exceeds this value (capacity * loadFactor).
    transient int threshold;
    final float loadFactor;
}

// Singly linked list node for one mapping. value and next are volatile so that
// lock-free readers (get()) see writes published under the segment lock.
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}
常用方法
使用
源码分析
主要属性
//默认的容量大小,即HashEntry中数组的容量之和,初始化时会平均分配到每个Segment中的HashEntry数组 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认的并发级别,决定了Segment数组的长度 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //每个Segment中的HashEntry数组最小容量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //Segment的最大数量=65536 static final int MAX_SEGMENTS = 1 << 16; //重试次数 static final int RETRIES_BEFORE_LOCK = 2;
构造方法
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                  DEFAULT_INITIAL_CAPACITY),
         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}

/**
 * Primary constructor: derives the Segment array length and each segment's
 * HashEntry array length (both powers of two) from the requested capacity,
 * load factor and concurrency level.
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // ssize becomes the Segment array length: the smallest power of two >=
    // concurrencyLevel. With the default concurrencyLevel = 16, ssize = 16.
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1; // multiply by 2
    }
    // segmentShift/segmentMask extract the segment index from the high bits of a hash.
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c; // round up so total capacity covers initialCapacity
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // cap becomes each segment's HashEntry array length: the smallest power of
    // two >= c. With the defaults, cap = 2.
    while (cap < c)
        cap <<= 1;
    // Create the Segment array (default length 16) and eagerly fill only slot 0;
    // its table has the default length 2 and load factor 0.75, so the resize
    // threshold is 2*0.75 = 1.5 and the segment grows on its second insertion.
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
put()方法
/**
 * Maps the specified key to the specified value. Neither key nor value may be null.
 */
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. Compute the hash of the key via hash().
    // 2. Use the high bits of the hash to select the segment index.
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        // 3. Lazily create segments[j] if it does not exist yet.
        s = ensureSegment(j);
    // 4. Delegate insertion of the key-value pair to segments[j].
    return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // tryLock() makes one attempt; on success node is null, otherwise
    // scanAndLockForPut spins (possibly pre-creating the node) until locked.
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // Locate the bucket for this hash within the segment's HashEntry table.
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        // Walk the chain headed at that bucket.
        for (HashEntry<K,V> e = first;;) {
            // If the key already exists, replace the old value with the new one
            // (unless onlyIfAbsent) and stop.
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            // Bucket empty, or end of the chain reached: insert at the head.
            else {
                // Reuse the node pre-created by scanAndLockForPut (head insertion)...
                if (node != null)
                    node.setNext(first);
                // ...or create it now (head insertion).
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // Resize when the new count exceeds the resize threshold
                // (capacity * loadFactor) and the table can still grow.
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // doubles the table and inserts node
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // Always release the segment lock.
        unlock();
    }
    return oldValue;
}

// Spins on tryLock(); after MAX_SCAN_RETRIES failed attempts it falls back to a
// blocking lock(). While spinning it optimistically traverses the bucket chain
// and may pre-create the node to insert, reducing work done under the lock.
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
rehash()方法
//HashEntry数组扩容为原来的两倍。老数组里的数据挪动到新数组时,地位要么不变,要么变为 index+ oldSize,应用头插法插入到新数组 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
get()方法
//因为 HashEntry 中的 value 属性是用 volatile 关键词润饰的,保障了内存可见性,所以每次获取时都是最新值。ConcurrentHashMap 的 get 办法是十分高效的,因为整个过程都不须要加锁。 public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; // 1. 依据key值,通过hash()计算出对应的hash值 int h = hash(key); // 2. 依据hash值计算出对应的segment数组下标,失去segment数组 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { 3. 依据hash值计算出对应的HashEntry数组下标,失去HashEntry数组,遍历数组 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; //4.找到对应的key,返回value if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
size()方法
//计算两次,如果不变则返回计算结果,若不统一,则锁住所有的Segment求和 public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
总结
1.JDK1.7 ConcurrentHashMap基于数组+链表,包含一个Segment数组,每个Segment中又是一个数组+链表的数据结构(相当于一个HashMap),数组和链表存储的是一个个HashEntry对象
2.Segment继承于ReentrantLock,理论上 ConcurrentHashMap支持ConcurrencyLevel(Segment数组数量)的线程并发。每当一个线程占用锁访问一个Segment时,不会影响到其他的Segment。
3.添加key-value时会根据key值计算出hash值,根据hash值计算出对应的segment数组下标,对这个segment使用tryLock尝试加锁,如果加锁失败,执行scanAndLockForPut尝试自旋加锁直到成功;后续流程与HashMap相同。
4.因为HashEntry中的value属性是用volatile关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。ConcurrentHashMap的get方法是非常高效的,因为整个过程都不需要加锁。