关于rxjava:RxJava源码剖析
前言本篇的文章是基于Rxjava 2.1.2。从上面的一段代码中,咱们从源码的角度剖析 RxJava 的实现原理: ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { final int max = 100; for (int i = 1; i <= max; i++) { e.onNext(max); } e.onComplete(); } }; Observer<Integer> o = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; Observable.create(oos) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.computation()) .subscribe(o);剖析源码之前,咱们先定义一下名词,RxJava 是基于观察者模式的,这里将被观察者叫做主题(Source),观察者叫做观察者(Observer)。 ...