关于java8:ConcurrentHashMap源码深度分析

25次阅读

共计 7378 个字符,预计需要花费 19 分钟才能阅读完成。

1、根本变量

// Unsafe mechanics
//Unsafe 类是用来做 cas 操作的,都是 native 办法,代码由 C ++ 实现,上面的变量示意对应变量的偏移
// 例如:public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);
// 示意对象 o 偏移地位 offset 的变量如果和期望值 expected 相等,把变量值设置为 x

private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

1、table 初始化

1、table 会提早到第一次 put 时初始化,同过应用循环 +CAS 的套路,能够保障一次只有一个线程会初始化 table。
2、在 table 为空的时候如果 sizeCtl 小于 0,则阐明曾经有线程开始初始化了,其它线程通过 Thread.yield()让出 CPU 工夫片,期待 table 非空即可。
3、否则应用 CAS 将 sizeCtl 的值换为 -1,置换胜利则初始化 table。
4、留神 table 的大小为 sizeCtl,初始化后将 sizeCtl 的值设为 n – (n >>> 2)即 0.75n,这个值用来确定是否须要为 table 扩容。

//Initializes table, using the size recorded in sizeCtl.
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 判断是否曾经有线程在初始化,如果有则让出 CPU,之后持续自旋判断
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
         //cas 操作设置 sizeCtl 的值
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 持续判断双重查看
                if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 初始化 table
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);// 设置 sizeCtl=0.75n
                }
            } finally {sizeCtl = sc;}
            break;
        }
    }
    return tab;
}

2、put 操作(putVal 办法)

1、首先计算 key 的 hash 值
2、判断 table 是否为空,如果是就初始化
3、依据 hash 值取余确定桶的地位,并判断桶是否为空,如果是空,通过 cas 操作设置进去
4、如果桶的第一个节点非空,并且 hash=MOVED,阐明有线程正在进行扩容,调用 helpTransfer 帮忙扩容
5、对桶加锁并判断节点是链表还是树,依据不同状况插入节点
6、判断是否达到链表转树的阈值
7、统计节点数

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
    //1、首先计算 key 的 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //2、判断 table 是否为空,如果是就初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
            //3、依据 hash 值取余确定桶的地位,并判断桶是否为空,如果是空
            // 通过 cas 操作设置进去
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //4、如果桶的第一个节点非空,并且 hash=MOVED,阐明有线程正在进行扩容,// 调用 helpTransfer 帮忙扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //5、对桶加锁并判断节点是链表还是树,依据不同状况插入节点
            synchronized (f) {if (tabAt(tab, i) == f) {// 保障多线程环境下的安全性,再一次查看
                    if (fh >= 0) {//hash 值大于 0,阐明是一个链表
                        binCount = 1;// 记录链表的元素个数
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 判断 key 是否已在链表中,如果是间接替换 value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 否则阐明是一个新 Key,插入表尾
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 不是链表节点就是树节点,调用树结点插入方法
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //6、判断是否达到链表转树的阈值
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //7、统计节点数
    addCount(1L, binCount);
    return null;
}

3、链表转树

1、判断链表节点是否小于 64,如果是优先应用扩容
2、对桶加锁
3、顺次把链表节点汇合转成树结点汇合
4、把树结点汇合转成红黑树,这颗红黑树是链表和树结构的联合,它同样通过 next 援用串成了一个链表

private final void treeifyBin(Node<K,V>[] tab, int index) {// 将 tab[index]链表转树
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //1、判断链表节点是否小于 64,如果是优先应用扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
            // 为了多线程环境下的安全性,再次判断该桶是否为空,hash>0 示意是链表
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {//2、对桶加锁
                if (tabAt(tab, index) == b) {// 加锁后再次判断
                    TreeNode<K,V> hd = null, tl = null;
                    //3、顺次把链表节点汇合转成树结点
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //4、把树结点汇合转成红黑树
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

4、扩容

1、因为扩容容许并发操作,首先通过 CPU 数量,计算容许每个线程能解决的桶的数量 stride,起码每个线程解决 16 个
2、申请 nextTable 数组
3、把 nextTab 设置为 ForwardingNode, 示意正在进行扩容操作
4、每个线程进来的线程去获取本人能解决区域, 每个线程通过 cas 操作设置 transferIndex 变量,设置胜利则线程取得 table 中(transferIndex, transferIndex-strade)这一段的解决权限
相似下图这样的,反向调配的,先来的线程从前面获取桶段

5、循环顺次解决每一个桶:
5.1 如果以后桶首节点是链表,把链表节点重新分配到两个桶,一部分在原地位 i,一分部在 i + n 地位
5.2 如果是红黑树,这里有点意思,同样是通过一个 for 像链表一样遍历,分成两局部,一个将放在 i 地位,一个将放在 i + n 地位。

  • 那么问题来了,这里既然是树,为什么能够像链表一样操作呢?

    这里是作者设计很奇妙的中央,拜服 Doug lea 大神,树的遍历十分不不便,而链表遍历很不便。TreeNode 自身继承了 Node,有一个 next 指针。当把链表节点数超过 8,转成红黑树的时候,next 指针不变,链表构造没有毁坏,所以这颗树自身是一个树和链表的结合体,既能够当链表用又能够当树用 (这个设计有点相似 LinkedHashMap,即时一个 hashMap 又是一个链表)。
    相似下图这样的构造

    这里被分成两个局部后,再依据这两个局部是否达到了树转列表的阈 UNTREEIFY_THRESHOLD= 6 确定是转成链表还是组织成红黑树。

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
      int n = tab.length, stride;
      // 取 CPU 的数量,确定每个线程迁徙的 Node 的数量,确保不会少于 MIN_TRANSFER_STRIDE=16 个
      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
          stride = MIN_TRANSFER_STRIDE; // subdivide range
      // 申请数组 nextTable
      if (nextTab == null) {            // initiating
          try {@SuppressWarnings("unchecked")
              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
              nextTab = nt;
          } catch (Throwable ex) {      // try to cope with OOME
              sizeCtl = Integer.MAX_VALUE;
              return;
          }
          nextTable = nextTab;
          transferIndex = n;
      }
      int nextn = nextTab.length;
      // 把 nextTab 设置为 ForwardingNode, 示意正在进行扩容操作
      ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
      boolean advance = true;
      boolean finishing = false; // to ensure sweep before committing nextTab
      for (int i = 0, bound = 0;;) {
          Node<K,V> f; int fh;
          while (advance) {
              int nextIndex, nextBound;
              if (--i >= bound || finishing)
                  advance = false;
              else if ((nextIndex = transferIndex) <= 0) {
                  i = -1;
                  advance = false;
              }
              /**
              nextIndex 从 n 开始
               这里每个线程进来的时候通过 cas 操作设置 transferIndex 变量,如果胜利示意该线程解决 nextIndex-stride 这一段桶的迁徙
               即 i=[nextIndex-1, nextIndex-stride] 这一段桶
              */
            // 不相等,阐明不到最初一个线程,间接退出 transfer 办法
              else if (U.compareAndSwapInt
                       (this, TRANSFERINDEX, nextIndex,
                        nextBound = (nextIndex > stride ?
                                     nextIndex - stride : 0))) {
                  bound = nextBound;
                  i = nextIndex - 1;
                  advance = false;
              }
          }
          // 判断是否解决实现所有的桶
          if (i < 0 || i >= n || i + n >= nextn) {
              int sc;
              if (finishing) {
                  nextTable = null;
                  table = nextTab;
                  sizeCtl = (n << 1) - (n >>> 1);
                  return;
              }
              if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                      return;
                  finishing = advance = true;
                  i = n; // recheck before commit
              }
          }
          else if ((f = tabAt(tab, i)) == null)
              advance = casTabAt(tab, i, null, fwd);
          else if ((fh = f.hash) == MOVED)
              advance = true; // already processed
          else {synchronized (f) {if (tabAt(tab, i) == f) {
                      Node<K,V> ln, hn;
                      //hash>0 示意链表, 链表节点重新分配到两个桶,一部分在原地位 i,一分部在 i + n 地位
                      if (fh >= 0) {
                          int runBit = fh & n;
                          Node<K,V> lastRun = f;
                          // 找到低位和高位链表的各自的尾节点,配合前面用头插法
                          for (Node<K,V> p = f.next; p != null; p = p.next) {
                              int b = p.hash & n;
                              if (b != runBit) {
                                  runBit = b;
                                  lastRun = p;
                              }
                          }
                          if (runBit == 0) {
                              ln = lastRun;
                              hn = null;
                          }
                          else {
                              hn = lastRun;
                              ln = null;
                          }
                          // 循环将链表拆分成两个链表,头插法
                          for (Node<K,V> p = f; p != lastRun; p = p.next) {
                              int ph = p.hash; K pk = p.key; V pv = p.val;
                              if ((ph & n) == 0)
                                  ln = new Node<K,V>(ph, pk, pv, ln);
                              else
                                  hn = new Node<K,V>(ph, pk, pv, hn);
                          }
                          // 将拆分后的列表通过 cas 操作设置为对应的桶中
                          setTabAt(nextTab, i, ln);
                          setTabAt(nextTab, i + n, hn);
                          setTabAt(tab, i, fwd);
                          advance = true;
                      }
                      // 该桶存储的是红黑树
                      else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;
                          TreeNode<K,V> lo = null, loTail = null;
                          TreeNode<K,V> hi = null, hiTail = null;
                          int lc = 0, hc = 0;
                          // 这里的代码比拟有意思
                          // 因为这里的红黑树是一个树和链表的结合体,TreeNode 同时也是一个 Node
                          // 所以这里能够像链表一样遍历,应用这种构造是为了不便遍历
                          // 顺次取出树的节点,依据节点 hash 的后果分到位置和高位链表
                          // 前面把链表转成树,同时会放弃链表的构造
                          for (Node<K,V> e = t.first; e != null; e = e.next) {
                              int h = e.hash;
                              TreeNode<K,V> p = new TreeNode<K,V>
                                  (h, e.key, e.val, null, null);
                              if ((h & n) == 0) {if ((p.prev = loTail) == null)
                                      lo = p;
                                  else
                                      loTail.next = p;
                                  loTail = p;
                                  ++lc;
                              }
                              else {if ((p.prev = hiTail) == null)
                                      hi = p;
                                  else
                                      hiTail.next = p;
                                  hiTail = p;
                                  ++hc;
                              }
                          }
                          // 把树拆分成了低位(放在地位 i)和高位 (地位 i +n) 链表,这里判断两个链表长度是否达到转成树的阈值
                          ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                              (hc != 0) ? new TreeBin<K,V>(lo) : t;
                          hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                              (lc != 0) ? new TreeBin<K,V>(hi) : t;
                          // 拆分后的构造设置到相应的桶中
                          setTabAt(nextTab, i, ln);
                          setTabAt(nextTab, i + n, hn);
                          setTabAt(tab, i, fwd);
                          advance = true;
                      }
                  }
              }
          }
      }
    }

5、统计元素数量

1、简略通过 CouterCell 数组的和,因为可能有其余线程在进行增删操作,这个数据并不是准确的。

final long sumCount() {CounterCell[] as = counterCells; CounterCell a;
   long sum = baseCount;
   if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
               sum += a.value;
       }
   }
   return sum;
}

正文完
 0