HashMap存在的问题

任何技术的诞生都是有其独特的诞生背景的,HashMap 诞生于分治思维,而 ConcurrentHashMap 则是为了解决 HashMap 中的线程平安问题而生,接下来咱们就一起看一下 HashMap 中存在的线程平安问题。

  • 先看下 jdk7 中扩容办法的实现

    void resize(int newCapacity) {    Entry[] oldTable = table;    int oldCapacity = oldTable.length;    if (oldCapacity == MAXIMUM_CAPACITY) {        threshold = Integer.MAX_VALUE;        return;    }    Entry[] newTable = new Entry[newCapacity];    // 最终会进入transfer办法    transfer(newTable, initHashSeedAsNeeded(newCapacity));    table = newTable;    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);}    复制代码

    resize 办法会调用 transfer办法
    《2020最新Java根底精讲视频教程和学习路线!》

    void transfer(Entry[] newTable, boolean rehash) {    int newCapacity = newTable.length;    for (Entry<K,V> e : table) {        while(null != e) {            Entry<K,V> next = e.next;            if (rehash) {                e.hash = null == e.key ? 0 : hash(e.key);            }            int i = indexFor(e.hash, newCapacity);            e.next = newTable[i];            newTable[i] = e;            e = next;        }    }}复制代码

    如图所示,该map在插入第四个元素时会触发扩容

假如有两个线程同时执行到 transfer 办法,线程2执行到 Entry<K,V> next = e.next; 之后cpu工夫片耗尽,此时变量e指向节点a,变量next指向节点b。线程1实现了扩容,变量援用关系如图所示:

线程2开始执行,此时对于线程2中保留的变量援用关系如图所示:

执行后,变量e指向节点b,因为e不是null,则继续执行循环体,执行后的援用关系。

变量e又从新指回节点a,只能继续执行循环体,这里仔细分析下: 1、执行完Entry<K,V> next = e.next;,目前节点a没有next,所以变量next指向null; 2、e.next = newTable[i]; 其中 newTable[i] 指向节点 b,那就是把 a 的 next 指向了节点 b,这样 a 和 b 就互相援用了,造成了一个环; 3、newTable[i] = e 把节点a放到了数组i地位; 4、e = next; 把变量e赋值为 null,因为第一步中变量 next 就是指向 null;

所以最终的援用关系是这样的:

节点a和b相互援用,造成了一个环,当在数组该地位get寻找对应的key时,就产生了死循环。

另外,如果线程2把 newTable 设置成到外部的 table,节点 c 的数据就丢了,看来还有数据遗失的问题。

  • jdk8中的数据笼罩问题

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,                   boolean evict) {        Node<K,V>[] tab; Node<K,V> p; int n, i;        if ((tab = table) == null || (n = tab.length) == 0)            n = (tab = resize()).length;        if ((p = tab[i = (n - 1) & hash]) == null)             // 如果没有hash碰撞则直接插入元素            tab[i] = newNode(hash, key, value, null);        else {           //...        }        ++modCount;        if (++size > threshold)            resize();        afterNodeInsertion(evict);        return null;    }复制代码

    1、假如两个线程1、2都在进行 put 操作,并且hash函数计算出的插入下标是雷同的。

    2、当线程1执行完第六行代码后因为工夫片耗尽导致被挂起,而线程2失去工夫片后在该下标处插入了元素,实现了失常的插入。

    3、而后线程1取得工夫片,因为之前曾经进行了 hash 碰撞的判断,所有此时不会再进行判断,而是间接进行插入。

    4、这就导致了线程2插入的数据被线程1笼罩了,从而线程不平安。

ConcurrentHashMap实现原理

ConcurrentHashMap 在 jdk7 降级j到 dk8之 后有较大的改变,jdk7 中次要采纳 Segment 分段锁的思维,Segment 继承自ReentrantLock 类,顺次来保障线程平安。限于篇幅起因,本文只探讨 jdk8 中的 ConcurrentHashMap 实现原理。有趣味的同学能够自行钻研 jdk7 中的实现。

jdk8 中的 ConcurrentHashMap 数据结构同 jdk8 中的 HashMap 数据结构一样,都是 数组+链表+红黑树。摒弃了 jdk7 中的分段锁设计,应用了 Node + CAS + Synchronized 来保障线程平安。

重要成员变量

table:默认为 null,初始化产生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。nextTable:默认为 null,扩容时新生成的数组,其大小为原数组的两倍。sizeCtl:默认为0,用来管制table的初始化和扩容操作,具体利用在后续会体现进去。  -1 代表 table 正在初始化     -N 示意有 N-1 个线程正在进行扩容操作     其余状况:         1、如果 table 未初始化,示意 table 须要初始化的大小。         2、如果 table 初始化实现,示意 table 的容量,默认是 table 大小的0.75倍,用这个公式算 0.75(n - (n >>> 2))。 Node:保留 key,value 及 key 的 hash 值的数据结构。其中 value 和 next 都用 **volatile** 润饰,保障并发的可见性。ForwardingNode:一个非凡的 Node 节点,hash 值为-1,其中存储 nextTable 的援用。只有 table 产生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在 table 中示意以后节点为 null 或则曾经被挪动。复制代码
static class Node<K,V> implements Map.Entry<K,V> {    final int hash;    final K key;    // volatile 润饰的变量能够保障线程可见性,同时也能够禁止指令重排序    // 无关 volatile 原理此处不开展,volatile 的实现原理可自行上网查阅    volatile V val;    volatile Node<K,V> next;    //...}复制代码

重要办法实现原理

在探索办法实现之前,咱们先认识一下 Unsafe 和 CAS 思维,ConcurrentHashMap 中大量用到 Unsafe 类和 CAS 思维。

Unsafe

Unsafe 是 jdk 提供的一个间接拜访操作系统资源的工具类(底层c++实现),它能够间接分配内存,内存复制,copy,提供 cpu 级别的 CAS 乐观锁等操作。它的目标是为了加强java语言间接操作底层资源的能力。应用Unsafe类最次要的起因是防止应用高速缓冲区中的过期数据。为了不便了解,举个栗子。类 User 有一个成员变量 name。咱们new了一个对象 User 后,就晓得了它在内存中的起始值 ,而成员变量 name 在对象中的地位偏移是固定的。这样通过这个起始值和这个偏移量就可能定位到 name 在内存中的具体位置。Unsafe 提供了相应的办法获取动态成员变量,成员变量偏移量的办法,所以咱们能够应用 Unsafe 间接更新内存中 name 的值。复制代码

CAS

CAS 译为 Compare And Swap,它是乐观锁的一种实现。假如内存值为 v,预期值为 e,想要更新成得值为 u,当且仅当内存值v等于预期值e时,才将v更新为u。CAS 操作不须要加锁,所以比起加锁的形式 CAS 效率更高。复制代码

接下来咱们来看看具体的办法实现。

size 办法

size 办法用于统计 map 中的元素个数,通过源码咱们发现 size 外围是 sumCount 办法,其中变量 baseCount 的值是记录实现元素插入并且胜利更新到 baseCount 上的元素个数,CounterCell 数组是记录实现元素插入然而在 CAS 批改 baseCount 失败时的元素个数,因而 baseCount + CounterCell 数组记录的总数是 map 中的元素个数。

这样设计的起因是高并发状况下大量的 CAS 批改 baseCount 的值是失败的,为了节约性能,CAS 更新 baseCount 失败之后用 CounterCell 数组记录下来,CounterCell 初始化数组长度为2,高并发状况能够扩容,每个数组节点别离记录落在以后数组的记录数,应用的是 CAS 去操作 value++,最初将所有节点的 value 求和,并加上 baseCount 的值,即为 map 元素个数。

public int size() {    long n = sumCount();    return ((n < 0L) ? 0 :            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :            (int)n);}复制代码
final long sumCount() {    // CounterCell 数组是批改 baseCount 失败的线程放入的数量    CounterCell[] as = counterCells; CounterCell a;    // baseCount 是批改 baseCount 胜利的线程放入的数量    long sum = baseCount;    if (as != null) {        for (int i = 0; i < as.length; ++i) {            if ((a = as[i]) != null)                sum += a.value;        }    }    return sum;}复制代码

put 办法

put 办法是向map中存入元素,实质上调用了 putVal,该办法十分外围的办法之一,读者可联合笔者增加的正文加以了解

 final V putVal(K key, V value, boolean onlyIfAbsent) {         if (key == null || value == null) throw new NullPointerException();     int hash = spread(key.hashCode());     int binCount = 0;     for (Node<K,V>[] tab = table;;) {         Node<K,V> f; int n, i, fh;         // 如果 tab 未初始化,先初始化 tab,此处是懒加载的思维         if (tab == null || (n = tab.length) == 0)             tab = initTable();         // 如果计算出来的 tab 下标地位上没有其余元素,用 CAS 操作建设援用         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {             if (casTabAt(tab, i, null,                          new Node<K,V>(hash, key, value, null)))                 break;                   // no lock when adding to empty bin         }         // 如果发现以后节点的哈希值是 MOVED,则阐明正处于扩容状态中,以后线程退出扩容大军,帮忙扩容         else if ((fh = f.hash) == MOVED)             tab = helpTransfer(tab, f);         else {             V oldVal = null;             // 哈希抵触,锁住以后节点             synchronized (f) {                 if (tabAt(tab, i) == f) {                     // fh>=0阐明是链表,遍历寻找                     if (fh >= 0) {                         binCount = 1;                         for (Node<K,V> e = f;; ++binCount) {                             K ek;                             // 发现曾经存在雷同的 key,依据 onlyIfAbsent 判断是否笼罩                             if (e.hash == hash &&                                 ((ek = e.key) == key ||                                  (ek != null && key.equals(ek)))) {                                 oldVal = e.val;                                 if (!onlyIfAbsent)                                     e.val = value;                                 break;                             }                             // 持续向下遍历                             Node<K,V> pred = e;                             if ((e = e.next) == null) {                                 pred.next = new Node<K,V>(hash, key,                                                           value, null);                                 break;                             }                         }                     }                     // 如果是红黑树,调用 putTreeVal 办法,遍历树,此处逻辑不具体开展                     else if (f instanceof TreeBin) {                         Node<K,V> p;                         binCount = 2;                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                                               value)) != null) {                             oldVal = p.val;                             if (!onlyIfAbsent)                                 p.val = value;                         }                     }                 }             }             if (binCount != 0) {                 // 此时 binCount 示意一个节点对应链表的长度,达到8就转换成红黑树                 if (binCount >= TREEIFY_THRESHOLD)                     treeifyBin(tab, i);                 // 返回旧值                 if (oldVal != null)                     return oldVal;                 break;             }         }     }     // 判断是否须要扩容     addCount(1L, binCount);     return null; }复制代码

小结 put

  1. 先校验一个 k 和 v 都不可为空。
  2. 判断 table 是否为空,如果为空就进入初始化阶段。
  3. 如果发现插入地位的 bucket 为空,间接把键值对插入到这个桶中作为头节点。
  4. 如果这个要插入的桶中的 hash 值为 - 1,也就是 MOVED 状态(也就是这个节点是 forwordingNode ),那就是阐明有线程正在进行扩容操作,那么以后线程就进入帮助扩容阶段。
  5. 如果这个节点是一个链表节点,依据 key 匹配后果去决定是插入还是笼罩,插入是用尾插法。如果这个节点是一个红黑树节点,那就须要依照树的插入规定进行插入。
  6. 插入完结之后判断该链表节点个数是否达到8,如果是就把链表转化为红黑树存储。
  7. put 完结之后,须要给 map 已存储的数量 +1,在 addCount 办法中判断是否须要扩容。
总结一下扩容条件:1. 元素个数达到扩容阈值。2. 调用 putAll 办法,但目前容量不足以寄存所有元素时。3. 某条链表长度达到8,但数组长度却小于64时,该逻辑在 treeifyBin 办法中。复制代码

通读了putVal之后,咱们比拟关注其中一些办法:

  • tabAt 办法是通过 Unsafe 类依据偏移量间接从内存中获取数据,防止了从高速缓冲区取得了过期数据
  • casTabAt 办法次要通过 Unsafe 类间接操作内存,通过比拟替换赋值,该操作不必加锁,所以能够进步操作效率

    @SuppressWarnings("unchecked")static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,                                    Node<K,V> c, Node<K,V> v) {    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);}复制代码
  • initTable 办法初始化 map 中的底层数组

    private final Node<K,V>[] initTable() {    Node<K,V>[] tab; int sc;    while ((tab = table) == null || tab.length == 0) {        // 如果一个线程发现 sizeCtl < 0 ,意味着另外的线程执行 CAS 操作胜利,以后线程只须要让出 CPU 工夫片        if ((sc = sizeCtl) < 0)            // Thread.yield() 办法是让以后线程被动放弃 CPU 的执行权,线程回到就绪状态            Thread.yield(); // lost initialization race; just spin        // 通过 CAS 设置 sizeCtl 为 -1        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {            try {                if ((tab = table) == null || tab.length == 0) {                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                    @SuppressWarnings("unchecked")                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                    table = tab = nt;                    //sc = 0.75 * n,此处的sc即为扩容的阈值                    sc = n - (n >>> 2);                }            } finally {                //将sizeCtl设置成扩容阈值                sizeCtl = sc;            }            break;        }    }    return tab;}复制代码
  • addCount 办法用于判断是否须要扩容

    private final void addCount(long x, int check) {    CounterCell[] as; long b, s;    if ((as = counterCells) != null ||        // 曾经放入到 map 中然而更新 baseCount 失败,放到 CounterCell 数组中        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {        CounterCell a; long v; int m;        boolean uncontended = true;        // 如果 CounterCell 数组是空(尚未呈现并发)        // 如果随机取余一个数组地位为空 或者        // 批改这个槽位的变量失败(呈现并发了)        if (as == null || (m = as.length - 1) < 0 ||            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||            !(uncontended =              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {            // 执行 fullAddCount 办法,将更新 baseCount 失败的元素个数保留到CounterCell 数组中            fullAddCount(x, uncontended);            return;        }        if (check <= 1)            return;        // s 是总节点数量;        s = sumCount();    }    // 扩容逻辑    if (check >= 0) {        Node<K,V>[] tab, nt; int n, sc;        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&               (n = tab.length) < MAXIMUM_CAPACITY) {            // 依据 length 失去一个标识            int rs = resizeStamp(n);            // 如果正在扩容            if (sc < 0) {                // 如果 sc 的低 16 位不等于 标识符(校验异样 sizeCtl 变动了)                // 如果 sc == 标识符 + 1 (扩容完结了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16位 + 2,当第一个线程完结扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)                // 如果 sc == 标识符 + 65535(帮忙线程数曾经达到最大)                // 如果 nextTable == null(完结扩容了)                // 如果 transferIndex <= 0 (转移状态变动了)                // 完结循环                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                    transferIndex <= 0)                    break;                // 如果能够帮忙扩容,那么将 sc 加 1. 示意多了一个线程在帮忙扩容                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                    transfer(tab, nt);            }            // 如果不在扩容,将 sc 更新:标识符左移 16 位 而后 + 2. 也就是变成一个正数。高 16 位是标识符,低 16位初始是 2.            else if (U.compareAndSwapInt(this, SIZECTL, sc,                                         (rs << RESIZE_STAMP_SHIFT) + 2))                // 更新 sizeCtl 为正数后,开始扩容。                transfer(tab, null);            s = sumCount();        }    }}复制代码
    • *

get 办法

get 办法逻辑比较简单清晰

public V get(Object key) {    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;    int h = spread(key.hashCode());    if ((tab = table) != null && (n = tab.length) > 0 &&        (e = tabAt(tab, (n - 1) & h)) != null) {        // 判断链表头结点是否匹配 key        if ((eh = e.hash) == h) {            if ((ek = e.key) == key || (ek != null && key.equals(ek)))                return e.val;        }        // eh < 0 ,如果是红黑树,去红黑树上查找,如果是 ForwardingNode ,则会跳到扩容后的 map 上寻找        else if (eh < 0)            return (p = e.find(h, key)) != null ? p.val : null;        // 遍历链表查找        while ((e = e.next) != null) {            if (e.hash == h &&                ((ek = e.key) == key || (ek != null && key.equals(ek))))                return e.val;        }    }    return null;}复制代码

helpTransfer 和 transfer 办法

helpTransfer 办法在判断完扩容状态后,实质上还是调用了 transfer

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {    Node<K,V>[] nextTab; int sc;    // 以后节点是 ForwardingNode 并且曾经初始化实现新的数组,帮忙扩容    if (tab != null && (f instanceof  ForwardingNode 并且曾经初始化实现新的数组,帮忙扩容) &&        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {        int rs = resizeStamp(tab.length);                while (nextTab == nextTable && table == tab &&               (sc = sizeCtl) < 0) {            // 扩容实现的标记            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                sc == rs + MAX_RESIZERS || transferIndex <= 0)                break;            // 开始帮忙扩容            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {                transfer(tab, nextTab);                break;            }        }        return nextTab;    }    return table;}复制代码

transfer 办法逻辑比较复杂,请读者联合正文和配图急躁了解

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {    int n = tab.length, stride;    // CPU 外围数大于1,每个线程负责迁徙16个 bucket    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)        stride = MIN_TRANSFER_STRIDE; // subdivide range    // 初始化扩容后的数组    if (nextTab == null) {            // initiating        try {            @SuppressWarnings("unchecked")            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];            nextTab = nt;        } catch (Throwable ex) {      // try to cope with OOME            sizeCtl = Integer.MAX_VALUE;            return;        }        nextTable = nextTab;        transferIndex = n;    }    int nextn = nextTab.length;    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);    boolean advance = true;    boolean finishing = false; // to ensure sweep before committing nextTab    // 通过for自循环解决每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值,并初始化i和bound值,i指以后解决的槽位序号,bound指须要解决的槽位边界,先解决槽位15的节点;    for (int i = 0, bound = 0;;) {        Node<K,V> f; int fh;        while (advance) {            int nextIndex, nextBound;            // 所有节点都有线程负责或者曾经扩容实现            if (--i >= bound || finishing)                advance = false;            // 将 i 值置-1            else if ((nextIndex = transferIndex) <= 0) {                i = -1;                advance = false;            }            else if (U.compareAndSwapInt                     (this, TRANSFERINDEX, nextIndex,                      nextBound = (nextIndex > stride ?                                   nextIndex - stride : 0))) {                //确定以后线程每次调配的待迁徙桶的范畴为[bound, nextIndex)                bound = nextBound;                i = nextIndex - 1;                advance = false;            }        }        // 以后线程本人的活曾经做完或所有线程的活都已做完,第二与第三个条件应该是上面让"i = n"后,再次进入循环时要做的边界查看。        if (i < 0 || i >= n || i + n >= nextn) {            int sc;            // 扩容实现的后续工作            if (finishing) {                nextTable = null;                table = nextTab;                sizeCtl = (n << 1) - (n >>> 1);                return;            }            // 采纳CAS算法更新SizeCtl,设置 finishing 和 advance 标记            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                    return;                finishing = advance = true;                i = n; // recheck before commit            }        }        // 以后节点为 null,间接标记成 forwordingNode         else if ((f = tabAt(tab, i)) == null)            advance = casTabAt(tab, i, null, fwd);        // 以后节点曾经迁徙过        else if ((fh = f.hash) == MOVED)            advance = true; // already processed        else {            // 锁住头结点            synchronized (f) {                if (tabAt(tab, i) == f) {                    //这里 ln,hn 是高下链表的思维,具体过程下图中会有阐明                    Node<K,V> ln, hn;                    if (fh >= 0) {                        int runBit = fh & n;                        Node<K,V> lastRun = f;                        for (Node<K,V> p = f.next; p != null; p = p.next) {                            int b = p.hash & n;                            if (b != runBit) {                                runBit = b;                                lastRun = p;                            }                        }                        if (runBit == 0) {                            ln = lastRun;                            hn = null;                        }                        else {                            hn = lastRun;                            ln = null;                        }                        for (Node<K,V> p = f; p != lastRun; p = p.next) {                            int ph = p.hash; K pk = p.key; V pv = p.val;                            if ((ph & n) == 0)                                ln = new Node<K,V>(ph, pk, pv, ln);                            else                                hn = new Node<K,V>(ph, pk, pv, hn);                        }                        setTabAt(nextTab, i, ln);                        setTabAt(nextTab, i + n, hn);                        setTabAt(tab, i, fwd);                        advance = true;                    }                    // 红黑树也是高下链表的思维,具体过程下图中会有阐明                    else if (f instanceof TreeBin) {                        TreeBin<K,V> t = (TreeBin<K,V>)f;                        TreeNode<K,V> lo = null, loTail = null;                        TreeNode<K,V> hi = null, hiTail = null;                        int lc = 0, hc = 0;                        for (Node<K,V> e = t.first; e != null; e = e.next) {                            int h = e.hash;                            TreeNode<K,V> p = new TreeNode<K,V>                                (h, e.key, e.val, null, null);                            if ((h & n) == 0) {                                if ((p.prev = loTail) == null)                                    lo = p;                                else                                    loTail.next = p;                                loTail = p;                                ++lc;                            }                            else {                                if ((p.prev = hiTail) == null)                                    hi = p;                                else                                    hiTail.next = p;                                hiTail = p;                                ++hc;                            }                        }                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :                        (hc != 0) ? new TreeBin<K,V>(lo) : t;                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :                        (lc != 0) ? new TreeBin<K,V>(hi) : t;                        setTabAt(nextTab, i, ln);                        setTabAt(nextTab, i + n, hn);                        setTabAt(tab, i, fwd);                        advance = true;                    }                }            }        }    }}复制代码
  • 多线程开始扩容

  • lastrun节点

  • 链表迁徙

  • 红黑树迁徙

  • 迁徙过程中get和put的操作的解决

  • 并发迁徙

  • 迁徙实现

小结 transfer:

  1. 依据 CPU 外围数确定每个线程负责的桶数,默认每个线程16个桶
  2. 创立新数组,长度是原来数组的两倍
  3. 调配好以后线程负责的桶区域 [bound, nextIndex)
  4. 并发迁徙,依据链表和红黑树执行不同迁徙策略
  5. 迁徙实现,设置新的数组和新的扩容阈值
    • *

至此,笔者曾经把 ConcurrentHashMap 几个重要的办法实现介绍完了。剩下的如 removereplace 等办法实现都大同小异,读者可自行钻研。

总结

通过以上对 ConcurrentHashMap 的初步探讨,置信读者也会和笔者一样惊叹于 Doug Lea 巨匠的编程程度和技巧。ConcurrentHashMap 类在 jdk8 中源码多达6300 行,其中使用了大量的多线程与高并发的编程技术,如 volatile、synchronized、CAS、Unsafe、Thread 类相干 API,以及许多精美的算法,如 ConcurrentHashMap 底层数组的长度是2的幂次方以便用位运算计算元素下标 ,同时也不便计算扩容后的元素下标,还有令笔者惊叹的高下链表迁徙操作,诸如此类、不再赘述。

感激 Doug Lea 对于 Java 倒退做出的奉献,也心愿咱们能够向巨匠学习,磨炼本人的编程程度。借用笔者很喜爱的一个程序员大佬的一句话,学习是一条令人时而悲痛欲绝、时而郁郁寡欢的路线。共勉!