JDK源码那些事儿之并发ConcurrentHashMap上篇

5次阅读

共计 15509 个字符,预计需要花费 39 分钟才能阅读完成。

前面前已经说明了 HashMap 以及红黑树的一些基本知识,对 JDK8 的 HashMap 也有了一定的了解,本篇就开始看看并发包下的 ConcurrentHashMap,说实话,还是比较复杂的,笔者在这里也不会过多深入,源码层次上了解一些主要流程即可,清楚多线程环境下整个 Map 的运作过程就算是很大进步了,更细的底层部分需要时间和精力来研究,暂不深入

前言

jdk 版本:1.8

JDK7 中,ConcurrentHashMap 把内部细分成了若干个小的 HashMap,称之为段(Segment), 默认被分为 16 个段。多线程写操作对每个段进行加锁,段与段之间互不影响。而 JDK8 则抛弃了这种结构,类似 HashMap,多线程下为了保证线程安全,通过 CAS 和 synchronized 进行并发控制,降低锁颗粒度,性能上也就提高许多

同时由于降低锁粒度,同时需要兼顾读操作的正确性,增加了许多内部类来帮助完成并发控制,保证读操作的正确执行,同时支持了并发扩容操作,算是相当复杂了,由于过于复杂,对 ConcurrentHashMap 的说明将分为两章说明,本章就对 ConcurrentHashMap 的常量,变量,内部类和构造方法进行说明,下一章将重点分析其中的重要方法

这里先提前说明下,有个整体印象:

  • ConcurrentHashMap 结构上类似 HashMap,即数组 + 链表 + 红黑树
  • 锁颗粒度降低,复杂度提升
  • 多线程并发扩容
  • 多线程下保证读操作正确性
  • 计数方式处理:分段处理
  • 函数式编程(不是本文重点,自行查阅)

类定义

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable 

继承 AbstractMap,实现了 ConcurrentMap 接口,ConcurrentMap 也可以看看源码,加入了并发操作方法,是一个实现了并发访问的集合接口

常量

有些常量和变量可能不是很好理解,在后边到方法时会尽量详细说明

    /**
     * 最大容量
     */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 默认初始化容量,同 HashMap, 必须为 2 的倍数
     */
    private static final int DEFAULT_CAPACITY = 16;

    /**
     * 可能达到的最大的数组大小值(非 2 的次幂),2 的 31 次方 -8
     */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 默认并发级别,新版本无用,为了兼容旧版本,使用的地方在序列化方法 writeObject 中
     */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * 负载因子,只是为了兼容性
     * 构造方法里指定的负载因子只会影响初始化的 table 容量
     * 一般也不使用浮点数计算
     * ConcurrentHashMap 不会使用这个常量,而使用类似 n -(n >>> 2) 的方式来进行调整大小
     */
    private static final float LOAD_FACTOR = 0.75f;

    /**
     * 树化阈值
     * 同 HashMap
     */
    static final int TREEIFY_THRESHOLD = 8;

    /**
     * 调整大小时树转化为链表的阈值
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /**
     * 可以转化为红黑树的最小数组容量,即调整为红黑树时数组长度最小值必须为 MIN_TREEIFY_CAPACITY
     * 如果 bin 包含太多节点,则会调整表的大小
     * 该值应至少为 4 * TREEIFY_THRESHOLD,避免扩容和树化阈值之间的冲突。*/
    static final int MIN_TREEIFY_CAPACITY = 64;

    /**
     * 
     * 扩容的每个线程每次最少要迁移 16 个 hash 桶
     * 每个线程都可参与迁移任务,每个线程至少要连续迁移 MIN_TRANSFER_STRIDE 个 hash 桶
     * 帮助扩容提高了效率,当然复杂性也提高了很多,要处理的事情更多
     */
    private static final int MIN_TRANSFER_STRIDE = 16;

    /**
     * 与移位量和最大线程数相关
     * 先了解就好,后边涉及到方法会进行说明
     */
    private static int RESIZE_STAMP_BITS = 16;

    /**
     * 帮助扩容的最大线程数
     */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    /**
     * 移位量
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /** 
     * Node hash 值编码,规定了各自的含义
     */
    // forwarding nodes 节点 hash 值
    // 临时节点,扩容时出现,不存储实际数据
    // 如果旧数组的一个 hash 桶中全部的节点都迁移到新数组中,旧数组就在这个 hash 桶中放置一个 ForwardingNode
    // 读操作遇见该节点时,转到新的 table 数组上执行,写操作遇见时,则帮助扩容
    static final int MOVED     = -1; // hash for forwarding nodes
    // TREEBIN 节点
    // TreeBin 是 ConcurrentHashMap 中用于代理操作 TreeNode 的特殊节点
    // 保存实际的红黑树根节点,在红黑树插入节点时会对读操作造成影响,该对象维护了一个读写锁来保证多线程的正确性
    static final int TREEBIN   = -2; // hash for roots of trees
    // ReservationNode 节点 hash 值
    // 保留节点,JDK8 的新特性会用到,这里不过多说明
    static final int RESERVED  = -3; // hash for transient reservations
    
    // 负数转正数,定位 hash 桶时用到了,负数 Hash 值有特殊含义,具体看后边
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    // CPU 数量,限制边界,计算一个线程需要做多少迁移量
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    // 序列化兼容性
    private static final ObjectStreamField[] serialPersistentFields = {new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };

变量

    /**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     * 
     * volatile 保证可见性
     * Node 类和 HashMap 中的类似,val 和 next 属性通过 volatile 保证可见性
     */
    transient volatile Node<K,V>[] table;

    /**
     * The next table to use; non-null only while resizing.
     *
     * 扩容时使用的数组,只在扩容时非空,扩容时会创建
     */
    private transient volatile Node<K,V>[] nextTable;

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     *
     * sizeCtl = -1,表示有线程在进行初始化操作
     * sizeCtl < 0 且不为 - 1 表示有多个线程正在进行扩容操作,jdk 源码解释部分感觉有点问题
     * 每次第一个线程参与扩容时,会将 sizeCtl 设置为一个与当前 table 长度相关的数值,避免出现问题,讲解方法时进行说明
     * sizeCtl > 0,表示第一次初始化操作中使用的容量,或者初始化 / 扩容完成后的阈值
     * sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量
     */
    private transient volatile int sizeCtl;

    /**
     * 多线程帮助扩容相关
     * 下一个 transfer 任务的起始下标 index + 1 的值
     * transfer 时下标 index 从 length - 1 到 0 递减
     * 扩容 index 从后往前和迭代从前往后为了避免冲突
     */
    private transient volatile int transferIndex;

    /**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     *
     * 计数器基础值,记录元素个数,通过 CAS 操作进行更新
     */
    private transient volatile long baseCount;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
     *
     * CAS 自旋锁标志位,counterCells 扩容或初始化时使用
     */
    private transient volatile int cellsBusy;

    /**
     * Table of counter cells. When non-null, size is a power of 2.
     *
     * 高并发下计数数组,counterCells 数组非空时大小是 2 的 n 次幂
     */
    private transient volatile CounterCell[] counterCells;

    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

同时需要注意静态代码块中已经获取了一些变量在对象中的内存偏移量,这个是为了方便我们在 CAS 中的使用,如果不明白 CAS,请自行查阅资料学习,源码如下:

    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            // 获取 sizeCtl 在对象中的内存偏移量,下同
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {throw new Error(e);
        }
    }

内部类

ConcurrentHashMap 中增加了许多内部类来帮助完成并发下的操作

Node

普通 Entry,与 HashMap.Node 类似,不同主要在于 val 和 next 属性设置为了 volatile,同时不支持 setValue 方法,直接抛错,增加了 find 方法

TreeNode

继承自 Node 节点,与 HashMap.TreeNode 类似,之前文章专门介绍过这个内部类,可以去看看,同时需要说明的是,在 ConcurrentHashMap 中并不是直接操作 TreeNode 节点,而是通过 TreeBin 来代理操作的

TreeBin

代理操作 TreeNode 节点,保存红黑树的根节点,Hash 值固定为 -2,构造方法里传入对应的节点创建一棵红黑树。故在数组中保存的 hash 桶为红黑树结构时,在数组上的节点类型为 TreeBin,该节点类自带读写锁

    static final class TreeBin<K,V> extends Node<K,V> {
        // 保存树的根节点
        TreeNode<K,V> root;
        // 链表头节点
        volatile TreeNode<K,V> first;
        // 最近一次 waiter 状态的线程
        volatile Thread waiter;
        // 锁状态
        volatile int lockState;
        // values for lockState
        // 写锁,二进制 001
        static final int WRITER = 1; // set while holding write lock
        // 等待获取写锁的状态,二进制 010
        static final int WAITER = 2; // set when waiting for write lock
        // 读锁状态,二进制 100
        static final int READER = 4; // increment value for setting read lock

        /**
         * 红黑树结构重构时需要获取写锁
         */
        private final void lockRoot() {
            // CAS 尝试获取写锁
            if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
                // 当线程获取到写锁时,才会返回
                contendedLock(); // offload to separate method}

        /**
         * 释放写锁
         */
        private final void unlockRoot() {lockState = 0;}

        /**
         * 写线程不断尝试获取写锁,出现的几种情况的处理,这里不会有写线程和写线程竞争的情况出现
         * 在写线程进入这个方法时,这个红黑树对应的桶已经被外部锁住,不会让写线程进入,故不需要考虑写写竞争的情况
         * 故当前线程为写线程,则判断读线程即可
         */
        private final void contendedLock() {
            boolean waiting = false;
            for (int s;;) {
                // 取反与操作,其他线程未持有读锁
                if (((s = lockState) & ~WAITER) == 0) {
                    // CAS 尝试修改状态为写锁状态
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                        // 当前线程曾经处于 waiting 状态,将其清除
                        if (waiting)
                            waiter = null;
                        return;
                    }
                }
                // 有读线程
                else if ((s & WAITER) == 0) {
                    // 当前线程置为 wait 状态,waiter 记录线程
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                        waiting = true;
                        waiter = Thread.currentThread();}
                }
                // 当前线程处于 waiting 状态
                else if (waiting)
                    // 阻塞线程
                    LockSupport.park(this);
            }
        }

        /**
         * 从根节点开始进行遍历,查找到对应匹配的节点
         * 同时需要注意多线程环境下如果有写锁则通过链表结构(不是用红黑树结构查询)进行查询
         */
        final Node<K,V> find(int h, Object k) {if (k != null) {
                // 从 first 开始
                for (Node<K,V> e = first; e != null;) {
                    int s; K ek;
                    // 1. 有写线程操作,通过链表查询不进行阻塞
                    // 2.WAITER 状态
                    if (((s = lockState) & (WAITER|WRITER)) != 0) {
                        if (e.hash == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                        e = e.next;
                    }
                    // 读线程累加
                    else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                                 s + READER)) {
                        TreeNode<K,V> r, p;
                        try {p = ((r = root) == null ? null :
                                 r.findTreeNode(h, k, null));
                        } finally {
                            Thread w;
                            // 当线程为最后一个读线程并且有写锁被阻塞,此时应该通知阻塞的写线程重新尝试获得写锁
                            // getAndAddInt 更新之后返回旧值
                            if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                                (READER|WAITER) && (w = waiter) != null)
                                // 唤醒阻塞的线程
                                LockSupport.unpark(w);
                        }
                        return p;
                    }
                }
            }
            return null;
        }

        /**
         * 大部分与 HashMap.TreeNode 中的 putTreeVal 操作类似
         * 这里只说下不同的部分
         * 多线程环境下主要是在平衡时加锁操作,防止读线程操作时树结构在变化
         */
        final TreeNode<K,V> putTreeVal(int h, K k, V v) {
                ......
                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    TreeNode<K,V> x, f = first;
                    first = x = new TreeNode<K,V>(h, k, v, f, xp);
                    if (f != null)
                        f.prev = x;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    // 新添加节点父节点为黑色,添加节点置为红色即可保证平衡
                    if (!xp.red)
                        x.red = true;
                    else {
                        // 在进行插入平衡操作时会对读线程造成影响,需要加锁,让读线程以链表方式进行查询操作
                        lockRoot();
                        try {root = balanceInsertion(root, x);
                        } finally {unlockRoot();
                        }
                    }
                    break;
                }
            }
            assert checkInvariants(root);
            return null;
        }

        /**
         * 删除时修改链表关系和删除平衡操作时需要添加锁,防止读线程读取错误
         * 代码操作参考 HashMap.TreeNode,这里不再详细说明了
         */
        final boolean removeTreeNode(TreeNode<K,V> p) {
            ......
            lockRoot();
            try {
                TreeNode<K,V> replacement;
                TreeNode<K,V> pl = p.left;
                TreeNode<K,V> pr = p.right;
                if (pl != null && pr != null) {
                    TreeNode<K,V> s = pr, sl;
                    while ((sl = s.left) != null) // find successor
                        s = sl;
                    boolean c = s.red; s.red = p.red; p.red = c; // swap colors
                    TreeNode<K,V> sr = s.right;
                    TreeNode<K,V> pp = p.parent;
                    if (s == pr) { // p was s's direct parent
                        p.parent = s;
                        s.right = p;
                    }
                    else {
                        TreeNode<K,V> sp = s.parent;
                        if ((p.parent = sp) != null) {if (s == sp.left)
                                sp.left = p;
                            else
                                sp.right = p;
                        }
                        if ((s.right = pr) != null)
                            pr.parent = s;
                    }
                    p.left = null;
                    if ((p.right = sr) != null)
                        sr.parent = p;
                    if ((s.left = pl) != null)
                        pl.parent = s;
                    if ((s.parent = pp) == null)
                        r = s;
                    else if (p == pp.left)
                        pp.left = s;
                    else
                        pp.right = s;
                    if (sr != null)
                        replacement = sr;
                    else
                        replacement = p;
                }
                else if (pl != null)
                    replacement = pl;
                else if (pr != null)
                    replacement = pr;
                else
                    replacement = p;
                if (replacement != p) {
                    TreeNode<K,V> pp = replacement.parent = p.parent;
                    if (pp == null)
                        r = replacement;
                    else if (p == pp.left)
                        pp.left = replacement;
                    else
                        pp.right = replacement;
                    p.left = p.right = p.parent = null;
                }

                root = (p.red) ? r : balanceDeletion(r, replacement);

                if (p == replacement) {  // detach pointers
                    TreeNode<K,V> pp;
                    if ((pp = p.parent) != null) {if (p == pp.left)
                            pp.left = null;
                        else if (p == pp.right)
                            pp.right = null;
                        p.parent = null;
                    }
                }
            } finally {unlockRoot();
            }
            assert checkInvariants(root);
            return false;
        }

Traverser

首先需要明白的是在并发下遍历读操作和扩容操作同时进行时,读操作有可能不会正确的进行,而为了解决这个问题,引入了 Traverser 这个内部类来完成并发操作下的读操作。在涉及到读取所有 hash 桶时,比如 containsValue 操作,不能使用 HashMap 的方式进行,一个一个 hash 桶遍历进行读取,此时扩容时,ConcurrentHashMap 会在转移完一个 hash 桶后将一个 ForwardingNode 节点填入,如果还是非并发遍历,导致这个 hash 桶将不会被遍历到,正确的处理方式也就是遇到 ForwardingNode 节点时将当前数组信息和索引保存,然后在 ForwardingNode 节点的 nextTable 数组(也就是扩容后的数组)上进行遍历。这里还使用了栈结构 TableStack,是为了处理多个 ForwardingNode 节点遍历的情况,同时需要注意的是,参考 HashMap,在 nextTable 上需要遍历旧的 hash 桶和新的对应的扩容 hash 桶(index+size)

    static class Traverser<K,V> {
        // 当前 table
        Node<K,V>[] tab;        // current table; updated if resized
        // 下一个 entry
        Node<K,V> next;         // the next entry to use
        // 保存 / 恢复 ForwardingNodes
        TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
        // 下一个遍历的 hash 桶 index
        int index;              // index of bin to use next
        // 开始 index
        int baseIndex;          // current index of initial table
        // 结尾 index
        int baseLimit;          // index bound for initial table
        // 数组长度
        final int baseSize;     // initial table size

        Traverser(Node<K,V>[] tab, int size, int index, int limit) {
            this.tab = tab;
            this.baseSize = size;
            this.baseIndex = this.index = index;
            this.baseLimit = limit;
            this.next = null;
        }

        /**
         * 遍历到下一个有数据的节点并返回,无则返回 null
         */
        final Node<K,V> advance() {
            Node<K,V> e;
            // next 值不为空,则直接获取其下一个节点
            if ((e = next) != null)
                e = e.next;
            for (;;) {Node<K,V>[] t; int i, n;  // must use locals in checks
                // e 非空则直接返回当前值 e 并将 next 赋值为 e 便于下次再次调用该方法使用
                if (e != null)
                    return next = e;
                // 判断边界同时做了一些赋值操作
                // 数组 t = tab
                // 数组长度 n = t.length
                // 数组 index i = index
                // 不满足直接返回 null
                if (baseIndex >= baseLimit || (t = tab) == null ||
                    (n = t.length) <= (i = index) || i < 0)
                    return next = null;
                // e 赋值为 t 的第 i 处 hash 桶节点值,判断是否为特殊节点
                if ((e = tabAt(t, i)) != null && e.hash < 0) {
                    // ForwardingNode 节点说明正在扩容,此时遍历读需要转移到扩容数组上进行
                    if (e instanceof ForwardingNode) {
                        // tab 指向扩容数组
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        // e 置空,继续循环
                        e = null;
                        // 保存此时的未扩容前的数组状态,相当于入栈操作
                        pushState(t, i, n);
                        continue;
                    }
                    // TreeBin 节点,e 置为 first 节点
                    else if (e instanceof TreeBin)
                        e = ((TreeBin<K,V>)e).first;
                    else
                        // 保留节点
                        e = null;
                }
                if (stack != null)
                    // 出栈操作,恢复 tab 到未扩容之前的数组继续遍历
                    recoverState(n);
                else if ((index = i + baseSize) >= n)
                    // 更新为下一个 hash 桶索引
                    index = ++baseIndex; // visit upper slots if present
            }
        }

        /**
         * 入栈操作,保存当前数组的状态
         */
        private void pushState(Node<K,V>[] t, int i, int n) {
            TableStack<K,V> s = spare;  // reuse if possible
            if (s != null)
                spare = s.next;
            else
                s = new TableStack<K,V>();
            s.tab = t;
            s.length = n;
            s.index = i;
            s.next = stack;
            stack = s;
        }

        /**
         * 出栈操作,恢复扩容之前的数组状态
         */
        private void recoverState(int n) {
            TableStack<K,V> s; int len;
            while ((s = stack) != null && (index += (len = s.length)) >= n) {
                n = len;
                index = s.index;
                tab = s.tab;
                s.tab = null;
                TableStack<K,V> next = s.next;
                s.next = spare; // save for reuse
                stack = next;
                spare = s;
            }
            if (s == null && (index += baseSize) >= n)
                index = ++baseIndex;
        }
    }

遍历过程可参考下图(图中数据非真实数据):

ForwardingNode

ForwardingNode 节点在扩容时被使用,hash 值固定为 -1,当进行扩容操作时,转移 hash 桶时会在原数组位置上放置一个 ForwardingNode 节点,表示当前位置 hash 桶已转移到新扩容数组上,当读线程读到 ForwardingNode 节点时则需要到新扩容数组对应的位置进行读操作,如果是写线程,则需要帮助进行扩容操作

    /**
     * A node inserted at head of bins during transfer operations.
     */
    static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            // MOVED hash 值固定为 -1
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        
        // 提供 find 方法查找,这里通过 nextTable 进行遍历
        Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                // 前置判断
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    // 判断是 hash 桶第一个节点,返回这个节点
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        // 递归
                        if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            // 特殊节点调用其自身方法
                            return e.find(h, k);
                    }
                    // 找到最后返回空
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }

ReservationNode

保留节点,源码仅 compute,computeIfAbsent 中使用,通过 ReservationNode 对 hash 桶进行加锁处理,写操作时正常情况下需要对第一个节点进行加锁处理,put 操作使用 CAS 不需要加锁,而在 compute,computeIfAbsent 方法中比较复杂,需加锁处理,在 hash 桶节点为空时,添加一个 ReservationNode 节点,同时对其加锁处理

    static final class ReservationNode<K,V> extends Node<K,V> {ReservationNode() {super(RESERVED, null, null, null);
        }

        Node<K,V> find(int h, Object k) {return null;}
    }

构造方法

在有参构造函数中,传入初始容量参数,调用 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)),这里和 HashMap 就不一样了,可自行查阅源码,ConcurrentHashMap 先扩大了 1.5 倍再调用 tableSizeFor 方法,可以回想下 tableSizeFor 方法的含义,和 HashMap 是相同的处理方式,同时构造方法也不是 table 进行初始化的地方,和 HashMap 类似,都是在插入 K - V 值时进行数组的初始化,构造方法仅仅是来设置属性值的,重要的在于 sizeCtl,此时是用来记录数组初始化使用容量的,在 initTable 方法中使用

    public ConcurrentHashMap() {}
    public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        // 设置 sizeCtl,此时表示数组初始化使用容量,因为在初始化时将会使用这个变量
        this.sizeCtl = cap;
    }
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        // sizeCtl 设置为默认容量,此时表示数组初始化使用容量
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, 1);
    }
    // concurrencyLevel 并发级别 兼容旧版本
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        // 同样进行设置 sizeCtl,此时表示数组初始化使用容量
        this.sizeCtl = cap;
    }
    

initTable

构造方法并没有创建 Node 数组,最终执行是在 put 方法中的 initTable 方法,这个方法中也能看出 sizeCtl 在不同值下代表了不同的含义

    private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 在初始化时不能并发执行,在已经有线程进入初始化状态时,非初始化线程需让步操作,等待完成
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            // CAS 将 sizeCtl 置为 -1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 检查是否已经初始化,否则进入初始化过程
                    if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // sc = 3n/4,相当于 0.75n   
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // sizeCtl 代表了阈值,类似 threshold
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

总结

本文从整体上对 ConcurrentHashMap 进行了介绍,其中常量和变量以及其中重要的内部类都进行了相关的解释说明,当然,限于篇幅以及复杂度暂未涉及方法的源码部分,下一篇将结合本文的相关知识重点说明其中涉及的方法,在看本文时请完全理解 HashMap 的源码,将会事半功倍,毕竟结构上和 HashMap 非常相似

ConcurrentHashMap 从 HashMap 的基础上进行演变,适应多线程并发情况下的操作,理解下列几点:

  • 初始化数组,单线程创建
  • 扩容时,多线程可帮助扩容(下一章将详细说明)
  • hash 桶头节点有 4 种类型,ForwardingNode 节点表明当前 hash 桶正在扩容移动中,TreeBin 节点表明为红黑树结构,ReservationNode 节点暂不多说,还有就是正常链表节点
  • 遍历操作通过一个特殊的内部类 Traverser 实现
  • CAS 操作大大增加了并发执行的效率

水平有限,如有错误,请及时指出,谢谢!

正文完
 0