简述

LongAdder是JDK8新增的一个用于高并发场景下进行统计的类。以前AtomicInteger、AtomicLong等通过应用CAS的形式来更新变量,比synchronized这些阻塞算法领有更好的性能。然而在高并发状况下,大量线程同时去更新一个变量,任意一个工夫点只有一个线程可能胜利,绝大部分的线程在尝试更新失败后,会通过自旋的形式再次进行尝试,重大占用了CPU的工夫片。导致AtomicInteger、AtomicLong在高并发场景下的性能重大升高,所以产生了LongAdder来满足高并发场景下的统计。

LongAdder原理


没有竞争的时候,线程会对base外面的value进行批改
一旦呈现高并发场景下的多线程竞争,那么LongAdder会初始化一个cell数组,而后对每个线程获取对应的hash值,之后通过hash & (size -1)[size为cell数组的长度]将每个线程定位到对应的cell单元格,之后这个线程将值写入对应的cell单元格中的value,之后将所有cell单元格的value和base中的value进行累加求和失去最终的值。
在整个过程中,波及到cell的初始化,线程定位到单元格,以及cell数组的扩容等一系列过程。接下来,咱们会对LongAdder的add办法源码进行剖析,来学习LongAdder在解决高并发场景下的思维

源码剖析

首先,LongAdder继承Striped64这个类。在这个类外面定义了如下要害变量

 // 获取以后机器的CPU数量   static final int NCPU = Runtime.getRuntime().availableProcessors();// cell数组     transient volatile Cell[] cells;// 没有竞争时写入的base值    transient volatile long base;// cell数组对应的锁 0示意以后cell数组没有线程应用,1示意以后数组曾经有线程占用    transient volatile int cellsBusy;        final boolean casBase(long cmp, long val) {        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);    }// 通过CAS来获取以后cell数组的锁    final boolean casCellsBusy() {        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);    }//获取以后线程的hash值    static final int getProbe() {        return UNSAFE.getInt(Thread.currentThread(), PROBE);    }

上面是LongAdder的add办法

/*as是以后cell数组的援用b代表base的值m代表cell数组的长度a代表以后线程hash之后定位到的cell单元格v代表期望值uncontended=true 代表以后线程对应的cell单元格CAS胜利,uncontended=false示意以后线程对应的cell单元格CAS写入失败,呈现竞争。*/    public void add(long x) {        Cell[] as; long b, v; int m; Cell a;        if ((as = cells) != null || !casBase(b = base, b + x)) {            boolean uncontended = true;            if (as == null || (m = as.length - 1) < 0 ||                (a = as[getProbe() & m]) == null ||                !(uncontended = a.cas(v = a.value, v + x)))                /*                进入longAccumulate办法的三种状态                1,as == null || (m = as.length - 1) 以后cell数组为空,没有初始化                2,a = as[getProbe() & m]) == null 获取以后线程的hash值而后和数组长度进行&运算失去对应的cell单元格为空                3,uncontended = a.cas(v = a.value, v + x) 以后线程对应的cell单元格CAS失败,呈现竞争                */                longAccumulate(x, null, uncontended);        }    }

上面是longAccumulate办法

   */    final void longAccumulate(long x, LongBinaryOperator fn,                              boolean wasUncontended) {        int h;        if ((h = getProbe()) == 0) {            ThreadLocalRandom.current(); // force initialization            h = getProbe();            wasUncontended = true;        }        //是否扩容标记位,false示意肯定不扩容,true示意可能会扩容        boolean collide = false;                        for (;;) {            Cell[] as; Cell a; int n; long v;            // CASE-1:以后cell数组曾经初始化            if ((as = cells) != null && (n = as.length) > 0) {            // CASE-1.1 以后线程对应的cell单元格为空                if ((a = as[(n - 1) & h]) == null) {                    if (cellsBusy == 0) {       // Try to attach new Cell                        Cell r = new Cell(x);   // Optimistically create                        if (cellsBusy == 0 && casCellsBusy()) {                            boolean created = false;                            try {               // Recheck under lock                                Cell[] rs; int m, j;                                if ((rs = cells) != null &&                                    (m = rs.length) > 0 &&                                    rs[j = (m - 1) & h] == null) {                                    rs[j] = r;                                    created = true;                                }                            } finally {                                cellsBusy = 0;                            }                            if (created)                                break;                            continue;           // Slot is now non-empty                        }                    }                    collide = false;                }                // CASE-1.2 以后线程对应的cell单元格CAS写入数据失败呈现竞争。                else if (!wasUncontended)                           wasUncontended = true;      // Continue after rehash                // CASE-1.3 以后线程对应的cell单元格CAS写入失败之后重试,如果胜利则跳出以后循环                else if (a.cas(v = a.value, ((fn == null) ? v + x :                                             fn.applyAsLong(v, x))))                    break;                // CASE-1.4 如果以后cell长度曾经超过以后机器的CPU数量,回绝扩容                else if (n >= NCPU || cells != as)                    collide = false;                            else if (!collide)                    collide = true;                // 对cell数组进行扩容                else if (cellsBusy == 0 && casCellsBusy()) {                    try {                        if (cells == as) {      // Expand table unless stale                            Cell[] rs = new Cell[n << 1];                            for (int i = 0; i < n; ++i)                                rs[i] = as[i];                            cells = rs;                        }                    } finally {                        cellsBusy = 0;                    }                    collide = false;                    continue;                   // Retry with expanded table                }                h = advanceProbe(h);            }// CASE-2 cell数组进行初始化            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {                boolean init = false;                try {                           // Initialize table                    if (cells == as) {                        Cell[] rs = new Cell[2];                        rs[h & 1] = new Cell(x);                        cells = rs;                        init = true;                    }                } finally {                    cellsBusy = 0;                }                if (init)                    break;            }// CASE-3 但cell数组为空,并且获取cellBusy失败再次进行重试CAS写入base            else if (casBase(v = base, ((fn == null) ? v + x :                                        fn.applyAsLong(v, x))))                break;                          // Fall back on using base        }    }

场景剖析

1,呈现多线程竞争,就是多个线程写入base

就是add办法的判断条件!casBase(b = base, b + x)为false,取反之后为true,进入下一个if

1.1 此时cell数组没有进行初始化

就是as == null || (m = as.length - 1) < 0为true,进入longAccumulate办法的

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {/*cellBusy为0示意没有线程获取到以后cell的锁,通过CAS将cellBusy更新为1获取到锁开始初始化cell数组init cell数组是否初始化实现的标记位*/                boolean init = false;                try {/*再次判断以后cell数组是否为后面的as援用。这里会呈现一种状况,就是线程1执行到cellBusy=0之后因为系统调度的起因让出CPU工夫,这时线程2更新cellBusy胜利获取到锁并初始化cell数组胜利并开释锁,将cellBusy更新为0。而后线程1失去CPU工夫开始更新cellBusy值获取锁胜利,但此时cell数组曾经初始化实现,此时线程1再次进行初始化会笼罩掉线程2曾经初始化的cell数组。其余中央的判断都是避免系统调度起因避免线程再次操作而笼罩。*/                    if (cells == as) {                    // 初始化一个长度为2的数组                        Cell[] rs = new Cell[2];                    //以后线程的hash值和1进行&运算,后果只会是0和1所以这个线程只会定位到cell[0]或者cell[1]的单元格                        rs[h & 1] = new Cell(x);                        cells = rs;                        // init为true示意cell数组初始化实现                        init = true;                    }                } finally {                // 更新cellBusy为0,开释锁                    cellsBusy = 0;                }                // cell数组初始化实现,跳出以后循环,开始下一轮循环                if (init)                    break;            }

如果此时cellBusy为0然而通过CAS将cellBusy为1失败,阐明曾经有线程在初始化cell数组,那么就进入下一个判断

else if (casBase(v = base, ((fn == null) ? v + x :                                        fn.applyAsLong(v, x))))// 再次尝试写入base,如果此时写入胜利,就跳出,否则进入下一个循环。                                  
1.2 以后线程定位的cell单元格不存在为空(a = as[getProbe() & m]) == null

getProbe()办法获取的就是以后线程的hash值。这个判断条件满足一个前提就是add办法的(as = cells) != null为true。此时进入longAccumulate的

/*老规矩 二次判断*/            if ((as = cells) != null && (n = as.length) > 0) {            // 二次判断                if ((a = as[(n - 1) & h]) == null) {                // 以后cellBusy为0 没有线程批改cell数组                    if (cellsBusy == 0) {                    // 创立一个cell对象                               Cell r = new Cell(x);                        // 获取以后cell数组的锁                           if (cellsBusy == 0 && casCellsBusy()) {                        // cell对象是否创立实现的标记位                            boolean created = false;                            try {                            /*                            再次判断,老样子,避免系统调度起因呈现线程的二次批改                            */                                             Cell[] rs; int m, j;                                if ((rs = cells) != null &&                                    (m = rs.length) > 0 &&                                    rs[j = (m - 1) & h] == null) {                                    rs[j] = r;                                    // cell单元格创立实现,更新created为true                                    created = true;                                }                            } finally {                            // 开释锁                                cellsBusy = 0;                            }                            // 跳出循环,走上面的else if                            if (created)                                break;                            continue;                                   }                    }                    collide = false;                }                h = advanceProbe(h);            }
1.3 以后线程对应的cell单元格呈现了竞争,多个线程通过hash之后都定位到同一个cell单元格,对应add办法的
!(uncontended = a.cas(v = a.value, v + x))

此时进入longAccumulate办法的

 else if (!wasUncontended)                           wasUncontended = true;                   else if (a.cas(v = a.value, ((fn == null) ? v + x :                                             fn.applyAsLong(v, x))))                    break;                 // 如果cell数组长度大于CPU数量,进行扩容                else if (n >= NCPU || cells != as)                    collide = false;                            else if (!collide)                    collide = true;                // 获取到cell数组的锁,开始执行数组扩容                else if (cellsBusy == 0 && casCellsBusy()) {                    try {                   // 老样子 二次判断                        if (cells == as) {                           // cell数组扩容为原来2倍                              Cell[] rs = new Cell[n << 1];                            for (int i = 0; i < n; ++i)                                rs[i] = as[i];                            cells = rs;                        }                    } finally {                    // 开释锁                        cellsBusy = 0;                    }                    // 进行扩容                    collide = false;                    continue;                                   }
1.4 LongAdder的sum办法
    public long sum() {        Cell[] as = cells; Cell a;        long sum = base;        if (as != null) {            for (int i = 0; i < as.length; ++i) {                if ((a = as[i]) != null)                    sum += a.value;            }        }        return sum;    }

在这个办法,遍历cell数组的value并和base进行求和,最终失去sum值。

总结

通过下面的源码剖析咱们能够看到,LongAdder通过每个线程对应本人的cell单元格来升高高并发下的竞争写问题。同时cell数组长度一旦大于CPU数量就进行扩容来最大水平应用并发数,因为一台机器真正并发的线程就等于CPU的数量,进而进步性能。
将对单个base的并发写散列开,每个线程对应一个cell单元格来升高并发。这个解决办法咱们在当前的高并发场景下能够学习借鉴。