关于java:Java-高并发之无锁CAS

1次阅读

共计 13599 个字符,预计需要花费 34 分钟才能阅读完成。

Java 高并发之无锁(CAS)

本篇次要讲 Java 中的无锁 CAS , 无锁 顾名思义就是 以不上锁的形式解决并发问题,而不应用 synchronized 和 lock 等。。

1. Atomic 包

java.util.concurrent.atomic 包下类都是原子类,原子类都是基于 sun.misc.Unsafe 实现的

根本能够分为一下几类:

  1. 原子性根本数据类型:AtomicBoolean、AtomicInteger、AtomicLong
  2. 原子性对象援用类型:AtomicReferenceAtomicStampedReference、AtomicMarkableReference
  3. 原子性数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
  4. 原子性对象属性更新:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

2.CAS 的概念

无锁 是指 所有的线程都能 无障碍的达到临界点(然而能不能胜利操作临界点的值 是不肯定的),无锁的实现采纳的是 CAS(compare and swap) 算法实现的

  1. CAS 须要依赖 CPU 提供的 CAS 指令,CPU 为了解决并发问题,提供了 CAS 指令
  2. CAS 指令须要 3 个参数,(变量值、预期值、新值),只有当 变量值和预期值 雷同的时候 才会更新变量值为新值
  3. CAS 是一条 CPU 指令,由 CPU 硬件级别上保障原子性

3.AtomicInteger 案例 以源码

AtomicInteger 我置信应该大部分都用过吧,用于解决 count++

因为 count ++ 操作 不是原子的,可能 第一个线程拿到的 count = 0 将要进行 ++ 操作的时候,被其余线程抢占了 CPU 资源,第二个线程此时读取的还是 count = 0,那么这样就会造成问题,而 AtomicInteger 它外部应用 CAS 算法 保障了并发平安问题。

public class AtomicIntegerDemo {

private static int count = 0;
private static AtomicInteger  atomicInteger = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {

Thread[] threads = new Thread[10];

for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int j = 0; j < 1000; j++){
count++;
atomicInteger.incrementAndGet(); // 外部 CAS 进行自增
}
});
thread.start();
threads[i] = thread;
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
System.out.println(“ 后果: ” + count);
System.out.println(“ 后果: ” + atomicInteger.get());
}
}

输入后果发现,atomicInteger 保障了数据正确

源码剖析:外部通过 unafe 类提供的办法解决的 ,Java 中的 Unsafe 类为咱们提供了相似 C ++ 手动治理内存的能力, 能够间接获取某个属性的内容地址,并且提供了一些对内存地址上的值得操作

private static final Unsafe unsafe = Unsafe.getUnsafe();

// 获取 value 属性的 对象偏移量
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField(“value”));
} catch (Exception ex) {throw new Error(ex); }
}
/**

  • Atomically increments by one the current value.

*

  • @return the updated value

*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

unsafe 的 getAndAddInt 办法,var5 相当于期望值 只有 外部调用 compareAndSwapInt 判断 var1 对象(AtomicInteger)的内存偏移值 var2(valueOffset)是否和期望值雷同,如果雷同示意 这期间没有其余线程批改,否则 自旋再 获取 以后 var5 再比拟

//var5 是期望值 var5+var4 是新值
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);// 获取 this 上的 valueOffset 的偏移量的值 value
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

这个办法是 native (C/C++)编写的代码,外部实现 应用了 CPU 的 CAS 指令 具备原子性

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

AtomicLong 和 AtomicInteger 相似 这里不开展说了,

4.AtomicBoolean 原理和应用场景

而 AtomicBoolean 也相似,不过外部 value 是一个 int 通过 int 值是 1 还是 0 当做 true / false

public AtomicBoolean(boolean initialValue) {
value = initialValue ? 1 : 0;
}

public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return VALUE.compareAndSet(this,
(expectedValue ? 1 : 0),
(newValue ? 1 : 0));
}

当在多个线程 只解决 一次初始化性能的时候 能够应用

private static AtomicBoolean initialized = new AtomicBoolean(false);
public void init()
{
if(initialized.compareAndSet(false, true) )
{
// 这里搁置初始化代码 ….
}
}

5.AtomicReference

始终很难了解 AtmoicReference 的 应用场景,毕竟 java 赋值操作原本就是 原子性的

作用在 对 对象 进行原子操作 提供了一种读和写都是原子性的 对象援用变量

AtomicReference 和 AtomicInteger 十分相似 AtomicReference 则对应一般的对象援用,底层应用的是 compareAndSwapObject 实现 CAS,比拟的是两个对象的地址是否相等。也就是它能够保障你在批改对象援用时的线程安全性。

疑难:援用类型的赋值是原子的 为什么要有 AtomicReference 来保障批改对象援用时的线程安全性 呢?

真正须要应用 AtomicReference 的场景是你须要 CAS 类操作时,因为波及到比拟、设置等多于一个的操作

上面的案例是:模仿 拦截器拦挡到申请 记录 ip 而后 去更新最新的 LoginDetailInfo 登录信息 应用 AtomicReference 去包装老的 登录信息,应用 5 个线程去同时更新最新的 登录信息

package com.johnny.atomic;


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**

  • @author johnny
  • @create 2020-09-16 下午 3:58

**/
public class AtomicReferenceDemo {


private static final int CORE_SIZE = 5;
private static final int MAX_SIZE = 10;

private static final int QUEUE_SIZE = 100;

private static final CountDownLatch countDownLatch = new CountDownLatch(5);


public static void main(String[] args) throws InterruptedException {


ExecutorService executorService = new ThreadPoolExecutor(CORE_SIZE, MAX_SIZE, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE));

//interceptor ip
// 依据拦截器拦挡到 最新的登录 ip
String newIp = “47.98.250.170”;

//from db search old LoginDetailInfo
// 从数据库中查到 最新的一次 登录信息
LoginDetailInfo oldLoginDetailInfo = LoginDetailInfo.builder()
.loginCount(10)
.ip(“47.98.250.138”)
.timeStamp(System.currentTimeMillis()).build();

// 封装老的 登录信息
AtomicReference<LoginDetailInfo> atomicReference = new AtomicReference<>(oldLoginDetailInfo);

// 封装工作
TaskRun taskRun = new TaskRun(atomicReference, newIp);
// 应用 5 个线程去 同时更新 最新的 登录信息,只有有一个胜利即可
for (int i = 0; i < 5; i++) {
executorService.execute(taskRun);
}

// 主线程 期待
countDownLatch.await();

System.out.println(“【所有线程执行结束 main 线程完结】”);
}


static class TaskRun implements Runnable {

private String ip;

private AtomicReference<LoginDetailInfo> atomicReference;

/**

  • @param atomicReference : 老的 atomicReference

*/
public TaskRun(AtomicReference<LoginDetailInfo> atomicReference, String ip) {
this.ip = ip;
this.atomicReference = atomicReference;
}

@Override
public void run() {

if (atomicReference != null) {

LoginDetailInfo oldLoginDetailInfo = atomicReference.get();

long count = oldLoginDetailInfo.getLoginCount() + 1;

LoginDetailInfo newLoginDetailInfo = new LoginDetailInfo();
newLoginDetailInfo.setLoginCount(count);
newLoginDetailInfo.setIp(ip);
newLoginDetailInfo.setTimeStamp(System.currentTimeMillis());

try {
// 模仿 执行一下其余操作 能够让多个线程 都同时走到了 这里 保障多个线程 拿到的 oldLoginDetailInfo 是一个,这样才会只有一个 能更新胜利
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}

if (atomicReference.compareAndSet(oldLoginDetailInfo, newLoginDetailInfo)) {
//save To Db or some …
System.out.println(Thread.currentThread().getName() + ” 线程更新胜利 — {}”);
System.out.println(newLoginDetailInfo);
}else{
System.out.println(Thread.currentThread().getName() + ” 线程更新失败 其余线程曾经更新 —“);
}
// 计数器 -1
countDownLatch.countDown();
}

}
}


@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
static class LoginDetailInfo {

private long loginCount;

private String ip;

private long timeStamp;

}

}

运行后果如下:

这里的应用场景很牵强,不晓得你会 怎么应用 AtomicReference 欢送在下方留言 给我提点思路

6. AtomicStampRefence

线程判断被批改对象是否能够正确写入的条件 是对象的以后值和冀望是否统一 。这个逻辑从个别意义上来说是正确的。但有可能呈现一个小小的例外, 就是当你取得对象以后数据后,在筹备批改为新值前,对象的值被其余线程间断批改了 2 次,而通过这 2 次批改后,对象的值又复原为旧值。这样,以后线程就无奈正确判断这个对象到底是否被批改过, 这就是典型的 CAS ABA 问题

如何解决 ABA 问题呢,其实 Java 提供了 AtomicStampRefence 就是用来解决 ABA 问题的

模仿如下案例:

现有一个用单向链表实现的堆栈,栈顶为 A,这时线程 T1 曾经晓得 A.next 为 B,而后心愿用 CAS 将栈顶替换为 B:

head.compareAndSet(A,B);

在 T1 执行下面这条指令之前,线程 T2 染指,将 A、B 出栈,再 pushD、C、A,此时堆栈构造如下图,而对象 B 此时处于游离状态:

此时轮到线程 T1 执行 CAS 操作,检测发现栈顶仍为 A,所以 CAS 胜利,栈顶变为 B,但实际上 B.next 为 null,所以此时的状况变为:

其中堆栈中只有 B 一个元素,C 和 D 组成的链表不再存在于堆栈中,平白无故就把 C、D 丢掉了。

上面咱们就来用代码 模仿下面这个案例:

6.1 首先咱们定义 栈 :

/**
*
*/
@Data
static class Node<T> {

private T value;

private Node<T> next;

public Node(T value) {
this.value = value;
}
}


static class Stack<T> {

private Node<T> top;

private int length = 0;


public boolean isEmpty() {
if (length == 0 || top == null) {
return true;
} else {
return false;
}
}

public void push(Node<T> node) {
if (isEmpty()) {
top = node;
} else {
// 替换
//node.setNext(top);
top = node;
}
length++;
}

public void push(T value) {
Node<T> node = new Node<T>(value);
if (isEmpty()) {
top = node;
} else {
// 替换
node.setNext(top);
top = node;
}
length++;
}

public T pop() {

if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
T result = top.getValue();
top = top.next;
length–;
return result;
}
}

public T top() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
return top.getValue();
}
}

public Node<T> topNode() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
return top;
}
}

public Node<T> topNextNode() {
if (isEmpty()) {
System.out.println(“ 栈 为空 ”);
return null;
} else {
return top.next;

}
}

public int size() {
return length;
}

public void deleteStack() {
top = null;
length = 0;
}

}

6.2 应用 AtmoicReference 来看看带来的问题

package com.johnny.atomic;

import lombok.Data;

import java.util.concurrent.atomic.AtomicReference;

/**

  • ABA 问题 Demo

*

  • @author johnny
  • @create 2020-09-20 下午 5:52

**/
public class AtomicReferenceDemo {


/**

  • 应用 Stack 来 模仿 CAS ABA 问题

*

  • @param args

*/
public static void main(String[] args) throws InterruptedException {


//1. 初始化 栈
Stack<String> stack = new Stack<String>();

stack.push(“B”);
stack.push(“A”);

//headReference 保留 以后的 栈的 top
AtomicReference<Node<String>> headReference = new AtomicReference<>(stack.topNode());


//2. 须要 2 个线程
TaskA taskA = new TaskA(stack, headReference);
TaskB taskB = new TaskB(stack, headReference);


Thread threadA = new Thread(taskA);
Thread threadB = new Thread(taskB);

threadA.start();
threadB.start();


threadA.join();
threadB.join();

System.out.println(stack.pop());
System.out.println(stack.pop());
System.out.println(stack.pop());

}

static class TaskA implements Runnable {

private AtomicReference<Node<String>> headReference;
private Stack<String> stack;

public TaskA(Stack<String> stack, AtomicReference<Node<String>> headReference) {
this.stack = stack;
this.headReference = headReference;
}

@Override
public void run() {

if (stack.topNode() == headReference.get()) {
Node<String> aNode = headReference.get();
Node<String> bNode = stack.topNextNode();

// 让 TaskB 去把 AB 出栈并且 入 D C A
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 此时 TaskB 把 栈 变成 D C A top 还是 A 更新 因为此时 TaskA 还是认为 B.next = null 就会弄丢 D C
if (headReference.compareAndSet(aNode, bNode)) {
// 把 A 出栈
stack.pop();
// 增加 B 当做 top
stack.push(bNode);
} else {
System.out.println(“ 有其余线程 已更新 top”);
}
}
}
}

static class TaskB implements Runnable {

private AtomicReference<Node<String>> headReference;
private Stack<String> stack;

public TaskB(Stack<String> stack, AtomicReference<Node<String>> headReference) {
this.stack = stack;
this.headReference = headReference;
}

@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (stack.topNode() == headReference.get()) {
Node<String> aNode = headReference.get();
//A 出栈
stack.pop();
//B 出栈
stack.pop();

Node<String> dNode = new Node<>(“D”);
Node<String> cNode = new Node<>(“C”);
dNode.next = cNode;
cNode.next = aNode;
// 顺次入栈 D C A
if (headReference.compareAndSet(aNode, dNode)) {
stack.push(dNode);
}
if (headReference.compareAndSet(dNode, cNode)) {
stack.push(cNode);
}
if (headReference.compareAndSet(cNode, aNode)) {
stack.push(aNode);
}
}


}
}

}

能够看到 最初栈里 只有 B 了,其余的 D C 都被弄丢了

6.3 通过应用 AtomicStampedReference 来改良

AtomicStampedReference 外部还保护了一个 Stamp 来记录 更改次数,这样当 从 A 变到 B 再变回 A 的时候,尽管值 没变,然而 Stamp 变动了,仍然不能更新胜利

/**

  • 应用 Stack 来 模仿 CAS ABA 问题

*

  • @param args

*/
public static void main(String[] args) throws InterruptedException {


//1. 初始化 栈
Stack<String> stack = new Stack<String>();

stack.push(“B”);
stack.push(“A”);

AtomicStampedReference<Node<String>> head = new AtomicStampedReference<>(stack.topNode(), 0);


//2. 须要 2 个线程
TaskA taskA = new TaskA(stack, head);
TaskB taskB = new TaskB(stack, head);


Thread threadA = new Thread(taskA);
Thread threadB = new Thread(taskB);

threadA.start();
threadB.start();


threadA.join();
threadB.join();

System.out.println(stack.pop());
System.out.println(stack.pop());
System.out.println(stack.pop());

}

static class TaskA implements Runnable {

private Stack<String> stack = null;

private AtomicStampedReference<Node<String>> headStampReference;

public TaskA(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {
this.stack = stack;
this.headStampReference = headStampReference;
}

@Override
public void run() {

if (stack.topNode() == headStampReference.getReference()) {

AtomicStampedReference<Node<String>> headNode = this.headStampReference;
int stamp = headNode.getStamp();

Node<String> bNode = stack.topNextNode();

// 让 TaskB 去把 AB 出栈并且 入 D C A
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 此时 TaskB 把 栈 变成 D C A top 还是 A 更新 因为此时 TaskA 还是认为 B.next = null 就会弄丢 D C
if (headNode.compareAndSet(stack.topNode(), bNode, stamp, stamp + 1)) {
stack.pop(); // A 出栈
// 入栈 B 然而 B 的 next = null
stack.push(bNode);
} else {
System.out.println(“ 有其余线程 已更新 top”);
}
}
}
}

static class TaskB implements Runnable {

private Stack<String> stack;

private AtomicStampedReference<Node<String>> headStampReference;

public TaskB(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {
this.stack = stack;
this.headStampReference = headStampReference;
}


@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (stack.topNode() == headStampReference.getReference()) {
//A 出栈
stack.pop();
//B 出栈
stack.pop();

Node<String> aNode = headStampReference.getReference();

Node<String> dNode = new Node<>(“D”);
Node<String> cNode = new Node<>(“C”);
cNode.next = dNode;
aNode.next = cNode;

// 顺次入栈 D C A
if (headStampReference.compareAndSet(aNode, dNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(dNode);
}
if (headStampReference.compareAndSet(dNode, cNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(cNode);
}
if (headStampReference.compareAndSet(cNode, aNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(aNode);
}
}

}
}

能够看到 B 并没有被更新到 栈中 因为 通过 AtomicStampedReference 的 Stampd 曾经可能晓得 这个栈曾经被更改过

7. AtomicMarkableReference

AtomicMarkableReference 能够了解为下面 AtomicStampedReference 的简化版,就是不关怀批改过几次,仅仅关怀是否批改过。因而变量 mark 是 boolean 类型,仅记录值是否有过批改。

private static class Pair<T> {
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}

和下面的 AtomicStampedReference 相似 也来解决 ABA 问题 就不开展说了

8. AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater,它实现了能够线程平安地更新对象中的整型变量。比方如下的 int

类型的 count 变量 , 当你的零碎有些变量当初并没有设计成 Atomic,那么当初能够通过 FieldUpdater 来平安的 更新,并且不须要带来 对系统代码的批改,最多只须要增加 一个 volatile , 不会造成什么影响

public class AtomicIntegerFieldUpdaterDemo {

public int count = 100;

public static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterDemo> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterDemo.class, “count”);


public static void main(String[] args) {

AtomicIntegerFieldUpdaterDemo atomicIntegerFieldUpdaterDemo = new AtomicIntegerFieldUpdaterDemo();

if (atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo, 100, 200)) {
System.out.println(“ 更新胜利 count : ” + atomicIntegerFieldUpdaterDemo.count);
}

if(atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo,100,300)){
System.out.println(“ 更新胜利 count : ” + atomicIntegerFieldUpdaterDemo.count);
}
}

报错了:因为必须是 volatile 变量,并且不能是 private

public volatile int count = 100;

打印后果:

更新胜利 count : 200

总结

本篇次要解说了 Java 中 无锁 CAS 相干常识,包含 罕用的 AtomicInteger,AtomicBoolean,AtomicReference AtomicStampedReference 和 AtomicMarkableReference,AtomicIntegerFieldUpdater,以及介绍了 ABA 问题,和解决办法。一起来理解吧!

正文完
 0