前言
在介绍 AtomicInteger 时,曾经阐明在高并发下大量线程去竞争更新同一个原子变量时,因为只有一个线程可能更新胜利,其余的线程在竞争失败后,只能始终循环,一直的进行 CAS 尝试,从而节约了 CPU 资源。而在 JDK 8 中新增了 LongAdder 用来解决高并发下变量的原子操作。上面同样通过浏览源码来理解 LongAdder 。
公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!
介绍
一个或多个变量独特维持初值为 0 总和。 当跨线程竞争更新时,变量集能够动静增长以缩小竞争。 办法 sum 返回以后变量集的总和。
当多个线程更新时,这个类是通常优选 AtomicLong ,比方用于收集统计信息,不用于细粒度同步控制的独特总和。 在低更新竞争,这两个类具备类似的特色。 但在高更新竞争时,应用 LongAdder 性能要高于 AtomicLong,同样要耗费更高的空间为代价。
LongAdder 继承了 Striped64,外部保护一个 Cells 数组,相当于多个 Cell 变量, 每个 Cell 外面都有一个初始值为 0 的 long 型变量。
源码剖析
Cell 类
Cell 类 是 Striped64 的动态外部类。
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } }}
- Cell 应用 @sun.misc.Contended 注解。
- 外部保护一个被 volatile 润饰的 long 型 value 。
- 提供 cas 办法,更新value。
其中 @sun.misc.Contended 注解作用是为了缩小缓存争用。什么是缓存争用,这里只做下简要介绍。
伪共享
CPU 存在多级缓存,其中最小存储单元是 Cache Line,每个 Cache Line 能存储 64 个字节的数据。
在多线程场景下,A B 两个线程数据如果被存储到同一个 Cache Line 上,此时 A B 更新各自的数据,就会产生缓存争用,导致多个线程之间互相牵制,变成了串行程序,升高了并发。
@sun.misc.Contended 注解,则能够保障该变量独占一个 Cache Line。
具体可参考:http://openjdk.java.net/jeps/142
Striped64 外围属性
abstract class Striped64 extends Number { /** CPU 的数量,以限度表大小 */ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * cell 数组,当非空时,大小是 2 的幂。 */ transient volatile Cell[] cells; /** * Base 值,在无争用时应用,表初始化比赛期间的后备。应用 CAS 更新 */ transient volatile long base; /** * 调整大小和创立Cells时自旋锁(通过CAS锁定)应用。 */ transient volatile int cellsBusy;}
Striped64 类次要提供以下几个属性:
- NCPU:CPU 的数量,以限度表大小。
- cells:Cell[] cell 数组,当非空时,大小是 2 的幂。
- base:long 型,Base 值,在无争用时应用,表初始化比赛期间的后备。应用 CAS 更新。
- cellsBusy:调整大小和创立Cells时自旋锁(通过CAS锁定)应用。
上面看是进入外围逻辑:
LongAdder#add
public class LongAdder extends Striped64 implements Serializable { public void add(long x) { Cell[] as; long b, v; int m; Cell a; // cells 是 数组,base 是根底值 if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }}
abstract class Striped64 extends Number { // 应用 CAS 更新 BASE 的值 final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } // 返回以后线程的探测值。 因为包装限度,从ThreadLocalRandom复制 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }}
- 首先会对 Base 值进行 CAS 更新,当 Base 产生竞争时, 会更新数组内的 Cell 。
- 数组未初始化,Cell 未初始化, Cell 更新失败,即 Cell 也产生竞争时,会调用 Striped64 的 longAccumulate 办法。
Striped64#longAccumulate
abstract class Striped64 extends Number { /** * x 要减少的值 * wasUncontended 有没有产生竞争 */ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 以后线程有无初始化线程探测值, 给以后线程生成一个 非 0 探测值 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty // 循环 for (;;) { Cell[] as; Cell a; int n; long v; // 数组不为空切数组长度大于 0 if ((as = cells) != null && (n = as.length) > 0) { // (n - 1) & h 获取到索引,索引处 cell 是否为 null, cell未初始化 if ((a = as[(n - 1) & h]) == null) { // 判断 cellsBusy 是否为 0 if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create // cellsBusy == 0 且 应用 casCellsBusy 办法将其更新为 1,失败会持续循环 if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // 从新查看状态 并创立 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { // 创立实现之后, 改回 cellsBusy 值 cellsBusy = 0; } if (created) break; // 未创立持续循环 continue; // Slot is now non-empty } } collide = false; } // 传入的 wasUncontended 为 false 即产生碰撞了, 批改为未碰撞, 此处会持续循环,走到下一步,相当于会始终循环这个 cell else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // cas 更新 cell 的 value, 胜利则返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 数组到最大长度 即大于等于 CPU 数量, 或者 cells 数组被扭转, else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; // 乐观锁 进行扩容 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } // 以后探针值不能操作胜利,则从新设置一个进行尝试 h = advanceProbe(h); } // 没有加 cellsBusy 乐观锁 且 没有初始化,且取得锁胜利(此时 cellsBusy == 1) else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // 尝试在base上累加 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }}
longAccumulate 办法一共有三种状况
(as = cells) != null && (n = as.length) > 0
数组不为空且长度大于 0 。- 获取索引处的 cell , cell 为空则进行初始化。
- cell 不为空,应用 cas 更新, 胜利
break;
跳出循环, 失败则还在循环内,会始终尝试。 - collide 指是否发生冲突,抵触后会进行重试。
- 抵触后会尝试取得锁并进行扩容,扩容长度为原来的 2 倍,而后持续重试。
- 取得锁失败(阐明其余线程在扩容)会从新进行计算探针值。
cellsBusy == 0 && cells == as && casCellsBusy()
数组为空,取得乐观锁胜利。- 间接初始化数组。
- 初始数组长度为 2 。
casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
取得乐观锁失败。- 阐明有其余线程在初始化数组,间接 CAS 更新 base 。
LongAdder#sum
public class LongAdder extends Striped64 implements Serializable { public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }}
- 数组为空,阐明没有产生竞争,间接返回 base 。
- 数组不为空,阐明产生竞争,累加 cell 的 value 和 base 的和进行返回。
总结
根本流程
- LongAdder 继承了 Striped64,外部保护一个 Cells 数组,相当于多个 Cell 变量, 每个 Cell 外面都有一个初始值为 0 的 long 型变量。
- 未产生竞争时(Cells 数组未初始化),是对 base 变量进行原子操作。
- 产生竞争时,每个线程对本人的 Cell 变量的 value 进行原子操作。
如何确定哪个线程操作哪个 cell?
通过 getProbe()
办法获取该线程的探测值,而后和数组长度 n - 1
做 &
操作 (n - 1) & h 。
static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE);}
Cells 数组初始化及扩容?
初始化扩容时会判断 cellsBusy
, cellsBusy 应用 volatile
润饰,保障线程见可见性,同时应用 CAS 进行更新。 0 示意闲暇,1 示意正在初始化或扩容。
初始化时会创立长度为 2 的 Cell 数组。扩容是创立一个长度是原数组长度 2 倍的新数组,并循环赋值。
如果线程拜访调配的 Cell 元素有抵触后,会应用 advanceProbe()
办法从新获取探测值,再次进行尝试。
应用场景
在高并发状况下,须要绝对高的性能,同时数据准确性要求不高,能够思考应用 LongAdder。
当要保障线程平安,并容许肯定的性能损耗时,并对数据准确性要求较高,优先应用 AtomicLong。