Java 高并发之无锁(CAS)
本篇次要讲 Java 中的无锁 CAS , 无锁 顾名思义就是 以不上锁的形式解决并发问题,而不应用 synchronized 和 lock 等。。
1. Atomic 包
java.util.concurrent.atomic 包下类都是原子类,原子类都是基于 sun.misc.Unsafe 实现的
根本能够分为一下几类:
- 原子性根本数据类型:AtomicBoolean、AtomicInteger、AtomicLong
- 原子性对象援用类型:AtomicReference、AtomicStampedReference、AtomicMarkableReference
- 原子性数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
- 原子性对象属性更新:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
2.CAS 的概念
无锁 是指 所有的线程都能 无障碍的达到临界点(然而能不能胜利操作临界点的值 是不肯定的),无锁的实现采纳的是 CAS(compare and swap) 算法实现的
- CAS 须要依赖 CPU 提供的 CAS 指令,CPU 为了解决并发问题,提供了 CAS 指令
- CAS 指令须要 3 个参数,(变量值、预期值、新值),只有当 变量值和预期值 雷同的时候 才会更新变量值为新值
- CAS 是一条 CPU 指令,由 CPU 硬件级别上保障原子性
3.AtomicInteger 案例 以源码
AtomicInteger 我置信应该大部分都用过吧,用于解决 count++
因为 count ++ 操作 不是原子的,可能 第一个线程拿到的 count = 0 将要进行 ++ 操作的时候,被其余线程抢占了 CPU 资源,第二个线程此时读取的还是 count = 0,那么这样就会造成问题,而 AtomicInteger 它外部应用 CAS 算法 保障了并发平安问题。
public class AtomicIntegerDemo {
private static int count = 0;
private static AtomicInteger atomicInteger = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int j = 0; j < 1000; j++){
count++;
atomicInteger.incrementAndGet(); // 外部 CAS 进行自增
}
});
thread.start();
threads[i] = thread;
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
System.out.println(“ 后果: ” + count);
System.out.println(“ 后果: ” + atomicInteger.get());
}
}
输入后果发现,atomicInteger 保障了数据正确
源码剖析:外部通过 unafe 类提供的办法解决的
,Java 中的 Unsafe 类为咱们提供了相似 C ++ 手动治理内存的能力, 能够间接获取某个属性的内容地址
,并且提供了一些对内存地址上的值得操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 获取 value 属性的 对象偏移量
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField(“value”));
} catch (Exception ex) {throw new Error(ex); }
}
/**
- Atomically increments by one the current value.
*
- @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
unsafe 的 getAndAddInt 办法,var5 相当于期望值 只有 外部调用 compareAndSwapInt 判断 var1 对象(AtomicInteger)的内存偏移值 var2(valueOffset)是否和期望值雷同
,如果雷同示意 这期间没有其余线程批改,否则 自旋再 获取 以后 var5 再比拟
//var5 是期望值 var5+var4 是新值
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);// 获取 this 上的 valueOffset 的偏移量的值 value
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
这个办法是 native (C/C++)编写的代码,外部实现 应用了 CPU 的 CAS 指令 具备原子性
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
AtomicLong 和 AtomicInteger 相似 这里不开展说了,
4.AtomicBoolean 原理和应用场景
而 AtomicBoolean 也相似,不过外部 value 是一个 int 通过 int 值是 1 还是 0 当做 true / false
public AtomicBoolean(boolean initialValue) {
value = initialValue ? 1 : 0;
}
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return VALUE.compareAndSet(this,
(expectedValue ? 1 : 0),
(newValue ? 1 : 0));
}
当在多个线程 只解决 一次初始化性能的时候 能够应用
private static AtomicBoolean initialized = new AtomicBoolean(false);
public void init()
{
if(initialized.compareAndSet(false, true) )
{
// 这里搁置初始化代码 ….
}
}
5.AtomicReference
始终很难了解 AtmoicReference 的 应用场景,毕竟 java 赋值操作原本就是 原子性的
作用在 对
对象
进行原子操作 提供了一种读和写都是原子性的 对象援用变量
AtomicReference 和 AtomicInteger 十分相似 AtomicReference 则对应一般的对象援用,底层应用的是 compareAndSwapObject 实现 CAS,比拟的是两个对象的地址是否相等。也就是它能够保障你在批改对象援用时的线程安全性。
疑难:援用类型的赋值是原子的 为什么要有 AtomicReference 来保障批改对象援用时的线程安全性 呢?
真正须要应用 AtomicReference 的场景是你须要 CAS 类操作时,因为波及到比拟、设置等多于一个的操作
上面的案例是:模仿 拦截器拦挡到申请 记录 ip 而后 去更新最新的 LoginDetailInfo 登录信息 应用 AtomicReference 去包装老的 登录信息,应用 5 个线程去同时更新最新的 登录信息
package com.johnny.atomic;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
- @author johnny
- @create 2020-09-16 下午 3:58
**/
public class AtomicReferenceDemo {
private static final int CORE_SIZE = 5;
private static final int MAX_SIZE = 10;
private static final int QUEUE_SIZE = 100;
private static final CountDownLatch countDownLatch = new CountDownLatch(5);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(CORE_SIZE, MAX_SIZE, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE));
//interceptor ip
// 依据拦截器拦挡到 最新的登录 ip
String newIp = “47.98.250.170”;
//from db search old LoginDetailInfo
// 从数据库中查到 最新的一次 登录信息
LoginDetailInfo oldLoginDetailInfo = LoginDetailInfo.builder()
.loginCount(10)
.ip(“47.98.250.138”)
.timeStamp(System.currentTimeMillis()).build();
// 封装老的 登录信息
AtomicReference<LoginDetailInfo> atomicReference = new AtomicReference<>(oldLoginDetailInfo);
// 封装工作
TaskRun taskRun = new TaskRun(atomicReference, newIp);
// 应用 5 个线程去 同时更新 最新的 登录信息,只有有一个胜利即可
for (int i = 0; i < 5; i++) {
executorService.execute(taskRun);
}
// 主线程 期待
countDownLatch.await();
System.out.println(“【所有线程执行结束 main 线程完结】”);
}
static class TaskRun implements Runnable {
private String ip;
private AtomicReference<LoginDetailInfo> atomicReference;
/**
- @param atomicReference : 老的 atomicReference
*/
public TaskRun(AtomicReference<LoginDetailInfo> atomicReference, String ip) {
this.ip = ip;
this.atomicReference = atomicReference;
}
@Override
public void run() {
if (atomicReference != null) {
LoginDetailInfo oldLoginDetailInfo = atomicReference.get();
long count = oldLoginDetailInfo.getLoginCount() + 1;
LoginDetailInfo newLoginDetailInfo = new LoginDetailInfo();
newLoginDetailInfo.setLoginCount(count);
newLoginDetailInfo.setIp(ip);
newLoginDetailInfo.setTimeStamp(System.currentTimeMillis());
try {
// 模仿 执行一下其余操作 能够让多个线程 都同时走到了 这里 保障多个线程 拿到的 oldLoginDetailInfo 是一个,这样才会只有一个 能更新胜利
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicReference.compareAndSet(oldLoginDetailInfo, newLoginDetailInfo)) {
//save To Db or some …
System.out.println(Thread.currentThread().getName() + ” 线程更新胜利 — {}”);
System.out.println(newLoginDetailInfo);
}else{
System.out.println(Thread.currentThread().getName() + ” 线程更新失败 其余线程曾经更新 —“);
}
// 计数器 -1
countDownLatch.countDown();
}
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
static class LoginDetailInfo {
private long loginCount;
private String ip;
private long timeStamp;
}
}
运行后果如下:
这里的应用场景很牵强,不晓得你会 怎么应用 AtomicReference 欢送在下方留言 给我提点思路
6. AtomicStampRefence
线程判断被批改对象是否能够正确写入的条件 是对象的以后值和冀望是否统一
。这个逻辑从个别意义上来说是正确的。但有可能呈现一个小小的例外, 就是当你取得对象以后数据后,在筹备批改为新值前,对象的值被其余线程间断批改了 2 次,而通过这 2 次批改后,对象的值又复原为旧值
。这样,以后线程就无奈正确判断这个对象到底是否被批改过, 这就是典型的 CAS ABA 问题
如何解决 ABA 问题呢,其实 Java 提供了 AtomicStampRefence 就是用来解决 ABA 问题的
模仿如下案例:
现有一个用单向链表实现的堆栈,栈顶为 A,这时线程 T1 曾经晓得 A.next 为 B,而后心愿用 CAS 将栈顶替换为 B:
head.compareAndSet(A,B);
在 T1 执行下面这条指令之前,线程 T2 染指,将 A、B 出栈,再 pushD、C、A,此时堆栈构造如下图,而对象 B 此时处于游离状态:
此时轮到线程 T1 执行 CAS 操作,检测发现栈顶仍为 A,所以 CAS 胜利,栈顶变为 B,但实际上 B.next 为 null,所以此时的状况变为:
其中堆栈中只有 B 一个元素,C 和 D 组成的链表不再存在于堆栈中,平白无故就把 C、D 丢掉了。
上面咱们就来用代码 模仿下面这个案例:
6.1 首先咱们定义 栈 :
/**
*
*/
@Data
static class Node<T> {
private T value;
private Node<T> next;
public Node(T value) {
this.value = value;
}
}
static class Stack<T> {
private Node<T> top;
private int length = 0;
public boolean isEmpty() {
if (length == 0 || top == null) {
return true;
} else {
return false;
}
}
public void push(Node<T> node) {
if (isEmpty()) {
top = node;
} else {
// 替换
//node.setNext(top);
top = node;
}
length++;
}
public void push(T value) {
Node<T> node = new Node<T>(value);
if (isEmpty()) {
top = node;
} else {
// 替换
node.setNext(top);
top = node;
}
length++;
}
public T pop() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
T result = top.getValue();
top = top.next;
length–;
return result;
}
}
public T top() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
return top.getValue();
}
}
public Node<T> topNode() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
return top;
}
}
public Node<T> topNextNode() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
return top.next;
}
}
public int size() {
return length;
}
public void deleteStack() {
top = null;
length = 0;
}
}
6.2 应用 AtmoicReference 来看看带来的问题
package com.johnny.atomic;
import lombok.Data;
import java.util.concurrent.atomic.AtomicReference;
/**
- ABA 问题 Demo
*
- @author johnny
- @create 2020-09-20 下午 5:52
**/
public class AtomicReferenceDemo {
/**
- 应用 Stack 来 模仿 CAS ABA 问题
*
- @param args
*/
public static void main(String[] args) throws InterruptedException {
//1. 初始化 栈
Stack<String> stack = new Stack<String>();
stack.push(“B”);
stack.push(“A”);
//headReference 保留 以后的 栈的 top
AtomicReference<Node<String>> headReference = new AtomicReference<>(stack.topNode());
//2. 须要 2 个线程
TaskA taskA = new TaskA(stack, headReference);
TaskB taskB = new TaskB(stack, headReference);
Thread threadA = new Thread(taskA);
Thread threadB = new Thread(taskB);
threadA.start();
threadB.start();
threadA.join();
threadB.join();
System.out.println(stack.pop());
System.out.println(stack.pop());
System.out.println(stack.pop());
}
static class TaskA implements Runnable {
private AtomicReference<Node<String>> headReference;
private Stack<String> stack;
public TaskA(Stack<String> stack, AtomicReference<Node<String>> headReference) {
this.stack = stack;
this.headReference = headReference;
}
@Override
public void run() {
if (stack.topNode() == headReference.get()) {
Node<String> aNode = headReference.get();
Node<String> bNode = stack.topNextNode();
// 让 TaskB 去把 AB 出栈并且 入 D C A
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 此时 TaskB 把 栈 变成 D C A top 还是 A 更新 因为此时 TaskA 还是认为 B.next = null 就会弄丢 D C
if (headReference.compareAndSet(aNode, bNode)) {
// 把 A 出栈
stack.pop();
// 增加 B 当做 top
stack.push(bNode);
} else {
System.out.println(“ 有其余线程 已更新 top”);
}
}
}
}
static class TaskB implements Runnable {
private AtomicReference<Node<String>> headReference;
private Stack<String> stack;
public TaskB(Stack<String> stack, AtomicReference<Node<String>> headReference) {
this.stack = stack;
this.headReference = headReference;
}
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (stack.topNode() == headReference.get()) {
Node<String> aNode = headReference.get();
//A 出栈
stack.pop();
//B 出栈
stack.pop();
Node<String> dNode = new Node<>(“D”);
Node<String> cNode = new Node<>(“C”);
dNode.next = cNode;
cNode.next = aNode;
// 顺次入栈 D C A
if (headReference.compareAndSet(aNode, dNode)) {
stack.push(dNode);
}
if (headReference.compareAndSet(dNode, cNode)) {
stack.push(cNode);
}
if (headReference.compareAndSet(cNode, aNode)) {
stack.push(aNode);
}
}
}
}
}
能够看到 最初栈里 只有 B 了,其余的 D C 都被弄丢了
6.3 通过应用 AtomicStampedReference 来改良
AtomicStampedReference 外部还保护了一个 Stamp 来记录 更改次数,这样当 从 A 变到 B 再变回 A 的时候,尽管值 没变,然而 Stamp 变动了,仍然不能更新胜利
/**
- 应用 Stack 来 模仿 CAS ABA 问题
*
- @param args
*/
public static void main(String[] args) throws InterruptedException {
//1. 初始化 栈
Stack<String> stack = new Stack<String>();
stack.push(“B”);
stack.push(“A”);
AtomicStampedReference<Node<String>> head = new AtomicStampedReference<>(stack.topNode(), 0);
//2. 须要 2 个线程
TaskA taskA = new TaskA(stack, head);
TaskB taskB = new TaskB(stack, head);
Thread threadA = new Thread(taskA);
Thread threadB = new Thread(taskB);
threadA.start();
threadB.start();
threadA.join();
threadB.join();
System.out.println(stack.pop());
System.out.println(stack.pop());
System.out.println(stack.pop());
}
static class TaskA implements Runnable {
private Stack<String> stack = null;
private AtomicStampedReference<Node<String>> headStampReference;
public TaskA(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {
this.stack = stack;
this.headStampReference = headStampReference;
}
@Override
public void run() {
if (stack.topNode() == headStampReference.getReference()) {
AtomicStampedReference<Node<String>> headNode = this.headStampReference;
int stamp = headNode.getStamp();
Node<String> bNode = stack.topNextNode();
// 让 TaskB 去把 AB 出栈并且 入 D C A
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 此时 TaskB 把 栈 变成 D C A top 还是 A 更新 因为此时 TaskA 还是认为 B.next = null 就会弄丢 D C
if (headNode.compareAndSet(stack.topNode(), bNode, stamp, stamp + 1)) {
stack.pop(); // A 出栈
// 入栈 B 然而 B 的 next = null
stack.push(bNode);
} else {
System.out.println(“ 有其余线程 已更新 top”);
}
}
}
}
static class TaskB implements Runnable {
private Stack<String> stack;
private AtomicStampedReference<Node<String>> headStampReference;
public TaskB(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {
this.stack = stack;
this.headStampReference = headStampReference;
}
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (stack.topNode() == headStampReference.getReference()) {
//A 出栈
stack.pop();
//B 出栈
stack.pop();
Node<String> aNode = headStampReference.getReference();
Node<String> dNode = new Node<>(“D”);
Node<String> cNode = new Node<>(“C”);
cNode.next = dNode;
aNode.next = cNode;
// 顺次入栈 D C A
if (headStampReference.compareAndSet(aNode, dNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(dNode);
}
if (headStampReference.compareAndSet(dNode, cNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(cNode);
}
if (headStampReference.compareAndSet(cNode, aNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(aNode);
}
}
}
}
能够看到 B 并没有被更新到 栈中 因为 通过 AtomicStampedReference 的 Stampd 曾经可能晓得 这个栈曾经被更改过
7. AtomicMarkableReference
AtomicMarkableReference 能够了解为下面 AtomicStampedReference 的简化版,就是不关怀批改过几次,仅仅关怀是否批改过。
因而变量 mark 是 boolean 类型
,仅记录值是否有过批改。
private static class Pair<T> {
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}
和下面的 AtomicStampedReference 相似 也来解决 ABA 问题 就不开展说了
8. AtomicIntegerFieldUpdater
AtomicIntegerFieldUpdater,它实现了能够线程平安地更新对象中的整型变量。比方如下的 int
类型的 count 变量 , 当你的零碎有些变量当初并没有设计成 Atomic,那么当初能够通过 FieldUpdater 来平安的 更新,并且不须要带来 对系统代码的批改,最多只须要增加 一个 volatile , 不会造成什么影响
public class AtomicIntegerFieldUpdaterDemo {
public int count = 100;
public static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterDemo> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterDemo.class, “count”);
public static void main(String[] args) {
AtomicIntegerFieldUpdaterDemo atomicIntegerFieldUpdaterDemo = new AtomicIntegerFieldUpdaterDemo();
if (atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo, 100, 200)) {
System.out.println(“ 更新胜利 count : ” + atomicIntegerFieldUpdaterDemo.count);
}
if(atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo,100,300)){
System.out.println(“ 更新胜利 count : ” + atomicIntegerFieldUpdaterDemo.count);
}
}
报错了:因为必须是 volatile 变量,并且不能是 private
public volatile int count = 100;
打印后果:
更新胜利 count : 200
总结
本篇次要解说了 Java 中 无锁 CAS 相干常识,包含 罕用的 AtomicInteger,AtomicBoolean,AtomicReference AtomicStampedReference 和 AtomicMarkableReference,AtomicIntegerFieldUpdater,以及介绍了 ABA 问题,和解决办法。一起来理解吧!