引言:

spring cloud gateway --> webflux --> reactor-netty --> reactor-core

以上是github上几个我的项目的依赖关系。

阐明:
reactor-core是reactive programming模型的一个具体实现。

本文简略阐明在reactor-core中,flux工作底层到底做了什么。

先说给一个 demo代码

    Flux<String> data = Flux.just("hello", "hello2");    data = data.map(e -> e + " world");     data.subscribe( e ->  System.out.println(e) );

Flux<String> data = Flux.just("hello", "hello2");
这个办法外部创立一个FluxArray对象,把数据存储到array参数里。

data = data.map(e -> e + " world");
这个办法外部创立一个FluxMapFuseable对象,把FluxArray对象存储到source参数里,把(e -> e + " world")存储到mapper。

这个办法逻辑就是通过把原来的数据和新的解决逻辑,一层一层的封装起来。

data.subscribe( e -> System.out.println(e) );
在办法的底层调用的是Flux类的subscribe()办法,代码如下(省略局部代码):

     OptimizableOperator operator = (OptimizableOperator)publisher;                  while(true) {                subscriber = operator.subscribeOrReturn(subscriber);//1                if (subscriber == null) {                    return;                }                OptimizableOperator newSource = operator.nextOptimizableSource();//2                if (newSource == null) {                    publisher = operator.source();                    break;                }                operator = newSource;            }        }        publisher.subscribe(subscriber);//3

其中,最开始的publisher就是下面的FluxMapFuseable对象,subscriber就是下面( 的(e -> System.out.println(e) )。

while循环外面次要有三步:
1、subscriber = operator.subscribeOrReturn(subscriber);
这个办法外部MapFuseableSubscriber创立一个FluxMapFuseable对象,把subscriber对象存储到actual参数里,把FluxMapFuseable对象里的map,也就是(e -> e + " world")存储到mapper。

这个办法与生成FluxMapFuseable对象相似,之前是把原始的array一层一层的套上map,当初反过来把FluxMapFuseable对象上的map一层一层解开,再套到subscriber上。

2、OptimizableOperator newSource = operator.nextOptimizableSource();
这个办法比拟好了解了,就是获取FluxMapFuseable对象的外面一层。

3、publisher.subscribe(subscriber);
出了while循环,publisher就是原始的fluxArray数据,subscriber是一层一层的map,最外层是(e -> e + " world"),最里层是(e -> System.out.println(e) )。

这个办法里的先调用fluxArray的fastPath(这个办法名先疏忽含意),外面会对每个array元素调用subscriber的onNext办法。onNext办法调用map中的(e -> e + " world")解决数据,而后交给里一层subscriber,持续调用onNext办法,最终到(e -> System.out.println(e) )。

到此,调用实现。