前言

最近在看公众号 看到一个问题:为什么 ConcurrentHashMap 的读操作不须要加锁?<br/>

第一次看到这个问题的时候 我也的确比拟懵逼 我尽管晓得ConCurrentHashMap 是怎么在put的时候 去保障线程平安的,然而真的没关注过 get取的时候 怎么去保障线程平安的 明天咱们就从这个问题切入 来看下 是怎么做到的

剖析

首先 咱们应该分明下ConcurrentHashMap 怎么去保障 线程平安的,我会别离从初始化的时候 和新增元素的时候 用了那些伎俩去保障了线程平安

初识ConcurrentHashMap

首先 咱们还是要先理解下ConcurrentHashMap 是什么 在那种场景下应用。<br/>
看名字 咱们晓得 首先 这个类应该是 在java.util.concurrent包中(JUC)这个包很重要,咱们常见的 线程平安的汇合 都会在这个包外面,再看前面的是HashMap.

数据结构比照

HashMap咱们晓得 哈希表,是一个双列汇合。是一个能依据key值 能疾速定位的value的数据结构。 既然聊到这个这里,那咱们应该分明 为什么会存在这样的数据结构,存在即正当是吧,那肯定是有需要 才会产生这样的数据结构,那就比照下常见的数据结构 在新增 和删除 工夫复杂度的体现

  1. 数组:数组是才用一段间断的地址空间来存储元素,能够指定下标 来 疾速的找到数值,工夫复杂度是O(1),然而通过给定的值来查找的时候,工夫复杂度是O(n),因为要遍历整个数组能力去做匹配,如果是有序数组的话 咱们能够通过二分法等伎俩 缩减查找时间,此时的复杂度是O(long n),然而对于个别的插入,删除操作,这个时刻就可能须要 挪动数组元素 这个是均匀工夫复杂度是O(n)。
  2. 链表:链表的存储构造 是用一个头尾节点 去做关联的,所以这个时候不须要一段间断的地址空间和数组有所区别,这边还是把数组和链表做下比照,数组存储的时候须要开拓一段间断的地址空间去存储数据,然而对于一些大的对象 就要调配一段间断的地址空间,如果这个工夫空间很缓和 就难以调配 ,然而对于链表这样的存储构造,就是能够的 因为他不须要开拓间断的地址空间,只有一块一块的就能够了,用头尾节点做连贯即可。然而链表这样构造也存在肯定的缺点,就是要保护头尾节点多占空间,而且也会容易产生空间碎片。对于这样构造的优化,咱们能够从Redis中quicklist 的到一些想法,有趣味的 能够本人去看看,弱小的Redis 是怎么解决底层数据的存储的。前面有工夫了再写博文 聊聊。链表因为其存储构造的关系 在插入和删除的时候 就比拟快了 工夫复杂度是O(1),然而在查问的时候 就要从头开始遍历了复杂度是O(n)。
  3. 二叉树:如果是对于一个均衡二叉树的话 插入 删除 和查问的 均匀工夫复杂度是O(long n),HashMap和ConcurrentHashMap 在存储hash抵触的列表的时候 就是采取了链表和红黑树,当链表长度大于8的工夫,就会转变为红黑树,当红黑树的元素小于6的时候 就会转变了链表,我置信看过源码的小伙伴 应该能晓得,具体 为什么是8,6 这肯定是平衡了插入和查问的 效率思考的~
  4. 哈希表:比照了 以上几种数据结构,哈希表 在这不便 是有劣势的 在插入 删除和查询方法 都是O(1)的 ,当然是不思考hash抵触的状况下 hash抵触了 还是是从抵触的列表外面去比照查问的。所以这个时候计算存哈希表中卡槽地位的hash函数 是因为的重要,这个函数的是否优良 间接决定了 hash抵触的概率。

<br/>咱们常见的 几个哈希表中 是怎么计算卡槽地位的

  • HashTable: 【(key.hashCode() & 0x7FFFFFFF) % tab.length】; // 0x7FFFFFFF 的意思是int的最大值 二进制标识的话 全部都是高位 都是1 第一为是0 是符号位, 和hash值做了位与运算 就是保障了第一为肯定是0 的到了一个负数 具体 为什么要这样做 能够去网上查查 次要是为了 范畴的问题。
  • HashMap: hash值的算法:【(h = key.hashCode()) ^ (h >>> 16)】,计算index: 【(n - 1) & hash】,h为什么要右移16位 正文下面说 是为了尽量使得高位掩码的的影响向下,缩小hash抵触,反正正文上是这么说的,具体起因 哈哈 自己没钻研明确,当前待补充。
  • ConcurrentHashMap: 【( (h=key.hashCode() ^ (h >>> 16) ) & 0x7fffffff】计算index:【(n - 1) & hash)】

HashMap和ConcurrentHashMap

首先 这2个类 都是实现了Map接口 和继承了AbstractMap抽象类 所以咱们从HashMap 切换成ConcurrentHashMap的时候 简直不必放心 因为办法都是差不多的。
应用ConcurrentHashMap的起因 肯定是因为HashMap 在多线程下 不平安,其实这就是废话,哈哈,不平安的中央体现在,如果是多个线程同时拜访HashMap的时候,比方这个时候产生了扩容,扩容的时候 卡槽的外面的值 是要从新计算地位的 重新分配的,然而多线程的状况下 可能你计算好地位 当如卡槽的时候 这个时候 曾经产生了扩容 就会导致 你寄存的地位有问题,在新的扩容后数组下 再去依据key 可能就找不道对应的value了,当然我这边只是说的一种状况,还有很多这样的状况。这个时候 咱们就要应用JUC外面的ConcurrentHashMap。

ConcurrentHashMap 就是相当于 线程平安的HashMap. 那上面就看下ConcurrentHashMap是怎么保障线程平安的。

ConcurrentHashMap

initTable初始化

我在上篇博文说到过HashMap 容量设置的时候 说起过 HashMap 的数组初始化 其实在是第一次put时候产生的,那咱们看下 ConcurrentHashMap 是什么样子的,先看下代码

    private static final int MAXIMUM_CAPACITY = 1 << 30;         /**     * Creates a new, empty map with the default initial table size (16).     */    public ConcurrentHashMap() {    }        public ConcurrentHashMap(int initialCapacity) {        if (initialCapacity < 0)            throw new IllegalArgumentException();        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?                   MAXIMUM_CAPACITY :                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));        this.sizeCtl = cap;    }        private static final int tableSizeFor(int c) {        int n = c - 1;        n |= n >>> 1;        n |= n >>> 2;        n |= n >>> 4;        n |= n >>> 8;        n |= n >>> 16;        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;    }

失常如果 咱们设置了initialCapacity的话 这个时候就会算出以后的cap 这边 如果initialCapacity是10的话,initialCapacity >>> 1的意思就是右移1位 10的二进制是:1010 右移1位是:0101 十进制值是:5 那 tableSizeFor外面的参数就是 10+5+1=16, tableSizeFor 这个办法意思 我在上篇博文中也提到过 就是算出 大于传入的值 最小的二的幂的值 如果传入7 那记功就是8 如果传入17 那就是32.

咱们晓得 数组初始化都是在第一次新增元素的时候做的,那咱们就看下put办法中 初始化办法

    /**     * Initializes table, using the size recorded in sizeCtl.     */    private final Node<K,V>[] initTable() {        Node<K,V>[] tab; int sc;        while ((tab = table) == null || tab.length == 0) {//自旋解决 保障了肯定初始化胜利            if ((sc = sizeCtl) < 0)//小于0 阐明 有线程正在进行初始化  就让出CPU资源                Thread.yield(); // lost initialization race; just spin            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                try {                    if ((tab = table) == null || tab.length == 0) {                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                        @SuppressWarnings("unchecked")                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                        table = tab = nt;                        sc = n - (n >>> 2);                    }                } finally {                    sizeCtl = sc;                }                break;            }        }        return tab;    }

首先 咱们看到 这边用了一个While 判断tab是否实现初始化,也就是这个一个自旋解决,保障肯定能初始化胜利。
<br/>前面判断了一下sizeCtl 如果小于0,其实就是等于-1,阐明了此时有线程真在进行初始化,为什么要这样判断呢,是因为看下前面的那个CAS 操作,就是批改SIZECTL为 -1,如果能批改胜利,就执行上面的初始化操作,如果批改失败,阐明
SIZECTL的值曾经被批改,和预期的不符,就进行下次询换,这个时候 可能tab曾经初始化实现 就退出了循环,如果tab还没有初始化实现,此时这个条件sc小于0的条件又成立了 再次让出CPU资源 直到初始化实现。

持续说下数组初始化,一进入初始化还是判断了table是否是初始化应该有的状态,这边是一个双重Check.

一种状况sc大于0 就是咱们初始化的时候传入了initialCapacity,计算失去了sizeCtl 此时就用这个值,初始化数组大小。

另外的是应用的是默认的构造函数 没有传入initialCapacity,此时就应用默认的值DEFAULT_CAPACITY,这个默认值是16.

看到这里 简略的一段代码,看人家写的多严禁,自旋 不是空自旋 还做了及时让出CPU资源,看源码的益处 是学习优良的代码!!!

就这样初始化实现了,咱们简略的总结下 ConcurrentHashMap是用什么保障线程平安的

  • 自旋 保障肯定初始化实现
  • CAS 保障同一时间 只有一个初始化数组线程
  • 二次check保障数组初始化的时候 table状态的对的

put的线程平安

好的说完了初始化的办法 咱们来看下外围放入元素的办法。

先看下代码:

    /** Implementation for put and putIfAbsent */    final V putVal(K key, V value, boolean onlyIfAbsent) {        if (key == null || value == null) throw new NullPointerException();        int hash = spread(key.hashCode());//计算 失去哈希值        int binCount = 0;        for (Node<K,V>[] tab = table;;) {//这边也是一个自旋操作 保障肯定能新增元素胜利            Node<K,V> f; int n, i, fh;            if (tab == null || (n = tab.length) == 0)                tab = initTable();//执行初始化操作            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {             <!--这边也是一个CAS 操作 只有 的以后节点值是null 的时候才能够网卡槽外面放入元素-->                if (casTabAt(tab, i, null,                             new Node<K,V>(hash, key, value, null)))                    break;                   // no lock when adding to empty bin            }            else if ((fh = f.hash) == MOVED)//MOVED 是一个固定的值 是为了标识 以后正在扩容,此节点已被转移,不能操作 须要期待扩容后 能力操作,此时是自悬期待                tab = helpTransfer(tab, f);            else {                V oldVal = null;                synchronized (f) { //这边应用了synchronized锁哈                    //Hash抵触后处理                }                if (binCount != 0) {                    if (binCount >= TREEIFY_THRESHOLD)//如果链表的长度大于等8 就要转变为红黑树                        treeifyBin(tab, i);                    if (oldVal != null)                        return oldVal;                    break;                }            }        }        addCount(1L, binCount);// 这边新增数量大小,而后判断是否要进行扩容        return null;    }

一些具体的操作 我都加了正文 ,这边也是才用和初始化table的时候 一样的伎俩来保障多线程下的平安

  • 自旋 自旋保障了 元素肯定能被放入胜利
  • CSA CAS 操作 保障了 放入数据的时候 以后节点肯定是null的 如果 在这期间 有别的线程也来操作这个卡槽的值,那CAS 中table中i 地位的值 就肯定不是null了,和预期值不符,那就CAS 执行失败,进入下次循环 下次循环的时候 发现此槽位曾经有值了 就会走到Hash碰撞后的Synchronized的办法块外面,这样保障了 在新的卡槽 新增第一个元素的是线程平安的
  • synchronized 锁 保障执行的线程平安

transfer 扩容时的线程平安

ConcurrentHashMap 和HashMap 在操作扩容的机会都是一样的,都是在新增原数后 判断是否达到临界值 如果达到 就进行扩容 都是2倍的扩容原大小
那咱们就看下transfer 代码

 /**     * Moves and/or copies the nodes in each bin to new table. See     * above for explanation.     */    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        int n = tab.length, stride;        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)            stride = MIN_TRANSFER_STRIDE; // 这边是设置CPU 可操作数组的node步长 最低 是16        <!--tab是新的数组  这边的nextTab 就是扩容后的数组 -->            if (nextTab == null) {            // initiating            try {                @SuppressWarnings("unchecked")                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容的时候 是n << 1 后果相当于n*2                nextTab = nt;            } catch (Throwable ex) {      // try to cope with OOME                sizeCtl = Integer.MAX_VALUE;                return;            }            nextTable = nextTab;//当在扩容过程中的时候 一个过渡的表            transferIndex = n;// 这边transferIndex 是扩容的时候 第一个转移的卡槽的节点,对原数组的操作是顺叙的。        }        int nextn = nextTab.length;                 /**         * ForwardingNode是继承了Node的 然而他所有节点的hash值都是设置成了MOVED,标识着以后节点在扩容当中         * above for explanation.         */        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        boolean advance = true;        boolean finishing = false; // to ensure sweep before committing nextTab        for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;            while (advance) {                <!--nextIndex 是开始拷贝的槽点地位 是从尾部开始 每次-1的 -->                int nextIndex, nextBound;                <!-- --i >= bound这个阐明此次的循环完结 bound是每次执行到最初的那个值  finishing阐明这个整个拷贝完结-->                if (--i >= bound || finishing)                    advance = false;               <!--阐明整个数组都拷贝完结了 index 曾经是 到最开始的处  顺叙拷贝 的所以是判断小于等0 是算完结-->                else if ((nextIndex = transferIndex) <= 0) {                    i = -1;                    advance = false;                }                else if (U.compareAndSwapInt                         (this, TRANSFERINDEX, nextIndex,                          nextBound = (nextIndex > stride ?                                       nextIndex - stride : 0))) {                    bound = nextBound;                    i = nextIndex - 1;// 这边开始 每次减一                    advance = false;                }            }            if (i < 0 || i >= n || i + n >= nextn) {                int sc;                if (finishing) {                    nextTable = null;//长期表 赋值为null                    table = nextTab;//完结后 新的table 赋值                    sizeCtl = (n << 1) - (n >>> 1);//                    return;                }                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                        return;                    finishing = advance = true;                    i = n; // recheck before commit                }            }            else if ((f = tabAt(tab, i)) == null)                advance = casTabAt(tab, i, null, fwd);            else if ((fh = f.hash) == MOVED)                advance = true; // already processed            else {                synchronized (f) {                    if (tabAt(tab, i) == f) {                        Node<K,V> ln, hn;                        if (fh >= 0) {                           //是链表的时候解决                        }                        else if (f instanceof TreeBin) {                           //红黑树的时候解决                        }                    }                }            }        }    }

扩容的代码 有点简短,须要肯定的工夫 去了解
大体的思路是这样的:

  1. 首先从原数组的队尾开始进行拷贝
  2. 拷贝数组的时候 会把原数组的槽点的锁住应用的是synchronized,这样 原数组外面的数据就没法被批改,保障了线程平安,胜利拷贝到新数组后,把原数组的槽点设置为转移节点move。
  3. 如果这个时候有数据的put 以后槽点状态是转移节点也就是move,就会始终期待,这个讲put办法的时候也聊到过
  4. 直到原数组所有的节点被复制到新数组外面,而后再把新数组赋值给数组容器,实现拷贝

总结一下,在数组扩容的时候,次要是利用Synchronized锁去锁住槽点,不让别的线程去操作,槽点复制胜利后,会标识为转移节点,这样新的put操作过去,看的槽点状态是move,就会始终期待扩容实现后才会再进行put操作。

get

最初咱们来看下get办法

public V get(Object key) {        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;        int h = spread(key.hashCode());//获取hash值        if ((tab = table) != null && (n = tab.length) > 0 &&            (e = tabAt(tab, (n - 1) & h)) != null) {            <!--依据hash值 找到槽点  而后看槽点的第一个节点 是否是要找到值 如果是间接返回-->            if ((eh = e.hash) == h) {                if ((ek = e.key) == key || (ek != null && key.equals(ek)))                    return e.val;            }            <!--节点的hash值小于0 阐明是转移节点 或者是红黑树-->            else if (eh < 0)                return (p = e.find(h, key)) != null ? p.val : null;            <!--判断如果是链表 就遍历 查找-->            while ((e = e.next) != null) {                if (e.hash == h &&                    ((ek = e.key) == key || (ek != null && key.equals(ek))))                    return e.val;            }        }        return null;    }

看完了 整个get 办法 咱们发现 这个获取办法 和咱们的HashMap中的get 办法 并没有什么区别,没有任何的锁 CAS 等等,那是怎么做到的呢

持续看下代码中

**     * The array of bins. Lazily initialized upon first insertion.     * Size is always a power of two. Accessed directly by iterators.     */    transient volatile Node<K,V>[] table;    /**     * The next table to use; non-null only while resizing.     */    private transient volatile Node<K,V>[] nextTable;    /**     * Base counter value, used mainly when there is no contention,     * but also as a fallback during table initialization     * races. Updated via CAS.     */    private transient volatile long baseCount;    /**     * Table initialization and resizing control.  When negative, the     * table is being initialized or resized: -1 for initialization,     * else -(1 + the number of active resizing threads).  Otherwise,     * when table is null, holds the initial table size to use upon     * creation, or 0 for default. After initialization, holds the     * next element count value upon which to resize the table.     */    private transient volatile int sizeCtl;    /**     * The next table index (plus one) to split while resizing.     */    private transient volatile int transferIndex;    /**     * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.     */    private transient volatile int cellsBusy;    /**     * Table of counter cells. When non-null, size is a power of 2.     */    private transient volatile CounterCell[] counterCells;

大家有没有发现 这些 类的成员变量 都是加了非凡的关键字 volatile,这个关键字 我想大家都晓得什么意思 就是保障了多线程之间 内存的可见性,也就是A线程批改了变量的值 B线程能立马晓得值的变动,并且获取到最新的,具体是怎么实现的 这个次要应用是内存屏障,缓存一致性等技术实现,不是本文的重点,当前缓缓聊。

然而 咱们晓得volatile 润饰的关键字,如果是根底值类型 保障了多线程批改后值类型的可见性 然而这边润饰了 数组容器table 是一个数组的对象 是一个援用类型 这样尽管保障了数组对象的援用可见性,然而数组对象 外面的元素批改了 是没法晓得的呀,那咱们再去看下 数组容器外面的Node节点

   static class Node<K,V> implements Map.Entry<K,V> {        final int hash;        final K key;        volatile V val;        volatile Node<K,V> next;        Node(int hash, K key, V val, Node<K,V> next) {            this.hash = hash;            this.key = key;            this.val = val;            this.next = next;        }    }

当看到这里的时候 我差不多晓得了 这里的node节点外面的val节点值,和下一个节点next都是加了volatitle关键字的,也就是说 咱们在批改 节点值的时候 和插入节点值的时候 都是线程之间可见的,这样咱们在应用get办法的时候 就能在不加任何锁的状况下 失去最新的值,也就实现了多线程下的同步,保障了多线程下的平安。

volatitle 关键字 真香呀,而且性能很高,明天还看了本书 书上说volatitle利用的切当话,在并发状况下 完满的代替锁,来进步多线程下的性能。

总结

ConcurrentHashMap 次要应用的是 CAS+自旋+synchronized+多重check 来保障在初始化,新增,和扩容的时候线程平安,读取数据的时候则应用了 volatitle 让元素节点 在多线程之间 可见,从而达到获取最新的值!

又实现了一篇 哇偶!!!! 持续加油吧