简介
明天咱们要介绍的是Reactor中的多线程模型和定时器模型,Reactor之前咱们曾经介绍过了,它实际上是观察者模式的延长。
所以从实质上来说,Reactor是和多线程无关的。你能够把它用在多线程或者不必在多线程。
明天将会给大家介绍一下如何在Reactor中应用多线程和定时器模型。
Thread多线程
先看一下之前举的Flux的创立的例子:
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});
flux.subscribe(System.out::println);
能够看到,不论是Flux generator还是subscriber,他们实际上都是运行在同一个线程中的。
如果咱们想让subscribe产生在一个新的线程中,咱们须要新启动一个线程,而后在线程外部进行subscribe操作。
Mono<String> mono = Mono.just("hello ");
Thread t = new Thread(() -> mono
.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
);
t.start();
t.join();
下面的例子中,Mono在主线程中创立,而subscribe产生在新启动的Thread中。
Schedule定时器
很多状况下,咱们的publisher是须要定时去调用一些办法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时工作的生成和治理。
Scheduler是一个接口:
public interface Scheduler extends Disposable
它定义了一些定时器中必须要实现的办法:
比方立刻执行的:
Disposable schedule(Runnable task);
延时执行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)
和定期执行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
Schedule有一个工具类叫做Schedules,它提供了多个创立Scheduler的办法,它的实质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创立Schedule。
简略点看Schedule就是对ExecutorService的封装。
Schedulers工具类
Schedulers工具类提供了很多个有用的工具类,咱们来具体介绍一下:
Schedulers.immediate():
提交的Runnable将会立马在以后线程执行。
Schedulers.single():
应用同一个线程来执行所有的工作。
Schedulers.boundedElastic():
创立一个可重用的线程池,如果线程池中的线程在长时间内都没有被应用,那么将会被回收。boundedElastic会有一个最大的线程个数,一般来说是CPU cores x 10。 如果目前没有可用的worker线程,提交的工作将会被放入队列期待。
Schedulers.parallel():
创立固定个数的工作线程,个数和CPU的核数相干。
Schedulers.fromExecutorService(ExecutorService):
从一个现有的线程池创立Scheduler。
Schedulers.newXXX:
Schedulers提供了很多new结尾的办法,来创立各种各样的Scheduler。
咱们看一个Schedulers的具体利用,咱们能够指定特定的Scheduler来产生元素:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
publishOn 和 subscribeOn
publishOn和subscribeOn次要用来进行切换Scheduler的执行上下文。
先讲一个论断,就是在链式调用中,publishOn能够切换Scheduler,然而subscribeOn并不会起作用。
这是因为真正的publish-subscribe关系只有在subscriber开始subscribe的时候才建设。
上面咱们来具体看一下这两个办法的应用状况:
publishOn
publishOn能够在链式调用的过程中,进行publish的切换:
@Test
public void usePublishOn() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i + ":"+ Thread.currentThread())
.publishOn(s)
.map(i -> "value " + i+":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
System.out.println(Thread.currentThread());
Thread.sleep(5000);
}
下面咱们创立了一个名字为parallel-scheduler的scheduler。
而后创立了一个Flux,Flux先做了一个map操作,而后切换执行上下文到parallel-scheduler,最初右执行了一次map操作。
最初,咱们采纳一个新的线程来进行subscribe的输入。
先看下输入后果:
Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
能够看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。
那么在publishOn之前,map应用的线程就是ThreadA。 而在publishOn之后,map应用的线程就切换到了parallel-scheduler线程池。
subscribeOn
subscribeOn是用来切换Subscriber的执行上下文,不论subscribeOn呈现在调用链的哪个局部,最终都会利用到整个调用链上。
咱们看一个例子:
@Test
public void useSubscribeOn() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i + ":" + Thread.currentThread())
.subscribeOn(s)
.map(i -> "value " + i + ":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
Thread.sleep(5000);
}
同样的,下面的例子中,咱们应用了两个map,而后在两个map中应用了一个subscribeOn用来切换subscribe执行上下文。
看下输入后果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
能够看到,不论哪个map,都是用的是切换过的parallel-scheduler。
本文的例子learn-reactive
本文作者:flydean程序那些事
本文链接:http://www.flydean.com/reactor-thread-scheduler/
本文起源:flydean的博客
欢送关注我的公众号:「程序那些事」最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!
发表回复