关于java:Reactor响应式编程你只差这个

42次阅读

共计 15976 个字符,预计需要花费 40 分钟才能阅读完成。

哈哈哈哈哈,题目有点猖獗。然而既然你都来了,那就看看吧,毕竟响应式编程随着高并发对于性能的吃紧,越来越重要了。

哦对了,这是一篇 Java 文章。

废话不多说,间接步入正题。

响应式编程外围组件

步入正题之前,我心愿你对发布者 / 订阅者模型有一些理解。

间接看图:

Talk is cheap, show you the code!

public class Main {public static void main(String[] args) {Flux<Integer> flux = Flux.range(0, 10);
        flux.subscribe(i -> {System.out.println("run1:" + i);
        });
        flux.subscribe(i -> {System.out.println("run2:" + i);
        });
    }
}

输入:

run1: 0
run1: 1
run1: 2
run1: 3
run1: 4
run1: 5
run1: 6
run1: 7
run1: 8
run1: 9
run2: 0
run2: 1
run2: 2
run2: 3
run2: 4
run2: 5
run2: 6
run2: 7
run2: 8
run2: 9

Process finished with exit code 0

Flux

Flux 是一个多元素的生产者,话中有话,它能够生产多个元素,组成元素序列,供订阅者应用。

Mono

Mono 和 Flux 的区别在于,它只能生产一个元素供生产者订阅,也就是数量的不同。

Mono 的一个常见的利用就是 Mono<ServerResponse\> 作为 WebFlux 的返回值。毕竟每次申请只有一个 Response 对象,所以 Mono 刚刚好。

疾速创立一个 Flux/Mono 并订阅它

来看一些官网文档演示的办法。

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

subscribe() 办法 (Lambda 模式)

  • subscribe() 办法默认承受一个 Lambda 表达式作为订阅者来应用。它有四个变种模式。
  • 在这里阐明一下 subscribe() 第四个参数,指出了当订阅信号达到,首次申请的个数,如果是 null 则全副申请 (Long.MAX_VALUE)
public class FluxIntegerWithSubscribe {public static void main(String[] args) {Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(i -> {System.out.println("run");
            System.out.println(i);
        }, error -> {System.out.println("error");
        }, () -> {System.out.println("done");
        }, p -> {p.request(2);
        });
    }
}

如果去掉首次申请,那么会申请最大值:

public class FluxIntegerWithSubscribe {public static void main(String[] args) {Flux<Integer> integerFlux = Flux.range(0, 10);
        // 在这里阐明一下 subscribe() 第四个参数,指出了当订阅信号达到,首次申请的个数,如果是 null 则全副申请 (Long.MAX_VALUE)
        // 其余 subscribe() 详见源码或文档:https://projectreactor.io/docs/core/release/reference/#flux
        integerFlux.subscribe(i -> {System.out.println("run");
            System.out.println(i);
        }, error -> {System.out.println("error");
        }, () -> {System.out.println("done");
        });
    }
}

输入:

run
0
run
1
run
2
run
3
run
4
run
5
run
6
run
7
run
8
run
9
done

Process finished with exit code 0

继承 BaseSubscriber(非 Lambda 模式)

  • 这种形式更多像是对于 Lambda 表达式的一种替换表白。
  • 对于基于此办法的订阅,有一些注意事项,比方首次订阅时,要至多申请一次。否则会导致程序无奈持续取得新的元素。
public class FluxWithBaseSubscriber {public static void main(String[] args) {Flux<Integer> integerFlux = Flux.range(0, 10);
        integerFlux.subscribe(new MySubscriber());
    }

    /**
     * 一般来说,通过继承 BaseSubscriber<T> 来实现,而且个别自定义 hookOnSubscribe() 和 hookOnNext() 办法
     */
    private static class MySubscriber extends BaseSubscriber<Integer> {

        /**
         * 首次订阅时被调用
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {System.out.println("开始啦!");
            // 记得至多申请一次,否则不会执行 hookOnNext() 办法
            request(1);
        }

        /**
         * 每次读取新值调用
         */
        @Override
        protected void hookOnNext(Integer value) {System.out.println("开始读取...");
            System.out.println(value);
            // 指出下一次读取多少个
            request(2);
        }

        @Override
        protected void hookOnComplete() {System.out.println("完结啦");
        }
    }
}

输入:

 开始啦!开始读取...
0
开始读取...
1
开始读取...
2
开始读取...
3
开始读取...
4
开始读取...
5
开始读取...
6
开始读取...
7
开始读取...
8
开始读取...
9
完结啦

Process finished with exit code 0

终止订阅:Disposable

  • Disposable 是一个订阅时返回的接口,外面蕴含很多能够操作订阅的办法。
  • 比方勾销订阅。

在这里应用多线程模仿生产者生产的很快,而后立马勾销订阅(尽管立即勾销然而因为生产者切实太快了,所以订阅者还是接管到了一些元素)。

其余的办法,比方 Disposables.composite() 会失去一个 Disposable 的汇合,调用它的 dispose() 办法会把汇合里的所有 Disposable 的 dispose() 办法都调用。

public class FluxWithDisposable {public static void main(String[] args) {Disposable disposable = getDis();
        // 每次打印数量个别不同,因为调用了 disposable 的 dispose() 办法进行了勾销,不过如果生产者产地太快了,那么可能来不及终止。disposable.dispose();}

    private static Disposable getDis() {
        class Add implements Runnable {

            private final FluxSink<Integer> fluxSink;

            public Add(FluxSink<Integer> fluxSink) {this.fluxSink = fluxSink;}

            @Override
            public synchronized void run() {fluxSink.next(new Random().nextInt());
            }
        }
        Flux<Integer> integerFlux = Flux.create(integerFluxSink -> {Add add = new Add(integerFluxSink);
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();
            new Thread(add).start();});
        return integerFlux.subscribe(System.out::println);
    }
}

输入:

 这里的输入每次调用可能都会不同,因为订阅之后勾销了,所以能打印多少取决于那一瞬间 CPU 的速度。

调整发布者公布速率

  • 为了缓解订阅者压力,订阅者能够通过负压流回溯进行重塑发布者公布的速率。最典型的用法就是上面这个——通过继承 BaseSubscriber 来设置本人的申请速率。然而有一点必须明确,就是 hookOnSubscribe() 办法必须至多申请一次,不然你的发布者可能会“卡住”。
public class FluxWithLimitRate1 {public static void main(String[] args) {Flux<Integer> integerFlux = Flux.range(0, 100);
        integerFlux.subscribe(new MySubscriber());
    }

    private static class MySubscriber extends BaseSubscriber<Integer> {

        @Override
        protected void hookOnSubscribe(Subscription subscription) {System.out.println("开始啦!");
            // 记得至多申请一次,否则不会执行 hookOnNext() 办法
            request(1);
        }

        @Override
        protected void hookOnNext(Integer value) {System.out.println("开始读取...");
            System.out.println(value);
            // 指出下一次读取多少个
            request(2);
        }

        @Override
        protected void hookOnComplete() {System.out.println("完结啦!");
        }
    }
}
  • 或者应用 limitRate() 实例办法进行限度,它返回一个被限度了速率的 Flux 或 Mono。某些上流的操作能够更改上流订阅者的申请速率,有一些操作有一个 prefetch 整型作为输出,能够获取大于上流订阅者申请的数量的序列元素,这样做是为了解决它们本人的外部序列。这些预获取的操作方法个别默认预获取 32 个,不过为了优化;每次曾经获取了预获取数量的 75% 的时候,会再获取 75%。这叫“补充优化”。
public class FluxWithLimitRate2 {public static void main(String[] args) {Flux<Integer> integerFlux = Flux.range(0, 100);
        // 最初,来看一些 Flux 提供的预获取办法:// 指出预取数量
        integerFlux.limitRate(10);
        // lowTide 指出预获取操作的补充优化的值,即批改 75% 的默认值;highTide 指出预获取数量。integerFlux.limitRate(10, 15);
        // 哎~最典型的就是,申请有数:request(Long.MAX_VALUE) 然而我给你 limitRate(2);那你也只能乖乖每次失去两个哈哈哈哈!// 还有一个就是 limitRequest(N),它会把上流总申请限度为 N。如果上流申请超过了 N,那么只返回 N 个,否则返回理论数量。而后认为申请实现,向上流发送 onComplete 信号。integerFlux.limitRequest(5).subscribe(new MySubscriber());
        // 下面这个只会输入 5 个。}
}

程序化地创立一个序列

动态同步办法:generate()

当初到了程序化生成 Flux/Mono 的时候。首先介绍 generate() 办法,这是一个同步的办法。话中有话就是,它是线程不平安的,且它的接收器只能一次一个的承受输出来生成 Flux/Mono。也就是说,它在任意时刻只能被调用一次且只承受一个输出。

或者这么说,它生成的元素序列的程序,取决于代码编写的形式。

public class FluxWithGenerate {public static void main(String[] args) {// 上面这个是它的变种办法之一:第一个参数是提供初始状态的,第二个参数是一个向接收器写入数据的生成器,入参为 state( 个别为整数,用来记录状态),和接收器。// 其余变种请看源码
        Flux.generate(() -> 0, (state, sink) -> {sink.next(state+"asdf");
            // 加上对于 sink.complete() 的调用即可终止生成;否则就是有限序列。return state+1;
        }).subscribe(System.out::println);
        // generate 办法的第三个参数用于完结生成时被调用,耗费 state。Flux.generate(AtomicInteger::new, (state, sink) -> {sink.next(state.getAndIncrement()+"qwer");
            return state;
        }).subscribe(System.out::println);
        // generate() 的工作流看起来就像:next()->next()->next()->...
    }
}
  • 通过上述代码不难看到,每次的接收器承受的值来自于上一次生成办法的返回值,也就是 state= 上一个迭代的返回值 (其实称为上一个流才精确,这么说只是为了不便了解)。
  • 不过这个 state 每次都是一个全新的 (每次都 + 1 当然是新的),那么有没有什么办法能够做到前后两次迭代的 state 是同一个援用且还能够更新值呢?答案就是原子类型。也就是下面的第二种形式。

动态异步多线程办法:create()

说完了同步生成,接下来就是异步生成,还是多线程的!让咱们有请:create() 闪亮退场!!!

  • create() 办法对外暴露出一个 FluxSink 对象,通过它咱们能够拜访并生成须要的序列。除此之外,它还能够触发回调中的多线程事件。
  • create 另一个性就是很容易把其余的接口与响应式桥接起来。留神,它是异步多线程并不意味着 create 能够并行化你写的代码或者异步执行;怎么了解呢?就是,create 办法外面的 Lambda 表达式代码还是单线程阻塞的。如果你在创立序列的中央阻塞了代码,那么可能造成订阅者即便申请了数据,也得不到,因为序列被阻塞了,没法生成新的。
  • 其实通过下面的景象能够猜想,默认状况下订阅者应用的线程和 create 应用的是一个线程,当然阻塞 create 就会导致订阅者没法运行咯!
  • 上述问题能够通过 Scheduler 解决,前面会提到。
public class FluxWithCreate {public static void main(String[] args) throws InterruptedException {TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> stringTestListener) {this.testListener = stringTestListener;}

            @Override
            public TestListener<String> get() {return testListener;}
        };
        Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() {
            @Override
            public void onChunk(List<String> chunk) {for (String s : chunk) {stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        System.out.println("当初是 2020/10/22 22:58;我好困");
        TestListener<String> testListener = testProcessor.get();
        Runnable1<String> runnable1 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {this.testListener = testListener;}

            @Override
            public void run() {List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {list.add(i+"-run1");
                }
                testListener.onChunk(list);
            }
        };
        Runnable1<String> runnable2 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {this.testListener = testListener;}

            @Override
            public void run() {List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {list.add(i+"-run2");
                }
                testListener.onChunk(list);
            }
        };
        Runnable1<String> runnable3 = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {this.testListener = testListener;}

            @Override
            public void run() {List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++ i) {list.add(i+"-run3");
                }
                testListener.onChunk(list);
            }
        };
        runnable1.set(testListener);
        runnable2.set(testListener);
        runnable3.set(testListener);
        // create 所谓的 "异步","多线程" 指的是在多线程中调用 sink.next() 办法。这一点在上面的 push 比照中能够看到
        new Thread(runnable1).start();
        new Thread(runnable2).start();
        new Thread(runnable3).start();
        Thread.sleep(1000);
        testListener.onComplete();
        // 另一方面,create 的另一个变体能够设置参数来实现负压管制,具体看源码。}
    public interface TestListener<T> {void onChunk(List<T> chunk);

        void onComplete();}

    public interface TestProcessor<T> {void register(TestListener<T> tTestListener);

        TestListener<T> get();}

    public interface Runnable1<T> extends Runnable {void set(TestListener<T> testListener);
    }
}

动态异步单线程办法:push()

说完了异步多线程,同步的生成办法,接下来就是异步单线程:push()。

其实说到 push 和 create 的比照,我集体了解如下:

  • reate 容许多线程环境下调用.next() 办法,只管生成元素,元素序列的程序取决于 … 算了,随机的,毕竟多线程;
  • 然而 push 只容许一个线程生产元素,所以是有序的,至于异步指的是在新的线程中也能够,而不用非得在以后线程。
  • 顺带一提,push 和 create 都反对 onCancel() 和 onDispose() 操作。一般来说,onCancel 只响应于 cancel 操作,而 onDispose 响应于 error,cancel,complete 等操作。
public class FluxWithPush {public static void main(String[] args) throws InterruptedException {TestProcessor<String> testProcessor = new TestProcessor<>() {

            private TestListener<String> testListener;

            @Override
            public void register(TestListener<String> testListener) {this.testListener = testListener;}

            @Override
            public TestListener<String> get() {return this.testListener;}
        };
        Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() {
            @Override
            public void onChunk(List<String> list) {for (String s : list) {stringFluxSink.next(s);
                }
            }

            @Override
            public void onComplete() {stringFluxSink.complete();
            }
        }));
        flux.subscribe(System.out::println);
        Runnable1<String> runnable = new Runnable1<>() {

            private TestListener<String> testListener;

            @Override
            public void set(TestListener<String> testListener) {this.testListener = testListener;}

            @Override
            public void run() {List<String> list = new ArrayList<>(10);
                for (int i = 0; i < 10; ++i) {list.add(UUID.randomUUID().toString());
                }
                testListener.onChunk(list);
            }
        };
        TestListener<String> testListener = testProcessor.get();
        runnable.set(testListener);
        new Thread(runnable).start();
        Thread.sleep(15);
        testListener.onComplete();}

    public interface TestListener<T> {void onChunk(List<T> list);
        void onComplete();}

    public interface TestProcessor<T> {void register(TestListener<T> testListener);
        TestListener<T> get();}

    public interface Runnable1<T> extends Runnable {void set(TestListener<T> testListener);
    }
}

同 create 一样,push 也反对负压调节。然而我没写进去,我试过的 Demo 都是间接申请 Long.MAX_VALUE,其实就是通过 sink.onRequest(LongConsumer) 办法调用来实现负压管制的。原理在这,想深究的请自行摸索,鄙人不才,破费一下午没实现。

实例办法:handle()

在 Flux 的实例办法里,handle 相似 filter 和 map 的操作。

public class FluxWithHandle {public static void main(String[] args) {
        Flux<String> stringFlux = Flux.push(stringFluxSink -> {for (int i = 0; i < 10; ++ i) {stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5));
            }
        });
        // 获取所有蕴含 'a' 的串
        Flux<String> flux = stringFlux.handle((str, sink) -> {String s = f(str);
            if (s != null) {sink.next(s);
            }
        });
        flux.subscribe(System.out::println);
    }

    private static String f(String str) {return str.contains("a") ? str : null;
    }
}

线程和调度

Schedulers 的那些静态方法

一般来说,响应式框架都不反对并发,P.s. create 那个是生产者并发,它自身不是并发的。所以也没有可用的并发库,须要开发者本人实现。

同时,每一个操作个别都是在上一个操作所在的线程里运行,它们不会领有本人的线程,而最顶的操作则是和 subscribe() 在同一个线程。比方 Flux.create(…).handle(…).subscribe(…) 都在主线程运行的。

在响应式框架里,Scheduler 决定了操作在哪个线程被怎么执行,它的作用相似于 ExecutorService。不过性能略微多点。如果你想实现一些并发操作,那么能够思考应用 Schedulers 提供的静态方法,来看看有哪些可用的:

Schedulers.immediate(): 间接在以后线程提交 Runnable 工作,并立刻执行。

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {public static void main(String[] args) throws InterruptedException {// Schedulers.immediate(): 间接在以后线程提交 Runnable 工作,并立刻执行。System.out.println("以后线程:" + Thread.currentThread().getName());
        System.out.println("zxcv");
        Schedulers.immediate().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("qwer");
        });
        System.out.println("asdf");
        // 确保异步工作能够打印进去
        Thread.sleep(1000);
    }
}

通过下面看得出,immediate() 其实就是在执行地位插入须要执行的 Runnable 来实现的。和间接把代码写在这里没什么区别。

Schedulers.newSingle():保障每次执行的操作都应用的是一个新的线程。

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {public static void main(String[] args) throws InterruptedException {// 如果你想让每次调用都是一个新的线程的话,能够应用 Schedulers.newSingle(),它能够保障每次执行的操作都应用的是一个新的线程。Schedulers.single().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("bnmp");
        });
        Schedulers.single().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("ghjk");
        });
        Schedulers.newSingle("线程 1").schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("1234");
        });
        Schedulers.newSingle("线程 1").schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("5678");
        });
        Schedulers.newSingle("线程 2").schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("0100");
        });
        Thread.sleep(1000);
    }
}

Schedulers.single(),它的作用是为以后操作开拓一个新的线程,然而记住,所有应用这个办法的操作都共用一个线程;

Schedulers.elastic():一个弹性无界限程池。

无界个别意味着不可治理,因为它可能会导致负压问题和过多的线程被创立。所以马上就要提到它的代替办法。

Schedulers.bounededElastic():有界可复用线程池

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {public static void main(String[] args) throws InterruptedException {Schedulers.boundedElastic().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("1478");
        });
        Schedulers.boundedElastic().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("2589");
        });
        Schedulers.boundedElastic().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("0363");
        });
        Thread.sleep(1000);
    }
}

Schedulers.boundedElastic() 是一个更好的抉择,因为它能够在须要的时候创立工作线程池,并复用闲暇的池;同时,某些池如果闲暇工夫超过一个限定的数值就会被摈弃。

同时,它还有一个容量限度,个别 10 倍于 CPU 外围数,这是它后备线程池的最大容量。最多提交 10 万条工作,而后会被装进工作队列,等到有可用时再调度,如果是延时调度,那么延时开始工夫是在有线程可用时才开始计算。

由此可见 Schedulers.boundedElastic() 对于阻塞的 I / O 操作是一个不错的抉择,因为它能够让每一个操作都有本人的线程。然而记得,太多的线程会让零碎备受压力。

Schedulers.parallel():提供了零碎级并行的能力

package com.learn.reactor.flux;

import reactor.core.scheduler.Schedulers;

/**
 * @author Mr.M
 */
public class FluxWithSchedulers {public static void main(String[] args) throws InterruptedException {Schedulers.parallel().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("6541");
        });
        Schedulers.parallel().schedule(() -> {System.out.println("以后线程是:" + Thread.currentThread().getName());
            System.out.println("9874");
        });
        Thread.sleep(1000);
    }
}

最初,Schedulers.parallel() 提供了并行的能力,它会创立数量等于 CPU 外围数的线程来实现这一性能。

其余线程操作

顺带一提,还能够通过 ExecutorService 创立新的 Scheduler。当然,Schedulers 的一堆 newXXX 办法也能够。

有一点很重要,就是 boundedElastic() 办法能够实用于传统阻塞式代码,然而 single() 和 parallel() 都不行,如果你非要这么做那就会抛异样。自定义 Schedulers 能够通过设置 ThreadFactory 属性来设置接管的线程是否是被 NonBlocking 接口润饰的 Thread 实例。

Flux 的某些办法会应用默认的 Scheduler,比方 Flux.interval() 办法就默认应用 Schedulers.parallel() 办法,当然能够通过设置 Scheduler 来更改这种默认。

在响应式链中,有两种形式能够切换执行上下文,别离是 publishOn() 和 subscribeOn() 办法,前者在流式链中的地位很重要。在 Reactor 中,能够以任意模式增加任意数量的订阅者来满足你的需要,然而,只有在设置了订阅办法后,能力激活这条订阅链上的全副对象。只有这样,申请才会上溯到发布者,进而产生源序列。

在订阅链中切换执行上下文

publishOn()

publishOn() 就和一般操作一样,增加在操作链的两头,它会影响在它上面的所有操作的执行上下文。看个例子:

public class FluxWithPublishOnSubscribeOn {public static void main(String[] args) throws InterruptedException {
        // 创立一个并行线程
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                // map 必定是跑在 T 上的。.map(i -> 10 + i)
                // 此时的执行上下文被切换到了并行线程
                .publishOn(s)
                // 这个 map 还是跑在并行线程上的,因为 publishOn() 的前面的操作都被切换到了另一个执行上下文中。.map(i -> "value" + i);
        // 假如这个 new 进去的线程名为 T
        new Thread(() -> flux.subscribe(System.out::println));
        Thread.sleep(1000);
    }
}

subscribeOn()

public class FluxWithPublishOnSubscribeOn {public static void main(String[] args) throws InterruptedException {
        // 仍旧是创立一个并行线程
        Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> fluxflux = Flux
                .range(1, 2)
                // 不过这里的 map 就曾经在 ss 里跑了
                .map(i -> 10 + i)
                // 这里切换,然而切换的是整个链
                .subscribeOn(s)
                // 这里的 map 也运行在 ss 上
                .map(i -> "value" + i);
        // 这是一个匿名线程 TT
        new Thread(() -> fluxflux.subscribe(System.out::println));
        Thread.sleep(1000);
    }
}

subscribeOn() 办法会把订阅之后的整个订阅链都切换到新的执行上下文中。无论在 subscribeOn() 哪里,都能够把最后面的订阅之后的订阅序列进行切换,当然了,如果前面还有 publishOn(),publishOn() 会进行新的切换。

正文完
 0