RxJava2:操作符执行顺序和数据传递方向

以Observable为例,先上代码:
//①
ObservableJust<String> observable = (ObservableJust<String>) Observable.just(“hello rxjava2”);
//②
ObservableSubscribeOn<String> subscribe = (ObservableSubscribeOn<String>) observable.subscribeOn(Schedulers.io());
//③
ObservableObserveOn<String> observerOn = (ObservableObserveOn<String>) subscribe.observeOn(AndroidSchedulers.mainThread());
//④
ObservableDoFinally<String> doFinally = (ObservableDoFinally<String>) observerOn.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println(“doFinally”);
}
});
//⑤
ObservableDoOnLifecycle<String> doOnSubscribe = (ObservableDoOnLifecycle<String>) doFinally.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println(“doOnSubscribe: ” + disposable.hashCode());
}
});
//⑥
doOnSubscribe.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(“onSubscribe: “+d.hashCode());
/* if (!d.isDisposed()){
System.out.println(“onSubscribe: dispose”);
d.dispose();
}*/
}

@Override
public void onNext(String s) {
System.out.println(“onNext: “+s);
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}

@Override
public void onError(Throwable e) {
System.out.println(“onError: “+e.getMessage());
Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show();

}

@Override
public void onComplete() {
System.out.println(“onComplete”);
Toast.makeText(MainActivity.this, “onComplete”, Toast.LENGTH_SHORT).show();
}
});

这里每次调用一个操作符,返回的都是Observable的直接子类或者间接之类.以just为例:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, “The item is null”);
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

这里重新new了一个Observable的子类对象ObservableJust.
结论如下:

每个操作符都会对应返回一个Observable的子类对象,类名格式ObservableXXX然后去调用下一个操作符.比如interval操作符,返回的是ObservableInterval的实例对象.
对于Observable的创建型操作符,返回的是其直接子类,而其他操作符,返回的是AbstractObservableWithUpstream的子类对象.AbstractObservableWithUpstream的构造函数中,第一个参数就是Observable对象,这一点非常重要,这个参数是上一个操作符返回的Observable对象.这保证了整个调用流程的起始处的Observable对象能在整个流程中传递.

操作符的调用是由上至下,顺序调用的.数据流的传递流程是怎么样的?
数据流的传递
如果没有最下游的观察者对数据做接收,整个调用流程是不会执行的.先从⑥开始看ObservableDoOnLifecycle的subscribe方法做了什么.
@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose));
}
source就是上游操作符返回的Observable的子类对象,通过AbstractObservableWithUpstream的构造函数传递给下游的.这里去调用了上一个Observable对象的subscribe方法.这个调用由下至上,直到整个流程的起始处.

下一步看数据是怎么传递的?

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理