关于java:刨根问底-Java-并发之-CAS

39次阅读

共计 6824 个字符,预计需要花费 18 分钟才能阅读完成。

前言

后端开发中大家必定遇到过实现一个线程平安的计数器这种需要,依据教训你应该晓得咱们要在多线程中实现 「共享变量」 的原子性和可见性问题,于是锁成为一个不可避免的话题,明天咱们探讨的是与之对应的无锁 CAS。本文会从怎么来的、是什么、怎么用、原理剖析、遇到的问题等不同的角度带你真正搞懂 CAS。

为什么要无锁

咱们一想到在多线程下保障平安的形式头一个要拎进去的必定是锁,不论从硬件、操作系统层面都或多或少在应用锁。锁有什么毛病吗?当然有了,不然 JDK 里为什么呈现那么多各式各样的锁,就是因为每一种锁都有其优劣势。

应用锁就须要取得锁、开释锁,CPU 须要通过上下文切换和调度治理来进行这个操作,对于一个 「独占锁」 而言一个线程在持有锁后没执行完结其余的哥们就必须在里面等着,等到后面的哥们执行结束 CPU 大哥就会把锁拿进去其余的线程来抢了(非偏心)。锁的这种概念基于一种乐观机制,它总是认为数据会被批改,所以你在操作一部分代码块之前先加一把锁,操作结束后再开释,这样就平安了。其实在 JDK1.5 应用 synchronized 就能够做到。

然而像下面的操作在多线程下会让 CPU 一直的切换,十分耗费资源,咱们晓得能够应用具体的某一类锁来防止局部问题。那除了锁的形式还有其余的吗?当然,有人就提出了无锁算法,比拟有名的就是咱们明天要说的 CAS(compare and swap),和锁不同的是它是一种乐观的机制,它认为他人去拿数据的时候不会批改,然而在批改数据的时候去判断一下数据此时的状态,这样的话 CPU 不会切换,在读多的状况下性能将失去大幅晋升。以后咱们应用的大部分 CPU 都有 CAS 指令了,从硬件层面反对无锁,这样开发的时候去调用就能够了。

不论是锁还是无锁都有其优劣势,前面咱们也会通过例子阐明 CAS 的问题。

什么是 CAS?

后面提了无锁的 CAS,那到底 CAS 是个啥呢?我曾经急不可待了,咱们来看看维基百科的解释

比拟并替换 (compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而防止多线程同时改写某一数据时因为执行程序不确定性以及中断的不可预知性产生的数据不统一问题。该操作通过将内存中的值与指定数据进行比拟,当数值一样时将内存中的数据替换为新的值。

CAS 给咱们提供了一种思路,通过 「比拟」 和 「替换」 来实现原子性,来看一段代码:

`int cas(long *addr, long old, long new) {`
 `/* 原子执行 */`
 `if(*addr != old)`
 `return 0;`
 `*addr = new;`
 `return 1;`
`}`

这是一段 c 语言代码,能够看到有 3 个参数,别离是:

  • *addr: 进行比拟的值
  • old: 内存以后值
  • new: 筹备批改的新值,写入到内存

只有咱们以后传入的进行比拟的值和内存里的值相等,就将新值批改胜利,否则返回 0 通知比拟失败了。学过数据库的同学都晓得乐观锁和乐观锁,乐观锁总是认为数据不会被批改。基于这种假如 CAS 的操作也认为内存里的值和以后值是相等的,所以操作总是能胜利,咱们能够不须要加锁就实现多线程下的原子性操作。

在多线程状况下应用 CAS 同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被阻塞挂起,而是通知它这次批改失败了,你能够从新尝试,于是能够写这样的代码。

`while (!cas(&addr, old, newValue)) {`
`}`
`// success`
`printf("new value = %ld", addr);`

不过这样的代码置信你可能看出其中的蹊跷了,这个咱们前面来剖析,上面来看看 Java 里是怎么用 CAS 的。

Java 中的 CAS

还是后面的问题,如果让你用 Java 的 API 来实现你可能会想到两种形式,一种是加锁(可能是 synchronized 或者其余品种的锁),另一种是应用 atomic 类,如 AtomicInteger,这一系列类是在 JDK1.5 的时候呈现的,在咱们罕用的 java.util.concurrent.atomic 包下,咱们来看个例子:

`ExecutorService executorService = Executors.newCachedThreadPool();`
`AtomicInteger   atomicInteger   = new AtomicInteger(0);`
`for (int i = 0; i < 5000; i++) {`
 `executorService.execute(atomicInteger::incrementAndGet);`
`}`
`System.out.println(atomicInteger.get());`
`executorService.shutdown();`

这个例子开启了 5000 个线程去进行累加操作,不论你执行多少次答案都是 5000。这么神奇的操作是如何实现的呢?就是依附 CAS 这种技术来实现的,咱们揭开 AtomicInteger 的老底看看它的代码:

`public class AtomicInteger extends Number implements java.io.Serializable {`
 `private static final long serialVersionUID = 6214790243416807050L;`
 `// setup to use Unsafe.compareAndSwapInt for updates`
 `private static final Unsafe unsafe = Unsafe.getUnsafe();`
 `private static final long valueOffset;`
 `static {`
 `try {`
 `valueOffset = unsafe.objectFieldOffset`
 `(AtomicInteger.class.getDeclaredField("value"));`
 `} catch (Exception ex) {throw new Error(ex); }`
 `}`
 `private volatile int value;`
 `/**`
 `* Creates a new AtomicInteger with the given initial value.` 
 `* @param initialValue the initial value`
 `*/`
 `public AtomicInteger(int initialValue) {`
 `value = initialValue;`
 `}`
 `/**`
 `* Gets the current value.` 
 `* @return the current value`
 `*/`
 `public final int get() {`
 `return value;`
 `}`
 `/**`
 `* Atomically increments by one the current value.` 
 `* @return the updated value`
 `*/`
 `public final int incrementAndGet() {`
 `return unsafe.getAndAddInt(this, valueOffset, 1) + 1;`
 `}`
`}`

这里我只帖出了咱们后面例子相干的代码,其余都是相似的,能够看到 incrementAndGet 调用了 unsafe.getAndAddInt 办法。Unsafe 这个类是 JDK 提供的一个比拟底层的类,它不让咱们程序员间接应用,次要是怕操作不当把机器玩坏了。。。(其实能够通过反射的形式获取到这个类的实例)你会在 JDK 源码的很多中央看到这家伙,咱们先说说它有什么能力:

  • 内存治理:包含分配内存、开释内存
  • 操作类、对象、变量:通过获取对象和变量偏移量间接批改数据
  • 挂起与复原:将线程阻塞或者复原阻塞状态
  • CAS:调用 CPU 的 CAS 指令进行比拟和替换
  • 内存屏障:定义内存屏障,防止指令重排序

这里只是大抵提一下罕用的操作,具体细节能够在文末的参考链接中查看。上面咱们持续看 unsafegetAndAddInt 在做什么。

`public final int getAndAddInt(Object var1, long var2, int var4) {`
 `int var5;`
 `do {`
 `var5 = this.getIntVolatile(var1, var2);`
 `} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));`
 `return var5;`
`}`
`public native int getIntVolatile(Object var1, long var2);`
`public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);`

其实很简略,先通过 getIntVolatile 获取到内存的以后值,而后进行比拟,开展 compareAndSwapInt 办法的几个参数:

  • var1: 以后要操作的对象(其实就是 AtomicInteger 实例)
  • var2: 以后要操作的变量偏移量(能够了解为 CAS 中的内存以后值)
  • var4: 冀望内存中的值
  • var5: 要批改的新值

所以 this.compareAndSwapInt(var1, var2, var5, var5 + var4) 的意思就是,比拟一下 var2 和内存以后值 var5 是否相等,如果相等那我就将内存值 var5 批改为 var5 + var4var4 就是 1,也能够是其余数)。

这里咱们还须要解释一下 「偏移量」 是个啥?你在后面的代码中可能看到这么一段:

`// setup to use Unsafe.compareAndSwapInt for updates`
`private static final Unsafe unsafe = Unsafe.getUnsafe();`
`private static final long valueOffset;`
`static {`
 `try {`
 `valueOffset = unsafe.objectFieldOffset`
 `(AtomicInteger.class.getDeclaredField("value"));`
 `} catch (Exception ex) {throw new Error(ex); }`
`}`
`private volatile int value;`

能够看出在动态代码块执行的时候将 AtomicInteger 类的 value 这个字段的偏移量获取进去,拿这个 long 数据干嘛呢?在 Unsafe 类里很多中央都须要传入 obj 和偏移量,联合咱们说 Unsafe 的诸多能力,其实就是间接通过更底层的形式将对象字段在内存的数据批改掉。

应用下面的形式就能够很好的解决多线程下的原子性和可见性问题。因为代码里应用了 do while 这种循环构造,所以 CPU 不会被挂起,比拟失败后重试,就不存在上下文切换了,实现了无锁并发编程

CAS 存在的问题

自旋的劣势

你注意下面的代码会发现一个问题,while 循环如果在最坏状况下总是失败怎么办?会导致 CPU 在一直解决。像这种 while(!compareAndSwapInt) 的操作咱们称之为自旋,CAS 是乐观的,认为大家来并不都是批改数据的,事实可能呈现十分多的线程过去都要批改这个数据,此时随着并发量的减少会导致 CAS 操作长时间不胜利,CPU 也会有很大的开销。所以咱们要分明,如果是读多写少的状况也就满足乐观,性能是十分好的。

ABA 问题

提到 CAS 不得不说 ABA 问题,它是说如果内存的值原来是 A,被一个线程批改为了 B,此时又有一个线程把它批改为了 A,那么 CAS 必定是操作胜利的。真的这样做的话代码可能就有 bug 了,对于批改数据为 B 的那个线程它应该读取到 B 而不是 A,如果你做过数据库相干的乐观锁机制可能会想到咱们在比拟的时候应用一个版本号 version 来进行判断就能够搞定。在 JDK 里提供了一个 AtomicStampedReference 类来解决这个问题,来看一个例子:

`int stamp = 10001;`
`AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(0, stamp);`
`stampedReference.compareAndSet(0, 10, stamp, stamp + 1);`
`System.out.println("value:" + stampedReference.getReference());`
`System.out.println("stamp:" + stampedReference.getStamp());`

它的构造函数是 2 个参数,多传入了一个初始 工夫戳,用这个戳来给数据加了一个版本,这样的话多个线程来批改如果提供的戳不同。在批改数据的时候除了提供一个新的值之外还要提供一个新的戳,这样在多线程状况下只有数据被批改了那么戳肯定会产生扭转,另一个线程拿到的是旧的戳所以会批改失败。

尝试利用

既然 CAS 提供了这么好的 API,咱们无妨用它来实现一个简易版的独占锁。思路是当某个线程进入 lock 办法就比拟锁对象的内存值是否是 false,如果是则代表这把锁它能够获取,获取后将内存之批改为 true,获取不到就自旋。在 unlock 的时候将内存值再批改为 false 即可,代码如下:

`public class SpinLock {`
 `private AtomicBoolean mutex = new AtomicBoolean(false);`
 `public void lock() {`
 `while (!mutex.compareAndSet(false, true)) {`
 `// System.out.println(Thread.currentThread().getName()+ "wait lock release");`
 `}`
 `}`
 `public void unlock() {`
 `while (!mutex.compareAndSet(true, false)) {`
 `// System.out.println(Thread.currentThread().getName()+ "wait lock release");`
 `}`
 `}`
`}`

这里应用了 AtomicBoolean 这个类,当然用 AtomicInteger 也是能够的,因为咱们只保留一个状态 boolean 占用比拟小就用它了。这个锁的实现比较简单,毛病非常明显,因为 while 循环导致的自旋会让其余线程都在占用 CPU,然而也能够应用,对于锁的优化版本实现我会在后续的文章中进行改良和阐明,正因为这些问题咱们也会在后续钻研 AQS 这把利器的长处。

CAS 源码

看了下面的这些代码和解释置信你对 CAS 曾经了解了,上面咱们要说的原理是后面的 native 办法中的 C++ 代码写了什么,在 openjdk 的 /hotspot/src/share/vm/prims 目录中有一个 Unsafe.cpp 文件中有这样一段代码:

留神:这里以 hotspot 实现为例

`UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))`
 `UnsafeWrapper("Unsafe_CompareAndSwapInt");`
 `oop p = JNIHandles::resolve(obj);`
 `// 通过偏移量获取对象变量地址 `
 `jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);`
 `// 执行一个原子操作 `
 `// 如果后果和当初不同,就间接返回,因为有其他人批改了;否则会始终尝试去批改。直到胜利。`
 `return (jint)(Atomic::cmpxchg(x, addr, e)) == e;`
`UNSAFE_ENDC`

好了,对于 CAS 明天就分享到这里了,心愿对你有所帮忙。




正文完
 0