乐趣区

关于java:Java集合系列之五ConcurrentHashMap底层原理

ConcurrentHashMap 底层原理
ConcurrentHashMap 可能会用的绝对比拟少,因为它跟 HashMap 其实性能十分类似,然而它是并发平安的,而且 1.7 和 1.8 版本中的变动比拟大。

1.7 版本
底层构造

1.7 版本的底层数据结构比拟非凡,像是相似于嵌套 Map 一样,首先整个构造是有一个 Segment 数组,Segment 是一个键值对模式的构造,而后外部存储的是 HashEntry 数组,这外面才真正的寄存数值,而每次加锁的时候会锁住不同操作的那个 Segment,也就是说如果操作的不是同一个,那必定就不会进行加锁操作,默认会有 16 个 Segment。

static final class Segment<K,V> extends ReentrantLock implements Serializable {

 Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
     this.loadFactor = lf;
     this.threshold = threshold;
     this.table = tab;
 }
 ...

}

static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;

}

构造方法

总共有好几个构造方法,然而最终都是调用这个构造方法,能够看到就是依据设置的值来创立 Segment 数组和 Segment 以及 HashEntry 数组。

// 最大分段数量
static final int MAX_SEGMENTS = 1 << 16;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认分段数量
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

@SuppressWarnings(“unchecked”)
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {

 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
     throw new IllegalArgumentException();
 // 限度最大分段数量
 if (concurrencyLevel > MAX_SEGMENTS)
     concurrencyLevel = MAX_SEGMENTS;
 // Find power-of-two sizes best matching arguments
 int sshift = 0;
 // ssize 是计算出 Segment 的容量
 int ssize = 1;
 while (ssize < concurrencyLevel) {
     ++sshift;
     ssize <<= 1;
 }

 // 这两个变量在定位 segment 时会用到
 this.segmentShift = 32 - sshift;
 this.segmentMask = ssize - 1;

 // 计算 segment 中每个 HashEntry 数组的容量
 if (initialCapacity > MAXIMUM_CAPACITY)
     initialCapacity = MAXIMUM_CAPACITY;
 int c = initialCapacity / ssize;
 if (c * ssize < initialCapacity)
     ++c;
 int cap = MIN_SEGMENT_TABLE_CAPACITY;
 while (cap < c)
     cap <<= 1;
 // create segments and segments[0]
 // 创立 segments 数组并初始化第一个 Segment,其余的 Segment 提早初始化
 Segment<K,V> s0 =
     new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                      (HashEntry<K,V>[])new HashEntry[cap]);
 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
 this.segments = ss;

}
put()办法

其实步骤就是先依据初始化定义的两个变量来计算要放到哪个 Segment,而后加锁,再定位到 Segment 外面的 HashEntry 数组 key 的插入地位,最初再判断是直接插入还是链表追加,加锁的形式是自旋尝试加锁。

@SuppressWarnings(“unchecked”)
public V put(K key, V value) {

 Segment<K,V> s;
 if (value == null)
     throw new NullPointerException();
 int hash = hash(key);
 // 应用这两个初始化定义的全局变量定位 segment,返回的 hash 值无符号右移 segmentShift 位与段掩码进行位运算
 int j = (hash >>> segmentShift) & segmentMask;
 if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
      (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
     s = ensureSegment(j);
 // 调用对应的 Segment 的 put()办法
 return s.put(key, hash, value, false);

}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {

 // 因为 Segment 继承 ReentrantLock,应用 tryLock()尝试获取独占锁
 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
 // 返回被替换的旧值
 V oldValue;
 // 放入数据
 try {
     // Segment 中的 HashEntry 数组
     HashEntry<K,V>[] tab = table;
     int index = (tab.length - 1) & hash;
     // 找到数组中对应 key 的第一个元素
     HashEntry<K,V> first = entryAt(tab, index);
     for (HashEntry<K,V> e = first;;) {
         // 第一个元素不为空,间接在前面插入,造成链表
         if (e != null) {
             K k;
             if ((k = e.key) == key ||
                 (e.hash == hash && key.equals(k))) {
                 oldValue = e.value;
                 if (!onlyIfAbsent) {
                     e.value = value;
                     ++modCount;
                 }
                 break;
             }
             e = e.next;
         }
         // scanAndLockForPut()操作中只有对应地位没有元素才创立 node
         // 第一个元素为空,间接把这个 node 插入
         else {if (node != null)
                 node.setNext(first);
             else
                 node = new HashEntry<K,V>(hash, key, value, first);
             int c = count + 1;
             if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                 rehash(node);
             else
                 setEntryAt(tab, index, node);
             ++modCount;
             count = c;
             oldValue = null;
             break;
         }
     }
 } finally {
     // 解锁
     unlock();}
 return oldValue;

}

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {

 // 获取 HashEntry 数组上对应 key 的第一个元素
 HashEntry<K,V> first = entryForHash(this, hash);
 HashEntry<K,V> e = first;
 HashEntry<K,V> node = null;
 int retries = -1; // negative while locating node
 // 自旋获取锁
 while (!tryLock()) {
     HashEntry<K,V> f; // to recheck first below
     if (retries < 0) {
         // 为空就创立一个新节点
         if (e == null) {if (node == null) // speculatively create node
                 node = new HashEntry<K,V>(hash, key, value, null);
             retries = 0;
         }
         else if (key.equals(e.key))
             retries = 0;
         else
             e = e.next;
     }
     // 超过自旋次数,间接锁住
     else if (++retries > MAX_SCAN_RETRIES) {lock();
         break;
     }
     // 头节点发生变化,从新遍历
     else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
         e = first = f; // re-traverse if entry changed
         retries = -1;
     }
 }
 return node;

}
get()办法

get()办法中并没有用锁,而是应用了 UNSAFE.getObjectVolatile()来获取,这是一个操作硬件级别的并发类,而这个办法是保障了 Volatile 语义,也就是取的时候肯定会取到最新的值,所以不须要加锁。

public V get(Object key) {

 Segment<K,V> s; // manually integrate access methods to reduce overhead
 HashEntry<K,V>[] tab;
 int h = hash(key);
 // 计算 segment 地位
 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 // 计算 HashEntry 数组地位中的 key 地位
 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
     (tab = s.table) != null) {
     // 循环链表
     for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
              (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
          e != null; e = e.next) {
         K k;
         if ((k = e.key) == key || (e.hash == h && key.equals(k)))
             return e.value;
     }
 }
 return null;

}
1.8 版本
底层构造

在 1.8 版本的时候数据结构又改回和 HashMap 相似的数组加链表,链表过长转化为红黑树,也不再应用 ReentrantLock 来保障并发,而是应用 synchronized 关键字和 CAS 操作保障并发问题,因为 1.6 开始 synchronized 锁做了优化,比方偏差锁、轻量锁和分量锁的降级,所以性能并不一定就差,然而还是保留了 Segment 来进行兼容。

而且能够看到相较于 HashMap 它的外部节点类里的属性应用 volatile 来申明。

static class Node<K,V> implements Map.Entry<K,V> {

 final int hash;
 final K key;
 volatile V val;
 volatile Node<K,V> next;

}

构造方法

能够看到除了为了兼容 1.7 版本而保留的构造方法之外,其余的构造方法都和 HashMap 差不多。

public ConcurrentHashMap(int initialCapacity) {

 if (initialCapacity < 0)
     throw new IllegalArgumentException();
 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
            MAXIMUM_CAPACITY :
            tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
 this.sizeCtl = cap;

}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {

 this(initialCapacity, loadFactor, 1);

}

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {

 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
     throw new IllegalArgumentException();
 if (initialCapacity < concurrencyLevel)   // Use at least as many bins
     initialCapacity = concurrencyLevel;   // as estimated threads
 long size = (long)(1.0 + (long)initialCapacity / loadFactor);
 int cap = (size >= (long)MAXIMUM_CAPACITY) ?
     MAXIMUM_CAPACITY : tableSizeFor((int)size);
 this.sizeCtl = cap;

}
put()办法

首先是一个循环尝试插入,如果插入的地位上是空的,间接 CAS 操作插入,如果有值则加 synchronized 锁,而后批改值,留神的是只锁住了单个索引上的链表或红黑树,也就是说除非是插入操作遇到了哈希碰撞才有加锁操作,否则是不会有加锁操作的,因为不同地位上的值的操作基本不可能有并发问题的,所以不须要加锁。

public V put(K key, V value) {

 return putVal(key, value, false);

}

final V putVal(K key, V value, boolean onlyIfAbsent) {

 // key 和 value 不能为空
 if (key == null || value == null) throw new NullPointerException();
 int hash = spread(key.hashCode());
 int binCount = 0;
 // 循环晓得胜利为止
 for (Node<K,V>[] tab = table;;) {
     Node<K,V> f; int n, i, fh;
     // 数组为空,初始化
     if (tab == null || (n = tab.length) == 0)
         tab = initTable();
     // 以后地位还没有元素,应用 Unsafe 类的 CAS 操作放入数据,胜利了就跳出循环
     else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
         if (casTabAt(tab, i, null,
                      new Node<K,V>(hash, key, value, null)))
             break;                   // no lock when adding to empty bin
     }
     // 扩容
     else if ((fh = f.hash) == MOVED)
         tab = helpTransfer(tab, f);
     // 这里会加锁,然而留神锁住了链表或者红黑树,也就是说只独自锁住了数组中这个索引,对于其余的索引地位上的 put 和 get 都没有影响
     else {
         V oldVal = null;
         // 锁住时候就是查找地位,判断数据结构进行插入
         synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {
                     binCount = 1;
                     for (Node<K,V> e = f;; ++binCount) {
                         K ek;
                         if (e.hash == hash &&
                             ((ek = e.key) == key ||
                              (ek != null && key.equals(ek)))) {
                             oldVal = e.val;
                             if (!onlyIfAbsent)
                                 e.val = value;
                             break;
                         }
                         Node<K,V> pred = e;
                         if ((e = e.next) == null) {
                             pred.next = new Node<K,V>(hash, key,
                                                       value, null);
                             break;
                         }
                     }
                 }
                 else if (f instanceof TreeBin) {
                     Node<K,V> p;
                     binCount = 2;
                     if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                    value)) != null) {
                         oldVal = p.val;
                         if (!onlyIfAbsent)
                             p.val = value;
                     }
                 }
             }
         }
         if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)
                 treeifyBin(tab, i);
             if (oldVal != null)
                 return oldVal;
             break;
         }
     }
 }
 addCount(1L, binCount);
 return null;

}
get()办法

整个办法就是间接进行哈希计算查找对应地位上的数据,因为后面说了 Node 节点里的字段都是用 volatile 润饰,所以取的时候肯定会取到最新的值。

public V get(Object key) {

 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 int h = spread(key.hashCode());
 if ((tab = table) != null && (n = tab.length) > 0 &&
     (e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
             return e.val;
     }
     else if (eh < 0)
         return (p = e.find(h, key)) != null ? p.val : null;
     while ((e = e.next) != null) {
         if (e.hash == h &&
             ((ek = e.key) == key || (ek != null && key.equals(ek))))
             return e.val;
     }
 }
 return null;

}
总结
ConcurrentHashMap 比照与 HashMap 能够解决并发问题,其余的大体原理还是没有太多变动。
1.7 版本应用的分段锁,1.8 版本则更像是基于 HashMap 应用 synchronized 和 CAS 操作来保障并发问题,而且只锁单个索引地位,性能更好。
ConcurrentHashMap 是弱一致性的,也就是有些迭代器的办法可能没方法获取最新值,这也是为了性能而舍弃强一致性,不然只能像 Hashtable 那样间接锁住全副操作了。

退出移动版