Java 高并发之无锁(CAS)

本篇次要讲 Java中的无锁 CAS ,无锁 顾名思义就是 以不上锁的形式解决并发问题,而不应用synchronized 和 lock 等。。

1. Atomic 包

java.util.concurrent.atomic 包下类都是原子类,原子类都是基于 sun.misc.Unsafe 实现的

根本能够分为一下几类:

  1. 原子性根本数据类型:AtomicBoolean、AtomicInteger、AtomicLong
  2. 原子性对象援用类型:AtomicReferenceAtomicStampedReference、AtomicMarkableReference
  3. 原子性数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
  4. 原子性对象属性更新:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

2.CAS 的概念

无锁 是指 所有的线程都能 无障碍的达到临界点(然而能不能胜利操作临界点的值 是不肯定的) ,无锁的实现采纳的是 CAS(compare and swap) 算法实现的
  1. CAS 须要依赖 CPU提供的 CAS指令,CPU 为了解决并发问题,提供了 CAS 指令
  2. CAS 指令须要3个参数 ,(变量值 、预期值、新值),只有当 变量值和预期值 雷同的时候 才会更新变量值为新值
  3. CAS 是一条 CPU 指令,由 CPU 硬件级别上保障原子性

3.AtomicInteger 案例 以源码

AtomicInteger 我置信应该大部分都用过吧, 用于解决 count++

因为count ++ 操作 不是原子的,可能 第一个线程拿到的 count = 0 将要进行++操作的时候,被其余线程抢占了CPU资源,第二个线程此时读取的还是count = 0 ,那么这样就会造成问题, 而AtomicInteger 它外部应用CAS 算法 保障了并发平安问题。

public class AtomicIntegerDemo {

private static int count = 0;
private static AtomicInteger  atomicInteger = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {

Thread[] threads = new Thread[10];

for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int j = 0; j < 1000; j++){
count++;
atomicInteger.incrementAndGet(); //外部CAS 进行自增
}
});
thread.start();
threads[i] = thread;
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
System.out.println("后果: " + count);
System.out.println("后果: " + atomicInteger.get());
}
}

输入后果发现,atomicInteger 保障了数据正确

源码剖析:外部通过 unafe 类提供的办法解决的,Java中的Unsafe类为咱们提供了相似C++手动治理内存的能力,能够间接获取某个属性的内容地址,并且提供了一些对内存地址上的值得操作

private static final Unsafe unsafe = Unsafe.getUnsafe();

//获取value 属性的 对象偏移量
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
/**

  • Atomically increments by one the current value.

*

  • @return the updated value

*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

unsafe的 getAndAddInt 办法 ,var5 相当于期望值 只有 外部调用 compareAndSwapInt 判断var1 对象(AtomicInteger) 的内存偏移值var2 (valueOffset) 是否和期望值雷同,如果雷同示意 这期间没有其余线程批改, 否则 自旋再 获取 以后var5 再比拟

//var5是期望值 var5+var4 是新值
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);//获取this上的 valueOffset的偏移量的值 value
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

这个办法是 native (C/C++)编写的代码,外部实现 应用了CPU的 CAS指令 具备原子性

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

AtomicLong 和 AtomicInteger相似 这里不开展说了,

4.AtomicBoolean 原理和应用场景

而 AtomicBoolean 也相似,不过外部value 是一个int 通过 int值是 1 还是 0 当做 true / false

public AtomicBoolean(boolean initialValue) {
value = initialValue ? 1 : 0;
}

public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return VALUE.compareAndSet(this,
(expectedValue ? 1 : 0),
(newValue ? 1 : 0));
}

当在多个线程 只解决 一次初始化性能的时候 能够应用

private static AtomicBoolean initialized = new AtomicBoolean(false);
public void init()
{
if( initialized.compareAndSet(false, true) )
{
// 这里搁置初始化代码....
}
}

5.AtomicReference

始终很难了解 AtmoicReference 的 应用场景,毕竟 java赋值操作原本就是 原子性的

作用在 对 对象进行原子操作 提供了一种读和写都是原子性的 对象援用变量

AtomicReference和AtomicInteger十分相似 AtomicReference则对应一般的对象援用,底层应用的是compareAndSwapObject实现CAS,比拟的是两个对象的地址是否相等。也就是它能够保障你在批改对象援用时的线程安全性。

疑难:援用类型的赋值是原子的 为什么要有AtomicReference 来保障批改对象援用时的线程安全性 呢?

真正须要应用AtomicReference的场景是你须要CAS类操作时,因为波及到比拟、设置等多于一个的操作

上面的案例是 :模仿 拦截器拦挡到申请 记录ip 而后 去更新最新的 LoginDetailInfo 登录信息 应用AtomicReference 去包装老的 登录信息 , 应用5个线程去同时更新最新的 登录信息

package com.johnny.atomic;


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**

  • @author johnny
  • @create 2020-09-16 下午3:58

**/
public class AtomicReferenceDemo {


private static final int CORE_SIZE = 5;
private static final int MAX_SIZE = 10;

private static final int QUEUE_SIZE = 100;

private static final CountDownLatch countDownLatch = new CountDownLatch(5);


public static void main(String[] args) throws InterruptedException {


ExecutorService executorService = new ThreadPoolExecutor(CORE_SIZE, MAX_SIZE, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE));

//interceptor ip
//依据拦截器拦挡到 最新的登录ip
String newIp = "47.98.250.170";

//from db search old LoginDetailInfo
//从数据库中查到 最新的一次 登录信息
LoginDetailInfo oldLoginDetailInfo = LoginDetailInfo.builder()
.loginCount(10)
.ip("47.98.250.138")
.timeStamp(System.currentTimeMillis()).build();

//封装老的 登录信息
AtomicReference<LoginDetailInfo> atomicReference = new AtomicReference<>(oldLoginDetailInfo);

//封装工作
TaskRun taskRun = new TaskRun(atomicReference, newIp);
//应用5个线程去 同时更新 最新的 登录信息,只有有一个胜利即可
for (int i = 0; i < 5; i++) {
executorService.execute(taskRun);
}

//主线程 期待
countDownLatch.await();

System.out.println("【所有线程执行结束 main 线程完结 】");
}


static class TaskRun implements Runnable {

private String ip;

private AtomicReference<LoginDetailInfo> atomicReference;

/**

  • @param atomicReference : 老的 atomicReference

*/
public TaskRun(AtomicReference<LoginDetailInfo> atomicReference, String ip) {
this.ip = ip;
this.atomicReference = atomicReference;
}

@Override
public void run() {

if (atomicReference != null) {

LoginDetailInfo oldLoginDetailInfo = atomicReference.get();

long count = oldLoginDetailInfo.getLoginCount() + 1;

LoginDetailInfo newLoginDetailInfo = new LoginDetailInfo();
newLoginDetailInfo.setLoginCount(count);
newLoginDetailInfo.setIp(ip);
newLoginDetailInfo.setTimeStamp(System.currentTimeMillis());

try {
//模仿 执行一下其余操作 能够让多个线程 都同时走到了 这里 保障多个线程 拿到的 oldLoginDetailInfo 是一个 ,这样才会只有一个 能更新胜利
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}

if (atomicReference.compareAndSet(oldLoginDetailInfo, newLoginDetailInfo)) {
//save To Db or some ...
System.out.println(Thread.currentThread().getName() + " 线程更新胜利 --- {}");
System.out.println(newLoginDetailInfo);
}else{
System.out.println(Thread.currentThread().getName() + " 线程更新失败 其余线程曾经更新 ---");
}
//计数器 -1
countDownLatch.countDown();
}

}
}


@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
static class LoginDetailInfo {

private long loginCount;

private String ip;

private long timeStamp;

}

}

运行后果如下:

这里的应用场景很牵强,不晓得你会 怎么应用AtomicReference 欢送在下方留言 给我提点思路

6. AtomicStampRefence

线程判断被批改对象是否能够正确写入的条件是对象的以后值和冀望是否统一。这个逻辑从个别意义上来说是正确的。但有可能呈现一个小小的例外,就是当你取得对象以后数据后,在筹备批改为新值前,对象的值被其余线程间断批改了2次,而通过这2次批改后,对象的值又复原为旧值。这样,以后线程就无奈正确判断这个对象到底是否被批改过,这就是典型的 CAS ABA问题

如何解决 ABA问题呢,其实Java 提供了 AtomicStampRefence 就是用来解决ABA问题的

模仿如下案例:

现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1曾经晓得A.next为B,而后心愿用CAS将栈顶替换为B:

head.compareAndSet(A,B);

在T1执行下面这条指令之前,线程T2染指,将A、B出栈,再pushD、C、A,此时堆栈构造如下图,而对象B此时处于游离状态:

此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS胜利,栈顶变为B,但实际上B.next为null,所以此时的状况变为:

其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,平白无故就把C、D丢掉了。

上面咱们就来用代码 模仿下面这个案例:

6.1 首先咱们定义 栈 :

/**
*
*/
@Data
static class Node<T> {

private T value;

private Node<T> next;

public Node(T value) {
this.value = value;
}
}


static class Stack<T> {

private Node<T> top;

private int length = 0;


public boolean isEmpty() {
if (length == 0 || top == null) {
return true;
} else {
return false;
}
}

public void push(Node<T> node) {
if (isEmpty()) {
top = node;
} else {
//替换
//node.setNext(top);
top = node;
}
length++;
}

public void push(T value) {
Node<T> node = new Node<T>(value);
if (isEmpty()) {
top = node;
} else {
//替换
node.setNext(top);
top = node;
}
length++;
}

public T pop() {

if (isEmpty()) {
System.out.println("栈 为空");
return null;
} else {
T result = top.getValue();
top = top.next;
length--;
return result;
}
}

public T top() {
if (isEmpty()) {
System.out.println("栈 为空");
return null;
} else {
return top.getValue();
}
}

public Node<T> topNode() {
if (isEmpty()) {
System.out.println("栈 为空");
return null;
} else {
return top;
}
}

public Node<T> topNextNode() {
if (isEmpty()) {
System.out.println("栈 为空");
return null;
} else {
return top.next;

}
}

public int size() {
return length;
}

public void deleteStack() {
top = null;
length = 0;
}

}

6.2 应用 AtmoicReference 来看看带来的问题

package com.johnny.atomic;

import lombok.Data;

import java.util.concurrent.atomic.AtomicReference;

/**

  • ABA 问题Demo

*

  • @author johnny
  • @create 2020-09-20 下午5:52

**/
public class AtomicReferenceDemo {


/**

  • 应用 Stack 来 模仿 CAS ABA 问题

*

  • @param args

*/
public static void main(String[] args) throws InterruptedException {


//1.初始化 栈
Stack<String> stack = new Stack<String>();

stack.push("B");
stack.push("A");

//headReference 保留 以后的 栈的 top
AtomicReference<Node<String>> headReference = new AtomicReference<>(stack.topNode());


//2.须要2个线程
TaskA taskA = new TaskA(stack, headReference);
TaskB taskB = new TaskB(stack, headReference);


Thread threadA = new Thread(taskA);
Thread threadB = new Thread(taskB);

threadA.start();
threadB.start();


threadA.join();
threadB.join();

System.out.println(stack.pop());
System.out.println(stack.pop());
System.out.println(stack.pop());

}

static class TaskA implements Runnable {

private AtomicReference<Node<String>> headReference;
private Stack<String> stack;

public TaskA(Stack<String> stack, AtomicReference<Node<String>> headReference) {
this.stack = stack;
this.headReference = headReference;
}

@Override
public void run() {

if (stack.topNode() == headReference.get()) {
Node<String> aNode = headReference.get();
Node<String> bNode = stack.topNextNode();

//让TaskB 去把 AB 出栈并且 入 D C A
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//此时 TaskB 把 栈 变成 D C A top还是A 更新 因为此时TaskA还是认为 B.next = null 就会弄丢 D C
if (headReference.compareAndSet(aNode, bNode)) {
//把A出栈
stack.pop();
//增加B 当做top
stack.push(bNode);
} else {
System.out.println("有其余线程 已更新 top");
}
}
}
}

static class TaskB implements Runnable {

private AtomicReference<Node<String>> headReference;
private Stack<String> stack;

public TaskB(Stack<String> stack, AtomicReference<Node<String>> headReference) {
this.stack = stack;
this.headReference = headReference;
}

@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (stack.topNode() == headReference.get()) {
Node<String> aNode = headReference.get();
//A 出栈
stack.pop();
//B 出栈
stack.pop();

Node<String> dNode = new Node<>("D");
Node<String> cNode = new Node<>("C");
dNode.next = cNode;
cNode.next = aNode;
//顺次入栈 D C A
if (headReference.compareAndSet(aNode, dNode)) {
stack.push(dNode);
}
if (headReference.compareAndSet(dNode, cNode)) {
stack.push(cNode);
}
if (headReference.compareAndSet(cNode, aNode)) {
stack.push(aNode);
}
}


}
}

}

能够看到 最初栈里 只有 B了,其余的 D C 都被弄丢了

6.3 通过应用 AtomicStampedReference 来改良
AtomicStampedReference 外部还保护了一个Stamp 来记录 更改次数 ,这样当 从 A 变到 B 再变回A的时候,尽管值 没变,然而Stamp 变动了,仍然不能更新胜利

/**

  • 应用 Stack 来 模仿 CAS ABA 问题

*

  • @param args

*/
public static void main(String[] args) throws InterruptedException {


//1.初始化 栈
Stack<String> stack = new Stack<String>();

stack.push("B");
stack.push("A");

AtomicStampedReference<Node<String>> head = new AtomicStampedReference<>(stack.topNode(), 0);


//2.须要2个线程
TaskA taskA = new TaskA(stack, head);
TaskB taskB = new TaskB(stack, head);


Thread threadA = new Thread(taskA);
Thread threadB = new Thread(taskB);

threadA.start();
threadB.start();


threadA.join();
threadB.join();

System.out.println(stack.pop());
System.out.println(stack.pop());
System.out.println(stack.pop());

}

static class TaskA implements Runnable {

private Stack<String> stack = null;

private AtomicStampedReference<Node<String>> headStampReference;

public TaskA(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {
this.stack = stack;
this.headStampReference = headStampReference;
}

@Override
public void run() {

if (stack.topNode() == headStampReference.getReference()) {

AtomicStampedReference<Node<String>> headNode = this.headStampReference;
int stamp = headNode.getStamp();

Node<String> bNode = stack.topNextNode();

//让TaskB 去把 AB 出栈并且 入 D C A
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//此时 TaskB 把 栈 变成 D C A top还是A 更新 因为此时TaskA还是认为 B.next = null 就会弄丢 D C
if (headNode.compareAndSet(stack.topNode(), bNode, stamp, stamp + 1)) {
stack.pop(); //A出栈
//入栈B 然而 B的 next = null
stack.push(bNode);
} else {
System.out.println("有其余线程 已更新 top");
}
}
}
}

static class TaskB implements Runnable {

private Stack<String> stack;

private AtomicStampedReference<Node<String>> headStampReference;

public TaskB(Stack<String> stack, AtomicStampedReference<Node<String>> headStampReference) {
this.stack = stack;
this.headStampReference = headStampReference;
}


@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (stack.topNode() == headStampReference.getReference()) {
//A 出栈
stack.pop();
//B 出栈
stack.pop();

Node<String> aNode = headStampReference.getReference();

Node<String> dNode = new Node<>("D");
Node<String> cNode = new Node<>("C");
cNode.next = dNode;
aNode.next = cNode;

//顺次入栈 D C A
if (headStampReference.compareAndSet(aNode, dNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(dNode);
}
if (headStampReference.compareAndSet(dNode, cNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(cNode);
}
if (headStampReference.compareAndSet(cNode, aNode, headStampReference.getStamp(), headStampReference.getStamp() + 1)) {
stack.push(aNode);
}
}

}
}

能够看到 B 并没有被更新到 栈中 因为 通过AtomicStampedReference 的Stampd 曾经可能晓得 这个栈曾经被更改过

7. AtomicMarkableReference

AtomicMarkableReference能够了解为下面AtomicStampedReference的简化版,就是不关怀批改过几次,仅仅关怀是否批改过。因而变量mark是boolean类型,仅记录值是否有过批改。

private static class Pair<T> {
final T reference;
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}

和下面的 AtomicStampedReference 相似 也来解决 ABA 问题 就不开展说了

8. AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater,它实现了能够线程平安地更新对象中的整型变量。比方如下的 int

类型的 count 变量 ,当你的零碎有些变量当初并没有设计成 Atomic ,那么当初能够通过FieldUpdater来平安的 更新,并且不须要带来 对系统代码的批改,最多只须要增加 一个 volatile , 不会造成什么影响

public class AtomicIntegerFieldUpdaterDemo {

public int count = 100;

public static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterDemo> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterDemo.class, "count");


public static void main(String[] args) {

AtomicIntegerFieldUpdaterDemo atomicIntegerFieldUpdaterDemo = new AtomicIntegerFieldUpdaterDemo();

if (atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo, 100, 200)) {
System.out.println("更新胜利 count : " + atomicIntegerFieldUpdaterDemo.count);
}

if(atomicIntegerFieldUpdater.compareAndSet(atomicIntegerFieldUpdaterDemo,100,300)){
System.out.println("更新胜利 count : " + atomicIntegerFieldUpdaterDemo.count);
}
}

报错了:因为必须是 volatile 变量,并且不能是private

public volatile int count = 100;

打印后果:

更新胜利 count : 200

总结

本篇次要解说了 Java中 无锁CAS相干常识,包含 罕用的AtomicInteger ,AtomicBoolean , AtomicReference AtomicStampedReference 和AtomicMarkableReference, AtomicIntegerFieldUpdater,以及介绍了ABA问题,和解决办法。 一起来理解吧!