咱们应用 subscribeOn 和 publishOn 操作符在响应链中切换执行上下文 (Reactor 中叫 Scheduler)。
上一篇文章中,咱们说到 Reactor 默认行为是执行订阅的同一线程将用于整个管道执行。如果要切换执行线程怎么办?能够应用 publishOn 和 SubscribeOn
让咱们看个简略的例子:
class ReactiveJavaTutorial {public static void main(String[] args) {Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
.map(String::toUpperCase)
.filter(cityName -> cityName.length() <= 8)
.map(cityName -> cityName.concat("City"))
.log();
cities.subscribe();}
输入:
17:39:41.693 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:39:41.712 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
17:39:41.714 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(NEW YORK City)
17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(LONDON City)
17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(PARIS City)
17:39:41.716 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()
中括号中的就是线程名称,在这个例子中,都是 main。能够看到整个管道执行器中都是应用的 main 线程。
有的时候,咱们可能想通知 Reactor 别在整个管道中应用同一个线程。我能够应用 subscribeOn() 和 publishOn() 办法达到成果。
subscribeOn() 办法
subscribeOn()
办法实用于订阅过程。 咱们能够把它放在响应链条中的任意地位 。它接管 Scheduler 参数,且在提供的线程池中抉择线程执行。
在上面的例子中,咱们应用有界弹性线程池 (Schedulers.boundElastic())
。
@Test
public void testSubscribeThread() {Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam")
.subscribeOn(Schedulers.boundedElastic())
.map(String::toUpperCase)
.filter(cityName -> cityName.length() <= 8)
.map(cityName -> cityName.concat("City"))
.map(TestCase::concat)
.map(TestCase::stringToUpperCase)
.log();
// cities.subscribe();
System.out.println(cities.blockFirst());
}
PS: 原文提供的 case,没有输入,简略批改了一下。
输入:
20:07:53.517 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:07:53.558 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
20:07:53.560 [main] INFO reactor.Flux.Map.1 - request(unbounded)
concat: boundedElastic-1
stringToUpperCase: boundedElastic-1
20:07:53.564 [boundedElastic-1] INFO reactor.Flux.Map.1 - onNext(NEW YORK CITY CITY)
20:07:53.565 [boundedElastic-1] INFO reactor.Flux.Map.1 - cancel()
NEW YORK CITY CITY
能够看到 main
线程开始订阅,然而被切换成 boundedElastic-1
线程。咱们提供了一个 Scheduler(Schedulers.boundedElastic())
,而后这个线程池中的一个线程被选中来替换 main
线程。
publishOn() 办法
publishOn()
办法跟 subscribeOn()
很相似,然而有一个次要区别。
来看个例子:
class ReactiveJavaTutorial {public static void main(String[] args) {Flux.just("New York", "London", "Paris", "Amsterdam")
.map(ReactiveJavaTutorial::stringToUpperCase)
.publishOn(Schedulers.boundedElastic())
.map(ReactiveJavaTutorial::concat)
.subscribe();}
private static String stringToUpperCase(String name) {System.out.println("stringToUpperCase:" + Thread.currentThread().getName());
return name.toUpperCase();}
private static String concat(String name) {System.out.println("concat:" + Thread.currentThread().getName());
return name.concat("City");
}
}
这里,咱们在两个 map 操作中放一个 publishOn()
。咱们来看输入:
stringToUpperCase: main
stringToUpperCase: main
stringToUpperCase: main
concat: boundedElastic-1
concat: boundedElastic-1
concat: boundedElastic-1
能够看到,所有在 publishOn
操作之前的都是 main 线程执行,所有 publishOn
之后的都是 boundedElastic-1 执行。这是因为 publishOn
充当任何其余操作符。它从上游接管信号,并在关联的 Scheduler
上对一个 worker
执行回调时向上游重播。
这就是 publishOn
和 subscribeOn()
的次要区别。无论咱们把 subscribeOn()
放在哪里,它提供的 Scheduler
都会利用到整条响应链。
subscribeOn and publishOn operators in Project Reactor