共计 19771 个字符,预计需要花费 50 分钟才能阅读完成。
上一篇文章 Rxjava2.x 源码解析(一): 订阅流程中我们讲了 RxJava2 的订阅部分的源码。但 RxJava2 最强大的部分其实是在异步。默认情况下,下游接收事件所在的线程和上游发送事件所在的线程是同一个线程。接下来我们在上一篇文章的示例代码中加入线程切换相关代码:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe:");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// onSubscribe 方法会最先被执行
Log.d(TAG, "onSubscribe:");
}
@Override
public void onNext(Integer integer) {Log.d(TAG, "onNext:");
}
@Override
public void onError(Throwable e) {Log.d(TAG, "onError:");
}
@Override
public void onComplete() {Log.d(TAG, "onComplete:");
}
};
// 在子线程中进行事件的发送
observable.subscribeOn(Schedulers.newThread())
// 切换到 UI 线程进行监听
.observeOn(AndroidSchedulers.mainThread())
// 将上游和下游进行关联
.subscribe(observer);
我们通过 subscribeOn(Schedulers.newThread())
这行代码,就可以将我们上游的代码切换到子线程中去执行,通过 observeOn(AndroidSchedulers.mainThread())
又能指定下游监听的代码执行在主线程(这里的 AndroidSchedulers 并不是 RxJava2 默认提供的,而是属于 Android 领域的,由 RxAndroid 这个库实现)。一行代码,就能自由切换上下游的代码执行的线程,这么骚的操作,到底是怎么实现的呢?
我们上面两个方法中传入的都是一个 Scheduler
实例,翻译过来就是“调度器”,负责线程相关的调度。
那接下来我们就先从上游相关的 subscribeOn(Schedulers.newThread())
开始分析。
先从参数入手,看看这个 Schedulers.newThread()
中执行了什么:
public final class Schedulers {
static final Scheduler SINGLE;
static final Scheduler COMPUTATION;
static final Scheduler IO;
static final Scheduler TRAMPOLINE;
// 这里是 NEW_THREAD
static final Scheduler NEW_THREAD;
static final class SingleHolder {...}
static final class ComputationHolder {...}
static final class IoHolder {...}
// 初始化一个默认的 NewThreadScheduler
static final class NewThreadHolder {static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
...
// 由一个新创建的 NewThreadTask 来初始化 NEW_THREAD
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
@NonNull
public static Scheduler newThread() {return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
...
static final class IOTask implements Callable<Scheduler> {...}
// 这里是 NewThreadTask
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {return NewThreadHolder.DEFAULT;}
}
static final class SingleTask implements Callable<Scheduler> {...}
static final class ComputationTask implements Callable<Scheduler> {...}
}
可以看到,newThread(...)
方法会返回一个 Scheduler
类型的静态变量NEW_THREAD
,而该变量的初始化是在如下的静态代码块中:
static {
...
// 由一个新创建的 NewThreadTask 来初始化 NEW_THREAD,类型为 Scheduler
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
这里面创建了一个 NewThreadTask
实例,该类也比较简单,就是在 call()
方法中返回了NewThreadHolder.DEFAULT
:
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {return NewThreadHolder.DEFAULT;}
}
NewThreadHolder.DEFAULT
则是一个 NewThreadScheduler
对象:
// 初始化一个默认的 NewThreadScheduler
static final class NewThreadHolder {static final Scheduler DEFAULT = new NewThreadScheduler();
}
那我们不禁好奇,这个 call()
方法又是什么时候调用的呢?我们继续回到 RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())
这行代码,从名称来看是初始化 NewThreadScheduler 对象的,那我们进去看下是如何进行的:
public static Scheduler initNewThreadScheduler(@NonNull Callable<Scheduler> defaultScheduler) {ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitNewThreadHandler;
if (f == null) {
// 直接看这里
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
作为聪明人,我们直接看 callRequireNonNull(defaultScheduler)
这行代码:
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {// 可以看到,这里调用了 s.call(),并将结果返回;若为空,则报异常
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {throw ExceptionHelper.wrapOrThrow(ex);
}
}
可以看到,里面直接调用了传入的参数的 call()
方法,并返回。
到这里,就知道了,RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())
这行代码其实就是初始化一个 NewThreadScheduler
对象。
绕了这么远,其实 Schedulers.newThread()
这句就是创建了一个 NewThreadScheduler
对象,这里讲的比较细。
我们继续回来,看看 subscribeOn(Schedulers.newThread())
里面做了什么:
public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
根据第一篇文章里的经验,我们知道,这里又是将上一步生成的 Observable 进一步封装成一个 ObservableSubscribeOn
并返回。其实,RxJava 之所以能进行链式调用,无外乎就是在每次调用操作符方法的时候,返回一个 Observable 的引用,但是这个 Observable 所具体指向的对象,可能是不同的。中间可能就创建了新的对象,经过了一层层的包装。RxJava 里装饰器模式用的还是比较厉害的,所以说,千万别觉的实际模式都是虚无缥缈的东西。
这里返回的是一个 ObservableSubscribeOn
对象(注意看命名哦!规律之前讲过的)
经过上篇文章分析,我们知道,使用 Observable 的 subscribe 方法进行订阅的时候,最终会调用到 Observable 的 subscribeActual(...)
方法,这里的 Observable
具体就是ObservableSubscribeOn
:
// ObservableSubscribeOn.java
public void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,这里将 observer 也进行了包装,包装成 SubscribeOnObserver
对象。也相当于配套啦,haha。
然后又将这个封装后的对象传进了一个新建的 SubscribeTask 对象中。
???
这个 SubscribeTask
又是啥?
这个 SubscribeTask
是ObservableSubscribeOn
这个类的内部类,其实就是一个 Runnable
实现类:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 创建一个新的 Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
// 进行线程任务的创建及分发
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
// 是个 Runnable 实现类
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}
@Override
public void run() {
// 注意,此处是关键,正是从这里开始,上游(即:source)在新线程重新对下游进行订阅。// 从而达到上游发送事件的线程进行切换的目的
// 这里提前提醒下,多次订阅,并不是只有第一次订阅指定的线程才有效,那只是普通使用场景下的“凑巧”source.subscribe(parent);
}
}
}
到这,我们总算看到了线程相关的东西了。Runnable 大家肯定都熟悉吧?在它的 run()
方法中,调用了 source.subscribe(parent)
, 这里的 parent 我们知道,是封装之后的SubscribeOnObserver
,但source
又是啥?其实就是我们在 ObservableSubscribeOn 的构造函数中传进来的this
, 即上游的 Observable:
// Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 这里传进来的 this 对象,就是上游 Observable 对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
抽象类 Observable 实现了 ObservableSource 接口,这个接口就是我们进行订阅时候用到的subscribe(...)
:
public interface ObservableSource<T> {void subscribe(@NonNull Observer<? super T> observer);
}
继续看这个 run() 方法,它相当于是把之前的上游通过 subscribe(...)
订阅到了新的下游。也就是说:
subscribeOn(...)方法的本质是,在指定的线程中将上游和下游进行订阅 `。
这和我们链式调用中最后一步的订阅本质上是一样的。
明白了这点,也就能知道,这个线程一旦启动,新的 observer 接收和处理事件,也是在这个子线程里。即,默认情况下它会随着上游线程的切换而切换,二者始终在一个线程,除非它通过 observeOn(...)
自行指定。
我们现在明白了上游是如何通过一行代码就能运行在子线程里,但还没看到这个线程是什么时候、如何启动起来的。
那我们就回到之前的位置,继续看 scheduler.scheduleDirect(new SubscribeTask(parent))
这行代码,scheduler 具体指 NewThreadScheduler
,但scheduleDirect(...)
这个方法是在父类中实现的,它没有进行重写(其他类型的 scheduler 有进行重写,比如 ComputationScheduler 等),那就进父类看看:
// Scheduler.java
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {// createWorker()为抽象方法,由子类实现
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这个方法的参数中有个 Runnable 对象,那我们直接启动个线程不就好了?当然是可以的。但是作为一个成熟的库,它一定要考虑更多的场景。需要考虑到线程安全问题,以及对线程的控制,比如,通过 Dispose 来截断上下游之间事件的事件流。
我们先看 final Worker w = createWorker();
这行代码,它创建了一个 Worker,具体点就是 NewThreadWorker
,这里贴下NewThreadScheduler.java
的源码:
/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {this.threadFactory = threadFactory;}
@NonNull
@Override
public Worker createWorker() {return new NewThreadWorker(threadFactory);
}
}
继续回到 scheduleDirect(...)
方法的第 8 行:
DisposeTask task = new DisposeTask(decoratedRun, w);
它将我们要执行的 runnable 和 Worker,又封装进了一个 DisposeTask
中,便于对流进行控制。DisposeTask
是 Scheduler 的静态内部类,实现了 Disposable
, Runnable
, SchedulerRunnableIntrospection
这三个接口:
public abstract class Scheduler {
...
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {runner = Thread.currentThread();
try {decoratedRun.run();
} finally {dispose();
runner = null;
}
}
@Override
public void dispose() {if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {((NewThreadWorker)w).shutdown();} else {w.dispose();
}
}
@Override
public boolean isDisposed() {return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {return this.decoratedRun;}
}
}
创建了 DisposeTask 之后,就将它传递给了 worker
执行:
w.schedule(task, delay, unit);
这行代码就是开始执行指定任务,我们可以进入 NewThreadWorker.java
源码中查看详细细节:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {if (disposed) {return EmptyDisposable.INSTANCE;}
// 最终会调用到 scheduleActual(...)方法
return scheduleActual(action, delayTime, unit, null);
}
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {f = executor.submit(task);
} else {f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {...}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
/**********************************************
*** 将我们的 runnable 对象,又经过了一层封装 *****
*********************************************/
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {if (!parent.add(sr)) {return sr;}
}
/*********************************************************************************
*** 最终会通过 executor 线程池去执行相应的任务, 通过 Future,来获取线程执行后的返回值 *****
********************************************************************************/
Future<?> f;
try {if (delayTime <= 0) {f = executor.submit((Callable<Object>)sr);
} else {f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {if (parent != null) {parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void dispose() {if (!disposed) {
disposed = true;
executor.shutdownNow();}
}
/**
* Shuts down the underlying executor in a non-interrupting fashion.
*/
public void shutdown() {if (!disposed) {
disposed = true;
executor.shutdown();}
}
@Override
public boolean isDisposed() {return disposed;}
}
w.schedule(task, delay, unit)
最终会调用到第 46 行的 scheduleActual(...)
方法。在该方法中,又将新传进来的 runnable 对象封装进 ScheduledRunnable,封装了这么多层 …~~(>_<)~~。然后就直接将这个 ScheduledRunnable
交给线程池去执行了。为了能在线程执行完之后,接收返回值,使用了Future
。再往下,就完全是线程池相关的知识点了,此处不再赘述。
到这,我们就完全分析完了 RxJava2 是如何通过一行 subscribeOn(...)
代码切换上游发送事件所在线程的。接下来我们就来分析 observeOn(...)
是如何切换下游处理事件的线程的。
线程的创建,这里跟之前是相同的。该方法最终会调用到如下重载方法:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
// 创建了一个 ObservableObserveOn 并返回
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
直接进 ObservableObserveOn
的subscribeActual(...)
方法:
protected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);
} else {Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这个方法就比较简单了,直接将上游和新创建的 ObserveOnObserver
进行绑定。并且在创建的 ObserveOnObserver
的同时,也将 worker 传进去,进行线程任务的相关处理。到这里,我们可以猜想下,封装之后的新的 ObserveOnObserver 是如何做到使原 observer 中的任务在指定的线程中执行的。其实就是重写对应的方法,将之前的逻辑通过 worker 来指定执行线程。边追源码边猜想,才能更好的理解。
接下来就来看 ObservableObserveOn.java#ObserveOnObserver
的源码:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
...
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
// 注意,这里调用了 requestFusion 来获取 mode,之后会用到
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
// 如果是 sync,会立即调用 schedule()
// 执行线程任务,查看 run 方法
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {if (done) {return;}
if (sourceMode != QueueDisposable.ASYNC) {queue.offer(t);
}
// 执行线程任务,查看 run 方法
schedule();}
@Override
public void onError(Throwable t) {if (done) {RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
// 执行线程任务,查看 run 方法
schedule();}
@Override
public void onComplete() {if (done) {return;}
done = true;
// 执行线程任务,查看 run 方法
schedule();}
@Override
public void dispose() {...}
@Override
public boolean isDisposed() {return disposed;}
void schedule() {if (getAndIncrement() == 0) {worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
// checkTerminated 方法会检查任务是否执行结束。if (checkTerminated(done, q.isEmpty(), a)) {return;}
for (;;) {
boolean d = done;
T v;
try {v = q.poll();
} catch (Throwable ex) {Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
// checkTerminated 方法会检查任务是否执行结束。if (checkTerminated(d, empty, a)) {return;}
if (empty) {break;}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {break;}
}
}
void drainFused() {...}
@Override
public void run() {if (outputFused) {drainFused();
} else {// outputFused 是跟背压及操作符相关,这里直接分析 drainNormal()
drainNormal();}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {if (disposed) {queue.clear();
return true;
}
if (d) {
Throwable e = error;
// 是否设置了超时错误,是在 observeOn(scheduler, delayError, bufferSize()) 的第二个参数传入的,// 默认传了 false
if (delayError) {if (empty) {
disposed = true;
if (e != null) {a.onError(e);
} else {a.onComplete();
}
worker.dispose();
return true;
}
} else {
// 根据是否报了异常,来决定是执行 onError 还是 onComplete
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
...
}
为了验证我们的猜想,我们看看在 onSubscribe/onNext/onError/onComplete
这些函数中都调用了什么。
我们发现,在这些函数中,差不多都调用了schedule();
(调用 requestFusion(…)相关逻辑暂时忽略)。查看该函数的调用出,在第 93 行:
void schedule() {if (getAndIncrement() == 0) {worker.schedule(this);
}
}
这里直接将 this
传递给了 worker 进行线程任务的执行,这里的 this
指的就是 ObserveOnObserver
,上面说道,它实现了 runnable 接口。而onSubscribe/onNext/onError/onComplete
这些函数中都调用了同一个函数 schedule();
,有理由猜想,对各个函数的区分处理,肯定就在重写的run()
方法里了, 查看第 150 行:
public void run() {if (outputFused) {drainFused();
} else {drainNormal();
}
}
outputFused 涉及背压及操作符的相关处理,这里我们直接看drainNormal();
:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
// checkTerminated 方法会检查任务是否执行结束。if (checkTerminated(done, q.isEmpty(), a)) {return;}
for (;;) {
boolean d = done;
T v;
try {v = q.poll();
} catch (Throwable ex) {Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
// checkTerminated 方法会检查任务是否执行结束。if (checkTerminated(d, empty, a)) {return;}
if (empty) {break;}
// 如果没结束,就调用新的 Observer 的 onNext 方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {break;}
}
}
在该方法中,首先通过 checkTerminated(...)
判断线程任务是否执行结束(complete 或者 error),如果没有,就去执行新的下游 Observer 的 onNext()方法。如果执行完了,就直接返回。
那啥时候调用了新的下游 Observer 的 onComplete/onError
方法呢?当然是在 checkTerminated(...)
方法中啦:
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {if (disposed) {queue.clear();
return true;
}
if (d) {
Throwable e = error;
// 是否设置了超时错误,是在 observeOn(scheduler, delayError, bufferSize()) 的第二个参数传入的,// 默认传了 false
if (delayError) {if (empty) {
disposed = true;
if (e != null) {a.onError(e);
} else {a.onComplete();
}
worker.dispose();
return true;
}
} else {
// 根据是否报了异常,来决定是执行 onError 还是 onComplete
if (e != null) {
disposed = true;
queue.clear();
// 执行 onError
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
// 执行 onComplete
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
在该方法里,我们就看到了对 onComplete()/onError
方法的调用了。
好了,到这里,我们就把 rxjava2 中线程切换的知识讲完了,里面还有很多细节需要大家自己细细研究。
总结:
-
下游 observer
的onSubscribe(...)
方法一直是在它所在的线程调用的。即observable.subscribe(observer)
这行代码所在的线程。 -
subscribeOn(...)
指定的是上游发送事件的线程, 比如ObservableOnSubscribe
的subscribe(ObservableEmitter<Integer> emitter){...}
方法执行的线程, 在该方法里我们往往会调用emitter.onNext(...)/onComplete()/onError(...)
来发送事件。 -
observeOn(...)
指定的是下游接收事件的线程, 即onSubscribe(...)/ onNext(...)/onError(...)/onComplete()
这些回调方法的执行线程。 - 默认情况下,下游接收事件的线程和上游发送事件的线程,是同一个线程,下游与上游保持一致。上游通过
subscribeOn(...)
切换线程的时候,下游仍会自动与其保持一致。除非下游单独通过observeOn(...)
来指定下游自己的线程。
此外,还需要特别指出的一点就是,多次指定上游的线程只有第一次指定的有效
这个结论是:错误的 错误的 错误的
很多文章中也都是这么说的,但是很遗憾,是错误的,因为很多人都只是从表象出发,连续调用两次 subscribeOn
,然后在下游 Observer 的onSubscribe
回调里打印线程名称,发现一直是第一次指定的那个线程,就开始想当然的总结结论了,他们的代码应该是下面这样的:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe:");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
Log.d(TAG, "subscribe: 当前线程为:" + Thread.currentThread().getName());
}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {...}
observable
// 第一次指定
.subscribeOn(AndroidSchedulers.mainThread())
// 第二次指定
.subscribeOn(Schedulers.newThread())
// 切换到 UI 线程进行监听
.observeOn(AndroidSchedulers.mainThread())
// 将上游和下游进行关联
.subscribe(observer);
打印结果为:
你不断调整两个的位置,发现仍然是指定的第一个有效,似乎你是对的。不防试试下面的例子:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe:");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
Log.d(TAG, "subscribe: 当前线程为:" + Thread.currentThread().getName());
}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {...}
observable
// 第一次指定
.subscribeOn(AndroidSchedulers.mainThread())
// 创建第一个 onSubscribe
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {Log.d(TAG, "accept1: 当前线程为:" + Thread.currentThread().getName());
}
})
// 第二次指定
.subscribeOn(Schedulers.newThread())
// 创建第二个 onSubscribe
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {Log.d(TAG, "accept2: 当前线程为:" + Thread.currentThread().getName());
}
})
// 切换到 UI 线程进行监听
.observeOn(AndroidSchedulers.mainThread())
// 将上游和下游进行关联
.subscribe(observer);
结果如下:
可以看到,每个 doOnSubscribe(...)
内的代码,运行在它上面离它最近的 subscribeOn()
指定的线程。也就是说,多次切换都生效了。这点也可以参考我们上面的总结里的第一条:
下游 observer 的 onSubscribe(...)方法一直是在它所在的线程调用的。即 observable.subscribe(observer)这行代码所在的线程。
对 doOnSubscribe
操作符就不展开讲了。
再仔细看上面的截图,发现我们在第二个 doOnSubscribe(...)
方法中的代码反而要比第一个先执行。Why? 这其实是在向上回溯。希望你还能记得,我们前面说:
subscribeOn(...)方法的本质是,在指定的线程中将上游和下游进行订阅 `。
这个“上游”是个相对概念,上游之上,还有上游,所以就不断回溯,最终调用到最开始指定的那个线程。
虽然表面上看,确实是第一个指定的有效,但是千万别被欺骗了。
好了,到这,本篇文章就结束了。文章较长,可以耐心点,反复看看。
通过对 RxJava2 的研究,发现里面涉及到很多知识,我也是一边读一遍补其他知识。比如里面涉及很多并发编程的知识,而并发编程又需要你对计算机组成原理、操作系统、编译原理这些有一定的了解,还好大学考软考的时候看过这些方面的书,拾起来相对容易点。
欠的技术债总是要还的,正面刚吧。
欢迎关注公众号来获取其他最新消息,有趣的灵魂在等你。