关于java:Java-ConcurrentHashMap-高并发安全实现原理解析

4次阅读

共计 26917 个字符,预计需要花费 68 分钟才能阅读完成。

本文首发于 vivo 互联网技术 微信公众号
链接:https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug
作者:vivo 游戏技术团队

一、概述

ConcurrentHashMap (以下简称 C13Map) 是并发编程出场率最高的数据结构之一,大量的并发 CASE 背地都有 C13Map 的反对,同时也是 JUC 包中代码量最大的组件(6000 多行),自 JDK8 开始 Oracle 对其进行了大量优化工作。

本文从 HashMap 的基础知识开始,尝试逐个剖析 C13Map 中各个组件的实现和安全性保障。

二、HashMap 基础知识 

剖析 C13MAP 前,须要理解以下的 HashMap 常识或者约定:

  • 哈希表的长度永远都是 2 的幂次方,起因是 hashcode%tableSize==hashcode&(tableSize-1),也就是哈希槽位的确定能够用一次与运算来代替取余运算。
  • 会对 hashcode 调用若干次扰动函数,将高 16 位与低 16 位做异或运算,因为高 16 位的随机性更强。
  • 当表中的元素总数超过 tableSize * 0.75 时,哈希表会产生扩容操作,每次扩容的 tableSize 是原先的两倍。
  • 下文提到的槽位(bucket)、哈希分桶、BIN 均示意同一个概念,即哈希 table 上的某一列。
  • 旧表在做搬运时 i 槽位的 node 能够依据其哈希值的第 tableSize 位的 bit 决定在新表上的槽位是 i 还是 i +tableSize。
  • 每个槽位上有可能会呈现哈希抵触,在未达到某个阈值时它是一个链表构造,达到阈值后会降级到红黑树结构。
  • HashMap 自身并非为多线程环境设计,永远不要尝试在并发环境下间接应用 HashMap,C13Map 不存在这个平安问题。

三、C13Map 的字段定义

C13Map 的字段定义

// 最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
 
// 默认初始容量
private static final int DEFAULT_CAPACITY = 16;
 
// 数组的最大容量, 避免抛出 OOM
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
// 最大并行度,仅用于兼容 JDK1.7 以前版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 
// 扩容因子
private static final float LOAD_FACTOR = 0.75f;
 
// 链表转红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
 
// 红黑树进化阈值
static final int UNTREEIFY_THRESHOLD = 6;
 
// 链表转红黑树的最小总量
static final int MIN_TREEIFY_CAPACITY = 64;
 
// 扩容搬运时批量搬运的最小槽位数
private static final int MIN_TRANSFER_STRIDE = 16;
 
 
// 以后待扩容 table 的邮戳位, 通常是高 16 位
private static final int RESIZE_STAMP_BITS = 16;
 
// 同时搬运的线程数自增的最大值
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 
// 搬运线程数的标识位,通常是低 16 位
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
 
static final int MOVED     = -1; // 阐明是 forwardingNode
static final int TREEBIN   = -2; // 红黑树
static final int RESERVED  = -3; // 原子计算的占位 Node
static final int HASH_BITS = 0x7fffffff; // 保障 hashcode 扰动计算结果为负数
 
// 以后哈希表
transient volatile Node<K,V>[] table;
 
// 下一个哈希表
private transient volatile Node<K,V>[] nextTable;
 
// 计数的基准值
private transient volatile long baseCount;
 
// 控制变量,不同场景有不同用处,参考下文
private transient volatile int sizeCtl;
 
// 并发搬运过程中 CAS 获取区段的下限值
private transient volatile int transferIndex;
 
// 计数 cell 初始化或者扩容时基于此字段应用自旋锁
private transient volatile int cellsBusy;
 
// 减速多核 CPU 计数的 cell 数组
private transient volatile CounterCell[] counterCells;

四、平安操作 Node<K,V> 数组

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}

对 Node<K,V>[] 上任意一个 index 的读取和写入都应用了 Unsafe 辅助类,table 自身是 volatile 类型的并不能保障其下的每个元素的内存语义也是 volatile 类型;

须要借助于 Unsafe 来保障 Node<K,V>[]元素的“读 / 写 /CAS”操作在多核并发环境下的原子或者可见性。

五、读操作 get 为什么是线程平安的

首先须要明确的是,C13Map 的读操作个别是不加锁的(TreeBin 的读写锁除外),而读操作与写操作有可能并行;能够保障的是,因为 C13Map 的写操作都要获取 bin 头部的 syncronized 互斥锁,能保障最多只有一个线程在做更新,这其实是一个单线程写、多线程读的并发安全性的问题。

C13Map 的 get 办法

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 执行扰动函数
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

1、如果以后哈希表 table 为 null

哈希表未初始化或者正在初始化未实现,间接返回 null;尽管 line5 和 line18 之间其它线程可能经验了千山万水,至多在判断 tab==null 的工夫点 key 必定是不存在的,返回 null 合乎某一时刻的客观事实。

2、如果读取的 bin 头节点为 null

阐明该槽位尚未有节点,间接返回 null。

3、如果读取的 bin 是一个链表

阐明头节点是个一般 Node。

(1)如果正在产生链表向红黑树的 treeify 工作,因为 treeify 自身并不毁坏旧的链表 bin 的构造,只是在全副 treeify 实现后将头节点一次性替换为新创建的 TreeBin,能够释怀读取。

(2)如果正在产生 resize 且以后 bin 正在被 transfer,因为 transfer 自身并不毁坏旧的链表 bin 的构造,只是在全副 transfer 实现后将头节点一次性替换为 ForwardingNode,能够释怀读取。

(3)如果其它线程正在操作链表,在以后线程遍历链表的任意一个工夫点,都有可能同时在产生 add/replace/remove 操作。

  • 如果是 add 操作,因为链表的节点新增从 JDK8 当前都采纳了后入式,无非是多遍历或者少遍历一个 tailNode。
  • 如果是 remove 操作,存在遍历到某个 Node 时,正好有其它线程将其 remove,导致其孤立于整个链表之外;但因为其 next 援用未产生变更,整个链表并没有断开,还是能够照常遍历链表直到 tailNode。
  • 如果是 replace 操作,链表的构造未变,只是某个 Node 的 value 产生了变动,没有平安问题。

论断:对于链表这种线性数据结构,单线程写且插入操作保障是后入式的前提下,并发读取是平安的;不会存在误读、链表断开导致的漏读、读到环状链表等问题。

4、如果读取的 bin 是一个红黑树

阐明头节点是个 TreeBin 节点。

(1)如果正在产生红黑树向链表的 untreeify 操作,因为 untreeify 自身并不毁坏旧的红黑树结构,只是在全副 untreeify 实现后将头节点一次性替换为新创建的一般 Node,能够释怀读取。

(2)如果正在产生 resize 且以后 bin 正在被 transfer,因为 transfer 自身并不毁坏旧的红黑树结构,只是在全副 transfer 实现后将头节点一次性替换为 ForwardingNode,能够释怀读取。

(3)如果其余线程在操作红黑树,在以后线程遍历红黑树的任意一个工夫点,都可能有单个的其它线程产生 add/replace/remove/ 红黑树的翻转等操作,参考上面的红黑树的读写锁实现。

TreeBin 中的读写锁实现

 TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
 
    private final void lockRoot() {
        // 如果一次性获取写锁失败,进入 contendedLock 循环体,循环获取写锁或者休眠期待
        if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))
            contendedLock(); // offload to separate method}
 
    private final void unlockRoot() {lockState = 0;}
    // 对红黑树加互斥锁, 也就是写锁
    private final void contendedLock() {
        boolean waiting = false;
        for (int s;;) {
            // 如果 lockState 除了第二位外其它位上都为 0,示意红黑树以后既没有上读锁,又没有上写锁,仅有可能存在 waiter,能够尝试间接获取写锁
            if (((s = lockState) & ~WAITER) == 0) {if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) {if (waiting)
                        waiter = null;
                    return;
                }
            }
            // 如果 lockState 第二位是 0, 示意以后没有线程在期待写锁
            else if ((s & WAITER) == 0) {
                // 将 lockState 的第二位设置为 1,相当于打上了 waiter 的标记,示意有线程在期待写锁
                if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {
                    waiting = true;
                    waiter = Thread.currentThread();}
            }
            // 休眠以后线程
            else if (waiting)
                LockSupport.park(this);
        }
    }
     
    // 查找红黑树中的某个节点
    final Node<K,V> find(int h, Object k) {if (k != null) {for (Node<K,V> e = first; e != null;) {
                int s; K ek;
                // 如果以后有 waiter 或者有写锁,走线性检索,因为红黑树尽管代替了链表,但其外部仍然保留了链表的构造,尽管链表的查问性能个别,但依据先前的剖析其读取的安全性有保障。// 发现有写锁改走线性检索,是为了防止期待写锁开释花去太久工夫; 而发现有 waiter 改走线性检索,是为了防止读锁叠加的太多,导致写锁线程须要期待太长的工夫; 实质上都是为了缩小读写碰撞
                // 线性遍历的过程中,每遍历到下一个节点都做一次判断,一旦发现锁竞争的可能性缩小就改走 tree 检索以进步性能
                if (((s = lockState) & (WAITER|WRITER)) != 0) {
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    e = e.next;
                }
                // 对红黑树加共享锁, 也就是读锁,CAS 一次性减少 4,也就是减少的只是 3~32 位
                else if (U.compareAndSetInt(this, LOCKSTATE, s,
                                             s + READER)) {
                    TreeNode<K,V> r, p;
                    try {p = ((r = root) == null ? null :
                             r.findTreeNode(h, k, null));
                    } finally {
                        Thread w;
                        // 开释读锁,如果开释结束且有 waiter, 则将其唤醒
                        if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                            (READER|WAITER) && (w = waiter) != null)
                            LockSupport.unpark(w);
                    }
                    return p;
                }
            }
        }
        return null;
    }
    // 更新红黑树中的某个节点
    final TreeNode<K,V> putTreeVal(int h, K k, V v) {
        Class<?> kc = null;
        boolean searched = false;
        for (TreeNode<K,V> p = root;;) {
            int dir, ph; K pk;
            //... 省略解决红黑树数据结构的代码若干          
                else {
                    // 写操作前加互斥锁
                    lockRoot();
                    try {root = balanceInsertion(root, x);
                    } finally {
                        // 开释互斥锁
                        unlockRoot();}
                }
                break;
            }
        }
        assert checkInvariants(root);
        return null;
    }
}

红黑树内置了一套读写锁的逻辑,其外部定义了 32 位的 int 型变量 lockState,第 1 位是写锁标记位,第 2 位是写锁期待标记位,从 3~32 位则是共享锁标记位。

读写操作是互斥的,容许多个线程同时读取,但不容许读写操作并行,同一时刻只容许一个线程进行写操作;这样任意工夫点读取的都是一个非法的红黑树,整体上是平安的。

有的同学会产生纳闷,写锁开释时为何没有将 waiter 唤醒的操作呢?是否有可能 A 线程进入了期待区,B 线程获取了写锁,开释写锁时仅做了 lockState= 0 的操作。

那么 A 线程是否就没有机会被唤醒了,只有期待下一个读锁开释时的唤醒了呢?

显然这种状况违反常理,C13Map 不会呈现这样的疏漏,再进一步察看,红黑树的变更操作的外围,也就是在 putValue/replaceNode 那一层,都是对 BIN 的头节点加了 synchornized 互斥锁的,同一时刻只能有一个写线程进入 TreeBin 的办法范畴内,当写线程发现以后 waiter 不为空,其实此 waiter 只能是以后线程本人,能够释怀的获取写锁,不必放心无奈被唤醒的问题。

TreeBin 在 find 读操作检索时,在 linearSearch(线性检索)和 treeSearch(树检索)间做了折衷,前者性能差但并发平安,后者性能佳但要做并发管制,可能导致锁竞争;设计者应用线性检索来尽量避免读写碰撞导致的锁竞争,但评估到 race condition 已隐没时,又立刻趋向于改用树检索来进步性能,在平安和性能之间做到了极佳的均衡。具体的折衷策略请参考 find 办法及正文。

因为有线性检索这样一个抄底计划,以及入口处 bin 头节点的 synchornized 机制,保障了进入到 TreeBin 整体代码块的写线程只有一个;TreeBin 中读写锁的整体设计与 ReentrantReadWriteLock 相比还是简略了不少,比方并未定义用于寄存待唤醒线程的 threadQueue,以及读线程仅会自旋而不会阻塞等等,能够看做是特定条件下 ReadWriteLock 的简化版本。

5、如果读取的 bin 是一个 ForwardingNode

阐明以后 bin 已迁徙,调用其 find 办法到 nextTable 读取数据。

forwardingNode 的 find 办法

static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null);
        this.nextTable = tab;
    }
     
    // 递归检索哈希表链
    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
 
 
}

ForwardingNode 中保留了 nextTable 的援用,会转向下一个哈希表进行检索,但并不能保障 nextTable 就肯定是 currentTable,因为在高并发插入的状况下,极短时间内就能够导致哈希表的屡次扩容,内存中极有可能驻留一条哈希表链,彼此以 bin 的头节点上的 ForwardingNode 相连,线程刚读取时拿到的是 table1,遍历时却有可能经验了哈希表的链条。

eh<0 有三种状况:

  • 如果是 ForwardingNode 持续遍历下一个哈希表。
  • 如果是 TreeBin, 调用其 find 办法进入 TreeBin 读写锁的保护区读取数据。
  • 如果是 ReserveNode,阐明以后有 compute 计算中,整条 bin 还是一个空构造,间接返回 null。

6、如果读取的 bin 是一个 ReserveNode

ReserveNode 用于 compute/computeIfAbsent 原子计算的办法,在 BIN 的头节点为 null 且计算尚未实现时,先在 bin 的头节点打上一个 ReserveNode 的占位标记。

读操作发现 ReserveNode 间接返回 null,写操作会因为抢夺 ReserveNode 的互斥锁而进入阻塞态,在 compute 实现后被唤醒后循环重试。

六、写操作 putValue/replaceNode 为什么是线程平安的

典型的编程范式如下:

C13Map 的 putValue 办法

Node<K,V>[] tab = table;  // 将堆中的 table 变量赋给线程堆栈中的局部变量
Node f = tabAt(tab, i);
if(f==null){
 // 以后槽位没有头节点, 间接 CAS 写入
 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
    break;
}else if(f.hash == MOVED){
 // 退出帮助搬运行列
 helpTransfer(tab,f);
}
// 不是 forwardingNode
else if(f.hash != MOVED){
    // 先锁住 I 槽位上的头节点
    synchronized (f) {
    // 再 doubleCheck 看此槽位上的头节点是否还是 f
    if (tabAt(tab, i) == f) {... 各种写操作}
  }
}

1、以后槽位如果头节点为 null 时,间接 CAS 写入

有人兴许会质疑,如果写入时 resize 操作已实现,产生了 table 向 nextTable 的转变,是否会存在写入的是旧表的 bin 导致数据失落的可能 ? 

这种可能性是不存在的,因为一个 table 在 resize 实现后所有的 BIN 都会被打上 ForwardingNode 的标记,能够形象的了解为所有槽位上都插满了红旗,而此处在 CAS 时的 compare 的变量 null,可能保障至多在 CAS 原子操作产生的工夫点 table 并未产生变更。

2、以后槽位如果头节点不为 null

这里采纳了一个小技巧:先锁住 I 槽位上的头节点,进入同步代码块后,再 doubleCheck 看此槽位上的头节点是否有变动。

进入同步块后还须要 doubleCheck 的起因:尽管一开始获取到的头节点 f 并非 ForwardingNode,但在获取到 f 的同步锁之前,可能有其它线程提前获取了 f 的同步锁并实现了 transfer 工作,并将 I 槽位上的头节点标记为 ForwardingNode,此时的 f 就成了一个过期的 bin 的头节点。

然而因为标记操作与 transfer 作为一个整体在同步的代码块中执行,如果 doubleCheck 的后果是此槽位上的头节点还是 f,则表明至多在以后工夫点该槽位还没有被 transfer 到新表(如果以后有 transfer in progress 的话),能够释怀的对该 bin 进行 put/remove/replace 等写操作。

只有未产生 transfer 或者 treeify 操作,链表的新增操作都是采取后入式,头节点一旦确定不会轻易扭转,这种后入式的更新形式保障了锁定头节点就等于锁住了整个 bin。

如果不作 doubleCheck 判断,则有可能以后槽位已被 transfer,写入的还是旧表的 BIN,从而导致写入数据的失落;也有可能在获取到 f 的同步锁之前,其它线程对该 BIN 做了 treeify 操作,并将头节点替换成了 TreeBin, 导致写入的是旧的链表,而非新的红黑树;

3、doubleCheck 是否有 ABA 问题

兴许有人会质疑,如果有其它线程提前对以后 bin 进行了的 remove/put 的操作,引入了新的头节点,并且恰好产生了 JVM 的内存开释和重新分配,导致新的 Node 的援用地址恰好跟旧的雷同,也就是存在所谓的 ABA 问题。

这个能够通过反证法来颠覆,在带有 GC 机制的语言环境下通常不会产生 ABA 问题,因为以后线程蕴含了对头节点 f 的援用,以后线程并未沦亡,不可能存在 f 节点的内存被 GC 回收的可能性。

还有人会质疑,如果在写入过程中主哈希表产生了变动,是否可能写入的是旧表的 bin 导致数据失落,这个也能够通过反证法来颠覆,因为 table 向 nextTable 的转化 (也就是将 resize 后的新哈希表正式 commit) 只有在所有的槽位都曾经 transfer 胜利后才会进行,只有有一个 bin 未 transfer 胜利,则阐明以后的 table 未发生变化,在以后的工夫点能够释怀的向 table 的 bin 内写入数据。

4、如何操作才平安

能够总结出法则,在对 table 的槽位胜利进行了 CAS 操作且 compare 值为 null,或者对槽位的非 forwardingNode 的头节点加锁后,doubleCheck 头节点未发生变化,对 bin 的写操作都是平安的。

七、原子计算相干办法

原子计算次要包含:computeIfAbsent、computeIfPresent、compute、merge 四个办法。

1、几个办法的比拟 

次要区别如下:

(1)computeIfAbsent 只会在判断到 key 不存在时才会插入,判空与插入是一个原子操作,提供的 FunctionalInterface 是一个二元的 Function, 承受 key 参数,返回 value 后果;如果计算结果为 null 则不做插入。

(2)computeIfPresent 只会在判读单到 Key 非空时才会做更新,判断非空与插入是一个原子操作,提供的 FunctionalInterface 是一个三元的 BiFunction, 承受 key,value 两个参数,返回新的 value 后果;如果新的 value 为 null 则删除 key 对应节点。

(3)compute 则不加 key 是否存在的限度,提供的 FunctionalInterface 是一个三元的 BiFunction, 承受 key,value 两个参数,返回新的 value 后果;如果旧的 value 不存在则以 null 代替进行计算;如果新的 value 为 null 则保障 key 对应节点不会存在。

(4)merge 不加 key 是否存在的限度,提供的 FunctionalInterface 是一个三元的 BiFunction, 承受 oldValue, newVALUE 两个参数,返回 merge 后的 value;如果旧的 value 不存在,间接以 newVALUE 作为最终后果,存在则返回 merge 后的后果;如果最终后果为 null,则保障 key 对应节点不会存在。

2、何时会应用 ReserveNode 占位

如果指标 bin 的头节点为 null,须要写入的话有两种伎俩:一种是生成好新的节点 r 后应用 casTabAt(tab, i, null, r)原子操作,因为 compare 的值为 null 能够保障并发的平安;

另外一种形式是创立一个占位的 ReserveNode,锁住该节点并将其 CAS 设置到 bin 的头节点,再进行进一步的原子计算操作;这两种方法都有可能在 CAS 的时候失败,须要自旋重复尝试。

(1)为什么只有 computeIfAbsent/compute 办法应用占位符的形式

computeIfPresent 只有在 BIN 构造非空的状况下才会开展原子计算,天然不存在须要 ReserveNode 占位的状况;锁住已有的头节点即可。

computeIfAbsent/compute 办法在 BIN 构造为空时,须要开展 Function 或者 BiFunction 的运算,这个操作是内部引入的须要耗时多久无奈精确评估;这种状况下如果采纳先计算,再 casTabAt(tab, i, null, r)的形式,如果有其它线程提前更新了这个 BIN,那么就须要从新锁定新退出的头节点,并反复一次原子计算(C13Map 无奈帮你缓存上次计算的后果,因为计算的入参有可能会变动),这个开销是比拟大的。

而应用 ReserveNode 占位的形式无需等到原子计算出后果,能够第一工夫先抢占 BIN 的所有权,使其余并发的写线程阻塞。

(2)merge 办法为何不须要占位

起因是如果 BIN 构造为空时,依据 merge 的解决策略,老的 value 为空则间接应用新的 value 代替,这样就省去了 BiFunction 中新老 value 进行 merge 的计算,这个耗费简直是没有的;因而能够应用 casTabAt(tab, i, null, r)的形式间接批改,防止了应用 ReserveNode 占位,锁定该占位 ReserveNode 后再进行 CAS 批改的两次 CAS 无谓的开销。

C13Map 的 compute 办法

public V compute(K key,
                 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {if (key == null || remappingFunction == null)
        throw new nullPointerException();
    int h = spread(key.hashCode());
    V val = null;
    int delta = 0;
    int binCount = 0;
    for (Node<K, V>[] tab = table; ; ) {
        Node<K, V> f;
        int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
            // 创立占位 Node
            Node<K, V> r = new ReservationNode<K, V>();
           // 先锁定该占位 Node
            synchronized (r) {
                // 将其设置到 BIN 的头节点
                if (casTabAt(tab, i, null, r)) {
                    binCount = 1;
                    Node<K, V> node = null;
                    try {
                        // 开始原子计算
                        if ((val = remappingFunction.apply(key, null)) != null) {
                            delta = 1;
                            node = new Node<K, V>(h, key, val, null);
                        }
                    } finally {
                        // 设置计算后的最终节点
                        setTabAt(tab, i, node);
                    }
                }
            }
            if (binCount != 0)
                break;
        } else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {// 此处省略对一般链表的变更操作} else if (f instanceof TreeBin) {// 此处省略对红黑树的变更操作}
                }
            }
            
        }
    }
    if (delta != 0)
        addCount((long) delta, binCount);
    return val;
}

3、如何保障原子性

computeIfAbsent/computeIfPresent 中判空与计算是原子操作,根据上述剖析次要是通过 casTabAt(tab, i, null, r)原子操作,或者应用 ReserveNode 占位并锁定的形式,或者锁住 bin 的头节点的形式来实现的。

也就是说整个 bin 始终处于锁定状态,在获取到指标 KEY 的 value 是否为空当前,其它线程无奈变更指标 KEY 的值,判空与计算天然是原子的。

而 casTabAt(tab, i, null, r)是由硬件层面的原子指令来保障的,可能保障同一个内存区域在 compare 和 set 操作之间不会有任何其它指令对其进行变更。

八、resize 过程中的并发 transfer

C13Map 中总共有三处中央会触发 transfer 办法的调用,别离是 addCount、tryPresize、helpTransfer 三个函数。

  • addCount用于写操作实现后测验元素数量,如果超过了 sizeCtl 中的阈值,则触发 resize 扩容和旧表向新表的 transfer。
  • tryPresize是 putAll 一次性插入一个汇合前的自检,如果汇合数目较大,则事后触发一次 resize 扩容和旧表向新表的 transfer。
  • helpTransfer是写操作过程中发现 bin 的头节点是 ForwardingNode, 则调用 helpTransfer 退出帮助搬运的行列。

1、开始 transfer 前的查看工作 

以 addCount 中的查看逻辑为例:

addCount 中的 transfer 查看

Node<K, V>[] tab, nt;
int n, sc;
// 以后的 tableSize 曾经超过 sizeCtl 阈值,且小于最大值
while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
        (n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);
    // 曾经在搬运中
    if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                transferIndex <= 0)
            break;
        // 搬运线程数加一
        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
            transfer(tab, nt);
    } else if (U.compareAndSwapInt(this, SIZECTL, sc,
            (rs << RESIZE_STAMP_SHIFT) + 2))
        // 尚未搬运,以后线程是本次 resize 工作的第一个线程,设置初始值为 2,十分奇妙的设计
        transfer(tab, null);
    s = sumCount();}

多处利用了对变量 sizeCtl 的 CAS 操作,sizeCtl 是一个全局控制变量。

参考下此变量的定义:private transient volatile int sizeCtl;

  • 初始值是 0 示意哈希表尚未初始化
  • 如果是 - 1 示意正在初始化,只容许一个线程进入初始化代码块
  • 初始化或者 reSize 胜利后,sizeCtl=loadFactor * tableSize 也就是触发再次扩容的阈值,是一个正整数
  • 在扩容过程中,sizeCtrl 是一个负整数,其高 16 位是与以后的 tableSize 关联的邮戳 resizeStamp,其低 16 位是以后从事搬运工作的线程数加 1

在办法的循环体中每次都将 table、sizeCtrl、nextTable 赋给局部变量以保障读到的是以后的最新值,且保障逻辑计算过程中变量的稳固。

如果 sizeCtrl 中高 16 位的邮戳与以后 tableSize 不匹配,或者搬运线程数达到了最大值,或者所有搬运的线程都曾经退出(只有在遍历完所有槽位后才会退出,否则会始终循环),或者 nextTable 曾经被清空,跳过搬运操作。

如果满足搬运条件,则对 sizeCtrl 做 CAS 操作,sizeCtrl>= 0 时设置初始线程数为 2,sizeCtrl<0 时将其值加 1,CAS 胜利后开始搬运操作,失败则进入下一次循环从新判断。

首个线程设置初始值为 2 的起因是:线程退出时会通过 CAS 操作将参加搬运的总线程数 -1,如果初始值依照惯例做法设置成 1,那么减 1 后就会变为 0。

此时其它线程发现线程数为 0 时,无奈辨别是没有任何线程做过搬运,还是有线程做完搬运但都退出了,也就无奈判断要不要退出搬运的行列。

值得注意的是,代码中的“sc == rs + 1 || sc == rs + MAX_RESIZERS“是 JDK8 中的显著的 BUG,少了 rs 无符号左移 16 位的操作;JDK12 曾经修复了此问题。

2、并发搬运过程和退出机制  

C13Map 的 transfer 办法

private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
    int n = tab.length, stride;
    // 一次搬运多少个槽位
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    if (nextTab == null) {           
        try {
            // 首个搬运线程,负责初始化 nextTable
            Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {     
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 初始化以后搬运索引
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 公共的 forwardingNode
    ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // 保障提交 nextTable 之前已遍历旧表的所有槽位
    for (int i = 0, bound = 0; ;) {
        Node<K, V> f;
        int fh;
        // 循环 CAS 获取下一个搬运区段
        while (advance) {
            int nextIndex, nextBound;
            // 搬运已完结,或者以后区段尚未实现,退出循环体; 最初一次抄底扫描时,仅辅助做 i 减一的运算
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            } else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                 
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 并非最初一个退出的线程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;               
                finishing = advance = true;
                // 异样奇妙的设计,最初一个线程推出前将 i 回退到最高位,等于是强制做最初一次的全表扫描;程序间接执行后续的 else if 代码,看有没有哪个槽位漏掉了,或者说是否全副是 forwardingNode 标记;// 能够视为抄底逻辑,尽管检测到漏掉槽位的概率根本是 0
                i = n;
            }
        } else if ((f = tabAt(tab, i)) == null)
            // 空槽位间接打上 forwardingNode 标记,CAS 失败下一次循环持续搬运该槽位,胜利则进入下一个槽位
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // 最初一次抄底遍历时,失常状况下所有的槽位应该都被打上 forwardingNode 标记
        else {
            // 锁定头节点
            synchronized (f) {if (tabAt(tab, i) == f) {
                    Node<K, V> ln, hn;
                    if (fh >= 0) {
                        //...... 此处省略链表搬运代码: 职责是将链表拆成两份, 搬运到 nextTable 的 i 和 i + n 槽位
                        setTabAt(nextTab, i, ln); 
                        setTabAt(nextTab, i + n, hn);
                        // 设置旧表对应槽位的头节点为 forwardingNode
                        setTabAt(tab, i, fwd);
                        advance = true;
                    } else if (f instanceof TreeBin) {
                        //...... 此处省略红黑树搬运代码: 职责是将红黑树拆成两份, 搬运到 nextTable 的 i 和 i + n 槽位,如果满足红黑树的进化条件,顺便将其进化为链表
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 设置旧表对应槽位的头节点为 forwardingNode
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

多个线程并发搬运时,如果是首个搬运线程,负责 nextTable 的初始化工作;而后借助于全局的 transferIndex 变量从以后 table 的 n - 1 槽位开始顺次向低位扫描搬运,通过对 transferIndex 的 CAS 操作一次获取一个区段(默认是 16),当 transferIndex 达到最低位时,不再可能获取到新的区段,线程开始退出,退出时会在 sizeCtl 上将总的线程数减一,最初一个退出的线程将扫描坐标 i 回退到最高位,强制做一次抄底的全局扫描。

3、transfer 过程中的读写安全性剖析

(1)首先是 transfer 过程中是否有可能全局的哈希表 table 产生屡次 resize,或者说存在过期的危险?

察看 nextTable 提交到 table 的代码,发现只有在所有线程均搬运结束退出后才会 commit,所以凡是有一个线程在 transfer 代码块中,table 都不可能被替换;所以不存在 table 过期的危险。

(2)有并发的写操作时,是否存在平安危险?

因为 transfer 操作与写操作都要竞争 bin 的头节点的 syncronized 锁,两者是互斥串行的;当写线程失去锁后,还要做 doubleCheck,发现不是一开始的头节点时什么事件都不会做,发现是 forwardingNode,就会退出搬运行列直到新表被提交,而后去间接操作新表。

nextTable 的提交总是在所有的槽位都曾经搬运结束,插上 ForwardingNode 的标识之后的,因而只有新表已提交,旧表必然无奈写入;这样就可能无效的防止数据写入旧表。

推理:获取到 bin 头节点的同步锁开始写操作 ———-> transfer 必然未实现 ———> 新表必然未提交 ——-→写入的必然是以后表。

也就说永远不可能存在新旧两张表同时被写入的状况,table 被写入时 nextTable 永远都只能被读取。

(3)有并发的读操作时,是否存在平安危险?

transfer 操作并不毁坏旧的 bin 构造,如果尚未开始搬运,将会照常遍历旧的 BIN 构造;如果已搬运结束,会调用到 forwadingNode 的 find 办法到新表中递归查问,参考上文中的 forwadingNode 介绍。

九、Traverser 遍历器

因为 iterator 或 containsValue 等通用 API 的存在,以及某些业务场景的确须要遍历整个 Map,设计一种平安且有性能保障的遍历机制显得天经地义。

C13Map 遍历器实现的难点在于读操作与 transfer 可能并行,在扫描各个 bin 时如果遇到 forwadingNode 该如何解决的问题。

因为并发 transfer 机制的存在,在某个槽位上遇到了 forwadingNode,仅表明以后槽位已被搬运,并不能代表其后的槽位肯定被搬运或者尚未被搬运;也就是说其后的若干槽位是一个不可控的状态。

解决办法是引入了相似于办法调用堆栈的机制,在跳转到 nextTable 时记录下以后 table 和曾经到达的槽位并进行入栈操作,而后开始遍历下一个 table 的 i 和 i + n 槽位,如果遇到 forwadingNode 再一次入栈,周而复始周而复始;

每次如果 i + n 槽位如果到了右半段快要溢出的话就会遵循原来的入栈规定进行出栈,也就是回到上一个上下文节点,最终会回到初始的 table 也就是 initialTable 中的节点。

C13Map 的 Traverser 组件

static class Traverser<K,V> {Node<K,V>[] tab;        // current table; updated if resized
    Node<K,V> next;         // the next entry to use
    TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
    int index;              // index of bin to use next
    int baseIndex;          // current index of initial table
    int baseLimit;          // index bound for initial table
    final int baseSize;     // initial table size
 
    Traverser(Node<K,V>[] tab, int size, int index, int limit) {
        this.tab = tab;
        this.baseSize = size;
        this.baseIndex = this.index = index;
        this.baseLimit = limit;
        this.next = null;
    }
 
    /**
     * 返回下一个节点
     */
    final Node<K,V> advance() {
        Node<K,V> e;
        if ((e = next) != null)
            e = e.next;
        for (;;) {Node<K,V>[] t; int i, n;  // 局部变量保障稳定性
            if (e != null)
                return next = e;
            if (baseIndex >= baseLimit || (t = tab) == null ||
                (n = t.length) <= (i = index) || i < 0)
                return next = null;
            if ((e = tabAt(t, i)) != null && e.hash < 0) {if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;
                    e = null;
                    pushState(t, i, n);
                    continue;
                }
                else if (e instanceof TreeBin)
                    e = ((TreeBin<K,V>)e).first;
                else
                    e = null;
            }
            // 以后如果有跳转堆栈间接回放
            if (stack != null)
                recoverState(n);
            // 没有跳转堆栈阐明曾经到 initalTable
            else if ((index = i + baseSize) >= n)
                index = ++baseIndex; // visit upper slots if present
        }
    }
 
    /**
     * 遇到 ForwardingNode 时保留以后上下文
     */
    private void pushState(Node<K,V>[] t, int i, int n) {
        TableStack<K,V> s = spare;  // reuse if possible
        if (s != null)
            spare = s.next;
        else
            s = new TableStack<K,V>();
        s.tab = t;
        s.length = n;
        s.index = i;
        s.next = stack;
        stack = s;
    }
 
    /**
     * 弹出上下文
     *
     */
    private void recoverState(int n) {
        TableStack<K,V> s; int len;
        // 如果以后有堆栈,且 index 曾经达到右半段后溢出以后 table, 阐明该回去了
        // 如果 index 还在左半段,则只辅助做 index+=s.length 操作
        while ((s = stack) != null && (index += (len = s.length)) >= n) {
            n = len;
            index = s.index;
            tab = s.tab;
            s.tab = null;
            TableStack<K,V> next = s.next;
            s.next = spare; // save for reuse
            stack = next;
            spare = s;
        }
        // 曾经到 initialTable, 索引自增
        if (s == null && (index += baseSize) >= n)
            index = ++baseIndex;
    }
}

假如在整个遍历过程中初始表 initalTable=table1,遍历到完结时最大的表为 table5,也就是在遍历过程中经验了四次扩容,属于一边遍历一边扩容的最简单场景;

那么整个遍历过程就是一个以初始化表 initalTable 为基准表,以下一张表的 i 和 i + n 槽位为 forwadingNode 的跳转指标,相似于粒子裂变个别的从最低表向最高表喷射的过程;

traverser 并不能保障肯定遍历某张表的所有的槽位,但如果假如低阶表的某个槽位在最高阶表总是有相应的投影,比方 table1 的一个节点在 table5 中就会对应 16 个投影;

traverser 可能保障一次遍历的所有槽位在最高阶表上的投影,能够布满整张最高阶表,而不会有任何脱漏。

十、并发计数

与 HashMap 中间接定义了 size 字段相似,获取元素的 totalCount 在 C13MAP 中必定不会去遍历残缺的数据结构;那样元素较多时性能会十分差,C13MAP 设计了 CounterCell[]数组来解决并发计数的问题。

CounterCell[]机制并不理睬新旧 table 的更迭,不论是操作的新表还是旧表,对于计数而言没有实质的差别,CounterCell[]只关注总量的减少或缩小。

1、从 LongAdder 到 CounterCell 内存对齐

C13MAP 借鉴了 JUC 中 LongAdder 和 Striped64 的计数机制,有大量代码与 LongAdder 和 Striped64 是反复的,其核心思想是多核环境下对于 64 位 long 型数据的计数操作,尽管借助于 volatile 和 CAS 操作可能保障并发的安全性,然而因为多核操作的是同一内存区域,而每个 CPU 又有本人的本地 cache,例如 LV1 Cache,LVL2 Cache,寄存器等。

因为内存一致性协定 MESI 的存在,会导致本地 Cache 的频繁刷新影响性能,一个比拟好的解决思路是每个 CPU 只操作固定的一块内存对齐区域,最终采纳求和的形式来计数。

这种形式能进步性能,然而并非所有场景都实用,因为其最终的 value 是求和估算进去的,CounterCell 累加求和的过程并非原子,不能代表某个时刻的精准 value,所以像 compareAndSet 这样的原子操作就无奈反对。

2、CounterCell[]、cellBusy、baseCount 的作用 

CounterCell[]中寄存 2 的指数幂个 CounterCell,并发操作期间有可能会扩容,每次扩容都是原有 size 的两倍,一旦超过了 CPU 的核数即不再扩容,因为 CPU 的总数通常也是 2 的指数幂,所以其 size 往往等于 CPU 的核数 CounterCell[]初始化、扩容、填充元素时,借助 cellBusy 其进行 spinLock 管制 baseCount 是根底数据。

在并发量不那么大,CAS 没有呈现失败时间接基于 baseCount 变量做计数;一旦呈现 CAS 失败,阐明有并发抵触,就开始思考 CounterCell[]的初始化或者扩容操作,但在初始化未实现时,还是会将其视为抄底计划进行计数。

所以最终的技术总和 =baseCount+ 所有 CounterCell 中的 value。

C13Map 的 addCount 办法

private final void addCount(long x, int check) {CounterCell[] cs; long b, s;
   // 初始时总是间接对 baseCount 计数,直到呈现第一次失败,或者曾经有现成的 CounterCell[]数组可用
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        // 是否存在竞态,为 true 时示意无竞态
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            // 学生成随机数再对 CounterCell[]数组 size 求余, 也就是随机调配到其中某个槽位
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            // 该槽位尚未初始化或者 CAS 操作又呈现竞态
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();}
    // 检测元素总数是否超过 sizeCtl 阈值
    if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) {
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            s = sumCount();}
    }
}

其中 ThreadLocalRandom 是线程上下文内的随机数生成器,能够不受其它线程的影响,进步随机数生成的性能;总是在 CAS 失败当前,也就是明确感知到存在多线程的竞争的前提下,才会对 CounterCell[]进行初始化或者扩容操作。

C13Map 的 fullAddCount 办法

// 残缺的计数, 与 LongAdder 的代码根本雷同
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // 是否有新的抵触
    for (;;) {CounterCell[] cs; CounterCell c; int n; long v;
        if ((cs = counterCells) != null && (n = cs.length) > 0) {if ((c = cs[(n - 1) & h]) == null) {
                // 随机匹配的槽位尚未有 CounterCell 元素则初始化之
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {rs[j] = r;
                                created = true;
                            }
                        } finally {cellsBusy = 0;}
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)      
                wasUncontended = true;      //fullAddCount 前曾经存在 cas 失败但并不立刻扩容,从新生成一个随机数进行 CAS 重试
            else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
                break;
            else if (counterCells != cs || n >= NCPU)
                collide = false;            // 超过 CPU 的最大核数,或者检测到 counterCells 已扩容,都将抵触状态置为无
            else if (!collide)
                collide = true;             // 以上的若干条件都不满足,能够断定必然有抵触,再生成一个随机数试探一下
            else if (cellsBusy == 0 &&
                     U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                try {if (counterCells == cs)   // 对 counterCells 进行 doubleCheck
                        counterCells = Arrays.copyOf(cs, n << 1);   // 扩容,容量翻倍
                } finally {cellsBusy = 0;}
                collide = false;
                continue;                   // 对性的 counterCell[]进行重试 CAS 操作}
            h = ThreadLocalRandom.advanceProbe(h);   // 以旧的随机数为基数生成一个新的随机数
        }
        else if (cellsBusy == 0 && counterCells == cs &&
                 U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
            // 第一次初始化工作,初始的数组大小为 2
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == cs) {CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {cellsBusy = 0;}
            if (init)
                break;
        }
        // 初始化过程中其它线程的抄底计划
        else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
            break;                        
    }
}

循环生成新的随机数匹配到新的槽位进行 CAS 的计数操作,呈现 CAS 失败后并不急于扩容;而是总是在间断呈现 CAS 失败的状况才会尝试扩容。

CounterCell[]的整体计划绝对独立,与 C13Map 的关系并不大,能够视为一种成熟的高性能技术计划在各个场景应用。

十一、与 stream 相似的 bulk 操作反对

1、bulkTask 类的子类

所有的批量工作执行类均为 bulkTask 的子类, bulkTask 内置了与 traverser 相似的实现,用以反对对 C13Map 的遍历;同时它也是 ForkJoinTask 的子类,反对以 fork/join 的形式来实现各种批量工作的执行。

因为 ForkJoinTask 并非本文的重点,这里仅列出几种有代表性的批量办法,以及相应的的 task 实现。

2、几种有代表性的批量办法

C13Map 的批量工作

// 将所有的 entry 依照 transformer 函数进行二元计算,再对所有生成的后果执行 action 一元函数
public <U> void forEach(long parallelismThreshold,
                        BiFunction<? super K, ? super V, ? extends U> transformer,
                        Consumer<? super U> action);
 
 
// 对所有的 entry 执行 searchFunction 二元计算,一旦发现任意一个计算结果不为 null,即全盘返回
public <U> U search(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> searchFunction);
 
// 对所有的 entry 执行 transformer 二元计算,再对所有的后果执行 reducer 收敛函数
public <U> U reduce(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> transformer,
                    BiFunction<? super U, ? super U, ? extends U> reducer)
 
 
// 对所有的 entry 中的 value 执行 transformer 二元计算,再对所有的后果执行 reducer 收敛函数
public <U> U reduceValues(long parallelismThreshold,
                          Function<? super V, ? extends U> transformer,
                          BiFunction<? super U, ? super U, ? extends U> reducer)

以上所有的批量办法都有惟一与其对应的批量 task 执行类,背地均是基于 fork/join 思维实现。

3、批量 task 的实现

以 2 中列出的 reduce 办法所对应的 MapReduceMappingsTask 为例,无关 fork/join 中的实现细节不属于本文的领域,不做具体探讨。

C13Map 的 MapReduceMappingsTask

static final class MapReduceMappingsTask<K,V,U> extends BulkTask<K,V,U> {
    final BiFunction<? super K, ? super V, ? extends U> transformer;
    final BiFunction<? super U, ? super U, ? extends U> reducer;
    U result;
    MapReduceMappingsTask<K,V,U> rights, nextRight;
    MapReduceMappingsTask
        (BulkTask<K,V,?> p, int b, int i, int f, Node<K,V>[] t,
         MapReduceMappingsTask<K,V,U> nextRight,
         BiFunction<? super K, ? super V, ? extends U> transformer,
         BiFunction<? super U, ? super U, ? extends U> reducer) {super(p, b, i, f, t); this.nextRight = nextRight;
        this.transformer = transformer;
        this.reducer = reducer;
    }
    public final U getRawResult() { return result;}
    public final void compute() {
        final BiFunction<? super K, ? super V, ? extends U> transformer;
        final BiFunction<? super U, ? super U, ? extends U> reducer;
        if ((transformer = this.transformer) != null &&
            (reducer = this.reducer) != null) {
            for (int i = baseIndex, f, h; batch > 0 &&
                     (h = ((f = baseLimit) + i) >>> 1) > i;) {addToPendingCount(1);
                // 裂变出新的 fork-join 工作
                (rights = new MapReduceMappingsTask<K,V,U>
                 (this, batch >>>= 1, baseLimit = h, f, tab,
                  rights, transformer, reducer)).fork();}
            U r = null;
            // 遍历本 batch 元素
            for (Node<K,V> p; (p = advance()) != null; ) {
                U u;
                // 对本 batch 做 reduce 收敛操作
                if ((u = transformer.apply(p.key, p.val)) != null)
                    r = (r == null) ? u : reducer.apply(r, u);
            }
            // 对本人和本人 fork 出的子工作做 reducer 收敛操作
            result = r;
            CountedCompleter<?> c;
            for (c = firstComplete(); c != null; c = c.nextComplete()) {@SuppressWarnings("unchecked")
                MapReduceMappingsTask<K,V,U>
                    t = (MapReduceMappingsTask<K,V,U>)c,
                    s = t.rights;
                while (s != null) {
                    U tr, sr;
                    if ((sr = s.result) != null)
                        t.result = (((tr = t.result) == null) ? sr :
                                    reducer.apply(tr, sr));
                    s = t.rights = s.nextRight;
                }
            }
        }
    }
}

十二、小结

自 JDK8 开始 C13Map 摒弃了 JDK7 中的 Segment 段实现计划,将锁的粒度细化到了每个 bin 上,锁的粒度更小并发能力更强。用 syncronized 关键字代替原先的 ReentrantLock 互斥锁,因 JDK8 中对 syncronized 做了大量优化,能够达到比 ReentrantLock 更优的性能。

引入并发 transfer 的机制反对多线程搬运,写操作和 transfer 操作在不同 bin 上可并行。引入 ForwardingNode 反对读操作和 transfer 并行,并进一步反对 transfer 过程有可能存在的哈希表链的遍历。引入 ReserveNode 在 compute 原子计算可能耗时较长的状况下领先占位,防止反复计算。

引入红黑树来优化哈希抵触时的检索性能,其外部实现了轻量级的读写锁保障读写平安,在线性检索和 tree 检索之间做了智能切换,达到了性能与平安的极佳的均衡。引入 CounterCell 机制优化多核场景的计数,解决内存伪共享问题。

引入 ForkJoinTask 的子类优化 bulk 计算时的性能。整个 C13Map 的实现过程大量应用 volatile 保障可见,应用 CAS 保障原子,是一种部分无锁的 lockFree dataStructure 的榜样实现。

与 HashMap 的单线程读写操作不同的是,HashMap 读到的数据在下一次写操作间是始终稳固的,在多个写操作之间是一个稳固的 snapshot,而 C13Map 因为并发线程的存在,数据瞬息万变,读到的永远只是某个工夫点的正确数据,写入胜利也只是在某个工夫点保障写入是平安的,因而 C13Map 通常只谈平安而不谈实时,这极大进步了编程的难度,也是单线程和并发数据结构之间的显著差别。

正文完
 0