关于juc:Java-并发包原子操作类解析

39次阅读

共计 10703 个字符,预计需要花费 27 分钟才能阅读完成。

Java 并发包原子操作类解析

前言

JUC 包中提供了一些列原子操作类,这些类都是应用 非阻塞 算法 CAS 实现的,相比应用锁实现原子性操作在性能上有较大进步。

因为原子性操作的原理都大致相同,本文只解说简略的 AtomicLong 类的原理以及在 JDK8 中新增的 LongAdder 类原理。

原子变量操作类

JUC 并发包中蕴含 AtomicInteger、AtomicLong 和 AtomicBoolean 等原子性操作类,原理大抵相似,接下来咱们看一下 AtomicLong 类。

AtomicLong 是原子性递增或者递加类,外部应用 Unsafe 来实现,咱们看上面的代码。

public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;
    //1. 获取 Unsafe 实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //2. 寄存变量 value 的偏移量
    private static final long valueOffset;
    //3. 判断 JVM 是否反对 Long 类型无锁 CAS
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    private static native boolean VMSupportsCS8();

    static {
        try {
            //4. 获取 value 在 AtomicLong 中的偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) {throw new Error(ex); }
    }

    //5. 理论变量值
    private volatile long value;

    public AtomicLong(long initialValue) {value = initialValue;}
   ......
}

首先通过 Unsafe.getUnsafe() 办法获取到 Unsafe 类的实例,

为什么能够获取到 Unsafe 类的实例?因为 AtomicLong 类也在 rt.jar 包下,所以能够通过 BootStrap 类加载器进行加载。

第二步、第四步获取 value 变量在 AtomicLong 类中的偏移量。

第五步的 value 变量被申明为了 volatile,这是为了在多线程下保障 内存可见性,而 value 存储的就是具体计数器的值。

递增和递加操作代码

接下来咱们看一下 AtomicLong 中的次要函数。

// 调用 unsafe 办法,原子性设置 value 值为原始值 +1,返回递增后的值
public final long incrementAndGet() {return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
// 调用 unsafe 办法,原子性设置 value 值为原始值 -1,返回值递加后的值
public final long decrementAndGet() {return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
// 调用 unsafe 办法,原子性设置 value 值为原始值 +1,返回原始值
public final long getAndIncrement() {return unsafe.getAndAddLong(this, valueOffset, 1L);
}
// 调用 unsafe 办法,原子性设置 value 值为原始值 -1,返回原始值
public final long getAndDecrement() {return unsafe.getAndAddLong(this, valueOffset, -1L);
}

上述代码都是通过调用 Unsafe 的 getAndAddLong() 办法来实现操作,这个函数是一个 原子性 操作,第一个参数为 AtomicLong 实例的援用,第二个参数是 value 变量在 AtomicLong 中的偏移值,第三个参数是要设置的第二个变量的值。

其中,getAndIncrement()办法在 JDK1.7 中的实现逻辑如下。

public final long getAndIncrement() {while (true) {long current = get();
        long next = current + 1;
        if (compareAndSet(current,next))
            return current;
    }
}

这段代码中,每个线程都是拿到变量的以后值(因为 value 是 volatile 变量,所以拿到的都是最新的值),而后在工作内存中进行减少 1 操作,之后应用 CAS 批改变量的值。如果设置失败,则始终循环尝试,直到设置胜利。

JDK8 中的逻辑为:

public final long getAndAddLong(Object var1, long var2, long var4) {
        long var6;
        do {var6 = this.getLongVolatile(var1, var2);
        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;
}

能够看到,JDK1.7的 AtomicLong 中的循环逻辑曾经被 JDK8 中的原子操作类 Unsafe 内置了,之所以内置应该是思考到这个函数在其余中央也会用到,而 内置能够进步复用性

compareAndSet(long expect, long update)办法

public final boolean compareAndSet(long expect, long update) {return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

如上代码咱们能够晓得,在外部还是调用了 unsafe.compareAndSwapLong 办法。如果原子变量中的 value 值等于 expect,则应用 update 值更新该值并返回 true,否则返回 false。

上面咱们通过一个多线程应用 AtomicLong 统计 0 的个数的例子来加深了解。

/**
 * @author 神秘杰克
 * 公众号: Java 菜鸟程序员
 * @date 2022/1/4
 * @Description 统计 0 的个数
 */
public class AtomicTest {private static AtomicLong atomicLong = new AtomicLong();
    private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 5, 6, 0, 56, 0};
    private static Integer[] arrayTwo = new Integer[]{10, 1, 2, 3, 0, 5, 6, 0, 56, 0};

    public static void main(String[] args) throws InterruptedException {final Thread threadOne = new Thread(() -> {
            final int size = arrayOne.length;
            for (int i = 0; i < size; ++i) {if (arrayOne[i].intValue() == 0) {atomicLong.incrementAndGet();
                }
            }
        });
        final Thread threadTwo = new Thread(() -> {
            final int size = arrayTwo.length;
            for (int i = 0; i < size; ++i) {if (arrayTwo[i].intValue() == 0) {atomicLong.incrementAndGet();
                }
            }
        });
        threadOne.start();
        threadTwo.start();
        // 期待线程执行结束
        threadOne.join();
        threadTwo.join();
        System.out.println("count 总数为:" + atomicLong.get()); //count 总数为: 7

    }
}

这段代码很简略,就是每找到一个 0 就会调用 AtomicLong 的原子性递增办法。

在没有原子类的时候,实现计数器须要肯定的同步措施,例如 synchronized 关键字等,但这些都是 阻塞 算法,对性能有肯定的影响,而咱们应用的 AtomicLong 应用的是 CAS 非阻塞 算法,性能更好。

然而在高并发下,AtomicLong 还会存在性能问题,JDK8 提供了一个在高并发下性能更好的 LongAdder 类。

LongAdder 介绍

后面说过,在高并发下应用 AtomicLong 时,大量线程会同时竞争同一个原子变量,然而因为同时只有一个线程的 CAS 操作会胜利,所以会造成大量线程竞争失败后,会有限循环不断的自旋尝试 CAS 操作,白白浪费 CPU 资源。

所以在 JDK8 中新增了一个原子性递增或者递加类 LongAdder 用来 克服高并发 AtomicLong 的毛病。既然 AtomicLong 的性能瓶颈是多个线程竞争一个变量的更新产生的,那如果把一个变量分成多个变量,让多个线程竞争多个资源,是不是就解决性能问题了?是的,LongAdder 就是这个思路。

如上图,在应用 LongAdder 时,则是在外部保护多个 Cell 变量,每个 Cell 外面有一个初始值为 0 的 long 型变量,这样的话在等同并发量的状况下,抢夺单个线程更新操作的线程会缩小,也就变相的缩小抢夺共享资源的并发量。

另外,如果多个线程在抢夺同一个 Cell 原子变量时失败了,它并不会始终自旋重试,而是去尝试其它 Cell 变量进行 CAS 尝试,这样就减少了以后线程重试 CAS 胜利的可能性,最初,在获取 LongAdder 以后值时,是把 所有的 Cell 变量的 value 值累加后再加上 base 返回的

LongAdder 保护了一个 提早初始化 的原子性更新数组(默认状况下 Cell 数组是 null)和一个基值变量 base,在一开始时并不创立 Cells 数组,而是在应用时创立,也就是 惰性加载

在一开始判断 Cell 数组是 null 并且并发线程缩小时,所有的累加都是在 base 变量上进行的 ,放弃 Cell 数组的大小为 2 的 N 次方,在初始化时 Cell 数组中的 Cell 元素个数为 2,数组外面的变量实体是 Cell 类型。Cell 类型是 AtomicLong 的一个改良,用来缩小缓存的争用,也就是解决 伪共享 问题。

在多个线程并发批改一个缓存行中的多个变量时,因为只能同时有一个线程去操作缓存行,将会导致性能的降落,这个问题就称之为 伪共享

一般而言,缓存行有 64 字节,咱们晓得一个 long 是 8 个字节,填充 5 个 long 之后,一共就是 48 个字节。

而 Java 中对象头在 32 位零碎下占用 8 个字节,64 位零碎下占用 16 个字节,这样填充 5 个 long 型即可填满 64 字节,也就是一个缓存行。

JDK8 以及之后的版本 Java 提供了sun.misc.Contended 注解,通过 @Contented 注解就能够解决伪共享的问题。

应用 @Contented 注解后会减少 128 字节的 padding,并且须要开启 -XX:-RestrictContended 选项后能力失效。

在 LongAdder 中解决伪共享的真正的外围就在 Cell 数组,Cell数组应用了 @Contented 注解。

对于大多数孤立的多个原子操作进行字节填充是节约的,因为原子性操作都是无规律地扩散在内存中的(也就是说多个原子性变量的内存地址是不间断的),多个原子变量被放入同一个缓存行的可能性很小。然而原子性数组元素的内存地址是间断的,所以数组内的多个元素能常常共享缓存行,因而这里应用 @Contented 注解对 Cell 类进行字节填充,这避免了数组中多个元素共享一个缓存行,在性能上是一个晋升。

LongAdder 源码剖析

问题:

  1. LongAdder 的构造是怎么的?
  2. 以后线程应该拜访 Cell 数组外面的哪一个 Cell 元素?
  3. 如何初始化 Cell 数组?
  4. Cell 数组如何扩容?
  5. 线程拜访调配的 Cell 元素有抵触后如何解决?
  6. 如何保障线程操作被调配的 Cell 元素的原子性?

接下来咱们看一下 LongAdder 的构造:

LongAdder 类继承自 Striped64 类,在 Striped64 外部保护这三个变量。

  • LongAdder 的实在值其实是 base 的值与 Cell 数组外面所有 Cell 元素中的 value 值的累加,base 是个根底值,默认为 0。
  • cellsBusy 用来实现 自旋锁,状态值只有 0 和 1,当创立 Cell 元素,扩容 Cell 数组或者初始化 Cell 数组时,应用 CAS 操作该变量来保障同时只有一个线程能够进行其中之一的操作。
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy;
public class LongAdder extends Striped64 implements Serializable {

Cell 的结构

上面咱们看一下 Cell 的结构。

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) {value = x;}
    final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {throw new Error(e);
        }
    }
}

能够看到,外部保护一个被申明为 volatile 的变量,这里申明 volatile 是为了保障内存可见性。另外 cas 函数通过 CAS 操作,保障了以后线程更新时被调配的 Cell 元素中 value 值的原子性。并且能够看到 Cell 类是被 @Contended 润饰的,防止伪共享。

至此咱们曾经晓得了问题 1、6 的答案了。

sum()

sum()办法返回以后的值,外部操作是累加所有 Cell 外部的 value 值而后在累加 base。

因为计算总合时没有对 Cell 数组进行加锁,所以在累加过程中可能有其它线程对 Cell 值进行批改,也可能扩容,所以 sum 返回的值并不是十分精确的,其返回值并不是一个调用 sum()办法时原子快照值。

public long sum() {Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

reset()

reset()办法为重置操作,将 base 设置为 0,如果 Cell 数组有元素,则元素被重置为 0。

public void reset() {Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}

sumThenReset()

sumThenReset()办法是 sum()办法的革新版本,该办法在应用 sum 累加对应的 Cell 值后,把以后的 Cell 和 base 重置为 0。

该办法存在在线程平安问题,比方第一个调用线程清空 Cell 的值,则后一个线程调用时累加的都是 0 值。

public long sumThenReset() {Cell[] as = cells; Cell a;
    long sum = base;
    base = 0L;
    if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null) {
                sum += a.value;
                a.value = 0L;
            }
        }
    }
    return sum;
}

add(long x)

接下来咱们次要看 add()办法,这个办法外面能够答复方才其余的问题。

public void add(long x) {Cell[] as; long b, v; int m; Cell a;
    //(1)
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        //(2)
        if (as == null || (m = as.length - 1) < 0 ||
            //(3)
            (a = as[getProbe() & m]) == null ||
            //(4)
            !(uncontended = a.cas(v = a.value, v + x)))
            //(5)
            longAccumulate(x, null, uncontended);
    }
}

final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

该办法首先判断 cells 是否 null,如果为 null 则在 base 上进行累加。如果 cells 不为 null,或者线程执行代码 cas 失败,则去执行第二步。代码第二步第三步决定以后线程应该拜访 cells 数组中哪一个 Cell 元素,如果以后线程映射的元素存在的话则执行代码四。

第四步次要应用 CAS 操作去更新调配的 Cell 元素的 value 值,如果以后线程映射的元素不存在或者存在然而 CAS 操作失败则执行代码五。

线程应该拜访 cells 数组的哪一个 Cell 元素是通过 getProbe() & m 进行计算的,其中 m 是以后 cells 数组元素个数 -1,getProbe()则用于获取以后线程中变量 threadLocalRandomProbe 的值,这个值一开始为 0,在代码第五步外面会对其进行初始化。并且以后线程通过调配的 Cell 元素的 cas 函数来保障对 Cell 元素 value 值更新的原子性。

当初咱们曾经明确了第二个问题。

上面咱们看一下 longAccumulate(x,null,uncontended)办法,该办法次要是 cells 数组初始化和扩容的中央。

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    //6. 初始化以后线程变量 ThreadLocalRandomProbe 的值
    int h;
    if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {Cell[] as; Cell a; int n; long v;
        //7.
        if ((as = cells) != null && (n = as.length) > 0) {
            //8.
            if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {rs[j] = r;
                                created = true;
                            }
                        } finally {cellsBusy = 0;}
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            //9. 以后 Cell 存在,则执行 CAS 设置
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            //10. 以后 Cell 元素个数大于 CPU 个数
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            //11. 是否有抵触
            else if (!collide)
                collide = true;
            //12. 如果以后元素个数没有达到 CPU 个数,并且存在抵触则扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {if (cells == as) {      // Expand table unless stale
                      //12.1
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    //12.2
                    cellsBusy = 0;
                }
                //12.3
                collide = false;
                continue;                   // Retry with expanded table
            }
            //13. 为了可能找到一个闲暇的 Cell,从新计算 hash 值,xorshift 算法生成随机数
            h = advanceProbe(h);
        }
        //14. 初始化 Cell 数组
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    //14.1
                    Cell[] rs = new Cell[2];
                    //14.2
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                //14.3
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

该办法较为简单,咱们次要关注问题 3,问题 4,问题 5。

  1. 如何初始化 Cell 数组?
  2. Cell 数组如何扩容?
  3. 线程拜访调配的 Cell 元素有抵触后如何解决?

每个线程第一次 执行到代码第六步的时候,会初始化以后线程变量 ThreadLocalRandomProbe 的值,该值次要为了计算以后线程为了调配到 cells 数组中的哪一个 cell 元素中

cells 数组的初始化是在代码第十四步中进行,其中 cellsBusy 是一个标识,为 0 阐明以后 cells 数组没有被初始化或者扩容,也没有在新建 Cell 元素,为 1 阐明 cells 数组正在被初始化或者扩容、创立新元素,通过 CAS 来进行 0 或 1 状态切换,调用的是casCellsBusy()

假如以后线程通过 CAS 设置 cellsBuys 为 1,则以后线程开始初始化操作,那么这时候其余线程就不能进行扩容了,如代码(14.1)初始化 cells 数组个数为 2,而后应用 h & 1 计算以后线程应该拜访 cell 数组的那个地位,应用的 h 就是以后线程的 threadLocalRandomProbe 变量。而后标识 Cells 数组以及被初始化,最初(14.3)重置了 cellsBusy 标记。尽管这里没有应用 CAS 操作,然而却是线程平安的,起因是 cellsBusy 是 volatile 类型的,保障了内存可见性。在这里初始化的 cells 数组外面的两个元素的值目前还是 null。当初咱们晓得了问题 3 的答案。

而 cells 数组的扩容是在代码第十二步进行的,对 cells 扩容是有条件的,也就是第十步、十一步条件都不满足后进行扩容操作。具体就是以后 cells 的元素个数小于以后机器 CPU 个数并且以后多个线程拜访了 cells 中同一个元素,从而导致某个线程 CAS 失败才会进行扩容。

为何要波及 CPU 个数呢?只有当每个 CPU 都运行一个线程时才会使多线程的成果最佳,也就是当 cells 数组元素个数与 CPU 个数统一时,每个 Cell 都应用一个 CPU 进行解决,这时性能才是最佳的。

代码第十二步也是先通过 CAS 设置 cellsBusy 为 1,而后能力进行扩容。假如 CAS 胜利则执行代码(12.1)将容量裁减为之前的 2 倍,并复制 Cell 元素到扩容后数组。另外,扩容后 cells 数组外面除了蕴含复制过去的元素外,还蕴含其余新元素,这些元素的值目前还是 null。当初咱们晓得了问题 4 的答案。

在代码第七步、第八步中,以后线程调用 add()办法并依据以后线程的随机数 threadLocalRandomProbe 和 cells 元素个数计算要拜访的 Cell 元素下标,而后如果发现对应下标元素的值为 null,则新增一个 Cell 元素到 cells 数组,并且在将其增加到 cells 数组之前要竞争设置 cellsBusy 为 1。

而代码第十三步,对 CAS 失败的线程从新计算以后线程的随机值 threadLocalRandomProbe,以缩小下次访问 cells 元素时的抵触机会。这里咱们就晓得了问题 5 的答案。

总结

该类通过外部 cells 数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程能够同时对 cells 数组外面的元素进行并行的更新操作。另外,数组元素 Cell 应用 @Contended 注解进行润饰,这防止了 cells 数组内多个原子变量被放入同一个缓存行,也就是防止了伪共享。

LongAccumulator 相比于 LongAdder,能够为累加器提供非 0 的初始值,后者只能提供默认的 0 值。另外,前者还能够指定累加规定,比方不进行累加而进行相乘,只须要在结构 LongAccumulator 时传入自定义的双目运算器即可,后者则内置累加的规定。

正文完
 0