RxJava2:操作符执行顺序和数据传递方向

39次阅读

共计 2388 个字符,预计需要花费 6 分钟才能阅读完成。

以 Observable 为例, 先上代码:
//①
ObservableJust<String> observable = (ObservableJust<String>) Observable.just(“hello rxjava2”);
//②
ObservableSubscribeOn<String> subscribe = (ObservableSubscribeOn<String>) observable.subscribeOn(Schedulers.io());
//③
ObservableObserveOn<String> observerOn = (ObservableObserveOn<String>) subscribe.observeOn(AndroidSchedulers.mainThread());
//④
ObservableDoFinally<String> doFinally = (ObservableDoFinally<String>) observerOn.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println(“doFinally”);
}
});
//⑤
ObservableDoOnLifecycle<String> doOnSubscribe = (ObservableDoOnLifecycle<String>) doFinally.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println(“doOnSubscribe: ” + disposable.hashCode());
}
});
//⑥
doOnSubscribe.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(“onSubscribe: “+d.hashCode());
/* if (!d.isDisposed()){
System.out.println(“onSubscribe: dispose”);
d.dispose();
}*/
}

@Override
public void onNext(String s) {
System.out.println(“onNext: “+s);
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}

@Override
public void onError(Throwable e) {
System.out.println(“onError: “+e.getMessage());
Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show();

}

@Override
public void onComplete() {
System.out.println(“onComplete”);
Toast.makeText(MainActivity.this, “onComplete”, Toast.LENGTH_SHORT).show();
}
});

这里每次调用一个操作符, 返回的都是 Observable 的直接子类或者间接之类. 以 just 为例:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, “The item is null”);
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

这里重新 new 了一个 Observable 的子类对象 ObservableJust.
结论如下:

每个操作符都会对应返回一个 Observable 的子类对象, 类名格式 ObservableXXX 然后去调用下一个操作符. 比如 interval 操作符, 返回的是 ObservableInterval 的实例对象.
对于 Observable 的创建型操作符, 返回的是其直接子类, 而其他操作符, 返回的是 AbstractObservableWithUpstream 的子类对象.AbstractObservableWithUpstream 的构造函数中, 第一个参数就是 Observable 对象, 这一点非常重要, 这个参数是上一个操作符返回的 Observable 对象. 这保证了整个调用流程的起始处的 Observable 对象能在整个流程中传递.

操作符的调用是由上至下, 顺序调用的. 数据流的传递流程是怎么样的?
数据流的传递
如果没有最下游的观察者对数据做接收, 整个调用流程是不会执行的. 先从⑥开始看 ObservableDoOnLifecycle 的 subscribe 方法做了什么.
@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose));
}
source 就是上游操作符返回的 Observable 的子类对象, 通过 AbstractObservableWithUpstream 的构造函数传递给下游的. 这里去调用了上一个 Observable 对象的 subscribe 方法. 这个调用由下至上, 直到整个流程的起始处.

下一步看数据是怎么传递的?

正文完
 0