关于java:基础篇JAVA原子组件和同步组件

5次阅读

共计 8845 个字符,预计需要花费 23 分钟才能阅读完成。

前言

在应用多线程并发编程的时,常常会遇到对共享变量批改操作。此时咱们能够抉择 ConcurrentHashMap,ConcurrentLinkedQueue 来进行平安地存储数据。但如果单单是波及状态的批改,线程执行程序问题,应用 Atomic 结尾的原子组件或者 ReentrantLock、CyclicBarrier 之类的同步组件,会是更好的抉择,上面将一一介绍它们的原理和用法

  • 原子组件的实现原理 CAS
  • AtomicBoolean、AtomicIntegerArray 等原子组件的用法、
  • 同步组件的实现原理
  • ReentrantLock、CyclicBarrier 等同步组件的用法

关注公众号,一起交换,微信搜一搜: 潜行前行

github 地址,感激 star

原子组件的实现原理 CAS

  • cas 的底层实现能够看下之前写的一篇文章: 详解锁原理,synchronized、volatile+cas 底层实现

利用场景

  • 可用来实现变量、状态在多线程下的原子性操作
  • 可用于实现同步锁(ReentrantLock)

原子组件

  • 原子组件的原子性操作是靠应用 cas 来自旋操作 volatile 变量实现的
  • volatile 的类型变量保障变量被批改时,其余线程都能看到最新的值
  • cas 则保障 value 的批改操作是原子性的,不会被中断

    根本类型原子类

    AtomicBoolean // 布尔类型
    AtomicInteger // 正整型数类型
    AtomicLong      // 长整型类型
  • 应用示例

    public static void main(String[] args) throws Exception {AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        // 异步线程批改 atomicBoolean
        CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{
       try {Thread.sleep(1000); // 保障异步线程是在主线程之后批改 atomicBoolean 为 false
           atomicBoolean.set(false);
       }catch (Exception e){throw new RuntimeException(e);
       }
        });
        atomicBoolean.set(true);
        future.join();
        System.out.println("boolean value is:"+atomicBoolean.get());
    }
    --------------- 输入后果 ------------------
    boolean value is:false

援用类原子类

AtomicReference
// 加工夫戳版本的援用类原子类
AtomicStampedReference
// 相当于 AtomicStampedReference,AtomicMarkableReference 关怀的是
// 变量是否还是原来变量,两头被批改过也无所谓
AtomicMarkableReference
  • AtomicReference 的源码如下,它外部定义了一个 volatile V value,并借助 VarHandle(具体子类是 FieldInstanceReadWrite) 实现原子操作,MethodHandles 会帮忙计算 value 在类的偏移地位,最初在 VarHandle 调用 Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)办法原子批改对象的属性

    public class AtomicReference<V> implements java.io.Serializable {
        private static final long serialVersionUID = -1848883965231344442L;
        private static final VarHandle VALUE;
        static {
       try {MethodHandles.Lookup l = MethodHandles.lookup();
           VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
       } catch (ReflectiveOperationException e) {throw new ExceptionInInitializerError(e);
       }
        }
        private volatile V value;
        ....

    ABA 问题

  • 线程 X 筹备将变量的值从 A 改为 B,然而这期间线程 Y 将变量的值从 A 改为 C,而后再改为 A;最初线程 X 检测变量值是 A,并置换为 B。但实际上,A 曾经不再是原来的 A 了
  • 解决办法,是把变量定为惟一类型。值能够加上版本号,或者工夫戳。如加上版本号,线程 Y 的批改变为 A1->B2->A3,此时线程 X 再更新则能够判断出 A1 不等于 A3
  • AtomicStampedReference 的实现和 AtomicReference 差不多,不过它原子批改的变量是volatile Pair<V> pair;,Pair 是其内部类。AtomicStampedReference 能够用来解决 ABA 问题

    public class AtomicStampedReference<V> {
        private static class Pair<T> {
       final T reference;
       final int stamp;
       private Pair(T reference, int stamp) {
           this.reference = reference;
           this.stamp = stamp;
       }
       static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);
       }
        }
        private volatile Pair<V> pair;
  • 如果咱们不关怀变量在两头过程是否被批改过,而只是关怀以后变量是否还是原先的变量,则能够应用 AtomicMarkableReference
  • AtomicStampedReference 的应用示例

    public class Main {public static void main(String[] args) throws Exception {Test old = new Test("hello"), newTest = new Test("world");
       AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1);
       reference.compareAndSet(old, newTest,1,2);
       System.out.println("对象:"+reference.getReference().name+"; 版本号:"+reference.getStamp());
        }
    }
    class Test{Test(String name){this.name = name;}
        public String name;
    }
    --------------- 输入后果 ------------------
    对象:world; 版本号:2

    数组原子类

    AtomicIntegerArray     // 整型数组
    AtomicLongArray         // 长整型数组
    AtomicReferenceArray    // 援用类型数组
  • 数组原子类外部会初始一个 final 的数组,它把整个数组当做一个对象,而后依据下标 index 计算法元素偏移量,再调用 UNSAFE.compareAndSetReference 进行原子操作。数组并没被 volatile 润饰,为了保障元素类型在不同线程的可见,获取元素应用到了 UNSAFEpublic native Object getReferenceVolatile(Object o, long offset)办法来获取实时的元素值
  • 应用示例

    // 元素默认初始化为 0
    AtomicIntegerArray array = new AtomicIntegerArray(2);
    // 下标为0的元素,期待值是 0,更新值是1
    array.compareAndSet(0,0,1);
    System.out.println(array.get(0));
    --------------- 输入后果 ------------------
    1

属性原子类

AtomicIntegerFieldUpdater 
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
  • 如果操作对象是某一类型的属性,能够应用 AtomicIntegerFieldUpdater 原子更新,不过类的属性须要定义成 volatile 润饰的变量,保障该属性在各个线程的可见性,否则会报错
  • 应用示例

    public class Main {public static void main(String[] args) {AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name");
       Test test = new Test("hello world");
       fieldUpdater.compareAndSet(test,"hello world","siting");
       System.out.println(fieldUpdater.get(test));
       System.out.println(test.name);
        }
    }
    class Test{Test(String name){this.name = name;}
        public volatile String name;
    }
    --------------- 输入后果 ------------------
    siting
    siting

累加器

Striped64
LongAccumulator
LongAdder
//accumulatorFunction:运算规定,identity:初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
  • LongAccumulator 和 LongAdder 都继承于 Striped64,Striped64 的次要思维是和 ConcurrentHashMap 有点相似,分段计算,单个变量计算并发性能慢时,咱们能够把数学运算扩散在多个变量,而须要计算总值时,再一一累加起来
  • LongAdder 相当于 LongAccumulator 一个特例实现
  • LongAccumulator 的示例

    public static void main(String[] args) throws Exception {LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
        for(int i=0;i<100000;i++){CompletableFuture.runAsync(() -> accumulator.accumulate(1));
        }
        Thread.sleep(1000); // 期待全副 CompletableFuture 线程执行实现,再获取
        System.out.println(accumulator.get());
    }
    --------------- 输入后果 ------------------
    100000

    同步组件的实现原理

  • java 的少数同步组件会在外部保护一个状态值,和原子组件一样,批改状态值时个别也是通过 cas 来实现。而状态批改的保护工作被 Doug Lea 形象出 AbstractQueuedSynchronizer(AQS)来实现
  • AQS 的原理能够看下之前写的一篇文章: 详解锁原理,synchronized、volatile+cas 底层实现

同步组件

ReentrantLock、ReentrantReadWriteLock

  • ReentrantLock、ReentrantReadWriteLock 都是基于 AQS(AbstractQueuedSynchronizer)实现的。因为它们有偏心锁和非偏心锁的辨别,因而没间接继承 AQS,而是应用外部类去继承,偏心锁和非偏心锁各自实现 AQS,ReentrantLock、ReentrantReadWriteLock 再借助外部类来实现同步
  • ReentrantLock 的应用示例

    ReentrantLock lock = new ReentrantLock();
    if(lock.tryLock()){
        // 业务逻辑
        lock.unlock();}
  • ReentrantReadWriteLock 的应用示例

    public static void main(String[] args) throws Exception {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        if(lock.readLock().tryLock()){ // 读锁
       // 业务逻辑
       lock.readLock().unlock();
        }
        if(lock.writeLock().tryLock()){ // 写锁
       // 业务逻辑
       lock.writeLock().unlock();
        }
    }

    Semaphore 实现原理和应用场景

  • Semaphore 和 ReentrantLock 一样,也有偏心和非公平竞争锁的策略,一样也是通过外部类继承 AQS 来实现同步
  • 艰深解释:假如有一口井,最多有三个人的地位打水。每有一个人打水,则须要占用一个地位。当三个地位全副占满时,第四个人须要打水,则要期待前三个人中一个来到打水位,能力持续获取打水的地位
  • 应用示例

    public static void main(String[] args) throws Exception {Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 3; i++)
       CompletableFuture.runAsync(() -> {
           try {System.out.println(Thread.currentThread().toString() + "start");
               if(semaphore.tryAcquire(1)){Thread.sleep(1000);
                   semaphore.release(1);
                   System.out.println(Thread.currentThread().toString() + "无阻塞完结");
               }else {System.out.println(Thread.currentThread().toString() + "被阻塞完结");
               }
           } catch (Exception e) {throw new RuntimeException(e);
           }
       });
        // 保障 CompletableFuture 线程被执行,主线程再完结
        Thread.sleep(2000);
    }
    --------------- 输入后果 ------------------
    Thread[ForkJoinPool.commonPool-worker-19,5,main] start 
    Thread[ForkJoinPool.commonPool-worker-5,5,main] start 
    Thread[ForkJoinPool.commonPool-worker-23,5,main] start 
    Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞完结 
    Thread[ForkJoinPool.commonPool-worker-5,5,main] 无阻塞完结 
    Thread[ForkJoinPool.commonPool-worker-19,5,main] 无阻塞完结 
  • 能够看出三个线程,因为信号量设定为2,第三个线程是无奈获取信息胜利的,会打印阻塞完结

    CountDownLatch 实现原理和应用场景

  • CountDownLatch 也是靠 AQS 实现的同步操作
  • 艰深解释:玩游戏时,如果主线工作须要靠实现五个小工作,主线工作能力持续进行时。此时能够用 CountDownLatch,主线工作阻塞期待,每实现一小工作,就 done 一次计数,直到五个小工作全副被执行能力触发主线
  • 应用示例

    public static void main(String[] args) throws Exception {CountDownLatch count = new CountDownLatch(2);
        for (int i = 0; i < 2; i++)
       CompletableFuture.runAsync(() -> {
           try {Thread.sleep(1000);
               System.out.println("CompletableFuture over");
               count.countDown();} catch (Exception e) {throw new RuntimeException(e);
           }
       });
        // 期待 CompletableFuture 线程的实现
        count.await();
        System.out.println("main over");
    }
    --------------- 输入后果 ------------------
     CompletableFuture over 
     CompletableFuture over 
     main over 

    CyclicBarrier 实现原理和应用场景

  • CyclicBarrier 则是靠 ReentrantLock lockCondition trip属性来实现同步
  • 艰深解释:CyclicBarrier 须要阻塞全副线程到 await 状态,而后全副线程再全副被唤醒执行。设想有一个栏杆拦住五只羊,须要当五只羊一起站在栏杆时,栏杆才会被拉起,此时所有的羊都能够飞跑出羊圈
  • 应用示例

    public static void main(String[] args) throws Exception {CyclicBarrier barrier = new CyclicBarrier(2);
        CompletableFuture.runAsync(()->{
       try {System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis());
           barrier.await(); // 须要期待 main 线程也执行到 await 状态能力继续执行
           System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis());
       }catch (Exception e){throw new RuntimeException(e);
       }
        });
        Thread.sleep(1000);
        // 和 CompletableFuture 线程互相期待
        barrier.await();
        System.out.println("main run over!");
    }
    --------------- 输入后果 ------------------
    CompletableFuture run start-1609822588881
    main run over!
    CompletableFuture run over-1609822589880

    StampedLock

  • StampedLock 不是借助 AQS,而是本人外部保护多个状态值,并配合 cas 实现的
  • StampedLock 具备三种模式:写模式、读模式、乐观读模式
  • StampedLock 的读写锁能够互相转换

    // 获取读锁,自旋获取,返回一个戳值
    public long readLock()
    // 尝试加读锁,不胜利返回 0
    public long tryReadLock()
    // 解锁
    public void unlockRead(long stamp) 
    // 获取写锁,自旋获取,返回一个戳值
    public long writeLock()
    // 尝试加写锁,不胜利返回 0
    public long tryWriteLock()
    // 解锁
    public void unlockWrite(long stamp)
    // 尝试乐观读读取一个工夫戳,并配合 validate 办法校验工夫戳的有效性
    public long tryOptimisticRead()
    // 验证 stamp 是否无效
    public boolean validate(long stamp)
  • 应用示例

    public static void main(String[] args) throws Exception {StampedLock stampedLock = new StampedLock();
        long stamp = stampedLock.tryOptimisticRead();
        // 判断版本号是否失效
        if (!stampedLock.validate(stamp)) {
       // 获取读锁,会空转
       stamp = stampedLock.readLock();
       long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
       if (writeStamp != 0) { // 胜利转为写锁
           //fixme 业务操作
           stampedLock.unlockWrite(writeStamp);
       } else {stampedLock.unlockRead(stamp);
           // 尝试获取写读
           stamp = stampedLock.tryWriteLock();
           if (stamp != 0) {
               //fixme 业务操作
               stampedLock.unlockWrite(writeStamp);
           }
       }
        }
    }    

欢送指注释中谬误

参考文章

  • 并发之 Striped64(l 累加器)
正文完
 0