共计 6439 个字符,预计需要花费 17 分钟才能阅读完成。
一、有 AtomicLong 为什么还须要 LongAdder/LongAccumulator?
大家对 AtomicLong 应该比拟相熟(如果未接触过,请翻看另一篇博客,通俗易懂的 AtomicLong 源码分析 ),但 JDK1.8 为什么又新增了 LongAdder/LongAccumulator2 个类?AtomicLong 不够用吗?答案:次要是基于性能思考。AtomicLong 的 incrementAndGet() 办法在高并发场景下,多个线程竞争批改共享资源 value, 会造成循环耗时过长,进而导致性能问题,上面贴出源码来解说这个问题:
public final long incrementAndGet() {return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
其中,unsafe.getAndAddLong 办法如下:
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
能够看到,多个线程在竞争批改共享资源 value 值时,是在一个循环外面,高并发状况下,同一时刻只有一个线程 CAS 操作胜利,其余大多数线程 CAS 失败,从而处于一直循环重试的场景,因而对性能造成影响。
二、那为什么 LongAdder/LongAccumulator 为什么能晋升性能呢?它底层是怎么实现的呢?用了什么数据结构呢?
因为 LongAccumulator 是 LongAdder 的性能扩大,底层原理差不多,在此以 LongAdder 原理来阐明。
首先看 LongAdder 的类构造:
public class LongAdder extends Striped64 implements Serializable {}
LongAdder 继承了 Striped64,真正发挥作用的是这个 Striped64 类,来看看它的类构造:
/**
* A package-local class holding common representation and mechanics * for classes supporting dynamic striping on 64bit values. The class * extends Number so that concrete subclasses must publicly do so. */@SuppressWarnings("serial")
abstract class Striped64 extends Number {}
接下来看它的重要属性有哪些:
/**
* Table of cells. When non-null, size is a power of 2. */
/* 晋升性能发挥作用的 Cell 数组,核心思想是通过多个线程在对应本人的 Cell 进行累加,从而缩小竞争 */
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. */
/* 多个线程没有产生竞争的时候,值累加在 base 上,这与 AtomicLong 的 value 作用是一样的 */
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells. */
/* 当 Cells 数组初始化,创立元素或者扩容的时候为 1,否则为 0 */
transient volatile int cellsBusy;
可能不少同学对 Cell 感到不解,其实很简略,关上源码就晓得到底了
/*@Contended 注解是 JDK1.8 提供的字节填充形式,解决伪共享问题,可翻看另一篇博客:** 什么是伪共享 ***/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) {value = x;}
final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
}
它是 Striped64 的外部类,外面有个 volatile 润饰的 value 值,也是通过 cas 操作批改它的值,LongAdder 计数器的值就是所有 Cell[]的 value 和再加上 base 的值。
对数据结构有了大抵理解后,再来看外面的罕用要害办法:
public void increment() {add(1L);
}
public void decrement() {add(-1L);
}
能够看到,递增递加都调用了 add()办法,可见它是实现的外围。往里看:
public void add(long x) {Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
先来看第一个 if 分支 if ((as = cells) != null || !casBase(b = base, b + x))
因为初始时 cells 为空,第一次调用 add()办法的话,(as = cells) != null 不成立,转向!casBase(b = base, b + x), 关上外面代码很简略,就是对 base 值进行 CAS 批改,后面说过,没有竞争的时候批改的是 base 值,产生竞争的时候 Cellp[]才起作用。
final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
如果 casBase 返回 true,示意该线程批改胜利,完结;
如果 casBase 返回 false,示意该线程批改失败,产生了竞争,进入外面的 if 条件
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
咋看有点简单,有 4 个分支,不焦急,一个个来看。
第一个和第二个就是判断 Cell[]有没有初始化,且元素不为空。
第三个和第四个就是在 Cell[]已初始化的前提下,定位出以后线程应该对应的 Cell 元素,并尝试 CAS 批改外面的 value 值,给它加 x,如果不胜利,进入外面的 longAccumulate(x, null, uncontended);
进入之前,可能有同学对 uncontended 和 getProbe() & m 有疑难。
uncontendted,翻译过去是 ” 未产生过竞争的 ” 意思,外面的办法会用到这个标记;而 getProbe()返回的是 Thread 类 threadLocalRandomProbe 属性的值,它在 ThreadLocalRandom 外面发挥作用,另一篇博客有解说,ThreadLocalRandom 原理分析。在这里咱们能够把它了解成 HashMap 的哈希值 h,而后与 m =as.length – 1 进行与操作,其实等效于 h % as.length,即找到对应的地位,是不是和 HashMap 定位元素地位很相似?
static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
当初咱们能够进入 longAccumulate(x, null, uncontended); 办法了,关上一看,你 kin 你 ca,这么简单,失望了有没有?别急,急躁缓缓剖析!
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 如果没有初始化
if ((h = getProbe()) == 0) {// current()外面会初始化 probe 值
ThreadLocalRandom.current(); // force initialization
// 从新获取 probe 值
h = getProbe();
// 还未初始化,必定没有产生竞争
wasUncontended = true;
}
// 是否产生碰撞,即多个线程 hash 到同一个 Cell 元素地位
boolean collide = false; // True if last slot nonempty
for (;;) {Cell[] as; Cell a; int n; long v;
// 如果 cells 数组曾经初始化
if ((as = cells) != null && (n = as.length) > 0) {
// hash 到的数组元素地位为空
if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 尝试获取锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 再次查看该地位元素是否为空
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {// 将新生成的元素 Cell(x)放在该地位上
rs[j] = r;
created = true;
}
} finally {
// 开释锁
cellsBusy = 0;
}
if (created)
// (1)创立胜利,退出循环
break;
// 创立不胜利,下一轮循环重试
continue; // Slot is now non-empty
}
}
// 该地位元素为空,则没有产生碰撞
collide = false;
}
// 对应里面 add()办法的第四个条件,即该地位元素不为空,且 cas 失败了
// 重置 wasUncontended,通过上面的 advanceProbe()从新 hash,找到新的地位进行下一轮重试
// 之所以重置 wasUncontended,是为了下一轮重试时走上面 cas 分支,尝试对该地位元素进行值的批改
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 第 N(N > 1)轮重试,尝试对该地位元素进行值的批改,else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// (2)批改胜利退出循环
break;
// 如果数组元素达到 CPU 个数或者曾经被扩容了,则从新 hash 下一轮重试
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 以上条件都不满足,则产生了碰撞,且竞争失败了
else if (!collide)
collide = true;
// 碰撞竞争失败时,则去尝试获取锁去扩容 Cell 数组
else if (cellsBusy == 0 && casCellsBusy()) {
try {if (cells == as) { // Expand table unless stale
// 扩容为原来的 2 倍
Cell[] rs = new Cell[n << 1];
// 拷贝旧数组元素到新数组中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 开释所
cellsBusy = 0;
}
// 扩容胜利,则重置 collide,示意我有新的地位去重试了,不跟你抢这个地位了
collide = false;
continue; // Retry with expanded table
}
// 产生新的 hash 值,尝试去找别的数组地位
h = advanceProbe(h);
}
// Cell[]为空,对应里面 add()的第一二个条件,则尝试获取锁去初始化数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 初始化大小为 2
Cell[] rs = new Cell[2];
// 将 Cell(x)放在 0 或 1 号地位上
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 开释锁
cellsBusy = 0;
}
// (3)初始化胜利,退出循环
if (init)
break;
}
// 有别的线程正在初始化数组,则尝试累加在 base 变量上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// (4)胜利则退出循环
break; // Fall back on using base
}
}
由下面代码能够看出,这个办法逻辑相当简单,再来总结梳理下,能够从下面正文标记的 4 处退出循环的条件来看:
(1) Cell[]不为空,hash 到的地位元素为空,那么就创立元素,并赋值为 x,胜利的话能够退出循环;
(2) Cell[]不为空,hash 到的地位元素不为空,且上一轮 cas 批改失败了,这轮重试如果胜利,能够退出循环;
(3) Cell[]为空,那么尝试初始化数组,并把 x 赋值到 0 或 1 号地位上,胜利的话能够退出循环;
(4) Cell[]为空,且有其余线程在初始化数组,那么尝试累加到 base 上,胜利的话能够退出循环;
其余条件都是须要通过 advanceProbe()进行 rehash 到其余地位,进行下一轮重试
三、总结
总结之前顺便提下 LongAccumulator,它是把 LongAdder 的 (v + x) 操作换成一个 LongBinaryOperator,即用户能够自定义累加操作的逻辑,其余中央都是一样的
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
整个 LongAdder 的源码剖析就到这里完结了,其实 JDK 也提供了 double 类型的 DoubleAdder 和 DoubleAccumulator,他们都继承了 Striped64,原理是大同小异的,有趣味的同学能够本人去看看源码。
对于平时开发如何抉择 AtomicLong,置信大家也很分明了,并发不高的状况下用 AtomicLong 就行,并发很高的状况下就要抉择 LongAdder 或者 LongAccumulator 了!
最初在别的中央看到一张图,能够更好的帮忙咱们了解原理,放在这里给大家看看。
ps: 第一次写技术博客,形容地不精确的中央,心愿大家容纳,也能够指出来,共同进步!