共计 1720 个字符,预计需要花费 5 分钟才能阅读完成。
引言:
spring cloud gateway –> webflux –> reactor-netty –> reactor-core
以上是 github 上几个我的项目的依赖关系。
阐明:
reactor-core 是 reactive programming 模型的一个具体实现。
本文简略阐明在 reactor-core 中,flux 工作底层到底做了什么。
先说给一个 demo 代码
Flux<String> data = Flux.just("hello", "hello2");
data = data.map(e -> e + "world");
data.subscribe(e -> System.out.println(e) );
Flux<String> data = Flux.just(“hello”, “hello2”);
这个办法外部创立一个 FluxArray 对象,把数据存储到 array 参数里。
data = data.map(e -> e + ” world”);
这个办法外部创立一个 FluxMapFuseable 对象,把 FluxArray 对象存储到 source 参数里,把 (e -> e + ” world”) 存储到 mapper。
这个办法逻辑就是通过把原来的数据和新的解决逻辑,一层一层的封装起来。
data.subscribe(e -> System.out.println(e) );
在办法的底层调用的是 Flux 类的 subscribe()办法,代码如下(省略局部代码):
OptimizableOperator operator = (OptimizableOperator)publisher;
while(true) {subscriber = operator.subscribeOrReturn(subscriber);//1
if (subscriber == null) {return;}
OptimizableOperator newSource = operator.nextOptimizableSource();//2
if (newSource == null) {publisher = operator.source();
break;
}
operator = newSource;
}
}
publisher.subscribe(subscriber);//3
其中,最开始的 publisher 就是下面的 FluxMapFuseable 对象,subscriber 就是下面(的(e -> System.out.println(e) )。
while 循环外面次要有三步:
1、subscriber = operator.subscribeOrReturn(subscriber);
这个办法外部 MapFuseableSubscriber 创立一个 FluxMapFuseable 对象,把 subscriber 对象存储到 actual 参数里,把 FluxMapFuseable 对象里的 map,也就是 (e -> e + ” world”) 存储到 mapper。
这个办法与生成 FluxMapFuseable 对象相似,之前是把原始的 array 一层一层的套上 map,当初反过来把 FluxMapFuseable 对象上的 map 一层一层解开,再套到 subscriber 上。
2、OptimizableOperator newSource = operator.nextOptimizableSource();
这个办法比拟好了解了,就是获取 FluxMapFuseable 对象的外面一层。
3、publisher.subscribe(subscriber);
出了 while 循环,publisher 就是原始的 fluxArray 数据,subscriber 是一层一层的 map,最外层是(e -> e + ” world”),最里层是(e -> System.out.println(e) )。
这个办法里的先调用 fluxArray 的 fastPath(这个办法名先疏忽含意),外面会对每个 array 元素调用 subscriber 的 onNext 办法。onNext 办法调用 map 中的 (e -> e + ” world”) 解决数据,而后交给里一层 subscriber,持续调用 onNext 办法,最终到(e -> System.out.println(e) )。
到此,调用实现。