关于后端:突击并发编程JUC系列并发容器ConcurrentHashMap

42次阅读

共计 10569 个字符,预计需要花费 27 分钟才能阅读完成。

突击并发编程 JUC 系列演示代码地址:
https://github.com/mtcarpenter/JavaTutorial

本节让咱们一起钻研一下该容器是如何在保障线程平安的同时又能保障高效的操作。ConcurrentHashMap是线程平安且高效的HashMap

为什么要应用 ConcurrentHashMap

在并发编程中应用 HashMap 可能导致程序死循环。而应用线程平安的 HashTable 效率又十分低下,基于以上两个起因,便有了 ConcurrentHashMap 的退场机会。

线程不平安的 HashMap

在多线程环境下,应用 HashMap 进行 put 操作会引起死循环,导致 CPU 利用率靠近 100%,所以在并发状况下不能应用HashMap。例如,执行以下代码会引起死循环。

public class HashMapExample {

    // 申请总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new HashMap<>(2);

    public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newFixedThreadPool(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {update(count);
                } catch (Exception e) {e.printStackTrace();
                }
                countDownLatch.countDown();});
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("size:"+ map.size());
    }

    private static void update(int i) {map.put(i, i);
    }
}


HashMap在并发执行 put 操作时会引起死循环,是因为多线程会导致 HashMap 的 Entry 链表造成环形数据结构,一旦造成环形数据结构,Entry 的 next 节点永远不为空,就会产生死循环获取 Entry。

效率低下的 HashTable

HashTable容器应用 synchronized 来保障线程平安,但在线程竞争强烈的状况下 HashTable 的效率十分低下。因为当一个线程拜访 HashTable 的同步办法,其余线程也拜访 HashTable 的同步办法时,会进入阻塞或轮询状态。如线程 1 应用 put 进行元素增加,线程 2 岂但不能应用 put 办法增加元素,也不能应用 get 办法来获取元素,所以竞争越强烈效率越低。

ConcurrentHashMap 的锁分段技术可无效晋升并发访问率

HashTable容器在竞争强烈的并发环境下体现出效率低下的起因是所有拜访 HashTable 的线程都必须竞争同一把锁,如果容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程拜访容器里不同数据段的数据时,线程间就不会存在锁竞争,从而能够无效进步并发拜访效率,这就是 ConcurrentHashMap 所应用的锁分段技术。首先将数据分成一段一段地存储,而后给每一段数据配一把锁,当一个线程占用锁拜访其中一个段数据的时候,其余段的数据也能被其余线程拜访。

ConcurrentHashMapExample 示例

public class ConcurrentHashMapExample {

    // 申请总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static Map<Integer, Integer> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newFixedThreadPool(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            final int count = i;
            executorService.execute(() -> {
                try {update(count);
                } catch (Exception e) {e.printStackTrace();
                }
                countDownLatch.countDown();});
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("size:" + map.size());
    }

    private static void update(int i) {map.put(i, i);
    }
}

ConcurrentHashMap JDK 1.7/JDK 1.8

  • JDK 1.7 中应用分段锁(ReentrantLock + Segment + HashEntry),相当于把一个 HashMap 分成多个段,每段调配一把锁,这样反对多线程拜访。锁粒度:基于 Segment,蕴含多个 HashEntry。
  • JDK 1.8 中应用 CAS + synchronized + Node + 红黑树。锁粒度:Node(首结点)(实现 Map.Entry)。锁粒度升高了。

JDK 1.7 构造


JDK 1.7 中的 ConcurrentHashMap 外部进行了 Segment 分段,Segment 继承了 ReentrantLock,能够了解为一把锁,各个 Segment 之间都是互相独立上锁的,互不影响。
相比于之前的 Hashtable 每次操作都须要把整个对象锁住而言,大大提高了并发效率。因为它的锁与锁之间是独立的,而不是整个对象只有一把锁。
每个 Segment 的底层数据结构与 HashMap 相似,依然是数组和链表组成的拉链法构造。默认有 0~15 共 16 个 Segment,所以最多能够同时反对 16 个线程并发操作(操作别离散布在不同的 Segment 上)。16 这个默认值能够在初始化的时候设置为其余值,然而一旦确认初始化当前,是不能够扩容的。

JDK 1.8 构造


图中的节点有三种类型: 

  • 第一种是最简略的,空着的地位代表以后还没有元素来填充。
  • 第二种就是和 HashMap 十分相似的拉链法构造,在每一个槽中会首先填入第一个节点,然而后续如果计算出雷同的 Hash 值,就用链表的模式往后进行延长。
  • 第三种构造就是红黑树结构,这是 Java 7 的 ConcurrentHashMap 中所没有的构造,在此之前咱们可能也很少接触这样的数据结构

链表长度大于某一个阈值(默认为 8),满足容量从链表的模式转化为红黑树的模式。
红黑树是每个节点都带有色彩属性的二叉查找树,色彩为红色或彩色,红黑树的实质是对二叉查找树 BST 的一种均衡策略,咱们能够了解为是一种均衡二叉查找树,查找效率高,会主动均衡,避免极其不均衡从而影响查找效率的状况产生,红黑树每个节点要么是红色,要么是彩色,但根节点永远是彩色的。

ConcurrentHashMap 源码

常量阐明

    /**
     * table 桶数最大值,且前两位用作管制标记
     */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * table 桶数初始化默认值,需为 2 的幂次方
     */
    private static final int DEFAULT_CAPACITY = 16;

    /**
     * 数组可能最大值,须要与 toArray()相干办法关联  
     */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 并发级别,遗留下来的,为兼容以前的版本  
     */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * 加载因子,扩容的阀值,可在构造方法定义
     */
    private static final float LOAD_FACTOR = 0.75f;

    /**
     * 链表转红黑树阀值, 超过 8 链表转换为红黑树  
     */
    static final int TREEIFY_THRESHOLD = 8;

    /**
     * 树转链表阀值,小于等于 6(tranfer 时,lc、hc= 0 两个计数器别离 ++ 记录原 bin、新 binTreeNode 数量,<=UNTREEIFY_THRESHOLD 则 untreeify(lo))*/
    static final int UNTREEIFY_THRESHOLD = 6;

    /**
     * 树化阀值 2,当数组桶树达到 64 以上才容许链表树化
     */
    static final int MIN_TREEIFY_CAPACITY = 64;


初始化 table


private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
    // 初始化胜利退出循环
    while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)
        Thread.yield(); // 有其余线程在初始化,自旋期待
      else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
        // 进入初始化 sizeCtl = -1
        try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
            table = tab = nt;
            // sc = n-n/4 ???
            sc = n - (n >>> 2);
          }
        } finally {
          // 初始化胜利设置 sizeCtl
          sizeCtl = sc;
        }
        break;
      }
    }
    return tab;
}

get 办法


public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    
    // 查看 table 是否存在,hashCode 所在的索引是有为空
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
      
        // 依照树的形式遍历 Node 查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
            
        // 依照链表的形式遍历 Node 查找    
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

总结一下 get 的过程:

  • 计算 Hash 值,并由此值找到对应的槽点;
  • 如果数组是空的或者该地位为 null,那么间接返回 null 就能够了;
  • 如果该地位处的节点刚好就是咱们须要的,间接返回该节点的值;
  • 如果该地位节点是红黑树或者正在扩容,就用 find 办法持续查找;
  • 否则那就是链表,就进行遍历链表查找

put 办法阐明


public V put(K key, V value) {return putVal(key, value, false);
}
    
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 计算 key 的 hashCode
    int hash = spread(key.hashCode());
    int binCount = 0;
    
    // 循环直到插入胜利,for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 初始化 table
            tab = initTable();
        
        // tabAt 调用 getObjectVolatile 
        // 以后地位为空能够直接插入的状况    
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            
            // 通过 CAS 操作插入,不须要加锁
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;              
        }
        
        // 上面是地位曾经有值的状况
        
        // MOVED 示意以后 Map 正在进行扩容
        else if ((fh = f.hash) == MOVED)
            // 帮忙进行扩容,而后进入下一次循环尝试插入
            tab = helpTransfer(tab, f);
      
        // 未在扩容的状况
        else {
            V oldVal = null;
            
            // 对 f 加锁,f 是存储在以后地位的 Node 的头节点
            synchronized (f) {
                // 双重查看,保障 Node 头节点没有扭转
                if (tabAt(tab, i) == f) {if (fh >= 0) {
                        // 对链表进行操作
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            // 更新值(判断 onlyIfAbsent)或插入链表尾部 ...
                            break;
                        }
                    }
                    else if (f instanceof TreeBin) {// 对树进行操作 ...}
                }
            }
            
            // 判断是否须要 treeify
            if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    
    addCount(1L, binCount);
    return null;
}

总结一下 put 的过程:

  • 判断 Node[] 数组是否初始化,没有则进行初始化操作
  • 通过 hash 定位数组的索引坐标,是否有 Node 节点,如果没有则应用 CAS 进行增加(链表的头节点),增加失败则进入下次循环。
  • 查看到外部正在扩容,就帮忙它一块扩容。
  • 如果 f != null,则应用 synchronized 锁住 f 元素(链表 / 红黑二叉树的头元素)

    • 如果是 Node(链表构造)则执行链表的增加操作
    • 如果是 TreeNode(树形构造)则执行树增加操作。
  • 判断链表长度曾经达到临界值 8,当然这个 8 是默认值,大家也能够去做调整,当节点数超过这个值就须要把链表转换为树结构。

tryPresize 办法

    private final void tryPresize(int size) {
        // 计算扩容的指标 size
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;
            //tab 没有初始化
            if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;
                // 初始化之前,CAS 设置 sizeCtl=-1 
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {if (table == tab) {@SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            //sc=0.75n, 相当于扩容阈值
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        // 此时并没有通过 CAS 赋值,因为其余想要执行初始化的线程,发现 sizeCtl=-1,就间接返回,从而确保任何状况,只会有一个线程执行初始化操作。sizeCtl = sc;
                    }
                }
            }
            // 指标扩容 size 小于扩容阈值,或者容量超过最大限度时,不须要扩容
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            // 扩容
            else if (tab == table) {int rs = resizeStamp(n);
                //sc<0 示意,曾经有其余线程正在扩容
                if (sc < 0) {Node<K,V>[] nt;
                      /**
                      1 (sc >>> RESIZE_STAMP_SHIFT) != rs:扩容线程数 > MAX_RESIZERS-1
                      2 sc == rs + 1 和 sc == rs + MAX_RESIZERS:示意什么???3 (nt = nextTable) == null:示意 nextTable 正在初始化
                      4 transferIndex <= 0:示意所有 hash 桶均调配进来
                    */
                    // 如果不须要帮其扩容,间接返回
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //CAS 设置 sizeCtl=sizeCtl+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        // 帮其扩容
                        transfer(tab, nt);
                }
                // 第一个执行扩容操作的线程,将 sizeCtl 设置为:(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

此段代码参考网址:https://www.jianshu.com/p/487d00afe6ca

transfer 办法


    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 计算须要迁徙多少个 hash 桶(MIN_TRANSFER_STRIDE 该值作为上限,以防止扩容线程过多)if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
       
        if (nextTab == null) {            // initiating
            try {
                // 扩容一倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
      
        //1 逆序迁徙曾经获取到的 hash 桶汇合,如果迁徙结束,则更新 transferIndex,获取下一批待迁徙的 hash 桶
        //2 如果 transferIndex=0,示意所以 hash 桶均被调配,将 i 置为 -1,筹备退出 transfer 办法
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            
            // 更新待迁徙的 hash 桶索引
            while (advance) {
                int nextIndex, nextBound;
                // 更新迁徙索引 i。if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    //transferIndex<= 0 示意曾经没有须要迁徙的 hash 桶,将 i 置为 -1,线程筹备退出
                    i = -1;
                    advance = false;
                }
                // 当迁徙完 bound 这个桶后,尝试更新 transferIndex,,获取下一批待迁徙的 hash 桶
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 退出 transfer
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    // 最初一个迁徙的线程,recheck 后,做收尾工作,而后退出
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    /**
                     第一个扩容的线程,执行 transfer 办法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                     后续帮其扩容的线程,执行 transfer 办法之前,会设置 sizeCtl = sizeCtl+1
                     每一个退出 transfer 的办法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
                     那么最初一个线程退出时:必然有 sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                    */
                    
                    // 不相等,阐明不到最初一个线程,间接退出 transfer 办法
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    // 最初退出的线程要从新 check 下是否全副迁徙结束
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            // 迁徙 node 节点
            else {synchronized (f) {if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // 链表迁徙
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // 将 node 链表,分成 2 个新的 node 链表
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 将新 node 链表赋给 nextTab
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 红黑树迁徙
                        else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

欢送关注公众号 山间木匠, 我是小春哥,从事 Java 后端开发,会一点前端、通过继续输入系列技术文章以文会友,如果本文能为您提供帮忙,欢送大家关注、点赞、分享反对,_咱们下期再见!_

正文完
 0