该系列文章收录在公众号【Ccww技术博客】,原创技术文章早于博客推出
前言
在平时中汇合应用中,当波及多线程开发时,如果应用HashMap
可能会导致死锁问题,应用HashTable
效率又不高。而ConcurrentHashMap
在放弃同步同时并发效率比拟高,ConcurrentHashmap
是最好的抉择,那面试中也会被经常问到,那可能的问题是:
ConcurrentHashMap的实现原理
- ConcurrentHashMap1.7和1.8的区别?
- ConcurrentHashMap应用什么技术来保障线程平安
ConcurrentHashMap的put()办法
- ConcurrentHashmap 不反对 key 或者 value 为 null 的起因?
- put()办法如何实现线程平安呢?
- ConcurrentHashMap扩容机制
- ConcurrentHashMap的get办法是否要加锁,为什么?
其余问题
- 为什么应用ConcurrentHashMap
- ConcurrentHashMap迭代器是强一致性还是弱一致性?HashMap呢?
- JDK1.7与JDK1.8中ConcurrentHashMap的区别
ConcurrentHashMap的实现原理
ConcurrentHashMap的呈现次要为了解决hashmap在并发环境下不平安,JDK1.8ConcurrentHashMap的设计与实现十分精美,大量的利用了volatile,CAS等乐观锁技术来缩小锁竞争对于性能的影响,ConcurrentHashMap保障线程平安的计划是:
- JDK1.8:synchronized+CAS+HashEntry+红黑树;
- JDK1.7:ReentrantLock+Segment+HashEntry。
JDK7 ConcurrentHashMap
在JDK1.7中ConcurrentHashMap由Segment(分段锁)数组构造和HashEntry数组组成,且次要通过Segment(分段锁)段技术实现线程平安。
Segment是一种可重入锁,是一种数组和链表的构造,一个Segment中蕴含一个HashEntry数组,每个HashEntry又是一个链表构造,因而在ConcurrentHashMap查问一个元素的过程须要进行两次Hash操作,如下所示:
- 第一次Hash定位到Segment,
- 第二次Hash定位到元素所在的链表的头部
正是通过Segment分段锁技术,将数据分成一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁拜访其中一个段数据的时候,其余段的数据也能被其余线程拜访,可能实现真正的并发拜访。
这样构造会使Hash的过程要比一般的HashMap要长,影响性能,但写操作的时候能够只对元素所在的Segment进行加锁即可,不会影响到其余的Segment,ConcurrentHashMap晋升了并发能力。
JDK8 ConcurrentHashMap
在JDK8ConcurrentHashMap外部机构:数组+链表+红黑树,Java 8在链表长度超过肯定阈值(8)时将链表(寻址工夫复杂度为O(N))转换为红黑树(寻址工夫复杂度为O(long(N))),构造基本上与性能和JDK8的HashMap一样,只不过ConcurrentHashMap保障线程安全性。
但在JDK1.8中摒弃了Segment分段锁的数据结构,基于CAS操作保证数据的获取以及应用synchronized关键字对相应数据段加锁来实现线程平安,这进一步提高了并发性。(CAS原理详情《面试:为了进阿里,又把并发CAS(Compare and Swap)实现从新精读一遍》)
))
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; //应用了volatile属性 volatile Node<K,V> next; //应用了volatile属性 ... }
ConcurrentHashMap采纳Node类作为根本的存储单元,每个键值对(key-value)都存储在一个Node中,应用了volatile关键字润饰value和next,保障并发的可见性。其中Node子类有:
- ForwardingNode:扩容节点,只是在扩容阶段应用的节点,次要作为一个标记,在解决并发时起着关键作用,有了ForwardingNodes,也是ConcurrentHashMap有了分段的个性,进步了并发效率
- TreeBin:TreeNode的代理节点,用于保护TreeNodes,ConcurrentHashMap的红黑树寄存的是TreeBin
- TreeNode:用于树结构中,红黑树的节点(当链表长度大于8时转化为红黑树),此节点不能间接放入桶内,只能是作为红黑树的节点
- ReservationNode:保留结点
ConcurrentHashMap中查找元素、替换元素和赋值元素都是基于sun.misc.Unsafe
中原子操作实现多并发的无锁化操作。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v); }
ConcurrentHashMap的put()办法
ConcurrentHashMap的put的流程步骤
- 如果key或者value为null,则抛出空指针异样,和HashMap不同的是HashMap单线程是容许为Null;
if (key == null || value == null) throw new NullPointerException();
for的死循环,为了实现CAS的无锁化更新,如果table为null或者table的长度为0,则初始化table,调用
initTable()
办法(第一次put数据,调用默认参数实现,其中重要的sizeCtl
参数)。//计算索引的第一步,传入键值的hash值 int hash = spread(key.hashCode()); int binCount = 0; //保留以后节点的长度 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; K fk; V fv; if (tab == null || (n = tab.length) == 0) tab = initTable(); //初始化Hash表 ... }
确定元素在Hash表的索引
通过hash算法能够将元素扩散到哈希桶中。在ConcurrentHashMap中通过如下办法确定数组索引:
第一步:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
第二步:
(length-1) & (h ^ (h >>> 16)) & HASH_BITS);
通过
tableAt()
办法找到地位tab[i]
的Node
,当Node为null时为没有hash
抵触的话,应用casTabAt()
办法CAS
操作将元素插入到Hash
表中,ConcurrentHashmap
应用CAS
无锁化操作,这样在高并发hash
抵触低的状况下,性能良好。else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //利用CAS操作将元素插入到Hash表中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; // no lock when adding to empty bin(插入null的节点,无需加锁) }
当f不为null时,阐明产生了hash抵触,当f.hash == MOVED==-1 时,阐明
ConcurrentHashmap
正在产生resize
操作,应用helpTransfer()
办法帮忙正在进行resize操作。else if ((fh = f.hash) == MOVED) //f.hash == -1 //hash为-1 阐明是一个forwarding nodes节点,表明正在扩容 tab = helpTransfer(tab, f);
以上状况都不满足的时,应用
synchronized
同步块上锁以后节点Node
,并判断有没有线程对数组进行了批改,如果没有则进行:- 遍历该链表并统计该链表长度
binCount
,查找是否有和key雷同的节点,如果有则将查找到节点的val值替换为新的value值,并返回旧的value值,否则依据key,value,hash创立新Node并将其放在链表的尾部 - 如果
Node f
是TreeBin
的类型,则应用红黑树的形式进行插入。而后则退出synchronized(f)
锁住的代码块
//以后节点加锁 synchronized (f) { //判断下有没有线程对数组进行了批改 if (tabAt(tab, i) == f) { //如果hash值是大于等于0的阐明是链表 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //插入的元素键值的hash值有节点中元素的hash值雷同,替换以后元素的值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) //替换以后元素的值 e.val = value; break; } Node<K,V> pred = e; //如果循环到链表结尾还没发现,那么进行插入操作 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value); break; } } }else if (f instanceof TreeBin) { //节点为树 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) //替换旧值 p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } }
- 遍历该链表并统计该链表长度
执行完
synchronized(f)
同步代码块之后会先查看binCount
,如果大于等于TREEIFY_THRESHOLD = 8则进行treeifyBin操作尝试将该链表转换为红黑树。if (binCount != 0) { //如果节点长度大于8,转化为树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
执行了一个
addCount
办法,次要用于统计数量以及决定是否须要扩容.addCount(1L, binCount);
ConcurrentHashmap 不反对 key 或者 value 为 null 的起因?
ConcurrentHashmap
和hashMap
不同的是,concurrentHashMap
的key
和value
都不容许为null,
因为concurrenthashmap
它们是用于多线程的,并发的 ,如果map.get(key)
失去了null,不能判断到底是映射的value是null,还是因为没有找到对应的key而为空,
而用于单线程状态的hashmap
却能够用containKey(key)
去判断到底是否蕴含了这个null。
put()办法如何实现线程平安呢?
- 在第一次put数据时,调用
initTable()
办法
/** * Hash表的初始化和调整大小的管制标记。为正数,Hash表正在初始化或者扩容; * (-1示意正在初始化,-N示意有N-1个线程在进行扩容) * 否则,当表为null时,保留创立时应用的初始化大小或者默认0; * 初始化当前保留下一个调整大小的尺寸。 */ private transient volatile int sizeCtl; //第一次put,初始化数组 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //如果曾经有别的线程在初始化了,这里期待一下 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //-1 示意正在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { ... } finally { sizeCtl = sc; } break; } } return tab; }
应用sizeCtl
参数作为管制标记的作用,当在从插入元素时,才会初始化Hash表。在开始初始化的时候,
- 首先判断
sizeCtl
的值,如果sizeCtl < 0,阐明有线程在初始化,以后线程便放弃初始化操作。否则,将SIZECTL
设置为-1,Hash表进行初始化。 - 初始化胜利当前,将
sizeCtl
的值设置为以后的容量值
- 在不存在hash抵触的时
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //利用CAS操作将元素插入到Hash表中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; // no lock when adding to empty bin(插入null的节点,无需加锁) }
(f = tabAt(tab, i = (n - 1) & hash)) == null
中应用tabAt原子操作获取数组,并利用casTabAt(tab, i, null, new Node<K,V>(hash, key, value))
CAS操作将元素插入到Hash表中
- 在存在hash抵触时,先把以后节点应用关键字
synchronized
加锁,而后再应用tabAt()
原子操作判断下有没有线程对数组进行了批改,最初再进行其余操作。
为什么要锁住更新操作的代码块?
因为产生了哈希抵触,以后线程正在f所在的链表上进行更新操作,如果此时另外一个线程也须要到这个链表上进行更新操作,则须要期待以后线程更新完后再执行
//以后节点加锁 synchronized (f) { //这里判断下有没有线程对数组进行了批改 if (tabAt(tab, i) == f) { ......//do something }}
因为篇幅过于长,分成两局部来讲讲,接下来的内容请看[《面试:为了进阿里,死磕了ConcurrentHashMap源码和面试题(二)》]()
各位看官还能够吗?喜爱的话,动动手指导个????,点个关注呗!!谢谢反对
欢送关注公众号【Ccww技术博客】,原创技术文章第一工夫推出