关于前端:详解ConCurrentHashMap源码jdk18

39次阅读

共计 10814 个字符,预计需要花费 28 分钟才能阅读完成。

ConCurrentHashMap 是一个反对高并发汇合,罕用的汇合之一,在 jdk1.8ConCurrentHashMap 的构造和操作和 HashMap 都很相似:

  • 数据结构基于 数组 + 链表 / 红黑树
  • get 通过计算 hash 值后取模数组长度确认索引来查问元素。
  • put 办法也是先找索引地位,而后不存在就间接增加,存在雷同 key 就替换。
  • 扩容都是创立新的 table 数组,原来的数据转移到新的 table 数组中。

惟一不同的是,HashMap 不反对并发操作,ConCurrentHashMap 是反对并发操作的。所以 ConCurrentHashMap 的设计也比 HashMap 也简单的多,通过浏览 ConCurrentHashMap 的源码,也更加理解一些并发的操作, 比方:

<pre class=”hljs java”>volatile
CAS
synchronized
</pre>

详见 HashMap 相干文章:

详解 HashMap 源码解析(上)

详解 HashMap 源码解析(下)

数据结构

ConCurrentHashMap 是由 数组 + 链表 / 红黑树 组成的:

其中左侧局部是一个 哈希表 , 通过 hash 算法 确定元素在数组的下标:

transient volatile Node<K,V>[] table;

链表是为了解决 hash 抵触 , 当发生冲突的时候。采纳 链表法 , 将元素增加到链表的尾部。其中 Node 节点存储数据:

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
}

Node 节点蕴含:

hash
key
value
next

次要属性字段

// 最大容量
int MAXIMUM_CAPACITY = 1 << 30;

// 初始化容量
int DEFAULT_CAPACITY = 16

// 管制数组初始化或者扩容,为正数时,示意数组正在初始化或者扩容。- 1 示意正在初始化。其余状况 - n 示意 n 线程正在扩容。private transient volatile int sizeCtl;

// 装载因子
float LOAD_FACTOR = 0.75f

// 链表长度为 8 转成红黑树
int TREEIFY_THRESHOLD = 8

// 红黑树长度小于 6 进化成链表
int UNTREEIFY_THRESHOLD = 6;

获取数据 get

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算 hash 值
    int h = spread(key.hashCode());
    // 判断 tab 不为空并且 tab 对应的下标不为空 
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // eh < 0 示意遇到扩容
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历链表,直到遍历 key 相等的值 
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
  • 获取数据流程:

    • 调用 spread 获取 hash 值, 通过 (n - 1) & h 取余获取数组下标的数据。
    • 首节点合乎就返回数据。
    • eh<0 示意遇到了扩容,会调用正在扩容节点 ForwardingNodefind 办法,查找该节点,匹配就返回。
    • 遍历链表,匹配到数据就返回。
    • 以上都不合乎,返回 null

get 如何实现线程平安

get 办法外面没有应用到锁,那是如何实现线程平安。次要应用到了 volatile

  • volatile

一个线程对共享变量的批改,另外一个线程可能立即看到,咱们称为 可见性

cpu 运行速度比内存速度快很多,为了平衡和内存之间的速度差别,减少了 cpu 缓存 , 如果在 cpu 缓存中存在 cpu 须要数据,阐明命中了 cpu 缓存,就不通过拜访内存。如果不存在,则要先把内存的数据载入到 cpu 缓存中,在返回给 cpu 处理器。

多核 cpu 的服务器中,每个 cpu 都有本人的缓存,cpu 之间的缓存是不共享的。当多个线程在不同的 cpu 上执行时,比方下图中,线程 A 操作的是 cpu-1 上的缓存,线程 B 操作的是 cpu-2 上的缓存,这个时候,线程 A 变量 V 的操作对于 线程 B 不可见的

然而一个变量被 volatile 申明,它的意思是:

通知编译器,对这个变量的读写,不能应用 cpu 缓存,必须从内存中读取或者写入。

下面的变量 V 被 volatile 申明,线程 A 在 cup- 1 中批改了数据,会间接写到内存中,不会写入到 cpu 缓存中。而线程 B 无奈从 cpu 缓存读取变量,须要从主内存拉取数据。

  • 总结:

     volatile

volatileget 利用

  • table 哈希表

    transient volatile Node<K,V>[] table;

    应用 volatile 申明数组,示意 援用地址 volatile 而不是 数组元素volatile

既然不是 数组元素 被润饰成 volatile,那实现线程平安在看 Node 节点。

  • Node 节点

    static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    }

    其中 valnext 都用了 volatile 润饰,在多线程环境下,线程 A 批改节点 val 或者新增节点对他人线程是 可见的

所以 get 办法应用无锁操作是能够 保障线程平安

既然 volatile 润饰数组对 get 操作没有成果,那加在 volatile 上有什么目标呢?

是为了数组在扩容的时候对其余线程具备可见性。

  • jdk 1.8 的 get 操作不应用锁,次要有两个方面:

    • Node 节点的 valnext 都用 volatile 润饰,保障线程批改或者新增节点对他人线程是可见的。
    • volatile 润饰 table 数组,保障数组在扩容时其它线程是具备可见性的。

增加数据 put

put(K key, V value) 间接调用 putVal(key, value, false) 办法。

public V put(K key, V value) {return putVal(key, value, false);
}

putVal() 办法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key 或者 value 为空,报空指针谬误
        if (key == null || value == null) throw new NullPointerException();
        // 计算 hash 值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // tab 为空或者长度为 0,初始化 table
                tab = initTable();
            // 应用 volatile 查找索引下的数据
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 索引地位没有数据,应用 cas 增加数据
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // MOVED 示意数组正在进行数组扩容,以后进行也加入到数组复制
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 数组不在扩容和也有值,阐明数据下标处有值
                // 链表中有数据,应用 synchronized 同步锁
                synchronized (f) {if (tabAt(tab, i) == f) {
                        // 为链表
                        if (fh >= 0) {
                            binCount = 1;
                            // 遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // hash 以及 key 雷同,替换 value 值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 遍历到链表尾,增加链表节点
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 红黑树,TreeBin 哈希值固定为 -2
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 链表转红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
  • 增加数据流程:

    • 判断 key 或者 valuenull 都会报空指针谬误。
    • 计算 hash 值, 而后开启 没有终止条件 的循环。
    • 如果 table 数组为 null , 初始化数组。
    • 数组 table 不为空,通过 volatile 找到数组对应下标是否为空,为空就应用 CAS 增加头结点。
    • 节点的 hash = -1 示意数组正在扩容,一起进行扩容操作。
    • 以上不合乎,阐明索引处有值,应用 synchronized 锁住以后地位的节点,避免被其余线程批改。

      • 如果是链表,遍历链表,匹配到雷同的 key 替换 value 值。如果链表找不到,就增加到链表尾部。
      • 如果是红黑树,就增加到红黑树中。
    • 节点的链表个数大于 8 , 链表就转成红黑树。

ConcurrentHashMap 键值对为什么都不能为 null , 而 HashMap 就能够?

通过 get 获取数据时,如果获取的数据是 null,就无奈判断,是 put 时的 value 为 null,还是找个 key 就没做过映射。HashMap 是非并发的,能够通过 contains(key) 判断,而反对并发的 ConcurrentHashMap 在调用 contains 办法和 get 办法的时候,map 可能曾经不同了。参考

如果数组 table 为空调用 initTable 初始化数组:

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
    // table 为 null
    while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)
         // sizeCtl<0 示意其它线程正在初始化数组数组,以后线程须要让出 CPU
            Thread.yield(); // lost initialization race; just spin
        // 调用 CAS 初始化 table 表 
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {sizeCtl = sc;}
            break;
        }
    }
    return tab;
}

initTable 判断 sizeCtl 值,如果 sizeCtl-1 示意有其余线程正在初始化数组,以后线程调用 Thread.yield 让出 CPU。而正在初始化数组的线程通过 Unsafe.compareAndSwapInt 办法将 sizeCtl 改成 -1

initTable 最外层始终应用 while 循环,而非 if 条件判断,就是确保数组能够初始化胜利。

数组初始化胜利之后,再执行增加的操作,调用 tableAt 通过 volatile 的形式找到 (n-1)&hash 处的 bin 节点。

  • 如果为空,应用 CAS 增加节点。
  • 不为空,须要应用 synchronized 锁,索引对应的 bin 节点,进行增加或者更新操作。

    Insertion (via put or its variants) of the first node in an  empty bin is performed by just CASing it to the bin. This is  by far the most common case for put operations under most  key/hash distributions. Other update operations (insert,  delete, and replace) require locks. We do not want to waste  the space required to associate a distinct lock object with  each bin, so instead use the first node of a bin list itself as  a lock. Locking support for these locks relies on builtin  "synchronized" monitors.
    

    如果 f 的 hash 值为 -1,阐明以后 f 是 ForwaringNode 节点,意味着有其它线程正在扩容,则一起进行扩容操作。

实现增加或者更新操作之后,才执行 break 终止最外层 没有终止条件的 for 循环,确保数据能够增加胜利。

最初执行 addCount 办法。

private final void addCount(long x, int check) {CounterCell[] as; long b, s;
    // 利用 CAS 更新 baseCoount
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();}
    // check >= 0, 须要查看是否须要进行扩容操作
    if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);
            if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();}
    }
}

扩容 transfer

什么时候会扩容

* 插入一个新的节点:

  • 新增节点,所在的链表元素个数达到阈值 8,则会调用 treeifyBin 把链表转成红黑树,在转成之前,会判断数组长度小于 MIN_TREEIFY_CAPACITY , 默认是 64 , 触发扩容。
  • 调用 put 办法,在结尾 addCount 办法记录元素个数,并查看是否进行扩容,数组元素达到阈值时,触发扩容。

不应用加锁的,反对多线程扩容。利用并发解决缩小扩容带来性能的影响。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {@SuppressWarnings("unchecked")
            // 创立 nextTab,容量为原来容量的两倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            // 扩容是抛出异样,将阈值设置成最大,示意不再扩容。sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 创立 ForwardingNode 节点,作为标记位,表明以后地位曾经做过桶解决
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance = true 表明该节点曾经解决过了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 管制 --i,遍历原 hash 表中的节点
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 用 CAS 计算失去的 transferIndex
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 将原数组中的节点赋值到新的数组中,nextTab 赋值给 table,清空 nextTable。if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 所有节点实现复制工作,if (finishing) {
                nextTable = null;
                table = nextTab;
                // 设置新的阈值为原来的 1.5 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 利用 CAS 办法更新扩容的阈值,sizeCtl 减一,阐明新退出一个线程参加到扩容中
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 遍历的节点为 null,则放入到 ForwardingNode 指针节点
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // f.hash==- 1 示意遍历到 ForwardingNode 节点,阐明该节点曾经解决过了 
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 节点加锁
            synchronized (f) {if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // fh>=0,示意为链表节点
                    if (fh >= 0) {
                        // 构建两个链表,一个是原链表,另一个是原链表的反序链表
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 在 nextTable i 地位处插入链表
                        setTabAt(nextTab, i, ln);
                        // 在 nextTable i+n 地位处插入链表
                        setTabAt(nextTab, i + n, hn);
                        // 在 table i 的地位处插上 ForwardingNode,示意该节点曾经解决过
                        setTabAt(tab, i, fwd);
                        // 能够执行 -- i 的操作,再次遍历节点
                        advance = true;
                    }
                    // TreeBin 红黑树,依照红黑树解决,解决逻辑和链表相似
                    else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 扩容后树节点的个数 <=6, 红黑树转成链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

扩容过程有的简单,次要波及到多线程的并发扩容,ForwardingNode 的作用就是反对扩容操作,将曾经解决过的节点和空节点置为 ForwardingNode,并发解决时多个线程解决 ForwardingNode 示意曾经解决过了,就往后遍历。

总结

  • ConcurrentHashMap 是基于 数组 + 链表 / 红黑树 的数据结构,增加、删除、更新都是先通过计算 keyhash 值确定数据的索引值,这和 HashMap 是相似的,只不过 ConcurrentHashMap 针对并发做了更多的解决。
  • get 办法获取数据,先计算 hash 值再再和数组长度取余操作获取索引地位。

    • 通过 volatile 关键字找到 table 保障多线程环境下,数组扩容具备 可见性 , 而 Node 节点中 valnext 指针都应用 volatile 润饰保证数据批改后别的线程是 可见的 。这就保障了 ConcurrentHashMap 线程安全性
    • 如果遇到数组扩容,就参加到扩容中。
    • 首节点值匹配到数据就间接返回数据,否则就遍历链表或者红黑树,直到匹配到数据。
  • put 办法增加或者更新数据。

    • 如果 keyvalue 为空,就报错。这是因为在调用 get 办法获取数据为 null , 无奈判断是获取的数据为 null,还是对应的 key 就不存在映射, HashMap 能够通过 contains(key) 判断,而 ConcurrentHashMap 在多线程环境下调用 containsget 办法的时候,map 可能就不同了。
    • 如果 table 数组为空,先初始化数组,先通过 sizeCtl 管制并发,如果小于 0 示意有别的线程正在初始化数组,就让出 CPU , 否则应用 CASsizeCtl 设置成 -1
    • 初始化数组之后,如果节点为空,应用 CAS 增加节点。
    • 不为空,就锁住该节点,进行增加或者更新操作。
  • transfer 扩容

    • 在新增一个节点时,链表个数达到阈值 8,会将链表转成红黑树,在转成之前,会先判断数组长度小于 64 , 会触发扩容。还有汇合个数达到阈值时也会触发扩容。
    • 扩容数组的长度是原来数组的两倍。
    • 为了反对多线程扩容创立 ForwardingNode 节点作为标记位,如果遍历到该节点,阐明曾经做过解决。
    • 遍历赋值原来的数据给新的数组。

正文完
 0