subscribeOn
Observable.subscribeOn() 在方法内部生成了一个 ObservableSubscribeOn 对象. 主要看一下 ObservableSubscribeOn 的 subscribeActual 方法.
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 调用下游的 Observer 的 onSubscribe 方法
observer.onSubscribe(parent);
// 通过 SubscribeTask 执行了上游 Observable 的 subscribeActual 方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
scheduler.scheduleDirect(Runnable) 用于执行 SubscribeTask 这个任务.SubscribeTask 本身是 Runnable 的实现类. 看一下其 run 方法.
@Override
public void run() {
// 上游的 Observable.subscribe 方法被切换到了新的线程
source.subscribe(parent);
}
首先可以得出结论:subscribeOn 将上游的 Observable 的 subscribe 方法切换到了新的线程.
如果多次调用 subscribeOn 切换线程, 会有什么效果?
由下往上, 每次调用 subscribeOn, 都会导致上游的 Observable 的 subscribeActual 切换到指定的线程. 那么最后一次调用的切换最上游的创建型操作符的 subscribeActual 的执行线程. 如果操作符有默认执行线程怎么办?
操作符默认线程
如果是创建型操作符, 处于最上游, 那么 subscribeOn 的线程切换对它不起作用. 天高皇帝远, 县官不如现管. 就是这个道理. 如果是其它操作符, 会是怎样的? 以操作符 timeout 为例: 它对应 ObservableTimeoutTimed 和 TimeoutObserver
@Override
public void onNext(T t) {
downstream.onNext(t);
// 超时计时
startTimeout(idx + 1);
}
void startTimeout(long nextIndex) {
// 交给操作符默认的线程执行
task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
}
@Override
public void onTimeout(long idx) {
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
}
//TimeoutTask.java
static final class TimeoutTask implements Runnable {
final TimeoutSupport parent;
final long idx;
TimeoutTask(long idx, TimeoutSupport parent) {
this.idx = idx;
this.parent = parent;
}
@Override
public void run() {
parent.onTimeout(idx);
}
}
可以看到操作符默认的执行线程只用来做超时计时任务, 如果超时了, 会在操作符的默认线程执行 onError 方法.. 操作符默认线程对下游的 observer 造成什么影响要做具体对待.
observeOn
observeOn 对应 ObservableObserveOn 和 ObserveOnObserver.
//ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//ObserveOnObserver.java
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
if (d instanceof QueueDisposable) {
if (m == QueueDisposable.SYNC) {
// 执行下游 Observer 的 onSubscribe 方法
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
// 执行下游 Observer 的 onSubscribe 方法
downstream.onSubscribe(this);
return;
}
}
// 执行下游 Observer 的 onSubscribe 方法
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
// 省略
schedule();
}
@Override
public void onError(Throwable t) {
// 省略
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
/*
ObserveOnObserver 是 Runnable 的实现类. 交给线程池执行
*/
worker.schedule(this);
}
}
void drainNormal() {
final Observer<? super T> a = downstream;
for (;;) {
for (;;) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
a.onError(ex);
return;
}
// 执行下游 Observer 的 onNext 方法
a.onNext(v);
}
}
}
void drainFused() {
for (;;) {
if (!delayError && d && ex != null) {
// 执行下游 Observer 的 onError 方法
downstream.onError(error);
return;
}
downstream.onNext(null);
if (d) {
ex = error;
if (ex != null) {
// 执行下游 Observer 的 onError 方法
downstream.onError(ex);
} else {
// 执行下游 Observer 的 onComplete 方法
downstream.onComplete();
}
return;
}
}
}
// 执行线程任务
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
从上面可以看出 ObservableObserveOn 在其 subscribeActual 方法中并没有切换上游 Observable 的 subscribe 方法的执行线程. 但是 ObserveOnObserver 在其 onNext,onError 和 onComplete 中通过 schedule() 方法将下游 Observer 的各个方法切换到了新的线程. 得出结论: observeOn 负责切换的是下游 Observer 的各个方法的执行线程
如果下游多次通过 observeOn 切换线程, 会有什么效果? 每次切换都会对其下游造成影响, 直到遇到下一个 observeOn 为止.
Observer(onSubscribe,onNext,onError,onComplete)
onNext,onError,onComplete 与上游最近的 observeOn 所切换的线程保持一致.onSubscribe 则不同. 遇到线程切换的时候, 会首先在对应的 Observable 的 subscribeActual 方法内, 先调用 observer.onSubscribe 方法. 而 observer.onSubscribe 会逐级向上传递直到最上游, 而最上游的 observer.onSubscribe 是在 subscribeActual 方法内调用, 这是在主线程执行的. 所以 onSubscribe 方法无论如何都是在主线程执行.
doOnSubscribe
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
}
})
我们要看的是方法 accept 的执行线程. 通过源码找到对应的 DisposableLambdaObserver.
@Override
public void onSubscribe(Disposable d) {
// 在这里调用了 accept 方法.
onSubscribe.accept(d);
}
这就要看上游在哪个线程执行了 Observer.onSubscribe(disposable) 方法.
在创建型操作符的 subscribeActual 方法和 subscribeOn 对应的 Observable 的 subscribeActual 方法内调用了 Observer.onSubscribe(disposable) 方法. 那么这两处的执行线程就决定了 onSubscribe.accept(d); 的执行线程.
doFinally
对应 ObservableDoFinally 和 DoFinallyObserver
//DoFinallyObserver.java
@Override
public void onError(Throwable t) {
runFinally();
}
@Override
public void onComplete() {
runFinally();
}
@Override
public void dispose() {
runFinally();
}
void runFinally() {
onFinally.run();
}
可以看到与它所对应的 DoFinallyObserver 的 onError,onComplete,dispose 方法的执行线程有关, 这三个方法的执行线程又受到上游的 observeOn 的影响. 如果没有 observeOn, 则会受到最上游的 observable.subscribeActual 方法影响.
doOnError
对应 ObservableDoOnEach 和 DoOnEachObserver
//DoOnEachObserver.java
@Override
public void onError(Throwable t) {
onError.accept(t);
}
和自身对应的 observer.onError 所在线程保持一致.
doOnNext
对应 ObservableDoOnEach 和 DoOnEachObserver
//DoOnEachObserver.java
@Override
public void onNext(T t) {
onNext.accept(t);
}
和自身对应的 observer.onNext 所在线程保持一致.
操作符对应方法参数的执行线程
包 io.reactivex.functions 下的接口类一般用于处理上游数据然后往下传递. 这些接口类的方法一般在对应的 observer.onNext 中调用. 所以他们的线程保持一致.
总结:
subscribeOn 由下往上逐级切换 Observable.subscribe 的执行线程, 不受 observeOn 影响, 也不受具有默认指定线程的非创建型操作符影响, 但是会被更上游的 subscribeOn 夺取线程切换的权利, 直到最上游. 如果最上游的创建型操作符也有默认执行线程, 那么任何一个 subscribeOn 的线程切换不起作用.subscribeOn 由下向上到达最上游后, 然后由上往下影响下游的 observer 的执行线程. 遇到 observeOn 会被夺取线程切换的权利.observeOn 影响的是下游的 observer 的执行线程, 由上往下, 遇到另一个 observeOn 会移交线程控制权力, 遇到指定默认线程非创建型的操作符, 要视具体情况对待.