关于响应式:Reactor中的Thread和Scheduler

41次阅读

共计 3672 个字符,预计需要花费 10 分钟才能阅读完成。

简介

明天咱们要介绍的是 Reactor 中的多线程模型和定时器模型,Reactor 之前咱们曾经介绍过了,它实际上是观察者模式的延长。

所以从实质上来说,Reactor 是和多线程无关的。你能够把它用在多线程或者不必在多线程。

明天将会给大家介绍一下如何在 Reactor 中应用多线程和定时器模型。

Thread 多线程

先看一下之前举的 Flux 的创立的例子:

        Flux<String> flux = Flux.generate(() -> 0,
                (state, sink) -> {sink.next("3 x" + state + "=" + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);

能够看到,不论是 Flux generator 还是 subscriber,他们实际上都是运行在同一个线程中的。

如果咱们想让 subscribe 产生在一个新的线程中,咱们须要新启动一个线程,而后在线程外部进行 subscribe 操作。

        Mono<String> mono = Mono.just("hello");

        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();

下面的例子中,Mono 在主线程中创立,而 subscribe 产生在新启动的 Thread 中。

Schedule 定时器

很多状况下,咱们的 publisher 是须要定时去调用一些办法,来产生元素的。Reactor 提供了一个新的 Schedule 类来负责定时工作的生成和治理。

Scheduler 是一个接口:

public interface Scheduler extends Disposable 

它定义了一些定时器中必须要实现的办法:

比方立刻执行的:

Disposable schedule(Runnable task);

延时执行的:

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

和定期执行的:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)

Schedule 有一个工具类叫做 Schedules,它提供了多个创立 Scheduler 的办法,它的实质就是对 ExecutorService 和 ScheduledExecutorService 进行封装,将其做为 Supplier 来创立 Schedule。

简略点看 Schedule 就是对 ExecutorService 的封装。

Schedulers 工具类

Schedulers 工具类提供了很多个有用的工具类,咱们来具体介绍一下:

Schedulers.immediate():

提交的 Runnable 将会立马在以后线程执行。

Schedulers.single():

应用同一个线程来执行所有的工作。

Schedulers.boundedElastic():

创立一个可重用的线程池,如果线程池中的线程在长时间内都没有被应用,那么将会被回收。boundedElastic 会有一个最大的线程个数,一般来说是 CPU cores x 10。如果目前没有可用的 worker 线程,提交的工作将会被放入队列期待。

Schedulers.parallel():

创立固定个数的工作线程,个数和 CPU 的核数相干。

Schedulers.fromExecutorService(ExecutorService):

从一个现有的线程池创立 Scheduler。

Schedulers.newXXX:

Schedulers 提供了很多 new 结尾的办法,来创立各种各样的 Scheduler。

咱们看一个 Schedulers 的具体利用,咱们能够指定特定的 Scheduler 来产生元素:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

publishOn 和 subscribeOn

publishOn 和 subscribeOn 次要用来进行切换 Scheduler 的执行上下文。

先讲一个论断,就是在链式调用中,publishOn 能够切换 Scheduler,然而 subscribeOn 并不会起作用。

这是因为真正的 publish-subscribe 关系只有在 subscriber 开始 subscribe 的时候才建设。

上面咱们来具体看一下这两个办法的应用状况:

publishOn

publishOn 能够在链式调用的过程中,进行 publish 的切换:

    @Test
    public void usePublishOn() throws InterruptedException {Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value" + i+":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }

下面咱们创立了一个名字为 parallel-scheduler 的 scheduler。

而后创立了一个 Flux,Flux 先做了一个 map 操作,而后切换执行上下文到 parallel-scheduler,最初右执行了一次 map 操作。

最初,咱们采纳一个新的线程来进行 subscribe 的输入。

先看下输入后果:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

能够看到, 主线程的名字是 Thread。Subscriber 线程的名字是 ThreadA。

那么在 publishOn 之前,map 应用的线程就是 ThreadA。而在 publishOn 之后,map 应用的线程就切换到了 parallel-scheduler 线程池。

subscribeOn

subscribeOn 是用来切换 Subscriber 的执行上下文,不论 subscribeOn 呈现在调用链的哪个局部,最终都会利用到整个调用链上。

咱们看一个例子:

    @Test
    public void useSubscribeOn() throws InterruptedException {Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value" + i + ":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }

同样的,下面的例子中,咱们应用了两个 map,而后在两个 map 中应用了一个 subscribeOn 用来切换 subscribe 执行上下文。

看下输入后果:

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

能够看到,不论哪个 map,都是用的是切换过的 parallel-scheduler。

本文的例子 learn-reactive

本文作者:flydean 程序那些事

本文链接:http://www.flydean.com/reactor-thread-scheduler/

本文起源:flydean 的博客

欢送关注我的公众号:「程序那些事」最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

正文完
 0