一、有AtomicLong为什么还须要LongAdder/LongAccumulator?

大家对AtomicLong应该比拟相熟(如果未接触过,请翻看另一篇博客,通俗易懂的AtomicLong源码分析),但JDK1.8为什么又新增了LongAdder/LongAccumulator2个类?AtomicLong不够用吗?答案:次要是基于性能思考。AtomicLong的incrementAndGet()办法在高并发场景下,多个线程竞争批改共享资源value,会造成循环耗时过长,进而导致性能问题,上面贴出源码来解说这个问题:

public final long incrementAndGet() {    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;}

其中,unsafe.getAndAddLong办法如下:

public final long getAndAddLong(Object var1, long var2, long var4) {    long var6; do {        var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6;}

能够看到,多个线程在竞争批改共享资源value值时,是在一个循环外面,高并发状况下,同一时刻只有一个线程CAS操作胜利,其余大多数线程CAS失败,从而处于一直循环重试的场景,因而对性能造成影响。

二、那为什么LongAdder/LongAccumulator为什么能晋升性能呢?它底层是怎么实现的呢?用了什么数据结构呢?

因为LongAccumulator是LongAdder的性能扩大,底层原理差不多,在此以LongAdder原理来阐明。
首先看LongAdder的类构造:

public class LongAdder extends Striped64 implements Serializable {}

LongAdder继承了Striped64,真正发挥作用的是这个Striped64类,来看看它的类构造:

/** * A package-local class holding common representation and mechanics * for classes supporting dynamic striping on 64bit values. The class * extends Number so that concrete subclasses must publicly do so. */@SuppressWarnings("serial")abstract class Striped64 extends Number {}

接下来看它的重要属性有哪些:

/** * Table of cells. When non-null, size is a power of 2. */ /*晋升性能发挥作用的Cell数组,核心思想是通过多个线程在对应本人的Cell进行累加,从而缩小竞争*/ transient volatile Cell[] cells;/** * Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. */ /*多个线程没有产生竞争的时候,值累加在base上,这与AtomicLong的value作用是一样的*/ transient volatile long base;/** * Spinlock (locked via CAS) used when resizing and/or creating Cells. */ /*当Cells数组初始化,创立元素或者扩容的时候为1,否则为0*/ transient volatile int cellsBusy;

可能不少同学对Cell感到不解,其实很简略,关上源码就晓得到底了

/*@Contended注解是JDK1.8提供的字节填充形式,解决伪共享问题,可翻看另一篇博客:**什么是伪共享***/@sun.misc.Contended static final class Cell {    volatile long value;    Cell(long x) { value = x; }       final boolean cas(long cmp, long val) {          return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);    } }

它是Striped64的外部类,外面有个volatile润饰的value值,也是通过cas操作批改它的值,LongAdder计数器的值就是所有Cell[]的value和再加上base的值。
对数据结构有了大抵理解后,再来看外面的罕用要害办法:

public void increment() {    add(1L);}
public void decrement() {    add(-1L);}

能够看到,递增递加都调用了add()办法,可见它是实现的外围。往里看:

public void add(long x) {    Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) {        boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 ||            (a = as[getProbe() & m]) == null ||            !(uncontended = a.cas(v = a.value, v + x)))            longAccumulate(x, null, uncontended); }}

先来看第一个if分支if ((as = cells) != null || !casBase(b = base, b + x))
因为初始时cells为空,第一次调用add()办法的话,(as = cells) != null不成立,转向!casBase(b = base, b + x),关上外面代码很简略,就是对base值进行CAS批改,后面说过,没有竞争的时候批改的是base值,产生竞争的时候Cellp[]才起作用。

final boolean casBase(long cmp, long val) {    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);}

如果casBase返回true,示意该线程批改胜利,完结;
如果casBase返回false,示意该线程批改失败,产生了竞争,进入外面的if条件

if (as == null || (m = as.length - 1) < 0 ||    (a = as[getProbe() & m]) == null ||    !(uncontended = a.cas(v = a.value, v + x)))

咋看有点简单,有4个分支,不焦急,一个个来看。
第一个和第二个就是判断Cell[]有没有初始化,且元素不为空。
第三个和第四个就是在Cell[]已初始化的前提下,定位出以后线程应该对应的Cell元素,并尝试CAS批改外面的value值,给它加x,如果不胜利,进入外面的longAccumulate(x, null, uncontended);
进入之前,可能有同学对uncontended和getProbe() & m有疑难。
uncontendted,翻译过去是"未产生过竞争的"意思,外面的办法会用到这个标记;而getProbe()返回的是Thread类threadLocalRandomProbe属性的值,它在ThreadLocalRandom外面发挥作用,另一篇博客有解说,ThreadLocalRandom原理分析。在这里咱们能够把它了解成HashMap的哈希值h,而后与m=as.length - 1进行与操作,其实等效于h % as.length,即找到对应的地位,是不是和HashMap定位元素地位很相似?

static final int getProbe() {    return UNSAFE.getInt(Thread.currentThread(), PROBE);}
Class<?> tk = Thread.class;PROBE = UNSAFE.objectFieldOffset    (tk.getDeclaredField("threadLocalRandomProbe"));

当初咱们能够进入longAccumulate(x, null, uncontended);办法了,关上一看,你kin你ca,这么简单,失望了有没有?别急,急躁缓缓剖析!

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {    int h;    // 如果没有初始化    if ((h = getProbe()) == 0) {        // current()外面会初始化probe值        ThreadLocalRandom.current(); // force initialization        // 从新获取probe值        h = getProbe();        // 还未初始化,必定没有产生竞争        wasUncontended = true;      }    // 是否产生碰撞,即多个线程hash到同一个Cell元素地位    boolean collide = false; // True if last slot nonempty    for (;;) {        Cell[] as; Cell a; int n; long v;        // 如果cells数组曾经初始化        if ((as = cells) != null && (n = as.length) > 0) {            // hash到的数组元素地位为空            if ((a = as[(n - 1) & h]) == null) {                            if (cellsBusy == 0) {       // Try to attach new Cell                      Cell r = new Cell(x); // Optimistically create                      // 尝试获取锁                      if (cellsBusy == 0 && casCellsBusy())                       {                        boolean created = false;                        try {               // Recheck under lock                          Cell[] rs; int m, j;                                     // 再次查看该地位元素是否为空                          if ((rs = cells) != null &&                                (m = rs.length) > 0 &&                                rs[j = (m - 1) & h] == null) {                                // 将新生成的元素Cell(x)放在该地位上                                rs[j] = r;                                created = true;                               }                        } finally {                        // 开释锁                            cellsBusy = 0;                    }                         if (created)                        // (1)创立胜利,退出循环                            break;                            // 创立不胜利,下一轮循环重试                      continue; // Slot is now non-empty                 }                }                // 该地位元素为空,则没有产生碰撞                collide = false;             }             // 对应里面add()办法的第四个条件,即该地位元素不为空,且cas失败了             // 重置wasUncontended,通过上面的advanceProbe()从新hash,找到新的地位进行下一轮重试             // 之所以重置wasUncontended,是为了下一轮重试时走上面cas分支,尝试对该地位元素进行值的批改            else if (!wasUncontended)       // CAS already known to fail wasUncontended = true; // Continue after rehash // 第N(N > 1)轮重试,尝试对该地位元素进行值的批改, else if (a.cas(v = a.value, ((fn == null) ? v + x :                                         fn.applyAsLong(v, x))))                // (2)批改胜利退出循环                break; // 如果数组元素达到CPU个数或者曾经被扩容了,则从新hash下一轮重试 else if (n >= NCPU || cells != as)                collide = false; // At max size or stale                // 以上条件都不满足,则产生了碰撞,且竞争失败了 else if (!collide)                collide = true;                // 碰撞竞争失败时,则去尝试获取锁去扩容Cell数组 else if (cellsBusy == 0 && casCellsBusy()) {                try {                    if (cells == as) {      // Expand table unless stale                      // 扩容为原来的2倍                      Cell[] rs = new Cell[n << 1];                      // 拷贝旧数组元素到新数组中                      for (int i = 0; i < n; ++i)                            rs[i] = as[i];                      cells = rs;                    }                } finally {                // 开释所                    cellsBusy = 0;           } // 扩容胜利,则重置collide,示意我有新的地位去重试了,不跟你抢这个地位了                collide = false; continue; // Retry with expanded table }            // 产生新的hash值,尝试去找别的数组地位            h = advanceProbe(h); }        // Cell[]为空,对应里面add()的第一二个条件,则尝试获取锁去初始化数组        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {            boolean init = false;                try {                           // Initialize table                 if (cells == as) {                   // 初始化大小为2                    Cell[] rs = new Cell[2];                    // 将Cell(x)放在0或1号地位上                    rs[h & 1] = new Cell(x);                    cells = rs;                    init = true;                }            } finally {            // 开释锁                cellsBusy = 0; } // (3)初始化胜利,退出循环            if (init)                break; } // 有别的线程正在初始化数组,则尝试累加在base变量上        else if (casBase(v = base, ((fn == null) ? v + x :                                    fn.applyAsLong(v, x))))                                    // (4)胜利则退出循环            break; // Fall back on using base }}

由下面代码能够看出,这个办法逻辑相当简单,再来总结梳理下,能够从下面正文标记的4处退出循环的条件来看:
(1) Cell[]不为空,hash到的地位元素为空,那么就创立元素,并赋值为x,胜利的话能够退出循环;
(2) Cell[]不为空,hash到的地位元素不为空,且上一轮cas批改失败了,这轮重试如果胜利,能够退出循环;
(3) Cell[]为空,那么尝试初始化数组,并把x赋值到0或1号地位上,胜利的话能够退出循环;
(4) Cell[]为空,且有其余线程在初始化数组,那么尝试累加到base上,胜利的话能够退出循环;
其余条件都是须要通过advanceProbe()进行rehash到其余地位,进行下一轮重试

三、总结

总结之前顺便提下LongAccumulator,它是把LongAdder的(v + x)操作换成一个LongBinaryOperator,即用户能够自定义累加操作的逻辑,其余中央都是一样的

public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {    this.function = accumulatorFunction; base = this.identity = identity;}

整个LongAdder的源码剖析就到这里完结了,其实JDK也提供了double类型的DoubleAdder和DoubleAccumulator,他们都继承了Striped64,原理是大同小异的,有趣味的同学能够本人去看看源码。
对于平时开发如何抉择AtomicLong,置信大家也很分明了,并发不高的状况下用AtomicLong就行,并发很高的状况下就要抉择LongAdder或者LongAccumulator了!
最初在别的中央看到一张图,能够更好的帮忙咱们了解原理,放在这里给大家看看。
ps:第一次写技术博客,形容地不精确的中央,心愿大家容纳,也能够指出来,共同进步!