前言
本篇的文章是基于Rxjava 2.1.2。从上面的一段代码中,咱们从源码的角度剖析 RxJava 的实现原理:

ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {     @Override     public void subscribe(ObservableEmitter<Integer> e) throws Exception {         final int max = 100;         for (int i = 1; i <= max; i++) {             e.onNext(max);         }         e.onComplete();     } }; Observer<Integer> o = new Observer<Integer>() {     @Override     public void onSubscribe(Disposable d) {     }     @Override     public void onNext(Integer integer) {     }     @Override     public void onError(Throwable e) {     }     @Override     public void onComplete() {     } }; Observable.create(oos)         .observeOn(AndroidSchedulers.mainThread())         .subscribeOn(Schedulers.computation())         .subscribe(o);

剖析源码之前,咱们先定义一下名词,RxJava 是基于观察者模式的,这里将被观察者叫做主题(Source),观察者叫做观察者(Observer)。

下面的代码首先创立了一个主题对象,而后又创立了一个观察者对象,最初将两者关联起来,并且最重要的一点,指定了主题对象和观察者对象执行的线程。

注释
Observable.create(oos)

首先剖析这行代码做了什么事件:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {     ObjectHelper.requireNonNull(source, "source is null");     return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }

ObjectHelper 只是用来做非空判断,这里就不必管它了。看看 RxJavaPlugins 做了什么:

/**  * Calls the associated hook function.  * @param <T> the value type  * @param source the hook's input value  * @return the value returned by the hook  */ @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {     Function<? super Observable, ? extends Observable> f = onObservableAssembly;     if (f != null) {         return apply(f, source);     }     return source; 

正文上都说了,这是一个钩子函数,也就是说如果 onObservableAssembly 的值不为空,那么就调用这个钩子函数,onObservableAssembly 是一个动态变量,须要咱们被动的去设置才会赋值,这里当做空来思考,如果 onObservableAssembly 为空的话,也就是说这个办法啥都没做,间接返回 source 参数,也就是下面的 ObservableCreate 对象。

总结一下,Observable.create(oos) 只是创立了一个 ObservableCreate 对象。这个办法就临时先剖析到这里,至于这个对象外部有什么货色,咱们前面会说到。

observeOn(AndroidSchedulers.mainThread())
既然之前的 create 办法创立了一个 ObservableCreate 对象并返回,也就是说 observeOn(Schedulers.computation()) 这个办法是调用的 ObservableCreate 这个对象上的办法。

public final class ObservableCreate<T> extends Observable<T> {

ObservableCreate 是继承至 Observable 的。

public final Observable<T> observeOn(Scheduler scheduler) {     return observeOn(scheduler, false, bufferSize()); }

Observable 的 observeOn 是 final 的,所以走的父类的办法。持续跟踪 observeOn 调用的同名办法:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {     ObjectHelper.requireNonNull(scheduler, "scheduler is null");     ObjectHelper.verifyPositive(bufferSize, "bufferSize");     return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }

ObjectHelper 跳过。这里又是一个 onAssembly 办法的调用,我很好奇这个单词是什么意思。点进去看一下:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {     Function<? super Observable, ? extends Observable> f = onObservableAssembly;     if (f != null) {         return apply(f, source);     }     return source; }

同样是一个钩子办法,当初也是有教训的人呢,再看到这个办法,就间接跳过,只关怀它传递的参数和返回值就行了。它的返回值默认就是传递进来的参数。

所以,observeOn 办法就是创立并返回了一个 ObservableObserveOn 对象(大神教你起类名系列二),这里预警一下,之后像这样相似的类名差不多还有3个。

subscribeOn(Schedulers.computation())
因为 observeOn 创立并返回了一个 ObservableObserveOn 对象,所以这里调用的是 ObservableObserveOn 对象上的办法。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {  abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

也是继承至 Observable 。

public final Observable<T> subscribeOn(Scheduler scheduler) {     ObjectHelper.requireNonNull(scheduler, "scheduler is null");     return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }

不出预料,也是 final 的,创立了一个 ObservableSubscribeOn 对象(大神教你起类名系列三)并返回。

subscribe(o)
后面 subscribeOn 创立了一个 ObservableSubscribeOn 对象并返回,所以这里调用的是 ObservableSubscribeOn 这个对象下面的办法。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {  abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

继承至 Observable 类。

public final void subscribe(Observer<? super T> observer) {     ObjectHelper.requireNonNull(observer, "observer is null");     try {         observer = RxJavaPlugins.onSubscribe(this, observer);         ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");         subscribeActual(observer);     } catch (NullPointerException e) { // NOPMD         throw e;     } catch (Throwable e) {         Exceptions.throwIfFatal(e);         // can't call onError because no way to know if a Disposable has been set or not         // can't call onSubscribe because the call might have set a Subscription already         RxJavaPlugins.onError(e);         NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");         npe.initCause(e);         throw npe;     } }

这个办法也是 final 的,所以是调用的这个办法。

public final void subscribe(Observer<? super T> observer) { … }

这个办法是咱们须要剖析的重点,看看外部具体的代码吧先:

public final void subscribe(Observer<? super T> observer) {     ObjectHelper.requireNonNull(observer, "observer is null");     try {         observer = RxJavaPlugins.onSubscribe(this, observer);         ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");         subscribeActual(observer);     } catch (NullPointerException e) { // NOPMD         throw e;     } catch (Throwable e) {         Exceptions.throwIfFatal(e);         // can't call onError because no way to know if a Disposable has been set or not         // can't call onSubscribe because the call might have set a Subscription already         RxJavaPlugins.onError(e);         NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");         npe.initCause(e);         throw npe;     } }

这里疏忽 ObjectHelper 和异样解决的代码只有两行代码是要害。先看 observer = RxJavaPlugins.onSubscribe(this, observer):

public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {     BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;     if (f != null) {         return apply(f, source, observer);     }     return observer; }

没想到啊没想到,你这浓眉大眼的家伙也是一个钩子办法。所以这行代码相当于 obsever = observer。接着看,subscribeActual(observer):

protected abstract void subscribeActual(Observer<? super T> observer);

这是一个形象办法,没啥好剖析的。接下来咱们要进入正题了,依据咱们编写的代码,是 ObservableSubscribeOn 这个对象调用了 subscribe 办法,所以咱们看看这个类的 subscribeActual 办法。

@Override public void subscribeActual(final Observer<? super T> s) {     final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);     s.onSubscribe(parent);     parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }

这里须要留神的是 s 这个参数,前面会有很多中央看到这个参数,肯定要搞清楚这个参数是谁传递过去的。比如说:

A.subscribe(B) 那么,参数 s 就是 B。

在咱们的代码中是 ObservableSubscribeOn.subscribe(o); 了解了这一点,咱们详细分析代码外面的内容:首先创立了一个 SubscribeOnObserver (大神教你起类名系列四)。而后调用了咱们创立的对象 o 的 onSubscribe 办法:

@Override public void onSubscribe(Disposable d) { }

咱们的 onSubscribe 办法外面啥都没做。不过一般来说,你应该调用一下 onStart 办法。

接下来是调用 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));,setDisposable 的办法不影响流程剖析,这里就先跳过了,有趣味的能够点进去看一下。而后就是 scheduler 变量,这个变量就是咱们应用 subscribeOn 传递的参数:

public final Observable<T> subscribeOn(Scheduler scheduler) {     ObjectHelper.requireNonNull(scheduler, "scheduler is null");     return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }  public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {     super(source);     this.scheduler = scheduler; }

这个 scheduler 就是 Schedulers.computation()。而后调用了它的 scheduleDirect 办法:

public Disposable scheduleDirect(@NonNull Runnable run) {     return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }  public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {     final Worker w = createWorker();     final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);     DisposeTask task = new DisposeTask(decoratedRun, w);     w.schedule(task, delay, unit);     return task; }

这两个办法是父类的,Schedulers.computation() 返回的是一个 ComputationScheduler 对象,这里找具体的实现类因为调用链比拟长,就不给出了,本人点着点着就能找到了。看看 ComputationScheduler 有没有复写这两个办法:

@NonNull @Override public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {     PoolWorker w = pool.get().getEventLoop();     return w.scheduleDirect(run, delay, unit); }

它笼罩了父类的第2个 scheduleDirect 办法。这里就不深入分析外面的池了。看 w.scheduleDirect(run, delay, unit);:

public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {     ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));     try {         Future<?> f;         if (delayTime <= 0L) {             f = executor.submit(task);         } else {             f = executor.schedule(task, delayTime, unit);         }         task.setFuture(f);         return task;     } catch (RejectedExecutionException ex) {         RxJavaPlugins.onError(ex);         return EmptyDisposable.INSTANCE;     } }

相熟的线程池应用代码。心愿看到这里你还没有遗记咱们要剖析的是什么。简略的演绎一下,其实就是向咱们创立的 scheduler 外面提交了一个 runnable。最终这个 Runnable 必定会执行,那么看看这个 Runnable 外面有什么代码:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

final class SubscribeTask implements Runnable {    private final SubscribeOnObserver<T> parent;    SubscribeTask(SubscribeOnObserver<T> parent) {        this.parent = parent;    }    @Override    public void run() {        source.subscribe(parent);    } }

run 办法外面就只有一句代码,然而咱们须要搞清楚这里的 source 和 parent 别离是哪个对象。

parent 能够间接看到是 SubscribeOnObserver 对象。

source 是应用的外部类的变量。

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {     super(source);     this.scheduler = scheduler; }

这里调用了 super 办法,所以构造函数里传递的变量就是 source。

public final Observable<T> subscribeOn(Scheduler scheduler) {     ObjectHelper.requireNonNull(scheduler, "scheduler is null");     return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }

这里是咱们之前剖析过的创立 ObservableSubscribeOn 的代码,这里的 this 指的是 observeOn 创立的 ObservableObserveOn 对象。心愿看到这里你没有搞晕,如果你是应用手机看的,并且看懂了,我是真的拜服。

也就是说,run 外面的代码就是调用了 ObservableObserveOn 对象的 subscribe 办法。之前咱们剖析过了,subscribe 办法实际上没有做什么,只是调用了 subscribeActual 办法,所以咱们进入这个类外部看看:

@Override protected void subscribeActual(Observer<? super T> observer) {     if (scheduler instanceof TrampolineScheduler) {         source.subscribe(observer);     } else {         Scheduler.Worker w = scheduler.createWorker();         source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));     } }

咱们在 observeOn 传递的 scheduler 不是 TrampolineScheduler 类型的,所以只须要看 else 的代码。这里是先创立了一个工作线程(因为咱们应用的是 AndroidScheduler,所以这里是指的主线程),而后调用了 source 的 subscribe 办法。须要留神的是这里最初创立了一个 ObserveOnObserver 对象(大神教你起类名系列五)。

先看看 createWork,在 HandlerScheduler 中:

@Override public Worker createWorker() {     return new HandlerWorker(handler); }

返回了一个 HandlerWorker 对象。再看 source.subscribe(),首先这里的 source 指的是 create 办法创立的 ObservableCreate 对象,调用 subscribe 传递的是 ObserveOnObserver。看看这个对象的 subscribeActual 办法:

@Override protected void subscribeActual(Observer<? super T> observer) {     CreateEmitter<T> parent = new CreateEmitter<T>(observer);     observer.onSubscribe(parent);     try {         source.subscribe(parent);     } catch (Throwable ex) {         Exceptions.throwIfFatal(ex);         parent.onError(ex);     } }

这里的参数 observer 是 ObserveOnObserver,source 是咱们代码中创立的 oos 对象。

首先创立了一个 CreateEmitter 对象。接着看 ObserveOnObserver 的 onSubscribe 办法做了啥:

@Override public void onSubscribe(Disposable s) {     if (DisposableHelper.validate(this.s, s)) {         this.s = s;         if (s instanceof QueueDisposable) {             @SuppressWarnings("unchecked")             QueueDisposable<T> qd = (QueueDisposable<T>) s;             int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);             if (m == QueueDisposable.SYNC) {                 sourceMode = m;                 queue = qd;                 done = true;                 actual.onSubscribe(this);                 schedule();                 return;             }             if (m == QueueDisposable.ASYNC) {                 sourceMode = m;                 queue = qd;                 actual.onSubscribe(this);                 return;             }         }         queue = new SpscLinkedArrayQueue<T>(bufferSize);         actual.onSubscribe(this);     } }

这里代码比拟长,只剖析重要的代码,就是 actual.onSubscribe 这句。actual 是构造函数中赋值的,所以咱们回到创立 ObserveOnObserver 的中央,actual 指的是 SubscribeOnObserver 对象。所以它调用了 SubscribeOnObserver 的 onSubscribe 办法。接下来剖析一下它的 onSubscribe 办法做了什么,这里不看也不会影响流程。

SubscribeOnObserver(Observer<? super T> actual) {     this.actual = actual;     this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) {     DisposableHelper.setOnce(this.s, s); }

这个办法调用了 setOnce 办法:

public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {     ObjectHelper.requireNonNull(d, "d is null");     if (!field.compareAndSet(null, d)) {         d.dispose();         if (field.get() != DISPOSED) {             reportDisposableSet();         }         return false;     }     return true; }

这里波及到了乐观锁等玩意,简略来说就是先判断 field 的值是否为空,如果为空则设置为 d,不为空则将 d dispose。而后判断 field 的值,因为 field 的值只能设定一次非 DISPOSED 值,所以如果不为 DISPOSED,阐明曾经被设置过了,再报出异样,如果为 DISPOSED 是能够再次设置的。依照失常的流程,这里只是将 field 的值设置为 d,而后返回true。这个办法能够先不必管。

回到主线流程上,source.subscribe(parent); 这是最重要的一句代码。source 是在构造函数赋值的,看看构造方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {     ObjectHelper.requireNonNull(source, "source is null");     return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }

也就是说这里的 source 是咱们代码中创立的 oos 对象。parent 是新创建的 CreateEmitter 对象。看看咱们 oos 的 subscribe 办法:

@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {     Log.e("aprz", Thread.currentThread().getName());     final int max = 100;     for (int i = 1; i <= max; i++) {         e.onNext(i);     }     e.onComplete(); }

这里就是事件开始的终点。所有的事件都由 ObservableEmitter 开始发送,看看它的代码,它是一个接口,在咱们的例子中,它的实现类是 CreateEmitter,所有咱们剖析这个类的 onNext 办法:

@Override public void onNext(T t) {     if (t == null) {         onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));         return;     }     if (!isDisposed()) {         observer.onNext(t);     } }

isDisposed 办法返回 false 才会去调用 observer 的 onNext 办法,这个 observer 是谁呢? 看到这里咱们就要从后往前推一遍之前的代码了,不论你绝不失望,反正我是很失望。这的 observer 是 ObserveOnObserver 对象。

接下来咱们就进入 ObserveOnObserver 外面,看看它承受事件之后做了什么,下面的参数 e 就是:

@Override public void onNext(T t) {     if (done) {         return;     }     if (sourceMode != QueueDisposable.ASYNC) {         queue.offer(t);     }     schedule(); }

调用了 schedule 办法:

void schedule() {     if (getAndIncrement() == 0) {         worker.schedule(this);     } }

向 work 中提交了一个 Runnable,这里传递的是 this。阐明它本人必定实现了这个接口,咱们看看它的 run 办法做了啥:

@Override public void run() {     if (outputFused) {         drainFused();     } else {         drainNormal();     } }

这里个别是走 drainNormal 吧,我猜的,咱们剖析这个办法吧。

void drainNormal() {     int missed = 1;     final SimpleQueue<T> q = queue;     final Observer<? super T> a = actual;     for (;;) {         if (checkTerminated(done, q.isEmpty(), a)) {             return;         }         for (;;) {             boolean d = done;             T v;             try {                 v = q.poll();             } catch (Throwable ex) {                 Exceptions.throwIfFatal(ex);                 s.dispose();                 q.clear();                 a.onError(ex);                 worker.dispose();                 return;             }             boolean empty = v == null;             if (checkTerminated(d, empty, a)) {                 return;             }             if (empty) {                 break;             }             a.onNext(v);         }         missed = addAndGet(-missed);         if (missed == 0) {             break;         }     } }

代码很长,具体做了啥咱们临时不必关怀,只须要留神到 a.onNexe(v) 这行代码,这个 a 是 actual 变量,actual 又是 SubscribeOnObserver 对象,咱们看看它的 onNext 办法:

@Override public void onNext(T t) {     actual.onNext(t); }

很简略,这里的 actual 就是咱们创立的 o 了,所以最终调用到了咱们的代码外面。

好了,到这里一个残缺的流程就整理出来了,然而还有一个问题没有解决,就是线程切换是产生在哪里。因为为了不影响整体流程的剖析,所以下面并没有去剖析线程切换的货色,上面开始剖析。

间接从 subscribeOn 开始,看 ObservableSubscribeOn 的代码:

@Override public void subscribeActual(final Observer<? super T> s) {     final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);     s.onSubscribe(parent);     parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }

从这里开始就进行了线程的切换,依据下面的剖析咱们晓得这里是将 SubscribeTask 作为一个 Runnable 对象给提交进了咱们指定的 scheduler (subscribeOn 传递的)中。所以前面的流程都是在 scheduler 所在的线程在运行。

再看 observeOn,看 ObservableObserveOn 的代码。

@Override protected void subscribeActual(Observer<? super T> observer) {     if (scheduler instanceof TrampolineScheduler) {         source.subscribe(observer);     } else {         Scheduler.Worker w = scheduler.createWorker();         source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));     } }

这里的线程切换是产生在 ObserveOnObserver 这个对象的外面。

void schedule() {     if (getAndIncrement() == 0) {         worker.schedule(this);     } }

schedule 的流程,咱们下面剖析过,worker.schedule(this) 这行代码就产生了线程切换,是将 this 作为 Runnable 对象提交到了咱们指定的(observerOn 传递的)scheduler 中。具体分析,因为之前的流程是在别的线程中,所以想要进行线程切换,最先想到的必定是 Handler。因为咱们传递的是 AndroidSchedulers.mainThread(),所以咱们就剖析这个吧。

AndroidSchedulers.mainThread() 的实现是 HandlerScheduler。看看它的 schedule 办法:

@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) {     if (run == null) throw new NullPointerException("run == null");     if (unit == null) throw new NullPointerException("unit == null");     if (disposed) {         return Disposables.disposed();     }     run = RxJavaPlugins.onSchedule(run);     ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);     Message message = Message.obtain(handler, scheduled);     message.obj = this; // Used as token for batch disposal of this worker's runnables.     handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));     // Re-check disposed state for removing in case we were racing a call to dispose().     if (disposed) {         handler.removeCallbacks(scheduled);         return Disposables.disposed();     }     return scheduled; }

post 了一个 msg,这样就实现了线程的切换。上面上一张图,有助于了解和记忆:

只须要了解,每次 observerOn 和 subscribeOn 的时候,外部都会创立一个新的 observable 和 observer。

。新创建的 observable 会援用后面的 observable,就是代码中咱们剖析的 source 变量。

。新创建的 observer 会援用后面的 observer,就是代码中咱们剖析的 actual 变量。

最初咱们 subscribe 的时候,是调用的最初创立的 observable 的办法。而每个 observable 外部又调用了 source 的 subscribe 办法,这样就造成了一层一层往前传递的调用链。当调用到最后面的一个 observable 的时候,就是咱们本人创立的 observable,在这里咱们须要手动触发该与该 observable 对应的 observer 对象的 onNext 办法。而 observer 的 onNext 办法的外部又调用了 actual 的 onNext 办法,这样就造成了一层一层往后传递的调用链。

总结
尽管在咱们的例子中,CreateEmitter 并不是一个 observer ,然而它也有 onNext 等办法,能够把它看做一个 observer。

如此,RxJava 的一个流程就理分明了。这货的流程和 OkHttp 怎么有点像,只是略微有点不一样。

补充一下对于背压的常识:在异步订阅的时候,应用 Observable,默认的缓冲大小是 128,超过 这个数量之后会 resize,也就是说会缓冲所有的事件,这样就会导致内存占用始终减少。

结语:后续会继续更新哦,喜爱的话点赞关注一下吧。
相干视频
【Android进阶】Rxjava与low响应式编程