前言
经过前几篇的介绍,对 RxJava 对模式有了一定的理解:由 Observable 发起事件,经过中间的处理后由 Observer 消费。(对 RxJava 还不了解的可以出门左拐)之前的代码中,事件的发起和消费都是在同一个线程中执行,也就是说之前我们使用的 RxJava 是同步的~~~观察者模式本身的目的不就是后台处理,将处理结果回调给前台?这同步的是要哪样?所以,这篇为大家介绍 RxJava 的重要的概念——Scheduler
参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客)
介绍
RxJava 在不指定线程的情况下,发起时间和消费时间默认使用当前线程。所以之前的做法
Observable.just(student1, student2, student2)
// 使用 map 进行转换,参数 1:转换前的类型,参数 2:转换后的类型
.map(new Func1<Student, String>() {
@Override
public String call(Student i) {
String name = i.getName();// 获取 Student 对象中的 name
return name;// 返回 name
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
nameList.add(s);
}
});
因为是在主线程中发起的,所以不管中间 map 的处理还是 Action1 的执行都是在主线程中进行的。若是 map 中有耗时的操作,这样会导致主线程拥塞,这并不是我们想看到的。
Scheduler
Scheduler:线程控制器,可以指定每一段代码在什么样的线程中执行。模拟一个需求:新的线程发起事件,在主线程中消费
private void rxJavaTest3() {
Observable.just(“Hello”, “Word”)
.subscribeOn(Schedulers.newThread())// 指定 subscribe() 发生在新的线程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
上面用到了 subscribeOn(),和 observeOn() 方法来指定发生的线程和消费的线程。
subscribeOn():指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn():指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
以及参数 Scheduler,RxJava 已经为我们提供了一下几个 Scheduler
Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行。
多次切换线程
看完上面的介绍想必对 RxJava 线程的切换有了一些理解,上面只是对事件的发起和消费制定了线程。如果中间有 map 之类的操作呢?是否可以实现发起的线程在新线程中,map 的处理在 IO 线程,最后的消费在主线程中。
Observable.just(“Hello”, “Wrold”)
.subscribeOn(Schedulers.newThread())// 指定:在新的线程中发起
.observeOn(Schedulers.io()) // 指定:在 io 线程中处理
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return handleString(s); // 处理数据
}
})
.observeOn(AndroidSchedulers.mainThread())// 指定:在主线程中处理
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
show(s); // 消费事件
}
});
可以看到 observeOn() 被调用了两次,分别指定了 map 的处理的现场和消费事件 show(s) 的线程。
若将 observeOn(AndroidSchedulers.mainThread()) 去掉会怎么样?不为消费事件 show(s) 指定线程后,show(s) 会在那里执行?其实,observeOn() 指定的是它之后的操作所在的线程。也就是说,map 的处理和最后的消费事件 show(s) 都会在 io 线程中执行。observeOn() 可以多次使用,可以随意变换线程
小结
学会线程控制后才算是真正学会了使用 RxJava。RxJava 的使用十分灵活,想要对其熟悉使用只有一个办法,那就是多用啦,熟能生巧。
以上有错误之处感谢指出
参考:给 Android 开发者的 RxJava 详解(本文部分内容引用自该博客)