关于java:JDK17-ConcurrentHashMap解析

43次阅读

共计 7644 个字符,预计需要花费 20 分钟才能阅读完成。

数据结构

JDK1.7 ConcurrentHashMap 基于数组 + 链表,包含一个 Segment 数组,每个 Segment 中是又是一个数组 + 链表的数据结构(相当于一个 HashMap),数组和链表存储的是一个个 HashEntry 对象

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int modCount;
        transient int threshold;
        final float loadFactor;
    }

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }

罕用办法

应用

源码剖析

次要属性

    // 默认的容量大小,即 HashEntry 中数组的容量之和,初始化时会平均分配到每个 Segment 中的 HashEntry 数组
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    // 默认加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    // 默认的并发级别,决定了 Segment 数组的长度
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // 最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    // 每个 Segment 中的 HashEntry 数组最小容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //Segment 的最大数量 =65536
    static final int MAX_SEGMENTS = 1 << 16;
    // 重试次数
    static final int RETRIES_BEFORE_LOCK = 2;

构造方法

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity) {this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap() {this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                      DEFAULT_INITIAL_CAPACITY),
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        putAll(m);
    }

    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //ssize 即为 Segment 数组的长度,默认 concurrencyLevel=16,即 ssize=Segment 数组的长度 =16
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1; // 乘以 2
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        //cap 即每个 Segment 中的 HashEntry 数组的长度,即 cap= 每个 Segment 中的 HashEntry 数组的长度 =2
        while (cap < c)
            cap <<= 1;
        // 将 Segment 数组初始化长度为 16 并且只填充第 0 个元素,默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

put()办法

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        // 1. 依据 key 值,通过 hash()计算出对应的 hash 值
        // 2. 依据 hash 值计算出对应的 segment 数组下标
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            //3. 如果 segment[j]==null,初始化 segment[j]
            s = ensureSegment(j);
               //4. 往 segment[j]增加 key-value
        return s.put(key, hash, value, false);
    }


        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //tryLock 尝试加锁,如果加锁胜利,返回 null,否则执行 scanAndLockForPut 尝试自旋加锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {

                // 1. 依据 key 值,通过 hash()计算出对应的 hash 值
                // 2. 依据 hash 值计算出对应的 HashEntry 数组下标
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                // 通过遍历以该数组元素为头结点的链表
                for (HashEntry<K,V> e = first;;) {
                    // 若头结点存在,遍历链表,若该 key 已存在,则用新 value 替换旧 value
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    // 若头节点不存在或曾经遍历到了链表尾部
                    else {
                        // 若 node 不为 null,将 node 增加到 HashEntry 数组中, 这里采纳头插法
                        if (node != null)
                            node.setNext(first);
                        // 若 node 为 null,将 node 初始化后增加到 HashEntry 数组中, 这里采纳头插法
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        // 键值对数量 size > 最大容量 threshold
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            // 扩容
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                // 解锁
                unlock();}
            return oldValue;
        }

        // 一直用 tryLock()自旋进行加锁,若达到自旋次数则调用 lock()阻塞获取锁
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {if (e == null) {if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

rehash()办法

        //HashEntry 数组扩容为原来的两倍。老数组里的数据挪动到新数组时,地位要么不变,要么变为 index+ oldSize,应用头插法插入到新数组
        private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

get()办法

    // 因为 HashEntry 中的 value 属性是用 volatile 关键词润饰的,保障了内存可见性,所以每次获取时都是最新值。ConcurrentHashMap 的 get 办法是十分高效的,因为整个过程都不须要加锁。public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        // 1. 依据 key 值,通过 hash()计算出对应的 hash 值
        int h = hash(key);
        // 2. 依据 hash 值计算出对应的 segment 数组下标,失去 segment 数组
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            3. 依据 hash 值计算出对应的 HashEntry 数组下标,失去 HashEntry 数组,遍历数组
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                //4. 找到对应的 key,返回 value
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

size()办法

    // 计算两次,如果不变则返回计算结果,若不统一,则锁住所有的 Segment 求和
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation}
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();}
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

总结

1.JDK1.7 ConcurrentHashMap 基于数组 + 链表,包含一个 Segment 数组,每个 Segment 中是又是一个数组 + 链表的数据结构 (相当于一个 HashMap),数组和链表存储的是一个个 HashEntry 对象
2.Segment 继承于 ReentrantLock,实践上 ConcurrentHashMap 反对 CurrencyLevel(Segment 数组数量) 的线程并发。每当一个线程占用锁拜访一个 Segment 时,不会影响到其余的 Segment。
3. 增加 key-value 时会依据 key 值计算,依据 hash 值计算出对应的 segment 数组下标,对这个 segment 应用 tryLock 尝试加锁,如果加锁失败,执行 scanAndLockForPut 尝试自旋加锁直到胜利;后续流程与 HashMap 雷同。
4. 因为 HashEntry 中的 value 属性是用 volatile 关键词润饰的,保障了内存可见性,所以每次获取时都是最新值。ConcurrentHashMap 的 get 办法是十分高效的,因为整个过程都不须要加锁。

正文完
 0