关于java:ConcurrentHashMap-怎么样去保证线程安全的-读操作为什么不需要加锁

51次阅读

共计 10281 个字符,预计需要花费 26 分钟才能阅读完成。

前言

最近在看公众号 看到一个问题:为什么 ConcurrentHashMap 的读操作不须要加锁?<br/>

第一次看到这个问题的时候 我也的确比拟懵逼 我尽管晓得 ConCurrentHashMap 是怎么在 put 的时候 去保障线程平安的,然而真的没关注过 get 取的时候 怎么去保障线程平安的 明天咱们就从这个问题切入 来看下 是怎么做到的

剖析

首先 咱们应该分明下 ConcurrentHashMap 怎么去保障 线程平安的,我会别离从初始化的时候 和新增元素的时候 用了那些伎俩去保障了线程平安

初识 ConcurrentHashMap

首先 咱们还是要先理解下 ConcurrentHashMap 是什么 在那种场景下应用。<br/>
看名字 咱们晓得 首先 这个类应该是 在 java.util.concurrent 包中(JUC)这个包很重要,咱们常见的 线程平安的汇合 都会在这个包外面,再看前面的是 HashMap.

数据结构比照

HashMap 咱们晓得 哈希表,是一个双列汇合。是一个能依据 key 值 能疾速定位的 value 的数据结构。既然聊到这个这里,那咱们应该分明 为什么会存在这样的数据结构,存在即正当是吧,那肯定是有需要 才会产生这样的数据结构,那就比照下常见的数据结构 在新增 和删除 工夫复杂度的体现

  1. 数组:数组是才用一段间断的地址空间来存储元素,能够指定下标 来 疾速的找到数值,工夫复杂度是 O(1), 然而通过给定的值来查找的时候,工夫复杂度是 O(n), 因为要遍历整个数组能力去做匹配,如果是有序数组的话 咱们能够通过二分法等伎俩 缩减查找时间,此时的复杂度是 O(long n), 然而对于个别的插入,删除操作,这个时刻就可能须要 挪动数组元素 这个是均匀工夫复杂度是 O(n)。
  2. 链表:链表的存储构造 是用一个头尾节点 去做关联的,所以这个时候不须要一段间断的地址空间和数组有所区别,这边还是把数组和链表做下比照,数组存储的时候须要开拓一段间断的地址空间去存储数据,然而对于一些大的对象 就要调配一段间断的地址空间,如果这个工夫空间很缓和 就难以调配,然而对于链表这样的存储构造,就是能够的 因为他不须要开拓间断的地址空间,只有一块一块的就能够了,用头尾节点做连贯即可。然而链表这样构造也存在肯定的缺点,就是要保护头尾节点多占空间,而且也会容易产生空间碎片。对于这样构造的优化,咱们能够从 Redis 中 quicklist 的到一些想法,有趣味的 能够本人去看看,弱小的 Redis 是怎么解决底层数据的存储的。前面有工夫了再写博文 聊聊。链表因为其存储构造的关系 在插入和删除的时候 就比拟快了 工夫复杂度是 O(1), 然而在查问的时候 就要从头开始遍历了复杂度是 O(n)。
  3. 二叉树:如果是对于一个均衡二叉树的话 插入 删除 和查问的 均匀工夫复杂度是 O(long n),HashMap 和 ConcurrentHashMap 在存储 hash 抵触的列表的时候 就是采取了链表和红黑树,当链表长度大于 8 的工夫,就会转变为红黑树,当红黑树的元素小于 6 的时候 就会转变了链表,我置信看过源码的小伙伴 应该能晓得,具体 为什么是 8,6 这肯定是平衡了插入和查问的 效率思考的~
  4. 哈希表:比照了 以上几种数据结构,哈希表 在这不便 是有劣势的 在插入 删除和查询方法 都是 O(1)的,当然是不思考 hash 抵触的状况下 hash 抵触了 还是是从抵触的列表外面去比照查问的。所以这个时候计算存哈希表中卡槽地位的 hash 函数 是因为的重要,这个函数的是否优良 间接决定了 hash 抵触的概率。

<br/> 咱们常见的 几个哈希表中 是怎么计算卡槽地位的

  • HashTable:【(key.hashCode() & 0x7FFFFFFF) % tab.length】; // 0x7FFFFFFF 的意思是 int 的最大值 二进制标识的话 全部都是高位 都是 1 第一为是 0 是符号位, 和 hash 值做了位与运算 就是保障了第一为肯定是 0 的到了一个负数 具体 为什么要这样做 能够去网上查查 次要是为了 范畴的问题。
  • HashMap: hash 值的算法:【(h = key.hashCode()) ^ (h >>> 16)】,计算 index:【(n – 1) & hash】,h 为什么要右移 16 位 正文下面说 是为了尽量使得高位掩码的的影响向下,缩小 hash 抵触,反正正文上是这么说的,具体起因 哈哈 自己没钻研明确,当前待补充。
  • ConcurrentHashMap:【((h=key.hashCode() ^ (h >>> 16) ) & 0x7fffffff】计算 index:【(n – 1) & hash)】

HashMap 和 ConcurrentHashMap

首先 这 2 个类 都是实现了 Map 接口 和继承了 AbstractMap 抽象类 所以咱们从 HashMap 切换成 ConcurrentHashMap 的时候 简直不必放心 因为办法都是差不多的。
应用 ConcurrentHashMap 的起因 肯定是因为 HashMap 在多线程下 不平安,其实这就是废话,哈哈,不平安的中央体现在,如果是多个线程同时拜访 HashMap 的时候,比方这个时候产生了扩容,扩容的时候 卡槽的外面的值 是要从新计算地位的 重新分配的,然而多线程的状况下 可能你计算好地位 当如卡槽的时候 这个时候 曾经产生了扩容 就会导致 你寄存的地位有问题,在新的扩容后数组下 再去依据 key 可能就找不道对应的 value 了,当然我这边只是说的一种状况,还有很多这样的状况。这个时候 咱们就要应用 JUC 外面的 ConcurrentHashMap。

ConcurrentHashMap 就是相当于 线程平安的 HashMap. 那上面就看下 ConcurrentHashMap 是怎么保障线程平安的。

ConcurrentHashMap

initTable 初始化

我在上篇博文说到过 HashMap 容量设置的时候 说起过 HashMap 的数组初始化 其实在是第一次 put 时候产生的,那咱们看下 ConcurrentHashMap 是什么样子的, 先看下代码

    private static final int MAXIMUM_CAPACITY = 1 << 30;
    
     /**
     * Creates a new, empty map with the default initial table size (16).
     */
    public ConcurrentHashMap() {}
    
    public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

失常如果 咱们设置了 initialCapacity 的话 这个时候就会算出以后的 cap 这边 如果 initialCapacity 是 10 的话,initialCapacity >>> 1 的意思就是右移 1 位 10 的二进制是:1010 右移 1 位是:0101 十进制值是:5 那 tableSizeFor 外面的参数就是 10+5+1=16, tableSizeFor 这个办法意思 我在上篇博文中也提到过 就是算出 大于传入的值 最小的二的幂的值 如果传入 7 那记功就是 8 如果传入 17 那就是 32.

咱们晓得 数组初始化都是在第一次新增元素的时候做的,那咱们就看下 put 办法中 初始化办法

    /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {// 自旋解决 保障了肯定初始化胜利
            if ((sc = sizeCtl) < 0)// 小于 0 阐明 有线程正在进行初始化  就让出 CPU 资源
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {sizeCtl = sc;}
                break;
            }
        }
        return tab;
    }

首先 咱们看到 这边用了一个 While 判断 tab 是否实现初始化,也就是这个一个自旋解决,保障肯定能初始化胜利。
<br/> 前面判断了一下 sizeCtl 如果小于 0,其实就是等于 -1,阐明了此时有线程真在进行初始化,为什么要这样判断呢,是因为看下前面的那个 CAS 操作,就是批改 SIZECTL 为 -1,如果能批改胜利,就执行上面的初始化操作,如果批改失败,阐明
SIZECTL 的值曾经被批改,和预期的不符,就进行下次询换,这个时候 可能 tab 曾经初始化实现 就退出了循环,如果 tab 还没有初始化实现,此时这个条件 sc 小于 0 的条件又成立了 再次让出 CPU 资源 直到初始化实现。

持续说下数组初始化,一进入初始化还是判断了 table 是否是初始化应该有的状态,这边是一个双重 Check.

一种状况 sc 大于 0 就是咱们初始化的时候传入了 initialCapacity,计算失去了 sizeCtl 此时就用这个值,初始化数组大小。

另外的是应用的是默认的构造函数 没有传入 initialCapacity,此时就应用默认的值 DEFAULT_CAPACITY,这个默认值是 16.

看到这里 简略的一段代码,看人家写的多严禁,自旋 不是空自旋 还做了及时让出 CPU 资源,看源码的益处 是学习优良的代码!!!

就这样初始化实现了,咱们简略的总结下 ConcurrentHashMap 是用什么保障线程平安的

  • 自旋 保障肯定初始化实现
  • CAS 保障同一时间 只有一个初始化数组线程
  • 二次 check 保障数组初始化的时候 table 状态的对的

put 的线程平安

好的说完了初始化的办法 咱们来看下外围放入元素的办法。

先看下代码:

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());// 计算 失去哈希值
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {// 这边也是一个自旋操作 保障肯定能新增元素胜利
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();// 执行初始化操作
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
             <!-- 这边也是一个 CAS 操作 只有 的以后节点值是 null 的时候才能够网卡槽外面放入元素 -->
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//MOVED 是一个固定的值 是为了标识 以后正在扩容,此节点已被转移,不能操作 须要期待扩容后 能力操作,此时是自悬期待
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) { // 这边应用了 synchronized 锁哈
                    //Hash 抵触后处理
                }
                if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 如果链表的长度大于等 8 就要转变为红黑树
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);// 这边新增数量大小,而后判断是否要进行扩容
        return null;
    }

一些具体的操作 我都加了正文,这边也是才用和初始化 table 的时候 一样的伎俩来保障多线程下的平安

  • 自旋 自旋保障了 元素肯定能被放入胜利
  • CSA CAS 操作 保障了 放入数据的时候 以后节点肯定是 null 的 如果 在这期间 有别的线程也来操作这个卡槽的值,那 CAS 中 table 中 i 地位的值 就肯定不是 null 了,和预期值不符,那就 CAS 执行失败,进入下次循环 下次循环的时候 发现此槽位曾经有值了 就会走到 Hash 碰撞后的 Synchronized 的办法块外面,这样保障了 在新的卡槽 新增第一个元素的是线程平安的
  • synchronized 锁 保障执行的线程平安

transfer 扩容时的线程平安

ConcurrentHashMap 和 HashMap 在操作扩容的机会都是一样的,都是在新增原数后 判断是否达到临界值 如果达到 就进行扩容 都是 2 倍的扩容原大小
那咱们就看下 transfer 代码

 /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // 这边是设置 CPU 可操作数组的 node 步长 最低 是 16
        <!--tab 是新的数组  这边的 nextTab 就是扩容后的数组 -->    
        if (nextTab == null) {            // initiating
            try {@SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 扩容的时候 是 n << 1 后果相当于 n *2
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;// 当在扩容过程中的时候 一个过渡的表
            transferIndex = n;// 这边 transferIndex 是扩容的时候 第一个转移的卡槽的节点,对原数组的操作是顺叙的。}
        int nextn = nextTab.length;
        
         /**
         * ForwardingNode 是继承了 Node 的 然而他所有节点的 hash 值都是设置成了 MOVED,标识着以后节点在扩容当中
         * above for explanation.
         */
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                <!--nextIndex 是开始拷贝的槽点地位 是从尾部开始 每次 - 1 的 -->
                int nextIndex, nextBound;
                <!-- --i >= bound 这个阐明此次的循环完结 bound 是每次执行到最初的那个值  finishing 阐明这个整个拷贝完结 -->
                if (--i >= bound || finishing)
                    advance = false;
               <!-- 阐明整个数组都拷贝完结了 index 曾经是 到最开始的处  顺叙拷贝 的所以是判断小于等 0 是算完结 -->
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;// 这边开始 每次减一
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;// 长期表 赋值为 null
                    table = nextTab;// 完结后 新的 table 赋值
                    sizeCtl = (n << 1) - (n >>> 1);//
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {synchronized (f) {if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {// 是链表的时候解决}
                        else if (f instanceof TreeBin) {// 红黑树的时候解决}
                    }
                }
            }
        }
    }

扩容的代码 有点简短,须要肯定的工夫 去了解
大体的思路是这样的:

  1. 首先从原数组的队尾开始进行拷贝
  2. 拷贝数组的时候 会把原数组的槽点的锁住应用的是 synchronized,这样 原数组外面的数据就没法被批改,保障了线程平安,胜利拷贝到新数组后,把原数组的槽点设置为转移节点 move。
  3. 如果这个时候有数据的 put 以后槽点状态是转移节点也就是 move,就会始终期待,这个讲 put 办法的时候也聊到过
  4. 直到原数组所有的节点被复制到新数组外面,而后再把新数组赋值给数组容器,实现拷贝

总结一下,在数组扩容的时候,次要是利用 Synchronized 锁去锁住槽点,不让别的线程去操作,槽点复制胜利后,会标识为转移节点,这样新的 put 操作过去,看的槽点状态是 move, 就会始终期待扩容实现后才会再进行 put 操作。

get

最初咱们来看下 get 办法

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());// 获取 hash 值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            <!-- 依据 hash 值 找到槽点  而后看槽点的第一个节点 是否是要找到值 如果是间接返回 -->
            if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            <!-- 节点的 hash 值小于 0 阐明是转移节点 或者是红黑树 -->
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            <!-- 判断如果是链表 就遍历 查找 -->
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

看完了 整个 get 办法 咱们发现 这个获取办法 和咱们的 HashMap 中的 get 办法 并没有什么区别,没有任何的锁 CAS 等等,那是怎么做到的呢

持续看下代码中

**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;

    /**
     * The next table to use; non-null only while resizing.
     */
    private transient volatile Node<K,V>[] nextTable;

    /**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     */
    private transient volatile long baseCount;

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

    /**
     * The next table index (plus one) to split while resizing.
     */
    private transient volatile int transferIndex;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
     */
    private transient volatile int cellsBusy;

    /**
     * Table of counter cells. When non-null, size is a power of 2.
     */
    private transient volatile CounterCell[] counterCells;

大家有没有发现 这些 类的成员变量 都是加了非凡的关键字 volatile,这个关键字 我想大家都晓得什么意思 就是保障了多线程之间 内存的可见性,也就是 A 线程批改了变量的值 B 线程能立马晓得值的变动,并且获取到最新的,具体是怎么实现的 这个次要应用是内存屏障,缓存一致性等技术实现,不是本文的重点,当前缓缓聊。

然而 咱们晓得 volatile 润饰的关键字,如果是根底值类型 保障了多线程批改后值类型的可见性 然而这边润饰了 数组容器 table 是一个数组的对象 是一个援用类型 这样尽管保障了数组对象的援用可见性,然而数组对象 外面的元素批改了 是没法晓得的呀,那咱们再去看下 数组容器外面的 Node 节点

   static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
    }

当看到这里的时候 我差不多晓得了 这里的 node 节点外面的 val 节点值,和下一个节点 next 都是加了 volatitle 关键字的,也就是说 咱们在批改 节点值的时候 和插入节点值的时候 都是线程之间可见的,这样咱们在应用 get 办法的时候 就能在不加任何锁的状况下 失去最新的值,也就实现了多线程下的同步,保障了多线程下的平安。

volatitle 关键字 真香呀,而且性能很高,明天还看了本书 书上说 volatitle 利用的切当话,在并发状况下 完满的代替锁,来进步多线程下的性能。

总结

ConcurrentHashMap 次要应用的是 CAS+ 自旋 +synchronized+ 多重 check 来保障在初始化,新增,和扩容的时候线程平安,读取数据的时候则应用了 volatitle 让元素节点 在多线程之间 可见,从而达到获取最新的值!

又实现了一篇 哇偶!!!!持续加油吧

正文完
 0