关于java:JUCConcurrentHashMap原理分析下

2次阅读

共计 6563 个字符,预计需要花费 17 分钟才能阅读完成。

接上篇,持续剖析桶为链表状况下的数据迁徙。

扩容迁徙

首先 tab 的 size 始终是 2 的 n 次幂,转换成二进制就是100..00 的模式
而落点桶的计算公式为:plot = (n-1)&hash -> 11..11&hash

那么 hash 的第 n 个地位(runBit=hash&n)是 0 还是 1,决定了迁徙后的落点桶:

  • 如果 runBit=0,那么新 tab 的落点为plot
  • 如果 runBit=1,那么新 tab 的落点为plot+n

基于上述剖析,迁徙时会对桶中的链条从新组装(当然会先对头节点对象做 syn 加锁)

  1. runBit= 0 的组成 低位链条 ln,runBit= 1 的组成 高位链条 hn;并最终放入新 table 中(上图红色局部)
  2. 旧 tab 的迁徙桶会指向 Forwarding Node 节点(fwd,上图紫色局部)
  3. 随着 syn 锁开释,旧链 node1->node2->node3 会被 GC 回收(上图绿色局部)

给出 transfer() 办法 的源码:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // ## 迁徙分片,最小 MIN_TRANSFER_STRIDE(16)if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 创立新 tab
    if (nextTab == null) {            // initiating
        try {@SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    
    // transferIndex- 传输地位,初始化为旧数组大小
    // stride- 迁徙分片
    // nextn - 新数组的大小
    int nextn = nextTab.length;
    
    // A node inserted at head of bins during transfer operations.
    // 迁徙过程中的非凡标记,在迁徙桶的头节点;它的 hash 值固定为 MOVED=-1
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 迁徙正在执行的标记
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    
    // i- 正在迁徙桶的索引;boud- 线程本次迁徙范畴的最小索引
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        
        // ****** A、参数设置阶段 ******
        while (advance) {
            int nextIndex, nextBound;
            // -- 2. 迁徙后续桶时 `--i`,证实迁徙方向:从右往左
            if (--i >= bound || finishing)
                advance = false;
            // $$ 下一个迁徙地位 nextIndex,i<bound 时会执行(将 i =-1,设定了一个区间完结标记)else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            
            // -- 1. 线程迁徙它的第一个桶时赋值:// 如果旧 tab 的 length 是 32
            // 第一个线程:transferIndex=nextBound=16=bound、nextIndex=32(迁徙范畴,16-31)// 第二个线程:transferIndex=nextBound=0=bound、nextIndex=16(迁徙范畴,0-15)else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // ****** A、参数设置阶段 ******
        
        // ****** B、完结断定阶段 ******
        // $$ 满足 i =- 1 区间完结标记
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // tab 替换,新加载因子设置
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 完结条件
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // ****** B、完结断定阶段 ******
        
        // ****** C、迁徙阶段 ******
        // 迁徙桶为空,cas 形式搁置 fwd 标记节点
        // f- 正在迁徙的节点
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 此桶的头节点是 fw,示意正在迁徙
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // ### 对正在迁徙的节点 f 加锁
            synchronized (f) {if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // -- 链状况
                    if (fh >= 0) {
                        // runBit 取决于 fh 的第 n 地位,要么是 0,要么是 1
                        // 0 放在低位链 ln,1 放在高位链 hn
                        int runBit = fh & n;
                        // 高位或位置链的最初一个节点
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            // f 的下一个节点 p 的 hash 值的 n 地位。同样的,要么是 0,要么是 1
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        
                        // 高下位拆分双链
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 头插法组装链条
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    
                    // -- 树状况
                    else if (f instanceof TreeBin) {// ...}
                }
            }
        }
        // ****** C、迁徙阶段 ******
    }
}

元素计数

执行 put 办法后,会对元素个数进行统计

统计形式如图:

  1. 先尝试 简略模式,以 cas 形式对 baseCount 进行批改,批改胜利则完结
  2. 简略模式失败(比方竞争较多的状况),会尝试 简单模式,创立 counterCells 数组,并以此进行统计

统计形式依然通过 cas 的形式批改随机一个数组元素的 value 值,最终以 sum 所有数组元素 value 的形式计算出以后 map 的元素个数。
实现层面,会通过 cellsBusy 变量来管制对 counterCells 数组的操作(如 counterCells 扩容、桶初始化等)。

给出 fullAddCount() 办法 的源码:

private final void fullAddCount(long x, boolean wasUncontended) {
    // ThreadLocalRandom 产生的随机数(多线程条件下的随机工具)int h;
    // ThreadLocalRandom,须要初始化
    if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {CounterCell[] as; CounterCell a; int n; long v;
        // == 一、counterCells 不为空的
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // ## 1. 随机抉择的桶没有元素,须要新建
            if ((a = as[(n - 1) & h]) == null) {
                // -- cellsBusy- 标识位,用于初始化或扩容
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        // cas 批改标识位胜利的,作桶位的初始化操作
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {rs[j] = r;
                                created = true;
                            }
                        } 
                        // -- 创立后,重置标识位
                        finally {cellsBusy = 0;}
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // ## 2. 随机抉择的桶有元素(a-CounterCell 对象),间接对 a 的 value 值做 cas 累加操作
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // ## 3. 步骤 2 失败,做 counterCells 的 2 倍扩容
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {cellsBusy = 0;}
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        // == 二、counterCells 为空,初始化操作
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {cellsBusy = 0;}
            if (init)
                break;
        }
        // == 三、兜底计划,仍应用 baseCount
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

get 办法

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    // tab 存在,且依据 hash 计算出落点桶有元素
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // -- 比拟后 key 相等,间接返回 val
        if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // -- 树或迁徙中(fwd)
        else if (eh < 0){return (p = e.find(h, key)) != null ? p.val : null;
        }
        
        // -- 链:遍历链,找到就返回;找不到返回 null
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

次要关注一个问题:如果 要获取的元素正在迁徙中,源码中是如何解决的呢?

间接查看 fwd 的 find() 办法:

Node<K,V> find(int h, Object k) {
    // loop to avoid arbitrarily deep recursion on forwarding nodes
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        // 无 tab,或落点桶无元素,返回 null
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        
        for (;;) {
            int eh; K ek;
            // -- 2. 新 table 上找到元素,返回
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            
            if (eh < 0) {
                // -- 1. 迁徙中,去新表查问
                if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                // 树上查找
                else
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                return null;
        }
    }
}

tableSizeFor 办法

最初察看一个有意思的办法

// 保障失去的是一个大于入参 c 的最小 2 的 n 次幂数
// 参考:https://www.cnblogs.com/xiyixiaodao/p/14483876.html
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
   return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
正文完
 0