共计 10112 个字符,预计需要花费 26 分钟才能阅读完成。
Android 第三方框架之 RxJava
一:介绍
在 GitHub 上的介绍:一个在 Java VM 上应用可观测的序列来组成异步的,基于事件的程序库
总结:RxJava 是一个基于事件流,实现异步操作的库
特点:异步,简洁
异步操作很要害的一点是程序的简洁性,因为在调度过程比较复杂的状况下,异步代码常常会既难写也难被读懂。Android 发明的 AsyncTask 和 Handler,其实都是为了让异步代码更加简洁。RxJava 的劣势也是简洁,但它的简洁的不同凡响之处在于,随着程序逻辑变得越来越简单,它仍然可能放弃简洁
二:原理
RxJava 原理基于一种扩大的观察者模式
被观察者(Observable)通过订阅(Subscribe)按程序发送事件(Event)给观察者(Observer), 观察者(Observer)按程序接管事件以及做出对应的响应动作
. 被观察者(Observable)产生事件
. 观察者(Observer)接管事件,并给出响应动作
. 订阅(Subscribe)连贯 被观察者 & 观察者
. 事件(Event)被观察者 & 观察者连贯的桥梁
观察者模式是一种一对多的对象依赖关系
例如:铃声,学生, 老师,上课
铃声(Ring)被观察者(Observable)
学生,老师(Student,Teacher)观察者(Observer)
// 被观察者的抽象类
abstract class Subject {
// 定义一个汇合存储观察者对象
protected List<Observer> observers = new ArrayList<Observer>();
// 减少观察者,通过 add 办法订阅观察者
public void add(Observer observer) {observers.add(observer);
}
// 删除观察者
public void remove(Observer observer) {observers.remove(observer);
}
// 告诉观察者办法
public abstract void notifyObserve1();
public abstract void notifyObserve2();}
/**
* 铃声 (被观察者) 具体实现
*/
public class Ring extends Subject {
@Override
public void notifyObserve1() {System.out.println("上课铃响了");
for (Object object : observers) {((Observer) object).response1();}
System.out.println("------------");
}
@Override
public void notifyObserve2() {System.out.println("下课铃响了");
for (Object object : observers) {((Observer) object).response2();}
System.out.println("------------");
}
}
观察者
/**
* 形象观察者
*/
public interface Observer {
// 观察者反馈
public abstract void response1();// 上课反馈
public abstract void response2();// 下课反馈}
/**
* 老师 (观察者) 具体老师的实现
*/
public class Teacher implements Observer {
@Override
public void response1() {System.out.println("老师筹备认真讲课");
}
@Override
public void response2() {System.out.println("老师筹备下课劳动");
}
}
// 应用:Subject subject = new Ring();// 创立被观察者
Observer teacher = new Teacher();// 创立观察者
subject.add(teacher);// 订阅观察者
subject.notifyObserve1();// 发送事件 1
subject.notifyObserve2();// 发送事件 2
RxJava 个别的事件流的应用
增加依赖:
//rxjava2
implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
RxJava 观察者的事件回调办法除了一般事件 onNext(),(相当于 onClick()/onEvent())之外,还定义了两个非凡的事件:onCompleted()和 onError()
.onCompleted(): 事件队列完结。RxJava 不仅把每个事件独自解决,还会把他们看作一个队列。RxJava 规定,当不会再有新的 onNext()收回时,须要触发 onCompleted()办法作为标记
.onError(): 事件队列异样。在事件处理过程中出异样时,onError() 会被触发,同时队列主动终止,不容许再有事件收回
. 在一个正确运行的事件序列中,onCompleted() 和 onError()有且只有一个,并且是事件序列中的最初一个。须要留神的是,onCompleted()和 onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个
//1. 创立被观察者 Observable 对象, 应用 Observable.create 创立
Observable.create(new ObservableOnSubscribe<Integer>() {//2. 在复写的 subscribe()里定义须要发送的事件
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
// 通过 ObservableEmitter 类对象产生事件并告诉观察者
// ObservableEmitter 类介绍
// a. 定义:事件发射器
// b. 作用:定义须要发送的事件 & 向观察者发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();// 事件实现, 能够抉择持续发送事件}
}).subscribe(new Observer<Integer>() {// 订阅观察者
// 通过通过订阅(subscribe)连贯观察者和被观察者
// 创立观察者 & 定义响应事件的行为
@Override
public void onSubscribe(@NonNull Disposable d) {Log.d("TAG", "开始采纳 subscribe 连贯");
}
@Override
public void onNext(@NonNull Integer value) {Log.d("TAG", "处理事件"+ value);
}
@Override
public void onError(@NonNull Throwable e) {Log.d("TAG", "解决 Error 事件, 不再接管事件");
}
@Override
public void onComplete() {Log.d("TAG", "解决 Complete 事件, 不再接管事件");
}
});
应用来说,每一个网络申请接口相当于一个被观察者
public interface ApiService {
// 获取工作
@POST("task/apply")
Observable<BaseBean<GetTaskBean>> getTask(@Body RequestTaskBean requestTaskBean);// 这相当于被观察者
须要去订阅观察者
}
应用
HttpObservable.getObservable(RetrofitFactory.getApiService().getTask(bean))
.subscribe(GET_TASK_BEAN_HTTPOBSERVER);
//HttpObservable.getObservable(RetrofitFactory.getApiService().getTask(bean))
这个是失去被观察者
.subscribe()订阅
GET_TASK_BEAN_HTTPOBSERVER:是观察者
HttpObserver<GetTaskBean> GET_TASK_BEAN_HTTPOBSERVER =
new HttpObserver<GetTaskBean>(RequestTagConfig.TASK_APPLY) {// 这是一个观察者的实现}
三:RxJava 的操作符
转换类操作符 (map flatMap concatMap flatMapIterable switchMap scan groupBy…);
过滤类操作符 (fileter take takeLast takeUntil distinct distinctUntilChanged skip skipLast …);
组合类操作符 (merge zip join combineLatest and/when/then switch startSwitch…)。
转换操作符 Map
map()函数承受一个 Func1 类型的参数,而后把这个 Func1 利用到每一个由 Observable 发射的值上,将发射的值转换为咱们冀望的值
假如咱们须要将一组数字转换成字符串,咱们能够通过 map 这样实现:
Observable.just(1, 2, 3, 4, 5)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer i) {return "This is" + i;}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {System.out.println(s);
}
});
过滤操作符 Filter
filter(Func1)用来过滤观测序列中咱们不想要的值,只返回满足条件的值,咱们看下原理图
还是拿后面文章中的小区 Community[] communities 来举例,假如我须要赛选出所有房源数大于 10 个的小区,咱们能够这样实现:
Observable.from(communities)
.filter(new Func1<Community, Boolean>() {
@Override
public Boolean call(Community community) {return community.houses.size()>10;
}
}).subscribe(new Action1<Community>() {
@Override
public void call(Community community) {System.out.println(community.name);
}
});
组合操作符 Merge
merge(Observable, Observable)将两个 Observable 发射的事件序列组合并成一个事件序列,就像是一个 Observable 发射的一样。你能够简略的将它了解为两个 Obsrvable 合并成了一个 Observable,合并后的数据是无序的。
咱们看上面的例子,一共有两个 Observable:一个用来发送字母,另一个用来发送数字;当初咱们须要两连个 Observable 发射的数据合并。
String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long position) {return letters[position.intValue()];
}
}).take(letters.length);
Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);
Observable.merge(letterSequence, numberSequence)
.subscribe(new Observer<Serializable>() {
@Override
public void onCompleted() {System.exit(0);
}
@Override
public void onError(Throwable e) {System.out.println("Error:" + e.getMessage());
}
@Override
public void onNext(Serializable serializable) {System.out.print(serializable.toString()+" ");
}
});
程序输入:A 0 B C 1 D E 2 F 3 G H 4
RxJava 线程切换
总所周知 RxJava 在切换线程时用到了两个办法 subscribeOn() 和 observeOn() 上面来别离解释一下这两个办法
subscribeOn() : 影响的是最开始的被观察者所在的线程。当应用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
observeOn() : 影响的是跟在前面的操作(指定观察者运行的线程)。所以如果想要屡次扭转线程,能够屡次应用 observeOn;
*/
public static <T> Observable<T> getObservable(Observable<T> apiObservable) {// showLog(request);
Observable<T> observable =
apiObservable
.onErrorResumeNext(new HttpResultFunction<>())
.subscribeOn(Schedulers.io())// 线程切换,影响最开始的被观察者的线程
.observeOn(AndroidSchedulers.mainThread());// 切换主线程影响的是观察者的线程
return observable;
}
因为 subscribeOn(Schedulers.io())它指定了最开始的被观察者所在的线程所以前面的操作都是依据最开始的被观察者制订的线程运行的,又因为 .observeOn(AndroidSchedulers.mainThread()) 它指定了都面的操作符应用主线程运行。
RxJava 线程调度 Schedulers
类型 | 含意 | 利用场景 |
---|---|---|
Schedulers.immediate() | 以后线程 = 不指定线程 | 默认 |
AndroidSchedulers.mainThread() | Android 主线程 | 操作 UI |
Schedulers.newThread() | 惯例新线程 | 网络申请、读写文件等 io 密集型操作 |
Schedulers.io() | io 操作线程 | 网络申请、读写文件等 io 密集型操作 |
Schedulers.computation() | CPU 计算操作线程 | 大量计算操作 |
// 被观察者(Observable)在 子线程 中生产事件(如实现耗时操作等等)// 观察者(Observer)在 主线程 接管 & 响应事件(即实现 UI 操作)Observable.just("head.png", "icon.png")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
// 网络获取
Log.i(TAG, "apply:----------->1" + Thread.currentThread().getName());
return Bitmap.createBitmap(100, 100, Bitmap.Config.ARGB_8888);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
// .subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
// 子线程
Log.i(TAG, "apply:----------->2" + Thread.currentThread().getName());
// imageView.setIamgeView(bitmap)
}
});
三:RxJava 背压策略
观察者和被观察者之间存在 2 种订阅关系,同步和异步
对于异步订阅关系,存在被观察者发送事件速度与观察者接管事件速度不匹配的状况
1. 发送 & 接管事件速度 = 单位工夫内 发送 & 接管事件的数量
2. 大多数状况,次要是 被观察者发送事件速度 > 观察者接管事件速度
问题:
被观察者 发送事件速度太快,而观察者 来不及接管所有事件,从而导致观察者无奈及时响应 / 解决所有发送过去事件的问题,最终导致缓存区溢出、事件失落 & OOM
例如:
被观察者发送事件的速度 =10ms/ 个
观察者的接管速度 =5s/ 个
即呈现发送 & 接管事件重大不匹配的问题
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 创立被观察者 & 生产事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; ; i++) {Log.d(TAG, "发送了事件"+ i);
Thread.sleep(10);
// 发送事件速度:10ms / 个
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()) // 设置被观察者在 io 线程中进行
.observeOn(AndroidSchedulers.mainThread()) // 设置观察者在主线程中进行
.subscribe(new Observer<Integer>() {
// 2. 通过通过订阅(subscribe)连贯观察者和被观察者
@Override
public void onSubscribe(Disposable d) {Log.d(TAG, "开始采纳 subscribe 连贯");
}
@Override
public void onNext(Integer value) {
try {
// 接管事件速度:5s / 个
Thread.sleep(5000);
Log.d(TAG, "接管到了事件"+ value);
} catch (InterruptedException e) {e.printStackTrace();
}
}
@Override
public void onError(Throwable e) {Log.d(TAG, "对 Error 事件作出响应");
}
@Override
public void onComplete() {Log.d(TAG, "对 Complete 事件作出响应");
}
});
解决方案:采纳背压策略
在异步订阅关系中,管制事件发送 & 接管速度
背压的作用域 = 异步订阅关系,即 被观察者 & 观察者处在不同线程中
背压策略的具体实现:Flowable
在 RxJava2.0 中,采纳 Flowable 实现 背压策略, 对应的观察者 Subscriber
Flowable 应用
/**
* 步骤 1:创立被观察者 = Flowable
*/
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();}
}, BackpressureStrategy.ERROR);
// 须要传入背压参数 BackpressureStrategy,上面会具体解说
/**
* 步骤 2:创立观察者 = Subscriber
*/
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 比照 Observer 传入的 Disposable 参数,Subscriber 此处传入的参数 = Subscription
// 相同点:Subscription 具备 Disposable 参数的作用,即 Disposable.dispose()切断连贯, 同样的调用 Subscription.cancel()切断连贯
// 不同点:Subscription 减少了 void request(long n)
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);// 响应式拉取,须要多少要多少
// 对于 request()上面会持续具体阐明}
@Override
public void onNext(Integer integer) {Log.d(TAG, "onNext:" + integer);
}
@Override
public void onError(Throwable t) {Log.w(TAG, "onError:", t);
}
@Override
public void onComplete() {Log.d(TAG, "onComplete");
}
};
/**
* 步骤 3:建设订阅关系
*/
upstream.subscribe(downstream);
相干的背压博客参考:https://www.jianshu.com/p/ceb…
四:RxJva 其余罕用操作符
1.from 接管一个汇合作为输出,而后每次输入一个元素给 subscriber
Observable.from(T[] params)Observable.from(new Integer[]{1, 2, 3, 4, 5})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {Log.i(TAG, "number:" + number);
}
});
2.just 接管一个可变参数作为输出,最终也是生成数组,和调 from()区别
Observable.just(T... params) //params 的个数为 1 ~ 10Observable.just(1, 2, 3, 4, 5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {Log.i(TAG, "number:" + number);
}
});
3.timer 能够做定时操作,换句话,就是提早执行。事件间隔由 timer 管制。
// 提早 2s 执行 Log 打印 Hello World(一次)Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {} @Override
public void onError(Throwable e) { } @Override
public void onNext(Long aLong) {Log.i(TAG, "Hello World!");
}
});
4.interval 定时的周期性操作,与 timer 的区别就在于它能够反复操作。事件间隔由 interval 管制
// 定时 2s 反复打印 Log Hello World
Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {} @Override
public void onError(Throwable e) { } @Override
public void onNext(Long aLong) {Log.i(TAG, "Hello World!");
}
});
END: 春天播下种子,才有秋的播种;已经艰辛拼搏,才有苦涩生存。