关于后端:Java-并发编程之-ConcurrentHashMap-源码分析

44次阅读

共计 14842 个字符,预计需要花费 38 分钟才能阅读完成。

走好抉择的路,别抉择好走的路,你能力领有真正的本人。

咱们晓得哈希表是一种十分高效的数据结构,设计低劣的哈希函数能够使其上的增删改查操作达到 O(1)级别。Java 为咱们提供了一个现成的哈希构造,那就是 HashMap 类,在后面的文章中我已经介绍过 HashMap 类,晓得它的所有办法都未进行同步,因而在多线程环境中是不平安的。为此,Java 为咱们提供了另外一个 HashTable 类,它对于多线程同步的解决非常简单粗犷,那就是在 HashMap 的根底上对其所有办法都应用 synchronized 关键字进行加锁。

这种办法尽管简略,但导致了一个问题,那就是在同一时间内只能由一个线程去操作哈希表。即便这些线程都只是进行读操作也必须要排队,这在竞争强烈的多线程环境中极为影响性能。本篇介绍的 ConcurrentHashMap 就是为了解决这个问题的,它的外部应用分段锁将锁进行细粒度化,从而使得多个线程可能同时操作哈希表,这样极大的进步了性能。

下图是其内部结构的示意图。

1. ConcurrentHashMap 有哪些成员变量?

// 默认初始化容量  
static final int DEFAULT_INITIAL_CAPACITY = 16;  
  
// 默认加载因子  
static final float DEFAULT_LOAD_FACTOR = 0.75f;  
  
// 默认并发级别  
static final int DEFAULT_CONCURRENCY_LEVEL = 16;  
  
// 汇合最大容量  
static final int MAXIMUM_CAPACITY = 1 << 30;  
  
// 分段锁的最小数量  
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;  
  
// 分段锁的最大数量  
static final int MAX_SEGMENTS = 1 << 16;  
  
// 加锁前的重试次数  
static final int RETRIES_BEFORE_LOCK = 2;  
  
// 分段锁的掩码值  
final int segmentMask;  
  
// 分段锁的移位值  
final int segmentShift;  
  
// 分段锁数组  
final Segment<K,V>[] segments;

在浏览完本篇文章之前,置信读者不能了解这些成员变量的具体含意和作用,不过请读者们急躁看上来,前面将会在具体场景中一一介绍到这些成员变量的作用。在这里读者只需对这些成员变量留个眼生即可。

然而仍有个别变量是咱们当初须要理解的,例如 Segment 数组代表分段锁汇合,并发级别则代表分段锁的数量(也象征有多少线程能够同时操作),初始化容量代表整个容器的容量,加载因子代表容器元素能够达到多满的一种水平。

2. 分段锁的内部结构是怎么的?

// 分段锁  
static final class Segment<K,V> extends ReentrantLock implements Serializable {  
    // 自旋最大次数  
    static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;  
    // 哈希表  
    transient volatile HashEntry<K,V>[] table;  
    // 元素总数  
    transient int count;  
    // 批改次数  
    transient int modCount;  
    // 元素阀值  
    transient int threshold;  
    // 加载因子  
    final float loadFactor;  
    // 省略以下内容  
    ...  
}

Segment 是 ConcurrentHashMap 的动态外部类,能够看到它继承自 ReentrantLock,因而它在实质上是一个锁。它在外部持有一个 HashEntry 数组(哈希表),并且保障所有对该数组的增删改查办法都是线程平安的,具体是怎么实现的前面会讲到。

所有对 ConcurrentHashMap 的增删改查操作都能够委托 Segment 来进行,因而 ConcurrentHashMap 可能保障在多线程环境下是平安的。又因为不同的 Segment 是不同的锁,所以多线程能够同时操作不同的 Segment,也就意味着多线程能够同时操作 ConcurrentHashMap,这样就能防止 HashTable 的缺点,从而极大的进步性能。

3. ConcurrentHashMap 初始化时做了些什么?

// 外围结构器
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {throw new IllegalArgumentException();
    }
    // 确保并发级别不大于限定值
    if (concurrencyLevel > MAX_SEGMENTS) {concurrencyLevel = MAX_SEGMENTS;}
    int sshift = 0;
    int ssize = 1;
    // 保障 ssize 为 2 的幂, 且是最靠近的大于等于并发级别的数
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 计算分段锁的移位值
    this.segmentShift = 32 - sshift;
    // 计算分段锁的掩码值
    this.segmentMask = ssize - 1;
    // 总的初始容量不能大于限定值
    if (initialCapacity > MAXIMUM_CAPACITY) {initialCapacity = MAXIMUM_CAPACITY;}
    // 获取每个分段锁的初始容量
    int c = initialCapacity / ssize;
    // 分段锁容量总和不小于初始总容量
    if (c * ssize < initialCapacity) {++c;}
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 保障 cap 为 2 的幂, 且是最靠近的大于等于 c 的数
    while (cap < c) {cap <<= 1;}
    // 新建一个 Segment 对象模版
    Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
    // 新建指定大小的分段锁数组
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
    // 应用 UnSafe 给数组第 0 个元素赋值
    UNSAFE.putOrderedObject(ss, SBASE, s0);
    this.segments = ss;
}

ConcurrentHashMap 有多个结构器,然而下面贴出的是它的外围结构器,其余结构器都通过调用它来实现初始化。外围结构器须要传入三个参数,别离是初始容量,加载因子和并发级别。在后面介绍成员变量时咱们能够晓得默认的初始容量为 16,加载因子为 0.75f,并发级别为 16。

当初咱们看到外围结构器的代码,首先是通过传入的 concurrencyLevel 来计算出 ssize,ssize 是 Segment 数组的长度,它必须保障是 2 的幂,这样就能够通过 hash&ssize- 1 来计算分段锁在数组中的下标。

因为传入的 concurrencyLevel 不能保障是 2 的幂,所以不能间接用它来当作 Segment 数组的长度,因而咱们要找到一个最靠近 concurrencyLevel 的 2 的幂,用它来作为数组的长度。如果当初传入的 concurrencyLevel=15,通过下面代码能够计算出 ssize=16,sshift=4。接下来立马能够算出 segmentShift=16,segmentMask=15。留神这里的 segmentShift 是分段锁的移位值,segmentMask 是分段锁的掩码值,这两个值是用来计算分段锁在数组中的下标,在上面咱们会讲到。

在算出分段锁的个数 ssize 之后,就能够依据传入的总容量来计算每个分段锁的容量,它的值 c = initialCapacity / ssize。分段锁的容量也就是 HashEntry 数组的长度,同样也必须保障是 2 的幂,而下面算出的 c 的值不能保障这一点,所以不能间接用 c 作为 HashEntry 数组的长度,须要另外找到一个最靠近 c 的 2 的幂,将这个值赋给 cap,而后用 cap 来作为 HashEntry 数组的长度。当初咱们有了 ssize 和 cap,就能够新建分段锁数组 Segment[]和元素数组 HashEntry[]了。留神,与 JDK1.6 不同是的,在 JDK1.7 中只新建了 Segment 数组,并没有对它初始化,初始化 Segment 的操作留到了插入操作时进行。

4. 通过怎么的形式来定位锁和定位元素?

// 依据哈希码获取分段锁
@SuppressWarnings("unchecked")
private Segment<K, V> segmentForHash(int h) {long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    return (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u);
}

// 依据哈希码获取元素
@SuppressWarnings("unchecked")
static final <K, V> HashEntry<K, V> entryForHash(Segment<K, V> seg, int h) {HashEntry<K, V>[] tab;
    return (seg == null || (tab = seg.table) == null) ? null :
            (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
}

在 JDK1.7 中是通过 UnSafe 来获取数组元素的,因而这里比 JDK1.6 多了些计算数组元素偏移量的代码,这些代码咱们临时不关注,当初咱们只需晓得上面这两点:

  • 通过哈希码计算分段锁在数组中的下标:(h >>> segmentShift) & segmentMask。
  • 通过哈希码计算元素在数组中的下标:(tab.length – 1) & h。

当初咱们假如传给结构器的两个参数为 initialCapacity=128, concurrencyLevel=16。依据计算能够失去 ssize=16, sshift=4,segmentShift=28,segmentMask=15。同样,算得每个分段锁内的 HashEntry 数组的长度为 8,所以 tab.length-1=7。依据这些值,咱们通过下图来解释如何依据同一个哈希码来定位分段锁和元素。

能够看到分段锁和元素的定位都是通过元素的哈希码来决定的。定位分段锁是取哈希码的高位值(从 32 位处取起),定位元素是取的哈希码的低位值。当初有个问题,它们一个从 32 位的左端取起,一个从 32 位的右端取起,那么会在某个时刻产生抵触吗?咱们在成员变量里能够找到 MAXIMUM_CAPACITY = 1 << 30,MAX_SEGMENTS = 1 << 16,这阐明定位分段锁和定位元素应用的总的位数不超过 30,并且定位分段锁应用的位数不超过 16,所以至多还隔着 2 位的空余,因而是不会产生抵触的。

5. 查找元素具体是怎么实现的?

// 依据 key 获取 value
public V get(Object key) {
    Segment<K, V> s;
    HashEntry<K, V>[] tab;
    // 应用哈希函数计算哈希码
    int h = hash(key);
    // 依据哈希码计算分段锁的索引
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 获取分段锁和对应的哈希表
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
        // 依据哈希码获取链表头结点, 再对链表进行遍历
        for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
                (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // 依据 key 和 hash 找到对应元素后返回 value 值
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) {return e.value;}
        }
    }
    return null;
}

在 JDK1.6 中分段锁的 get 办法是通过下标来拜访数组元素的,而在 JDK1.7 中是通过 UnSafe 的 getObjectVolatile 办法来读取数组中的元素。为啥要这样做?

咱们晓得尽管 Segment 对象持有的 HashEntry 数组援用是 volatile 类型的,然而数组内的元素援用不是 volatile 类型的,因而多线程对数组元素的批改是不平安的,可能会在数组中读取到尚未结构实现的对象。

在 JDK1.6 中是通过第二次加锁读取来保障平安的,而 JDK1.7 中通过 UnSafe 的 getObjectVolatile 办法来读取同样也是为了保障这一点。应用 getObjectVolatile 办法读取数组元素须要先取得元素在数组中的偏移量,在这里依据哈希码计算失去分段锁在数组中的偏移量为 u,而后通过偏移量 u 来尝试读取分段锁。因为分段锁数组在结构时没进行初始化,因而可能读出来一个空值,所以须要先进行判断。

在确定分段锁和它外部的哈希表都不为空之后,再通过哈希码读取 HashEntry 数组的元素,依据下面的结构图能够看到,这时取得的是链表的头结点。之后再从头到尾的对链表进行遍历查找,如果找到对应的值就将其返回,否则就返回 null。以上就是整个查找元素的过程。

6. 插入元素具体是怎么实现的?

// 向汇合增加键值对(若存在则替换)
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K, V> s;
    // 传入的 value 不能为空
    if (value == null) throw new NullPointerException();
    // 应用哈希函数计算哈希码
    int hash = hash(key);
    // 依据哈希码计算分段锁的下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 依据下标去尝试获取分段锁
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
        // 取得的分段锁为空就去结构一个
        s = ensureSegment(j);
    }
    // 调用分段锁的 put 办法
    return s.put(key, hash, value, false);
}

// 向汇合增加键值对(不存在才增加)
@SuppressWarnings("unchecked")
public V putIfAbsent(K key, V value) {
    Segment<K, V> s;
    // 传入的 value 不能为空
    if (value == null) throw new NullPointerException();
    // 应用哈希函数计算哈希码
    int hash = hash(key);
    // 依据哈希码计算分段锁的下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 依据下标去尝试获取分段锁
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
        // 取得的分段锁为空就去结构一个
        s = ensureSegment(j);
    }
    // 调用分段锁的 put 办法
    return s.put(key, hash, value, true);
}

ConcurrentHashMap 中有两个增加键值对的办法,通过 put 办法增加时如果存在则会进行笼罩,通过 putIfAbsent 办法增加时如果存在则不进行笼罩,这两个办法都是调用分段锁的 put 办法来实现操作,只是传入的最初一个参数不同而已。

在下面代码中咱们能够看到首先是依据 key 的哈希码来计算出分段锁在数组中的下标,而后依据下标应用 UnSafe 类 getObject 办法来读取分段锁。因为在结构 ConcurrentHashMap 时没有对 Segment 数组中的元素初始化,所以可能读到一个空值,这时会先通过 ensureSegment 办法新建一个分段锁。获取到分段锁之后再调用它的 put 办法实现增加操作,上面咱们来看看具体是怎么操作的。

// 增加键值对
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 尝试获取锁, 若失败则进行自旋
    HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {HashEntry<K, V>[] tab = table;
        // 计算元素在数组中的下标
        int index = (tab.length - 1) & hash;
        // 依据下标获取链表头结点
        HashEntry<K, V> first = entryAt(tab, index);
        for (HashEntry<K, V> e = first; ;) {
            // 遍历链表寻找该元素, 找到则进行替换
            if (e != null) {
                K k;
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    // 依据参数决定是否替换旧值
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
                // 没找到则在链表增加一个结点
            } else {
                // 将 node 结点插入链表头部
                if (node != null) {node.setNext(first);
                } else {node = new HashEntry<K, V>(hash, key, value, first);
                }
                // 插入结点后将元素总是加 1
                int c = count + 1;
                // 元素超过阀值则进行扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY) {rehash(node);
                    // 否则就将哈希表指定下标替换为 node 结点
                } else {setEntryAt(tab, index, node);
                }
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {unlock();
    }
    return oldValue;
}

为保障线程平安,分段锁中的 put 操作是须要进行加锁的,所以线程一开始就会去获取锁,如果获取胜利就继续执行,若获取失败则调用 scanAndLockForPut 办法进行自旋,在自旋过程中会先去扫描哈希表去查找指定的 key,如果 key 不存在就会新建一个 HashEntry 返回,这样在获取到锁之后就不用再去新建了,为的是在期待锁的过程中顺便做些事件,不至于白白浪费工夫,可见作者的良苦用心。

具体自旋办法咱们前面再细讲,当初先把关注点拉回来,线程在胜利获取到锁之后会依据计算到的下标,获取指定下标的元素。此时获取到的是链表的头结点,如果头结点不为空就对链表进行遍历查找,找到之后再依据 onlyIfAbsent 参数的值决定是否进行替换。

如果遍历没找到就会新建一个 HashEntry 指向头结点,此时如果自旋时创立了 HashEntry,则间接将它的 next 指向以后头结点,如果自旋时没有创立就在这里新建一个 HashEntry 并指向头结点。在向链表增加元素之后查看元素总数是否超过阀值,如果超过就调用 rehash 进行扩容,没超过的话就间接将数组对应下标的元素援用指向新增加的 node。setEntryAt 办法外部是通过调用 UnSafe 的 putOrderedObject 办法来更改数组元素援用的,这样就保障了其余线程在读取时能够读到最新的值。

7. 删除元素具体是怎么实现的?

// 删除指定元素(找到对应元素后间接删除)
public V remove(Object key) {
    // 应用哈希函数计算哈希码
    int hash = hash(key);
    // 依据哈希码获取分段锁的索引
    Segment<K, V> s = segmentForHash(hash);
    // 调用分段锁的 remove 办法
    return s == null ? null : s.remove(key, hash, null);
}

// 删除指定元素(查找值等于给定值才删除)
public boolean remove(Object key, Object value) {
    // 应用哈希函数计算哈希码
    int hash = hash(key);
    Segment<K, V> s;
    // 确保分段锁不为空才调用 remove 办法
    return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null;
}

ConcurrentHashMap 提供了两种删除操作,一种是找到后间接删除,一种是找到后先比拟再删除。这两种删除办法都是先依据 key 的哈希码找到对应的分段锁后,再通过调用分段锁的 remove 办法实现删除操作。上面咱们来看看分段锁的 remove 办法。

// 删除指定元素
final V remove(Object key, int hash, Object value) {
    // 尝试获取锁, 若失败则进行自旋
    if (!tryLock()) {scanAndLock(key, hash);
    }
    V oldValue = null;
    try {HashEntry<K, V>[] tab = table;
        // 计算元素在数组中的下标
        int index = (tab.length - 1) & hash;
        // 依据下标获得数组元素(链表头结点)
        HashEntry<K, V> e = entryAt(tab, index);
        HashEntry<K, V> pred = null;
        // 遍历链表寻找要删除的元素
        while (e != null) {
            K k;
            //next 指向以后结点的后继结点
            HashEntry<K, V> next = e.next;
            // 依据 key 和 hash 寻找对应结点
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // 传入的 value 不等于 v 就跳过, 其余状况就进行删除操作
                if (value == null || value == v || value.equals(v)) {
                    // 如果 pred 为空则代表要删除的结点为头结点
                    if (pred == null) {
                        // 从新设置链表头结点
                        setEntryAt(tab, index, next);
                    } else {
                        // 设置 pred 结点的后继为 next 结点
                        pred.setNext(next);
                    }
                    ++modCount;
                    --count;
                    // 记录元素删除之前的值
                    oldValue = v;
                }
                break;
            }
            // 若 e 不是要找的结点就将 pred 援用指向它
            pred = e;
            // 查看下一个结点
            e = next;
        }
    } finally {unlock();
    }
    return oldValue;
}

在删除分段锁中的元素时须要先获取锁,如果获取失败就调用 scanAndLock 办法进行自旋,如果获取胜利就执行下一步,首先计算数组下标而后通过下标获取 HashEntry 数组的元素,这里取得了链表的头结点,接下来就是对链表进行遍历查找,在此之前先用 next 指针记录以后结点的后继结点,而后比照 key 和 hash 看看是否是要找的结点,如果是的话就执行下一个 if 判断。

满足 value 为空或者 value 的值等于结点以后值这两个条件就会进入到 if 语句中进行删除操作,否则间接跳过。在 if 语句中执行删除操作时会有两种状况,如果以后结点为头结点则间接将 next 结点设置为头结点,如果以后结点不是头结点则将 pred 结点的后继设置为 next 结点。这里的 pred 结点示意以后结点的前继结点,每次在要查看下一个结点之前就将 pred 指向以后结点,这就保障了 pred 结点总是以后结点的前继结点。

留神,与 JDK1.6 不同,在 JDK1.7 中 HashEntry 对象的 next 变量不是 final 的,因而这里能够通过间接批改 next 援用的值来删除元素,因为 next 变量是 volatile 类型的,所以读线程能够马上读到最新的值。

8. 替换元素具体是怎么实现的?

// 替换指定元素(CAS 操作)
public boolean replace(K key, V oldValue, V newValue) {
    // 应用哈希函数计算哈希码
    int hash = hash(key);
    // 保障 oldValue 和 newValue 不为空
    if (oldValue == null || newValue == null) throw new NullPointerException();
    // 依据哈希码获取分段锁的索引
    Segment<K, V> s = segmentForHash(hash);
    // 调用分段锁的 replace 办法
    return s != null && s.replace(key, hash, oldValue, newValue);
}

// 替换元素操作(CAS 操作)
final boolean replace(K key, int hash, V oldValue, V newValue) {
    // 尝试获取锁, 若失败则进行自旋
    if (!tryLock()) {scanAndLock(key, hash);
    }
    boolean replaced = false;
    try {
        HashEntry<K, V> e;
        // 通过 hash 间接找到头结点而后对链表遍历
        for (e = entryForHash(this, hash); e != null; e = e.next) {
            K k;
            // 依据 key 和 hash 找到要替换的结点
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                // 如果指定的以后值正确则进行替换
                if (oldValue.equals(e.value)) {
                    e.value = newValue;
                    ++modCount;
                    replaced = true;
                }
                // 否则不进行任何操作间接返回
                break;
            }
        }
    } finally {unlock();
    }
    return replaced;
}

ConcurrentHashMap 同样提供了两种替换操作,一种是找到后间接替换,另一种是找到后先比拟再替换(CAS 操作)。这两种操作的实现大抵是雷同的,只是 CAS 操作在替换前多了一层比拟操作,因而咱们只需简略理解其中一种操作即可。

这里拿 CAS 操作进行剖析,还是老套路,首先依据 key 的哈希码找到对应的分段锁,而后调用它的 replace 办法。进入分段锁中的 replace 办法后须要先去获取锁,如果获取失败则进行自旋,如果获取胜利则进行下一步。首先依据 hash 码获取链表头结点,而后依据 key 和 hash 进行遍历查找,找到了对应的元素之后,比拟给定的 oldValue 是否是以后值,如果不是则放弃批改,如果是则用新值进行替换。因为 HashEntry 对象的 value 域是 volatile 类型的,因而能够间接替换。

9. 自旋时具体做了些什么?

// 自旋期待获取锁(put 操作)
private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) {
    // 依据哈希码获取头结点
    HashEntry<K, V> first = entryForHash(this, hash);
    HashEntry<K, V> e = first;
    HashEntry<K, V> node = null;
    int retries = -1;
    // 在 while 循环内自旋
    while (!tryLock()) {
        HashEntry<K, V> f;
        if (retries < 0) {
            // 如果头结点为空就新建一个 node
            if (e == null) {if (node == null) {node = new HashEntry<K, V>(hash, key, value, null);
                }
                retries = 0;
                // 否则就遍历链表定位该结点
            } else if (key.equals(e.key)) {retries = 0;} else {e = e.next;}
            //retries 每次在这加 1, 并判断是否超过最大值
        } else if (++retries > MAX_SCAN_RETRIES) {lock();
            break;
            //retries 为偶数时去判断 first 有没有扭转
        } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
            e = first = f;
            retries = -1;
        }
    }
    return node;
}

// 自旋期待获取锁(remove 和 replace 操作)
private void scanAndLock(Object key, int hash) {
    // 依据哈希码获取链表头结点
    HashEntry<K, V> first = entryForHash(this, hash);
    HashEntry<K, V> e = first;
    int retries = -1;
    // 在 while 循环里自旋
    while (!tryLock()) {
        HashEntry<K, V> f;
        if (retries < 0) {
            // 遍历链表定位到该结点
            if (e == null || key.equals(e.key)) {retries = 0;} else {e = e.next;}
            //retries 每次在这加 1, 并判断是否超过最大值
        } else if (++retries > MAX_SCAN_RETRIES) {lock();
            break;
            //retries 为偶数时去判断 first 有没有扭转
        } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
            e = first = f;
            retries = -1;
        }
    }
}

在后面咱们讲到过,分段锁中的 put,remove,replace 这些操作都会要求先去获取锁,只有胜利取得锁之后能力进行下一步操作,如果获取失败就会进行自旋。

自旋操作也是在 JDK1.7 中增加的,为了防止线程频繁的挂起和唤醒,以此进步并发操作时的性能。在 put 办法中调用的是 scanAndLockForPut,在 remove 和 replace 办法中调用的是 scanAndLock。这两种自旋办法大抵是雷同的,这里咱们只剖析 scanAndLockForPut 办法。首先还是先依据 hash 码取得链表头结点,之后线程会进入 while 循环中执行,退出该循环的惟一形式是胜利获取锁,而在这期间线程不会被挂起。

刚进入循环时 retries 的值为 -1,这时线程不会马上再去尝试获取锁,而是先去寻找到 key 对应的结点 (没找到会新建一个),而后再将 retries 设为 0,接下来就会一次次的尝试获取锁,对应 retries 的值也会每次加 1,直到超过最大尝试次数如果还没获取到锁,就会调用 lock 办法进行阻塞获取。在尝试获取锁的期间,还会每隔一次(retries 为偶数) 去查看头结点是否被扭转,如果被扭转则将 retries 重置回 -1,而后再重走一遍方才的流程。这就是线程自旋时所做的操作,需注意的是如果在自旋时检测到头结点已被扭转,则会延长线程的自旋工夫。

10. 哈希表扩容时都做了哪些操作?

// 再哈希
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K, V> node) {
    // 获取旧哈希表的援用
    HashEntry<K, V>[] oldTable = table;
    // 获取旧哈希表的容量
    int oldCapacity = oldTable.length;
    // 计算新哈希表的容量(为旧哈希表的 2 倍)
    int newCapacity = oldCapacity << 1;
    // 计算新的元素阀值
    threshold = (int) (newCapacity * loadFactor);
    // 新建一个 HashEntry 数组
    HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity];
    // 生成新的掩码值
    int sizeMask = newCapacity - 1;
    // 遍历旧表的所有元素
    for (int i = 0; i < oldCapacity; i++) {
        // 获得链表头结点
        HashEntry<K, V> e = oldTable[i];
        if (e != null) {
            HashEntry<K, V> next = e.next;
            // 计算元素在新表中的索引
            int idx = e.hash & sizeMask;
            //next 为空表明链表只有一个结点
            if (next == null) {
                // 间接把该结点放到新表中
                newTable[idx] = e;
            } else {
                HashEntry<K, V> lastRun = e;
                int lastIdx = idx;
                // 定位 lastRun 结点, 将 lastRun 之后的结点间接放到新表中
                for (HashEntry<K, V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // 遍历在链表 lastRun 结点之前的元素, 将它们顺次复制到新表中
                for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K, V> n = newTable[k];
                    newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
                }
            }
        }
    }
    // 计算传入结点在新表中的下标
    int nodeIndex = node.hash & sizeMask;
    // 将传入结点增加到链表头结点
    node.setNext(newTable[nodeIndex]);
    // 将新表指定下标元素换成传入结点
    newTable[nodeIndex] = node;
    // 将哈希表援用指向新表
    table = newTable;
}

rehash 办法在 put 办法中被调用,咱们晓得在 put 办法时会新建元素并增加到哈希数组中,随着元素的增多产生哈希抵触的可能性越大,哈希表的性能也会随之降落。因而每次 put 操作时都会查看元素总数是否超过阀值,如果超过则调用 rehash 办法进行扩容。

因为数组长度一旦确定则不能再被扭转,因而须要新建一个数组来替换原先的数组。从代码中能够晓得新创建的数组长度为原数组的 2 倍 (oldCapacity << 1)。创立好新数组后须要将旧数组中的所有元素移到新数组中,因而须要计算每个元素在新数组中的下标。计算新下标的过程如下图所示。

咱们晓得下标间接取的是哈希码的后几位,因为新数组的容量是间接用旧数组容量右移 1 位得来的,因而掩码位数向右减少 1 位,取到的哈希码位数也向右减少 1 位。如上图,若旧的掩码值为 111,则元素下标为 101,扩容后新的掩码值为 1111,则计算出元素的新下标为 0101。

因为同一条链表上的元素下标是雷同的,当初假如链表所有元素的下标为 101,在扩容后该链表元素的新下标只有 0101 或 1101 这两种状况,因而数组扩容会打乱原先的链表并将链表元素分成两批。在计算出新下标后须要将元素挪动到新数组中,在 HashMap 中通过间接批改 next 援用导致了多线程的死锁。

尽管在 ConcurrentHashMap 中通过加锁防止了这种状况,然而咱们晓得 next 域是 volatile 类型的,它的改变能立马被读线程读取到,因而为保障线程平安采纳复制元素来迁徙数组。然而对链表中每个元素都进行复制有点影响性能,作者发现链表尾部有许多元素的 next 是不变的,它们在新数组中的下标是雷同的,因而能够思考整体挪动这部分元素。具统计实际操作中只有 1 / 6 的元素是必须复制的,所以整体挪动链表尾部元素 (lastRun 前面的元素) 是能够晋升肯定性能的。

注:本篇文章基于 JDK1.7 版本。

正文完
 0