1.运行中的线程是否强制杀死
Jdk提供了stop()办法用于强制进行线程,但官网并不倡议应用,因为强制进行线程会导致线程应用的资源,比方文件描述符、网络连接处于不失常的状态。倡议应用标记位的形式来终止线程,如果线程中有应用无限期的阻塞形式,比方wait()没有设置超时工夫,就只能应用interrupt()办法来终止线程
@SneakyThrows
@Test
public void stack() {
Thread1 thread1 = new Thread1();
thread1.start();
TimeUnit.MILLISECONDS.sleep(1);
thread1.setStop();
}
class Thread1 extends Thread{
private volatile boolean isStop = false;
@SneakyThrows
@Override
public void run() {
while (!isStop) {
System.out.println(Thread.currentThread().getName() + " run...");
}
}
public void setStop() {
isStop = true;
}
}
2.ThreadLocal 子类及原理, OOM产生起因及防治
原文:https://www.cnblogs.com/micra…
1)什么是ThreadLocal?
ThreadLocal类顾名思义能够了解为线程本地变量。也就是说如果定义了一个ThreadLocal,每个线程往这个ThreadLocal中读写是线程隔离,相互之间不会影响的。它提供了一种将可变数据通过每个线程有本人的独立正本从而实现线程关闭的机制。
2)它大抵的实现思路是怎么的?
Thread类有一个类型为ThreadLocal.ThreadLocalMap的实例变量threadLocals,也就是说每个线程有一个本人的ThreadLocalMap。ThreadLocalMap有本人的独立实现,能够简略地将它的key视作ThreadLocal,value为代码中放入的值(实际上key并不是ThreadLocal自身,而是它的一个弱援用
)。每个线程在往某个ThreadLocal里塞值的时候,都会往本人的ThreadLocalMap里存,读也是以某个ThreadLocal作为援用,在本人的map里找对应的key,从而实现了线程隔离。
3)ThreadLocal的API
- 构造函数ThreadLocal<T>()
- 初始化initialValue():返回此线程以后线程局部变量的初始值。
- 拜访器get/set
- 回收 remove
4) ThreadLocalMap的源码实现
ThreadLocalMap提供了一种为ThreadLocal定制的高效实现,并且自带一种基于弱援用的垃圾清理机制。
上面从根本构造开始一点点解读。
4.1 存储构造
既然是个map(留神不要与java.util.map一概而论,这里指的是概念上的map),当然得要有本人的key和value,下面答复的问题2中也曾经提及,咱们能够将其简略视作key为ThreadLocal,value为理论放入的值。之所以说是简略视作,因为实际上ThreadLocal中寄存的是ThreadLocal的弱援用。咱们来看看ThreadLocalMap里的节点是如何定义的。
static class Entry extends WeakReference<java.lang.ThreadLocal<?>> {
// 往ThreadLocal里理论塞入的值
Object value;
Entry(java.lang.ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
Entry便是ThreadLocalMap里定义的节点,它继承了WeakReference类,定义了一个类型为Object的value,用于寄存塞到ThreadLocal里的值。
4.2 为什么要弱援用
因为如果这里应用一般的key-value模式来定义存储构造,本质上就会造成节点的生命周期与线程强绑定,只有线程没有销毁,那么节点在GC剖析中始终处于可达状态,没方法被回收,而程序自身也无奈判断是否能够清理节点。弱援用是Java中四档援用的第三档,比软援用更加弱一些,如果一个对象没有强援用链可达,那么个别活不过下一次GC。当某个ThreadLocal曾经没有强援用可达,则随着它被垃圾回收,在ThreadLocalMap里对应的Entry的键值会生效,这为ThreadLocalMap自身的垃圾清理提供了便当。
4.3 类成员变量与相应办法
/**
* 初始容量,必须为2的幂
*/
private static final int INITIAL_CAPACITY = 16;
/**
* Entry表,大小必须为2的幂
*/
private Entry[] table;
/**
* 表里entry的个数
*/
private int size = 0;
/**
* 重新分配表大小的阈值,默认为0
*/
private int threshold;
能够看到,ThreadLocalMap保护了一个Entry表或者说Entry数组,并且要求表的大小必须为2的幂,同时记录表外面entry的个数以及下一次须要扩容的阈值。
显然这里会产生一个问题,为什么必须是2的幂?很好,然而目前还无法回答,带着问题接着往下读。
/**
* 设置resize阈值以维持最坏2/3的装载因子
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
/**
* 环形意义的下一个索引
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
/**
* 环形意义的上一个索引
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
ThreadLocal须要维持一个最坏2/3的负载因子,对于负载因子置信应该不会生疏,在HashMap中就有这个概念。
ThreadLocal有两个办法用于失去上一个/下一个索引,留神这里实际上是环形意义下的上一个与下一个。
因为ThreadLocalMap应用线性探测法来解决散列抵触,所以实际上Entry[]数组在程序逻辑上是作为一个环形存在的。
对于凋谢寻址、线性探测等内容,能够参考网上材料或者TAOCP(《计算机程序设计艺术》)第三卷的6.4章节。
至此,咱们曾经能够大抵勾画出ThreadLocalMap的外部存储构造。上面是我绘制的示意图。虚线示意弱援用,实线示意强援用。
ThreadLocalMap保护了Entry环形数组,数组中元素Entry的逻辑上的key为某个ThreadLocal对象(实际上是指向该ThreadLocal对象的弱援用),value为代码中该线程往该ThreadLoacl变量理论塞入的值。
5)ThreadLocal与内存透露
对于ThreadLocal是否会引起内存透露也是一个比拟有争议性的问题,其实就是要看对内存透露的精确定义是什么。
认为ThreadLocal会引起内存透露的说法是因为如果一个ThreadLocal对象被回收了,咱们往里面放的value对于【以后线程->以后线程的threadLocals(ThreadLocal.ThreadLocalMap对象)->Entry数组->某个entry.value】这样一条强援用链是可达的,因而value不会被回收。
认为ThreadLocal不会引起内存透露的说法是因为ThreadLocal.ThreadLocalMap源码实现中自带一套自我清理的机制。
之所以有对于内存泄露的探讨是因为在有线程复用如线程池的场景中,一个线程的寿命很长,大对象长期不被回收影响零碎运行效率与平安。如果线程不会复用,用完即销毁了也不会有ThreadLocal引发内存泄露的问题。《Effective Java》一书中的第6条对这种内存泄露称为unintentional object retention(有意识的对象保留)。
当咱们认真读过ThreadLocalMap的源码,咱们能够推断,如果在应用的ThreadLocal的过程中,显式地进行remove是个很好的编码习惯,这样是不会引起内存透露。
那么如果没有显式地进行remove呢?只能说如果对应线程之后调用ThreadLocal的get和set办法都有很高的概率会顺便清理掉有效对象,断开value强援用,从而大对象被收集器回收。
但无论如何,咱们应该思考到何时调用ThreadLocal的remove办法。一个比拟相熟的场景就是对于一个申请一个线程的server如tomcat,在代码中对web api作一个切面,寄存一些如用户名等用户信息,在连接点办法完结后,再显式调用remove。
OOM起因及防治
ThreadLocal只是一个工具类,具体寄存变量的是线程的threadLocals变量,threadLocals是一个ThreadLocalMap类型的变量,外部是一个Entry数组,Entry继承自WeakReference,Entry外部的value用来寄存通过ThreadLocal的set办法传递的值,key是ThreadLocal的弱援用,key尽管会被GC回收,但value不能被回收,这时候ThreadLocalMap中会存在key为null,value不为null的entry项,如果工夫长了就会存在大量无用对象,造成OOM。尽管set,get也提供了一些对Entry项清理的机会,但不及时,所以在应用结束后须要及时调用remove
6) InheritableThreadLocal原理
对于InheritableThreadLocal,本文不作过多介绍,只是简略略过。
ThreadLocal自身是线程隔离的,InheritableThreadLocal提供了一种父子线程之间的数据共享机制。
它的具体实现是在Thread类中除了threadLocals外还有一个inheritableThreadLocals对象。
在线程对象初始化的时候,会调用ThreadLocal的createInheritedMap从父线程的inheritableThreadLocals中把无效的entry都拷过去
须要留神的中央是InheritableThreadLocal只是在子线程创立的时候会去拷一份父线程的inheritableThreadLocals。如果父线程是在子线程创立后再set某个InheritableThreadLocal对象的值,对子线程是不可见的。
3.有哪些并发队列
- ConcurrentLinkedQueue : 无界非阻塞队列,底层应用单向链表实现,对于出队和入队应用CAS来实现线程平安
- LinkedBlockingQueue: 有界阻塞队列,应用单向链表实现,通过ReentrantLock实现线程平安,阻塞通过Condition实现,出队和入队各一把锁,不存在相互竞争
- ArrayBlockingQueue: 有界数组形式实现的阻塞队列 , 通过ReentrantLock实现线程平安,阻塞通过Condition实现,出队和入队应用同一把锁
- PriorityBlockingQueue: 带优先级的无界阻塞队列,外部应用均衡二叉树堆实现,遍历保障有序须要自定排序
- DelayQueue: 无界阻塞提早队列,队列中的每个元素都有个过期工夫,当从队列获取元素时,只有过期元素才会出队列,队列头元素是最快要过期的元素
- SynchronousQueue: 任何一个写须要期待一个读的操作,读操作也必须期待一个写操作,相当于数据交换 https://www.cnblogs.com/dwlsx…
- LinkedTransferQueue: 由链表组成的无界阻塞队列,多了tryTransfer 和 transfer办法。transfer办法,可能把生产者元素立即传输给消费者,如果没有消费者在期待,那就会放入队列的tail节点,并阻塞期待元素被生产了返回,能够应用带超时的办法。tryTransfer办法,会在没有消费者期待接管元素的时候马上返回false
- LinkedBlockingDeque: 由链表组成的双向阻塞队列,能够从队列的两端插入和移除元素
4.ThreadPoolExecutor构造函数有哪几个参数,实现原理,创立线程池的形式
-
结构参数:
- corePoolSize: 线程池外围线程个数
- maximunPoolSize: 线程池最大线程数量
- keeyAliveTime: 闲暇线程存活工夫
- TimeUnit: 存活工夫单位
- workQueue: 用于保留期待执行工作的阻塞队列
- ThreadFactory: 创立线程的工厂
- RejectedExecutionHandler: 队列满,并且线程达到最大线程数量的时候,对新工作的解决策略,AbortPolicy(抛出异样)、CallerRunsPolicy(应用调用者所在线程执行)、DiscardOldestPolicy(调用poll抛弃一个工作,执行当前任务)、DiscardPolicy(默默抛弃、不抛异样)
-
原理:
线程池次要是解决两个问题:一个是当执行大量异步工作时可能提供较好的性能,能复用线程解决工作;
二是可能对线程池进行资源限度和治理。
-
一个工作提交的线程池,首先会判断外围线程池是否已满,未满就会创立worker线程执行工作,已满判断阻塞队列是否已满,阻塞队列未满退出阻塞队列,已满就判断线程池线程数量是否曾经达到最大值,没有就新建线程执行工作,达到最大值的话执行回绝策略。
- 回绝策略有:间接抛出异样、应用调用者所在线程执行、抛弃一个旧工作,执行当前任务、间接抛弃什么都不做。
- 创立线程池的形式:间接new ThreadPoolExecutor 或者通过Executors工具类创立
5.Executors 能够创立的线程池类型
- newFixedThreadPool 创立一个外围线程数跟最大线程数雷同的线程池,因而池中的线程数量既不会减少也不会变少,如果有闲暇线程工作就会被执行,如果没有就放入工作队列,期待闲暇线程
- newSingleThreadExecutor 创立一个只有一个线程的线程池,可能串行执行工作,如果线程因为异样而进行,会主动新建一个线程补充
- newCachedThreadPool 创立一个外围线程数为0,最大线程为Inter.MAX_VALUE的线程池,也就是说没有限度,线程池中的线程数量不确定,但如果有闲暇线程能够复用,则优先应用,如果没有闲暇线程,则创立新线程解决工作,解决完放入线程池
- newSingleThreadScheduledExecutor 创立只有一个线程的能够定时执行的线程池
- newScheduledThreadPool 创立一个没有最大线程数限度的能够定时执行线程池
- newWorkStealingPool 创立一个含有足够多线程的线程池,可能调用闲置的CPU去解决其余的工作,应用ForkJoinPool实现,jdk8新增
6.线程池的阻塞队列为什么都用LinkedBlockingQueue,而不必ArrayBlockingQueue
LinkedBlockingQueue 应用单向链表实现,在申明LinkedBlockingQueue的时候,能够不指定队列长度,长度为Integer.MAX_VALUE, 并且新建了一个Node对象,Node对象具备item,next变量,item用于存储元素,next指向链表下一个Node对象,在刚开始的时候链表的head,last都指向该Node对象,item、next都为null,新元素放在链表的尾部,并从头部取元素。取元素的时候只是一些指针的变动,LinkedBlockingQueue给put(放入元素),take(取元素)都申明了一把锁,放入和取互不影响,效率更高
ArrayBlockingQueue 应用数组实现,在申明的时候必须指定长度,如果长度太大,造成内存节约,长度太小,并发性能不高,如果数组满了,就无奈放入元素,除非有其余线程取出元素,放入和取出都应用同一把锁,因而存在竞争,效率比LinkedBlockingQueue低
7.为什么倡议在不必线程池的时候,敞开线程池
线程池的作用的确是为了缩小频繁创立线程,使线程达到复用。但如果在不必线程池的状况下,线程池中的外围线程会始终存在,浪费资源,所以倡议在不必的状况下调用shutdown办法敞开线程池。在须要的时候再调用创立线程池。
8.如何正当的配置Java线程池
如CPU密集型的工作,根本线程池应该配置多大?IO密集型的工作,根本线程池应该配置多大?用有界队列好还是无界队列好?工作十分多的时候,应用什么阻塞队列能获取最好的吞吐量?
CPU密集型,为了充沛应用CPU,缩小上下文切换,线程数配置成CPU个数+1个即可
IO密集型,因为可能大部分线程在解决IO,IO都比拟耗时,因而能够配置成 2*CPU个数的线程,去解决其余工作
9.Timer 和 ScheduledThreadPoolExecutor 区别
Timer是固定的多线程生产者单线程生产,如果其中一个工作报错,其余工作也会生效;
但后者是能够配置的,既能够是多线程生产单线程生产也能够是多线程生产多线程生产
10.说说CopyOnWriteArrayList
CopyOnWriteArrayList是一个线程平安的ArrayList,对其的批改操作是在一个复制的数组上进行的,不影响其余线程的读操作
其中通过ReentrantLock独占锁保障只有一个线程对底层数组进行批改
因为在进行批改操作的时候,底层会复制一个新的数组,而读是在原数组上进行的,因而在多线程环境下这里会产生数据不统一的状况,称为弱一致性
实用于多读少写场景
public class CopyOnWriteArrayListTest {
//模仿测试CopyOnWriteArrayList 的弱一致性
@Test
public void ListrayTest() throws InterruptedException {
AryTest aryTest = new AryTest();
StrClass strClass = new StrClass(aryTest.str1);
String[] str2 = (String[]) strClass.getObjects();
Thread thread = new Thread(() -> {
aryTest.add();
System.out.println(Arrays.toString(aryTest.str1));
});
thread.start();
thread.join();
System.out.println("str2=" + Arrays.toString(str2));
}
static class AryTest {
String[] str1 = new String[]{"a", "b"};
public void add() {
String[] str2 = Arrays.copyOf(str1, str1.length + 1);
str2[str2.length - 1] = "c";
str1 = str2;
}
}
static class StrClass {
final Object[] objects;
public StrClass(Object [] objects) {
this.objects = objects;
}
public Object[] getObjects() {
return objects;
}
}
//copyOnWriteArrayList测试
@Test
public void copyOnWriteArrayList() throws InterruptedException {
String[] str1 = new String[]{"a", "b"};
List<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>(str1);
Thread thread = new Thread(() -> {
copyOnWriteArrayList.add("c");
System.out.println(copyOnWriteArrayList);
});
thread.start();
System.out.println(copyOnWriteArrayList);
thread.join();
}
}
[a, b]
[a, b, c]
11.CountDownLatch原理、CyclicBarrier原理,两者区别
CountDownLatch:
应用AQS实现,通过AQS的状态变量state来作为计数器值,当多个线程调用countdown办法时理论是子性递加AQS的状态值,当线程调用await办法后以后线程会被放入AQS阻塞队列期待计数器为0再返回
CyclicBarrier:
区别:CountDownLatch计数器是一次性的,变为0后就起不到线程同步的作用了。而CyclicBarrier(撒克里克巴瑞儿)在计数器变为0后从新开始,通过调用await办法,能在所有线程达到屏障点后对立执行某个工作,再执行完后继续执行子线程,通过ReentrantLock实现
12.Phaser 的实现
Phaser能够代替CountDownLatch 和CyclicBarrier,但比两者更加弱小,能够动静调整须要的线程个数,能够通过构造函数传入父Phaser实现档次Phaser
public class PhaserTest {
Phaser phaser = new Phaser(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
@Test
public void test1() {
executor.submit(() -> {
System.out.println(Thread.currentThread().getName() + " step1");
//等同 countDown()
phaser.arrive();
});
executor.submit(() -> {
System.out.println(Thread.currentThread().getName() + " step2");
phaser.arrive(d);
});
//等同await()
phaser.awaitAdvance(phaser.getPhase());
System.out.println("thread end");
}
}
13.Semaphore 原理
Semaphore 能够用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源,应用AQS实现,AQS的状态变量state做为许可证数量,每次通过acquire()/tryAcquire(),许可证数量通过CAS原子性递加,调用release()开释许可证,原子性递增,只有有许可证就能够重复使用
@Test
public void semaphoreTest() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
try {
semaphore.acquire();
System.out.println("输入");
//semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
TimeUnit.SECONDS.sleep(3);
System.out.println("开释许可证===");
semaphore.release(2);
}
输入
输入
输入
开释许可证===
输入
输入
14.Exchanger 原理
用于进行线程间的数据交换,它提供一个同步点,在这个同步点两个线程能够替换彼此的数据。如果第一个线程先执行exchange()办法,它会始终期待第二个线程也执行exchange办法,当都达到同步点时,这两个线程能够替换数据。
@Test
public void exchangerTest() throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<> ();
Thread thread = new Thread(() -> {
Integer a = 1;
try {
Integer b = exchanger.exchange(a);
System.out.println("Thread1:" + b);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
Integer c = exchanger.exchange(2);
Thread.sleep(3000);
System.out.println("Thread2:" + c);
}
Thread1:2
Thread2:1
15.### volatile的特点
- 批改可见性,通过volatile润饰的变量(包含对象和数组),线程对变量的批改对另一个线程即时可见,因而另外一个线程在应用前会去判断该变量是否有批改;
- 禁止指令重排序
long 和double的读写操作分为两次32位操作进行,其余根本数据类型的读写操作都是原子性的
实现:退出volatile关键字的代码生成汇编代码,会发现多了一个lock前缀指令,这个指令相当于一个内存屏障,能够避免重排序,通过空的写入操作将变量的批改写入到主内存中,这就实现了可见性
volatile润饰对象剖析可见:https://www.jianshu.com/p/8f2…
什么时候应用volatile?
- 写入变量值不依赖变量的以后值。因为如果依赖以后值,将是获取-计算-写入三步操作,这三步操作不是原子性的,而volatile不保障原子性
-
读写变量值时没有加锁。因为加锁曾经保障了内存可见性,没必要再应用volatile
volatile不能保障原子性
public class VolatileTest {
public volatile int inc = 0;
public void increase() {
inc++;
}
public static void main(String[] args) {
final VolatileTest test = new VolatileTest();
for(int i = 0; i < 10; i++) {
new Thread(() -> {
for(int j = 0; j < 1000; j++)
test.increase();
}).start();
}
//保障后面的线程都执行完
while (Thread.activeCount() > 1)
Thread.yield();
System.out.println(test.inc);
}
}
//输入:<=10000
伪共享
在CPU和主内存之间增加一级或者多级高速缓冲存储器(Cache),这个缓存被集成到CPU外部,在Cache外部是按行存储的,每一行称为一个Cache行,大小个别为2的幂次数字节,当拜访某个变量时,首先会去看CPU Cache内是否有该变量,如果有则间接从中获取,否则就去主内存外面获取该变量,而后把该变量所在的内存区域的一个Cache行的内存复制到Cache中。因为寄存到Cache行的是内存块而不是单个变量,所以可能会把多个变量寄存到一个Cache行中。依据缓存一致性协定,CPU在批改缓存行中的变量时,同一缓存行的数据将生效,这时候其余CPU须要从二级缓存或者主存中加载数据,这就导致了性能问题,称为伪共享
伪共享解决:字节填充;应用sun.misc.Contended注解
@Contended注解只用于Java外围类,如果用户类门路下的类要应用这个注解,须要增加JVM参数:-XX:-RestrictContended。默认填充宽度为128,须要自定义宽度设置 -XX:ContendedPaddingWidth参数
CPU缓存行具体阐明:https://mp.weixin.qq.com/s/yo…
原子操作类
AtomicBoolean
布尔类型的原子操作类,外部应用int型存储布尔值,0示意false,1示意true
AtomicInteger
整型的原子操作类,1.8后提供函数式操作的办法
-
int getAndUpdate(IntUnaryOperator updateFunction)
应用指定函数计算并更新,
返回计算前后果
-
int updateAndGet(IntUnaryOperator updateFunction)
应用指定函数计算并更新,
返回计算后的后果
-
int getAndAccumulate(int x,IntBinaryOperator accumulatorFunction)
应用指定的函数计算x值和以后值,
返回计算前后果
-
int accumulateAndGet(int x,IntBinaryOperator accumulatorFunction)
应用指定的函数计算x值和以后值,
返回结算后的后果
@Test
public void atomicIntegerTest() {
AtomicInteger atomicInteger = new AtomicInteger(1);
int result = atomicInteger.getAndUpdate(e -> e + 3);
////返回计算前后果
assert result == 1;
//1 + 3
assert atomicInteger.get() == 4;
result = atomicInteger.updateAndGet(e -> e + 3);
//返回计算后后果
assert result == 7;
// 4 + 3
assert atomicInteger.get() == 7;
result = atomicInteger.getAndAccumulate(10, (x, y) -> x + y);
//返回计算前后果
assert result == 7;
// 7 + 10
assert atomicInteger.get() == 17;
result = atomicInteger.accumulateAndGet(10, (x, y) -> x + y);
//返回计算后的后果
assert result == 27;
assert atomicInteger.get() == 27;
}
AtomicIntegerArray
提供了原子性更新整型数组元素的形式
-
int getAndUpdate(int i, IntUnaryOperator updateFunction)
应用指定函数计算i索引的值,
返回计算前后果
-
int updateAndGet(int i, IntUnaryOperator updateFunction)
应用指定函数计算i索引的值,
返回计算后后果
-
int getAndAccumulate(int i, int x, IntBinaryOperator accumulatorFunction)
应用指定的函数计算x值和i索引的值,
返回计算前后果
- int accumulateAndGet(int i, int x, IntBinaryOperator accumulatorFunction)
- 应用指定的函数计算x值和i索引的值,
返回计算后后果
@Test
public void atomicIntegerArrayTest() {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
int result = atomicIntegerArray.getAndUpdate(0, e -> e + 1);
assert result == 0;
// 0 + 1
assert atomicIntegerArray.get(0) == 1;
result = atomicIntegerArray.updateAndGet(0, e -> e + 1);
assert result == 2;
//1 + 1
assert atomicIntegerArray.get(0) == 2;
result = atomicIntegerArray.getAndAccumulate(0, 3, (x, y) -> x + y);
assert result == 2;
// 2 + 3
assert atomicIntegerArray.get(0) == 5;
}
AtomicIntegerFieldUpdater
通过反射实现,能够对指定类的指定字段(被volatile润饰)的int型字段进行原子更新,更新字段类型必须为 volatile int
源码剖析 > AtomicIntegerFieldUpdater源码剖析
@Test
public void fieldUpdaterTest() {
//第一个参数 持有给定字段的指标对象类 第二个参数 要更新的字段名称,必须在给定的指标对象中
AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "count");
Person person = new Person("小明", "男");
fieldUpdater.addAndGet(person, 10);
//10
System.out.println(fieldUpdater.get(person));
//Person{name='小明', sex='男', count=10}
System.out.println(person);
}
private static class Person{
String name;
String sex;
volatile int count;
public Person(String name, String sex) {
this.name = name;
this.sex = sex;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", sex='" + sex + '\'' +
", count=" + count +
'}';
}
}
AtomicLong
long型原子操作类
AtomicLongArray
提供了原子性更新long型数组元素的形式
AtomicLongFieldUpdater
通过反射实现,能够对指定类的指定字段(被volatile润饰)的long型字段进行原子更新,更新字段类型必须为 volatile long
AtomicMarkableReference
通过布尔类型做为标记,原子更新援用变量
//结构参数,提供了泛型,而不是像AtomicInteger,只能是int型
AtomicStampedReference(V initialRef, int initialStamp)
AtomicReference
同样提供了泛型变量,原子性更新变量,但没有标识,会产生ABA问题
AtomicReferenceArray
提供原子性更新泛型类数组元素的形式,外部应用Object[]数组保留援用变量
AtomicReferenceFieldUpdater
援用类型的,能够对指定类的指定字段(被volatile批改,根本类型)进行原子更新
AtomicStampedReference
通过保护一个版本号,原子更新援用变量,解决了ABA问题
DoubleAccumulator
double类型的原子更新类,提供自定义函数接口,通过不同的Cell资源,缩小了竞争
DoubleAdder
double类型的原子更新类,只提供累计操作
LongAccumulator
LongAdder
ABA问题
CAS操作中会ABA问题,指线程1应用CAS批改初始值为A的变量X的时候,须要先获取变量X的值,但这时候线程2曾经将变量X的值批改为了B,而后又批改成了A,尽管字面值还是A,但这个A曾经是批改后的A了,对于线程1则还是认为是原来的A,而持续批改变量X的值,这就是ABA问题。
因为CAS须要在操作值的时候查看下值有没有发生变化,如果没有发生变化则更新,然而如果一个值原来是A,变成了B,又变成了A,那么应用CAS进行查看时会发现它的值没有发生变化,然而实际上却变动了。ABA问题的解决思路就是应用版本号。在变量后面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
解决:JDK中提供了AtomicStampedReference类解决了该问题,其给每个变量的状态值都提供了一个版本号
ABA问题
@SneakyThrows
@Test
public void test1() {
AtomicInteger atomicInteger = new AtomicInteger(10);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
atomicInteger.compareAndSet(10, 11);
atomicInteger.compareAndSet(11,10);
System.out.println(Thread.currentThread().getName() + ":10->11->10");
countDownLatch.countDown();
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
boolean isSuccess = atomicInteger.compareAndSet(10,12);
System.out.println("设置是否胜利:" + isSuccess + ",设置的新值:" + atomicInteger.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
}).start();
countDownLatch.await();
}
//输入:线程2并没有发现初始值曾经被批改
//Thread-0:10->11->10
//设置是否胜利:true,设置的新值:12
AtomicStampedReference解决ABA问题,通过保护一个版本号
@SneakyThrows
@Test
public void test2() {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference(10,1);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 第一次版本:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(10, 11, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " 第二次版本:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(11, 10, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " 第三次版本:" + atomicStampedReference.getStamp());
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 第一次版本:" + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
boolean isSuccess = atomicStampedReference.compareAndSet(10,12, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " 批改是否胜利:" + isSuccess + " 以后版本:" + atomicStampedReference.getStamp() + " 以后值:" + atomicStampedReference.getReference());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
countDownLatch.await();
}
//输入
Thread-0 第一次版本:1
Thread-0 第二次版本:2
Thread-0 第三次版本:3
Thread-1 第一次版本:3
Thread-1 批改是否胜利:true 以后版本:4 以后值:12
AtomicMarkableReference 通过标记位,因为其标记位只有true和false,如果每次更新都变更标记位,在第三次的时候标记位还是跟第一次一样,并没有解决ABA问题
@SneakyThrows
@Test
public void test3() {
AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(10, false);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 第一次标记:" + markableReference.isMarked());
markableReference.compareAndSet(10, 11, markableReference.isMarked(), true);
System.out.println(Thread.currentThread().getName() + " 第二次标记:" + markableReference.isMarked());
markableReference.compareAndSet(11, 10, markableReference.isMarked(), false);
System.out.println(Thread.currentThread().getName() + " 第三次标记:" + markableReference.isMarked());
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 第一次标记:" + markableReference.isMarked());
try {
TimeUnit.SECONDS.sleep(2);
boolean isSuccess = markableReference.compareAndSet(10,12, false, true);
System.out.println(Thread.currentThread().getName() + " 批改是否胜利:" + isSuccess + " 以后标记:" + markableReference.isMarked() + " 以后值:" + markableReference.getReference());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
countDownLatch.await();
}
Thread-0 第一次标记:false
Thread-0 第二次标记:true
Thread-0 第三次标记:false
Thread-1 第一次标记:false
Thread-1 批改是否胜利:true 以后标记:true 以后值:12
该类的标记更多的用于示意援用值是否已逻辑删除
@SneakyThrows
@Test
public void test4() {
AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(10,true);
CountDownLatch countDownLatch = new CountDownLatch(2);
System.out.println("初始标记: " + markableReference.isMarked());
//该线程将10设置为逻辑删除
new Thread(() -> {
boolean isSuccess = markableReference.attemptMark(10,false);
System.out.println("设置标记为flase: " + isSuccess + ",以后标记:"+ markableReference.isMarked());
countDownLatch.countDown();
}).start();
//该线程想要更新10->11,但标记曾经变为false,与预期不符
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isSuccess = markableReference.compareAndSet(10, 11, true, false);
System.out.println("设置值: " + isSuccess + "冀望标记:true" + ",以后标记:"+ markableReference.isMarked());
countDownLatch.countDown();
}).start();
countDownLatch.await();
}
初始标记: true
设置标记为flase: true,以后标记:false
设置值: false冀望标记:true,以后标记:false
Java8新增的原子操作类
LongAdder 因为AtomicLong通过CAS提供非阻塞的原子性操作,性能曾经很好,在高并发下大量线程竞争更新同一个原子量,但只有一个线程可能更新胜利,这就造成大量的CPU资源节约。
LongAdder 通过让多个线程去竞争多个Cell资源,来解决,再很高的并发状况下,线程操作的是Cell数组,并不是base,在cell元素有余时进行2倍扩容,在高并发下性能高于AtomicLong
LongAdder的实在值是base的值与Cell数组外面所有Cell元素中value值的累加
LongAdder示例
@Test
public void longAdderTest() {
LongAdder longAdder = new LongAdder();
longAdder.add(101);
longAdder.add(102);
//203
System.out.println(longAdder.sumThenReset());
//0
System.out.println(longAdder.longValue());
}
LongAdder和AtomicLong多线程累加性能测试
@Test
public void multiplyThreadLongAdderTest() throws InterruptedException {
LongAdder longAdder = new LongAdder();
AtomicLong atomicLong = new AtomicLong();
AtomicLong time1 = new AtomicLong();
AtomicLong time2 = new AtomicLong();
int threadNum = 5;
int cycleNums = 500000;
CountDownLatch countDownLatch1 = new CountDownLatch(threadNum);
for (int a = 0; a < threadNum; a++) {
executor.execute(() -> {
long start = System.nanoTime();
for (int i = 0; i < cycleNums; i++) {
longAdder.increment();
}
//System.out.println(longAdder.longValue());
time1.addAndGet(System.nanoTime() - start);
countDownLatch1.countDown();
});
}
countDownLatch1.await();
CountDownLatch countDownLatch2 = new CountDownLatch(threadNum);
for (int a = 0; a < threadNum ; a++) {
executor.execute(() -> {
long start = System.nanoTime();
for (int i = 0; i < cycleNums; i++) {
atomicLong.incrementAndGet();
}
//System.out.println(atomicLong.longValue());
time2.addAndGet(System.nanoTime() - start);
countDownLatch2.countDown();
});
}
countDownLatch2.await();
System.out.println("data=" + longAdder.longValue() + " time1 = " + time1.longValue());
System.out.println("data=" + atomicLong.longValue() + " time2 = " + time2.longValue());
}
data=2500000 LongAdder time1 = 112762041
data=2500000 AtomicLong time2 = 292448392
LongAccumulator 是LongAdder的加强,提供了一个函数式接口,能够自定义运算规定
@Test
public void LongAccumulatorTest() {
LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x * y, 2);
//2 * 10
longAccumulator.accumulate(10);
assert longAccumulator.get() == 20;
//重置为2
longAccumulator.reset();
assert longAccumulator.get() == 2;
}
如何实现一个生产者与消费者模型
一共5种办法
- 同步对象的 wait() / notify() 办法
- ReetrantLock Condition 的 await() / signal()办法
- BlockingQueue阻塞队列 put() 和take办法
- Semaphore 基于计数的信号量
- PipedInputStream / PipedOutputStream 管道输入输出流
https://blog.csdn.net/ldx1998…
说说Random 与 ThreadLocalRandom
两者都可能产生随机数,并且都可能在多线程下应用
在多线程下应用单个Random实例生成随机数时候,多个线程同时计算随机数计算新的种子时候会竞争同一个原子变量的更新操作,因为原子变量的更新是CAS操作,同时只有一个线程会胜利,所以会造成大量线程进行自旋重试,这是会升高并发性能的
ThreadLocalRandom解决了这个问题,其应用ThreadLocal的原理,让每个线程内持有一个本地的种子变量,该种子变量只有在应用随机数时候才会被初始化,多线程下计算新种子时候是依据本人线程内保护的种子变量进行更新,从而防止了竞争
https://blog.csdn.net/xiaolon…
https://www.jianshu.com/p/89d…
如何让一段程序并发的执行,并最终汇总后果
能够应用CyclicBarrier,CountDownLatch,Callable,ForkJoinPool,CompletableFuture,并行流(LongStream)
https://blog.csdn.net/m0_3754…
发表回复