关于java:JAVA并发编程原子操作类以及LongAdder源码分析

41次阅读

共计 8176 个字符,预计需要花费 21 分钟才能阅读完成。

1. 原子操作类有哪些

2. 根本类型原子类

3. 数组类型原子类

4. 援用类型原子类

5. 对象的属性批改原子类

6.LongAdder 原理剖析

7.LongAdder 源码解读

8. 总结

1. 原子操作类有哪些
JAVA 并发编程——CAS 概念以及 ABA 问题咱们通过以前这篇博客,基础性地理解了一下 CAS 的概念以及 ABA 的用法,而且应用了一下根本的 AtomicInteger 类,对原子援用类有了一个初步的理解。明天咱们来零碎归类一下原子援用类:

2. 根本类型原子类
首先是根本类型原子类:

这是咱们之前的博客介绍过的,咱们就列举一下咱们常常应用的 api:

public final int get() // 获取以后的值
public final int getAndSet(int newValue)// 获取以后的值,并设置新的值,cas
public final int getAndIncrement()// 获取以后的值,并自增,cas
public final int getAndDecrement() // 获取以后的值,并自减,cas
public final int getAndAdd(int delta) // 获取以后的值,并加上预期的值
boolean compareAndSet(int expect, int update) // 如果输出的数值等于预期值,则以原子形式将该值设置为输出值(update)

无非就是应用了 cas,对数据的操作进行了原子性校验,这样多线程操作数据的时候就不会失落操作了。

3. 数组类型原子类

这一组 api 也比较简单

 AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);// 创立数组
atomicIntegerArray.getAndSet(0,1122);
atomicIntegerArray.getAndIncrement(1);

public class AtomicIntegerArrayDemo {public static void main(String[] args) {
        // 初始化一个数组
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
        // 打印一下看看原始值
        for(int i =0;i<atomicIntegerArray.length();i++){System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();
        System.out.println();
        System.out.println();

        int tmpInt = 0;
        // 把下标为 0 的值换成 1122
        tmpInt = atomicIntegerArray.getAndSet(0, 1122);
        // 获取下标为 0 的值
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
        // 将下标为 1 的值进行 +1
        atomicIntegerArray.getAndIncrement(1);
        atomicIntegerArray.getAndIncrement(1);
        tmpInt = atomicIntegerArray.getAndIncrement(1);
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));
    }
}

4. 援用类型原子类

这个类,咱们在 JAVA 并发编程——CAS 概念以及 ABA 问题里介绍过,就是将这个值设置一个版本号,能够解决 ABA 问题。

5. 对象的属性批改原子类


这个类比拟有意思,咱们来介绍一下:

咱们应用这几个类的目标就是要以一种线程平安的形式,操作非线程安全类的某些字段 :比方操作一个 Account(账户类) 中的 Balance(余额)字段,因为其它字段比方 username(用户名)不容易更改,咱们只须要对 Balance 进行加锁就行了。

留神:
1)更新的对象属性必须应用 public volatile 修饰符。
2)应用静态方法 newUpdater()创立一个更新器,并且须要设置想要更新的类和属性。

@Data
public class BankAccount {

    private volatile int money;

    public void transferMoney(BankAccount bankAccount) {
        // 创立一个更新器
        AtomicIntegerFieldUpdater<BankAccount> integerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");
        integerFieldUpdater.incrementAndGet(bankAccount);
    }
}

public class AtomicIntegerFieldUpdaterDemo {public static void main(String[] args) {BankAccount bankAccount = new BankAccount();

        for (int i = 1; i<=1000;i++){
            int finalI = i;
            new Thread(() -> {bankAccount.transferMoney(bankAccount);
            }, String.valueOf(i)).start();}

        // 暂停毫秒
        try {TimeUnit.MILLISECONDS.sleep(2000);
        } catch (InterruptedException e) {e.printStackTrace();
        }

        System.out.println(bankAccount.getMoney());

    }
}

6.LongAdder 原理剖析
接下来介绍一下最重要的一个类:LongAdder!

咱们都晓得,如果在并发的状况下,做一个点赞计数器,能够应用 AtomicInteger,它采纳自旋的乐观锁形式,进行自增,然而如果并发量更大一点,就会让很多线程取抢占 AtomicInteger,造成线程空转,这个时候,咱们能够应用 LongAdder!

LongAdder 的性能比 AtomicInteger 高出很多,咱们能够先来比照着看一下这两个类的性能:


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {

    static class ClickNumberNet {
        int number = 0;
        // 应用 synchronized 锁进行自增
        public synchronized void clickBySync() {number++;}

        AtomicLong atomicLong = new AtomicLong(0);
        // 应用原子类进行自增
        public void clickByAtomicLong() {atomicLong.incrementAndGet();
        }

        LongAdder longAdder = new LongAdder();
        // 应用 longAdder 进行自增
        public void clickByLongAdder() {longAdder.increment();
        }

        LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
        // 应用 longAccumulator(自定义自增函数内容)进行自增
        public void clickByLongAccumulator() {longAccumulator.accumulate(1);
        }
    }

    public static void main(String[] args) throws InterruptedException {ClickNumberNet clickNumberNet = new ClickNumberNet();

        long startTime;
        long endTime;
        CountDownLatch countDownLatch = new CountDownLatch(50);
        CountDownLatch countDownLatch2 = new CountDownLatch(50);
        CountDownLatch countDownLatch3 = new CountDownLatch(50);
        CountDownLatch countDownLatch4 = new CountDownLatch(50);


        startTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {new Thread(() -> {
                try {for (int j = 1; j <= 100 * 10000; j++) {clickNumberNet.clickBySync();
                    }
                } finally {countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();}
        countDownLatch.await();
        endTime = System.currentTimeMillis();
        System.out.println("synchronized----costTime:" + (endTime - startTime) + "毫秒" + "\t clickBySync result:" + clickNumberNet.number);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {new Thread(() -> {
                try {for (int j = 1; j <= 100 * 10000; j++) {clickNumberNet.clickByAtomicLong();
                    }
                } finally {countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();}
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("AtomicInteger----costTime:" + (endTime - startTime) + "毫秒" + "\t clickByAtomicLong result:" + clickNumberNet.atomicLong);

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {new Thread(() -> {
                try {for (int j = 1; j <= 100 * 10000; j++) {clickNumberNet.clickByLongAdder();
                    }
                } finally {countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();}
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("LongAdder----costTime:" + (endTime - startTime) + "毫秒" + "\t clickByLongAdder result:" + clickNumberNet.longAdder.sum());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {new Thread(() -> {
                try {for (int j = 1; j <= 100 * 10000; j++) {clickNumberNet.clickByLongAccumulator();
                    }
                } finally {countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();}
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("LongAccumulator----costTime:" + (endTime - startTime) + "毫秒" + "\t clickByLongAccumulator result:" + clickNumberNet.longAccumulator.longValue());


    }
}

运行后果为:

咱们发现,LongAdder 的性能是 AtomicInteger 类性能的 十倍!咱们来看一下为什么 LongAdder 能这么快。

咱们从源码的形容中能够看出:当应用少的并发来操作这个 LongAdder 类的时候,LongAdder 和 AtomicLong 领有 差不多的 性能,当并发量大的时候,LongAdder 的 性能会高很 多,因为 LongAdder破费了更多的空间。

也就是说,LongAdder 采取的是用空间换工夫的办法来晋升了统计速度。
咱们顺便来看一下 LongAdder 的父类 Striped64:

Striped64 用了一个 Cell 数组和 base 以及 cellsBusy,咱们来看一下上面这个形容和这张原理图,咱们就能大略明确 LongAdder 的底层原理了:

LongAdder 的基本思路就是 扩散热点 将 value 值扩散到一个 Cell 数组中,不同线程会命中到数组的不同槽位外面,各个线程对本人槽中的那个值进行 CAS 操作,这样热点就被扩散了,抵触的概率就会小很多,如果要获取真正的 long 值,只有将各个槽的变量累计加回就能够了。


sum()办法会将 所有 cell 数组中的 value 和 base 累加作为返回值,核心思想就是将 AtomicLong 的一个 value 的更新压力分到多个 value 当中去,以空间换工夫,从而降级更新热点。

数学表白公式:

7.LongAdder 源码解读

说了这么多,咱们来看看 LongAdder 的源码:

    public void add(long x) {
        //cs 示意 cell 的援用,b 示意 base 的值,v 示意期望值,m 示意 cell 数组的长度,c 示意命中 cell 的单元格
        Cell[] cs; long b, v; int m; Cell c;
        // 首次操作线程,cell 必定为空,去走 cas,失败的时候才会走到 if 外面
        // 如果 cell 不为空,则必定产生过竞争强烈的景象,进入 if
        // 如果 cas 失败,则竞争必定也强烈,进入 if
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            // 不是竞争强烈的 true 代表无竞争,false 代表竞争积攒
            boolean uncontended = true;
            //1. 如果 cell 数组为空,则要进入初始化,进入办法(初始化 cell 数组)//2. 如果数组长度小于 0,则进入初始化(初始化 cell 数组)(个别不会进入这个条件)//3.getProbe()是通过算法返回这个线程应该落在哪个槽,如果这个槽还没初始化,则进入这个办法,初始化 cell
           //4. 如果这个槽 cas 失败,则进入这个办法(去扩容)if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
 final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        // 如果线程还未随机调配哈希值,给调配一个
        if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        done: for (;;) { // 自旋
            Cell[] cs; Cell c; int n; long v;
            if ((cs = cells) != null && (n = cs.length) > 0) {if ((c = cs[(n - 1) & h]) == null) {// 如果命中了一个槽位,这个槽位还未初始化
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {// 双重查看锁
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {rs[j] = r;
                                    break done;
                                }
                            } finally {cellsBusy = 0;}
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (c.cas(v = c.value,
                               (fn == null) ? v + x : fn.applyAsLong(v, x)))// 对某一个槽位进行 cas
                    break;
                else if (n >= NCPU || cells != cs)// 容量曾经超过 CPU 核数,不能扩容
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {// 扩容操作
                    try {if (cells == cs)        // Expand table unless stale
                            cells = Arrays.copyOf(cs, n << 1);
                    } finally {cellsBusy = 0;}
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {// 初始化 cell 数组
                try {                           // Initialize table
                    if (cells == cs) {Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        break done;
                    }
                } finally {cellsBusy = 0;}
            }
            // Fall back on using base
            // 如果全副操作都失败,进行 casBase, 兜底操作 
            else if (casBase(v = base,
                             (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break done;
        }
    }

8. 总结

明天咱们学习了原子操作类以及 LongAdder 源码剖析,
根本类型原子类咱们平时最相熟,应用乐观锁。
数组类型原子类能够操作数组。
援用类型原子类带版本号,能够解决 aba 问题。
对象的属性批改原子类,能够让加锁的粒度更细。
LongAdder 是每个线程领有本人的槽,各个线程个别只对本人槽中的那个值进行 CAS 操作。

正文完
 0