共计 10635 个字符,预计需要花费 27 分钟才能阅读完成。
这是《应用 Kotlin 开发一个古代的 APP》系列文章的第三局部,还没看过前 2 局部的,能够先看一下:
【译】应用 Kotlin 从零开始写一个古代 Android 我的项目 -Part1
【译】应用 Kotlin 从零开始写一个古代 Android 我的项目 -Part2
注释开始!
什么是 RxJava ?
对于 RxJava, 一个宽泛的概念是 -RxJava 是用于异步编程的 API 的 Java 实现,它具备可察看流和响应式的 API。实际上,它是这三个概念的联合:观察者模式、迭代器模式和函数式编程。这里也有其余编程语言实现的库,如:RxSwift、RxJs、RxNet 等。
我 RxJava 上手很难,有时,它的确很令人困惑,如果施行不当,可能会给您带来一些问题。尽管如此,咱们还是值得花工夫学习它。我将尝试通过简略的步骤来解释 RxJava。
首先,让咱们答复一些简略的问题,当您开始浏览无关 RxJava 时,可能会问本人:
咱们真的须要它吗?
答案是否定的,RxJava 只是能够在 Android 开发中应用的又一个库。如果应用 Kotlin 开发,它也不是必须的,我心愿你明确我说的,它只一个很帮忙你的库,就像你应用的所以其余库一样。
要学习 RxJava2,必须先学 RxJava1 吗?
你能够间接从 RxJava2 开始,不过,作为 Android 开发人员,晓得这两种状况对你还是有益处的,因为你可能会参加保护其他人的 RxJava1 代码。
我看到有 RxAndroid,应该应用 RxAndroid 还是 RxJava?
RxJava 能用在任何 Java 开发平台,不仅仅是 Android, 比方,对于后端开发来说,RxJava 能够与 Spring 等框架一起应用,RxAndroid 是一个库,其中蕴含在 Android 中应用 RxJava 所需的库。因而,如果要在 Android 开发中应用 RxJava,则必须再增加 RxAndroid。稍后,我将解释 RxAndroid 基于 RxJava 所增加的内容。
咱们应用 Kotlin 开发,为什么不必 RxKotin 呢?
咱们没有必要另外再增加一个 Rx 库了,因为 Kotlin 与 Java 是齐全兼容的,这里的确有一个 RxKotin 库:https://github.com/ReactiveX/… , 不过该库是在 RxJava 之上编写的。它只是将 Kotlin 性能增加到 RxJava。您能够将 RxJava 与 Kotlin 一起应用,而无需应用 RxKotlin 库。为了简略起见,在这一部分中我将不应用 RxKotlin。
如何将 Rxjava2 增加到我的项目中?
要应用 RxJava,你须要在 build.gradle
中增加如下代码:
dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.1.8"
implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
...
}
而后,点击sync
,下载 Rxjava 库。
RxJava 蕴含了些啥?
我想把 RxJava 分为以下三局部:
- 1、用于观察者模式和数据流的类:
Observables
和Observers
- 2、Schedulers
- 3、数据流操作符
Observables
和 Observers
咱们曾经解释了这种模式。您能够将 Observable 视为数据的源(被观察者
),将 Observer 视为接收数据的源( 观察者
)。
有很多创立 Observables 的形式,最简略的办法是应用 Observable.just()
来获取一个我的项目并创立 Observable 来发射该我的项目。
让咱们转到 GitRepoRemoteDataSource
类并更改 getRepositories
办法, 以返回 Observable:
class GitRepoRemoteDataSource {fun getRepositories() : Observable<ArrayList<Repository>> {var arrayList = ArrayList<Repository>()
arrayList.add(Repository("First from remote", "Owner 1", 100, false))
arrayList.add(Repository("Second from remote", "Owner 2", 30, true))
arrayList.add(Repository("Third from remote", "Owner 3", 430, false))
return Observable.just(arrayList).delay(2,TimeUnit.SECONDS)
}
}
Observable <ArrayList <Repository >>
示意 Observable 收回 Repository 对象的数组列表。如果要创立收回 Repository 对象的 Observable <Repository>,则应应用Observable.from(arrayList)
。
.delay(2,TimeUnit.SECONDS)
示意提早 2s 后才开始发射数据。
然而,等等!咱们并没有高数 Observable 何时发射数据啊?Observables 通常在一些 Observer 订阅后就开始收回数据。
请留神,咱们不再须要以下接口了
interface OnRepoRemoteReadyCallback {fun onRemoteDataReady(data: ArrayList<Repository>)
}
在 GitRepoLocalDataSource:
类中做同样的更改
class GitRepoLocalDataSource {fun getRepositories() : Observable<ArrayList<Repository>> {var arrayList = ArrayList<Repository>()
arrayList.add(Repository("First From Local", "Owner 1", 100, false))
arrayList.add(Repository("Second From Local", "Owner 2", 30, true))
arrayList.add(Repository("Third From Local", "Owner 3", 430, false))
return Observable.just(arrayList).delay(2, TimeUnit.SECONDS)
}
fun saveRepositories(arrayList: ArrayList<Repository>) {//todo save repositories in DB}
}
同样的,也不须要这个接口了:
interface OnRepoLocalReadyCallback {fun onLocalDataReady(data: ArrayList<Repository>)
}
当初,咱们须要在 repository
中返回 Observable
class GitRepoRepository(private val netManager: NetManager) {private val localDataSource = GitRepoLocalDataSource()
private val remoteDataSource = GitRepoRemoteDataSource()
fun getRepositories(): Observable<ArrayList<Repository>> {
netManager.isConnectedToInternet?.let {if (it) {
//todo save those data to local data store
return remoteDataSource.getRepositories()}
}
return localDataSource.getRepositories()}
}
如果网络已连贯,咱们从近程数据源返回 Observable, 否则,从本地数据源返回 Observable, 同样的,咱们也不再须要 OnRepositoryReadyCallback
接口。
如你所料,咱们须要更改在 MainViewModel 中获取数据的形式。当初咱们应该从 gitRepoRepository
获取 Observable 并订阅它。一旦咱们向 Observer 订阅了该 Observable,Observable 将开始收回数据:
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
fun loadRepositories() {isLoading.set(true)
gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{override fun onSubscribe(d: Disposable) {//todo}
override fun onError(e: Throwable) {//todo}
override fun onNext(data: ArrayList<Repository>) {repositories.value = data}
override fun onComplete() {isLoading.set(false)
}
})
}
}
一旦 Observer 订阅了 Observable,onSubscribe
办法将被调用,次要 onSubscribe
的参数Disposable
, 稍后将讲到它。
每当 Observable 收回数据时,将调用 onNext()
办法。当 Observable 实现 s 数据发射时,onComplete()
将被调用一次。之后,Observable 终止。
如果产生某些异样,onError()
办法将被回调,而后 Observable 终止。这意味着 Observable 将不再收回数据,因而 onNext()
不会被调用,也不会调用onComplete()
。
另外,请留神。如果尝试订阅已终止的 Observable,则将收到IllegalStateException
。
那么,RxJava 如何帮忙咱们?
- 首先,咱们解脱了这些接口,它是所有 repository 和数据源建设的样板接口。
- 如果咱们应用接口,并且在数据层中产生某些异样,则咱们的应用程序可能会解体。应用 RxJava 谬误将在
onError()
办法中返回,因而咱们能够向用户显示适当的谬误音讯。 - 因为咱们始终将 RxJava 用于数据层,它更清晰。
- 我之前没有通知过你:以前的办法可能会导致内存透露。
应用 RxJava2 和 ViewModel 时,如何避免内存透露
咱们再一次看一下 ViewModel 的生命周期图
一旦 Activity 销毁,ViewModel 的 onCleared
办法将被调用,在 onCleared
办法中,咱们须要勾销所有订阅
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
lateinit var disposable: Disposable
fun loadRepositories() {isLoading.set(true)
gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{override fun onSubscribe(d: Disposable) {disposable = d}
override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}
override fun onNext(data: ArrayList<Repository>) {repositories.value = data}
override fun onComplete() {isLoading.set(false)
}
})
}
override fun onCleared() {super.onCleared()
if(!disposable.isDisposed){disposable.dispose()
}
}
}
咱们能够优化一下下面的代码:
首先,应用 DisposableObserver
替换 Observer
, 它实现了 Disposable 并且有dispose()
办法,咱们不再须要 onSubscribe()
办法,因为咱们能够间接在 DisposableObserver 实例上调用dispose()
。
第二步,替换掉返回 Void 的 .subscribe()
办法,应用 .subscribeWith()
办法,他能返回指定的 Observer
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
lateinit var disposable: Disposable
fun loadRepositories() {isLoading.set(true)
disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {override fun onError(e: Throwable) {// todo}
override fun onNext(data: ArrayList<Repository>) {repositories.value = data}
override fun onComplete() {isLoading.set(false)
}
})
}
override fun onCleared() {super.onCleared()
if(!disposable.isDisposed){disposable.dispose()
}
}
}
下面的代码还能够持续优化:
咱们保留了一个 Disposable 实例,因而,咱们才能够在 onCleared()
回调中调用 dispose()
,然而等等!咱们须要为每一个调用都这样做吗?如果有 10 个回调,那么咱们得保留 10 个实例,在onCleared()
中勾销 10 次订阅?显然不可能,这里有更好的办法,咱们应该将它们全副保留在一个存储桶中,并在调用 onCleared()
办法时, 将它们全部一次解决。咱们能够应用CompositeDisposable
。
CompositeDisposable
: 可包容多个 Disposable 的容器
因而,每次创立一个 Disposable,都须要将其增加到 CompositeDisposable
中:
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
private val compositeDisposable = CompositeDisposable()
fun loadRepositories() {isLoading.set(true)
compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}
override fun onNext(data: ArrayList<Repository>) {repositories.value = data}
override fun onComplete() {isLoading.set(false)
}
}))
}
override fun onCleared() {super.onCleared()
if(!compositeDisposable.isDisposed){compositeDisposable.dispose()
}
}
}
感激 Kotlin 的扩大函数,咱们还能够更进一步:
与 C#和 Gosu 类似,Kotlin 提供了应用新性能扩大类的能力,而不用继承该类,也就是扩大函数。
让咱们创立一个新的包,叫做extensions
, 并且增加一个新的文件RxExtensions.kt
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {add(disposable)
}
当初咱们能够应用 + =
符号将 Disposable 对象增加到 CompositeDisposable 实例:
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
private val compositeDisposable = CompositeDisposable()
fun loadRepositories() {isLoading.set(true)
compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}
override fun onNext(data: ArrayList<Repository>) {repositories.value = data}
override fun onComplete() {isLoading.set(false)
}
})
}
override fun onCleared() {super.onCleared()
if (!compositeDisposable.isDisposed) {compositeDisposable.dispose()
}
}
}
当初,咱们运行程序,当你点击 Load Data
按钮,2s 之后,程序 crash,而后,如果查看日志,您将看到 onNext
办法外部产生谬误,并且异样的起因是:
java.lang.IllegalStateException: Cannot invoke setValue on a background thread
为何会产生这个异样?
Schedulers
RxJava 附带有调度器(Schedulers),使咱们能够抉择在哪个线程代码上执行。更精确地说,咱们能够抉择应用 subscribeOn()
方在哪个线程执行,observeOn()
办法能够察看哪个线程观察者。通常状况下,咱们所有的数据层代码都应该在后盾线程执行,例如,如果咱们应用Schedulers.newThread()
, 每当咱们调用它时,调度器都会给咱们调配一个新的线程,为了简略起见,Scheduler 中还有其余一些办法,我将不在本博文中介绍。
可能您曾经晓得所有 UI 代码都是在 Android 主线程上实现的。RxJava 是 Java 库,它不理解 Android 主线程,这就是咱们应用 RxAndroid 的起因。RxAndroid 使咱们能够抉择 Android Main 线程作为执行代码的线程。显然,咱们的 Observer 应该在 Android Main 线程上运行。
让咱们更改一下代码:
...
fun loadRepositories() {isLoading.set(true)
compositeDisposable += gitRepoRepository
.getRepositories()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {...})
}
...
而后再运行代码,所有都失常了,nice~
其余 observables types
这里还有一些其余的 observable 类型
- Single<T>:被观察者仅发射一个数据,或者是一个异样
- Maybe<T>:被观察者不发射数据,或者仅发射一个数据,或者是一个异样
- Completable:发射
onSuccess()
事件或者异样 - Flowable<T>:和
Observable<T>
一样,不发射数据,或者发射 n 个数据,或者发射异样,然而 Observable 不反对 背压,而 Flowable 却反对。
什么是背压(backpressure)?
为了记住一些概念,我喜爱将它们与事实中的一些例子类比
把它类比成一个通道,如果你向通道中塞入瓶颈可能承受的最多的商品,这将会变得很糟,这里也是同样的,有时,你的观察者无奈解决其收到的事件数量,因而须要加快速度。
你能够看看 RxJava 对于背压的文档:https://github.com/ReactiveX/…
操作符
RxJava 中,最牛逼的就是它的操作符了,仅用一行代码即可在 RxJava 中解决一些通常须要 10 行或更多行的问题。这些是操作符能够帮咱们做的:
- 合并 observables
- 过滤
- 按条件来做操作
- 将 observables 转换为其余类型
我给你举一个例子,让咱们将数据保留到 GitRepoLocalDataSource 中。因为咱们正在保留数据,所以咱们须要 Completable 来模仿它。假如咱们还想模仿 1 秒的提早。天真的办法是:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable {return Completable.complete().delay(1,TimeUnit.SECONDS)
}
为什么说天真?
Completable.complete()
返回一个 Completable 实例,该实例在订阅后立刻实现。
一旦 Completable 实现后,它将终止。因而,之后将不执行任何运算符(提早是运算符之一)。在这种状况下,咱们的 Completable 不会有任何提早。让咱们找解决办法:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable {return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable()}
为什么是这种形式?
Single.just(1)
创立一个 Single 实例,并且仅发射一个数字 1, 因为咱们用了delay(1,TimeUnit.SECONDS)
, 因而发射操作提早 1s。
toCompletable()
返回一个 Completable,它抛弃 Single 的后果,并在此 Single 调用onSuccess
时调用onComplete
。
因而,下面的代码将返回 Completable,并且 1s 后调用onComplete()
。
当初,咱们应该更改咱们的 GitRepoRepository。让咱们回顾一下逻辑。咱们查看互联网连贯。如果有互联网连贯,咱们从近程数据源获取数据,将其保留在本地数据源中并返回数据。否则,咱们仅从本地数据源获取数据。看一看:
fun getRepositories(): Observable<ArrayList<Repository>> {
netManager.isConnectedToInternet?.let {if (it) {return remoteDataSource.getRepositories().flatMap {return@flatMap localDataSource.saveRepositories(it)
.toSingleDefault(it)
.toObservable()}
}
}
return localDataSource.getRepositories()}
应用了 .flatMap
, 一旦remoteDataSource.getRepositories()
发射数据,该我的项目将被映射到收回雷同我的项目的新 Observable。咱们从 Completable 创立的新 Observable 发射的雷同我的项目保留在本地数据存储中,并且将其转换为收回雷同发射项的 Single。因为咱们须要返回 Observable,所以咱们必须将 Single 转换为 Observable。
很疯狂,huh? 设想一下 RxJava 还能为咱们做些啥!
RxJava 是一个十分有用的工具,去应用它,摸索它,我置信你会爱上它的!
以上就是本文得全部内容,下一篇文章将是本系列的最初一篇文章,敬请期待!
本系列已更新结束:
【译】应用 Kotlin 从零开始写一个古代 Android 我的项目 -Part1
【译】应用 Kotlin 从零开始写一个古代 Android 我的项目 -Part2
【译】应用 Kotlin 从零开始写一个古代 Android 我的项目 -Part3
【译】应用 Kotlin 从零开始写一个古代 Android 我的项目 -Part4
文章首发于公众号:
「技术最 TOP」
,每天都有干货文章继续更新,能够微信搜寻「技术最 TOP」
第一工夫浏览,回复【思维导图】【面试】【简历】有我筹备一些 Android 进阶路线、面试领导和简历模板送给你