关于并发编程:Java并发编程并发操作原子类Atomic以及CAS的ABA问题

28次阅读

共计 9646 个字符,预计需要花费 25 分钟才能阅读完成。

本文基于 JDK1.8

Atomic 原子类

原子类是具备原子操作特色的类。

原子类存在于 java.util.concurrent.atmic 包下。

依据操作的数据类型,原子类能够分为以下几类。

根本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean:布尔型原子类

AtomicInteger 的罕用办法

public final int get() // 获取以后的值
public final int getAndSet(int newValue)// 获取以后的值,并设置新的值
public final int getAndIncrement()// 获取以后的值,并自增
public final int getAndDecrement() // 获取以后的值,并自减
public final int getAndAdd(int delta) // 加上给定的值,并返回之前的值
public final int addAndGet(int delta) // 加上给定的值,并返回最终后果
boolean compareAndSet(int expect, int update) // 如果输出的数值等于预期值,则以原子形式将该值设置为输出值(update)public final void lazySet(int newValue)// 最终设置为 newValue, 应用 lazySet 设置之后可能导致其余线程在之后的一小段时间内还是能够读到旧的值。

AtomicInteger 常见办法的应用

@Test
public void AtomicIntegerT() {AtomicInteger c = new AtomicInteger();

    c.set(10);
    System.out.println("初始设置的值 ==>" + c.get());

    int andAdd = c.getAndAdd(10);
    System.out.println("为原先的值加上 10, 并返回原先的值, 原先的值是 ==>" + andAdd + "加上之后的值是 ==>" + c.get());

    int finalVal = c.addAndGet(5);
    System.out.println("加上 5, 之后的值是 ==>" + finalVal);

    int i = c.incrementAndGet();
    System.out.println("++1, 之后的值为 ==>" + i);
    
    int result = c.updateAndGet(e -> e + 3);
    System.out.println("能够应用函数式更新 + 3 计算后的后果为 ==>"+ result);

    int res = c.accumulateAndGet(10, (x, y) -> x + y);
    System.out.println("应用指定函数计算后的后果为 ==>" + res);
}

初始设置的值 ==>10
为原先的值加上 10, 并返回原先的值, 原先的值是 ==> 10 
加上之后的值是 ==> 20
加上 5, 之后的值是 ==> 25
++1, 之后的值为 ==> 26
能够应用函数式更新 + 3 计算后的后果为 ==> 29
应用指定函数计算后的后果为 ==>39 

AtomicInteger 保障原子性

咱们晓得,volatile 能够保障可见性和有序性,然而不能保障原子性,因而,以下的代码在并发环境下的后果会不正确:最终的后果可能会小于 10000。

public class AtomicTest {static CountDownLatch c = new CountDownLatch(10);
    public volatile int inc = 0;

    public static void main(String[] args) throws InterruptedException {final AtomicTest test = new AtomicTest();
        for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {test.increase();
                }
                c.countDown();}).start();}
        c.await();
        System.out.println(test.inc);

    }
    // 不是原子操作, 先读取 inc 的值, inc + 1, 写回内存
    public void increase() {inc++;}
} 

想要解决最终后果不是 10000 的方法有两个:

  • 应用 synchronized 关键字,润饰 increase 办法,锁能够保障该办法某一时刻只能有一个线程执行,保障了原子性。
 public synchronized void increase() {inc++;} 
  • 应用 Atomic 原子类,比方这里的AtomicInteger
public class AtomicTest {static CountDownLatch c = new CountDownLatch(10);

    // 应用整型原子类 保障原子性
    public AtomicInteger inc = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {final AtomicTest test = new AtomicTest();
        for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {test.increase();
                }
                c.countDown();}).start();}
        c.await();
        System.out.println(test.getCount());
    }

    // 获取以后的值,并自增
    public void increase() {inc.getAndIncrement();
    }

    // 获取以后的值
    public int getCount() {return inc.get();
    }
} 

getAndIncrement()办法的实现

getAndIncrement 办法是如何确保原子操作的呢?

 private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            //objectFieldOffset 本地办法,用来拿到“原来的值”的内存地址。valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) {throw new Error(ex); }
    }
    //value 在内存中可见,JVM 能够保障任何时刻任何线程总能拿到该变量的最新值
    private volatile int value;   


    public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    } 

openjdk1.8Unsafe 类的源码:Unsafe.java

 /**
     * Atomically adds the given value to the current value of a field
     * or array element within the given object <code>o</code>
     * at the given <code>offset</code>.
     *
     * @param o object/array to update the field/element in
     * @param offset field/element offset
     * @param delta the value to add
     * @return the previous value
     * @since 1.8
     */
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    } 

Java 的源码改变是有的,《Java 并发编程的艺术》的内容也在此摘录一下,相对来说更好了解一些:

 public final int getAddIncrement() {for ( ; ;) {
            // 先获得存储的值
            int current = get();
            // 加 1 操作
            int next = current + 1;
            // CAS 保障原子更新操作,如果输出的数值等于预期值,将值设置为输出的值
            if (compareAndSet(current, next)) {return current;}
        }
    }

    public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 

数组类型

  • AtomicIntegerArray:整型数组原子类
  • AtomicLongArray:长整型数组原子类
  • AtomicReferenceArray:援用类型数组原子类

AtomicIntegerArray 的罕用办法

@Test
public void AtomicIntegerArrayT() {int[] nums = {1, 2, 3, 4, 5};
    AtomicIntegerArray c = new AtomicIntegerArray(nums);

    for (int i = 0; i < nums.length; i++) {System.out.print(c.get(i) + " ");
    }
    System.out.println();

    int finalVal = c.addAndGet(0, 10);
    System.out.println("索引为 0 的值 加上 10  ==>" + finalVal);

    int i = c.incrementAndGet(0);
    System.out.println("索引为 0 的值 ++1, 之后的值为 ==>" + i);

    int result = c.updateAndGet(0, e -> e + 3);
    System.out.println("能够应用函数式更新索引为 0 的地位 + 3 计算后的后果为 ==>" + result);

    int res = c.accumulateAndGet(0, 10, (x, y) -> x * y);
    System.out.println("应用指定函数计算后的后果为 ==>" + res);
} 

援用类型

根本类型原子类只能更新一个变量,如果须要原子更新多个变量,须要应用 援用类型原子类。

  • AtomicReference:援用类型原子类
  • AtomicMarkableReference:原子更新带有标记的援用类型,无奈解决 ABA 问题,该类的标记更多用于 示意援用值是否已逻辑删除
  • AtomicStampedReference:原子更新带有版本号的援用类型。该类将整数值与援用关联起来,可用于解决原子的更新数据和数据的版本号,能够解决应用 CAS 进行原子更新时可能呈现的 ABA 问题。

AtomicReference 常见办法的应用

@Test
public void AtomicReferenceT(){AtomicReference<Person> ar = new AtomicReference<>();
    Person p = new Person(18,"summer");

    ar.set(p);

    Person pp = new Person(50,"dan");
    ar.compareAndSet(p, pp);// except = p  update = pp

    System.out.println(ar.get().getName());
    System.out.println(ar.get().getAge());

}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Person{

    int age;
    String name;
}
//dan
//50 

对象的属性批改类型

如果须要 原子更新某个类里的某个字段 时,须要用到对象的属性批改类型原子类。

  • AtomicIntegerFieldUpdater: 原子更新整型字段的更新器
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器
  • AtomicReferenceFieldUpdater:原子更新援用类型里的字段

要想原子地更新对象的属性须要两步。

  1. 因为对象的属性批改类型原子类都是抽象类,所以每次应用都必须应用静态方法 newUpdater()创立一个更新器,并且须要设置想要更新的类和属性。
  2. 更新的对象属性必须应用 public volatile 修饰符。

AtomicIntegerFieldUpdater 罕用办法的应用

@Test
public void AtomicIntegerFieldUpdateTest(){
    AtomicIntegerFieldUpdater<Person> a =
        AtomicIntegerFieldUpdater.newUpdater(Person.class,"age");
    Person p = new Person(18,"summer");
    System.out.println(a.getAndIncrement(p)); //18
    System.out.println(a.get(p)); //19
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Person{

    public volatile int age;
    private String name;
} 

Java8 新增的原子操作类

  • LongAdder

因为 AtomicLong 通过 CAS 提供非阻塞的原子性操作,性能曾经很好,在高并发下大量线程竞争更新同一个原子量,但只有一个线程可能更新胜利,这就造成大量的 CPU 资源节约。

LongAdder 通过让多个线程去竞争多个 Cell 资源,来解决,再很高的并发状况下, 线程操作的是 Cell 数组,并不是 base,在 cell 元素有余时进行 2 倍扩容,在高并发下性能高于 AtomicLong

CAS 的 ABA 问题的产生

假如两个线程拜访同一变量 x。

  1. 第一个线程获取到了变量 x 的值 A,而后执行本人的逻辑。
  2. 这段时间内,第二个线程也取到了变量 x 的值 A,而后将变量 x 的值改为 B,而后执行本人的逻辑,最初又把变量 x 的值变为 A【还原】。
  3. 在这之后,第一个线程终于进行了变量 x 的操作,但此时变量 x 的值还是 A,认为 x 的值没有变动,所以 compareAndSet 还是会胜利执行。

先来看一个值变量产生的 ABA 问题,了解一下 ABA 问题产生的流程:

@SneakyThrows
@Test
public void test1() {AtomicInteger atomicInteger = new AtomicInteger(10);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {atomicInteger.compareAndSet(10, 11);
        atomicInteger.compareAndSet(11,10);
        System.out.println(Thread.currentThread().getName() + ":10->11->10");
        countDownLatch.countDown();}).start();

    new Thread(() -> {
        try {TimeUnit.SECONDS.sleep(1);
            boolean isSuccess = atomicInteger.compareAndSet(10,12);
            System.out.println("设置是否胜利:" + isSuccess + ", 设置的新值:" + atomicInteger.get());
        } catch (InterruptedException e) {e.printStackTrace();
        }
        countDownLatch.countDown();}).start();

    countDownLatch.await();}
// 输入:线程 2 并没有发现初始值曾经被批改
//Thread-0:10->11->10
// 设置是否胜利:true, 设置的新值:12 

ABA 问题存在,但可能对值变量并不会造成后果上的影响,然而思考一种非凡的状况:

https://zhuanlan.zhihu.com/p/237611535

  1. 线程 1 和线程 2 并发拜访 ConcurrentStack。
  2. 线程 1 执行出栈【预期后果是弹出 B,A 成为栈顶】,但在读取栈顶 B 之后,被线程 2 抢占。
  3. 线程 2 记录栈顶 B,顺次弹出 B 和 A,再顺次将 C,D,B 入栈,且保障 B 就是原栈顶记录的 B。
  4. 之后轮到线程 1,发现栈顶的确是冀望的 B,遂弹出 B,但此时栈顶曾经是 D,就呈现了谬误。

BAB 的问题如何解决

AtomicStampedReference 原子更新带有版本号的援用类型。该类将整数值与援用关联起来,可用于解决原子的更新数据和数据的版本号,能够解决应用 CAS 进行原子更新时可能呈现的 ABA 问题。

@SneakyThrows
@Test
public void test2() {AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference(10,1);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次版本:" + atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(10, 11, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
        System.out.println(Thread.currentThread().getName() + "第二次版本:" + atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(11, 10, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
        System.out.println(Thread.currentThread().getName() + "第三次版本:" + atomicStampedReference.getStamp());
        countDownLatch.countDown();}).start();

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次版本:" + atomicStampedReference.getStamp());
        try {TimeUnit.SECONDS.sleep(2);
            boolean isSuccess = atomicStampedReference.compareAndSet(10,12, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "批改是否胜利:" + isSuccess + "以后版本:" + atomicStampedReference.getStamp() + "以后值:" + atomicStampedReference.getReference());
            countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();
        }
    }).start();

    countDownLatch.await();}
// 输入
// 输入
Thread-0 第一次版本:1
Thread-0 第二次版本:2
Thread-0 第三次版本:3
Thread-1 第一次版本:3
Thread-1 批改是否胜利:true 以后版本:4 以后值:12 

而 AtomicMarkableReference 通过标记位,标记位只有 true 和 false,每次更新标记位的话,在第三次的时候,又会变得跟第一次一样,并不能解决 ABA 问题。

@SneakyThrows
@Test
public void test3() {AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(10, false);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次标记:" + markableReference.isMarked());
        markableReference.compareAndSet(10, 11, markableReference.isMarked(), true);
        System.out.println(Thread.currentThread().getName() + "第二次标记:" + markableReference.isMarked());
        markableReference.compareAndSet(11, 10, markableReference.isMarked(), false);
        System.out.println(Thread.currentThread().getName() + "第三次标记:" + markableReference.isMarked());
        countDownLatch.countDown();}).start();

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次标记:" + markableReference.isMarked());
        try {TimeUnit.SECONDS.sleep(2);
            boolean isSuccess = markableReference.compareAndSet(10,12, false, true);
            System.out.println(Thread.currentThread().getName() + "批改是否胜利:" + isSuccess + "以后标记:" + markableReference.isMarked() + "以后值:" + markableReference.getReference());
            countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();
        }
    }).start();

    countDownLatch.await();}
// 输入
Thread-0 第一次标记:false
Thread-0 第二次标记:true
Thread-0 第三次标记:false
Thread-1 第一次标记:false
Thread-1 批改是否胜利:true 以后标记:true 以后值:12 

如果感觉本文对你有帮忙,能够点赞关注反对一下,一起学习提高

正文完
 0