ConcurrentHashMap

JDK1.8摈弃了原有的 Segment 分段锁,而采纳了 CAS + synchronized 来保障并发安全性。
创立ConcurrentHashMap对象,应用一个参数的构造函数时做了什么?
应用put办法干了哪些事件?
怎么扩容的?
且带着疑难,依据源码一步步剖析。
在剖析之前,先明确几个重要的成员变量。

  • table:数组,默认为null,用来存储Node节点数据,扩容时大小总是2的幂次方
  • nextTable:默认为null,扩容时新生成的数组,其大小为原数组的两倍
  • sizeCtl:默认为0,用来管制数组的初始化和扩容操作。正数示意有线程正在扩容。

ConcurrentHashMap一个参数结构器的过程

//带1个参数结构器public ConcurrentHashMap(int initialCapacity) {        // 小于抛出异样        if (initialCapacity < 0)            throw new IllegalArgumentException();        // 对于给定的预期容量作出正当布局。注:MAXIMUM_CAPACITY为2的30次方,MAXIMUM_CAPACITY >>> 1为2的30次幂为536870912,个别initialCapacity不会设置这么大的        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?                   MAXIMUM_CAPACITY :                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));          //tableSizeFor办法能够转换为sizeCtl = 【 (1.5 * initialCapacity + 1),而后向上取最近的 2 的 n 次方】         // 例如,initialCapacity为7,sizeCtl=cap=16        this.sizeCtl = cap;}

put办法的过程

public V put(K key, V value) {    // put办法里间接去调用了putVal办法    return putVal(key, value,false);}

putVal办法剖析

final V putVal(K key, V value, boolean onlyIfAbsent) {    // key键和value值不能为 null    if (key == null || value == null) throw new NullPointerException();    // 计算hash值,将Key的hashCode值与其高16位作异或再按位与int的最大值从而保障最高位为0(从而保障最终后果为正整数)    // 通过spread函数,int hash = (key.hashCode() ^ (key.hashCode() >>> 16)) & HASH_BITS    // HASH_BITS=int型的最大值,即十六进制0x7fffffff,二进制 0111 1111 1111 1111 1111 1111 1111 1111    int hash = spread(key.hashCode());    // 局部变量,binCount默认是0,只有hash抵触了才会大于1.且他的大小是链表的长度(如果不是红黑数构造的话)。    int binCount = 0;    //循环,因为前面是CAS操作,可能会须要大量的重试    for (Node<K,V>[] tab = table;;) {        Node<K,V> f; int n, i, fh;        // 如果没数组为空,调用initTable办法初始化创立数组        if (tab == null || (n = tab.length) == 0)            tab = initTable();        // 找到下标,如果为空,采纳CAS进行插入        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {            //如果没放胜利,持续向下走,因为这必定是呈现了并发操作,所以去判断没放胜利的理由.如果放胜利了,那就完结循环            if (casTabAt(tab, i, null,                    new Node<K,V>(hash, key, value, null)))                break;                   // no lock when adding to empty bin        }        // 如果 hash 抵触了,且 hash 值为 -1,阐明是 forwarding node 对象(这是一个占位符对象,保留了扩容后的容器)        else if ((fh = f.hash) == MOVED)            tab = helpTransfer(tab, f);// 帮忙数据迁徙        else { // 这里就是数组曾经有元素了,这时候就该挂链表或者挂树了            V oldVal = null;            // 获取头节点的监视器锁            synchronized (f) {                if (tabAt(tab, i) == f) {                    // 头节点的hash值,大于0示意这上面有点货色                    if (fh >= 0) {                        binCount = 1;                        for (Node<K,V> e = f;; ++binCount) { //for循环,示意遍历链表,循环一次后binCount加1                            K ek;                            // 如果发现了"相等"的 key,判断是否要进行值笼罩                            if (e.hash == hash &&                                    ((ek = e.key) == key ||                                            (ek != null && key.equals(ek)))) {                                oldVal = e.val;                                if (!onlyIfAbsent)                                    e.val = value;                                break;                            }                            Node<K,V> pred = e;                            // 到最初了没反复的key,就把新值向前面挂                            if ((e = e.next) == null) {                                pred.next = new Node<K,V>(hash, key,                                        value, null);                                break;                            }                        }                    }                    // 如果是个树                    else if (f instanceof TreeBin) {                        Node<K,V> p;                        binCount = 2;                        // 插节点,调用TreeBin的putTreeVal办法                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                value)) != null) {                            oldVal = p.val;                            if (!onlyIfAbsent)                                p.val = value;                        }                    }                }            }            if (binCount != 0) {                // 判断链表的长度,如果大于8,而后转树                if (binCount >= TREEIFY_THRESHOLD)                    // 这里要留神一个中央!!!!! --不是说像HashMap那样转树就没事了                    // 这里波及到一个外围思路,CurrentHashMap做了优化,这里如果数组长度小于64,它会先扩容,扩容代表什么含意                    // -- 原来的链表会被1分为2 别离散落在不同的节点上                    treeifyBin(tab, i);                // 如果key已存在,有原值,返回原值                if (oldVal != null)                    return oldVal;                break; // 完结外层死循环            }        }    }    // 元素计数加1,依据binCount来测验是否须要检查和扩容    addCount(1L, binCount);    return null;}

initTable办法进行数组初始化

// 在下面的putVal源码里,当数组为空或长度为0的时候,需初始化,调用了initTable()办法private final Node<K,V>[] initTable() {    Node<K,V>[] tab; int sc;    while ((tab = table) == null || tab.length == 0) {        // 须要留神的是,当整形的变量sc(即sizeCtl)小于0,那么阐明有其余线程在在扩容,就调用线程的yield()办法        if ((sc = sizeCtl) < 0)            Thread.yield(); // lost initialization race; just spin        // 应用sun.misc.Unsafe的compareAndSwapInt办法设置以后对象的sizeCtl为-1,设置胜利后,初始化数组,默认容量 DEFAULT_CAPACITY 为16        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {            try {                if ((tab = table) == null || tab.length == 0) {                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                    @SuppressWarnings("unchecked")                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                    table = tab = nt;                    // 变量sc的值为(n-(n>>>2)),n-(n>>>2)=n-(n/2^2)=n-n/4=3/4*n=0.75*n,默认n开始为16,16*0.75=12                    // 可知,负载因子为0.75                    sc = n - (n >>> 2);                }            } finally {                sizeCtl = sc;            }            break;        }    }    return tab;}

链表转树,treeifyBin办法剖析

private final void treeifyBin(Node<K,V>[] tab, int index) {    Node<K,V> b; int n, sc;    if (tab != null) {        //MIN_TREEIFY_CAPACITY为64         // 尽管进入到转树办法,如果数组长度小于64,那么先扩容        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)            tryPresize(n << 1); // 扩容,上面具体说        // 确定头节点没问题开始加锁,转树        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {            synchronized (b) {                if (tabAt(tab, index) == b) {                    TreeNode<K,V> hd = null, tl = null;                    // 遍历链表,生成一棵树                    for (Node<K,V> e = b; e != null; e = e.next) {                        TreeNode<K,V> p =                            new TreeNode<K,V>(e.hash, e.key, e.val,                                              null, null);                        if ((p.prev = tl) == null)                            hd = p;                        else                            tl.next = p;                        tl = p;                    }                    // 把数据放到树中                    setTabAt(tab, index, new TreeBin<K,V>(hd));                }            }        }    }}

tryPresize办法进行扩容

private final void tryPresize(int size) {    // 如果大小大于等于MAXIMUM_CAPACITY(2的30次幂)的一半,那么间接扩容为MAXIMUM_CAPACITY,否则扩容为1.5的size加1再向上获取最近的2的整数次幂    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :            tableSizeFor(size + (size >>> 1) + 1);    int sc;    // 如果sizeCtl大于等于0    while ((sc = sizeCtl) >= 0) {        Node<K,V>[] tab = table; int n;        // 数组table为空        if (tab == null || (n = tab.length) == 0) {            n = (sc > c) ? sc : c;            // CAS批改SIZECTL为-1,示意数组table正在进行初始化            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                // 确认其余线程没有对数组table批改                try {                    //                    if (table == tab) {                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                        table = nt;                        // sc=n-n/4=0.75*n                        sc = n - (n >>> 2);                    }                } finally {                    sizeCtl = sc;                }            }        }        // 数组table不为空,如果扩容大小没有达到阈值,或者超过最大容量2的30次方,跳出while循环        else if (c <= sc || n >= MAXIMUM_CAPACITY)            break;        else if (tab == table) {            // 生成戳            int rs = resizeStamp(n);            if (sc < 0) {// 有线程在进行扩容                Node<K,V>[] nt;                /**1.第一个判断 sc右移RESIZE_STAMP_SHIFT(16)位,也就是比拟高ESIZE_STAMP_BITS(16)位生成戳和rs是否相等                 * 相等则代表是同一个n,是在同一节点下进行的扩容,                 *  2.第二个和第三个判断 判断以后帮忙扩容线程数是否已达到MAX_RESIZERS(2的16次方-1=65535)最大扩容线程数                 *  3.第四个和第五个判断 为了确保transfer()办法初始化结束                 */                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                        transferIndex <= 0)                    // 跳出循环                    break;                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                    // 挪动和拷贝节点到新数组                    transfer(tab, nt);            }            /**没有线程在进行扩容,那么CAS批改sizeCtl值,作为扩容的发动,rs左移RESIZE_STAMP_SHIFT(16)位+2             * 此时sizeCtl高RESIZE_STAMP_BITS(16)位为生成戳,低RESIZE_STAMP_SHIFT(16)位为扩容线程数             */            else if (U.compareAndSwapInt(this, SIZECTL, sc,                    (rs << RESIZE_STAMP_SHIFT) + 2))                // 挪动和拷贝节点到新数组                transfer(tab, null);        }    }}

挪动和拷贝节点到新数组,transfer函数

//该办法通过全局的transferIndex来管制每个线程的迁徙工作private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {    // n为旧tab的长度,stride为步长(就是每个线程迁徙的节点数)    int n = tab.length, stride;    // 单核步长为1,多核为(n>>>3)/ NCPU,最小值为16    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)        stride = MIN_TRANSFER_STRIDE; // subdivide range    // 新的 table 尚未初始化    if (nextTab == null) { // initiating        try {            // 扩容2倍            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];            // 更新            nextTab = nt;        } catch (Throwable ex) {      // try to cope with OOME            // 扩容失败, sizeCtl 应用 int 最大值            sizeCtl = Integer.MAX_VALUE;            return;        }        // nextTable为全局属性        nextTable = nextTab;        // 更新转移下标,就是老的tab的length        transferIndex = n;    }    int nextn = nextTab.length;// 新 tab 的 length    // 创立一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);    // 首次推动为 true,如果等于 true,阐明须要再次推动一个下标(i--),反之,如果是 false,那么就不能推动下标,须要将以后的下标处理完毕能力持续推动    boolean advance = true;    // 实现状态,如果是 true,就完结此办法    boolean finishing = false; // to ensure sweep before committing nextTab    // 死循环,i 示意下标,bound 示意以后线程能够解决的以后桶区间最小下标    for (int i = 0, bound = 0;;) {        Node<K,V> f; int fh;        // 如果以后线程能够向后推动;这个循环就是管制 i 递加。同时,每个线程都会进入这里获得本人须要转移的桶的区间        while (advance) {            int nextIndex, nextBound;            // 对 i 减一,判断是否大于等于 bound (失常状况下,如果大于 bound 不成立,阐明该线程上次支付的工作曾经实现了。那么,须要在上面持续支付工作)            // 如果对 i 减一大于等于 bound(还须要持续做工作),或者实现了,批改推动状态为 false,不能推动了。工作胜利后批改推动状态为 true            // 通常,第一次进入循环,i-- 这个判断会无奈通过,从而走上面的 nextIndex 赋值操作(获取最新的转移下标)            // 其余状况都是:如果能够推动,将 i 减一,而后批改成不可推动。如果 i 对应的桶解决胜利了,改成能够推动            if (--i >= bound || finishing)                // 这里设置 false,是为了避免在没有胜利解决一个桶的状况下却进行了推动 这里的目标是:                // 1. 当一个线程进入时,会选取最新的转移下标。                // 2. 当一个线程解决完本人的区间时,如果还有残余区间的没有别的线程解决。再次获取区间。                advance = false;            else if ((nextIndex = transferIndex) <= 0) {                // 如果小于等于0,阐明没有区间了 ,i 改成 -1,推动状态变成 false,不再推动,示意,扩容完结了,以后线程能够退出了                // 这个 -1 会在上面的 if 块里判断,从而进入实现状态判断                i = -1;                advance = false;// 这里设置 false,是为了避免在没有胜利解决一个桶的状况下却进行了推动            }            else if (U.compareAndSwapInt                    (this, TRANSFERINDEX, nextIndex,                            nextBound = (nextIndex > stride ?                                    nextIndex - stride : 0))) {                // 这个值就是以后线程能够解决的最小以后区间最小下标                bound = nextBound;                // 首次对i 赋值,这个就是以后线程能够解决的以后区间的最大下标                i = nextIndex - 1;                // 这里设置 false,是为了避免在没有胜利解决一个桶的状况下却进行了推动,这样对导致漏掉某个桶                // 上面的 if (tabAt(tab, i) == f) 判断会呈现这样的状况                advance = false;            }        }        // 如果 i 小于0 (不在 tab 下标内,依照下面的判断,支付最初一段区间的线程扩容完结)        if (i < 0 || i >= n || i + n >= nextn) {            int sc;            if (finishing) {// 如果实现了扩容                nextTable = null;// 删除成员变量                table = nextTab;// 更新 table                sizeCtl = (n << 1) - (n >>> 1); // 更新阈值                return;            }            // 尝试将 sc -1. 示意这个线程完结帮忙扩容了,将 sc 的低 16 位减一            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                // 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,阐明没有线程在帮忙他们扩容了。也就是说,扩容完结了。                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                    return;// 不相等,阐明没完结,以后线程完结办法                // 如果相等,扩容完结了,更新 finising 变量                finishing = advance = true;                i = n; // recheck before commit // 再次循环检查一下整张表            }        }        // 获取老 tab i 下标地位的变量,如果是 null,就应用 fwd 占位。        else if ((f = tabAt(tab, i)) == null)            // 如果胜利写入 fwd 占位,再次推动一个下标            advance = casTabAt(tab, i, null, fwd);        else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED,MOVED=-1            // 阐明别的线程曾经解决过了,再次推动一个下标            advance = true; // already processed        else {// 到这里,阐明这个地位有理论值了,且不是占位符节点。对这个节点上锁。为什么上锁,避免 putVal 的时候向链表插入数据            synchronized (f) {                // 判断 i 下标处的桶节点是否和 f 雷同                if (tabAt(tab, i) == f) {                    Node<K,V> ln, hn;// low, height 高位桶,低位桶                    if (fh >= 0) {                        // 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么后果的第n为也为1,否则0)                        // 因为 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种后果,一种是 0,一种是1                        //  如果是后果是0 ,Doug Lea 将其放在低位,反之放在高位,目标是将链表从新 hash,放到对应的地位上,让新的取于算法可能击中他                        int runBit = fh & n;                        // 尾节点,且和头节点的 hash 值取于不相等                        Node<K,V> lastRun = f;                        // 遍历这个桶 接下来是惯例的设置操作,咱们先略过                        for (Node<K,V> p = f.next; p != null; p = p.next) {                            int b = p.hash & n;                            if (b != runBit) {                                runBit = b;                                lastRun = p;                            }                        }                        if (runBit == 0) {                            ln = lastRun;                            hn = null;                        }                        else {                            hn = lastRun;                            ln = null;                        }                        for (Node<K,V> p = f; p != lastRun; p = p.next) {                            int ph = p.hash; K pk = p.key; V pv = p.val;                            if ((ph & n) == 0)                                ln = new Node<K,V>(ph, pk, pv, ln);                            else                                hn = new Node<K,V>(ph, pk, pv, hn);                        }                        setTabAt(nextTab, i, ln);                        setTabAt(nextTab, i + n, hn);                        setTabAt(tab, i, fwd);                        advance = true;                    }                    else if (f instanceof TreeBin) {                        TreeBin<K,V> t = (TreeBin<K,V>)f;                        TreeNode<K,V> lo = null, loTail = null;                        TreeNode<K,V> hi = null, hiTail = null;                        int lc = 0, hc = 0;                        for (Node<K,V> e = t.first; e != null; e = e.next) {                            int h = e.hash;                            TreeNode<K,V> p = new TreeNode<K,V>                                    (h, e.key, e.val, null, null);                            if ((h & n) == 0) {                                if ((p.prev = loTail) == null)                                    lo = p;                                else                                    loTail.next = p;                                loTail = p;                                ++lc;                            }                            else {                                if ((p.prev = hiTail) == null)                                    hi = p;                                else                                    hiTail.next = p;                                hiTail = p;                                ++hc;                            }                        }                        // 如果树的节点数小于等于 6,那么转成链表,反之,创立一个新的树                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :                                (hc != 0) ? new TreeBin<K,V>(lo) : t;                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :                                (lc != 0) ? new TreeBin<K,V>(hi) : t;                        // 低位树                        setTabAt(nextTab, i, ln);                        // 高位树                        setTabAt(nextTab, i + n, hn);                        // 旧的设置成占位符                        setTabAt(tab, i, fwd);                        // 持续向后推动                        advance = true;                    }                }            }        }    }}

总结

  • 应用了 CAS 加 synchronized 来保障了 put 操作并发时的危险(特地是链表)
  • 采纳了 数组+链表+红黑树 的数据结构
  • 单线程初始化,多线程协同扩容

FAQ:

  • 什么时候触发扩容?
  1. 链表转换为红黑树时(链表节点个数达到8个会转换为树),数组长度小于64
  2. 数组中总节点数大于阈值(数组长度的0.75倍)
  • 如何hash定位

h^(h>>>16)&0x7fffffff,先将hashCode的高16位和低16位异或运算,这个做目标是为了让hash值更加随机。和0x7fffffff(int的最大值)相与运算是为了失去负数,因为正数的hash有非凡用处,如-1表forwarding node(示意该地位正在扩容)

  • 扩容时,扩容后的容量是原先的几倍?单线程扩容吗?
    2倍,多线程协同扩容,每个线程负责一块区域的复制迁徙工作