并发编程实际中,ConcurrentHashMap是一个常常被应用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程平安的根底上提供了更好的写并发能力,但同时升高了对读一致性的要求(这点如同CAP实践啊 O(∩_∩)O)。ConcurrentHashMap的设计与实现十分精美,大量的利用了volatile,final,CAS等lock-free技术来缩小锁竞争对于性能的影响,无论对于Java并发编程的学习还是Java内存模型的了解,ConcurrentHashMap的设计以及源码都值得十分认真的浏览与琢磨。

这篇日志记录了本人对ConcurrentHashMap的一些总结,因为JDK6,7,8中实现都不同,须要离开论述在不同版本中的ConcurrentHashMap。

本文从源码登程,筛选集体感觉重要的点(会用红色标注)再次进行回顾,以及论述ConcurrentHashMap的一些留神点。

1. JDK6与JDK7中的实现

1.1 设计思路

ConcurrentHashMap采纳了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的进步了高并发环境下的解决能力。但同时,因为不是对整个Map加锁,导致一些须要扫描整个Map的办法(如size(), containsValue())须要应用非凡的实现,另外一些办法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的,具体请查看ConcurrentHashMap能齐全代替HashTable吗?)。

ConcurrentHashMap中的分段锁称为Segment,它即相似于HashMap(JDK7与JDK8中HashMap的实现)的构造,即外部领有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。ConcurrentHashMap中的HashEntry绝对于HashMap中的Entry有肯定的差异性:HashEntry中的value以及next都被volatile润饰,这样在多线程读写过程中可能放弃它们的可见性,代码如下:

static final class HashEntry<K,V> {        final int hash;        final K key;        volatile V value;        volatile HashEntry<K,V> next;}

1.2 并发度(Concurrency Level)

并发度能够了解为程序运行时可能同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也能够在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会应用大于等于该值的最小2幂指数作为理论并发度(如果用户设置并发度为17,理论并发度则为32)。运行时通过将key的高n位(n = 32 – segmentShift)和并发度减1(segmentMask)做位与运算定位到所在的Segment。segmentShift与segmentMask都是在结构过程中依据concurrency level被相应的计算出来。

如果并发度设置的过小,会带来重大的锁竞争问题;如果并发度设置的过大,本来位于同一个Segment内的拜访会扩散到不同的Segment中,CPU cache命中率会降落,从而引起程序性能降落。(文档的说法是依据你并发的线程数量决定,太多会导性能升高)

1.3 创立分段锁

和JDK6不同,JDK7中除了第一个Segment之外,残余的Segments采纳的是提早初始化的机制:每次put之前都须要查看key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创立。

ensureSegment可能在并发环境下被调用,但与设想中不同,ensureSegment并未应用锁来管制竞争,而是应用了Unsafe对象的getObjectVolatile()提供的原子读语义联合CAS来确保Segment创立的原子性。代码段如下:

if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                == null) { // recheck                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))                       == null) {                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))                        break;                }}

1.4 put/putIfAbsent/putAll

和JDK6一样,ConcurrentHashMap的put办法被代理到了对应的Segment(定位Segment的原理之前曾经形容过)中。与JDK6不同的是,JDK7版本的ConcurrentHashMap在取得Segment锁的过程中,做了肯定的优化 - 在真正申请锁之前,put办法会通过tryLock()办法尝试取得锁,在尝试取得锁的过程中会对对应hashcode的链表进行遍历,如果遍历结束依然找不到与key雷同的HashEntry节点,则为后续的put操作提前创立一个HashEntry。当tryLock肯定次数后仍无奈取得锁,则通过lock申请锁。

须要留神的是,因为在并发环境下,其余线程的put,rehash或者remove操作可能会导致链表头结点的变动,因而在过程中须要进行查看,如果头结点发生变化则从新对表进行遍历。而如果其余线程引起了链表中的某个节点被删除,即便该变动因为是非原子写操作(删除节点后链接后续节点调用的是Unsafe.putOrderedObject(),该办法不提供原子写语义)可能导致以后线程无奈察看到,但因为不影响遍历的正确性所以忽略不计。

之所以在获取锁的过程中对整个链表进行遍历,次要目标是心愿遍历的链表被CPU cache所缓存,为后续理论put过程中的链表遍历操作晋升性能。

在取得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具备雷同的key,则更新该HashEntry的value值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()办法对Segment进行扩容,最初将新建HashEntry写入到数组中。

put办法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过Unsafe的putOrderedObject()办法来实现,这里并未应用具备原子写语义的putObjectVolatile()的起因是:JMM会保障取得锁到开释锁之间所有对象的状态更新都会在锁被开释之后更新到主存,从而保障这些变更对其余线程是可见的。

1.5 rehash

绝对于HashMap的resize,ConcurrentHashMap的rehash原理相似,然而Doug Lea为rehash做了肯定的优化,防止让所有的节点都进行复制操作:因为扩容是基于2的幂指来操作,假如扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因而大多数HashEntry节点在扩容前后index能够放弃不变。基于此,rehash办法中会定位第一个后续所有节点在扩容后index都放弃不变的节点,而后将这个节点之前的所有节点重排即可。这部分代码如下:

private void rehash(HashEntry<K,V> node) {            HashEntry<K,V>[] oldTable = table;            int oldCapacity = oldTable.length;            int newCapacity = oldCapacity << 1;            threshold = (int)(newCapacity * loadFactor);            HashEntry<K,V>[] newTable =                (HashEntry<K,V>[]) new HashEntry[newCapacity];            int sizeMask = newCapacity - 1;            for (int i = 0; i < oldCapacity ; i++) {                HashEntry<K,V> e = oldTable[i];                if (e != null) {                    HashEntry<K,V> next = e.next;                    int idx = e.hash & sizeMask;                    if (next == null)   //  Single node on list                        newTable[idx] = e;                    else { // Reuse consecutive sequence at same slot                        HashEntry<K,V> lastRun = e;                        int lastIdx = idx;                        for (HashEntry<K,V> last = next;                             last != null;                             last = last.next) {                            int k = last.hash & sizeMask;                            if (k != lastIdx) {                                lastIdx = k;                                lastRun = last;                            }                        }                        newTable[lastIdx] = lastRun;                        // Clone remaining nodes                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {                            V v = p.value;                            int h = p.hash;                            int k = h & sizeMask;                            HashEntry<K,V> n = newTable[k];                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);                        }                    }                }            }            int nodeIndex = node.hash & sizeMask; // add the new node            node.setNext(newTable[nodeIndex]);            newTable[nodeIndex] = node;            table = newTable;        }

1.6 remove

和put相似,remove在真正取得锁之前,也会对链表进行遍历以进步缓存命中率。

1.7 get与containsKey

get与containsKey两个办法简直完全一致:他们都没有应用锁,而是通过Unsafe对象的getObjectVolatile()办法提供的原子读语义,来取得Segment以及对应的链表,而后对链表遍历判断是否存在key雷同的节点以及取得该节点的value。但因为遍历过程中其余线程可能对链表构造做了调整,因而get和containsKey返回的可能是过期的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须应用Collections.synchronizedMap()办法。

1.8 size、containsValue

这些办法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也根本相似:首先不加锁循环执行以下操作:循环所有的Segment(通过Unsafe的getObjectVolatile()以保障原子读语义),取得对应的值以及所有Segment的modcount之和。如果间断两次所有Segment的modcount和相等,则过程中没有产生其余线程批改ConcurrentHashMap的状况,返回取得的值。

当循环次数超过预约义的值时,这时须要对所有的Segment顺次进行加锁,获取返回值后再顺次解锁。值得注意的是,加锁过程中要强制创立所有的Segment,否则容易呈现其余线程创立Segment并进行put,remove等操作。代码如下:

for(int j =0; j < segments.length; ++j)ensureSegment(j).lock();// force creation

一般来说,应该防止在多线程环境下应用size和containsValue办法。

注1:modcount在put, replace, remove以及clear等办法中都会被批改。

注2:对于containsValue办法来说,如果在循环过程中发现匹配value的HashEntry,则间接返回true。

最初,与HashMap不同的是,ConcurrentHashMap并不容许key或者value为null,依照Doug Lea的说法,这么设计的起因是在ConcurrentHashMap中,一旦value呈现null,则代表HashEntry的key/value没有映射实现就被其余线程所见,须要非凡解决。在JDK6中,get办法的实现中就有一段对HashEntry.value == null的防御性判断。但Doug Lea也抵赖理论运行过程中,这种状况仿佛不可能产生(参考:http://cs.oswego.edu/pipermai...)。

2. JDK8中的实现

ConcurrentHashMap在JDK8中进行了微小改变,很须要通过源码来再次学习下Doug Lea的实现办法。

它摒弃了Segment(锁段)的概念,而是启用了一种全新的形式实现,利用CAS算法。它沿用了与它同期间的HashMap版本的思维,底层仍然由“数组”+链表+红黑树的形式思维(JDK7与JDK8中HashMap的实现),然而为了做到并发,又减少了很多辅助的类,例如TreeBin,Traverser等对象外部类。

2.1 重要的属性

首先来看几个重要的属性,与HashMap雷同的就不再介绍了,这里重点解释一下sizeCtl这个属性。能够说它是ConcurrentHashMap中出镜率很高的一个属性,因为它是一个管制标识符,在不同的中央有不同用处,而且它的取值不同,也代表不同的含意。

  • 正数代表正在进行初始化或扩容操作
  • -1代表正在初始化
  • -N 示意有N-1个线程正在进行扩容操作
  • 负数或0代表hash表还没有被初始化,这个数值示意初始化或下一次进行扩容的大小,这一点相似于扩容阈值的概念。还前面能够看到,它的值始终是以后ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
/**     * 盛装Node元素的数组 它的大小是2的整数次幂     * Size is always a power of two. Accessed directly by iterators.     */    transient volatile Node<K,V>[] table;    /**     * Table initialization and resizing control.  When negative, the     * table is being initialized or resized: -1 for initialization,     * else -(1 + the number of active resizing threads).  Otherwise,     * when table is null, holds the initial table size to use upon     * creation, or 0 for default. After initialization, holds the     * next element count value upon which to resize the table.     hash表初始化或扩容时的一个管制位标识量。     正数代表正在进行初始化或扩容操作     -1代表正在初始化     -N 示意有N-1个线程正在进行扩容操作     负数或0代表hash表还没有被初始化,这个数值示意初始化或下一次进行扩容的大小          */    private transient volatile int sizeCtl;     // 以下两个是用来管制扩容的时候 单线程进入的变量     /**     * The number of bits used for generation stamp in sizeCtl.     * Must be at least 6 for 32bit arrays.     */    private static int RESIZE_STAMP_BITS = 16;  /**     * The bit shift for recording size stamp in sizeCtl.     */    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;            /*     * Encodings for Node hash fields. See above for explanation.     */    static final int MOVED     = -1; // hash值是-1,示意这是一个forwardNode节点    static final int TREEBIN   = -2; // hash值是-2  示意这时一个TreeBin节点

2.2 重要的类

2.2.1 Node

Node是最外围的外部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这外面。它与HashMap中的定义很类似,然而然而有一些差异它对value和next属性设置了volatile同步锁(与JDK7的Segment雷同),它不容许调用setValue办法间接扭转Node的value域,它减少了find办法辅助map.get()办法。

2.2.2 TreeNode

树节点类,另外一个外围的数据结构。当链表长度过长的时候,会转换为TreeNode。然而与HashMap不雷同的是,它并不是间接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin实现对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目标是不便基于TreeBin的拜访。

2.2.3 TreeBin

这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在理论的ConcurrentHashMap“数组”中,寄存的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

这里仅贴出它的构造方法。能够看到在结构TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识为。同时也看到咱们相熟的红黑树构造方法

2.2.4 ForwardingNode

一个用于连贯两个table的节点类。它蕴含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全副为null,它的hash值为-1. 这外面定义的find的办法是从nextTable里进行查问节点,而不是以本身为头节点进行查找。

/**     * A node inserted at head of bins during transfer operations.     */    static final class ForwardingNode<K,V> extends Node<K,V> {        final Node<K,V>[] nextTable;        ForwardingNode(Node<K,V>[] tab) {            super(MOVED, null, null, null);            this.nextTable = tab;        }        Node<K,V> find(int h, Object k) {            // loop to avoid arbitrarily deep recursion on forwarding nodes            outer: for (Node<K,V>[] tab = nextTable;;) {                Node<K,V> e; int n;                if (k == null || tab == null || (n = tab.length) == 0 ||                    (e = tabAt(tab, (n - 1) & h)) == null)                    return null;                for (;;) {                    int eh; K ek;                    if ((eh = e.hash) == h &&                        ((ek = e.key) == k || (ek != null && k.equals(ek))))                        return e;                    if (eh < 0) {                        if (e instanceof ForwardingNode) {                            tab = ((ForwardingNode<K,V>)e).nextTable;                            continue outer;                        }                        else                            return e.find(h, k);                    }                    if ((e = e.next) == null)                        return null;                }            }        }    }

2.3 Unsafe与CAS

在ConcurrentHashMap中,随处能够看到U, 大量应用了U.compareAndSwapXXX的办法,这个办法是利用一个CAS算法实现无锁化的批改值的操作,他能够大大降低锁代理的性能耗费。这个算法的根本思维就是一直地去比拟以后内存中的变量值与你指定的一个变量值是否相等,如果相等,则承受你指定的批改的值,否则回绝你的操作。因为以后线程中的值曾经不是最新的值,你的批改很可能会笼罩掉其余线程批改的后果。这一点与乐观锁,SVN的思维是比拟相似的。

2.3.1 unsafe动态块

unsafe代码块管制了一些属性的批改工作,比方最罕用的SIZECTL 。在这一版本的concurrentHashMap中,大量利用来的CAS办法进行变量、属性的批改工作。利用CAS进行无锁操作,能够大大提高性能。

private static final sun.misc.Unsafe U;    private static final long SIZECTL;    private static final long TRANSFERINDEX;    private static final long BASECOUNT;    private static final long CELLSBUSY;    private static final long CELLVALUE;    private static final long ABASE;    private static final int ASHIFT;    static {        try {            U = sun.misc.Unsafe.getUnsafe();            Class<?> k = ConcurrentHashMap.class;            SIZECTL = U.objectFieldOffset                (k.getDeclaredField("sizeCtl"));            TRANSFERINDEX = U.objectFieldOffset                (k.getDeclaredField("transferIndex"));            BASECOUNT = U.objectFieldOffset                (k.getDeclaredField("baseCount"));            CELLSBUSY = U.objectFieldOffset                (k.getDeclaredField("cellsBusy"));            Class<?> ck = CounterCell.class;            CELLVALUE = U.objectFieldOffset                (ck.getDeclaredField("value"));            Class<?> ak = Node[].class;            ABASE = U.arrayBaseOffset(ak);            int scale = U.arrayIndexScale(ak);            if ((scale & (scale - 1)) != 0)                throw new Error("data type scale not a power of two");            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);        } catch (Exception e) {            throw new Error(e);        }    }    

2.3.2 三个外围办法

ConcurrentHashMap定义了三个原子操作,用于对指定地位的节点进行操作。正是这些原子操作保障了ConcurrentHashMap的线程平安。

//取得在i地位上的Node节点    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);    }  //利用CAS算法设置i地位上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少  //在CAS算法中,会比拟内存中的值与你指定的这个值是否相等,如果相等才承受你的批改,否则回绝你的批改  //因而以后线程中的值并不是最新的值,这种批改可能会笼罩掉其余线程的批改后果  有点相似于SVN    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,                                        Node<K,V> c, Node<K,V> v) {        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);    }  //利用volatile办法设置节点地位的值    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);    }

2.4 初始化办法initTable

对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候产生的。如调用put、computeIfAbsent、compute、merge等办法的时候,调用机会是查看table==null。

初始化办法次要利用了要害属性sizeCtl 如果这个值〈0,示意其余线程正在进行初始化,就放弃这个操作。在这也能够看出ConcurrentHashMap的初始化只能由一个线程实现。如果取得了初始化权限,就用CAS办法将sizeCtl置为-1,避免其余线程进入。初始化数组后,将sizeCtl的值改为0.75*n。

/**     * Initializes table, using the size recorded in sizeCtl.     */    private final Node<K,V>[] initTable() {        Node<K,V>[] tab; int sc;        while ((tab = table) == null || tab.length == 0) {          //sizeCtl示意有其余线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。            if ((sc = sizeCtl) < 0)                Thread.yield(); // lost initialization race; just spin            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS办法把sizectl的值置为-1 示意本线程正在进行初始化                try {                    if ((tab = table) == null || tab.length == 0) {                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                        @SuppressWarnings("unchecked")                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                        table = tab = nt;                        sc = n - (n >>> 2);//相当于0.75*n 设置一个扩容的阈值                    }                } finally {                    sizeCtl = sc;                }                break;            }        }        return tab;    }

2.5 扩容办法 transfer

当ConcurrentHashMap容量有余的时候,须要对table进行扩容。这个办法的根本思维跟HashMap是很像的,然而因为它是反对并发扩容的,所以要简单的多。起因是它反对多线程进行扩容操作,而并没有加锁。我想这样做的目标不仅仅是为了满足concurrent的要求,而是心愿利用并发解决去缩小扩容带来的工夫影响。因为在扩容的时候,总是会波及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作可能并发进行,那真真是极好的了。

整个扩容操作分为两个局部

  • 第一局部是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程实现的。这个单线程的保障是通过RESIZE_STAMP_SHIFT这个常量通过一次运算来保障的,这个中央在前面会有提到;
  • 第二个局部就是将原来table中的元素复制到nextTable中,这里容许多线程进行操作。

先来看一下单线程是如何实现的:

它的大体思维就是遍历、复制的过程。首先依据运算失去须要遍历的次数i,而后利用tabAt办法取得i地位的元素:

  • 如果这个地位为空,就在原table中的i地位放入forwardNode节点,这个也是触发并发扩容的关键点;
  • 如果这个地位是Node节点(fh>=0),如果它是一个链表的头节点,就结构一个反序链表,把他们别离放在nextTable的i和i+n的地位上
  • 如果这个地位是TreeBin节点(fh<0),也做一个反序解决,并且判断是否须要untreefi,把解决的后果别离放在nextTable的i和i+n的地位上
  • 遍历过所有的节点当前就实现了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,实现扩容。

再看一下多线程是如何实现的:

在代码的69行有一个判断,如果遍历到的节点是forward节点,就向后持续遍历,再加上给节点上锁的机制,就实现了多线程的管制。多线程遍历节点,解决了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样穿插就实现了复制工作。而且还很好的解决了线程平安的问题。这个办法的设计切实是让我膜拜。

/**     * 一个过渡的table表  只有在扩容的时候才会应用     */    private transient volatile Node<K,V>[] nextTable;  /**     * Moves and/or copies the nodes in each bin to new table. See     * above for explanation.     */    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        int n = tab.length, stride;        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)            stride = MIN_TRANSFER_STRIDE; // subdivide range        if (nextTab == null) {            // initiating            try {                @SuppressWarnings("unchecked")                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//结构一个nextTable对象 它的容量是原来的两倍                nextTab = nt;            } catch (Throwable ex) {      // try to cope with OOME                sizeCtl = Integer.MAX_VALUE;                return;            }            nextTable = nextTab;            transferIndex = n;        }        int nextn = nextTab.length;        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//结构一个连节点指针 用于标记位        boolean advance = true;//并发扩容的要害属性 如果等于true 阐明这个节点曾经解决过        boolean finishing = false; // to ensure sweep before committing nextTab        for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;            //这个while循环体的作用就是在管制i--  通过i--能够顺次遍历原hash表中的节点            while (advance) {                int nextIndex, nextBound;                if (--i >= bound || finishing)                    advance = false;                else if ((nextIndex = transferIndex) <= 0) {                    i = -1;                    advance = false;                }                else if (U.compareAndSwapInt                         (this, TRANSFERINDEX, nextIndex,                          nextBound = (nextIndex > stride ?                                       nextIndex - stride : 0))) {                    bound = nextBound;                    i = nextIndex - 1;                    advance = false;                }            }            if (i < 0 || i >= n || i + n >= nextn) {                int sc;                if (finishing) {                 //如果所有的节点都曾经实现复制工作  就把nextTable赋值给table 清空长期对象nextTable                    nextTable = null;                    table = nextTab;                    sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍  仍然相当于当初容量的0.75倍                    return;                }                //利用CAS办法更新这个扩容阈值,在这外面sizectl值减一,阐明新退出一个线程参加到扩容操作                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                        return;                    finishing = advance = true;                    i = n; // recheck before commit                }            }            //如果遍历到的节点为空 则放入ForwardingNode指针            else if ((f = tabAt(tab, i)) == null)                advance = casTabAt(tab, i, null, fwd);            //如果遍历到ForwardingNode节点  阐明这个点曾经被解决过了 间接跳过  这里是管制并发扩容的外围            else if ((fh = f.hash) == MOVED)                advance = true; // already processed            else {              //节点上锁                synchronized (f) {                    if (tabAt(tab, i) == f) {                        Node<K,V> ln, hn;                        //如果fh>=0 证实这是一个Node节点                        if (fh >= 0) {                            int runBit = fh & n;                            //以下的局部在实现的工作是结构两个链表  一个是原链表  另一个是原链表的反序排列                            Node<K,V> lastRun = f;                            for (Node<K,V> p = f.next; p != null; p = p.next) {                                int b = p.hash & n;                                if (b != runBit) {                                    runBit = b;                                    lastRun = p;                                }                            }                            if (runBit == 0) {                                ln = lastRun;                                hn = null;                            }                            else {                                hn = lastRun;                                ln = null;                            }                            for (Node<K,V> p = f; p != lastRun; p = p.next) {                                int ph = p.hash; K pk = p.key; V pv = p.val;                                if ((ph & n) == 0)                                    ln = new Node<K,V>(ph, pk, pv, ln);                                else                                    hn = new Node<K,V>(ph, pk, pv, hn);                            }                            //在nextTable的i地位上插入一个链表                            setTabAt(nextTab, i, ln);                            //在nextTable的i+n的地位上插入另一个链表                            setTabAt(nextTab, i + n, hn);                            //在table的i地位上插入forwardNode节点  示意曾经解决过该节点                            setTabAt(tab, i, fwd);                            //设置advance为true 返回到下面的while循环中 就能够执行i--操作                            advance = true;                        }                        //对TreeBin对象进行解决  与下面的过程相似                        else if (f instanceof TreeBin) {                            TreeBin<K,V> t = (TreeBin<K,V>)f;                            TreeNode<K,V> lo = null, loTail = null;                            TreeNode<K,V> hi = null, hiTail = null;                            int lc = 0, hc = 0;                            //结构正序和反序两个链表                            for (Node<K,V> e = t.first; e != null; e = e.next) {                                int h = e.hash;                                TreeNode<K,V> p = new TreeNode<K,V>                                    (h, e.key, e.val, null, null);                                if ((h & n) == 0) {                                    if ((p.prev = loTail) == null)                                        lo = p;                                    else                                        loTail.next = p;                                    loTail = p;                                    ++lc;                                }                                else {                                    if ((p.prev = hiTail) == null)                                        hi = p;                                    else                                        hiTail.next = p;                                    hiTail = p;                                    ++hc;                                }                            }                            //如果扩容后曾经不再须要tree的构造 反向转换为链表构造                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :                                (hc != 0) ? new TreeBin<K,V>(lo) : t;                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :                                (lc != 0) ? new TreeBin<K,V>(hi) : t;                             //在nextTable的i地位上插入一个链表                                setTabAt(nextTab, i, ln);                            //在nextTable的i+n的地位上插入另一个链表                            setTabAt(nextTab, i + n, hn);                             //在table的i地位上插入forwardNode节点  示意曾经解决过该节点                            setTabAt(tab, i, fwd);                            //设置advance为true 返回到下面的while循环中 就能够执行i--操作                            advance = true;                        }                    }                }            }        }    }

2.6 Put办法

后面的所有的介绍其实都为这个办法做铺垫。ConcurrentHashMap最罕用的就是put和get两个办法。当初来介绍put办法,这个put办法仍然沿用HashMap的put办法的思维,依据hash值计算这个新插入的点在table中的地位i,如果i地位是空的,间接放进去,否则进行判断,如果i地位是树节点,依照树的形式插入新的节点,否则把i插入到链表的开端。ConcurrentHashMap中仍然沿用这个思维,有一个最重要的不同点就是ConcurrentHashMap不容许key或value为null值。另外因为波及到多线程,put办法就要简单一点。在多线程中可能有以下两个状况

  • 如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,以后线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer办法中在空结点上插入forward节点,如果检测到须要插入的地位被forward节点占有,就帮忙进行扩容;
  • 如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保障了线程平安。只管这个有一些影响效率,然而还是会比hashTable的synchronized要好得多。

整体流程就是首先定义不容许key或value为null的状况放入 对于每一个放入的值,首先利用spread办法对key的hashcode进行一次hash计算,由此来确定这个值在table中的地位。

如果这个地位是空的,那么间接放入,而且不须要加锁操作。

如果这个地位存在结点,阐明产生了hash碰撞,首先判断这个节点的类型。如果是链表节点(fh>0),则失去的结点就是hash值雷同的节点组成的链表的头节点。须要顺次向后遍历确定这个新退出的值所在位置。如果遇到hash值与key值都与新退出节点是统一的状况,则只须要更新value值即可。否则顺次向后遍历,直到链表尾插入这个结点。如果退出这个节点当前链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型曾经是树节点的话,间接调用树节点的插入方法进行插入新的值。

public V put(K key, V value) {       return putVal(key, value, false);   }   /** Implementation for put and putIfAbsent */   final V putVal(K key, V value, boolean onlyIfAbsent) {     //不容许 key或value为null       if (key == null || value == null) throw new NullPointerException();       //计算hash值       int hash = spread(key.hashCode());       int binCount = 0;       //死循环 何时插入胜利 何时跳出       for (Node<K,V>[] tab = table;;) {           Node<K,V> f; int n, i, fh;           //如果table为空的话,初始化table           if (tab == null || (n = tab.length) == 0)               tab = initTable();           //依据hash值计算出在table外面的地位            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {            //如果这个地位没有值 ,间接放进去,不须要加锁               if (casTabAt(tab, i, null,                            new Node<K,V>(hash, key, value, null)))                   break;                   // no lock when adding to empty bin           }           //当遇到表连接点时,须要进行整合表的操作           else if ((fh = f.hash) == MOVED)               tab = helpTransfer(tab, f);           else {               V oldVal = null;               //结点上锁  这里的结点能够了解为hash值雷同组成的链表的头结点               synchronized (f) {                   if (tabAt(tab, i) == f) {                       //fh〉0 阐明这个节点是一个链表的节点 不是树的节点                       if (fh >= 0) {                           binCount = 1;                           //在这里遍历链表所有的结点                           for (Node<K,V> e = f;; ++binCount) {                               K ek;                               //如果hash值和key值雷同  则批改对应结点的value值                               if (e.hash == hash &&                                   ((ek = e.key) == key ||                                    (ek != null && key.equals(ek)))) {                                   oldVal = e.val;                                   if (!onlyIfAbsent)                                       e.val = value;                                   break;                               }                               Node<K,V> pred = e;                               //如果遍历到了最初一个结点,那么就证实新的节点须要插入 就把它插入在链表尾部                               if ((e = e.next) == null) {                                   pred.next = new Node<K,V>(hash, key,                                                             value, null);                                   break;                               }                           }                       }                       //如果这个节点是树节点,就依照树的形式插入值                       else if (f instanceof TreeBin) {                           Node<K,V> p;                           binCount = 2;                           if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                                          value)) != null) {                               oldVal = p.val;                               if (!onlyIfAbsent)                                   p.val = value;                           }                       }                   }               }               if (binCount != 0) {                //如果链表长度曾经达到临界值8 就须要把链表转换为树结构                   if (binCount >= TREEIFY_THRESHOLD)                       treeifyBin(tab, i);                   if (oldVal != null)                       return oldVal;                   break;               }           }       }       //将以后ConcurrentHashMap的元素数量+1       addCount(1L, binCount);       return null;   }   

咱们能够发现JDK8中的实现也是锁拆散的思维,只是锁住的是一个Node,而不是JDK7中的Segment,而锁住Node之前的操作是无锁的并且也是线程平安的,建设在之前提到的3个原子操作上。

2.6.1 helpTransfer办法

这是一个帮助扩容的办法。这个办法被调用的时候,以后ConcurrentHashMap肯定曾经有了nextTable对象,首先拿到这个nextTable对象,调用transfer办法。回看下面的transfer办法能够看到,当本线程进入扩容办法的时候会间接进入复制阶段。

2.6.2 treeifyBin办法

这个办法用于将过长的链表转换为TreeBin对象。然而他并不是间接转换,而是进行一次容量判断,如果容量没有达到转换的要求,间接进行扩容操作并返回;如果满足条件才链表的构造抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode间接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.

2.7 get办法

get办法比较简单,给定一个key来确定value的时候,必须满足两个条件 key雷同 hash值雷同,对于节点可能在链表或树上的状况,须要别离去查找。

public V get(Object key) {        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;        //计算hash值        int h = spread(key.hashCode());        //依据hash值确定节点地位        if ((tab = table) != null && (n = tab.length) > 0 &&            (e = tabAt(tab, (n - 1) & h)) != null) {            //如果搜寻到的节点key与传入的key雷同且不为null,间接返回这个节点             if ((eh = e.hash) == h) {                if ((ek = e.key) == key || (ek != null && key.equals(ek)))                    return e.val;            }            //如果eh<0 阐明这个节点在树上 间接寻找            else if (eh < 0)                return (p = e.find(h, key)) != null ? p.val : null;             //否则遍历链表 找到对应的值并返回            while ((e = e.next) != null) {                if (e.hash == h &&                    ((ek = e.key) == key || (ek != null && key.equals(ek))))                    return e.val;            }        }        return null;    }

2.8 Size相干的办法

对于ConcurrentHashMap来说,这个table里到底装了多少货色其实是个不确定的数量,因为不可能在调用size()办法的时候像GC的“stop the world”一样让其余线程都停下来让你去统计,因而只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。

2.8.1 辅助定义

为了统计元素个数,ConcurrentHashMap定义了一些变量和一个外部类

/**     * A padded cell for distributing counts.  Adapted from LongAdder     * and Striped64.  See their internal docs for explanation.     */    @sun.misc.Contended static final class CounterCell {        volatile long value;        CounterCell(long x) { value = x; }    }      /******************************************/          /**     * 实际上保留的是hashmap中的元素个数  利用CAS锁进行更新     但它并不必返回以后hashmap的元素个数           */    private transient volatile long baseCount;    /**     * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.     */    private transient volatile int cellsBusy;    /**     * Table of counter cells. When non-null, size is a power of 2.     */    private transient volatile CounterCell[] counterCells;
2.8.2 mappingCount与Size办法

mappingCount与size办法的相似 从Java工程师给出的正文来看,应该应用mappingCount代替size办法 两个办法都没有间接返回basecount 而是统计一次这个值,而这个值其实也是一个大略的数值,因而可能在统计的时候有其余线程正在执行插入或删除操作。

public int size() {        long n = sumCount();        return ((n < 0L) ? 0 :                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :                (int)n);    }     /**     * Returns the number of mappings. This method should be used     * instead of {@link #size} because a ConcurrentHashMap may     * contain more mappings than can be represented as an int. The     * value returned is an estimate; the actual count may differ if     * there are concurrent insertions or removals.     *     * @return the number of mappings     * @since 1.8     */    public long mappingCount() {        long n = sumCount();        return (n < 0L) ? 0L : n; // ignore transient negative values    }         final long sumCount() {        CounterCell[] as = counterCells; CounterCell a;        long sum = baseCount;        if (as != null) {            for (int i = 0; i < as.length; ++i) {                if ((a = as[i]) != null)                    sum += a.value;//所有counter的值求和            }        }        return sum;    }
2.8.3 addCount办法

在put办法结尾处调用了addCount办法,把以后ConcurrentHashMap的元素个数+1这个办法一共做了两件事,更新baseCount的值,检测是否进行扩容。

private final void addCount(long x, int check) {        CounterCell[] as; long b, s;        //利用CAS办法更新baseCount的值         if ((as = counterCells) != null ||            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {            CounterCell a; long v; int m;            boolean uncontended = true;            if (as == null || (m = as.length - 1) < 0 ||                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||                !(uncontended =                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {                fullAddCount(x, uncontended);                return;            }            if (check <= 1)                return;            s = sumCount();        }        //如果check值大于等于0 则须要测验是否须要进行扩容操作        if (check >= 0) {            Node<K,V>[] tab, nt; int n, sc;            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&                   (n = tab.length) < MAXIMUM_CAPACITY) {                int rs = resizeStamp(n);                //                if (sc < 0) {                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||                        transferIndex <= 0)                        break;                     //如果曾经有其余线程在执行扩容操作                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))                        transfer(tab, nt);                }                //以后线程是惟一的或是第一个发动扩容的线程  此时nextTable=null                else if (U.compareAndSwapInt(this, SIZECTL, sc,                                             (rs << RESIZE_STAMP_SHIFT) + 2))                    transfer(tab, null);                s = sumCount();            }        }    }

总结

JDK6,7中的ConcurrentHashmap次要应用Segment来实现减小锁粒度,把HashMap宰割成若干个Segment,在put的时候须要锁住Segment,get时候不加锁,应用volatile来保障可见性,当要统计全局时(比方size),首先会尝试屡次计算modcount来确定,这几次尝试中,是否有其余线程进行了批改操作,如果没有,则间接返回size。如果有,则须要顺次锁住所有的Segment来计算。

jdk7中ConcurrentHashmap中,当长度过长碰撞会很频繁,链表的增改删查操作都会耗费很长的工夫,影响性能,所以jdk8 中齐全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行,实现上也和原来的分段式存储有很大的区别。

次要设计上的变动有以下几点:

  • 不采纳segment而采纳node,锁住node来实现减小锁粒度。
  • 设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮忙resize。
  • 应用3个CAS操作来确保node的一些操作的原子性,这种形式代替了锁。
  • sizeCtl的不同值来代表不同含意,起到了管制的作用。

至于为什么JDK8中应用synchronized而不是ReentrantLock,我猜是因为JDK8中对synchronized有了足够的优化吧。

Reference:

  1. http://www.jianshu.com/p/4806...
  2. https://www.zhihu.com/questio...
  3. http://blog.csdn.net/u0107237...
欢送关注公众号 【码农开花】一起学习成长