关于rxjava:RxJava源码剖析

前言本篇的文章是基于Rxjava 2.1.2。从上面的一段代码中,咱们从源码的角度剖析 RxJava 的实现原理: ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { final int max = 100; for (int i = 1; i <= max; i++) { e.onNext(max); } e.onComplete(); } }; Observer<Integer> o = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; Observable.create(oos) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.computation()) .subscribe(o);剖析源码之前,咱们先定义一下名词,RxJava 是基于观察者模式的,这里将被观察者叫做主题(Source),观察者叫做观察者(Observer)。 ...

December 8, 2021 · 8 min · jiezi

RxJava2-Retrofit2-结合使用详解

不讲 rxjava 和 retrofit 而是直接上手 2 了,因为 2 封装的更好用的更多。 1. 观察者模式常见的 button 点击事件为例,button 是被观察者,listener 是观察者,setOnClickListener 过程是订阅,有了订阅关系后在 button 被点击的时候,监听者 listener 就可以响应事件。 这里的button.setOnClickListener(listener)看上去意思是被观察者订阅了观察者(杂志订阅了读者),逻辑上不符合日常生活习惯。其实这是设计模式的习惯,不必纠结,习惯了这种模式就利于理解观察者模式了。 2. RxJava 中的观察者模式Observable:被观察者(ble 结尾的单词一般表示 可...的,可观察的)Observer:观察者(er 结尾的单词一般表示 ...者,...人)subscribe:订阅首先创建 Observable 和 Observer,然后 observable.subscribe(observer),这样 Observable 发出的事件就会被 Observer 响应。一般我们不手动创建 Observable,而是由 Retrofit 返回给我们,我们拿到 Observable 之后只需关心如何操作 Observer 中的数据即可。 不过为了由浅入深的演示,还是手动创建 Observable 来讲解。 2.1 创建 Observable常见的几种方式,不常用的不写了,因为我觉得这个模块不是重点。 var observable=Observable.create(ObservableOnSubscribe<String> {...})var observable=Observable.just(...)var observable = Observable.fromIterable(mutableListOf(...))2.1.1 create()var observable2=Observable.create(object :ObservableOnSubscribe<String>{ override fun subscribe(emitter: ObservableEmitter<String>) { emitter.onNext("Hello ") emitter.onNext("RxJava ") emitter.onNext("GoodBye ") emitter.onComplete() } })ObservableOnSubscribe和ObservableEmitter都是陌生人,这个要是详细讲涉及到源码分析,东西可就多了(主要是我不熟悉),所以可以理解成 ObservableOnSubscribe 是用来帮助创建 Observable 的,ObservableEmitter 是用来发出事件的(这些事件在观察者 Observer 中可以响应处理)。 emitter 一次发射了三个事件,然后调用了 onComplete() 这些在下面讲观察者 Observer 时还会提到,一并讲解。 ...

September 30, 2019 · 6 min · jiezi

Java并发15-CompletableFuture-异步编程

前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。 // 以下两个方法都是耗时操作doBizA();doBizB();//创建两个子线程去执行就可以了,两个操作已经被异步化了。new Thread(()->doBizA()) .start();new Thread(()->doBizB()) .start(); 异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。 CompletableFuture 的核心优势为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。 烧水泡茶分工方案 // 任务 1:洗水壶 -> 烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1: 烧开水..."); sleep(15, TimeUnit.SECONDS);});// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2: 拿茶叶..."); sleep(1, TimeUnit.SECONDS); return " 龙井 ";});// 任务 3:任务 1 和任务 2 完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1: 拿到茶叶:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; });// 等待任务 3 执行结果System.out.println(f3.join());void sleep(int t, TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){}}// 一次执行结果:T1: 洗水壶...T2: 洗茶壶...T1: 烧开水...T2: 洗茶杯...T2: 拿茶叶...T1: 拿到茶叶: 龙井T1: 泡茶...上茶: 龙井从整体上来看,我们会发现 ...

June 25, 2019 · 3 min · jiezi

Java编程方法论响应式RxJava与代码设计实战序

原文链接:《Java编程方法论:响应式RxJava与代码设计实战》序,来自于微信公众号:次灵均阁正文内容在《2019 一月的InfoQ 架构和设计趋势报告》1中,响应式编程(Reactive Programming)和函数式(Functional Programing)仍旧编列在第一季度(Q1)的 Early Adopters(早期采纳者) 中。尽管这仅是一家之言,然而不少的开发人员逐渐意识到 Reactive 之风俨然吹起。也许您的生产系统尚未出现 Reactive 的身影,不过您可能听说过 Spring WebFlux 或 Netflix Hystrix 等开源框架。笔者曾请教过 Pivotal(Spring 母公司)布道师 Josh Long2:”Spring 技术栈未来的重心是否要布局在 Reactive 之上?“。对方的答复是:”没错,Reactive 是未来趋势。“。同时,越来越多的开源项目开始签署 Reactive 宣言(The Reactive Manifesto)3,并喊出 ”Web Are Reactive“ 的口号。 或许开源界的种种举动无法说服您向 Reactive 的”港湾“中停靠,不过 Java 9 Flow API4 的引入,又给业界注入了一剂强心针。不难看出,无论是 Java API,还是 Java 框架均走向了 Reactive 编程模型的道路,这并非是一种巧合。 通常,人们谈到的 Reactive 可与 Reactive 编程划上等号,以”非阻塞(Non-Blocking)“和”异步(Asynchronous)“的特性并述,数据结构与操作相辅相成。Reactive 涉及函数式和并发两种编程模型,前者关注语法特性,后者强调执行效率。简言之,Reactive 编程的意图在于 ”Less Code,More Efficient“。除此之外,个人认为 Reactive 更大的价值在于统一 Java 并发编程模型,使得同步和异步的实现代码无异,同时做到 Java 编程风格与其他编程语言更好地融合,或许您也发现 Java 与 JS 在 Reactive 方面并不存在本质区别。纵观 Java 在 Reactive 编程上的发展而看,其特性更新可谓是步步为营,如履薄冰。尽管 Java 线程 API Thread 与 Runnable 就已具备异步以及非阻塞的能力,然而同步和异步编程的模式并不统一,并且理解 Thread API 的细节和管理线程生命周期的成本均由开发人员概括承受。虽然 Java 5 引入 J.U.C 框架(Java 并发框架)之后, ExecutorService 实现减轻了以上负担。不过开发人员仍需关注 ExecutorService 实现细节,比如怎样合理地设置线程池空间以及阻塞队列又成为新的挑战。为此,Java 7 又引入 ForkJoinPool API,不过此时的J.U.C 框架与 Reactive 理念仍存在距离,即使是线程安全的数据结构,也并不具备并行计算的能力,如:集合并行排序,同时操作集合的手段也相当的贫瘠,缺少类似 Map/Reduce 等操作。不过这些困难只是暂时的,终究被 Java 8 ”救赎“。Stream API 的出现不但具备数据操作在串行和并行间自由切换的能力,如 sequential() 以及 parallel() 方法,而且淡化了并发的特性,如 sorted() 方法即可能是传统排序,亦或是并行排序。相同的设计哲学也体现在 Java Reactive 实现框架中,如同书中提及的 RxJava5 API io.reactivex.Observable 。统一编程模型只是 Stream 其中设计目标之一,它结合 Lambda 语法特性,虽然提供了数量可观的操作方法,如 flatMap() 等,然而无论对比 RxJava,还是 Reactor6 ,Stream 操作方法却又相形见绌。值得一提的是,这些操作方法在 Reactive 的术语中称之为操作符(Operators)。当然框架内建的操作符的多与寡,并非判断其是否为 Reactive 实现的依据。其中决定性因素在于数据必须来源于发布方(生产者)的”推送(Push)“,而非消费端的”拉取(Pull)“。显然,Stream 属于消费端已就绪(Ready)的数据集合,并不存在其他数据推送源。不过 JVM 语言早期的 Reactive 定义处于模糊地带,如 RxJava API 属于观察者模式(Observer Pattern)7的扩展,而非迭代器(Iterator Pattern)模式8的实现。而 Reactor 的实现则拥抱 Reactive Streams 规范9 ,该规范消费端对于数据的操作是被动的处理,而非主动的索。换言之,数据的到达存在着不确定性10。当推送的数据无法得到消费端及时效应时,Reactive 框架必须提供背压(Backpressure)11实现,确保消费端拥有”拒绝的权利(cancel)”。在此理论基础上,Reactive Streams 规范定义了一套抽象的 API,作为 Java 9 java.util.concurrent.Flow API 的顶层设计。不过关于操作符的部分,该规范似乎不太关心,这也是为什么 RxJava 和 Reactor 均称自身为 Reactive 扩展框架的原因,同时两者在 API 级别提供多种调度器(Schedulers)12实现,适配不同并发场景提供。尽管 Reactive 定义在不同的阵营之间存在差异,援引本人在《Reactive-Programming-一种技术-各自表述》13文中的总结: ...

June 20, 2019 · 3 min · jiezi

Rxjava2x源码解析二-线程切换

上一篇文章Rxjava2.x源码解析(一): 订阅流程中我们讲了 RxJava2 的订阅部分的源码。但 RxJava2 最强大的部分其实是在异步。默认情况下,下游接收事件所在的线程和上游发送事件所在的线程是同一个线程。接下来我们在上一篇文章的示例代码中加入线程切换相关代码: // 上游 observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "subscribe: "); emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); } }); // 下游 observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { // onSubscribe 方法会最先被执行 Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: "); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }; // 在子线程中进行事件的发送 observable.subscribeOn(Schedulers.newThread()) // 切换到UI线程进行监听 .observeOn(AndroidSchedulers.mainThread()) // 将上游和下游进行关联 .subscribe(observer);我们通过subscribeOn(Schedulers.newThread())这行代码,就可以将我们上游的代码切换到子线程中去执行,通过observeOn(AndroidSchedulers.mainThread())又能指定下游监听的代码执行在主线程(这里的 AndroidSchedulers 并不是RxJava2 默认提供的,而是属于Android领域的,由RxAndroid这个库实现)。一行代码,就能自由切换上下游的代码执行的线程,这么骚的操作,到底是怎么实现的呢? ...

May 21, 2019 · 10 min · jiezi

Rxjava2x源码解析一-订阅流程

现在网上已经有大量的源码分析文章,各种技术的都有。但我觉得很多文章对初学者并不友好,让人读起来云里雾里的,比源码还源码。究其原因,是根本没有从学习者的角度去分析。在自己完成了源码阅读之后,却忘记了自己是如何一步步提出问题,进而走到这里的。 所以,我想在本篇及以后的文章中,花更多的精力去进行源码的分析,争取用浅显易懂的语言,用适合的逻辑去组织内容。这样不至于陷入源码里,导致文章难懂。尽量让更多的人愿意去读源码。 阅读本文,你需要对 RxJava2 的一些基本使用有所了解,不过不用太深。这里推荐下Season_zlc的给初学者的RxJava2.0教程(一),比较浅显易懂。 提到 RxJava,你第一个想到的词是什么? “异步”。 RxJava 在 GitHub 上的官网主页也说了,“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”(RxJava是一个使用可观测序列来组建异步、基于事件的程序的库,它是 Reactive Extensions 在Java虚拟机上的一个实现)。它的优点嘛,用扔物线凯哥的话讲,就是“简洁”,并且“随着程序逻辑变得越来越复杂,它依然能够保持简洁”。 这里要注意一点,虽然对大多数人来讲,更多的是使用 RxJava 来配合 Retrofit、OkHttp 进行网络请求框架的封装及数据的异步处理,但是,RxJava和网络请求本质上没有半毛钱的关系。它的本质,官网已经说的很明白了,就是“异步”。 RxJava 基于观察者模式实现,基于事件流进行链式调用。 首先,我们需要添加必要的依赖,这里以最新的2.2.8版本为例: implementation "io.reactivex.rxjava2:rxjava:2.2.8"当然,对于 Android 项目来讲,我们一般还需要添加一个补充库: implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'这个库其实就是提供了 Android 相关的主线程的支持。 然后写个简单的代码,就可以开始我们的源码分析啦。 // 上游 observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "subscribe: "); emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); } }); // 下游 observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { // onSubscribe 方法会最先被执行 Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: "); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }; // 将上游和下游进行关联 observable.subscribe(observer);为便于理解,我故意将可以链式调用的代码,拆成了三部分。你完全可以写成下面的链式风格: ...

May 21, 2019 · 5 min · jiezi

RxHttp-一条链发送请求新一代Http请求神器一

简介RxHttp是基于OkHttp的二次封装,并于RxJava做到无缝衔接,一条链就能发送一个完整的请求。主要功能如下: 支持Get、Post、Put、Delete等任意请求方式,可自定义请求方式支持Json、DOM等任意数据解析方式,可自定义数据解析器支持文件下载/上传,及进度的监听,并且支持断点下载支持在Activity/Fragment的任意生命周期方法,自动关闭未完成的请求支持添加公共参数/头部信息,且可动态更改baseUrl支持请求串行和并行gradle依赖 implementation 'com.rxjava.rxhttp:rxhttp:1.0.3'//注解处理器,生成RxHttp类,即可一条链发送请求annotationProcessor 'com.rxjava.rxhttp:rxhttp-compiler:1.0.3'//管理RxJava及生命周期,Activity/Fragment 销毁,自动关闭未完成的请求implementation 'com.rxjava.rxlife:rxlife:1.0.4'RxHttp 源码RxLife 源码 初始化//设置debug模式,此模式下有日志打印HttpSender.setDebug(boolean debug)//非必须,只能初始化一次,第二次将抛出异常HttpSender.init(OkHttpClient okHttpClient)//或者,调试模式下会有日志输出HttpSender.init(OkHttpClient okHttpClient, boolean debug)此步骤是非必须的,不初始化或者传入null即代表使用默认OkHttpClient对象。 疑问:标题不是说好的是RxHttp,这么用HttpSender做一些初始化呢?这里先卖一个关子,后面会解答 添加公共参数/头部及重新设置url相信大多数开发者在开发中,都遇到要为Http请求添加公共参数/请求头,甚至要为不同类型的请求添加不同的公共参数/请求头,为此,RxHttp为大家提供了一个静态接口回调,如下,每发起一次请求,此接口就会被回调一次,并且此回调在子线程进行(在请求执行线程回调) HttpSender.setOnParamAssembly(new Function() { @Override public Param apply(Param p) { if (p instanceof GetRequest) {//根据不同请求添加不同参数 } else if (p instanceof PostRequest) { } else if (p instanceof PutRequest) { } else if (p instanceof DeleteRequest) { } //可以通过 p.getSimpleUrl() 拿到url更改后,重新设置 //p.setUrl(""); return p.add("versionName", "1.0.0")//添加公共参数 .addHeader("deviceType", "android"); //添加公共请求头 }});然后有些请求我们不希望添加公共参数/请求头,RxHttp又改如何实现呢?很简单,发起请求前,设置不添加公共参数,如下: Param param = Param.get("http://...") //设置是否对Param对象修饰,即是否添加公共参数,默认为true .setAssemblyEnabled(false); //设为false,就不会回调上面的静态接口到这,也许你们会有疑问,Param 是什么东东,下面就为大家讲解。 ...

May 6, 2019 · 3 min · jiezi

RxLifecycle笔记

添加依赖 implementation ‘com.trello.rxlifecycle2:rxlifecycle:2.2.1’ implementation ‘com.trello.rxlifecycle2:rxlifecycle-android-lifecycle:2.2.1’在rxlifecycle依赖包下游如下几个关键类RxLifecycleLifecycleProviderLifecycleTransformaer在rxlifecycle-android-lifecycle依赖包下有如下几个关键类AndroidLifecycleRxLifecycleAndroidLifecycle不直接使用RxLifecycle,而是使用AndroidLifecycle.如何创建AndroidLifecycle.//AndroidLifecycle.javaLifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(this);AndroidLifecycle实现了android.arch.lifecycle.LifecycleObserver.可以通过注解@OnLifecycleEvent来监听对应的Lifecycle.Event. @OnLifecycleEvent(Lifecycle.Event.ON_ANY) void onEvent(LifecycleOwner owner, Lifecycle.Event event) { //向下游传递 lifecycleSubject.onNext(event); if (event == Lifecycle.Event.ON_DESTROY) { //解除观察者 owner.getLifecycle().removeObserver(this); } }这里的lifecycleSubject是一个BehaviorSubject对象.既可以作为观察者,也可以作为被观察对象使用.当监听到对应的Lifecycle.Event时,就会通过lifecycleSubject.onNext(event);向下游传递.在AndroidLifecycle中定义了两个绑定相关的方法.通过这两个绑定方法,将lifecycleSubject与下游连接起来,才能确保lifecycleSubject携带的信息能传递给下游. /** * 绑定某个具体的生命周期环节 * event具体为 * ON_CREATE * ON_START * ON_RESUME * ON_PAUSE * ON_STOP * ON_DESTROY * ON_ANY / public <T> LifecycleTransformer<T> bindUntilEvent(@NonNull Lifecycle.Event event) { return RxLifecycle.bindUntilEvent(lifecycleSubject, event); } /* * 绑定到生命周期 */ public <T> LifecycleTransformer<T> bindToLifecycle() { return RxLifecycleAndroidLifecycle.bindLifecycle(lifecycleSubject); }第一个方法使用到了RxLifecycle.bindUntilEvent方法.public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle, @Nonnull final R event) { checkNotNull(lifecycle, “lifecycle == null”); checkNotNull(event, “event == null”); return bind(takeUntilEvent(lifecycle, event)); }这里的takeUntilEvent方法是判断lifecycle所携带的event是否与参数event一致.bind方法源码如下: public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) { return new LifecycleTransformer<>(lifecycle); }创建了一个LifecycleTransformer对象.public final class LifecycleTransformer<T> implements ObservableTransformer<T, T>, FlowableTransformer<T, T>, SingleTransformer<T, T>, MaybeTransformer<T, T>, CompletableTransformer{ final Observable<?> observable; LifecycleTransformer(Observable<?> observable) { checkNotNull(observable, “observable == null”); this.observable = observable; } @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.takeUntil(observable); } @Override public Publisher<T> apply(Flowable<T> upstream) { return upstream.takeUntil(observable.toFlowable(BackpressureStrategy.LATEST)); } @Override public SingleSource<T> apply(Single<T> upstream) { return upstream.takeUntil(observable.firstOrError()); } @Override public MaybeSource<T> apply(Maybe<T> upstream) { return upstream.takeUntil(observable.firstElement()); } @Override public CompletableSource apply(Completable upstream) { return Completable.ambArray(upstream, observable.flatMapCompletable(Functions.CANCEL_COMPLETABLE)); } }先说一下takeUntil操作符的作用. ObservableA.takeUntil(ObservableB);当ObservableB开始发射数据,ObservableA停止发射数据.那么对于LifecycleTransformer,当observable开始发射数据,upstream就会停止发射数据.这里的observable就是AndroidLifecycle中的BehaviorSubject.而upstream就是我们自己的数据源.LifecycleTransformer通过与RxJava2的操作符compose结合使用. val lifecycleProvider = AndroidLifecycle.createLifecycleProvider(this)observable.compose(lifecycleProvider.bindUntilEvent(Lifecycle.Event.ON_DESTROY)) ...

March 25, 2019 · 1 min · jiezi

RxJava2:线程调度

subscribeOnObservable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn对象.主要看一下ObservableSubscribeOn的subscribeActual方法. @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); //调用下游的Observer的onSubscribe方法 observer.onSubscribe(parent); //通过SubscribeTask执行了上游Observable的subscribeActual方法 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }scheduler.scheduleDirect(Runnable)用于执行SubscribeTask这个任务.SubscribeTask本身是Runnable的实现类.看一下其run方法. @Override public void run() { //上游的Observable.subscribe方法被切换到了新的线程 source.subscribe(parent); }首先可以得出结论:subscribeOn将上游的Observable的subscribe方法切换到了新的线程.如果多次调用subscribeOn切换线程,会有什么效果?由下往上,每次调用subscribeOn,都会导致上游的Observable的subscribeActual切换到指定的线程.那么最后一次调用的切换最上游的创建型操作符的subscribeActual的执行线程.如果操作符有默认执行线程怎么办?操作符默认线程如果是创建型操作符,处于最上游,那么subscribeOn的线程切换对它不起作用.天高皇帝远,县官不如现管.就是这个道理.如果是其它操作符,会是怎样的?以操作符timeout为例:它对应ObservableTimeoutTimed和TimeoutObserver @Override public void onNext(T t) { downstream.onNext(t); //超时计时 startTimeout(idx + 1); } void startTimeout(long nextIndex) { //交给操作符默认的线程执行 task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } } @Override public void onTimeout(long idx) { downstream.onError(new TimeoutException(timeoutMessage(timeout, unit))); }//TimeoutTask.javastatic final class TimeoutTask implements Runnable { final TimeoutSupport parent; final long idx; TimeoutTask(long idx, TimeoutSupport parent) { this.idx = idx; this.parent = parent; } @Override public void run() { parent.onTimeout(idx); } }可以看到操作符默认的执行线程只用来做超时计时任务,如果超时了,会在操作符的默认线程执行onError方法..操作符默认线程对下游的observer造成什么影响要做具体对待.observeOnobserveOn对应ObservableObserveOn和ObserveOnObserver. //ObservableObserveOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } //ObserveOnObserver.java @Override public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { if (d instanceof QueueDisposable) { if (m == QueueDisposable.SYNC) { //执行下游Observer的onSubscribe方法 downstream.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { //执行下游Observer的onSubscribe方法 downstream.onSubscribe(this); return; } } //执行下游Observer的onSubscribe方法 downstream.onSubscribe(this); } } @Override public void onNext(T t) { //省略 schedule(); } @Override public void onError(Throwable t) { //省略 schedule(); } void schedule() { if (getAndIncrement() == 0) { /* ObserveOnObserver是Runnable的实现类.交给线程池执行 */ worker.schedule(this); } } void drainNormal() { final Observer<? super T> a = downstream; for (;;) { for (;;) { T v; try { v = q.poll(); } catch (Throwable ex) { a.onError(ex); return; } //执行下游Observer的onNext方法 a.onNext(v); } } } void drainFused() { for (;;) { if (!delayError && d && ex != null) { //执行下游Observer的onError方法 downstream.onError(error); return; } downstream.onNext(null); if (d) { ex = error; if (ex != null) { //执行下游Observer的onError方法 downstream.onError(ex); } else { //执行下游Observer的onComplete方法 downstream.onComplete(); } return; } } } //执行线程任务 @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }从上面可以看出ObservableObserveOn在其subscribeActual方法中并没有切换上游Observable的subscribe方法的执行线程.但是ObserveOnObserver在其onNext,onError和onComplete中通过schedule()方法将下游Observer的各个方法切换到了新的线程.得出结论: observeOn负责切换的是下游Observer的各个方法的执行线程如果下游多次通过observeOn切换线程,会有什么效果?每次切换都会对其下游造成影响,直到遇到下一个observeOn为止.Observer(onSubscribe,onNext,onError,onComplete)onNext,onError,onComplete与上游最近的observeOn所切换的线程保持一致.onSubscribe则不同.遇到线程切换的时候,会首先在对应的Observable的subscribeActual方法内,先调用observer.onSubscribe方法.而observer.onSubscribe会逐级向上传递直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法内调用,这是在主线程执行的.所以onSubscribe方法无论如何都是在主线程执行.doOnSubscribe.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { } })我们要看的是方法accept的执行线程. 通过源码找到对应的DisposableLambdaObserver. @Override public void onSubscribe(Disposable d) { //在这里调用了accept方法. onSubscribe.accept(d); }这就要看上游在哪个线程执行了Observer.onSubscribe(disposable)方法.在创建型操作符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.doFinally对应ObservableDoFinally和DoFinallyObserver //DoFinallyObserver.java @Override public void onError(Throwable t) { runFinally(); } @Override public void onComplete() { runFinally(); } @Override public void dispose() { runFinally(); } void runFinally() { onFinally.run(); } 可以看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.如果没有observeOn,则会受到最上游的observable.subscribeActual方法影响.doOnError对应ObservableDoOnEach和DoOnEachObserver//DoOnEachObserver.java @Override public void onError(Throwable t) { onError.accept(t); }和自身对应的observer.onError所在线程保持一致.doOnNext对应ObservableDoOnEach和DoOnEachObserver//DoOnEachObserver.java @Override public void onNext(T t) { onNext.accept(t); }和自身对应的observer.onNext所在线程保持一致.操作符对应方法参数的执行线程包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onNext中调用.所以他们的线程保持一致.总结:subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeOn的线程切换不起作用.subscribeOn由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另一个observeOn会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待. ...

March 14, 2019 · 2 min · jiezi

【Android】RxJava + Retrofit完成网络请求

前言本文基于RxJava、Retrofit的使用,若是对RxJava或Retrofit还不了解的简友可以先了解RxJava、Retrofit的用法再来看这篇文章。在这片文章之前分别单独介绍过Rxjava以及Retrofit的使用:Android Retrofit 2.0 的使用Android RxJava的使用(一)基本用法(以及后面的几篇,就不一一列出了)使用在了解了RxJava和Retrofit分别的用法后,RxJava、Retrofit的搭配使用也就不再话下了。先看看使用Retrofit完成一次网络请求是怎样的单独使用Retrofit1、先写一个serviceinterface MyService { @GET(“user/login” ) Call<UserInfo> login( @Query(“username”) String username, @Query(“password”) String password );}2、获取Call执行网络请求 Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .baseUrl(BASE_URL) .build(); MyService service = retrofit.create(MyService.class); Call<UserInfo> call = service.login(“1111”, “ssss”); call.enqueue(new Callback<UserInfo>() { @Override public void onResponse(Call<UserInfo> call, Response<UserInfo> response) { //请求成功操作 } @Override public void onFailure(Call<UserInfo> call, Throwable t) { //请求失败操作 } });以上是Retrofit单独使用时的做法。那Retrofit与RxJava结合是怎样使用的?下面就来说说这篇文章的重点。RxJava + Retrofit完成网络请求1、添加依赖。前四个分别是RxJava、RxAndroid、Retrofit以及Gson的库,最后那个才是新加入的,RxJava + Retrofit的使用需要用到最后那个包。 compile ‘io.reactivex:rxjava:x.y.z’ compile ‘io.reactivex:rxandroid:1.0.1’ compile ‘com.squareup.retrofit2:retrofit:2.0.2’ compile ‘com.squareup.retrofit2:converter-gson:2.0.2’ compile ‘com.squareup.retrofit2:adapter-rxjava:2.0.2’注意:最后三个包的版本号必须一样,这里用的是2.0.2。2、写一个登录的serviceinterface MyService { @GET(“user/login” ) Observable<UserInfo> login( @Query(“username”) String username, @Query(“password”) String password );}相比之前的service,这里getNews方法的返回值是Observable类型。Observable…是不是觉得很熟悉,这货不就是之前在RxJava使用到的被监听者?3、使用Observable完成一个网络请求,登录成功后保存数据到本地。 Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create())//新的配置 .baseUrl(BASE_URL) .build(); MyService service = retrofit.create(MyService.class); service.login(phone, password) //获取Observable对象 .subscribeOn(Schedulers.newThread())//请求在新的线程中执行 .observeOn(Schedulers.io()) //请求完成后在io线程中执行 .doOnNext(new Action1<UserInfo>() { @Override public void call(UserInfo userInfo) { saveUserInfo(userInfo);//保存用户信息到本地 } }) .observeOn(AndroidSchedulers.mainThread())//最后在主线程中执行 .subscribe(new Subscriber<UserInfo>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { //请求失败 } @Override public void onNext(UserInfo userInfo) { //请求成功 } });RxJava + Retrofit 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()。可以看到,调用了service的login方法后得到Observable对象,在新的线程中执行网络请求,请求成功后切换到io线程执行保存用户信息的动作,最后再切换到主线程执行请求失败onError()、请求成功onNext()。整体的逻辑十分清晰都在一条链中,就算还有别的要求还可以往里面添加,丝毫不影响代码的简洁。(终于举了一个有实际意义的例子)注意:retrofit的初始化加了一行代码addCallAdapterFactory(RxJavaCallAdapterFactory.create())RxJava + Retrofit 进阶在上面举到登录后保存用户信息的例子,其实在做项目的时候,往往在登录后得到的并不是用户信息。一般登录后会得到token,然后根据token去获取用户的信息。他们的步骤是这样的:1、登录2、获取用户信息(前提:登录成功)可以看得出来,这是一个嵌套的结构…嵌套啊!!!天呐,最怕嵌套的结构了。使用RxJava + Retrofit来完成这样的请求(借用抛物线的例子,稍微做了点改动) //登录,获取token@GET("/login")public Observable<String> login( @Query(“username”) String username, @Query(“password”) String password); //根据token获取用户信息@GET("/user")public Observable<User> getUser( @Query(“token”) String token);//…………………………….service.login(“11111”, “22222”) .flatMap(new Func1<String, Observable<User>>() { //得到token后获取用户信息 @Override public Observable<User> onNext(String token) { return service.getUser(token); }) .subscribeOn(Schedulers.newThread())//请求在新的线程中执行请求 .observeOn(Schedulers.io()) //请求完成后在io线程中执行 .doOnNext(new Action1<User>() { //保存用户信息到本地 @Override public void call(User userInfo) { saveUserInfo(userInfo); } }) .observeOn(AndroidSchedulers.mainThread())//在主线程中执行 .subscribe(new Observer<User>() { @Override public void onNext(User user) { //完成一次完整的登录请求 userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { //请求失败 } });通过一个flatMap()轻松完成一次嵌套的请求,而且逻辑十分清晰。so easy~~~小结RxJava的实用性从上面的两个例子慢慢体现了出来,逻辑越是复杂,RxJava的优势就越明显。RxJava的使用就暂时介绍到这里吧,使用过程中遇到好用的再出来跟大家分享。以上有错误之处感谢指出参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客) ...

February 22, 2019 · 2 min · jiezi

【Android】RxJava的使用(四)线程控制 —— Scheduler

前言经过前几篇的介绍,对RxJava对模式有了一定的理解:由Observable发起事件,经过中间的处理后由Observer消费。(对RxJava还不了解的可以出门左拐)之前的代码中,事件的发起和消费都是在同一个线程中执行,也就是说之前我们使用的RxJava是同步的~~~观察者模式本身的目的不就是后台处理,将处理结果回调给前台?这同步的是要哪样?所以,这篇为大家介绍RxJava的重要的概念——Scheduler参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客)介绍RxJava在不指定线程的情况下,发起时间和消费时间默认使用当前线程。所以之前的做法 Observable.just(student1, student2, student2) //使用map进行转换,参数1:转换前的类型,参数2:转换后的类型 .map(new Func1<Student, String>() { @Override public String call(Student i) { String name = i.getName();//获取Student对象中的name return name;//返回name } }) .subscribe(new Action1<String>() { @Override public void call(String s) { nameList.add(s); } });因为是在主线程中发起的,所以不管中间map的处理还是Action1的执行都是在主线程中进行的。若是map中有耗时的操作,这样会导致主线程拥塞,这并不是我们想看到的。SchedulerScheduler:线程控制器,可以指定每一段代码在什么样的线程中执行。模拟一个需求:新的线程发起事件,在主线程中消费 private void rxJavaTest3() { Observable.just(“Hello”, “Word”) .subscribeOn(Schedulers.newThread())//指定 subscribe() 发生在新的线程 .observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } });上面用到了subscribeOn(),和observeOn()方法来指定发生的线程和消费的线程。subscribeOn():指定subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。observeOn():指定Subscriber 所运行在的线程。或者叫做事件消费的线程。以及参数Scheduler,RxJava已经为我们提供了一下几个SchedulerSchedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。Schedulers.newThread():总是启用新线程,并在新线程执行操作。Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行。多次切换线程看完上面的介绍想必对RxJava线程的切换有了一些理解,上面只是对事件的发起和消费制定了线程。如果中间有map之类的操作呢?是否可以实现发起的线程在新线程中,map的处理在IO线程,最后的消费在主线程中。 Observable.just(“Hello”, “Wrold”) .subscribeOn(Schedulers.newThread())//指定:在新的线程中发起 .observeOn(Schedulers.io()) //指定:在io线程中处理 .map(new Func1<String, String>() { @Override public String call(String s) { return handleString(s); //处理数据 } }) .observeOn(AndroidSchedulers.mainThread())//指定:在主线程中处理 .subscribe(new Action1<String>() { @Override public void call(String s) { show(s); //消费事件 } });可以看到observeOn()被调用了两次,分别指定了map的处理的现场和消费事件show(s)的线程。若将observeOn(AndroidSchedulers.mainThread())去掉会怎么样?不为消费事件show(s)指定线程后,show(s)会在那里执行?其实,observeOn() 指定的是它之后的操作所在的线程。也就是说,map的处理和最后的消费事件show(s)都会在io线程中执行。observeOn()可以多次使用,可以随意变换线程小结学会线程控制后才算是真正学会了使用RxJava。RxJava的使用十分灵活,想要对其熟悉使用只有一个办法,那就是多用啦,熟能生巧。以上有错误之处感谢指出参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客) ...

February 22, 2019 · 1 min · jiezi

【Android】RxJava的使用(三)转换——map、flatMap

前两篇Android RxJava的使用(一)基本用法、Android RxJava的使用(二)Action介绍了RxJava的基本用法,对Rxjava还不了解的请先看以上两篇。这篇为大家讲解RxJava中map和flatMap的使用。参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客)回顾前两篇为大家介绍了使用RxJava打印多个字符串的方法 Observable.just(“Hellow”, “Wrold”).subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } });这样的例子基本没有实际用处,只是为大家演示如何使用Rxjava。今天就抛开这个例子。map在使用map之前要先说道一个接口:Func1,Func1和上一篇提到的Action1相似。Func1 和 Action的区别在于, Func1 包装的是有返回值的方法。接下来就是map的用法,看代码更直观点;例:得到多个Student对象中的name,保存到nameList中 Observable.just(student1, student2, student2) //使用map进行转换,参数1:转换前的类型,参数2:转换后的类型 .map(new Func1<Student, String>() { @Override public String call(Student i) { String name = i.getName();//获取Student对象中的name return name;//返回name } }) .subscribe(new Action1<String>() { @Override public void call(String s) { nameList.add(s); } });可以看到Observable中原来的参数是Student对象,而最后我们需要的是name,这里使用了map来实现这一转换的过程。当然,map可以多次使用。 //多次使用map,想用几个用几个 Observable.just(“Hello”, “World”) .map(new Func1<String, Integer>() {//将String类型的转化为Integer类型的哈希码 @Override public Integer call(String s) { return s.hashCode(); } }) .map(new Func1<Integer, String>() {//将转化后得到的Integer类型的哈希码再转化为String类型 @Override public String call(Integer integer) { return integer.intValue() + “”; } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } });flatMapflatMap是一个比教难理解的一个转换,在这里先假设一个需求,需要打印多个Student所学的课程。这跟之前获取Student的name又不同了,这里先确定一下关系,一个Student类中只有一个name,而一个Student却有多门课程(Course),Student我们可以理解成这样: /** * 学生类 / class Student { private String name;//姓名 private List<Course> coursesList;//所修的课程 … } /* * 课程类 */ class Course { private String name;//课程名 private String id; … }如果使用map来实现打印所有学生所修个课程名,实现的代码是这样的: List<Student> students = new ArrayList<Student>(); students.add… … Action1<List<Course>> action1 = new Action1<List<Course>>() { @Override public void call(List<Course> courses) { //遍历courses,输出cuouses的name for (int i = 0; i < courses.size(); i++){ Log.i(TAG, courses.get(i).getName()); } } }; Observable.from(students) .map(new Func1<Student, List<Course>>() { @Override public List<Course> call(Student student) { //返回coursesList return student.getCoursesList(); } }) .subscribe(action1);可以看到,在Action1中出现了for来循环打印课程名,使用RxJava就是为了剔除这样的嵌套结构,使得整体的逻辑性更强。这时候就可以使用flatMap了,使用flatMap实现的代码是这样的: List<Student> students = new ArrayList<Student>(); students.add… … Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCoursesList()); } }) .subscribe(new Action1<Course>() { @Override public void call(Course course) { Log.i(TAG, course.getName()); } });这样就实现了跟上面代码一样的效果,看起来有点懵?确实,flatMap理解起来有点绕,刚接触flatMap的时候我也是懵逼一个。下面我将flatMap的示意图,希望能帮助理解:由上图可以看出Student1、Student2经过flatMap后,按顺序依次经历了Observable1、Observable2,分别转化为Course。最后按顺序得到Course1、Course2、Course3、Course4、Course5、Course6,其中1-3由Student1得到,4-6由Student2得到。结合代码和示意图,是不是对flatMap有了一定的理解。注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。也就说,传入的顺序可能跟出来的顺序不一样。如果要保证顺的的话,可以使用concatMap。其他操作符除了map和flatMap之外,还有其他操作符以供使用。这里就不一一列举他们的用法了,其他常用的操作符如下:filter:集合进行过滤each:遍历集合take:取出集合中的前几个skip:跳过前几个元素更多操作符小结看完map、flatMap后,慢慢能看到RxJava的实际用处了。不过只是这点功能的RxJava是远远不能满足我们的需求,更多的用法我只能在后面更新了。今天就到这里吧!!!以上有错误之处感谢指出更多:Android RxJava的使用(四)线程控制 —— Scheduler参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客) ...

February 22, 2019 · 2 min · jiezi

【Android】RxJava的使用(二)Action

回顾在上一节Android RxJava的使用(一)基本用法中,介绍了RxJava的基本用法。下面来回顾下实现一次RxJava的基本使用。例:分别打印"Hello"、" World" Observable.just(“Hello”, “World”) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(TAG, s); } });可以看到,我们这里只用了onNext(obj),还有两个重写的onError(error)和onCompleted()并没有用到,这样导致我们多出了几行根本用不到的代码。于是就想能不能只写我们使用到的,其他几个没用到的就不写,这样的代码看着才舒服。接下来就是使用本次的主角Action来代替SubscriberAction上部分的代码使用Action来代替Subscriber得到的代码是这样的: Observable.just(“Hello”, “World”) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } });舒服多了有没有!!什么是ActionAction是RxJava 的一个接口,常用的有Action0和Action1。Action0: 它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。Ation1:它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj)和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调Action的使用定义三个对象,分别打包onNext(obj)、onError(error) 、onCompleted()。 Observable observable = Observable.just(“Hello”, “World”); //处理onNext()中的内容 Action1<String> onNextAction = new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } }; //处理onError()中的内容 Action1<Throwable> onErrorAction = new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }; //处理onCompleted()中的内容 Action0 onCompletedAction = new Action0() { @Override public void call() { Log.i(TAG, “Completed”); } };接下来使用subscribe重载的方法//使用 onNextAction 来定义 onNext()Observable.just(“Hello”, “World”).subscribe(onNextAction);//使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()Observable.just(“Hello”, “World”).subscribe(onNextAction, onErrorAction);//使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()Observable.just(“Hello”, “World”).subscribe(onNextAction, onErrorAction, onCompletedAction);根据实际情况使用以上的方法处理onNext(obj)、onError(error) 、onCompleted()的回调。现在有个疑问,为什么使用Action也能达到使用Subscriber的结果?进subscribe(Action1 onNext)的源码看看。还以为有多高深,原来就是把Action对象转化成对应的Subscriber对象了。这样就不难理解为什么可以使用Action来代替Subscriber了。重新写打印"Hello"、" Wrod"的方法 Observable.just(“Hello”, “Wrold”).subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } });相比第一篇那冗长的代码,现在感觉怎么样?还是觉得代码多?确实,如果只是打印几个字符串确实还是略微复杂。不急,RxJava还有其他功能呢。小结Action的使用为我们减少了不必要的代码,使得写出的代码看上去更加得简洁。不过就目前来看RxJava还是没有什么优势值得我们去使用,下一篇我还会介绍更多RxJava的用法。慢慢得,你就会发现使用RxJava写出来的代码看上去真的很漂亮。更多:更多:Android RxJava的使用(三)转换(map、flatMap)Android RxJava的使用(四)线程控制 —— Scheduler以上有错误之处感谢指出参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客) ...

February 22, 2019 · 1 min · jiezi

RxJava2:操作符执行顺序和数据传递方向

以Observable为例,先上代码://①ObservableJust<String> observable = (ObservableJust<String>) Observable.just(“hello rxjava2”);//② ObservableSubscribeOn<String> subscribe = (ObservableSubscribeOn<String>) observable.subscribeOn(Schedulers.io());//③ ObservableObserveOn<String> observerOn = (ObservableObserveOn<String>) subscribe.observeOn(AndroidSchedulers.mainThread());//④ ObservableDoFinally<String> doFinally = (ObservableDoFinally<String>) observerOn.doFinally(new Action() { @Override public void run() throws Exception { System.out.println(“doFinally”); } });//⑤ ObservableDoOnLifecycle<String> doOnSubscribe = (ObservableDoOnLifecycle<String>) doFinally.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println(“doOnSubscribe: " + disposable.hashCode()); } });//⑥ doOnSubscribe.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println(“onSubscribe: “+d.hashCode()); /* if (!d.isDisposed()){ System.out.println(“onSubscribe: dispose”); d.dispose(); }*/ } @Override public void onNext(String s) { System.out.println(“onNext: “+s); Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show(); } @Override public void onError(Throwable e) { System.out.println(“onError: “+e.getMessage()); Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show(); } @Override public void onComplete() { System.out.println(“onComplete”); Toast.makeText(MainActivity.this, “onComplete”, Toast.LENGTH_SHORT).show(); } });这里每次调用一个操作符,返回的都是Observable的直接子类或者间接之类.以just为例: public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, “The item is null”); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }这里重新new了一个Observable的子类对象ObservableJust.结论如下:每个操作符都会对应返回一个Observable的子类对象,类名格式ObservableXXX然后去调用下一个操作符.比如interval操作符,返回的是ObservableInterval的实例对象.对于Observable的创建型操作符,返回的是其直接子类,而其他操作符,返回的是AbstractObservableWithUpstream的子类对象.AbstractObservableWithUpstream的构造函数中,第一个参数就是Observable对象,这一点非常重要,这个参数是上一个操作符返回的Observable对象.这保证了整个调用流程的起始处的Observable对象能在整个流程中传递.操作符的调用是由上至下,顺序调用的.数据流的传递流程是怎么样的?数据流的传递如果没有最下游的观察者对数据做接收,整个调用流程是不会执行的.先从⑥开始看ObservableDoOnLifecycle的subscribe方法做了什么.@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose)); }source就是上游操作符返回的Observable的子类对象,通过AbstractObservableWithUpstream的构造函数传递给下游的.这里去调用了上一个Observable对象的subscribe方法.这个调用由下至上,直到整个流程的起始处.下一步看数据是怎么传递的? ...

February 14, 2019 · 1 min · jiezi