关于java:翻了ConcurrentHashMap17-和18的源码我总结了它们的主要区别

43次阅读

共计 13591 个字符,预计需要花费 34 分钟才能阅读完成。

ConcurrentHashMap

思考:HashTable 是线程平安的,为什么不举荐应用?

HashTable 是一个线程平安的类,它应用 synchronized 来锁住整张 Hash 表来实现线程平安,即每次锁住整张表让线程独占,相当于所有线程进行读写时都去竞争一把锁,导致效率十分低下。

1 ConcurrentHashMap 1.7

在 JDK1.7 中 ConcurrentHashMap 采纳了数组 + 分段锁的形式实现

Segment(分段锁)- 缩小锁的粒度

ConcurrentHashMap 中的分段锁称为 Segment,它即相似于 HashMap 的构造,即外部领有一个 Entry 数组,数组中的每个元素又是一个链表, 同时又是一个 ReentrantLock(Segment 继承了 ReentrantLock)。

1. 存储构造

Java 7 版本 ConcurrentHashMap 的存储构造如图:

ConcurrnetHashMap 由很多个 Segment 组合,而每一个 Segment 是一个相似于 HashMap 的构造,所以每一个 HashMap 的外部能够进行扩容。然而 Segment 的个数一旦 初始化就不能扭转,默认 Segment 的个数是 16 个,所以能够认为 ConcurrentHashMap 默认反对最多 16 个线程并发。

2. 初始化

通过 ConcurrentHashMap 的无参结构探寻 ConcurrentHashMap 的初始化流程。

    /**
     * Creates a new, empty map with a default initial capacity (16),
     * load factor (0.75) and concurrencyLevel (16).
     */
    public ConcurrentHashMap() {this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

无参结构中调用了有参结构,传入了三个参数的默认值,他们的值是。

    /**
     * 默认初始化容量, 这个容量指的是 Segment 的大小
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * 默认负载因子
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * 默认并发级别,并发级别指的是 Segment 桶的个数,默认是 16 个并发大小
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

Segment 上面 entryset 数组的大小是用 DEFAULT_INITIAL_CAPACITY/DEFAULT_CONCURRENCY_LEVEL 求进去的。

接着看下这个有参构造函数的外部实现逻辑。

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    // 参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 校验并发级别大小,大于 1<<16,重置为 65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 2 的多少次方
    int sshift = 0;// 管制 segment 数组的大小
    int ssize = 1;
    // 这个循环能够找到 concurrencyLevel 之上最近的 2 的次方值
    while (ssize < concurrencyLevel) {
        ++sshift;// 代表 ssize 左移的次数
        ssize <<= 1;
    }
    // 记录段偏移量
    this.segmentShift = 32 - sshift;
    // 记录段掩码
    this.segmentMask = ssize - 1;
    // 设置容量   判断初始容量是否超过容许的最大容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // c = 容量 / ssize,默认 16 / 16 = 1,这里是计算每个 Segment 中的相似于 HashMap 的容量
   // 求 entrySet 数组的大小,这个中央须要保障 entrySet 数组的大小至多能够存储下 initialCapacity 的容量,假如 initialCapacity 为 33,ssize 为 16,那么 c =2, 所以 if 语句是 true,那么 c =3,MIN_SEGMENT_TABLE_CAPACITY 初始值是 2,所以 if 语句成立,那么 cap=4,所以每一个 segment 的容量初始为 4,segment 为 16,16*4>33 成立,entrySet 数组的大小也须要是 2 的幂次方
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //Segment 中的相似于 HashMap 的容量至多是 2 或者 2 的倍数
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 创立 Segment 数组,设置 segments[0]
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

总结一下在 Java 7 中 ConcurrnetHashMap 的初始化逻辑。

  1. 必要参数校验。
  2. 校验并发级别 concurrencyLevel 大小,如果大于最大值,重置为最大值。无参结构 默认值是 16.
  3. 寻找并发级别 concurrencyLevel 之上最近的 2 的幂次方 值,作为初始化容量大小,默认是 16
  4. 记录 segmentShift 偏移量,这个值为【容量 = 2 的 N 次方】中的 N,在前面 Put 时计算地位时会用到。默认是 32 – sshift = 28.
  5. 记录 segmentMask,默认是 ssize – 1 = 16 -1 = 15.
  6. 初始化 segments[0]默认大小为 2负载因子 0.75扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容。
  1. 计算 segment 数组容量的大小。
  2. 计算 entrySet 数组的大小。
  3. 初始化 segment 数组,其中生成一个 s0 对象放在数组的第 0 个地位
  4. 为什么首先须要一个 s0 存储到数组的第一个地位?

因为初始化数组实现后数组元素都还是 null 值,当前每一次增加一个元素的话,须要封装为 entrySet 对象,还须要对 entrySet 数组的大小从新计算,如果把第一次的计算结果全副存储到 S0 中,那么当前的话只须要间接拿来应用即可,不须要从新计算。尽管 Segment 对象不同,然而对象中属性内容其实是一样的。

  1. Segment 数组的长度第一次曾经确定,当前不会在扭转,扩容是部分扩容,只对 setrySet 数组的容量进行扩容。

3. put

接着下面的初始化参数持续查看 put 办法源码。

/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p> The value can be retrieved by calling the <tt>get</tt> method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with <tt>key</tt>, or
 *         <tt>null</tt> if there was no mapping for <tt>key</tt>
 * @throws NullPointerException if the specified key or value is null
 */
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // hash 值无符号右移 28 位(初始化时取得),而后与 segmentMask=15 做与运算
    // 其实也就是把高 4 位与 segmentMask(1111)做与运算
  
  // this.segmentMask = ssize - 1;
   // 对 hash 值进行右移 segmentShift 位,计算元素对应 segment 中数组下表的地位
   // 把 hash 右移 segmentShift,相当于只有 hash 值的高 32-segmentShift 位,右移的目标是保留了 hash 值的高位。而后和 segmentMask 与操作计算元素在 segment 数组中的下表
    int j = (hash >>> segmentShift) & segmentMask;
   // 应用 unsafe 对象获取数组中第 j 个地位的值,前面加上的是偏移量
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 如果查找到的 Segment 为空,初始化
        s = ensureSegment(j);
   // 插入 segment 对象
    return s.put(key, hash, value, false);
}

/**
 * Returns the segment for the given index, creating it and
 * recording in segment table (via CAS) if not already present.
 *
 * @param k the index
 * @return the segment
 */
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 判断 u 地位的 Segment 是否为 null
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 获取 0 号 segment 里的 HashEntry<K,V> 初始化长度
        int cap = proto.table.length;
        // 获取 0 号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是雷同的
        float lf = proto.loadFactor;
        // 计算扩容阀值
        int threshold = (int)(cap * lf);
        // 创立一个 cap 容量的 HashEntry 数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 再次查看 u 地位的 Segment 是否为 null,因为这时可能有其余线程进行了操作
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 自旋查看 u 地位的 Segment 是否为 null
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // 应用 CAS 赋值,只会胜利一次
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

下面的源码剖析了 ConcurrentHashMap 在 put 一个数据时的解决流程,上面梳理下具体流程。

  1. 计算要 put 的 key 的地位,获取指定地位的 Segment。
  2. 如果指定地位的 Segment 为空,则初始化这个 Segment.

    初始化 Segment 流程:

    1. 查看计算失去的地位的 Segment 是否为 null.
    2. 为 null 持续初始化,应用 Segment[0] 的容量和负载因子创立一个 HashEntry 数组。
    3. 再次查看计算失去的指定地位的 Segment 是否为 null.
    4. 应用创立的 HashEntry 数组初始化这个 Segment.
    5. 自旋判断计算失去的指定地位的 Segment 是否为 null,应用 CAS 在这个地位赋值为 Segment.
  3. Segment.put 插入 key,value 值。

下面探索了获取 Segment 段和初始化 Segment 段的操作。最初一行的 Segment 的 put 办法还没有查看,持续剖析。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {HashEntry<K,V>[] tab = table;
        // 计算要 put 的数据地位
        int index = (tab.length - 1) & hash;
        // CAS 获取 index 坐标的值
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {if (e != null) {
                // 查看是否 key 曾经存在,如果存在,则遍历链表寻找地位,找到后替换 value
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // first 有值没阐明 index 地位曾经有值了,有抵触,链表头插法。if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 容量大于扩容阀值,小于最大容量,进行扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // index 地位赋值 node,node 可能是一个元素,也可能是一个链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {unlock();
    }
    return oldValue;
}

因为 Segment 继承了 ReentrantLock,所以 Segment 外部能够很不便的获取锁,put 流程就用到了这个性能。

  1. tryLock() 获取锁,获取不到应用 scanAndLockForPut 办法持续获取。
  2. 计算 put 的数据要放入的 index 地位,而后获取这个地位上的 HashEntry。
  3. 遍历 put 新元素,为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待。

    如果这个地位上的 HashEntry 不存在

    1. 如果以后容量大于扩容阀值,小于最大容量,进行扩容
    2. 间接头插法插入。

    如果这个地位上的 HashEntry 存在

    1. 判断链表以后元素 Key 和 hash 值是否和要 put 的 key 和 hash 值统一。统一则替换值
    2. 不统一,获取链表下一个节点,直到发现雷同进行值替换,或者链表表里结束没有雷同的。

      1. 如果以后容量大于扩容阀值,小于最大容量,进行扩容
      2. 间接链表头插法插入。
  4. 如果要插入的地位之前曾经存在,替换后返回旧值,否则返回 null.

这外面的第一步中的 scanAndLockForPut 操作这里没有介绍,这个办法做的操作就是一直的自旋 tryLock() 获取锁。当自旋次数大于指定次数时,应用 lock() 阻塞获取锁。在自旋时顺表获取下 hash 地位的 HashEntry。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    // 自旋获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {if (e == null) {if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 自旋达到指定次数后,阻塞等到只到获取到锁
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

4. 扩容 rehash

ConcurrentHashMap 的扩容只会扩容到原来的两倍。老数组里的数据挪动到新的数组时,地位要么不变,要么变为 index+ oldSize,参数里的 node 会在扩容之后应用链表 头插法 插入到指定地位。

private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;
    // 老容量
    int oldCapacity = oldTable.length;
    // 新容量,扩充两倍
    int newCapacity = oldCapacity << 1;
    // 新的扩容阀值 
    threshold = (int)(newCapacity * loadFactor);
    // 创立新的数组
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩码,默认 2 扩容后是 4,- 1 是 3,二进制就是 11。int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        // 遍历老数组
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 计算新的地位,新的地位只可能是不便或者是老的地位 + 老的容量。int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                // 如果以后地位还不是链表,只是一个元素,间接赋值
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // 如果是链表了
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // 新的地位只可能是不便或者是老的地位 + 老的容量。// 遍历完结后,lastRun 前面的元素地位都是雷同的
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                //,lastRun 前面的元素地位都是雷同的,间接作为链表赋值到新地位。newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    // 遍历残余元素,头插法到指定 k 地位。V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 头插法插入新的节点
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

有些同学可能会对最初的两个 for 循环有纳闷,这里第一个 for 是为了寻找这样一个节点,这个节点前面的所有 next 节点的新地位都是雷同的。而后把这个作为一个链表赋值到新地位。第二个 for 循环是为了把残余的元素通过头插法插入到指定地位链表。这样实现的起因可能是基于概率统计,有深入研究的同学能够发表下意见。

5. get

到这里就很简略了,get 办法只须要两步即可。

  1. 计算失去 key 的寄存地位。
  2. 遍历指定地位查找雷同 key 的 value 值。
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 计算失去 key 的寄存地位
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            // 如果是链表,遍历查找到雷同 key 的 value。K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

2 ConcurrentHashMap 1.8

1. 存储构造

能够发现 Java8 的 ConcurrentHashMap 绝对于 Java7 来说变动比拟大,不再是之前的 Segment 数组 + HashEntry 数组 + 链表,而是 Node 数组 + 链表 / 红黑树。当抵触链表达到肯定长度时,链表会转换成红黑树。

补充:CAS

CAS(Compare-and-Swap/Exchange),即比拟并替换,是一种实现并发罕用到的技术。

CAS 外围算法:执行函数:CAS(V,E,N)

 V 示意筹备要被更新的变量(内存的值)E 示意咱们提供的 冀望的值(冀望的原值)N 示意新值,筹备更新 V 的值(新值)

算法思路:V 是共享变量,咱们拿着本人筹备的这个 E,去跟 V 去比拟,如果 E == V,阐明以后没有其它线程在操作,所以,咱们把 N 这个值 写入对象的 V 变量中。如果 E!= V,阐明咱们筹备的这个 E,曾经过期了,所以咱们要从新筹备一个最新的 E,去跟 V 比拟,比拟胜利后能力更新 V 的值为 N。

如果多个线程同时应用 CAS 操作一个变量的时候,只有一个线程可能批改胜利。其余的线程提供的期望值曾经与共享变量的值不一样了,所以均会失败。

   因为 CAS 操作属于乐观派,它总是认为本人可能操作胜利,所以操作失败的线程将会再次发动操作,而不是被 OS 挂起。所以说,即便 CAS 操作没有应用同步锁,其它线程也可能晓得对共享变量的影响。因为其它线程没有被挂起,并且将会再次发动批改尝试,所以无锁操作即 CAS 操作天生免疫死锁。另外一点须要晓得的是,CAS 是零碎原语,CAS 操作是一条 CPU 的原子指令,所以不会有线程平安问题。

ABA 问题:E 和 E2 比照雷同是不能保障百分百保障,其余线程没有在本人线程执行计算的过程里抢锁胜利过。有可能其余线程操作后新 E 值和旧 E 值一样!

ABA 问题解决:在 E 对象里加个操作次数变量就行,每次判断时比照两个,E 和操作次数就 OK 了,因为 ABA 问题中就算 E 雷同操作次数也绝不雷同

2. 初始化 initTable

/**
 * Initializes table, using the size recorded in sizeCtl.
 */
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 如果 sizeCtl < 0 , 阐明另外的线程执行 CAS 胜利,正在进行初始化。if ((sc = sizeCtl) < 0)
            // 让出 CPU 使用权
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {sizeCtl = sc;}
            break;
        }
    }
    return tab;
}

从源码中能够发现 ConcurrentHashMap 的初始化是通过 自旋和 CAS 操作实现的。外面须要留神的是变量 sizeCtl,它的值决定着以后的初始化状态。

  1. -1 阐明正在初始化
  2. -N 阐明有 N - 1 个线程正在进行扩容
  3. 示意 table 初始化大小,如果 table 没有初始化
  4. 示意 table 容量,如果 table 曾经初始化。

3. put

间接过一遍 put 源码。

public V put(K key, V value) {return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key 和 value 不能为空
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        // f = 指标地位元素
        Node<K,V> f; int n, i, fh;// fh 前面寄存指标地位的元素 hash 值
        if (tab == null || (n = tab.length) == 0)
            // 数组桶为空,初始化数组桶(自旋 +CAS)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 桶内为空,CAS 放入,不加锁,胜利了就间接 break 跳出
            if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                break;  // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 应用 synchronized 加锁退出节点
            synchronized (f) {if (tabAt(tab, i) == f) {
                    // 阐明是链表
                    if (fh >= 0) {
                        binCount = 1;
                        // 循环退出新的或者笼罩节点
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
  1. 依据 key 计算出 hashcode。
  2. 判断是否须要进行初始化。
  3. 即为以后 key 定位出的 Node,如果为空示意以后地位能够写入数据,利用 CAS 尝试写入,失败则自旋保障胜利。
  4. 如果以后地位的 hashcode == MOVED == -1, 则须要进行扩容。
  5. 如果都不满足,则利用 synchronized 锁写入数据。
  6. 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

4. get

get 流程比较简单,间接过一遍源码。

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // key 所在的 hash 地位
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果指定地位元素存在,头结点 hash 值雷同
        if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                // key hash 值相等,key 值雷同,间接返回元素 value
                return e.val;
        }
        else if (eh < 0)
            // 头结点 hash 值小于 0,阐明正在扩容或者是红黑树,find 查找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            // 是链表,遍历查找
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

总结一下 get 过程:

  1. 依据 hash 值计算地位。
  2. 查找到指定地位,如果头节点就是要找的,间接返回它的 value.
  3. 如果头节点 hash 值小于 0,阐明正在扩容或者是红黑树,查找之。
  4. 如果是链表,遍历查找之。

3 总结

Java7 中 ConcurrentHashMap 应用的 分段锁,也就是每一个 Segment 上同时只有一个线程能够操作,每一个 Segment 都是一个相似 HashMap 数组的构造,它能够扩容,它的抵触会转化为链表。然而 Segment 的个数一但初始化就不能扭转。

Java8 中的 ConcurrentHashMap 应用的 Synchronized 锁加 CAS 的机制。构造也由 Java7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是相似于一个 HashEntry 的构造。它的抵触再达到肯定大小时会转化成红黑树,在抵触小于肯定数量时又退回链表。

如果本文对您有帮忙,欢送 关注 点赞`,您的反对是我保持创作的能源。

转载请注明出处!

正文完
 0