共计 13506 个字符,预计需要花费 34 分钟才能阅读完成。
明天 Java19 正式公布,带来了一个 Java 开发者垂涎已久的新个性—— 虚构线程。在 Java 有这个新个性之前,Go 语言的协程风靡已久,在并发编程畛域能够说是叱咤风云。随着国内 Go 语言的疾速倒退与推广,协程如同成为了一个世界上最好语言的必备个性之一。Java19 虚构线程就是来补救这个空白的。本文将通过对虚构线程的介绍,以及与 Go 协程的比照来带大家尝鲜 Java19 虚构线程。
本文要点:
- Java 线程模型
- 平台线程与虚构线程性能比照
- Java 虚构线程与 Go 协程比照
- 如何应用虚构线程
Java 线程模型
java 线程 与 虚构线程
咱们罕用的 Java 线程与零碎内核线程是一一对应的,零碎内核的线程调度程序负责调度 Java 线程。为了减少应用程序的性能,咱们会减少越来越多的 Java 线程,显然系统调度 Java 线程时,会占据不少资源去解决线程上下文切换。
近几十年来,咱们始终依赖上述多线程模型来解决 Java 并发编程的问题。为了减少零碎的吞吐量,咱们要一直减少线程的数量,但机器的线程是低廉的、可用线程数量也是无限的。即便咱们应用了各种线程池来最大化线程的性价比,然而线程往往会在 CPU、网络或者内存资源耗尽之前成为咱们应用程序的性能晋升瓶颈,不能最大限度的开释硬件应该具备的性能。
为了解决这个问题 Java19 引入了虚构线程(Virtual Thread)。在 Java19 中,之前咱们罕用的线程叫做平台线程(platform thread),与零碎内核线程依然是一一对应的。其中大量(M)的虚构线程在较小数量(N)的平台线程(与操作系统线程一一对应)上运行(M:N 调度)。多个虚构线程会被 JVM 调度到某一个平台线程上执行,一个平台线程同时只会执行一个虚构线程。
创立 Java 虚构线程
新增线程相干 API
Thread.ofVirtual()
和 Thread.ofPlatform()
是创立虚构和平台线程的新 API:
// 输入线程 ID 包含虚构线程和零碎线程 Thread.getId() 从 jdk19 废除
Runnable runnable = () -> System.out.println(Thread.currentThread().threadId());
// 创立虚构线程
Thread thread = Thread.ofVirtual().name("testVT").unstarted(runnable);
testVT.start();
// 创立虚平台线程
Thread testPT = Thread.ofPlatform().name("testPT").unstarted(runnable);
testPT.start();
应用 Thread.startVirtualThread(Runnable)
疾速创立虚构线程并启动:
// 输入线程 ID 包含虚构线程和零碎线程
Runnable runnable = () -> System.out.println(Thread.currentThread().threadId());
Thread thread = Thread.startVirtualThread(runnable);
Thread.isVirtual()
判断线程是否为虚构线程:
// 输入线程 ID 包含虚构线程和零碎线程
Runnable runnable = () -> System.out.println(Thread.currentThread().isVirtual());
Thread thread = Thread.startVirtualThread(runnable);
Thread.join
和 Thread.sleep
期待虚构线程完结、使虚构线程 sleep:
Runnable runnable = () -> System.out.println(Thread.sleep(10));
Thread thread = Thread.startVirtualThread(runnable);
// 期待虚构线程完结
thread.join();
Executors.newVirtualThreadPerTaskExecutor()
创立一个 ExecutorService,该 ExecutorService 为每个工作创立一个新的虚构线程:
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {executor.submit(() -> System.out.println("hello"));
}
反对与应用线程池和 ExecutorService 的现有代码相互替换、迁徙。
留神:
因为虚构线程在 Java19 中是预览个性,所以本文呈现的代码需按以下形式运行:
- 应用
javac --release 19 --enable-preview Main.java
编译程序,并应用java --enable-preview Main
运行; - 或者应用
java --source 19 --enable-preview Main.java
运行程序;
是骡子是马
既然是为了解决平台线程的问题,那咱们就间接测试平台线程与虚构线程的性能。
测试内容很简略,并行执行一万个 sleep 一秒的工作,比照总的执行工夫和所用零碎线程数量。
为了监控测试所用零碎线程的数量,编写如下代码:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
System.out.println(threadInfo.length + "os thread");
}, 1, 1, TimeUnit.SECONDS);
调度线程池每一秒钟获取并打印零碎线程数量,便于察看线程的数量。
public static void main(String[] args) {
// 记录零碎线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
System.out.println(threadInfo.length + "os thread");
}, 1, 1, TimeUnit.SECONDS);
long l = System.currentTimeMillis();
try(var executor = Executors.newCachedThreadPool()) {IntStream.range(0, 10000).forEach(i -> {executor.submit(() -> {Thread.sleep(Duration.ofSeconds(1));
System.out.println(i);
return i;
});
});
}
System.out.printf("耗时:%d ms", System.currentTimeMillis() - l);
}
首先咱们应用 Executors.newCachedThreadPool()
来执行 10000 个工作,因为 newCachedThreadPool
的最大线程数量是 Integer.MAX_VALUE,所以实践上至多会创立大几千个零碎线程来执行。
输入如下(多余输入已省略):
//output
1
7142
3914 os thread
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:1560)
at java.base/java.lang.System$2.start(System.java:2526)
从上述输入能够看到,最高创立了 3914 个零碎线程,而后持续创立线程时异样,程序终止。咱们想通过大量零碎线程进步零碎的性能是不事实的,因为线程低廉,资源无限。
当初咱们应用固定大小为 200 的线程池来解决不能申请太多零碎线程的问题:
public static void main(String[] args) {
// 记录零碎线程数
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
System.out.println(threadInfo.length + "os thread");
}, 1, 1, TimeUnit.SECONDS);
long l = System.currentTimeMillis();
try(var executor = Executors.newFixedThreadPool(200)) {IntStream.range(0, 10000).forEach(i -> {executor.submit(() -> {Thread.sleep(Duration.ofSeconds(1));
System.out.println(i);
return i;
});
});
}
System.out.printf("耗时:%dms\n", System.currentTimeMillis() - l);
}
输入如下:
//output
1
9987
9998
207 os thread
耗时:50436ms
应用固定大小线程池后没有了创立大量零碎线程导致失败的问题,能失常跑完工作,最高创立了 207 个零碎线程,共耗时 50436ms。
再来看看应用虚构线程的后果:
public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfo = threadBean.dumpAllThreads(false, false);
System.out.println(threadInfo.length + "os thread");
}, 10, 10, TimeUnit.MILLISECONDS);
long l = System.currentTimeMillis();
try(var executor = Executors.newVirtualThreadPerTaskExecutor()) {IntStream.range(0, 10000).forEach(i -> {executor.submit(() -> {Thread.sleep(Duration.ofSeconds(1));
System.out.println(i);
return i;
});
});
}
System.out.printf("耗时:%dms\n", System.currentTimeMillis() - l);
}
应用虚构线程的代码和应用固定大小的只有一词只差,将 Executors.newFixedThreadPool(200)
替换为Executors.newVirtualThreadPerTaskExecutor()
。
输入后果如下:
//output
1
9890
15 os thread
耗时:1582ms
由输入可见,执行总耗时 1582 ms,最高应用零碎线程 15 个。论断很显著,应用虚构线程比平台线程要快很多,并且应用的零碎线程资源要更少。
如果咱们把刚刚这个测试程序中的工作换成执行了一秒钟的计算(例如,对一个微小的数组进行排序),而不仅仅是 sleep 1 秒钟,即便咱们把虚构线程或者平台线程的数量减少到远远大于处理器内核数量都不会有显著的性能晋升。因为虚构线程不是更快的线程,它们运行代码的速度与平台线程相比并无劣势。虚构线程的存在是为了提供更高的吞吐量,而不是速度(更低的提早)。
如果你的应用程序合乎上面两点特色,应用虚构线程能够显著进步程序吞吐量:
- 程序并发工作数量很高。
- IO 密集型、工作负载不受 CPU 束缚。
虚构线程有助于进步服务端应用程序的吞吐量,因为此类应用程序有大量并发,而且这些工作通常会有大量的 IO 期待。
Java vs Go
应用形式比照
Go 协程比照 Java 虚构线程
定义一个 say() 办法,办法体是循环 sleep 100ms,而后输入 index,将这个办法应用协程执行。
Go 实现:
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {go say("world")
say("hello")
}
Java 实现:
public final class VirtualThreads {static void say(String s) {
try {for (int i = 0; i < 5; i++) {Thread.sleep(Duration.ofMillis(100));
System.out.println(s);
}
} catch (InterruptedException e) {throw new RuntimeException(e);
}
}
public static void main(String[] args) throws InterruptedException {
var worldThread = Thread.startVirtualThread(() -> say("world")
);
say("hello");
// 期待虚构线程完结
worldThread.join();}
}
能够看到两种语言协程的写法很类似,总体来说 Java 虚构线程的写法略微麻烦一点,Go 应用一个关键字就能不便的创立协程。
Go 管道比照 Java 阻塞队列
在 Go 语言编程中,协程与管道的配合井水不犯河水,应用协程计算数组元素的和(分治思维):
Go 实现:
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {sum += v}
c <- sum // send sum to c
}
func main() {s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}
Java 实现:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
public class main4 {static void sum(int[] s, int start, int end, BlockingQueue<Integer> queue) throws InterruptedException {
int sum = 0;
for (int i = start; i < end; i++) {sum += s[i];
}
queue.put(sum);
}
public static void main(String[] args) throws InterruptedException {int[] s = {7, 2, 8, -9, 4, 0};
var queue = new ArrayBlockingQueue<Integer>(1);
Thread.startVirtualThread(() -> {sum(s, 0, s.length / 2, queue);
});
Thread.startVirtualThread(() -> {sum(s, s.length / 2, s.length, queue);
});
int x = queue.take();
int y = queue.take();
System.out.printf("%d %d %d\n", x, y, x + y);
}
}
因为 Java 中没有数组切片,所以应用数组和下标来代替。Java 中没有管道,用与管道类似的 BlockingQueue 来代替,能够实现性能。
协程实现原理比照
GO G-M-P 模型
Go 语言采纳两级线程模型,协程与零碎内核线程是 M:N 的,这一点与 Java 虚构线程统一。最终 goroutine 还是会交给 OS 线程执行,然而须要一个中介,提供上下文。这就是 G-M-P 模型。
- G: goroutine, 相似过程管制块,保留栈,状态,id,函数等信息。G 只有绑定到 P 才能够被调度。
- M: machine, 零碎线程,绑定无效的 P 之后,进行调度。
- P: 逻辑处理器,保留各种队列 G。对于 G 而言,P 就是 cpu 外围。对于 M 而言,P 就是上下文。
- sched: 调度程序,保留 GRQ(全局运行队列),M 闲暇队列,P 闲暇队列以及 lock 等信息。
队列
Go 调度器有两个不同的运行队列:
- GRQ,全局运行队列,尚未调配给 P 的 G(在 Go1.1 之前只有 GRO 全局运行队列,然而因为全局队列加锁的性能问题加上了 LRQ,以缩小锁期待)。
- LRQ,本地运行队列,每个 P 都有一个 LRQ,用于治理调配给 P 执行的 G。当 LRQ 中没有待执行的 G 时会从 GRQ 中获取。
hand off 机制
当 G 执行阻塞操作时,G-M-P 为了避免阻塞 M,影响 LRQ 中其余 G 的执行,会调度闲暇 M 来执行阻塞 M LRQ 中的其余 G:
- G1 在 M1 上运行,P 的 LRQ 有其余 3 个 G;
- G1 进行同步调用,阻塞 M;
- 调度器将 M1 与 P 拆散,此时 M1 下只运行 G1,没有 P。
- 将 P 与闲暇 M2 绑定,M2 从 LRQ 抉择其余 G 运行。
- G1 完结梗塞操作,移回 LRQ。M1 会被搁置到闲暇队列中备用。
work stealing 机制
G-M-P 为了最大限度开释硬件性能,当 M 闲暇时会应用工作窃取机制执行其余期待执行的 G:
- 有两个 P,P1,P2。
- 如果 P1 的 G 都执行完了,LRQ 为空,P1 就开始工作窃取。
- 第一种状况,P1 从 GRQ 获取 G。
- 第二种状况,P1 从 GRQ 没有获取到 G,则 P1 从 P2 LRQ 中窃取 G。
hand off 机制是避免 M 阻塞,工作窃取是避免 M 闲暇。
Java 虚构线程调度
基于操作系统线程实现的平台线程,JDK 依赖于操作系统中的线程调度程序来进行调度。而对于虚构线程,JDK 有本人的调度器。JDK 的调度器没有间接将虚构线程调配给零碎线程,而是将虚构线程调配给平台线程(这是后面提到的虚构线程的 M:N 调度)。平台线程由操作系统的线程调度系统调度。
JDK 的虚构线程调度器是一个在 FIFO 模式下运行的相似 ForkJoinPool
的线程池。调度器的并行数量取决于调度器虚构线程的平台线程数量。默认状况下是 CPU 可用外围数量,但能够应用零碎属性 jdk.virtualThreadScheduler.parallelism
进行调整。留神,这里的 ForkJoinPool
与ForkJoinPool.commonPool()
不同,ForkJoinPool.commonPool()
用于实现并行流,并在 LIFO 模式下运行。
ForkJoinPool
和 ExecutorService
的工作形式不同,ExecutorService
有一个期待队列来存储它的工作,其中的线程将接管并解决这些工作。而 ForkJoinPool
的每一个线程都有一个期待队列,当一个由线程运行的工作生成另一个工作时,该工作被增加到该线程的期待队列中,当咱们运行Parallel Stream
,一个大工作划分成两个小工作时就会产生这种状况。
为了避免 线程饥饿 问题,当一个线程的期待队列中没有更多的工作时,ForkJoinPool
还实现了另一种模式,称为 工作窃取,也就是说:饥饿线程能够从另一个线程的期待队列中窃取一些工作。这和 Go G-M-P 模型中 work stealing 机制有殊途同归之妙。
虚构线程的执行
通常,当虚构线程执行 I/O 或 JDK 中的其余阻止操作(如 BlockingQueue.take()
时,虚构线程会从平台线程上卸载。当阻塞操作筹备实现时(例如,网络 IO 已收到字节数据),调度程序将虚构线程挂载到平台线程上以复原执行。
JDK 中的绝大多数阻塞操作会将虚构线程从平台线程上卸载,使平台线程可能执行其余工作工作。然而,JDK 中的多数阻塞操作不会卸载虚构线程,因而会阻塞平台线程。因为操作系统级别(例如许多文件系统操作)或 JDK 级别(例如 Object.wait()
)的限度。这些阻塞操作阻塞平台线程时,将通过临时减少平台线程的数量来弥补其余平台线程阻塞的损失。因而,调度器的ForkJoinPool
中的平台线程数量可能会临时超过 CPU 可用外围数量。调度器可用的平台线程的最大数量能够应用零碎属性 jdk.virtualThreadScheduler.maxPoolSize
进行调整。这个阻塞弥补机制与 Go G-M-P 模型中 hand off 机制有殊途同归之妙。
在以下两种状况下,虚构线程会被固定到运行它的平台线程,在阻塞操作期间无奈卸载虚构线程:
- 当在
synchronized
块或办法中执行代码时。 - 当执行
native
办法或 foreign function 时。
虚构线程被固定不会影响程序运行的正确性,但它可能会影响零碎的并发度和吞吐量。如果虚构线程在被固定时执行 I/ O 或 BlockingQueue.take()
等阻塞操作,则负责运行它的平台线程在操作期间会被阻塞。(如果虚构线程没有被固定,那会执行 I/O 等阻塞操作时会从平台线程上卸载)
如何卸载虚构线程
咱们通过 Stream 创立 5 个未启动的虚构线程,这些线程的工作是:打印以后线程,而后休眠 10 毫秒,而后再次打印线程。而后启动这些虚构线程,并调用 jion()
以确保控制台能够看到所有内容:
public static void main(String[] args) throws Exception {var threads = IntStream.range(0, 5).mapToObj(index -> Thread.ofVirtual().unstarted(() -> {System.out.println(Thread.currentThread());
try {Thread.sleep(10);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
System.out.println(Thread.currentThread());
})).toList();
threads.forEach(Thread::start);
for (Thread thread : threads) {thread.join();
}
}
//output
src [main] ~/Downloads/jdk-19.jdk/Contents/Home/bin/java --enable-preview main7
VirtualThread[#23]/[email protected]
VirtualThread[#22]/[email protected]
VirtualThread[#21]/[email protected]
VirtualThread[#25]/[email protected]
VirtualThread[#24]/[email protected]
VirtualThread[#25]/[email protected]
VirtualThread[#24]/[email protected]
VirtualThread[#21]/[email protected]
VirtualThread[#22]/[email protected]
VirtualThread[#23]/[email protected]
由控制台输入,咱们能够发现,VirtualThread[#21] 首先运行在 ForkJoinPool 的线程 1 上,当它从 sleep 中返回时,持续在线程 4 上运行。
sleep 之后为什么虚构线程从一个平台线程跳转到另一个平台线程?
咱们浏览一下 sleep 办法的源码,会发现在 Java19 中 sleep 办法被重写了,重写后的办法里还减少了虚构线程相干的判断:
public static void sleep(long millis) throws InterruptedException {if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");
}
if (currentThread() instanceof VirtualThread vthread) {long nanos = MILLISECONDS.toNanos(millis);
vthread.sleepNanos(nanos);
return;
}
if (ThreadSleepEvent.isTurnedOn()) {ThreadSleepEvent event = new ThreadSleepEvent();
try {event.time = MILLISECONDS.toNanos(millis);
event.begin();
sleep0(millis);
} finally {event.commit();
}
} else {sleep0(millis);
}
}
深追代码发现,虚构线程 sleep 时真正调用的办法是 Continuation.yield
:
@ChangesCurrentThread
private boolean yieldContinuation() {
boolean notifyJvmti = notifyJvmtiEvents;
// unmount
if (notifyJvmti) notifyJvmtiUnmountBegin(false);
unmount();
try {return Continuation.yield(VTHREAD_SCOPE);
} finally {
// re-mount
mount();
if (notifyJvmti) notifyJvmtiMountEnd(false);
}
}
也就是说 Continuation.yield
会将以后虚构线程的堆栈由平台线程的堆栈转移到 Java 堆内存,而后将其余就绪虚构线程的堆栈由 Java 堆中拷贝到以后平台线程的堆栈中继续执行。执行 IO 或 BlockingQueue.take()
等阻塞操作时会跟 sleep 一样导致虚构线程切换。虚构线程的切换也是一个绝对耗时的操作,然而与平台线程的上下文切换相比,还是轻量很多的。
其余
虚构线程与异步编程
响应式编程解决了平台线程须要阻塞期待其余零碎响应的问题。应用异步 API 不会阻塞期待响应,而是通过回调告诉后果。当响应达到时,JVM 将从线程池中调配另一个线程来解决响应。这样,解决单个异步申请会波及多个线程。
在异步编程中,咱们能够升高零碎的响应提早,但因为硬件限度,平台线程的数量依然无限,因而咱们的 零碎吞吐量仍有瓶颈 。另一个问题是, 异步程序在不同的线程中执行,很难调试或剖析它们。
虚构线程通过较小的语法调整来进步代码品质(升高编码、调试、剖析代码的难度),同时具备响应式编程的长处,能大幅提高零碎吞吐量。
不要池化虚构线程
因为虚构线程十分轻量,每个虚构线程都打算在其生命周期内只运行单个工作,所以没有池化虚构线程的必要。
虚构线程下的 ThreadLocal
public class main {private static ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
public static void getThreadLocal(String val) {stringThreadLocal.set(val);
System.out.println(stringThreadLocal.get());
}
public static void main(String[] args) throws InterruptedException {Thread testVT1 = Thread.ofVirtual().name("testVT1").unstarted(() ->main5.getThreadLocal("testVT1 local var"));
Thread testVT2 = Thread.ofVirtual().name("testVT2").unstarted(() ->main5.getThreadLocal("testVT2 local var"));
testVT1.start();
testVT2.start();
System.out.println(stringThreadLocal.get());
stringThreadLocal.set("main local var");
System.out.println(stringThreadLocal.get());
testVT1.join();
testVT2.join();}
}
//output
null
main local var
testVT1 local var
testVT2 local var
虚构线程反对 ThreadLocal 的形式与平台线程雷同,平台线程不能获取到虚构线程设置的变量,虚构线程也不能获取到平台线程设置的变量,对虚构线程而言,负责运行虚构线程的平台线程是通明的。然而因为虚构线程能够创立数百万个,在虚构线程中应用 ThreadLocal 请三思而后行。如果咱们在应用程序中创立一百万个虚构线程,那么将会有一百万个 ThreadLocal 实例以及它们援用的数据。大量的对象可能会给内存带来较大的累赘。
应用 ReentrantLock 替换 Synchronized
因为 Synchronized 会使虚构线程被固定在平台线程上,导致阻塞操作不会卸载虚构线程,影响程序的吞吐量,所以须要应用 ReentrantLock 替换 Synchronized:
befor:
public synchronized void m() {
try {// ... access resource} finally {//}
}
after:
private final ReentrantLock lock = new ReentrantLock();
public void m() {lock.lock(); // block until condition holds
try {// ... access resource} finally {lock.unlock();
}
}
如何迁徙
- 间接替换线程池为虚构线程池。如果你的我的项目应用了
CompletableFuture
你也能够间接替换执行异步工作的线程池为Executors.newVirtualThreadPerTaskExecutor()
。 - 勾销池化机制。虚构线程十分轻量级,无需池化。
synchronized
改为ReentrantLock
,以缩小虚构线程被固定到平台线程。
总结
本文形容了 Java 线程模型、Java 虚构线程的应用、原理以及实用场景,也与风靡的 Go 协程做了比拟,也能找到两种实现上的相似之处,心愿能帮忙你了解 Java 虚构线程。Java19 虚构线程是预览个性,很有可能在 Java21 成为正式个性,将来可期。笔者程度无限,如有写的不好的中央还请大家批评指正。
参考
https://openjdk.org/jeps/425
https://howtodoinjava.com/jav…
https://mccue.dev/pages/5-2-2…
公众号:DailyHappy 一位后端写码师,一位光明操持制造者。