共计 5924 个字符,预计需要花费 15 分钟才能阅读完成。
简述
LongAdder 是 JDK8 新增的一个用于高并发场景下进行统计的类。以前 AtomicInteger、AtomicLong 等通过应用 CAS 的形式来更新变量,比 synchronized 这些阻塞算法领有更好的性能。然而在高并发状况下,大量线程同时去更新一个变量,任意一个工夫点只有一个线程可能胜利,绝大部分的线程在尝试更新失败后,会通过自旋的形式再次进行尝试,重大占用了 CPU 的工夫片。导致 AtomicInteger、AtomicLong 在高并发场景下的性能重大升高,所以产生了 LongAdder 来满足高并发场景下的统计。
LongAdder 原理
没有竞争的时候,线程会对 base 外面的 value 进行批改
一旦呈现高并发场景下的多线程竞争,那么 LongAdder 会初始化一个 cell 数组,而后对每个线程获取对应的 hash 值,之后通过 hash & (size -1)[size 为 cell 数组的长度]将每个线程定位到对应的 cell 单元格,之后这个线程将值写入对应的 cell 单元格中的 value,之后将所有 cell 单元格的 value 和 base 中的 value 进行累加求和失去最终的值。
在整个过程中,波及到 cell 的初始化,线程定位到单元格,以及 cell 数组的扩容等一系列过程。接下来,咱们会对 LongAdder 的 add 办法源码进行剖析,来学习 LongAdder 在解决高并发场景下的思维
源码剖析
首先,LongAdder 继承 Striped64 这个类。在这个类外面定义了如下要害变量
// 获取以后机器的 CPU 数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// cell 数组
transient volatile Cell[] cells;
// 没有竞争时写入的 base 值
transient volatile long base;
// cell 数组对应的锁 0 示意以后 cell 数组没有线程应用,1 示意以后数组曾经有线程占用
transient volatile int cellsBusy;
final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 通过 CAS 来获取以后 cell 数组的锁
final boolean casCellsBusy() {return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 获取以后线程的 hash 值
static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
上面是 LongAdder 的 add 办法
/*
as 是以后 cell 数组的援用
b 代表 base 的值
m 代表 cell 数组的长度
a 代表以后线程 hash 之后定位到的 cell 单元格
v 代表期望值
uncontended=true 代表以后线程对应的 cell 单元格 CAS 胜利,uncontended=false 示意以后线程对应的 cell 单元格 CAS 写入失败,呈现竞争。*/
public void add(long x) {Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
/*
进入 longAccumulate 办法的三种状态
1,as == null || (m = as.length - 1) 以后 cell 数组为空,没有初始化
2,a = as[getProbe() & m]) == null 获取以后线程的 hash 值而后和数组长度进行 & 运算失去对应的 cell 单元格为空
3,uncontended = a.cas(v = a.value, v + x) 以后线程对应的 cell 单元格 CAS 失败,呈现竞争
*/
longAccumulate(x, null, uncontended);
}
}
上面是 longAccumulate 办法
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// 是否扩容标记位,false 示意肯定不扩容,true 示意可能会扩容
boolean collide = false;
for (;;) {Cell[] as; Cell a; int n; long v;
// CASE-1: 以后 cell 数组曾经初始化
if ((as = cells) != null && (n = as.length) > 0) {
// CASE-1.1 以后线程对应的 cell 单元格为空
if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {rs[j] = r;
created = true;
}
} finally {cellsBusy = 0;}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// CASE-1.2 以后线程对应的 cell 单元格 CAS 写入数据失败呈现竞争。else if (!wasUncontended)
wasUncontended = true; // Continue after rehash
// CASE-1.3 以后线程对应的 cell 单元格 CAS 写入失败之后重试,如果胜利则跳出以后循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// CASE-1.4 如果以后 cell 长度曾经超过以后机器的 CPU 数量,回绝扩容
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
// 对 cell 数组进行扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {cellsBusy = 0;}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// CASE-2 cell 数组进行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {cellsBusy = 0;}
if (init)
break;
}
// CASE-3 但 cell 数组为空,并且获取 cellBusy 失败再次进行重试 CAS 写入 base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
场景剖析
1,呈现多线程竞争,就是多个线程写入 base
就是 add 办法的判断条件 !casBase(b = base, b + x)
为 false,取反之后为 true,进入下一个 if
1.1 此时 cell 数组没有进行初始化
就是 as == null || (m = as.length - 1) < 0
为 true,进入 longAccumulate 办法的
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
/*cellBusy 为 0 示意没有线程获取到以后 cell 的锁,通过 CAS 将 cellBusy 更新为 1 获取到锁开始初始化 cell 数组
init cell 数组是否初始化实现的标记位
*/
boolean init = false;
try {
/*
再次判断以后 cell 数组是否为后面的 as 援用。这里会呈现一种状况,就是线程 1 执行到 cellBusy= 0 之后因为系统调度的起因让出 CPU 工夫,这时线程 2 更新 cellBusy 胜利获取到锁并初始化 cell 数组胜利并开释锁,将 cellBusy 更新为 0。而后线程 1 失去 CPU 工夫开始更新 cellBusy 值获取锁胜利,但此时 cell 数组曾经初始化实现,此时线程 1 再次进行初始化会笼罩掉线程 2 曾经初始化的 cell 数组。其余中央的判断都是避免系统调度起因避免线程再次操作而笼罩。*/
if (cells == as) {
// 初始化一个长度为 2 的数组
Cell[] rs = new Cell[2];
// 以后线程的 hash 值和 1 进行 & 运算,后果只会是 0 和 1 所以这个线程只会定位到 cell[0]或者 cell[1]的单元格
rs[h & 1] = new Cell(x);
cells = rs;
// init 为 true 示意 cell 数组初始化实现
init = true;
}
} finally {
// 更新 cellBusy 为 0,开释锁
cellsBusy = 0;
}
// cell 数组初始化实现,跳出以后循环,开始下一轮循环
if (init)
break;
}
如果此时 cellBusy 为 0 然而通过 CAS 将 cellBusy 为 1 失败,阐明曾经有线程在初始化 cell 数组,那么就进入下一个判断
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 再次尝试写入 base,如果此时写入胜利,就跳出,否则进入下一个循环。
1.2 以后线程定位的 cell 单元格不存在为空(a = as[getProbe() & m]) == null
getProbe()办法获取的就是以后线程的 hash 值。这个判断条件满足一个前提就是 add 办法的 (as = cells) != null
为 true。此时进入 longAccumulate 的
/*
老规矩 二次判断
*/
if ((as = cells) != null && (n = as.length) > 0) {
// 二次判断
if ((a = as[(n - 1) & h]) == null) {
// 以后 cellBusy 为 0 没有线程批改 cell 数组
if (cellsBusy == 0) {
// 创立一个 cell 对象
Cell r = new Cell(x);
// 获取以后 cell 数组的锁
if (cellsBusy == 0 && casCellsBusy()) {
// cell 对象是否创立实现的标记位
boolean created = false;
try {
/*
再次判断,老样子,避免系统调度起因呈现线程的二次批改
*/
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {rs[j] = r;
// cell 单元格创立实现,更新 created 为 true
created = true;
}
} finally {
// 开释锁
cellsBusy = 0;
}
// 跳出循环,走上面的 else if
if (created)
break;
continue;
}
}
collide = false;
}
h = advanceProbe(h);
}
1.3 以后线程对应的 cell 单元格呈现了竞争,多个线程通过 hash 之后都定位到同一个 cell 单元格,对应 add 办法的
!(uncontended = a.cas(v = a.value, v + x))
此时进入 longAccumulate 办法的
else if (!wasUncontended)
wasUncontended = true;
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果 cell 数组长度大于 CPU 数量,进行扩容
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
// 获取到 cell 数组的锁,开始执行数组扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 老样子 二次判断
if (cells == as) {
// cell 数组扩容为原来 2 倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 开释锁
cellsBusy = 0;
}
// 进行扩容
collide = false;
continue;
}
1.4 LongAdder 的 sum 办法
public long sum() {Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
在这个办法,遍历 cell 数组的 value 并和 base 进行求和,最终失去 sum 值。
总结
通过下面的源码剖析咱们能够看到,LongAdder 通过每个线程对应本人的 cell 单元格来升高高并发下的竞争写问题。同时 cell 数组长度一旦大于 CPU 数量就进行扩容来最大水平应用并发数,因为一台机器真正并发的线程就等于 CPU 的数量,进而进步性能。
将对单个 base 的并发写散列开,每个线程对应一个 cell 单元格来升高并发。这个解决办法咱们在当前的高并发场景下能够学习借鉴。