关于面试:面试经典-ConcurrentHashMap-源码你读过吗

4次阅读

共计 4547 个字符,预计需要花费 12 分钟才能阅读完成。

HashMap 的线程安全性

HashMap 是线程不平安的。

为了应用线程平安的数据结构,多线程条件下,可应用 Collections.synchronizedMap 办法结构出一个同步 Map,或者间接应用线程平安的 ConcurrentHashMap

Java 7 基于分段锁的 ConcurrentHashMap

注:本章的代码均基于 JDK 1.7.0_67

数据结构

Java 7 中的 ConcurrentHashMap 的底层数据结构依然是数组和链表。

与 HashMap 不同的是,ConcurrentHashMap 最外层不是一个大的数组,而是一个 Segment 的数组。

每个 Segment 蕴含一个与 HashMap 数据结构差不多的链表数组。整体数据结构如下图所示。

寻址形式

在读写某个 Key 时,先取该 Key 的哈希值。

并将哈希值的高 N 位对 Segment 个数取模从而失去该 Key 应该属于哪个 Segment,接着如同操作 HashMap 一样操作这个 Segment。

为了保障不同的值均匀分布到不同的 Segment,须要通过如下办法计算哈希值。

private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String)) {return sun.misc.Hashing.stringHash32((String) k);
  }
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h <<   3);
  h ^= (h >>>  6);
  h += (h <<   2) + (h << 14);
  return h ^ (h >>> 16);
}

同样为了进步取模运算效率,通过如下计算,ssize 即为大于 concurrencyLevel 的最小的 2 的 N 次方,同时 segmentMask 为 2^N-1。

这一点跟上文中计算数组长度的办法统一。

对于某一个 Key 的哈希值,只须要向右移 segmentShift 位以取高 sshift 位,再与 segmentMask 取与操作即可失去它在 Segment 数组上的索引。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

同步形式

Segment 继承自 ReentrantLock,所以咱们能够很不便的对每一个 Segment 上锁。

对于读操作,获取 Key 所在的 Segment 时,须要保障可见性(请参考如何保障多线程条件下的可见性)。

具体实现上能够应用 volatile 关键字,也可应用锁。但应用锁开销太大,而应用 volatile 时每次写操作都会让所有 CPU 内缓存有效,也有肯定开销。

ConcurrentHashMap 应用如下办法保障可见性,获得最新的 Segment。

Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

获取 Segment 中的 HashEntry 时也应用了相似办法

HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

对于写操作,并不要求同时获取所有 Segment 的锁,因为那样相当于锁住了整个 Map。

它会先获取该 Key-Value 对所在的 Segment 的锁,获取胜利后就能够像操作一个一般的 HashMap 一样操作该 Segment,并保障该 Segment 的安全性。

同时因为其它 Segment 的锁并未被获取,因而实践上可反对 concurrencyLevel(等于 Segment 的个数)个线程平安的并发读写。

获取锁时,并不间接应用 lock 来获取,因为该办法获取锁失败时会挂起。

事实上,它应用了自旋锁,如果 tryLock 获取锁失败,阐明锁被其它线程占用,此时通过循环再次以 tryLock 的形式申请锁。

如果在循环过程中该 Key 所对应的链表头被批改,则重置 retry 次数。如果 retry 次数超过肯定值,则应用 lock 办法申请锁。

这里应用自旋锁是因为自旋锁的效率比拟高,然而它耗费 CPU 资源比拟多,因而在自旋次数超过阈值时切换为互斥锁。

size 操作

put、remove 和 get 操作只须要关怀一个 Segment,而 size 操作须要遍历所有的 Segment 能力算出整个 Map 的大小。

一个简略的计划是,先锁住所有 Segment,计算完后再解锁。

但这样做,在做 size 操作时,不仅无奈对 Map 进行写操作,同时也无奈进行读操作,不利于对 Map 的并行操作。

为更好反对并发操作,ConcurrentHashMap 会在不上锁的前提一一 Segment 计算 3 次 size,如果某相邻两次计算获取的所有 Segment 的更新次数(每个 Segment 都与 HashMap 一样通过 modCount 跟踪本人的批改次数,Segment 每批改一次其 modCount 加一)相等,阐明这两次计算过程中无更新操作,则这两次计算出的总 size 相等,可间接作为最终后果返回。

如果这三次计算过程中 Map 有更新,则对所有 Segment 加锁从新计算 Size。

该计算方法代码如下

public int size() {final Segment<K,V>[] segments = this.segments;
  int size;
  boolean overflow; // true if size overflows 32 bits
  long sum;         // sum of modCounts
  long last = 0L;   // previous sum
  int retries = -1; // first iteration isn't retry
  try {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)
          ensureSegment(j).lock(); // force creation}
      sum = 0L;
      size = 0;
      overflow = false;
      for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();}
  }
  return overflow ? Integer.MAX_VALUE : size;
}

不同之处

ConcurrentHashMap 与 HashMap 相比,有以下不同点

  1. ConcurrentHashMap 线程平安,而 HashMap 非线程平安
  2. HashMap 容许 Key 和 Value 为 null,而 ConcurrentHashMap 不容许
  3. HashMap 不容许通过 Iterator 遍历的同时通过 HashMap 批改,而 ConcurrentHashMap 容许该行为,并且该更新对后续的遍历可见

Java 8 基于 CAS 的 ConcurrentHashMap

注:本章的代码均基于 JDK 1.8.0_91

数据结构

Java 7 为实现并行拜访,引入了 Segment 这一构造,实现了分段锁,实践上最大并发度与 Segment 个数相等。

Java 8 为进一步提高并发性,摒弃了分段锁的计划,而是间接应用一个大的数组。

同时为了进步哈希碰撞下的寻址性能,Java 8 在链表长度超过肯定阈值(8)时将链表(寻址工夫复杂度为O(N))转换为 红黑树(寻址工夫复杂度为O(log(N)))。

其数据结构如下图所示:

寻址形式

Java 8 的 ConcurrentHashMap 同样是通过 Key 的哈希值与数组长度取模确定该 Key 在数组中的索引。

同样为了防止不太好的 Key 的 hashCode 设计,它通过如下办法计算失去 Key 的最终哈希值。

不同的是,Java 8 的 ConcurrentHashMap 作者认为引入红黑树后,即便哈希抵触比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将 Key 的 hashCode 值与其高 16 位作异或并保障最高位为 0(从而保障最终后果为正整数)。

static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}

同步形式

对于 put 操作,如果 Key 对应的数组元素为 null,则通过 CAS 操作将其设置为以后值。

如果 Key 对应的数组元素(也即链表表头或者树的根元素)不为 null,则对该元素应用 synchronized 关键字申请锁,而后进行操作。

如果该 put 操作使得以后链表长度超过肯定阈值,则将该链表转换为树,从而进步寻址效率。

对于读操作,因为数组被 volatile 关键字润饰,因而不必放心数组的可见性问题。

同时每个元素是一个 Node 实例(Java 7 中每个元素是一个 HashEntry),它的 Key 值和 hash 值都由 final 润饰,不可变更,毋庸关怀它们被批改后的可见性问题。

而其 Value 及对下一个元素的援用由 volatile 润饰,可见性也有保障。

static class Node<K,V> implements Map.Entry<K,V> {
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
}

对于 Key 对应的数组元素的可见性,由 Unsafe.getObjectVolatile() 保障。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

size 操作

put 办法和 remove 办法都会通过 addCount 办法保护 Map 的 size。

size 办法通过 sumCount 获取由 addCount 办法保护的 Map 的 size。

public int size() {long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

小结

HashMap 和 List 是我最喜爱的两种数据结构,简略实用。

ConcurrentHashMap 为了性能晋升,都在不同的 jdk 版本中晋升本人,何况你我 呢?

心愿本文对你有帮忙,如果有其余想法的话,也能够评论区和大家分享哦。

各位 极客 的点赞珍藏转发,是老马继续写作的最大能源!

正文完
 0