前言
java 多线程我集体感觉是 javaSe 中最难的一部分,我以前也是感觉学会了,然而真正有多线程的需要却不晓得怎么下手,实际上还是对多线程这块常识理解不粗浅,不晓得多线程 api 的利用场景,不晓得多线程的运行流程等等,本篇文章将应用实例 + 图解 + 源码的形式来解析 java 多线程。
文章篇幅较长,大家也能够有抉择的看具体章节,倡议多线程的代码全副手敲,永远不要置信你看到的论断,本人编码后运行进去的,才是本人的。
什么是 java 多线程?
过程与线程
过程
- 当一个程序被运行,就开启了一个过程,比方启动了 qq,word
- 程序由指令和数据组成,指令要运行,数据要加载,指令被 cpu 加载运行,数据被加载到内存,指令运行时可由 cpu 调度硬盘、网络等设施
线程
- 一个过程内可分为多个线程
- 一个线程就是一个指令流,cpu 调度的最小单位,由 cpu 一条一条执行指令
并行与并发
并发:单核 cpu 运行多线程时,工夫片进行很快的切换。线程轮流执行 cpu
并行:多核 cpu 运行 多线程时,真正的在同一时刻运行
java 提供了丰盛的 api 来反对多线程。
为什么用多线程?
多线程能实现的都能够用单线程来实现,那单线程运行的好好的,为什么 java 要引入多线程的概念呢?
多线程的益处:
- 程序运行的更快!快!快!
- 充分利用 cpu 资源,目前简直没有线上的 cpu 是单核的,施展多核 cpu 弱小的能力
多线程难在哪里?
单线程只有一条执行线,过程容易了解,能够在大脑中清晰的勾画出代码的执行流程
多线程却是多条线,而且个别多条线之间有交互,多条线之间须要通信,个别难点有以下几点
- 多线程的执行后果不确定, 受到 cpu 调度的影响
- 多线程的平安问题
- 线程资源贵重,依赖线程池操作线程,线程池的参数设置问题
- 多线程执行是动静的,同时的, 难以追踪过程
- 多线程的底层是操作系统层面的,源码难度大
有时候心愿本人变成一个字节穿梭于服务器中,搞清楚前因后果,就像无敌毁坏王一样(没看过这部电影的能够看下,脑洞大开)。
java 多线程的根本应用
定义工作、创立和运行线程
工作:线程的执行体。也就是咱们的外围代码逻辑
定义工作
- 继承 Thread 类(能够说是 将工作和线程合并在一起)
- 实现 Runnable 接口(能够说是 将工作和线程离开了)
- 实现 Callable 接口 (利用 FutureTask 执行工作)
Thread 实现工作的局限性
- 工作逻辑写在 Thread 类的 run 办法中,有单继承的局限性
- 创立多线程时,每个工作有成员变量时不共享,必须加 static 能力做到共享
Runnable 和 Callable 解决了 Thread 的局限性
然而 Runbale 相比 Callable 有以下的局限性
- 工作没有返回值
- 工作无奈抛异样给调用方
如下代码 几种定义线程的形式
@Slf4j
class T extends Thread {
@Override
public void run() {log.info("我是继承 Thread 的工作");
}
}
@Slf4j
class R implements Runnable {
@Override
public void run() {log.info("我是实现 Runnable 的工作");
}
}
@Slf4j
class C implements Callable<String> {
@Override
public String call() throws Exception {log.info("我是实现 Callable 的工作");
return "success";
}
}
创立线程的形式
- 通过 Thread 类间接创立线程
- 利用线程池外部创立线程
启动线程的形式
- 调用线程的 start()办法
// 启动继承 Thread 类的工作
new T().start();
// 启动继承 Thread 匿名外部类的工作 可用 lambda 优化
Thread t = new Thread(){
@Override
public void run() {log.info("我是 Thread 匿名外部类的工作");
}
};
// 启动实现 Runnable 接口的工作
new Thread(new R()).start();
// 启动实现 Runnable 匿名实现类的工作
new Thread(new Runnable() {
@Override
public void run() {log.info("我是 Runnable 匿名外部类的工作");
}
}).start();
// 启动实现 Runnable 的 lambda 简化后的工作
new Thread(() -> log.info("我是 Runnable 的 lambda 简化后的工作")).start();
// 启动实现了 Callable 接口的工作 联合 FutureTask 能够获取线程执行的后果
FutureTask<String> target = new FutureTask<>(new C());
new Thread(target).start();
log.info(target.get());
以上各个线程相干的类的类图如下
上下文切换
多核 cpu 下,多线程是并行工作的,如果线程数多,单个核又会并发的调度线程, 运行时会有上下文切换的概念
cpu 执行线程的工作时,会为线程调配工夫片,以下几种状况会产生上下文切换。
- 线程的 cpu 工夫片用完
- 垃圾回收
- 线程本人调用了 sleep、yield、wait、join、park、synchronized、lock 等办法
当产生上下文切换时,操作系统会保留以后线程的状态,并复原另一个线程的状态,jvm 中有块内存地址叫程序计数器,用于记录线程执行到哪一行代码, 是线程公有的。
idea 打断点的时候能够设置为 Thread 模式,idea 的 debug 模式能够看出栈帧的变动
线程的礼让 -yield()& 线程的优先级
yield()办法会让运行中的线程切换到就绪状态,从新争抢 cpu 的工夫片,争抢时是否获取到工夫片看 cpu 的调配。
代码如下
// 办法的定义
public static native void yield();
Runnable r1 = () -> {
int count = 0;
for (;;){log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (;;){Thread.yield();
log.info("---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.start();
t2.start();
// 运行后果
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512
11:49:15.798 [t2] INFO thread.TestYield - ---- 2>293
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
如上述后果所示,t2 线程每次执行时进行了 yield(),线程 1 执行的机会显著比线程 2 要多。
线程的优先级
线程外部用 1~10 的数来调整线程的优先级,默认的线程优先级为 NORM_PRIORITY:5
cpu 比较忙时,优先级高的线程获取更多的工夫片
cpu 比拟闲时,优先级设置根本没用
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
// 办法的定义
public final void setPriority(int newPriority) { }
cpu 比较忙时
Runnable r1 = () -> {
int count = 0;
for (;;){log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (;;){log.info("---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.NORM_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
// 可能的运行后果
11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135903
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135904
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135905
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135906
cpu 比拟闲时
Runnable r1 = () -> {
int count = 0;
for (int i = 0; i < 10; i++) {log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (int i = 0; i < 10; i++) {log.info("---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.MIN_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
// 可能的运行后果 线程 1 优先级低 却先运行完
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>2
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>3
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>4
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>5
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>6
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>7
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>8
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>9
守护线程
默认状况下,java 过程须要期待所有线程都运行完结,才会完结,有一种非凡线程叫守护线程,当所有的非守护线程都完结后,即便它没有执行完,也会强制完结。
默认的线程都是非守护线程。
垃圾回收线程就是典型的守护线程
// 办法的定义
public final void setDaemon(boolean on) {
}
Thread thread = new Thread(() -> {while (true) {}});
// 具体的 api。设为 true 示意未守护线程,当主线程完结后,守护线程也完结。// 默认是 false,当主线程完结后,thread 持续运行,程序不进行
thread.setDaemon(true);
thread.start();
log.info("完结");
线程的阻塞
线程的阻塞能够分为好多种,从操作系统层面和 java 层面阻塞的定义可能不同,然而狭义上使得线程阻塞的形式有上面几种
- BIO 阻塞,即应用了阻塞式的 io 流
- sleep(long time) 让线程休眠进入阻塞状态
- a.join() 调用该办法的线程进入阻塞,期待 a 线程执行完复原运行
- sychronized 或 ReentrantLock 造成线程未取得锁进入阻塞状态 (同步锁章节细说)
- 取得锁之后调用 wait()办法 也会让线程进入阻塞状态 (同步锁章节细说)
- LockSupport.park() 让线程进入阻塞状态 (同步锁章节细说)
sleep()
使线程休眠,会将运行中的线程进入阻塞状态。当休眠工夫完结后,从新争抢 cpu 的工夫片持续运行
// 办法的定义 native 办法
public static native void sleep(long millis) throws InterruptedException;
try {
// 休眠 2 秒
// 该办法会抛出 InterruptedException 异样 即休眠过程中可被中断,被中断后抛出异样
Thread.sleep(2000);
} catch (InterruptedException 异样 e) { }
try {
// 应用 TimeUnit 的 api 可代替 Thread.sleep
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) { }
join()
join 是指调用该办法的线程进入阻塞状态,期待某线程执行实现后复原运行
// 办法的定义 有重载
// 期待线程执行完才复原运行
public final void join() throws InterruptedException {}
// 指定 join 的工夫。指定工夫内 线程还未执行完 调用方线程不持续期待就复原运行
public final synchronized void join(long millis)
throws InterruptedException{}
Thread t = new Thread(() -> {
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
r = 10;
});
t.start();
// 让主线程阻塞 期待 t 线程执行完才继续执行
// 去除该行,执行后果为 0,加上该行 执行后果为 10
t.join();
log.info("r:{}", r);
// 运行后果
13:09:13.892 [main] INFO thread.TestJoin - r:10
线程的打断 -interrupt()
// 相干办法的定义
public void interrupt() {}
public boolean isInterrupted() {}
public static boolean interrupted() {}
打断标记:线程是否被打断,true 示意被打断了,false 示意没有
isInterrupted() 获取线程的打断标记 , 调用后不会批改线程的打断标记
interrupt()办法用于中断线程
- 能够打断 sleep,wait,join 等显式的抛出 InterruptedException 办法的线程,然而打断后, 线程的打断标记还是 false
- 打断失常线程,线程不会真正被中断,然而线程的打断标记为 true
interrupted() 获取线程的打断标记,调用后清空打断标记 即如果获取为 true 调用后打断标记为 false (不罕用)
interrupt 实例:有个后盾监控线程不停的监控,当外界打断它时,就完结运行。代码如下
@Slf4j
class TwoPhaseTerminal{
// 监控线程
private Thread monitor;
public void start(){monitor = new Thread(() ->{
// 不停的监控
while (true){Thread thread = Thread.currentThread();
// 判断以后线程是否被打断
if (thread.isInterrupted()){log.info("以后线程被打断, 完结运行");
break;
}
try {Thread.sleep(1000);
// 监控逻辑中被打断后,打断标记为 true
log.info("监控");
} catch (InterruptedException e) {
// 睡眠时被打断时抛出异样 在该处捕捉到 此时打断标记还是 false
// 在调用一次中断 使得中断标记为 true
thread.interrupt();}
}
});
monitor.start();}
public void stop(){monitor.interrupt();
}
}
线程的状态
下面说了一些根本的 api 的应用,调用下面的办法后都会使得线程有对应的状态。
线程的状态可从 操作系统层面分为五种状态 从 java api 层面分为六种状态。
五种状态
- 初始状态:创立线程对象时的状态
- 可运行状态 (就绪状态):调用 start() 办法后进入就绪状态,也就是筹备好被 cpu 调度执行
- 运行状态:线程获取到 cpu 的工夫片,执行 run()办法的逻辑
- 阻塞状态: 线程被阻塞,放弃 cpu 的工夫片,期待解除阻塞从新回到就绪状态争抢工夫片
- 终止状态: 线程执行实现或抛出异样后的状态
六种状态
Thread 类中的外部枚举 State
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
- NEW 线程对象被创立
-
Runnable 线程调用了 start()办法后进入该状态,该状态蕴含了三种状况
- 就绪状态 : 期待 cpu 调配工夫片
- 运行状态: 进入 Runnable 办法执行工作
- 阻塞状态:BIO 执行阻塞式 io 流时的状态
- Blocked 没获取到锁时的阻塞状态(同步锁章节会细说)
- WAITING 调用 wait()、join()等办法后的状态
- TIMED_WAITING 调用 sleep(time)、wait(time)、join(time)等办法后的状态
- TERMINATED 线程执行实现或抛出异样后的状态
六种线程状态和办法的对应关系
线程的相干办法总结
次要总结 Thread 类中的外围办法
办法名称 | 是否 static | 办法阐明 |
---|---|---|
start() | 否 | 让线程启动,进入就绪状态, 期待 cpu 调配工夫片 |
run() | 否 | 重写 Runnable 接口的办法, 线程获取到 cpu 工夫片时执行的具体逻辑 |
yield() | 是 | 线程的礼让,使得获取到 cpu 工夫片的线程进入就绪状态,从新争抢工夫片 |
sleep(time) | 是 | 线程休眠固定工夫,进入阻塞状态,休眠工夫实现后从新争抢工夫片, 休眠可被打断 |
join()/join(time) | 否 | 调用线程对象的 join 办法,调用者线程进入阻塞, 期待线程对象执行完或者达到指定工夫才复原,从新争抢工夫片 |
isInterrupted() | 否 | 获取线程的打断标记,true: 被打断,false:没有被打断。调用后不会批改打断标记 |
interrupt() | 否 | 打断线程,抛出 InterruptedException 异样的办法均可被打断,然而打断后不会批改打断标记,失常执行的线程被打断后会批改打断标记 |
interrupted() | 否 | 获取线程的打断标记。调用后会清空打断标记 |
stop() | 否 | 进行线程运行 不举荐 |
suspend() | 否 | 挂起线程 不举荐 |
resume() | 否 | 复原线程运行 不举荐 |
currentThread() | 是 | 获取以后线程 |
Object 中与线程相干办法
办法名称 | 办法阐明 |
---|---|
wait()/wait(long timeout) | 获取到锁的线程进入阻塞状态 |
notify() | 随机唤醒被 wait()的一个线程 |
notifyAll(); | 唤醒被 wait()的所有线程,从新争抢工夫片 |
同步锁
线程平安
- 一个程序运行多个线程自身是没有问题的
-
问题有可能呈现在多个线程访问共享资源
- 多个线程都是读共享资源也是没有问题的
- 当多个线程读写共享资源时, 如果产生指令交织,就会呈现问题
临界区: 一段代码如果对共享资源的多线程读写操作, 这段代码就被称为临界区。
留神的是 指令交织指的是 java 代码在解析成字节码文件时,java 代码的一行代码在字节码中可能有多行,在线程上下文切换时就有可能交织。
线程平安指的是多线程调用同一个对象的临界区的办法时,对象的属性值肯定不会产生谬误,这就是保障了线程平安。
如上面不平安的代码
// 对象的成员变量
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
// t1 线程对变量 +5000 次
Thread t1 = new Thread(() -> {for (int i = 0; i < 5000; i++) {count++;}
});
// t2 线程对变量 -5000 次
Thread t2 = new Thread(() -> {for (int i = 0; i < 5000; i++) {count--;}
});
t1.start();
t2.start();
// 让 t1 t2 都执行完
t1.join();
t2.join();
System.out.println(count);
}
// 运行后果
-1399
下面的代码 两个线程,一个 +5000 次,一个 -5000 次,如果线程平安,count 的值应该还是 0。
然而运行很屡次,每次的后果不同,且都不是 0,所以是线程不平安的。
线程平安的类肯定所有的操作都线程平安吗?
开发中常常会说到一些线程平安的类,如 ConcurrentHashMap,线程平安指的是类里每一个独立的办法是线程平安的,然而 办法的组合就不肯定是线程平安的。
成员变量和动态变量是否线程平安?
- 如果没有多线程共享,则线程平安
-
如果存在多线程共享
- 多线程只有读操作,则线程平安
- 多线程存在写操作,写操作的代码又是临界区, 则线程不平安
局部变量是否线程平安?
- 局部变量是线程平安的
-
局部变量援用的对象未必是线程平安的
- 如果该对象没有逃离该办法的作用范畴,则线程平安
- 如果该对象逃离了该办法的作用范畴,比方:办法的返回值, 须要思考线程平安
synchronized
同步锁也叫对象锁,是锁在对象上的,不同的对象就是不同的锁。
该关键字是用于保障线程平安的,是阻塞式的解决方案。
让同一个时刻最多只有一个线程能持有对象锁,其余线程在想获取这个对象锁就会被阻塞,不必放心上下文切换的问题。
留神:不要了解为一个线程加了锁,进入 synchronized 代码块中就会始终执行上来。如果工夫片切换了,也会执行其余线程,再切换回来会紧接着执行,只是不会执行到有竞争锁的资源,因为以后线程还未开释锁。
当一个线程执行完 synchronized 的代码块后 会唤醒正在期待的线程
synchronized 实际上应用对象锁保障临界区的 原子性 临界区的代码是不可分割的 不会因为线程切换所打断
根本应用
// 加在办法上 理论是对 this 对象加锁
private synchronized void a() {}
// 同步代码块, 锁对象能够是任意的,加在 this 上 和 a()办法作用雷同
private void b(){synchronized (this){}}
// 加在静态方法上 理论是对类对象加锁
private synchronized static void c() {}
// 同步代码块 理论是对类对象加锁 和 c()办法作用雷同
private void d(){synchronized (TestSynchronized.class){}}
// 上述 b 办法对应的字节码源码 其中 monitorenter 就是加锁的中央
0 aload_0
1 dup
2 astore_1
3 monitorenter
4 aload_1
5 monitorexit
6 goto 14 (+8)
9 astore_2
10 aload_1
11 monitorexit
12 aload_2
13 athrow
14 return
线程平安的代码
private static int count = 0;
private static Object lock = new Object();
private static Object lock2 = new Object();
// t1 线程和 t2 对象都是对同一对象加锁。保障了线程平安。此段代码无论执行多少次,后果都是 0
public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {for (int i = 0; i < 5000; i++) {synchronized (lock) {count++;}
}
});
Thread t2 = new Thread(() -> {for (int i = 0; i < 5000; i++) {synchronized (lock) {count--;}
}
});
t1.start();
t2.start();
// 让 t1 t2 都执行完
t1.join();
t2.join();
System.out.println(count);
}
重点:加锁是加在对象上,肯定要保障是同一对象,加锁能力失效
线程通信
wait+notify
线程间通信能够通过共享变量 +wait()¬ify()来实现
wait()将线程进入阻塞状态,notify()将线程唤醒
当多线程竞争拜访对象的同步办法时,锁对象会关联一个底层的 Monitor 对象(重量级锁的实现)
如下图所示 Thread0,1 先竞争到锁执行了代码后,2,3,4,5 线程同时来执行临界区的代码, 开始竞争锁
- Thread- 0 先获取到对象的锁,关联到 monitor 的 owner,同步代码块内调用了锁对象的 wait()办法,调用后会进入 waitSet 期待,Thread- 1 同样如此,此时 Thread- 0 的状态为 Waitting
- Thread2、3、4、5 同时竞争,2 获取到锁后,关联了 monitor 的 owner,3、4、5 只能进入 EntryList 中期待,此时 2 线程状态为 Runnable,3、4、5 状态为 Blocked
- 2 执行后,唤醒 entryList 中的线程,3、4、5 进行竞争锁,获取到的线程即会关联 monitor 的 owner
- 3、4、5 线程在执行过程中,调用了锁对象的 notify()或 notifyAll()时,会唤醒 waitSet 的线程,唤醒的线程进入 entryList 期待从新竞争锁
留神:
- Blocked 状态和 Waitting 状态都是阻塞状态
- Blocked 线程会在 owner 线程开释锁时唤醒
- wait 和 notify 应用场景是必须要有同步,且必须取得对象的锁能力调用, 应用锁对象去调用, 否则会抛异样
- wait() 开释锁 进入 waitSet 可传入工夫,如果指定工夫内未被唤醒 则主动唤醒
- notify()随机唤醒一个 waitSet 里的线程
- notifyAll()唤醒 waitSet 中所有的线程
static final Object lock = new Object();
new Thread(() -> {synchronized (lock) {log.info("开始执行");
try {
// 同步代码外部能力调用
lock.wait();} catch (InterruptedException e) {e.printStackTrace();
}
log.info("继续执行外围逻辑");
}
}, "t1").start();
new Thread(() -> {synchronized (lock) {log.info("开始执行");
try {lock.wait();
} catch (InterruptedException e) {e.printStackTrace();
}
log.info("继续执行外围逻辑");
}
}, "t2").start();
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}
log.info("开始唤醒");
synchronized (lock) {
// 同步代码外部能力调用
lock.notifyAll();}
// 执行后果
14:29:47.138 [t1] INFO TestWaitNotify - 开始执行
14:29:47.141 [t2] INFO TestWaitNotify - 开始执行
14:29:49.136 [main] INFO TestWaitNotify - 开始唤醒
14:29:49.136 [t2] INFO TestWaitNotify - 继续执行外围逻辑
14:29:49.136 [t1] INFO TestWaitNotify - 继续执行外围逻辑
wait 和 sleep 的区别?
二者都会让线程进入阻塞状态,有以下区别
- wait 是 Object 的办法 sleep 是 Thread 的办法
- wait 会立刻开释锁 sleep 不会开释锁
- wait 后线程的状态是 Watting sleep 后线程的状态为 Time_Waiting
park&unpark
LockSupport 是 juc 下的工具类,提供了 park 和 unpark 办法,能够实现线程通信
与 wait 和 notity 相比的不同点
- wait 和 notify 须要获取对象锁 park unpark 不要
- unpark 能够指定唤醒线程 notify 随机唤醒
- park 和 unpark 的程序能够先 unpark wait 和 notify 的程序不能颠倒
生产者消费者模型
指的是有生产者来生产数据,消费者来生产数据,生产者生产满了就不生产了,告诉消费者取,等生产了再进行生产。
消费者生产不到了就不生产了,告诉生产者生产,生产到了再持续生产。
public static void main(String[] args) throws InterruptedException {MessageQueue queue = new MessageQueue(2);
// 三个生产者向队列里存值
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();}
Thread.sleep(1000);
// 一个消费者不停的从队列里取值
new Thread(() -> {while (true) {queue.take();
}
}, "消费者").start();}
}
// 音讯队列被生产者和消费者持有
class MessageQueue {private LinkedList<Message> list = new LinkedList<>();
// 容量
private int capacity;
public MessageQueue(int capacity) {this.capacity = capacity;}
/**
* 生产
*/
public void put(Message message) {synchronized (list) {while (list.size() == capacity) {log.info("队列已满,生产者期待");
try {list.wait();
} catch (InterruptedException e) {e.printStackTrace();
}
}
list.addLast(message);
log.info("生产音讯:{}", message);
// 生产后告诉消费者
list.notifyAll();}
}
public Message take() {synchronized (list) {while (list.isEmpty()) {log.info("队列已空,消费者期待");
try {list.wait();
} catch (InterruptedException e) {e.printStackTrace();
}
}
Message message = list.removeFirst();
log.info("生产音讯:{}", message);
// 生产后告诉生产者
list.notifyAll();
return message;
}
}
}
// 音讯
class Message {
private int id;
private Object value;
}
同步锁案例
为了更形象的表白加同步锁的概念,这里举一个生存中的例子,尽量把以上的概念具体化进去。
这里举一个每个人十分感兴趣的一件货色。钱!!!(马老师除外)。
事实中,咱们去银行门口的主动取款机取钱,取款机的钱就是共享变量,为了保障平安,不可能两个陌生人同时进入同一个取款机内取钱,所以只能一个人进入取钱,而后锁上取款机的门,其他人只能在取款机门口期待。
取款机有多个,外面的钱互不影响,锁也有多个(多个对象锁),取钱人在多个取款机里同时取钱也没有平安问题。
如果每个取钱的陌生人都是 线程 ,当取钱人进入取款机锁了门后( 线程取得锁 ),取到钱后出门( 线程开释锁),下一个人竞争到锁来取钱。
假如工作人员也是一个线程, 如果取钱人进入后发现取款机钱有余了,这时告诉工作人员来向取款机里加钱 ( 调用 notifyAll 办法 ),取钱人暂停取钱,进入银行大堂阻塞期待( 调用 wait 办法)。
银行大堂里的工作人员和取钱人都被唤醒,从新竞争锁,进入后如果是取钱人,因为取款机没钱,还得进入银行大堂期待。
当工作人员取得取款机的锁进入后,加了钱后会告诉大厅里的人来取钱 ( 调用 notifyAll 办法 )。本人暂停加钱,进入银行大堂期待唤醒加钱( 调用 wait 办法)。
这时大堂里期待的人都来竞争锁,谁获取到谁进入持续取钱。
和事实中不同的就是这里没有排队的概念,谁抢到锁谁进去取。
ReentrantLock
可重入锁 : 一个线程获取到对象的锁后,执行办法外部在须要获取锁的时候是能够获取到的。如以下代码
private static final ReentrantLock LOCK = new ReentrantLock();
private static void m() {LOCK.lock();
try {log.info("begin");
// 调用 m1()
m1();} finally {
// 留神锁的开释
LOCK.unlock();}
}
public static void m1() {LOCK.lock();
try {log.info("m1");
m2();} finally {
// 留神锁的开释
LOCK.unlock();}
}
synchronized 也是可重入锁,ReentrantLock 有以下长处
- 反对获取锁的超时工夫
- 获取锁时可被打断
- 可设为偏心锁
- 能够有不同的条件变量,即有多个 waitSet,能够指定唤醒
api
// 默认非偏心锁,参数传 true 示意未偏心锁
ReentrantLock lock = new ReentrantLock(false);
// 尝试获取锁
lock()
// 开释锁 应放在 finally 块中 必须执行到
unlock()
try {
// 获取锁时可被打断, 阻塞中的线程可被打断
LOCK.lockInterruptibly();} catch (InterruptedException e) {return;}
// 尝试获取锁 获取不到就返回 false
LOCK.tryLock()
// 反对超时工夫 一段时间没获取到就返回 false
tryLock(long timeout, TimeUnit unit)
// 指定条件变量 休息室 一个锁能够创立多个休息室
Condition waitSet = ROOM.newCondition();
// 开释锁 进入 waitSet 期待 开释后其余线程能够抢锁
yanWaitSet.await()
// 唤醒具体休息室的线程 唤醒后 重写竞争锁
yanWaitSet.signal()
实例:一个线程输入 a,一个线程输入 b,一个线程输入 c,abc 依照程序输入,间断输入 5 次
这个考的就是线程的通信,利用 wait()/notify()和控制变量能够实现,此处应用 ReentrantLock 即可实现该性能。
public static void main(String[] args) {AwaitSignal awaitSignal = new AwaitSignal(5);
// 构建三个条件变量
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
// 开启三个线程
new Thread(() -> {awaitSignal.print("a", a, b);
}).start();
new Thread(() -> {awaitSignal.print("b", b, c);
}).start();
new Thread(() -> {awaitSignal.print("c", c, a);
}).start();
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
awaitSignal.lock();
try {
// 先唤醒 a
a.signal();} finally {awaitSignal.unlock();
}
}
}
class AwaitSignal extends ReentrantLock {
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}
/**
* @param print 输入的字符
* @param current 以后条件变量
* @param next 下一个条件变量
*/
public void print(String print, Condition current, Condition next) {for (int i = 0; i < loopNumber; i++) {lock();
try {
try {
// 获取锁之后期待
current.await();
System.out.print(print);
} catch (InterruptedException e) { }
next.signal();} finally {unlock();
}
}
}
死锁
说到死锁, 先举个例子,
上面是代码实现
static Beer beer = new Beer();
static Story story = new Story();
public static void main(String[] args) {new Thread(() ->{synchronized (beer){log.info("我有酒,给我故事");
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
synchronized (story){log.info("小王开始喝酒讲故事");
}
}
},"小王").start();
new Thread(() ->{synchronized (story){log.info("我有故事,给我酒");
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
synchronized (beer){log.info("老王开始喝酒讲故事");
}
}
},"老王").start();}
class Beer {
}
class Story{
}
死锁导致程序无奈失常运行上来
检测工具能够查看到死锁信息
java 内存模型(JMM)
jmm 体现在以下三个方面
- 原子性 保障指令不会受到上下文切换的影响
- 可见性 保障指令不会受到 cpu 缓存的影响
- 有序性 保障指令不会受并行优化的影响
可见性
停不下来的程序
static boolean run = true;
public static void main(String[] args) throws InterruptedException {Thread t = new Thread(() -> {while (run) {// ....}
});
t.start();
Thread.sleep(1000);
// 线程 t 不会如料想的停下来
run = false;
}
如上图所示,线程有本人的工作缓存,当主线程批改了变量并同步到主内存时,t 线程没有读取到,所以程序停不下来
有序性
JVM 在不影响程序正确性的状况下可能会调整语句的执行程序,该状况也称为 指令重排序
static int i;
static int j;
// 在某个线程内执行如下赋值操作
i = ...;
j = ...;
有可能将 j 先赋值
原子性
原子性大家应该比拟相熟,上述同步锁的 synchronized 代码块就是保障了原子性,就是一段代码是一个整体,原子性保障了线程平安,不会受到上下文切换的影响。
volatile
该关键字解决了可见性和有序性,volatile 通过内存屏障来实现的
- 写屏障
会在对象写操作之后加写屏障,会对写屏障的之前的数据都同步到主存,并且保障写屏障的执行程序在写屏障之前
- 读屏障
会在对象读操作之前加读屏障,会在读屏障之后的语句都从主存读,并保障读屏障之后的代码执行在读屏障之后
留神:volatile 不能解决原子性,即不能通过该关键字实现线程平安。
volatile 利用场景:一个线程读取变量,另外的线程操作变量,加了该关键字后保障写变量后,读变量的线程能够及时感知。
无锁 -cas
cas(compare and swap) 比拟并替换
为变量赋值时,从内存中读取到的值 v,获取到要替换的新值 n,执行 compareAndSwap()办法时,比拟 v 和以后内存中的值是否统一,如果统一则将 n 和 v 替换,如果不统一,则自旋重试。
cas 底层是 cpu 层面的,即不应用同步锁也能够保障操作的原子性。
private AtomicInteger balance;
// 模仿 cas 的具体操作
@Override
public void withdraw(Integer amount) {while (true) {
// 获取以后值
int pre = balance.get();
// 进行操作后失去新值
int next = pre - amount;
// 比拟并设置胜利 则中断 否则自旋重试
if (balance.compareAndSet(pre, next)) {break;}
}
}
无锁的效率是要高于之前的锁的,因为无锁不会波及线程的上下文切换
cas 是乐观锁的思维,sychronized 是乐观锁的思维
cas 适宜很少有线程竞争的场景,如果竞争很强,重试常常产生,反而升高效率
juc 并发包下蕴含了实现了 cas 的原子类
- AtomicInteger/AtomicBoolean/AtomicLong
- AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray
- AtomicReference/AtomicStampedReference/AtomicMarkableReference
AtomicInteger
罕用 api
new AtomicInteger(balance)
get()
compareAndSet(pre, next)
// i.incrementAndGet() ++i
// i.decrementAndGet() --i
// i.getAndIncrement() i++
// i.getAndDecrement() ++i
i.addAndGet()
// 传入函数式接口 批改 i
int getAndUpdate(IntUnaryOperator updateFunction)
// cas 的外围办法
compareAndSet(int expect, int update)
ABA 问题
cas 存在 ABA 问题,即比拟并替换时,如果原值为 A, 有其余线程将其批改为 B,在有其余线程将其批改为 A。
此时理论产生过替换,然而比拟和替换因为值没扭转能够替换胜利
解决形式
AtomicStampedReference/AtomicMarkableReference
下面两个类解决 ABA 问题,原理就是为对象减少版本号, 每次批改时减少版本号,就能够防止 ABA 问题
或者减少个布尔变量标识,批改后调整布尔变量值,也能够防止 ABA 问题
线程池
线程池的介绍
线程池是 java 并发最重要的一个知识点,也是难点,是理论利用最宽泛的。
线程的资源很贵重,不可能有限的创立,必须要有治理线程的工具,线程池就是一种治理线程的工具,java 开发中常常有池化的思维,如 数据库连接池、Redis 连接池等。
事后创立好一些线程,工作提交时间接执行,既能够节约创立线程的工夫,又能够控制线程的数量。
线程池的益处
- 升高资源耗费,通过池化思维,缩小创立线程和销毁线程的耗费,管制资源
- 进步响应速度,工作达到时,无需创立线程即可运行
- 提供更多更弱小的性能,可扩展性高
线程池的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
结构器参数的意义
参数名 | 参数意义 |
---|---|
corePoolSize | 外围线程数 |
maximumPoolSize | 最大线程数 |
keepAliveTime | 救急线程的闲暇工夫 |
unit | 救急线程的闲暇工夫单位 |
workQueue | 阻塞队列 |
threadFactory | 创立线程的工厂,次要定义线程名 |
handler | 回绝策略 |
线程池案例
上面 咱们通过一个实例来了解线程池的参数以及线程池的接管工作的过程
如上图 银行办理业务。
- 客户到银行时,开启柜台进行办理,柜台相当于线程,客户相当于工作,有两个是常开的柜台,三个是长期柜台。2 就是外围线程数,5 是最大线程数。即有两个外围线程
- 当柜台开到第二个后,都还在解决业务。客户再来就到排队大厅排队。排队大厅只有三个座位。
- 排队大厅坐满时,再来客户就持续开柜台解决,目前最大有三个长期柜台,也就是三个救急线程
- 此时再来客户,就无奈失常为其 提供业务,采纳回绝策略来解决它们
- 当柜台解决完业务,就会从排队大厅取工作,当柜台隔一段闲暇工夫都取不到工作时,如果以后线程数大于外围线程数时,就会回收线程。即撤销该柜台。
线程池的状态
线程池通过一个 int 变量的高 3 位来示意线程池的状态,低 29 位来存储线程池的数量
状态名称 | 高三位 | 接管新工作 | 解决阻塞队列工作 | 阐明 |
---|---|---|---|---|
Running | 111 | Y | Y | 失常接管工作,失常解决工作 |
Shutdown | 000 | N | Y | 不会接管工作, 会执行完正在执行的工作, 也会解决阻塞队列里的工作 |
stop | 001 | N | N | 不会接管工作,会中断正在执行的工作, 会放弃解决阻塞队列里的工作 |
Tidying | 010 | N | N | 工作全副执行结束,以后流动线程是 0,行将进入终结 |
Termitted | 011 | N | N | 终结状态 |
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池的次要流程
线程池创立、接管工作、执行工作、回收线程的步骤
- 创立线程池后,线程池的状态是 Running,该状态下能力有上面的步骤
- 提交工作时,线程池会创立线程去解决工作
- 当线程池的工作线程数达到 corePoolSize 时,持续提交工作会进入阻塞队列
- 当阻塞队列装满时,持续提交工作,会创立救急线程来解决
- 当线程池中的工作线程数达到 maximumPoolSize 时,会执行回绝策略
- 当线程取工作的工夫达到 keepAliveTime 还没有取到工作,工作线程数大于 corePoolSize 时,会回收该线程
留神:不是刚创立的线程是外围线程,前面创立的线程是非核心线程,线程是没有外围非核心的概念的,这是我长期以来的误会。
回绝策略
- 调用者抛出 RejectedExecutionException (默认策略)
- 让调用者运行工作
- 抛弃此次工作
- 抛弃阻塞队列中最早的工作,退出该工作
提交工作的办法
// 执行 Runnable
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
// 提交 Callable
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
// 外部构建 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交 Runnable, 指定返回值
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
// 外部构建 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交 Runnable, 指定返回值
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
// 外部构建 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}
Execetors 创立线程池
留神:上面几种形式都不举荐应用
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 外围线程数 = 最大线程数 没有救急线程
- 阻塞队列无界 可能导致 oom
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 外围线程数是 0,最大线程数无限度,救急线程 60 秒回收
- 队列采纳 SynchronousQueue 实现 没有容量,即放入队列后没有线程来取就放不进去
- 可能导致线程数过多,cpu 累赘太大
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 外围线程数和最大线程数都是 1,没有救急线程,无界队列 能够不停的接管工作
- 将工作串行化 一个个执行,应用包装类是为了屏蔽批改线程池的一些参数 比方 corePoolSize
- 如果某线程抛出异样了,会从新创立一个线程继续执行
- 可能造成 oom
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
- 任务调度的线程池 能够指定延迟时间调用,能够指定隔一段时间调用
线程池的敞开
shutdown()
会让线程池状态为 shutdown,不能接管工作,然而会将工作线程和阻塞队列里的工作执行完 相当于优雅敞开
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
}
tryTerminate();}
shutdownNow()
会让线程池状态为 stop,不能接管工作,会立刻中断执行中的工作线程,并且不会执行阻塞队列里的工作,会返回阻塞队列的工作列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();} finally {mainLock.unlock();
}
tryTerminate();
return tasks;
}
线程池的正确应用姿态
线程池难就难在参数的配置,有一套实践配置参数
cpu 密集型 : 指的是程序次要产生 cpu 的运算
外围线程数:CPU 外围数 +1
IO 密集型: 近程调用 RPC,操作数据库等,不须要应用 cpu 进行大量的运算。大多数利用的场景
外围线程数 = 核数 cpu 冀望利用率 总工夫 /cpu 运算工夫
然而基于以上实践还是很难去配置,因为 cpu 运算工夫不好估算
理论配置大小可参考下表
cpu 密集型 | io 密集型 | |
---|---|---|
线程数数量 | 核数 <=x<= 核数 *2 | 外围数50<=x<= 外围数 100 |
队列长度 | y>=100 | 1<=y<=10 |
1. 线程池参数通过分布式配置,批改配置无需重启利用
线程池参数是依据线上的申请数变动而变动的,最好的形式是 外围线程数、最大线程数 队列大小都是可配置的
次要配置 corePoolSize maxPoolSize queueSize
java 提供了可办法笼罩参数,线程池外部会解决好参数 进行平滑的批改
public void setCorePoolSize(int corePoolSize) {
}
2. 减少线程池的监控
3.io 密集型可调整为先新增工作到最大线程数后再将工作放到阻塞队列
代码 次要可重写阻塞队列 退出工作的办法
public boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");
}
final ReentrantLock lock = this.lock;
lock.lock();
try {int currentPoolThreadSize = executor.getPoolSize();
// 如果提交工作数小于以后创立的线程数, 阐明还有闲暇线程,
if (executor.getTaskCount() < currentPoolThreadSize) {
// 将工作放入队列中,让线程去解决工作
return super.offer(runnable);
}
// 外围改变
// 如果以后线程数小于最大线程数,则返回 false,让线程池去创立新的线程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {return false;}
// 否则,就将工作放入队列中
return super.offer(runnable);
} finally {lock.unlock();
}
}
3. 回绝策略 倡议应用 tomcat 的回绝策略(给一次机会)
// tomcat 的源码
@Override
public void execute(Runnable command) {if ( executor != null) {
try {executor.execute(command);
} catch (RejectedExecutionException rx) {
// 捕捉到异样后 在从队列获取,相当于重试 1 取不到工作 在执行回绝工作
if (!( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
}
} else throw new IllegalStateException("StandardThreadPool not started.");
}
倡议批改从队列取工作的形式:减少超时工夫,超时 1 分钟取不到在进行返回
public boolean offer(E e, long timeout, TimeUnit unit){}
结语
文章篇幅较长,给看到这里的小伙伴点个大大的赞! 因为作者程度无限,加之第一次写博客,文章中难免会有谬误之处,欢送小伙伴们反馈斧正。
如果感觉文章对你有帮忙, 麻烦 点赞、评论、转发、在看 走起
文末福利,点击支付上百本 Java 电子书合集,总有你须要的那一本