关于java:Java面试问题汇总Concurrent并发

96次阅读

共计 21817 个字符,预计需要花费 55 分钟才能阅读完成。

1. 运行中的线程是否强制杀死

Jdk 提供了 stop() 办法 用于强制进行线程,但官网并 不倡议应用 ,因为强制进行线程 会导致线程应用的资源,比方文件描述符、网络连接处于不失常的状态 倡议应用标记位的形式来终止线程 ,如果 线程中有应用无限期的阻塞形式 ,比方 wait() 没有设置超时工夫,就只能应用 interrupt() 办法 来终止线程

@SneakyThrows
@Test
public void stack() {Thread1 thread1 = new Thread1();
    thread1.start();
    TimeUnit.MILLISECONDS.sleep(1);
    thread1.setStop();}

class Thread1 extends Thread{

    private volatile boolean isStop = false;

    @SneakyThrows
    @Override
    public void run() {while (!isStop) {System.out.println(Thread.currentThread().getName() + "run...");
        }
    }

    public void setStop() {isStop = true;}
}

2.ThreadLocal 子类及原理, OOM 产生起因及防治

原文:https://www.cnblogs.com/micra…

1)什么是 ThreadLocal?
ThreadLocal 类顾名思义能够了解为 线程本地变量 。也就是说如果 定义了一个 ThreadLocal每个线程往这个 ThreadLocal 中读写是线程隔离 ,相互之间不会影响的。它提供了一种将可变数据通过每 个线程有本人的独立正本 从而 实现线程关闭 的机制。

2)它大抵的实现思路是怎么的?
Thread 类有一个类型为 ThreadLocal.ThreadLocalMap 的实例变量 threadLocals,也就是说 每个线程有一个本人的 ThreadLocalMap。ThreadLocalMap 有本人的独立实现,能够简略地将它的 key 视作 ThreadLocalvalue 为代码中放入的值 实际上 key 并不是 ThreadLocal 自身,而是它的一个弱援用 )。每个线程在往某个 ThreadLocal 里 塞值的时候 ,都会 往本人的 ThreadLocalMap 里存 也是 以某个 ThreadLocal 作为援用 ,在本人的map 里找对应的 key,从而 实现了线程隔离

3)ThreadLocal 的 API

  • 构造函数 ThreadLocal<T>()
  • 初始化 initialValue(): 返回此线程以后线程 局部变量的初始值
  • 拜访器 get/set
  • 回收 remove

4) ThreadLocalMap 的源码实现

ThreadLocalMap 提供了一种为 ThreadLocal 定制的高效实现,并且自带一种基于弱援用的垃圾清理机制。
上面从根本构造开始一点点解读。

4.1 存储构造
既然是个 map(留神不要与 java.util.map 一概而论,这里指的是概念上的 map),当然得要有本人的 key 和 value,下面答复的问题 2 中也曾经提及,咱们能够将其 简略视作 key 为 ThreadLocal,value 为理论放入的值 。之所以说是简略视作,因为 实际上 ThreadLocal 中寄存的是 ThreadLocal 的弱援用。咱们来看看 ThreadLocalMap 里的节点是如何定义的。

static class Entry extends WeakReference<java.lang.ThreadLocal<?>> {
    // 往 ThreadLocal 里理论塞入的值
    Object value;

    Entry(java.lang.ThreadLocal<?> k, Object v) {super(k);
        value = v;
    }
}

Entry便是 ThreadLocalMap 里定义的 节点 ,它继承了 WeakReference 类,定义了一个 类型为 Object 的 value,用于 寄存塞到 ThreadLocal 里的值

4.2 为什么要弱援用

因为如果这里应用一般的 key-value 模式来定义存储构造,本质上就会造成节点的生命周期与线程强绑定,只有线程没有销毁,那么节点在 GC 剖析中始终处于可达状态,没方法被回收,而程序自身也无奈判断是否能够清理节点。弱援用是 Java 中四档援用的第三档,比软援用更加弱一些,如果一个对象没有强援用链可达,那么个别活不过下一次 GC。当某个 ThreadLocal 曾经没有强援用可达,则随着它被垃圾回收,在 ThreadLocalMap 里对应的 Entry 的键值会生效,这 为 ThreadLocalMap 自身的垃圾清理提供了便当

4.3 类成员变量与相应办法

/**
 * 初始容量,必须为 2 的幂
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * Entry 表,大小必须为 2 的幂
 */
private Entry[] table;

/**
 * 表里 entry 的个数
 */
private int size = 0;

/**
 * 重新分配表大小的阈值,默认为 0
 */
private int threshold; 

能够看到,ThreadLocalMap 保护了一个 Entry 表或者说 Entry 数组,并且要求表的大小必须为 2 的幂,同时记录表外面 entry 的个数以及下一次须要扩容的阈值。
显然这里会产生一个问题,为什么 必须是 2 的幂?很好,然而目前还无法回答,带着问题接着往下读。

/**
 * 设置 resize 阈值以维持最坏 2 / 3 的装载因子
 */
private void setThreshold(int len) {threshold = len * 2 / 3;}

/**
 * 环形意义的下一个索引
 */
private static int nextIndex(int i, int len) {return ((i + 1 < len) ? i + 1 : 0);
}

/**
 * 环形意义的上一个索引
 */
private static int prevIndex(int i, int len) {return ((i - 1 >= 0) ? i - 1 : len - 1);
}

ThreadLocal须要维持一个最坏 2 / 3 的负载因子 ,对于负载因子置信应该不会生疏,在 HashMap 中就有这个概念。
ThreadLocal 有 两个办法用于失去上一个 / 下一个索引 ,留神这里实际上是 环形 意义下的上一个与下一个。

因为 ThreadLocalMap 应用 线性探测法来解决散列抵触 ,所以实际上 Entry[] 数组在程序逻辑上是作为一个环形存在的。
对于凋谢寻址、线性探测等内容,能够参考网上材料或者 TAOCP(《计算机程序设计艺术》)第三卷的 6.4 章节。

至此,咱们曾经能够大抵勾画出 ThreadLocalMap 的外部存储构造。上面是我绘制的示意图。虚线示意弱援用,实线示意强援用。

ThreadLocalMap 保护了 Entry 环形数组,数组中元素 Entry 的逻辑上的 key 为某个 ThreadLocal 对象(实际上是指向该 ThreadLocal 对象的弱援用),value 为代码中该线程往该 ThreadLoacl 变量理论塞入的值。

5)ThreadLocal 与内存透露

对于 ThreadLocal 是否会引起内存透露也是一个比拟有争议性的问题,其实就是要看对内存透露的精确定义是什么。
认为 ThreadLocal 会引起内存透露的说法是因为如果一个 ThreadLocal 对象被回收了,咱们往里面放的 value 对于【以后线程 -> 以后线程的 threadLocals(ThreadLocal.ThreadLocalMap 对象)->Entry 数组 -> 某个 entry.value】这样一条强援用链是可达的,因而 value 不会被回收。
认为 ThreadLocal 不会引起内存透露的说法是因为 ThreadLocal.ThreadLocalMap 源码实现中自带一套自我清理的机制。

之所以有对于内存泄露的探讨是因为在有线程复用如线程池的场景中,一个线程的寿命很长,大对象长期不被回收影响零碎运行效率与平安。如果线程不会复用,用完即销毁了也不会有 ThreadLocal 引发内存泄露的问题。《Effective Java》一书中的第 6 条对这种内存泄露称为 unintentional object retention(有意识的对象保留)。

当咱们认真读过 ThreadLocalMap 的源码,咱们能够推断,如果在应用的 ThreadLocal 的过程中,显式地进行 remove 是个很好的编码习惯,这样是不会引起内存透露。
那么如果没有显式地进行 remove 呢?只能说如果对应线程之后调用 ThreadLocal 的 get 和 set 办法都有很高的概率会顺便清理掉有效对象,断开 value 强援用,从而大对象被收集器回收。

但无论如何,咱们应该思考到 何时调用 ThreadLocal 的 remove 办法。一个比拟相熟的场景就是对于一个申请一个线程的 server 如 tomcat,在代码中对 web api 作一个切面,寄存一些如用户名等用户信息,在连接点办法完结后,再显式调用 remove。

OOM 起因及防治
ThreadLocal 只是一个工具类,具体寄存变量的是线程的 threadLocals 变量,threadLocals 是一个 ThreadLocalMap 类型的变量,外部是一个 Entry 数组,Entry 继承自 WeakReference,Entry 外部的 value 用来寄存通过 ThreadLocal 的 set 办法传递的值,key 是 ThreadLocal 的弱援用,key 尽管会被 GC 回收,但 value 不能被回收,这时候 ThreadLocalMap 中会存在 key 为 null,value 不为 null 的 entry 项,如果工夫长了就会存在大量无用对象,造成 OOM。尽管 set,get 也提供了一些对 Entry 项清理的机会,但不及时,所以在应用结束后须要及时调用 remove

6) InheritableThreadLocal 原理

对于 InheritableThreadLocal,本文不作过多介绍,只是简略略过。
ThreadLocal 自身是线程隔离的,InheritableThreadLocal 提供了一种父子线程之间的数据共享机制。

它的具体实现是在 Thread 类中除了 threadLocals 外还有一个 inheritableThreadLocals 对象。

在线程对象初始化的时候,会调用 ThreadLocal 的 createInheritedMap 从父线程的 inheritableThreadLocals 中把无效的 entry 都拷过去

须要留神的中央是 InheritableThreadLocal 只是在子线程创立的时候会去拷一份父线程的 inheritableThreadLocals。如果父线程是在子线程创立后再 set 某个 InheritableThreadLocal 对象的值,对子线程是不可见的。

3. 有哪些并发队列

  • ConcurrentLinkedQueue : 无界非阻塞 队列,底层应用 单向链表 实现,对于出队和入队应用CAS 来实现线程平安
  • LinkedBlockingQueue: 有界阻塞 队列,应用 单向链表 实现,通过 ReentrantLock 实现线程平安 阻塞通过 Condition 实现,出队和入队各一把锁,不存在相互竞争
  • ArrayBlockingQueue: 有界数组形式实现的阻塞队列 , 通过ReentrantLock 实现线程平安 阻塞通过 Condition 实现,出队和入队应用同一把锁
  • PriorityBlockingQueue: 带 优先级的无界阻塞 队列,外部应用 均衡二叉树堆 实现,遍历保障有序须要自定排序
  • DelayQueue: 无界阻塞提早 队列,队列中的每个元素都有个过期工夫,当从队列获取元素时,只有过期元素才会出队列,队列头元素是最快要过期的元素
  • SynchronousQueue: 任何一个写须要期待一个读的操作,读操作也必须期待一个写操作,相当于数据交换 https://www.cnblogs.com/dwlsx…
  • LinkedTransferQueue: 由链 表组成的无界阻塞 队列,多了 tryTransfer 和 transfer 办法。transfer 办法,可能把生产者元素立即传输给消费者,如果没有消费者在期待,那就会放入队列的 tail 节点,并阻塞期待元素被生产了返回,能够应用带超时的办法。tryTransfer 办法,会在没有消费者期待接管元素的时候马上返回 false
  • LinkedBlockingDeque: 由 链表组成的双向阻塞队列,能够从队列的两端插入和移除元素

4.ThreadPoolExecutor 构造函数有哪几个参数,实现原理,创立线程池的形式

  • 结构参数:

    • corePoolSize: 线程池外围线程个数
    • maximunPoolSize: 线程池最大线程数量
    • keeyAliveTime: 闲暇线程存活工夫
    • TimeUnit: 存活工夫单位
    • workQueue: 用于保留期待执行工作的阻塞队列
    • ThreadFactory: 创立线程的工厂
    • RejectedExecutionHandler: 队列满,并且线程达到最大线程数量的时候,对新工作的解决策略,AbortPolicy(抛出异样)、CallerRunsPolicy(应用调用者所在线程执行)、DiscardOldestPolicy(调用 poll 抛弃一个工作,执行当前任务)、DiscardPolicy(默默抛弃、不抛异样)
  • 原理:
    线程池次要是解决两个问题:

    一个是当执行 大量异步工作 时可能提供 较好的性能 ,能 复用线程解决工作

    二是可能 对线程池 进行 资源限度和治理

  • 一个工作提交的线程池,首先会 判断外围线程池是否已满 未满 就会 创立 worker 线程执行工作 已满 判断 阻塞队列是否已满 ,阻塞队列 未满退出阻塞队列 已满就 判断 线程池线程数量是否 曾经达到 最大值 没有 新建线程执行工作 达到最大 值的话 执行回绝策略

    • 回绝策略有:间接抛出异样、应用调用者所在线程执行、抛弃一个旧工作,执行当前任务、间接抛弃什么都不做。
  • 创立线程池的形式:间接 new ThreadPoolExecutor 或者通过 Executors 工具类创立

5.Executors 能够创立的线程池类型

  1. newFixedThreadPool 创立一个 外围线程数跟最大线程数雷同 的线程池,因而池中的线程数量既不会减少也不会变少,如果有闲暇线程工作就会被执行,如果没有就放入工作队列,期待闲暇线程
  2. newSingleThreadExecutor 创立一个 只有一个线程 的线程池,可能串行执行工作,如果线程因为异样而进行,会主动新建一个线程补充
  3. newCachedThreadPool 创立一个 外围线程数为 0 最大线程为 Inter.MAX_VALUE的线程池,也就是说 没有限度, 线程池中的线程数量不确定,但如果有闲暇线程能够复用,则优先应用,如果没有闲暇线程,则创立新线程解决工作,解决完放入线程池
  4. newSingleThreadScheduledExecutor 创立 只有一个线程的能够定时执行 的线程池
  5. newScheduledThreadPool 创立一个 没有最大线程数限度的能够定时执行 线程池
  6. newWorkStealingPool 创立一个 含有足够多线程 的线程池,可能调用闲置的 CPU 去解决其余的工作,应用 ForkJoinPool 实现,jdk8 新增

6. 线程池的阻塞队列为什么都用 LinkedBlockingQueue,而不必 ArrayBlockingQueue

LinkedBlockingQueue 应用单向链表实现,在申明 LinkedBlockingQueue 的时候,能够不指定队列长度 ,长度为 Integer.MAX_VALUE, 并且新建了一个 Node 对象,Node 对象具备 item,next 变量,item 用于存储元素,next 指向链表下一个 Node 对象,在刚开始的时候链表的 head,last 都指向该 Node 对象,item、next 都为 null, 新元素放在链表的尾部,并从头部取元素。 取元素的时候只是一些指针的变动 ,LinkedBlockingQueue 给put(放入元素),take(取元素) 都申明了一把锁 ,放入和取 互不影响,效率更高

ArrayBlockingQueue 应用数组实现,在申明的时候 必须指定长度 ,如果长度太大,造成内存节约,长度太小,并发性能不高,如果数组满了,就无奈放入元素,除非有其余线程取出元素, 放入和取出都应用同一把锁 ,因而 存在竞争,效率比 LinkedBlockingQueue 低

7. 为什么倡议在不必线程池的时候,敞开线程池

线程池的作用的确是为了缩小频繁创立线程,使线程达到复用。但如果在 不必线程池的状况下 ,线程池中的 外围线程会始终存在 浪费资源 ,所以倡议在不必的状况下调用shutdown 办法 敞开线程池。在须要的时候再调用创立线程池。

8. 如何正当的配置 Java 线程池

如 CPU 密集型的工作,根本线程池应该配置多大?IO 密集型的工作,根本线程池应该配置多大?用有界队列好还是无界队列好?工作十分多的时候,应用什么阻塞队列能获取最好的吞吐量?

CPU 密集型 ,为了充沛应用 CPU,缩小上下文切换,线程数配置成CPU 个数 +1 个即可

IO 密集型,因为可能大部分线程在解决 IO,IO 都比拟耗时,因而能够配置成 2*CPU 个数的线程,去解决其余工作

9.Timer 和 ScheduledThreadPoolExecutor 区别

Timer是固定的 多线程生产者单线程生产,如果其中一个工作报错,其余工作也会生效;

但后者是能够配置的,既 能够是多线程生产单线程生产也能够是多线程生产多线程生产

10. 说说 CopyOnWriteArrayList

CopyOnWriteArrayList 是一个 线程平安的 ArrayList,对其的 批改操作是在一个复制的数组上进行的,不影响其余线程的读操作

其中通过ReentrantLock 独占锁保障只有一个线程对底层数组进行批改

因为在进行 批改操作 的时候,底层会 复制一个新的数组 ,而 读是在原数组 上进行的,因而在多线程环境下这里会产生 数据不统一的状况 ,称为 弱一致性

实用于多读少写 场景

public class CopyOnWriteArrayListTest {

    // 模仿测试 CopyOnWriteArrayList 的弱一致性
    @Test
    public void ListrayTest() throws InterruptedException {AryTest aryTest = new AryTest();

        StrClass strClass = new StrClass(aryTest.str1);
        String[] str2 = (String[]) strClass.getObjects();

        Thread thread = new Thread(() -> {aryTest.add();
            System.out.println(Arrays.toString(aryTest.str1));
        });

        thread.start();
        thread.join();

        System.out.println("str2=" + Arrays.toString(str2));
    }

    static class AryTest {String[] str1 = new String[]{"a", "b"};

        public void add() {String[] str2 = Arrays.copyOf(str1, str1.length + 1);
            str2[str2.length - 1] = "c";
            str1 = str2;
        }
    }

    static class StrClass {final Object[] objects;

        public StrClass(Object [] objects) {this.objects = objects;}

        public Object[] getObjects() {return objects;}
    }


    //copyOnWriteArrayList 测试
    @Test
    public void copyOnWriteArrayList() throws InterruptedException {String[] str1 = new String[]{"a", "b"};
        List<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>(str1);
        Thread thread = new Thread(() -> {copyOnWriteArrayList.add("c");
            System.out.println(copyOnWriteArrayList);
        });
        thread.start();
        System.out.println(copyOnWriteArrayList);
        thread.join();}
}
[a, b]
[a, b, c]

11.CountDownLatch 原理、CyclicBarrier 原理,两者区别

CountDownLatch:
应用 AQS 实现,通过 AQS 的状态变量 state 来作为计数器值, 当多个线程调用 countdown 办法时理论是 子性递加 AQS 的状态值 ,当线程 调用 await 办法 后以后线程会被放入AQS 阻塞队列期待计数器为 0 再返回

CyclicBarrier:

区别:CountDownLatch 计数器是一次性的 ,变为 0 后就起不到线程同步的作用了。而CyclicBarrier(撒克里克巴瑞儿) 在计数器变为 0 后从新开始 ,通过调用 await 办法,能在所有线程 达到屏障点后对立执行某个工作,再执行完后继续执行子线程,通过 ReentrantLock 实现

12.Phaser 的实现

Phaser 能够代替 CountDownLatch 和 CyclicBarrier,但比两者更加弱小,能够 动静调整须要的线程个数 ,能够通过 构造函数传入父 Phaser 实现档次 Phaser

public class PhaserTest {Phaser phaser = new Phaser(2);

    ExecutorService executor = Executors.newFixedThreadPool(2);

   @Test
   public void test1() {executor.submit(() -> {System.out.println(Thread.currentThread().getName() + "step1");
           // 等同 countDown()
           phaser.arrive();});

       executor.submit(() -> {System.out.println(Thread.currentThread().getName() + "step2");
           phaser.arrive(d);
       });

       // 等同 await()
       phaser.awaitAdvance(phaser.getPhase());
       System.out.println("thread end");
   }
}

13.Semaphore 原理

Semaphore 能够用来 管制同时拜访特定资源的线程数量 ,它通过 协调各个线程,以保障正当的应用公共资源 , 应用AQS 实现,AQS 的状态变量 state 做为许可证数量,每次通过 acquire()/tryAcquire(),许可证数量通过CAS 原子性递加,调用 release() 开释许可证,原子性递增,只有有许可证就能够重复使用

@Test
public void semaphoreTest() throws InterruptedException {Semaphore semaphore = new Semaphore(3);
    for (int i = 0; i < 5; i++) {executor.execute(() -> {
            try {semaphore.acquire();
                System.out.println("输入");
                //semaphore.release();} catch (InterruptedException e) {e.printStackTrace();
            }
        });
    }

    TimeUnit.SECONDS.sleep(3);
    System.out.println("开释许可证 ===");
    semaphore.release(2);
}
输入
输入
输入
开释许可证 ===
输入
输入

14.Exchanger 原理

用于进行 线程间的数据交换 ,它提供一个同步点,在这个 同步点两个线程能够替换彼此的数据 。如果第一个线程 先执行 exchange()办法 ,它会 始终期待第二个线程也执行 exchange 办法 ,当都达到 同步点时 ,这两个线程能够 替换数据

 @Test
public void exchangerTest() throws InterruptedException {Exchanger<Integer> exchanger = new Exchanger<> ();

    Thread thread = new Thread(() -> {
        Integer a = 1;
        try {Integer b = exchanger.exchange(a);
            System.out.println("Thread1:" + b);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    });

    thread.start();
    Integer c = exchanger.exchange(2);
    Thread.sleep(3000);
    System.out.println("Thread2:" + c);
}
Thread1:2
Thread2:1

15.### volatile 的特点

  • 批改可见性,通过 volatile 润饰的变量(包含对象和数组),线程对变量的批改对另一个线程即时可见,因而另外一个线程在应用前会去判断该变量是否有批改;
  • 禁止指令重排序
    long 和 double 的读写操作分为两次 32 位操作进行,其余根本数据类型的读写操作都是原子性的
    实现:退出 volatile 关键字的代码生成汇编代码,会发现多了一个 lock 前缀指令,这个指令相当于一个内存屏障,能够避免重排序,通过空的写入操作将变量的批改写入到主内存中,这就实现了可见性

volatile 润饰对象剖析可见:https://www.jianshu.com/p/8f2…

什么时候应用 volatile?

  • 写入变量值不依赖变量的以后值。因为如果依赖以后值,将是获取 - 计算 - 写入三步操作,这三步操作不是原子性的,而 volatile 不保障原子性
  • 读写变量值时没有加锁。因为加锁曾经保障了内存可见性,没必要再应用 volatile

    volatile 不能保障原子性

public class VolatileTest {
    
    public volatile int inc = 0;

    public void increase() {inc++;}
    
    public static void main(String[] args) {final VolatileTest test = new VolatileTest();
        
        for(int i = 0; i < 10; i++) {new Thread(() -> {for(int j = 0; j < 1000; j++)
                    test.increase();}).start();}
        // 保障后面的线程都执行完
        while (Thread.activeCount() > 1)
            Thread.yield();

        System.out.println(test.inc);
    }
}
// 输入:<=10000

伪共享

在 CPU 和主内存之间增加一级或者多级高速缓冲存储器(Cache),这个缓存被集成到 CPU 外部,在 Cache 外部是按行存储的,每一行称为一个 Cache 行,大小个别为 2 的幂次数字节,当拜访某个变量时,首先会去看 CPU Cache 内是否有该变量,如果有则间接从中获取,否则就去主内存外面获取该变量,而后把该变量所在的内存区域的一个 Cache 行的内存复制到 Cache 中。因为 寄存到 Cache 行的是内存块而不是单个变量,所以可能会把多个变量寄存到一个 Cache 行中。依据缓存一致性协定,CPU 在批改缓存行中的变量时,同一缓存行的数据将生效,这时候其余 CPU 须要从二级缓存或者主存中加载数据,这就导致了性能问题,称为伪共享

伪共享解决:字节填充 应用 sun.misc.Contended 注解

@Contended 注解只用于 Java 外围类,如果用户类门路下的类要应用这个注解,须要增加 JVM 参数:-XX:-RestrictContended。默认填充宽度为 128,须要自定义宽度设置 -XX:ContendedPaddingWidth 参数

CPU 缓存行具体阐明:https://mp.weixin.qq.com/s/yo…

原子操作类

AtomicBoolean

布尔类型 的原子操作类,外部应用 int 型存储布尔值,0 示意 false,1 示意 true

AtomicInteger

整型 的原子操作类,1.8 后提供函数式操作的办法

  • int getAndUpdate(IntUnaryOperator updateFunction)

    应用 指定函数计算并更新 返回计算前后果

  • int updateAndGet(IntUnaryOperator updateFunction)

    应用 指定函数计算并更新 返回计算后的后果

  • int getAndAccumulate(int x,IntBinaryOperator accumulatorFunction)

    应用 指定的函数计算 x 值和以后值 返回计算前后果

  • int accumulateAndGet(int x,IntBinaryOperator accumulatorFunction)

    应用 指定的函数计算 x 值和以后值 返回结算后的后果

@Test
public void atomicIntegerTest() {AtomicInteger atomicInteger = new AtomicInteger(1);

    int result = atomicInteger.getAndUpdate(e -> e + 3);
    //// 返回计算前后果
    assert result == 1;
    //1 + 3
    assert atomicInteger.get() == 4;


    result = atomicInteger.updateAndGet(e -> e + 3);
    // 返回计算后后果
    assert result == 7;
    // 4 + 3
    assert atomicInteger.get() == 7;


    result = atomicInteger.getAndAccumulate(10, (x, y) -> x + y);
    // 返回计算前后果
    assert  result == 7;
    // 7 + 10
    assert atomicInteger.get() == 17;

    result = atomicInteger.accumulateAndGet(10, (x, y) -> x + y);
    // 返回计算后的后果
    assert result == 27;
    assert atomicInteger.get() == 27;}

AtomicIntegerArray

提供了 原子性更新整型数组元素 的形式

  • int getAndUpdate(int i, IntUnaryOperator updateFunction)

    应用 指定函数计算 i 索引的值 返回计算前后果

  • int updateAndGet(int i, IntUnaryOperator updateFunction)

    应用 指定函数计算 i 索引的值 返回计算后后果

  • int getAndAccumulate(int i, int x, IntBinaryOperator accumulatorFunction)

    应用 指定的函数计算 x 值和 i 索引的值 返回计算前后果

  • int accumulateAndGet(int i, int x, IntBinaryOperator accumulatorFunction)
  • 应用 指定的函数计算 x 值和 i 索引的值 返回计算后后果
@Test
public void atomicIntegerArrayTest() {AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
    int result = atomicIntegerArray.getAndUpdate(0, e -> e + 1);
    assert result == 0;
    // 0 + 1
    assert atomicIntegerArray.get(0) == 1;

    result = atomicIntegerArray.updateAndGet(0, e -> e + 1);
    assert result == 2;
    //1 + 1
    assert atomicIntegerArray.get(0) == 2;

    result = atomicIntegerArray.getAndAccumulate(0, 3, (x, y) -> x + y);
    assert result == 2;
    // 2 + 3
    assert atomicIntegerArray.get(0) == 5;
}

AtomicIntegerFieldUpdater

通过 反射实现 ,能够 对指定类的指定字段(被 volatile 润饰)的 int 型字段进行原子更新,更新字段类型必须为 volatile int

源码剖析 > AtomicIntegerFieldUpdater 源码剖析

@Test
public void fieldUpdaterTest() {
    // 第一个参数 持有给定字段的指标对象类  第二个参数 要更新的字段名称,必须在给定的指标对象中
    AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "count");
    Person person = new Person("小明", "男");
    fieldUpdater.addAndGet(person, 10);
    //10
    System.out.println(fieldUpdater.get(person));
    //Person{name='小明', sex='男', count=10}
    System.out.println(person);
}

private static class Person{
    String name;
    String sex;
    volatile int count;

    public Person(String name, String sex) {
        this.name = name;
        this.sex = sex;
    }

    @Override
    public String toString() {
        return "Person{" +
            "name='" + name + '\'' +
            ", sex='" + sex + '\'' +
            ", count=" + count +
            '}';
    }
}

AtomicLong

long 型 原子操作类

AtomicLongArray

提供了 原子性更新 long 型数组 元素的形式

AtomicLongFieldUpdater

通过 反射 实现,能够对 指定类的指定字段(被 volatile 润饰)的 long 型字段进行原子更新,更新字段类型必须为 volatile long

AtomicMarkableReference

通过 布尔类型做为标记,原子更新援用变量

// 结构参数,提供了泛型,而不是像 AtomicInteger, 只能是 int 型
AtomicStampedReference(V initialRef, int initialStamp)

AtomicReference

同样 提供了泛型变量,原子性更新变量,但没有标识,会产生ABA 问题

AtomicReferenceArray

提供 原子性更新泛型类数组元素的形式 ,外部应用 Object[] 数组保留援用变量

AtomicReferenceFieldUpdater

援用类型的,能够对指定类的指定字段(被 volatile 批改,根本类型)进行原子更新

AtomicStampedReference

通过保护一个版本号,原子更新援用变量,解决了 ABA 问题

DoubleAccumulator

double 类型的原子更新 类,提供自定义函数接口,通过不同的 Cell 资源,缩小了竞争

DoubleAdder

double 类型的原子更新 类,只提供累计操作

LongAccumulator

LongAdder

ABA 问题

CAS 操作中会 ABA 问题,指线程 1 应用 CAS 批改初始值为 A 的变量 X 的时候,须要先获取变量 X 的值,但这时候线程 2 曾经将变量 X 的值批改为了 B, 而后又批改成了 A,尽管字面值还是 A,但这个 A 曾经是批改后的 A 了,对于线程 1 则还是认为是原来的 A, 而持续批改变量 X 的值,这就是 ABA 问题。

因为 CAS 须要在 操作值的时候查看下值有没有发生变化,如果没有发生变化则更新 ,然而如果一个值 原来是 A,变成了 B,又变成了 A ,那么应用 CAS 进行 查看时会发现它的值没有发生变化,然而实际上却变动了。ABA 问题的解决思路就是应用版本号。在变量后面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。

解决:JDK 中提供了 AtomicStampedReference 类解决了该问题,其给每个变量的状态值都提供了一个版本号

ABA 问题

@SneakyThrows
@Test
public void test1() {AtomicInteger atomicInteger = new AtomicInteger(10);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {atomicInteger.compareAndSet(10, 11);
        atomicInteger.compareAndSet(11,10);
        System.out.println(Thread.currentThread().getName() + ":10->11->10");
        countDownLatch.countDown();}).start();

    new Thread(() -> {
        try {TimeUnit.SECONDS.sleep(1);
            boolean isSuccess = atomicInteger.compareAndSet(10,12);
            System.out.println("设置是否胜利:" + isSuccess + ", 设置的新值:" + atomicInteger.get());
        } catch (InterruptedException e) {e.printStackTrace();
        }
        countDownLatch.countDown();}).start();

    countDownLatch.await();}
// 输入:线程 2 并没有发现初始值曾经被批改
//Thread-0:10->11->10
// 设置是否胜利:true, 设置的新值:12

AtomicStampedReference 解决 ABA 问题, 通过保护一个版本号

@SneakyThrows
@Test
public void test2() {AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference(10,1);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次版本:" + atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(10, 11, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
        System.out.println(Thread.currentThread().getName() + "第二次版本:" + atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(11, 10, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
        System.out.println(Thread.currentThread().getName() + "第三次版本:" + atomicStampedReference.getStamp());
        countDownLatch.countDown();}).start();

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次版本:" + atomicStampedReference.getStamp());
        try {TimeUnit.SECONDS.sleep(2);
            boolean isSuccess = atomicStampedReference.compareAndSet(10,12, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "批改是否胜利:" + isSuccess + "以后版本:" + atomicStampedReference.getStamp() + "以后值:" + atomicStampedReference.getReference());
            countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();
        }
    }).start();

    countDownLatch.await();}
// 输入
Thread-0 第一次版本:1
Thread-0 第二次版本:2
Thread-0 第三次版本:3
Thread-1 第一次版本:3
Thread-1 批改是否胜利:true 以后版本:4 以后值:12

AtomicMarkableReference 通过标记位, 因为其标记位只有 true 和 false,如果每次更新都变更标记位,在第三次的时候标记位还是跟第一次一样,并没有解决 ABA 问题

@SneakyThrows
@Test
public void test3() {AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(10, false);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次标记:" + markableReference.isMarked());
        markableReference.compareAndSet(10, 11, markableReference.isMarked(), true);
        System.out.println(Thread.currentThread().getName() + "第二次标记:" + markableReference.isMarked());
        markableReference.compareAndSet(11, 10, markableReference.isMarked(), false);
        System.out.println(Thread.currentThread().getName() + "第三次标记:" + markableReference.isMarked());
        countDownLatch.countDown();}).start();

    new Thread(() -> {System.out.println(Thread.currentThread().getName() + "第一次标记:" + markableReference.isMarked());
        try {TimeUnit.SECONDS.sleep(2);
            boolean isSuccess = markableReference.compareAndSet(10,12, false, true);
            System.out.println(Thread.currentThread().getName() + "批改是否胜利:" + isSuccess + "以后标记:" + markableReference.isMarked() + "以后值:" + markableReference.getReference());
            countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();
        }
    }).start();

    countDownLatch.await();}
Thread-0 第一次标记:false
Thread-0 第二次标记:true
Thread-0 第三次标记:false
Thread-1 第一次标记:false
Thread-1 批改是否胜利:true 以后标记:true 以后值:12

该类的标记更多的用于示意援用值是否已逻辑删除

@SneakyThrows
@Test
public void test4() {AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(10,true);

    CountDownLatch countDownLatch = new CountDownLatch(2);

    System.out.println("初始标记:" + markableReference.isMarked());

    // 该线程将 10 设置为逻辑删除
    new Thread(() -> {boolean isSuccess = markableReference.attemptMark(10,false);
        System.out.println("设置标记为 flase:" + isSuccess + ",以后标记:"+ markableReference.isMarked());
        countDownLatch.countDown();}).start();

    // 该线程想要更新 10->11,但标记曾经变为 false,与预期不符
    new Thread(() -> {
        try {TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        boolean isSuccess = markableReference.compareAndSet(10, 11, true, false);
        System.out.println("设置值:" + isSuccess + "冀望标记:true"  + ",以后标记:"+ markableReference.isMarked());
        countDownLatch.countDown();}).start();
    countDownLatch.await();}
初始标记: true
设置标记为 flase: true,以后标记:false
设置值: false 冀望标记:true,以后标记:false

Java8 新增的原子操作类

LongAdder 因为 AtomicLong 通过 CAS 提供非阻塞的原子性操作,性能曾经很好,在高并发下大量线程竞争更新同一个原子量,但只有一个线程可能更新胜利,这就造成大量的 CPU 资源节约。
LongAdder 通过 让多个线程去竞争多个 Cell 资源 ,来解决,再很高的并发状况下, 线程操作的是 Cell 数组,并不是 base,在 cell 元素有余时进行 2 倍扩容,在高并发下性能高于 AtomicLong

LongAdder 的实在值是 base 的值与 Cell 数组外面所有 Cell 元素中 value 值的累加

LongAdder 示例

@Test
public void longAdderTest() {LongAdder longAdder = new LongAdder();
    longAdder.add(101);
    longAdder.add(102);
    //203
    System.out.println(longAdder.sumThenReset());
    //0
    System.out.println(longAdder.longValue());
}

LongAdder 和 AtomicLong 多线程累加性能测试

@Test
public void multiplyThreadLongAdderTest() throws InterruptedException {LongAdder longAdder = new LongAdder();
    AtomicLong atomicLong = new AtomicLong();

    AtomicLong time1 = new AtomicLong();
    AtomicLong time2 = new AtomicLong();

    int threadNum = 5;
    int cycleNums = 500000;
    CountDownLatch countDownLatch1 = new CountDownLatch(threadNum);
    for (int a = 0; a < threadNum; a++) {executor.execute(() -> {long start = System.nanoTime();
            for (int i = 0; i < cycleNums; i++) {longAdder.increment();
            }
            //System.out.println(longAdder.longValue());
            time1.addAndGet(System.nanoTime() - start);
            countDownLatch1.countDown();});
    }
    countDownLatch1.await();

    CountDownLatch countDownLatch2 = new CountDownLatch(threadNum);
    for (int a = 0; a < threadNum ; a++) {executor.execute(() -> {long start = System.nanoTime();
            for (int i = 0; i < cycleNums; i++) {atomicLong.incrementAndGet();
            }
            //System.out.println(atomicLong.longValue());
            time2.addAndGet(System.nanoTime() - start);
            countDownLatch2.countDown();});
    }
    countDownLatch2.await();

    System.out.println("data=" + longAdder.longValue() + "time1 =" + time1.longValue());
    System.out.println("data=" + atomicLong.longValue() + "time2 =" + time2.longValue());
}
data=2500000 LongAdder time1 = 112762041
data=2500000 AtomicLong time2 = 292448392

LongAccumulator 是 LongAdder 的加强,提供了一个函数式接口,能够自定义运算规定

@Test
public void LongAccumulatorTest() {LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x * y, 2);
    //2 * 10
    longAccumulator.accumulate(10);
    assert  longAccumulator.get() == 20;
    // 重置为 2
    longAccumulator.reset();
    assert longAccumulator.get() == 2;}

如何实现一个生产者与消费者模型

一共 5 种办法

  1. 同步对象的 wait() / notify() 办法
  2. ReetrantLock Condition 的 await() / signal()办法
  3. BlockingQueue 阻塞队列 put() 和 take 办法
  4. Semaphore 基于计数的信号量
  5. PipedInputStream / PipedOutputStream 管道输入输出流
    https://blog.csdn.net/ldx1998…

说说 Random 与 ThreadLocalRandom

两者都可能产生随机数,并且都可能在多线程下应用

在多线程下应用单个 Random 实例生成随机数时候,多个线程同时计算随机数计算新的种子时候会竞争同一个原子变量的更新操作,因为原子变量的更新是 CAS 操作,同时只有一个线程会胜利,所以会造成大量线程进行自旋重试,这是会升高并发性能的

ThreadLocalRandom 解决了这个问题,其应用 ThreadLocal 的原理,让每个线程内持有一个本地的种子变量,该种子变量只有在应用随机数时候才会被初始化,多线程下计算新种子时候是依据本人线程内保护的种子变量进行更新,从而防止了竞争

https://blog.csdn.net/xiaolon…

https://www.jianshu.com/p/89d…

如何让一段程序并发的执行,并最终汇总后果

能够应用 CyclicBarrier,CountDownLatch,Callable,ForkJoinPool,CompletableFuture, 并行流(LongStream)

https://blog.csdn.net/m0_3754…

正文完
 0