关于java:面试为了进阿里死磕了ConcurrentHashMap源码和面试题一

4次阅读

共计 6337 个字符,预计需要花费 16 分钟才能阅读完成。

该系列文章收录在公众号【Ccww 技术博客】,原创技术文章早于博客推出

前言

在平时中汇合应用中,当波及多线程开发时,如果应用 HashMap 可能会导致死锁问题,应用 HashTable 效率又不高。而 ConcurrentHashMap 在放弃同步同时并发效率比拟高,ConcurrentHashmap是最好的抉择,那面试中也会被经常问到,那可能的问题是:

  • ConcurrentHashMap 的实现原理

    • ConcurrentHashMap1.7 和 1.8 的区别?
    • ConcurrentHashMap 应用什么技术来保障线程平安
  • ConcurrentHashMap 的 put()办法

    • ConcurrentHashmap 不反对 key 或者 value 为 null 的起因?
    • put()办法如何实现线程平安呢?
  • ConcurrentHashMap 扩容机制
  • ConcurrentHashMap 的 get 办法是否要加锁,为什么?
  • 其余问题

    • 为什么应用 ConcurrentHashMap
    • ConcurrentHashMap 迭代器是强一致性还是弱一致性?HashMap 呢?
    • JDK1.7 与 JDK1.8 中 ConcurrentHashMap 的区别

ConcurrentHashMap 的实现原理

ConcurrentHashMap 的呈现次要为了解决 hashmap 在并发环境下不平安,JDK1.8ConcurrentHashMap 的设计与实现十分精美,大量的利用了 volatile,CAS 等乐观锁技术来缩小锁竞争对于性能的影响,ConcurrentHashMap 保障线程平安的计划是:

  • JDK1.8:synchronized+CAS+HashEntry+ 红黑树;
  • JDK1.7:ReentrantLock+Segment+HashEntry。

JDK7 ConcurrentHashMap

在 JDK1.7 中 ConcurrentHashMap 由 Segment(分段锁)数组构造和 HashEntry 数组组成,且次要通过 Segment(分段锁)段技术实现线程平安。

Segment 是一种可重入锁,是一种数组和链表的构造,一个 Segment 中蕴含一个 HashEntry 数组,每个 HashEntry 又是一个链表构造,因而在 ConcurrentHashMap 查问一个元素的过程须要进行两次 Hash 操作,如下所示:

  • 第一次 Hash 定位到 Segment,
  • 第二次 Hash 定位到元素所在的链表的头部

正是通过 Segment 分段锁技术,将数据分成一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁拜访其中一个段数据的时候,其余段的数据也能被其余线程拜访,可能实现真正的并发拜访。

这样构造会使 Hash 的过程要比一般的 HashMap 要长,影响性能,但写操作的时候能够只对元素所在的 Segment 进行加锁即可,不会影响到其余的 Segment,ConcurrentHashMap 晋升了并发能力。

JDK8 ConcurrentHashMap

在 JDK8ConcurrentHashMap 外部机构:数组 + 链表 + 红黑树,Java 8 在链表长度超过肯定阈值 (8) 时将链表(寻址工夫复杂度为 O(N))转换为红黑树(寻址工夫复杂度为 O(long(N))),构造基本上与性能和 JDK8 的 HashMap 一样,只不过 ConcurrentHashMap 保障线程安全性。

但在 JDK1.8 中摒弃了 Segment 分段锁的数据结构,基于 CAS 操作保证数据的获取以及应用 synchronized 关键字对相应数据段加锁来实现线程平安,这进一步提高了并发性。(CAS 原理详情《面试:为了进阿里,又把并发 CAS(Compare and Swap)实现从新精读一遍》)
))

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;  // 应用了 volatile 属性
        volatile Node<K,V> next;  // 应用了 volatile 属性
        ...
    }

ConcurrentHashMap 采纳 Node 类作为根本的存储单元,每个键值对 (key-value) 都存储在一个 Node 中,应用了 volatile 关键字润饰 value 和 next,保障并发的可见性。其中 Node 子类有:

  • ForwardingNode:扩容节点,只是在扩容阶段应用的节点,次要作为一个标记,在解决并发时起着关键作用,有了 ForwardingNodes,也是 ConcurrentHashMap 有了分段的个性,进步了并发效率
  • TreeBin:TreeNode 的代理节点,用于保护 TreeNodes,ConcurrentHashMap 的红黑树寄存的是 TreeBin
  • TreeNode:用于树结构中,红黑树的节点(当链表长度大于 8 时转化为红黑树),此节点不能间接放入桶内,只能是作为红黑树的节点
  • ReservationNode:保留结点

ConcurrentHashMap 中查找元素、替换元素和赋值元素都是基于 sun.misc.Unsafe原子操作 实现 多并发的无锁化 操作。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }

ConcurrentHashMap 的 put()办法

ConcurrentHashMap 的 put 的流程步骤

  1. 如果 key 或者 value 为 null,则抛出空指针异样,和 HashMap 不同的是 HashMap 单线程是容许为 Null;

    if (key == null || value == null) throw new NullPointerException();

  2. for 的死循环,为了实现 CAS 的无锁化更新,如果 table 为 null 或者 table 的长度为 0,则初始化 table,调用 initTable() 办法(第一次 put 数据,调用默认参数实现,其中重要的 sizeCtl 参数)。

        // 计算索引的第一步,传入键值的 hash 值
        int hash = spread(key.hashCode());
        int binCount = 0; // 保留以后节点的长度
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); // 初始化 Hash 表
            ...
        }
  3. 确定元素在 Hash 表的索引

    通过 hash 算法能够将元素扩散到哈希桶中。在 ConcurrentHashMap 中通过如下办法确定数组索引:

    第一步:

    static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS; 
        }

    第二步:(length-1) & (h ^ (h >>> 16)) & HASH_BITS);

  4. 通过 tableAt() 办法找到地位 tab[i]Node, 当 Node 为 null 时为没有 hash 抵触的话,应用 casTabAt() 办法 CAS 操作将元素插入到 Hash 表中,ConcurrentHashmap应用 CAS 无锁化操作,这样在高并发 hash 抵触低的状况下,性能良好。

    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    // 利用 CAS 操作将元素插入到 Hash 表中
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;  // no lock when adding to empty bin(插入 null 的节点,无需加锁)
                }
  5. 当 f 不为 null 时,阐明产生了 hash 抵触,当 f.hash == MOVED==-1 时,阐明 ConcurrentHashmap 正在产生 resize 操作, 应用 helpTransfer() 办法帮忙正在进行 resize 操作。

    else if ((fh = f.hash) == MOVED) //f.hash == -1 
            //hash 为 -1 阐明是一个 forwarding nodes 节点,表明正在扩容
            tab = helpTransfer(tab, f);
  6. 以上状况都不满足的时,应用 synchronized 同步块上锁以后节点Node , 并判断有没有线程对数组进行了批改,如果没有则进行:

    • 遍历该链表并统计该链表长度binCount,查找是否有和 key 雷同的节点,如果有则将查找到节点的 val 值替换为新的 value 值,并返回旧的 value 值,否则依据 key,value,hash 创立新 Node 并将其放在链表的尾部
    • 如果 Node fTreeBin的类型,则应用红黑树的形式进行插入。而后则退出 synchronized(f) 锁住的代码块
    // 以后节点加锁
     synchronized (f) {
     // 判断下有没有线程对数组进行了批改
     if (tabAt(tab, i) == f) {
           // 如果 hash 值是大于等于 0 的阐明是链表
            if (fh >= 0) {
                  binCount = 1;
                  for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                       // 插入的元素键值的 hash 值有节点中元素的 hash 值雷同,替换以后元素的值
                          if (e.hash == hash &&
                               ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    // 替换以后元素的值
                                    e.val = value;
                                break;
                            }
                         Node<K,V> pred = e;
                          // 如果循环到链表结尾还没发现,那么进行插入操作
                         if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value);
                                break;
                               }
                    }
              }else if (f instanceof TreeBin) { // 节点为树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    // 替换旧值
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }     
  7. 执行完 synchronized(f) 同步代码块之后会先查看binCount, 如果大于等于 TREEIFY_THRESHOLD = 8 则进行 treeifyBin 操作尝试将该链表转换为红黑树。

    if (binCount != 0) {
                  // 如果节点长度大于 8, 转化为树
                  if (binCount >= TREEIFY_THRESHOLD)
                       treeifyBin(tab, i);
                  if (oldVal != null)
                       return oldVal; 
                   break;
             }
  8. 执行了一个 addCount 办法, 次要用于统计数量以及决定是否须要扩容.

    addCount(1L, binCount);

ConcurrentHashmap 不反对 key 或者 value 为 null 的起因?

ConcurrentHashmaphashMap 不同的是,concurrentHashMapkeyvalue都不容许为 null,

因为 concurrenthashmap 它们是用于多线程的,并发的,如果 map.get(key) 失去了 null,不能判断到底是映射的 value 是 null, 还是因为没有找到对应的 key 而为空,

而用于单线程状态的 hashmap 却能够用containKey(key) 去判断到底是否蕴含了这个 null。

put()办法如何实现线程平安呢?

  1. 在第一次 put 数据时,调用 initTable() 办法
 /**  
 * Hash 表的初始化和调整大小的管制标记。为正数,Hash 表正在初始化或者扩容;  
 * (- 1 示意正在初始化,- N 示意有 N - 1 个线程在进行扩容)  
 * 否则,当表为 null 时,保留创立时应用的初始化大小或者默认 0;  
 * 初始化当前保留下一个调整大小的尺寸。*/  
 private transient volatile int sizeCtl;  
     // 第一次 put,初始化数组  
     private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;  
         while ((tab = table) == null || tab.length == 0) {  
             // 如果曾经有别的线程在初始化了,这里期待一下  
             if ((sc = sizeCtl) < 0)  
             Thread.yield(); // lost initialization race; just spin  
             //-1 示意正在初始化  
             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {...} finally {sizeCtl = sc;}  
            break;  
         }  
     }  
     return tab;  
 }

应用 sizeCtl 参数作为管制标记的作用,当在从插入元素时,才会初始化 Hash 表。在开始初始化的时候,

  • 首先判断 sizeCtl 的值,如果 sizeCtl < 0,阐明 有线程在初始化 以后线程便放弃初始化操作 。否则,将SIZECTL 设置为 -1Hash 表进行初始化
  • 初始化胜利当前,将 sizeCtl 的值设置为以后的容量值
  1. 在不存在 hash 抵触的时
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
     // 利用 CAS 操作将元素插入到 Hash 表中  
     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))  
     break;  // no lock when adding to empty bin(插入 null 的节点,无需加锁)  
 }

(f = tabAt(tab, i = (n - 1) & hash)) == null中应用 tabAt 原子操作获取数组,并利用casTabAt(tab, i, null, new Node<K,V>(hash, key, value))CAS 操作将元素插入到 Hash 表中

  1. 在存在 hash 抵触时,先把以后节点应用关键字 synchronized 加锁,而后再应用 tabAt() 原子操作判断下有没有线程对数组进行了批改,最初再进行其余操作。

为什么要锁住更新操作的代码块?

因为产生了哈希抵触,以后线程正在 f 所在的链表上进行更新操作,如果此时另外一个线程也须要到这个链表上进行更新操作,则须要期待以后线程更新完后再执行

// 以后节点加锁  
synchronized (f) {  
     // 这里判断下有没有线程对数组进行了批改  
     if (tabAt(tab, i) == f) {......//do something}
}

因为篇幅过于长,分成两局部来讲讲,接下来的内容请看[《面试:为了进阿里,死磕了 ConcurrentHashMap 源码和面试题(二)》]()

各位看官还能够吗?喜爱的话,动动手指导个????,点个关注呗!!谢谢反对
欢送关注公众号【Ccww 技术博客】,原创技术文章第一工夫推出

正文完
 0