突击并发编程JUC系列演示代码地址:
https://github.com/mtcarpenter/JavaTutorial
本节让咱们一起钻研一下该容器是如何在保障线程平安的同时又能保障高效的操作。ConcurrentHashMap
是线程平安且高效的HashMap
。
为什么要应用ConcurrentHashMap
在并发编程中应用HashMap
可能导致程序死循环。而应用线程平安的HashTable
效率又十分低下,基于以上两个起因,便有了ConcurrentHashMap
的退场机会。
线程不平安的HashMap
在多线程环境下,应用HashMap
进行put操作会引起死循环,导致CPU利用率靠近100%,所以在并发状况下不能应用HashMap
。例如,执行以下代码会引起死循环。
public class HashMapExample { // 申请总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new HashMap<>(2); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { update(count); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("size:"+ map.size()); } private static void update(int i) { map.put(i, i); }}
HashMap
在并发执行put操作时会引起死循环,是因为多线程会导致HashMap
的 Entry 链表造成环形数据结构,一旦造成环形数据结构,Entry 的 next 节点永远不为空,就会产生死循环获取 Entry。
效率低下的HashTable
HashTable
容器应用synchronized
来保障线程平安,但在线程竞争强烈的状况下HashTable
的效率十分低下。因为当一个线程拜访HashTable
的同步办法,其余线程也拜访HashTable
的同步办法时,会进入阻塞或轮询状态。如线程 1 应用 put 进行元素增加,线程2岂但不能应用put办法增加元素,也不能应用 get 办法来获取元素,所以竞争越强烈效率越低。
ConcurrentHashMap 的锁分段技术可无效晋升并发访问率
HashTable
容器在竞争强烈的并发环境下体现出效率低下的起因是所有拜访HashTable
的线程都必须竞争同一把锁,如果容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程拜访容器里不同数据段的数据时,线程间就不会存在锁竞争,从而能够无效进步并发拜访效率,这就是ConcurrentHashMap
所应用的锁分段技术。首先将数据分成一段一段地存储,而后给每一段数据配一把锁,当一个线程占用锁拜访其中一个段数据的时候,其余段的数据也能被其余线程拜访。
ConcurrentHashMapExample 示例
public class ConcurrentHashMapExample { // 申请总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentHashMap<>(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { update(count); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("size:" + map.size()); } private static void update(int i) { map.put(i, i); }}
ConcurrentHashMap JDK 1.7/JDK 1.8
- JDK 1.7 中应用分段锁(ReentrantLock + Segment + HashEntry),相当于把一个 HashMap 分成多个段,每段调配一把锁,这样反对多线程拜访。锁粒度:基于 Segment,蕴含多个 HashEntry。
- JDK 1.8 中应用 CAS + synchronized + Node + 红黑树。锁粒度:Node(首结点)(实现 Map.Entry)。锁粒度升高了。
JDK 1.7 构造
JDK 1.7
中的ConcurrentHashMap
外部进行了 Segment
分段,Segment
继承了 ReentrantLock
,能够了解为一把锁,各个 Segment
之间都是互相独立上锁的,互不影响。
相比于之前的 Hashtable
每次操作都须要把整个对象锁住而言,大大提高了并发效率。因为它的锁与锁之间是独立的,而不是整个对象只有一把锁。
每个 Segment 的底层数据结构与 HashMap
相似,依然是数组和链表组成的拉链法构造。默认有 0~15 共 16 个 Segment
,所以最多能够同时反对 16 个线程并发操作(操作别离散布在不同的 Segment
上)。16 这个默认值能够在初始化的时候设置为其余值,然而一旦确认初始化当前,是不能够扩容的。
JDK 1.8 构造
图中的节点有三种类型:
- 第一种是最简略的,空着的地位代表以后还没有元素来填充。
- 第二种就是和
HashMap
十分相似的拉链法构造,在每一个槽中会首先填入第一个节点,然而后续如果计算出雷同的 Hash 值,就用链表的模式往后进行延长。 - 第三种构造就是红黑树结构,这是 Java 7 的
ConcurrentHashMap
中所没有的构造,在此之前咱们可能也很少接触这样的数据结构
链表长度大于某一个阈值(默认为 8),满足容量从链表的模式转化为红黑树的模式。
红黑树是每个节点都带有色彩属性的二叉查找树,色彩为红色或彩色,红黑树的实质是对二叉查找树 BST 的一种均衡策略,咱们能够了解为是一种均衡二叉查找树,查找效率高,会主动均衡,避免极其不均衡从而影响查找效率的状况产生,红黑树每个节点要么是红色,要么是彩色,但根节点永远是彩色的。
ConcurrentHashMap 源码
常量阐明
/** * table 桶数最大值,且前两位用作管制标记 */ private static final int MAXIMUM_CAPACITY = 1 << 30; /** * table 桶数初始化默认值,需为2的幂次方 */ private static final int DEFAULT_CAPACITY = 16; /** * 数组可能最大值,须要与toArray()相干办法关联 */ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** *并发级别,遗留下来的,为兼容以前的版本 */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * 加载因子,扩容的阀值,可在构造方法定义 */ private static final float LOAD_FACTOR = 0.75f; /** * 链表转红黑树阀值, 超过 8 链表转换为红黑树 */ static final int TREEIFY_THRESHOLD = 8; /** * 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器别离++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo)) */ static final int UNTREEIFY_THRESHOLD = 6; /** * 树化阀值2,当数组桶树达到64以上才容许链表树化 */ static final int MIN_TREEIFY_CAPACITY = 64;
初始化 table
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // 初始化胜利退出循环 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // 有其余线程在初始化,自旋期待 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 进入初始化 sizeCtl = -1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // sc = n-n/4 ??? sc = n - (n >>> 2); } } finally { // 初始化胜利设置 sizeCtl sizeCtl = sc; } break; } } return tab;}
get 办法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // 查看 table 是否存在,hashCode 所在的索引是有为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 依照树的形式遍历 Node 查找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 依照链表的形式遍历 Node 查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}
总结一下 get 的过程:
- 计算 Hash 值,并由此值找到对应的槽点;
- 如果数组是空的或者该地位为 null,那么间接返回 null 就能够了;
- 如果该地位处的节点刚好就是咱们须要的,间接返回该节点的值;
- 如果该地位节点是红黑树或者正在扩容,就用 find 办法持续查找;
- 否则那就是链表,就进行遍历链表查找
put 办法阐明
public V put(K key, V value) { return putVal(key, value, false);} final V putVal(K key, V value, boolean onlyIfAbsent) { // 计算 key 的 hashCode int hash = spread(key.hashCode()); int binCount = 0; // 循环直到插入胜利, for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 初始化 table tab = initTable(); // tabAt 调用 getObjectVolatile // 以后地位为空能够直接插入的状况 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 通过 CAS 操作插入,不须要加锁 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } // 上面是地位曾经有值的状况 // MOVED 示意以后 Map 正在进行扩容 else if ((fh = f.hash) == MOVED) // 帮忙进行扩容,而后进入下一次循环尝试插入 tab = helpTransfer(tab, f); // 未在扩容的状况 else { V oldVal = null; // 对f加锁,f 是存储在以后地位的 Node 的头节点 synchronized (f) { // 双重查看,保障 Node 头节点没有扭转 if (tabAt(tab, i) == f) { if (fh >= 0) { // 对链表进行操作 binCount = 1; for (Node<K,V> e = f;; ++binCount) { // 更新值(判断 onlyIfAbsent)或插入链表尾部 ... break; } } else if (f instanceof TreeBin) { // 对树进行操作 ... } } } // 判断是否须要 treeify if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null;}
总结一下 put 的过程:
- 判断Node[] 数组是否初始化,没有则进行初始化操作
- 通过 hash 定位数组的索引坐标,是否有 Node 节点,如果没有则应用 CAS 进行增加(链表的头节点),增加失败则进入下次循环。
- 查看到外部正在扩容,就帮忙它一块扩容。
如果 f != null ,则应用 synchronized 锁住 f 元素(链表/红黑二叉树的头元素)
- 如果是 Node (链表构造)则执行链表的增加操作
- 如果是 TreeNode (树形构造)则执行树增加操作。
- 判断链表长度曾经达到临界值 8 ,当然这个 8 是默认值,大家也能够去做调整,当节点数超过这个值就须要把链表转换为树结构。
tryPresize办法
private final void tryPresize(int size) { //计算扩容的指标size int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //tab没有初始化 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; //初始化之前,CAS设置sizeCtl=-1 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; //sc=0.75n,相当于扩容阈值 sc = n - (n >>> 2); } } finally { //此时并没有通过CAS赋值,因为其余想要执行初始化的线程,发现sizeCtl=-1,就间接返回,从而确保任何状况,只会有一个线程执行初始化操作。 sizeCtl = sc; } } } //指标扩容size小于扩容阈值,或者容量超过最大限度时,不须要扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; //扩容 else if (tab == table) { int rs = resizeStamp(n); //sc<0示意,曾经有其余线程正在扩容 if (sc < 0) { Node<K,V>[] nt; /** 1 (sc >>> RESIZE_STAMP_SHIFT) != rs :扩容线程数 > MAX_RESIZERS-1 2 sc == rs + 1 和 sc == rs + MAX_RESIZERS :示意什么??? 3 (nt = nextTable) == null :示意nextTable正在初始化 4 transferIndex <= 0 :示意所有hash桶均调配进来 */ //如果不须要帮其扩容,间接返回 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //CAS设置sizeCtl=sizeCtl+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //帮其扩容 transfer(tab, nt); } //第一个执行扩容操作的线程,将sizeCtl设置为:(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
此段代码参考网址:https://www.jianshu.com/p/487d00afe6ca
transfer办法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //计算须要迁徙多少个hash桶(MIN_TRANSFER_STRIDE该值作为上限,以防止扩容线程过多) if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { //扩容一倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //1 逆序迁徙曾经获取到的hash桶汇合,如果迁徙结束,则更新transferIndex,获取下一批待迁徙的hash桶 //2 如果transferIndex=0,示意所以hash桶均被调配,将i置为-1,筹备退出transfer办法 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //更新待迁徙的hash桶索引 while (advance) { int nextIndex, nextBound; //更新迁徙索引i。 if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { //transferIndex<=0示意曾经没有须要迁徙的hash桶,将i置为-1,线程筹备退出 i = -1; advance = false; } //当迁徙完bound这个桶后,尝试更新transferIndex,,获取下一批待迁徙的hash桶 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //退出transfer if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //最初一个迁徙的线程,recheck后,做收尾工作,而后退出 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { /** 第一个扩容的线程,执行transfer办法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) 后续帮其扩容的线程,执行transfer办法之前,会设置 sizeCtl = sizeCtl+1 每一个退出transfer的办法的线程,退出之前,会设置 sizeCtl = sizeCtl-1 那么最初一个线程退出时: 必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT */ //不相等,阐明不到最初一个线程,间接退出transfer办法 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; //最初退出的线程要从新check下是否全副迁徙结束 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed //迁徙node节点 else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //链表迁徙 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //将node链表,分成2个新的node链表 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将新node链表赋给nextTab setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //红黑树迁徙 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
欢送关注公众号 山间木匠 , 我是小春哥,从事 Java 后端开发,会一点前端、通过继续输入系列技术文章以文会友,如果本文能为您提供帮忙,欢送大家关注、点赞、分享反对,_咱们下期再见!_