关于后端:一文看懂jdk8中的-ConcurrentHashMap

41次阅读

共计 12852 个字符,预计需要花费 33 分钟才能阅读完成。

HashMap 存在的问题

任何技术的诞生都是有其独特的诞生背景的,HashMap 诞生于分治思维,而 ConcurrentHashMap 则是为了解决 HashMap 中的线程平安问题而生,接下来咱们就一起看一下 HashMap 中存在的线程平安问题。

  • 先看下 jdk7 中扩容办法的实现

    void resize(int newCapacity) {Entry[] oldTable = table;
        int oldCapacity = oldTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return;
        }
        Entry[] newTable = new Entry[newCapacity];
        // 最终会进入 transfer 办法
        transfer(newTable, initHashSeedAsNeeded(newCapacity));
        table = newTable;
        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
    }
      
      
    复制代码

    resize 办法会调用 transfer办法
    《2020 最新 Java 根底精讲视频教程和学习路线!》

    void transfer(Entry[] newTable, boolean rehash) {
        int newCapacity = newTable.length;
        for (Entry<K,V> e : table) {while(null != e) {
                Entry<K,V> next = e.next;
                if (rehash) {e.hash = null == e.key ? 0 : hash(e.key);
                }
                int i = indexFor(e.hash, newCapacity);
                e.next = newTable[i];
                newTable[i] = e;
                e = next;
            }
        }
    }
    复制代码

    如图所示,该 map 在插入第四个元素时会触发扩容

假如有两个线程同时执行到 transfer 办法,线程 2 执行到 Entry<K,V> next = e.next; 之后 cpu 工夫片耗尽,此时变量 e 指向节点 a,变量 next 指向节点 b。线程 1 实现了扩容,变量援用关系如图所示:

线程 2 开始执行,此时对于线程 2 中保留的变量援用关系如图所示:

执行后,变量 e 指向节点 b,因为 e 不是 null,则继续执行循环体,执行后的援用关系。

变量 e 又从新指回节点 a,只能继续执行循环体,这里仔细分析下:1、执行完Entry<K,V> next = e.next;,目前节点 a 没有 next,所以变量 next 指向 null;2、e.next = newTable[i]; 其中 newTable[i] 指向节点 b,那就是把 a 的 next 指向了节点 b,这样 a 和 b 就互相援用了,造成了一个环;3、newTable[i] = e 把节点 a 放到了数组 i 地位;4、e = next; 把变量 e 赋值为 null,因为第一步中变量 next 就是指向 null;

所以最终的援用关系是这样的:

​ 节点 a 和 b 相互援用,造成了一个环,当在数组该地位 get 寻找对应的 key 时,就产生了死循环。

​ 另外,如果线程 2 把 newTable 设置成到外部的 table,节点 c 的数据就丢了,看来还有数据遗失的问题。

  • jdk8 中的数据笼罩问题

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                       boolean evict) {Node<K,V>[] tab; Node<K,V> p; int n, i;
            if ((tab = table) == null || (n = tab.length) == 0)
                n = (tab = resize()).length;
            if ((p = tab[i = (n - 1) & hash]) == null)
                 // 如果没有 hash 碰撞则直接插入元素
                tab[i] = newNode(hash, key, value, null);
            else {//...}
            ++modCount;
            if (++size > threshold)
                resize();
            afterNodeInsertion(evict);
            return null;
        }
    复制代码

    1、假如两个线程 1、2 都在进行 put 操作,并且 hash 函数计算出的插入下标是雷同的。

    2、当线程 1 执行完第六行代码后因为工夫片耗尽导致被挂起,而线程 2 失去工夫片后在该下标处插入了元素,实现了失常的插入。

    3、而后线程 1 取得工夫片,因为之前曾经进行了 hash 碰撞的判断,所有此时不会再进行判断,而是间接进行插入。

    4、这就导致了线程 2 插入的数据被线程 1 笼罩了,从而线程不平安。

ConcurrentHashMap 实现原理

ConcurrentHashMap 在 jdk7 降级 j 到 dk8 之 后有较大的改变,jdk7 中次要采纳 Segment 分段锁的思维,Segment 继承自ReentrantLock 类,顺次来保障线程平安。限于篇幅起因,本文只探讨 jdk8 中的 ConcurrentHashMap 实现原理。有趣味的同学能够自行钻研 jdk7 中的实现。

jdk8 中的 ConcurrentHashMap 数据结构同 jdk8 中的 HashMap 数据结构一样,都是 数组 + 链表 + 红黑树。摒弃了 jdk7 中的分段锁设计,应用了 Node + CAS + Synchronized 来保障线程平安。

重要成员变量

table:默认为 null,初始化产生在第一次插入操作,默认大小为 16 的数组,用来存储 Node 节点数据,扩容时大小总是 2 的幂次方。nextTable:默认为 null,扩容时新生成的数组,其大小为原数组的两倍。sizeCtl:默认为 0,用来管制 table 的初始化和扩容操作,具体利用在后续会体现进去。-1 代表 table 正在初始化 
    -N 示意有 N-1 个线程正在进行扩容操作 
    其余状况:1、如果 table 未初始化,示意 table 须要初始化的大小。2、如果 table 初始化实现,示意 table 的容量,默认是 table 大小的 0.75 倍,用这个公式算 0.75(n - (n >>> 2))。Node:保留 key,value 及 key 的 hash 值的数据结构。其中 value 和 next 都用 **volatile** 润饰,保障并发的可见性。ForwardingNode:一个非凡的 Node 节点,hash 值为 -1,其中存储 nextTable 的援用。只有 table 产生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在 table 中示意以后节点为 null 或则曾经被挪动。复制代码
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    // volatile 润饰的变量能够保障线程可见性,同时也能够禁止指令重排序
    // 无关 volatile 原理此处不开展,volatile 的实现原理可自行上网查阅
    volatile V val;
    volatile Node<K,V> next;
    //...
}
复制代码

重要办法实现原理

在探索办法实现之前,咱们先认识一下 Unsafe 和 CAS 思维,ConcurrentHashMap 中大量用到 Unsafe 类和 CAS 思维。

Unsafe

Unsafe 是 jdk 提供的一个间接拜访操作系统资源的工具类(底层 c ++ 实现),它能够间接分配内存,内存复制,copy,提供 cpu 级别的 CAS 乐观锁等操作。它的目标是为了加强 java 语言间接操作底层资源的能力。应用 Unsafe 类最次要的起因是防止应用高速缓冲区中的过期数据。为了不便了解,举个栗子。类 User 有一个成员变量 name。咱们 new 了一个对象 User 后,就晓得了它在内存中的起始值 , 而成员变量 name 在对象中的地位偏移是固定的。这样通过这个起始值和这个偏移量就可能定位到 name 在内存中的具体位置。Unsafe 提供了相应的办法获取动态成员变量,成员变量偏移量的办法,所以咱们能够应用 Unsafe 间接更新内存中 name 的值。复制代码

CAS

CAS 译为 Compare And Swap,它是乐观锁的一种实现。假如内存值为 v,预期值为 e,想要更新成得值为 u,当且仅当内存值 v 等于预期值 e 时,才将 v 更新为 u。CAS 操作不须要加锁,所以比起加锁的形式 CAS 效率更高。复制代码

接下来咱们来看看具体的办法实现。

size 办法

size 办法用于统计 map 中的元素个数,通过源码咱们发现 size 外围是 sumCount 办法,其中变量 baseCount 的值是记录实现元素插入并且胜利更新到 baseCount 上的元素个数,CounterCell 数组是记录实现元素插入然而在 CAS 批改 baseCount 失败时的元素个数,因而 baseCount + CounterCell 数组记录的总数是 map 中的元素个数。

这样设计的起因是高并发状况下大量的 CAS 批改 baseCount 的值是失败的,为了节约性能,CAS 更新 baseCount 失败之后用 CounterCell 数组记录下来,CounterCell 初始化数组长度为 2,高并发状况能够扩容,每个数组节点别离记录落在以后数组的记录数,应用的是 CAS 去操作 value++,最初将所有节点的 value 求和,并加上 baseCount 的值,即为 map 元素个数。

public int size() {long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
复制代码
final long sumCount() {
    // CounterCell 数组是批改 baseCount 失败的线程放入的数量
    CounterCell[] as = counterCells; CounterCell a;
    // baseCount 是批改 baseCount 胜利的线程放入的数量
    long sum = baseCount;
    if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
复制代码

put 办法

put 办法是向 map 中存入元素,实质上调用了 putVal,该办法十分外围的办法之一,读者可联合笔者增加的正文加以了解

 final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
     int hash = spread(key.hashCode());
     int binCount = 0;
     for (Node<K,V>[] tab = table;;) {
         Node<K,V> f; int n, i, fh;
         // 如果 tab 未初始化,先初始化 tab,此处是懒加载的思维
         if (tab == null || (n = tab.length) == 0)
             tab = initTable();
         // 如果计算出来的 tab 下标地位上没有其余元素,用 CAS 操作建设援用
         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
             if (casTabAt(tab, i, null,
                          new Node<K,V>(hash, key, value, null)))
                 break;                   // no lock when adding to empty bin
         }
         // 如果发现以后节点的哈希值是 MOVED,则阐明正处于扩容状态中,以后线程退出扩容大军,帮忙扩容
         else if ((fh = f.hash) == MOVED)
             tab = helpTransfer(tab, f);
         else {
             V oldVal = null;
             // 哈希抵触,锁住以后节点
             synchronized (f) {if (tabAt(tab, i) == f) {
                     // fh>= 0 阐明是链表,遍历寻找
                     if (fh >= 0) {
                         binCount = 1;
                         for (Node<K,V> e = f;; ++binCount) {
                             K ek;
                             // 发现曾经存在雷同的 key,依据 onlyIfAbsent 判断是否笼罩
                             if (e.hash == hash &&
                                 ((ek = e.key) == key ||
                                  (ek != null && key.equals(ek)))) {
                                 oldVal = e.val;
                                 if (!onlyIfAbsent)
                                     e.val = value;
                                 break;
                             }
                             // 持续向下遍历
                             Node<K,V> pred = e;
                             if ((e = e.next) == null) {
                                 pred.next = new Node<K,V>(hash, key,
                                                           value, null);
                                 break;
                             }
                         }
                     }
                     // 如果是红黑树,调用 putTreeVal 办法,遍历树,此处逻辑不具体开展
                     else if (f instanceof TreeBin) {
                         Node<K,V> p;
                         binCount = 2;
                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                             oldVal = p.val;
                             if (!onlyIfAbsent)
                                 p.val = value;
                         }
                     }
                 }
             }
             if (binCount != 0) {
                 // 此时 binCount 示意一个节点对应链表的长度,达到 8 就转换成红黑树
                 if (binCount >= TREEIFY_THRESHOLD)
                     treeifyBin(tab, i);
                 // 返回旧值
                 if (oldVal != null)
                     return oldVal;
                 break;
             }
         }
     }
     // 判断是否须要扩容
     addCount(1L, binCount);
     return null;
 }
复制代码

小结 put

  1. 先校验一个 k 和 v 都不可为空。
  2. 判断 table 是否为空,如果为空就进入初始化阶段。
  3. 如果发现插入地位的 bucket 为空,间接把键值对插入到这个桶中作为头节点。
  4. 如果这个要插入的桶中的 hash 值为 – 1,也就是 MOVED 状态(也就是这个节点是 forwordingNode),那就是阐明有线程正在进行扩容操作,那么以后线程就进入帮助扩容阶段。
  5. 如果这个节点是一个链表节点,依据 key 匹配后果去决定是插入还是笼罩,插入是用尾插法。如果这个节点是一个红黑树节点,那就须要依照树的插入规定进行插入。
  6. 插入完结之后判断该链表节点个数是否达到 8,如果是就把链表转化为红黑树存储。
  7. put 完结之后,须要给 map 已存储的数量 +1,在 addCount 办法中判断是否须要扩容。
总结一下扩容条件:1. 元素个数达到扩容阈值。2. 调用 putAll 办法,但目前容量不足以寄存所有元素时。3. 某条链表长度达到 8,但数组长度却小于 64 时,该逻辑在 treeifyBin 办法中。复制代码

通读了 putVal 之后,咱们比拟关注其中一些办法:

  • tabAt 办法是通过 Unsafe 类依据偏移量间接从内存中获取数据,防止了从高速缓冲区取得了过期数据
  • casTabAt 办法次要通过 Unsafe 类间接操作内存,通过比拟替换赋值,该操作不必加锁,所以能够进步操作效率

    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    复制代码
  • initTable 办法初始化 map 中的底层数组

    private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 如果一个线程发现 sizeCtl < 0,意味着另外的线程执行 CAS 操作胜利,以后线程只须要让出 CPU 工夫片
            if ((sc = sizeCtl) < 0)
                // Thread.yield() 办法是让以后线程被动放弃 CPU 的执行权,线程回到就绪状态
                Thread.yield(); // lost initialization race; just spin
            // 通过 CAS 设置 sizeCtl 为 -1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //sc = 0.75 * n,此处的 sc 即为扩容的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 将 sizeCtl 设置成扩容阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    复制代码
  • addCount 办法用于判断是否须要扩容

    private final void addCount(long x, int check) {CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            // 曾经放入到 map 中然而更新 baseCount 失败,放到 CounterCell 数组中
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 如果 CounterCell 数组是空(尚未呈现并发)// 如果随机取余一个数组地位为空 或者
            // 批改这个槽位的变量失败(呈现并发了)if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 执行 fullAddCount 办法,将更新 baseCount 失败的元素个数保留到 CounterCell 数组中
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            // s 是总节点数量;s = sumCount();}
        // 扩容逻辑
        if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // 依据 length 失去一个标识
                int rs = resizeStamp(n);
                // 如果正在扩容
                if (sc < 0) {
                    // 如果 sc 的低 16 位不等于 标识符(校验异样 sizeCtl 变动了)// 如果 sc == 标识符 + 1(扩容完结了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程完结扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)// 如果 sc == 标识符 + 65535(帮忙线程数曾经达到最大)// 如果 nextTable == null(完结扩容了)// 如果 transferIndex <= 0 (转移状态变动了)
                    // 完结循环
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 如果能够帮忙扩容,那么将 sc 加 1. 示意多了一个线程在帮忙扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 如果不在扩容,将 sc 更新:标识符左移 16 位 而后 + 2. 也就是变成一个正数。高 16 位是标识符,低 16 位初始是 2.
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // 更新 sizeCtl 为正数后,开始扩容。transfer(tab, null);
                s = sumCount();}
        }
    }
    复制代码
    • *

get 办法

get 办法逻辑比较简单清晰

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 判断链表头结点是否匹配 key
        if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // eh < 0,如果是红黑树,去红黑树上查找,如果是 ForwardingNode,则会跳到扩容后的 map 上寻找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历链表查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
复制代码

helpTransfer 和 transfer 办法

helpTransfer 办法在判断完扩容状态后,实质上还是调用了 transfer

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;
    // 以后节点是 ForwardingNode 并且曾经初始化实现新的数组,帮忙扩容
    if (tab != null && (f instanceof  ForwardingNode 并且曾经初始化实现新的数组,帮忙扩容) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);
        
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // 扩容实现的标记
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 开始帮忙扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
复制代码

transfer 办法逻辑比较复杂,请读者联合正文和配图急躁了解

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // CPU 外围数大于 1,每个线程负责迁徙 16 个 bucket
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 初始化扩容后的数组
    if (nextTab == null) {            // initiating
        try {@SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 通过 for 自循环解决每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置 transferIndex 属性值,并初始化 i 和 bound 值,i 指以后解决的槽位序号,bound 指须要解决的槽位边界,先解决槽位 15 的节点;for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // 所有节点都有线程负责或者曾经扩容实现
            if (--i >= bound || finishing)
                advance = false;
            // 将 i 值置 -1
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {// 确定以后线程每次调配的待迁徙桶的范畴为[bound, nextIndex)
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 以后线程本人的活曾经做完或所有线程的活都已做完,第二与第三个条件应该是上面让 "i = n" 后,再次进入循环时要做的边界查看。if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 扩容实现的后续工作
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 采纳 CAS 算法更新 SizeCtl, 设置 finishing 和 advance 标记
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 以后节点为 null,间接标记成 forwordingNode 
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 以后节点曾经迁徙过
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 锁住头结点
            synchronized (f) {if (tabAt(tab, i) == f) {
                    // 这里 ln,hn 是高下链表的思维,具体过程下图中会有阐明
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 红黑树也是高下链表的思维,具体过程下图中会有阐明
                    else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
复制代码
  • 多线程开始扩容
  • lastrun 节点
  • 链表迁徙
  • 红黑树迁徙
  • 迁徙过程中 get 和 put 的操作的解决
  • 并发迁徙
  • 迁徙实现

小结 transfer:

  1. 依据 CPU 外围数确定每个线程负责的桶数,默认每个线程 16 个桶
  2. 创立新数组,长度是原来数组的两倍
  3. 调配好以后线程负责的桶区域 [bound, nextIndex)
  4. 并发迁徙,依据链表和红黑树执行不同迁徙策略
  5. 迁徙实现,设置新的数组和新的扩容阈值
    • *

至此,笔者曾经把 ConcurrentHashMap 几个重要的办法实现介绍完了。剩下的如 removereplace 等办法实现都大同小异,读者可自行钻研。

总结

通过以上对 ConcurrentHashMap 的初步探讨,置信读者也会和笔者一样惊叹于 Doug Lea 巨匠的编程程度和技巧。ConcurrentHashMap 类在 jdk8 中源码多达 6300 行,其中使用了大量的多线程与高并发的编程技术,如 volatile、synchronized、CAS、Unsafe、Thread 类相干 API,以及许多精美的算法,如 ConcurrentHashMap 底层数组的长度是 2 的幂次方以便用位运算计算元素下标,同时也不便计算扩容后的元素下标,还有令笔者惊叹的高下链表迁徙操作,诸如此类、不再赘述。

感激 Doug Lea 对于 Java 倒退做出的奉献,也心愿咱们能够向巨匠学习,磨炼本人的编程程度。借用笔者很喜爱的一个程序员大佬的一句话,学习是一条令人时而悲痛欲绝、时而郁郁寡欢的路线。共勉!

正文完
 0