这是《应用Kotlin开发一个古代的APP》系列文章的第三局部,还没看过前2局部的,能够先看一下:

【译】应用Kotlin从零开始写一个古代Android 我的项目-Part1

【译】应用Kotlin从零开始写一个古代Android 我的项目-Part2

注释开始!

什么是RxJava ?

对于RxJava,一个宽泛的概念是-RxJava是用于异步编程的API的Java实现,它具备可察看流和响应式的API。实际上,它是这三个概念的联合:观察者模式、迭代器模式和函数式编程。这里也有其余编程语言实现的库,如:RxSwift、RxJs 、RxNet等。

我RxJava上手很难,有时,它的确很令人困惑,如果施行不当,可能会给您带来一些问题。尽管如此,咱们还是值得花工夫学习它。我将尝试通过简略的步骤来解释RxJava。

首先,让咱们答复一些简略的问题,当您开始浏览无关RxJava时,可能会问本人:

咱们真的须要它吗?

答案是否定的,RxJava只是能够在Android开发中应用的又一个库。如果应用Kotlin开发,它也不是必须的,我心愿你明确我说的,它只一个很帮忙你的库,就像你应用的所以其余库一样。

要学习RxJava2,必须先学RxJava1吗?

你能够间接从RxJava2开始,不过,作为Android开发人员,晓得这两种状况对你还是有益处的,因为你可能会参加保护其他人的RxJava1代码。

我看到有RxAndroid,应该应用RxAndroid还是RxJava?

RxJava能用在任何Java开发平台,不仅仅是Android,比方,对于后端开发来说,RxJava 能够与Spring等框架一起应用,RxAndroid是一个库,其中蕴含在Android中应用RxJava所需的库。因而,如果要在Android开发中应用RxJava,则必须再增加RxAndroid。稍后,我将解释RxAndroid基于RxJava所增加的内容。

咱们应用Kotlin开发,为什么不必RxKotin呢?

咱们没有必要另外再增加一个Rx 库了,因为Kotlin与Java是齐全兼容的,这里的确有一个RxKotin库:https://github.com/ReactiveX/... ,不过该库是在RxJava之上编写的。它只是将Kotlin性能增加到RxJava。您能够将RxJava与Kotlin一起应用,而无需应用RxKotlin库。为了简略起见,在这一部分中我将不应用RxKotlin。

如何将Rxjava2增加到我的项目中?

要应用RxJava,你须要在build.gradle中增加如下代码:

dependencies {    ...     implementation "io.reactivex.rxjava2:rxjava:2.1.8"    implementation "io.reactivex.rxjava2:rxandroid:2.0.1"    ...}

而后,点击sync,下载Rxjava库。

RxJava蕴含了些啥?

我想把RxJava分为以下三局部:

  • 1、用于观察者模式和数据流的类:ObservablesObservers
  • 2、Schedulers
  • 3、数据流操作符
ObservablesObservers

咱们曾经解释了这种模式。您能够将Observable视为数据的源(被观察者),将Observer视为接收数据的源(观察者)。

有很多创立Observables的形式,最简略的办法是应用Observable.just()来获取一个我的项目并创立Observable来发射该我的项目。

让咱们转到GitRepoRemoteDataSource类并更改getRepositories办法,以返回Observable:

class GitRepoRemoteDataSource {    fun getRepositories() : Observable<ArrayList<Repository>> {        var arrayList = ArrayList<Repository>()        arrayList.add(Repository("First from remote", "Owner 1", 100, false))        arrayList.add(Repository("Second from remote", "Owner 2", 30, true))        arrayList.add(Repository("Third from remote", "Owner 3", 430, false))        return Observable.just(arrayList).delay(2,TimeUnit.SECONDS)    }}

Observable <ArrayList <Repository >>示意Observable收回Repository对象的数组列表。如果要创立收回Repository对象的Observable <Repository>,则应应用Observable.from(arrayList)

.delay(2,TimeUnit.SECONDS)示意提早2s后才开始发射数据。

然而,等等!咱们并没有高数Observable何时发射数据啊?Observables通常在一些Observer订阅后就开始收回数据。

请留神,咱们不再须要以下接口了

interface OnRepoRemoteReadyCallback {    fun onRemoteDataReady(data: ArrayList<Repository>)}

GitRepoLocalDataSource:类中做同样的更改

class GitRepoLocalDataSource {    fun getRepositories() : Observable<ArrayList<Repository>> {        var arrayList = ArrayList<Repository>()        arrayList.add(Repository("First From Local", "Owner 1", 100, false))        arrayList.add(Repository("Second From Local", "Owner 2", 30, true))        arrayList.add(Repository("Third From Local", "Owner 3", 430, false))        return Observable.just(arrayList).delay(2, TimeUnit.SECONDS)    }    fun saveRepositories(arrayList: ArrayList<Repository>) {        //todo save repositories in DB    }}

同样的,也不须要这个接口了:

interface OnRepoLocalReadyCallback {    fun onLocalDataReady(data: ArrayList<Repository>)}

当初,咱们须要在repository中返回Observable

class GitRepoRepository(private val netManager: NetManager) {    private val localDataSource = GitRepoLocalDataSource()    private val remoteDataSource = GitRepoRemoteDataSource()    fun getRepositories(): Observable<ArrayList<Repository>> {        netManager.isConnectedToInternet?.let {            if (it) {                //todo save those data to local data store                return remoteDataSource.getRepositories()            }        }        return localDataSource.getRepositories()    }}

如果网络已连贯,咱们从近程数据源返回Observable,否则,从本地数据源返回Observable,同样的,咱们也不再须要OnRepositoryReadyCallback接口。

如你所料,咱们须要更改在MainViewModel中获取数据的形式。当初咱们应该从gitRepoRepository获取Observable并订阅它。一旦咱们向Observer订阅了该Observable,Observable将开始收回数据:

class MainViewModel(application: Application) : AndroidViewModel(application) {    ...    fun loadRepositories() {        isLoading.set(true)        gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{            override fun onSubscribe(d: Disposable) {                //todo            }            override fun onError(e: Throwable) {               //todo            }            override fun onNext(data: ArrayList<Repository>) {                repositories.value = data            }            override fun onComplete() {                isLoading.set(false)            }        })    }}

一旦Observer订阅了Observable,onSubscribe办法将被调用,次要onSubscribe的参数Disposable,稍后将讲到它。

每当Observable收回数据时,将调用onNext()办法。当Observable实现s数据发射时,onComplete()将被调用一次。之后,Observable终止。

如果产生某些异样,onError()办法将被回调,而后Observable终止。这意味着Observable将不再收回数据,因而onNext()不会被调用,也不会调用onComplete()

另外,请留神。如果尝试订阅已终止的Observable,则将收到IllegalStateException

那么,RxJava如何帮忙咱们?

  • 首先,咱们解脱了这些接口,它是所有repository和数据源建设的样板接口。
  • 如果咱们应用接口,并且在数据层中产生某些异样,则咱们的应用程序可能会解体。应用RxJava谬误将在onError()办法中返回,因而咱们能够向用户显示适当的谬误音讯。
  • 因为咱们始终将RxJava用于数据层,它更清晰。
  • 我之前没有通知过你:以前的办法可能会导致内存透露。
应用RxJava2和ViewModel时,如何避免内存透露

咱们再一次看一下ViewModel的生命周期图

一旦Activity销毁,ViewModel的onCleared办法将被调用,在onCleared办法中,咱们须要勾销所有订阅

class MainViewModel(application: Application) : AndroidViewModel(application) {    ...        lateinit var disposable: Disposable    fun loadRepositories() {        isLoading.set(true)        gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{            override fun onSubscribe(d: Disposable) {                disposable = d            }            override fun onError(e: Throwable) {                //if some error happens in our data layer our app will not crash, we will                // get error here            }            override fun onNext(data: ArrayList<Repository>) {                repositories.value = data            }            override fun onComplete() {                isLoading.set(false)            }        })    }    override fun onCleared() {        super.onCleared()        if(!disposable.isDisposed){            disposable.dispose()        }    }}

咱们能够优化一下下面的代码:

首先,应用DisposableObserver替换Observer,它实现了Disposable并且有dispose()办法,咱们不再须要onSubscribe()办法,因为咱们能够间接在DisposableObserver实例上调用dispose()

第二步,替换掉返回Void的.subscribe()办法,应用.subscribeWith() 办法,他能返回指定的Observer

class MainViewModel(application: Application) : AndroidViewModel(application) {    ...    lateinit var disposable: Disposable    fun loadRepositories() {        isLoading.set(true)        disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {            override fun onError(e: Throwable) {               // todo            }            override fun onNext(data: ArrayList<Repository>) {                repositories.value = data            }            override fun onComplete() {                isLoading.set(false)            }        })    }    override fun onCleared() {        super.onCleared()        if(!disposable.isDisposed){            disposable.dispose()        }    }}

下面的代码还能够持续优化:

咱们保留了一个Disposable实例,因而,咱们才能够在onCleared()回调中调用dispose(),然而等等!咱们须要为每一个调用都这样做吗?如果有10个回调,那么咱们得保留10个实例,在onCleared()中勾销10次订阅?显然不可能,这里有更好的办法,咱们应该将它们全副保留在一个存储桶中,并在调用onCleared()办法时,将它们全部一次解决。咱们能够应用CompositeDisposable

CompositeDisposable:可包容多个Disposable的容器

因而,每次创立一个Disposable,都须要将其增加到CompositeDisposable中:

class MainViewModel(application: Application) : AndroidViewModel(application) {    ...      private val compositeDisposable = CompositeDisposable()    fun loadRepositories() {        isLoading.set(true)        compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {            override fun onError(e: Throwable) {                //if some error happens in our data layer our app will not crash, we will                // get error here            }            override fun onNext(data: ArrayList<Repository>) {                repositories.value = data            }            override fun onComplete() {                isLoading.set(false)            }        }))    }    override fun onCleared() {        super.onCleared()        if(!compositeDisposable.isDisposed){            compositeDisposable.dispose()        }    }}

感激Kotlin的扩大函数,咱们还能够更进一步:

与C#和Gosu类似,Kotlin提供了应用新性能扩大类的能力,而不用继承该类,也就是扩大函数。

让咱们创立一个新的包,叫做extensions,并且增加一个新的文件RxExtensions.kt

import io.reactivex.disposables.CompositeDisposableimport io.reactivex.disposables.Disposableoperator fun CompositeDisposable.plusAssign(disposable: Disposable) {    add(disposable)}

当初咱们能够应用+ =符号将Disposable对象增加到CompositeDisposable实例:

class MainViewModel(application: Application) : AndroidViewModel(application) {    ...    private val compositeDisposable = CompositeDisposable()    fun loadRepositories() {        isLoading.set(true)        compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {            override fun onError(e: Throwable) {                //if some error happens in our data layer our app will not crash, we will                // get error here            }            override fun onNext(data: ArrayList<Repository>) {                repositories.value = data            }            override fun onComplete() {                isLoading.set(false)            }        })    }    override fun onCleared() {        super.onCleared()        if (!compositeDisposable.isDisposed) {            compositeDisposable.dispose()        }    }}

当初,咱们运行程序,当你点击Load Data按钮,2s之后,程序crash,而后,如果查看日志,您将看到onNext办法外部产生谬误,并且异样的起因是:

java.lang.IllegalStateException: Cannot invoke setValue on a background thread

为何会产生这个异样?

Schedulers

RxJava附带有调度器(Schedulers),使咱们能够抉择在哪个线程代码上执行。更精确地说,咱们能够抉择应用subscribeOn()方在哪个线程执行,observeOn()办法能够察看哪个线程观察者。通常状况下,咱们所有的数据层代码都应该在后盾线程执行,例如,如果咱们应用Schedulers.newThread(),每当咱们调用它时,调度器都会给咱们调配一个新的线程,为了简略起见,Scheduler中还有其余一些办法,我将不在本博文中介绍。

可能您曾经晓得所有UI代码都是在Android 主线程上实现的。 RxJava是Java库,它不理解Android主线程,这就是咱们应用RxAndroid的起因。 RxAndroid使咱们能够抉择Android Main线程作为执行代码的线程。显然,咱们的Observer应该在Android Main线程上运行。

让咱们更改一下代码:

...fun loadRepositories() {        isLoading.set(true)        compositeDisposable += gitRepoRepository                .getRepositories()                .subscribeOn(Schedulers.newThread())                .observeOn(AndroidSchedulers.mainThread())                .subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {              ...        })    }...

而后再运行代码,所有都失常了,nice~

其余 observables types

这里还有一些其余的observable 类型

  • Single<T>: 被观察者仅发射一个数据,或者是一个异样
  • Maybe<T>: 被观察者不发射数据,或者仅发射一个数据,或者是一个异样
  • Completable : 发射onSuccess()事件或者异样
  • Flowable<T>:和 Observable<T> 一样,不发射数据,或者发射n个数据,或者发射异样,然而Observable不反对背压,而Flowable却反对。
什么是背压(backpressure)?

为了记住一些概念,我喜爱将它们与事实中的一些例子类比

把它类比成一个通道,如果你向通道中塞入瓶颈可能承受的最多的商品,这将会变得很糟,这里也是同样的,有时,你的观察者无奈解决其收到的事件数量,因而须要加快速度。

你能够看看RxJava 对于背压的文档:https://github.com/ReactiveX/...

操作符

RxJava中,最牛逼的就是它的操作符了,仅用一行代码即可在RxJava中解决一些通常须要10行或更多行的问题。这些是操作符能够帮咱们做的:

  • 合并observables
  • 过滤
  • 按条件来做操作
  • 将observables 转换为其余类型

我给你举一个例子,让咱们将数据保留到GitRepoLocalDataSource中。因为咱们正在保留数据,所以咱们须要Completable来模仿它。假如咱们还想模仿1秒的提早。天真的办法是:

fun saveRepositories(arrayList: ArrayList<Repository>): Completable {    return Completable.complete().delay(1,TimeUnit.SECONDS)}

为什么说天真?

Completable.complete()返回一个Completable实例,该实例在订阅后立刻实现。

一旦Completable 实现后,它将终止。因而,之后将不执行任何运算符(提早是运算符之一)。在这种状况下,咱们的Completable不会有任何提早。让咱们找解决办法:

fun saveRepositories(arrayList: ArrayList<Repository>): Completable {    return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable()}

为什么是这种形式?

Single.just(1)创立一个Single实例,并且仅发射一个数字1,因为咱们用了delay(1,TimeUnit.SECONDS) ,因而发射操作提早1s。

toCompletable()返回一个Completable,它抛弃Single的后果,并在此Single调用onSuccess时调用onComplete

因而,下面的代码将返回Completable,并且1s后调用onComplete()

当初,咱们应该更改咱们的GitRepoRepository。让咱们回顾一下逻辑。咱们查看互联网连贯。如果有互联网连贯,咱们从近程数据源获取数据,将其保留在本地数据源中并返回数据。否则,咱们仅从本地数据源获取数据。看一看:

fun getRepositories(): Observable<ArrayList<Repository>> {    netManager.isConnectedToInternet?.let {        if (it) {            return remoteDataSource.getRepositories().flatMap {                return@flatMap localDataSource.saveRepositories(it)                        .toSingleDefault(it)                        .toObservable()            }        }    }    return localDataSource.getRepositories()}

应用了.flatMap,一旦remoteDataSource.getRepositories()发射数据,该我的项目将被映射到收回雷同我的项目的新Observable。咱们从Completable创立的新Observable发射的雷同我的项目保留在本地数据存储中,并且将其转换为收回雷同发射项的Single。因为咱们须要返回Observable,所以咱们必须将Single转换为Observable。

很疯狂,huh? 设想一下RxJava还能为咱们做些啥!

RxJava是一个十分有用的工具,去应用它,摸索它,我置信你会爱上它的!

以上就是本文得全部内容,下一篇文章将是本系列的最初一篇文章,敬请期待!

本系列已更新结束:

【译】应用Kotlin从零开始写一个古代Android 我的项目-Part1

【译】应用Kotlin从零开始写一个古代Android 我的项目-Part2

【译】应用Kotlin从零开始写一个古代Android 我的项目-Part3

【译】应用Kotlin从零开始写一个古代Android 我的项目-Part4

文章首发于公众号:「 技术最TOP 」,每天都有干货文章继续更新,能够微信搜寻「 技术最TOP 」第一工夫浏览,回复【思维导图】【面试】【简历】有我筹备一些Android进阶路线、面试领导和简历模板送给你