关于面试:面试经典-ConcurrentHashMap-源码你读过吗

HashMap 的线程安全性

HashMap 是线程不平安的。

为了应用线程平安的数据结构,多线程条件下,可应用 Collections.synchronizedMap 办法结构出一个同步Map,或者间接应用线程平安的 ConcurrentHashMap

Java 7基于分段锁的ConcurrentHashMap

注:本章的代码均基于JDK 1.7.0_67

数据结构

Java 7中的ConcurrentHashMap的底层数据结构依然是数组和链表。

与HashMap不同的是,ConcurrentHashMap最外层不是一个大的数组,而是一个Segment的数组。

每个Segment蕴含一个与HashMap数据结构差不多的链表数组。整体数据结构如下图所示。

寻址形式

在读写某个Key时,先取该Key的哈希值。

并将哈希值的高N位对Segment个数取模从而失去该Key应该属于哪个Segment,接着如同操作HashMap一样操作这个Segment。

为了保障不同的值均匀分布到不同的Segment,须要通过如下办法计算哈希值。

private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String)) {
    return sun.misc.Hashing.stringHash32((String) k);
  }
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h <<   3);
  h ^= (h >>>  6);
  h += (h <<   2) + (h << 14);
  return h ^ (h >>> 16);
}

同样为了进步取模运算效率,通过如下计算,ssize即为大于concurrencyLevel的最小的2的N次方,同时segmentMask为2^N-1。

这一点跟上文中计算数组长度的办法统一。

对于某一个Key的哈希值,只须要向右移segmentShift位以取高sshift位,再与segmentMask取与操作即可失去它在Segment数组上的索引。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

同步形式

Segment继承自ReentrantLock,所以咱们能够很不便的对每一个Segment上锁。

对于读操作,获取Key所在的Segment时,须要保障可见性(请参考如何保障多线程条件下的可见性)。

具体实现上能够应用volatile关键字,也可应用锁。但应用锁开销太大,而应用volatile时每次写操作都会让所有CPU内缓存有效,也有肯定开销。

ConcurrentHashMap应用如下办法保障可见性,获得最新的Segment。

Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

获取Segment中的HashEntry时也应用了相似办法

HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

对于写操作,并不要求同时获取所有Segment的锁,因为那样相当于锁住了整个Map。

它会先获取该Key-Value对所在的Segment的锁,获取胜利后就能够像操作一个一般的HashMap一样操作该Segment,并保障该Segment的安全性。

同时因为其它Segment的锁并未被获取,因而实践上可反对concurrencyLevel(等于Segment的个数)个线程平安的并发读写。

获取锁时,并不间接应用lock来获取,因为该办法获取锁失败时会挂起。

事实上,它应用了自旋锁,如果tryLock获取锁失败,阐明锁被其它线程占用,此时通过循环再次以tryLock的形式申请锁。

如果在循环过程中该Key所对应的链表头被批改,则重置retry次数。如果retry次数超过肯定值,则应用lock办法申请锁。

这里应用自旋锁是因为自旋锁的效率比拟高,然而它耗费CPU资源比拟多,因而在自旋次数超过阈值时切换为互斥锁。

size操作

put、remove和get操作只须要关怀一个Segment,而size操作须要遍历所有的Segment能力算出整个Map的大小。

一个简略的计划是,先锁住所有Segment,计算完后再解锁。

但这样做,在做size操作时,不仅无奈对Map进行写操作,同时也无奈进行读操作,不利于对Map的并行操作。

为更好反对并发操作,ConcurrentHashMap会在不上锁的前提一一Segment计算3次size,如果某相邻两次计算获取的所有Segment的更新次数(每个Segment都与HashMap一样通过modCount跟踪本人的批改次数,Segment每批改一次其modCount加一)相等,阐明这两次计算过程中无更新操作,则这两次计算出的总size相等,可间接作为最终后果返回。

如果这三次计算过程中Map有更新,则对所有Segment加锁从新计算Size。

该计算方法代码如下

public int size() {
  final Segment<K,V>[] segments = this.segments;
  int size;
  boolean overflow; // true if size overflows 32 bits
  long sum;         // sum of modCounts
  long last = 0L;   // previous sum
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  return overflow ? Integer.MAX_VALUE : size;
}

不同之处

ConcurrentHashMap与HashMap相比,有以下不同点

  1. ConcurrentHashMap线程平安,而HashMap非线程平安
  2. HashMap容许Key和Value为null,而ConcurrentHashMap不容许
  3. HashMap不容许通过Iterator遍历的同时通过HashMap批改,而ConcurrentHashMap容许该行为,并且该更新对后续的遍历可见

Java 8 基于 CAS 的 ConcurrentHashMap

注:本章的代码均基于JDK 1.8.0_91

数据结构

Java 7为实现并行拜访,引入了Segment这一构造,实现了分段锁,实践上最大并发度与Segment个数相等。

Java 8为进一步提高并发性,摒弃了分段锁的计划,而是间接应用一个大的数组。

同时为了进步哈希碰撞下的寻址性能,Java 8在链表长度超过肯定阈值(8)时将链表(寻址工夫复杂度为O(N))转换为 红黑树(寻址工夫复杂度为O(log(N)))。

其数据结构如下图所示:

寻址形式

Java 8的ConcurrentHashMap同样是通过Key的哈希值与数组长度取模确定该Key在数组中的索引。

同样为了防止不太好的Key的hashCode设计,它通过如下办法计算失去Key的最终哈希值。

不同的是,Java 8的ConcurrentHashMap作者认为引入红黑树后,即便哈希抵触比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将Key的hashCode值与其高16位作异或并保障最高位为0(从而保障最终后果为正整数)。

static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

同步形式

对于put操作,如果Key对应的数组元素为null,则通过CAS操作将其设置为以后值。

如果Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素应用synchronized关键字申请锁,而后进行操作。

如果该put操作使得以后链表长度超过肯定阈值,则将该链表转换为树,从而进步寻址效率。

对于读操作,因为数组被volatile关键字润饰,因而不必放心数组的可见性问题。

同时每个元素是一个Node实例(Java 7中每个元素是一个HashEntry),它的Key值和hash值都由final润饰,不可变更,毋庸关怀它们被批改后的可见性问题。

而其Value及对下一个元素的援用由volatile润饰,可见性也有保障。

static class Node<K,V> implements Map.Entry<K,V> {
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
}

对于Key对应的数组元素的可见性,由 Unsafe.getObjectVolatile() 保障。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

size 操作

put办法和remove办法都会通过addCount办法保护Map的size。

size办法通过sumCount获取由addCount办法保护的Map的size。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

小结

HashMap 和 List 是我最喜爱的两种数据结构,简略实用。

ConcurrentHashMap 为了性能晋升,都在不同的 jdk 版本中晋升本人,何况你我呢?

心愿本文对你有帮忙,如果有其余想法的话,也能够评论区和大家分享哦。

各位极客的点赞珍藏转发,是老马继续写作的最大能源!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理