1. 多线程 J.U.C
1.1 线程池
1.1.1 线程回顾
1)回顾线程创立形式
- 继承 Thread
- 实现 Runnable
2)线程的状态
-
NEW:刚刚创立,没做任何操作
Thread thread = new Thread(); System.out.println(thread.getState());
-
RUNNABLE:调用 run,能够执行,但不代表肯定在执行(RUNNING,READY)
thread.start(); System.out.println(thread.getState());
-
BLOCKED:抢不到锁
final byte[] lock = new byte[0]; new Thread(new Runnable() {public void run() {synchronized (lock){ try {Thread.sleep(3000); } catch (InterruptedException e) {e.printStackTrace(); } } } }).start(); Thread thread2 = new Thread(new Runnable() {public void run() {synchronized (lock){}} }); thread2.start(); Thread.sleep(1000); System.out.println(thread2.getState());
-
WAITING
Thread thread2 = new Thread(new Runnable() {public void run() {LockSupport.park(); } }); thread2.start(); Thread.sleep(500); System.out.println(thread2.getState()); LockSupport.unpark(thread2); Thread.sleep(500); System.out.println(thread2.getState());
-
TIMED_WAITING
Thread thread3 = new Thread(new Runnable() {public void run() { try {Thread.sleep(10000); } catch (InterruptedException e) {e.printStackTrace(); } } }); thread3.start(); Thread.sleep(500); System.out.println(thread3.getState());
-
TERMINATED
// 期待 1s 后再来看 Thread.sleep(1000); System.out.println(thread.getState());
3)线程池
依据下面的状态,一般线程执行完,就会进入 TERMINATED 销毁掉,而线程池就是创立一个缓冲池寄存线程,执行完结当前,该线程并不会死亡,而是再次返回线程池中成为闲暇状态,等待下次工作降临,这使得线程池比手动创立线程有着更多的劣势:
- 升高系统资源耗费,通过重用已存在的线程,升高线程创立和销毁造成的耗费;
- 进步零碎响应速度,当有工作达到时,通过复用已存在的线程,无需期待新线程的创立便能立刻执行;
- 不便线程并发数的管控。因为线程若是无限度的创立,可能会导致内存占用过多而产生 OOM
- 节俭 cpu 切换线程的工夫老本(须要放弃以后执行线程的现场,并复原要执行线程的现场)。
- 提供更弱小的性能,延时定时线程池。(Timer vs ScheduledThreadPoolExecutor)
4)线程池体系(查看:ScheduledThreadPoolExecutor,ForkJoinPool 类图)
阐明:
- 最罕用的是 ThreadPoolExecutor
- 调度用 ScheduledThreadPoolExecutor
- 工作拆分合并用 ForkJoinPool
- Executors 是工具类,帮助你创立线程池的
1.1.2 工作机制
1)线程池状态
- RUNNING:初始化状态是 RUNNING。线程池被一旦被创立,就处于 RUNNING 状态,并且线程池中的工作数为 0。RUNNING 状态下,可能接管新工作,以及对已增加的工作进行解决。
-
SHUTDOWN:SHUTDOWN 状态时,不接管新工作,但能解决已增加的工作。调用线程池的 shutdown()接口时,线程池由 RUNNING -> SHUTDOWN。
//shutdown 后不承受新工作,然而 task1,依然能够执行实现 ExecutorService poolExecutor = Executors.newFixedThreadPool(5); poolExecutor.execute(new Runnable() {public void run() { try {Thread.sleep(1000); System.out.println("finish task 1"); } catch (InterruptedException e) {e.printStackTrace(); } } }); poolExecutor.shutdown(); poolExecutor.execute(new Runnable() {public void run() { try {Thread.sleep(1000); } catch (InterruptedException e) {e.printStackTrace(); } } }); System.out.println("ok");
-
STOP:不接管新工作,不解决已增加的工作,并且会中断正在解决的工作。调用线程池的 shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN) -> STOP
留神:容易引发不可预知的后果!运行中的工作兴许还会打印,直到完结,因为调的是 Thread.interrupt
// 改为 shutdownNow 后,工作立马终止,sleep 被打断,新工作无奈提交,task1 进行 poolExecutor.shutdownNow();
-
TIDYING:所有的工作已终止,队列中的”工作数量”为 0,线程池会变为 TIDYING。线程池变为 TIDYING 状态时,会执行钩子函数 terminated(),能够通过重载 terminated()函数来实现自定义行为
// 自定义类,重写 terminated 办法 public class MyExecutorService extends ThreadPoolExecutor {public MyExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void terminated() {super.terminated(); System.out.println("terminated"); } // 调用 shutdownNow,ternimated 办法被调用打印 public static void main(String[] args) throws InterruptedException {MyExecutorService service = new MyExecutorService(1,2,10000,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(5)); service.shutdownNow();} }
- TERMINATED:线程池处在 TIDYING 状态时,执行完 terminated()之后,就会由 TIDYING -> TERMINATED
2)构造阐明
在线程池的编程模式下,工作是提交给整个线程池,而不是间接提交给某个线程,线程池在拿到工作后,就在外部协调闲暇的线程,如果有,则将工作交给某个闲暇的线程。一个线程同时只能执行一个工作,但能够同时向一个线程池提交多个工作。
(源码查看:两个汇合,一个 queue,一个 hashset)
3)工作的提交
- 增加工作,如果线程池中线程数没达到 coreSize,间接创立新线程执行
- 达到 core,放入 queue
- queue 已满,未达到 maxSize 持续创立线程
- 达到 maxSize,依据 reject 策略解决
- 超时后,线程被开释,降落到 coreSize
1.1.3 源码分析
// 工作提交阶段:(4 个 if 条件路线)public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 判断工作数,如果小于 coreSize,addWork,留神第二个参数 core=true
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
// 否则,如果线程池还在运行,offer 到队列
if (isRunning(c) && workQueue.offer(command)) {
// 再检查一下状态
int recheck = ctl.get();
// 如果线程池曾经终止,间接移除工作,不再响应
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则,如果没有可用线程的话(比方 coreSize=0),创立一个空 work
// 该 work 创立时不会给指派工作(为 null),然而会被放入 works 汇合,进而从队列获取工作去执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列也满,持续调 addWork,然而留神,core=false,开启到 maxSize 的大门
// 超出 max 的话,addWork 会返回 false,进入 reject
else if (!addWorker(command, false))
reject(command);
}
// 线程创立
private boolean addWorker(Runnable firstTask, boolean core) {
// 第一步,计数判断,不符合条件打回 false
retry:
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
for (;;) {int wc = workerCountOf(c);
// 判断线程数,留神这里!// 也就阐明线程池的线程数是不可能设置任意大的。// 最大 29 位(CAPACITY=29 位二进制)// 超出规定范畴,返回 false,示意不容许再开启新工作线程,创立 worker 失败!if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 第二步,创立新 work 放入线程汇合 works(一个 HashSet)boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 符合条件,创立新的 work 并包装 task
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁,workers 是一个 hashset,这里要保障线程安全性
mainLock.lock();
try {
//...
// 在这里!!!workers.add(w);
//...
workerAdded = true;
} finally {mainLock.unlock();
}
if (workerAdded) {
// 留神,只有是胜利 add 了新的 work,那么将该新 work 立刻启动,工作失去执行
t.start();
workerStarted = true;
}
}
} finally {if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 工作获取与执行
// 在 worker 执行 runWorker()的时候,不停循环,先查看本人有没有携带 Task,如果有,执行
while (task != null || (task = getTask()) != null)
// 如果没用,会调用 getTask,从队列获取工作
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// ...
int wc = workerCountOf(c);
// Are workers subject to culling? - 很形象,要不要乖乖的被“捕杀”?// 判断是不是要超时解决,重点!!!决定了以后线程要不要被开释
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程数超出 max,并且上次循环中 poll 期待超时了,那么阐明该线程已终止
// 将线程队列数量原子性减
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 计数器做原子递加,递加胜利后,返回 null,for 被停止
if (compareAndDecrementWorkerCount(c))
return null;
// 递加失败,持续下一轮循环,直到胜利
continue;
}
try {
// 重点!!!// 如果线程可被开释,那就 poll,开释的工夫为:keepAliveTime
// 否则,线程是不会被开释的,take 始终被阻塞在这里,直到来了新工作持续工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 到这里阐明可被开释的线程期待超时,曾经销毁,设置该标记,下次循环将线程数缩小
timedOut = true;
} catch (InterruptedException retry) {timedOut = false;}
}
}
1.1.4 Executors
以上构造函数比拟多,为了方便使用,juc 提供了一个 Executors 工具类,外部提供静态方法
1)newCachedThreadPool():弹性线程数
2)newFixedThreadPool(int nThreads):固定线程数
3)newSingleThreadExecutor() : 繁多线程数
4)newScheduledThreadPool(int corePoolSize):可调度,罕用于定时
1.1.5 经典面试
1)线程池是如何保障线程不被销毁的呢?
答案:如果队列中没有工作时,外围线程会始终阻塞在获取工作的办法,直到返回工作。而工作执行完后,又会进入下一轮 work.runWork()中循环
验证:机密就藏在外围源码里 ThreadPoolExecutor.getTask()
//work.runWork():
while (task != null || (task = getTask()) != null)
//work.getTask():
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
2)那么线程池中的线程会处于什么状态?
答案:RUNNABLE,WAITING
验证:起一个线程池,搁置一个工作 sleep,debug 查看完结前后的状态
//debug add watcher:
((ThreadPoolExecutor) poolExecutor).workers.iterator().next().thread.getState()
ThreadPoolExecutor poolExecutor = Executors.newFixedThreadPool(1);
poolExecutor.execute(new Runnable() {public void run() {
try {Thread.sleep(5000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
System.out.println("ok");
3)外围线程与非核心线程有区别吗?
答案:没有。被销毁的线程和创立的先后无关。即使是第一个被创立的外围线程,依然有可能被销毁
验证:看源码,每个 work 在 runWork()的时候去 getTask(),在 getTask 外部,并没有针对性的辨别以后 work 是否是外围线程或者相似的标记。只有判断 works 数量超出 core,就会调用 poll(),否则 take()
1.2 Fork/Join
1.2.1 概念
ForkJoin 是由 JDK1.7 后提供多线并发解决框架。能够了解为一种非凡的线程池。
1. 工作宰割:Fork(分岔),先把大的工作宰割成足够小的子工作,如果子工作比拟大的话还要对子工作进行持续宰割。
2. 合并后果:join,宰割后的子工作被多个线程执行后,再合并后果,失去最终的残缺输入。
1.2.2 组成
- ForkJoinTask:次要提供 fork 和 join 两个办法用于工作拆分与合并;个别用子类 RecursiveAction(无返回值的工作)和 RecursiveTask(须要返回值)来实现 compute 办法。
- ForkJoinPool:调度 ForkJoinTask 的线程池;
- ForkJoinWorkerThread:Thread 的子类,寄存于线程池中的工作线程(Worker);
- WorkQueue:工作队列,用于保留工作;
1.2.3 根本应用
一个典型的例子:计算 1 -1000 的和
package com.itheima.thread;
import java.util.concurrent.*;
public class SumTask {
private static final Integer MAX = 100;
static class SubTask extends RecursiveTask<Integer> {
// 子工作开始计算的值
private Integer start;
// 子工作完结计算的值
private Integer end;
public SubTask(Integer start , Integer end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {if(end - start < MAX) {
// 小于边界,开始计算
System.out.println("start =" + start + ";end =" + end);
Integer totalValue = 0;
for(int index = this.start ; index <= this.end ; index++) {totalValue += index;}
return totalValue;
}else {
// 否则,两头劈开持续拆分
SubTask subTask1 = new SubTask(start, (start + end) / 2);
subTask1.fork();
SubTask subTask2 = new SubTask((start + end) / 2 + 1 , end);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();
Future<Integer> taskFuture = pool.submit(new SubTask(1,1000));
try {Integer result = taskFuture.get();
System.out.println("result =" + result);
} catch (InterruptedException | ExecutionException e) {e.printStackTrace(System.out);
}
}
}
1.2.4 设计思维
- 一般线程池外部有两个重要汇合:工作线程汇合(一般线程),和工作队列。
- ForkJoinPool 也相似,线程汇合里放的是非凡线程 ForkJoinWorkerThread,工作队列里放的是特殊任务 ForkJoinTask
- 不同之处在于,一般线程池只有一个队列。而 ForkJoinPool 的工作线程 ForkJoinWorkerThread 每个线程内都绑定一个双端队列。
- 在 fork 的时候,也就是工作拆分,将拆分的 task 会被以后线程放到本人的队列中。
- 如果有工作,那么线程优先从本人的队列里取工作执行,默认从队尾
- 当本人队列中执行完后,工作线程会跑到其余队列的队首偷工作来执行。也就是所说的“窃取”
1.2.5 留神点
应用 ForkJoin 将雷同的计算工作通过多线程执行。然而在应用中须要留神:
- 留神工作切分的粒度,也就是 fork 的界线。并非越小越好
- 判断要不要应用 ForkJoin。任务量不是太大的话,串行可能优于并行。因为多线程会波及到上下文的切换
1.3 原子操作
1.3.1 概念
原子(atom)本意是“不能被进一步宰割的最小粒子”,而原子操作(atomic operation)意为 ” 不可被中断的一个或一系列操作 ”。类比于数据库事务,redis 的 multi。
1.3.2 CAS
Compare And Set(或 Compare And Swap),翻译过去就是比拟并替换,CAS 操作蕴含三个操作数——内存地位(V)、预期原值(A)、新值(B)。从第一视角来看,了解为:我认为地位 V 应该是 A,如果是 A,则将 B 放到这个地位;否则,不要更改,只通知我这个地位当初的值即可。所以 cas 外部个别随同着 while 循环操作,不停的去尝试
juc 中提供了 Atomic 结尾的类,基于 cas 实现原子性操作,最根本的利用就是计数器
package com.itheima;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {private static AtomicInteger i = new AtomicInteger(0);
public int get(){return i.get();
}
public void inc(){i.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {final AtomicCounter counter = new AtomicCounter();
for (int i = 0; i < 10; i++) {new Thread(new Runnable() {public void run() {counter.inc();
}
}).start();}
Thread.sleep(3000);
// 能够正确输入 10
System.out.println(counter.i.get());
}
}
注:AtomicInteger 源码。基于 unsafe 类 cas 思维实现,性能篇会讲到
1.3.3 atomic
下面展现了 AtomicInteger,对于 atomic 包,还有很多其余类型:
-
根本类型
- AtomicBoolean:以原子更新的形式更新 boolean;
- AtomicInteger:以原子更新的形式更新 Integer;
- AtomicLong:以原子更新的形式更新 Long;
-
援用类型
- AtomicReference:原子更新援用类型
- AtomicReferenceFieldUpdater:原子更新援用类型的字段
- AtomicMarkableReference:原子更新带有标记位的援用类型
-
数组
- AtomicIntegerArray:原子更新整型数组里的元素。
- AtomicLongArray:原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新援用类型数组里的元素。
-
字段
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicStampedReference:原子更新带有版本号的援用类型。
1.3.4 留神!
应用 atomic 要留神原子性的边界,把握不好会起不到应有的成果,原子性被毁坏。
案例:原子性被毁坏景象
package com.itheima;
import java.util.concurrent.atomic.AtomicInteger;
public class BadAtomic {AtomicInteger i = new AtomicInteger(0);
static int j=0;
public void badInc(){int k = i.incrementAndGet();
try {Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {e.printStackTrace();
}
j=k;
}
public static void main(String[] args) throws InterruptedException {BadAtomic atomic = new BadAtomic();
for (int i = 0; i < 10; i++) {new Thread(()->{atomic.badInc();
}).start();}
Thread.sleep(3000);
System.out.println(atomic.j);
}
}
后果剖析:
- 每次都不一样,总之不是 10
- i 是原子性的,没问题。然而再赋值,变成了两部操作,原子性被突破
- 在 badInc 上加 synchronized,问题解决
1.4 AQS
1.4.1 概念
首先搞清楚,AbstractQueuedSynchronizer 形象的队列式同步器,是一个抽象类,这个类在 java.util.concurrent.locks 包。
除了 java 自带的 synchronized 关键字之外,jdk 提供的另外一种锁机制。如果须要本人实现灵便的锁逻辑,能够思考应用 AQS,十分的便捷。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
jdk 中应用 AQS 的线程工具类很多,自旋锁、互斥锁、读锁写锁、信号量、通过类继承关系能够轻松查看,所以说,AQS 是 juc 中很多类的基石。
1.4.2 原理
- state:状态,int 类型的成员变量,当 state>0 时示意锁有人占着,当 state = 0 时示意开释了锁。
- 队列:拿不到锁的线程进队列。
1.4.3 源码
AQS 应用了模板设计模式。只须要实现指定的锁获取办法即可,外部的机制 AQS 已帮你封装好。
(AQS 源码 idea 中查看)
须要子类继承 AQS,并实现的办法(protected):
protected boolean tryAcquire(int arg) // 独占式获取同步状态
protected boolean tryRelease(int arg) // 独占式开释同步状态
protected int tryAcquireShared(int arg) // 共享式获取同步状态
protected boolean tryReleaseShared(int arg) // 共享式开释同步状态
应用时,调用的是父类的办法(public)
public final void acquire(int arg) // 独享锁获取
public final boolean release(int arg) // 独享锁开释
public final void acquireShared(int arg) // 共享锁获取
public final boolean releaseShared(int arg) // 共享锁开释
源码剖析
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 可共享式获取锁,内部调用,模板模式
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 须要实现的局部,空 protected 办法,被下面的对外办法所调用
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
// 同理,锁的开释,模板模式
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
// 独占式获取
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
// 独占式开释
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
}
1.4.4 经典面试
一个阿里面试题:本人实现一个锁,最大容许指定数量的线程并行运作。其余排队等待
package com.itheima;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyLock extends AbstractQueuedSynchronizer {public MyLock(int count){setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
// 自旋,cas 形式不停获取数量
for (; ;) {int current = getState();
int newCount = current - arg;
if (newCount < 0 || compareAndSetState(current, newCount)) {return newCount;}
}
}
@Override
protected boolean tryReleaseShared(int arg) {for (; ;) {int current = getState();
int newState = current + arg;
if (compareAndSetState(current, newState)) {return true;}
}
}
public static void main(String[] args) {final MyLock lock = new MyLock(3);
for (int i = 0; i < 30; i++) {new Thread(new Runnable() {public void run() {lock.acquireShared(1);
try {Thread.sleep(1000);
System.out.println("ok");
} catch (InterruptedException e) {e.printStackTrace();
} finally {lock.releaseShared(1);
}
}
}).start();}
}
}
验证后果:尽管 30 个一次性 start,然而会每 1s 输入 3 个 ok,达到了并发管制
1.5 并发容器
juc 中还蕴含很多其余的并发容器(理解)
1.ConcurrentHashMap
对应:HashMap
指标:代替 Hashtable、synchronizedMap,应用最多,源码篇会具体解说
原理:JDK7 中采纳 Segment 分段锁,JDK8 中采纳 CAS+synchronized
2.CopyOnWriteArrayList
对应:ArrayList
指标:代替 Vector、synchronizedList
原理:高并发往往是读多写少的个性,读操作不加锁,而对写操作加 Lock 独享锁,先复制一份新的汇合,在新的汇合下面批改,而后将新汇合赋值给旧的援用,并通过 volatile 保障其可见性。
查看源码:volatile array,lock 加锁,数组复制
3.CopyOnWriteArraySet
对应:HashSet
指标:代替 synchronizedSet
原理:与 CopyOnWriteArrayList 实现原理相似。
4.ConcurrentSkipListMap
对应:TreeMap
指标:代替 synchronizedSortedMap(TreeMap)
原理:基于 Skip list(跳表)来代替均衡树,依照分层 key 高低链接指针来实现。
附加:跳表
5.ConcurrentSkipListSet
对应:TreeSet
指标:代替 synchronizedSortedSet(TreeSet)
原理:外部基于 ConcurrentSkipListMap 实现,原理统一
6.ConcurrentLinkedQueue
对应:LinkedList
对应:无界限程平安队列
原理:通过队首队尾指针,以及 Node 类元素的 next 实现 FIFO 队列
7.BlockingQueue
对应:Queue
特点:拓展了 Queue,减少了可阻塞的插入和获取等操作
原理:通过 ReentrantLock 实现线程平安,通过 Condition 实现阻塞和唤醒
实现类:
- LinkedBlockingQueue:基于链表实现的可阻塞的 FIFO 队列
- ArrayBlockingQueue:基于数组实现的可阻塞的 FIFO 队列
- PriorityBlockingQueue:按优先级排序的队列
本文由传智教育博学谷 – 狂野架构师教研团队公布
转载请注明出处!