RxJava20转载

3次阅读

共计 8749 个字符,预计需要花费 22 分钟才能阅读完成。

零、来源

来源:Carson_Ho- 简书

一、基础知识

角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式


二、基础使用

1. 导入连接

implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2. 创建被观察者

   // 创建被观察者,产生事件
    public Observable<Integer> createObservable() {Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();}
        });
        return observable;
    }

3. 创建观察者

    // 创建观察者
    public Observer<Integer> createObserver() {Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "onSubscribe 连接");
            }

            @Override
            public void onNext(Integer value) {Log.v("lanjiabinRx", "onNext" + value + "事件");
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "onComplete 事件");
            }
        };
        return observer;
    }

4. 建立 subscribe() 连接

     // 观察者订阅被观察者
    public void createSubscribe() {createObservable().subscribe(createObserver());
    }

5. 调用和结果

 createSubscribe();

6. 链式调用

     // 链式调用
    public void chainCall() {Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();}
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "onSubscribe 连接");
            }

            @Override
            public void onNext(Integer value) {Log.v("lanjiabinRx", "onNext" + value + "事件");
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "onComplete 事件");
            }
        });
    }

6. 切断连接

即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件

Disposable mDisposable; //1. 定义

@Override
public void onSubscribe(Disposable d) {
       mDisposable=d; //2. 赋值
       Log.v("lanjiabinRx", "onSubscribe 连接");
}

@Override
public void onNext(Integer value) {if (value==2) mDisposable.dispose(); //3. 在第二个 next 事件断开连接
      Log.v("lanjiabinRx", "onNext" + value + "事件");
}

三、创建操作符

0. 总图


1. create(基础发送)

最基础的创建


    //1.create
    public void chainCall() {Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();}
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "onSubscribe 连接");
            }

            @Override
            public void onNext(Integer value) {Log.v("lanjiabinRx", "onNext" + value + "事件");
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "onComplete 事件");
            }
        });
    }

2. just(立刻发送 10 以下)

  • 快速创建 1 个被观察者对象(Observable)
  • 发送事件的特点:直接发送传入的事件
  • 最多只能发送十个参数
  • 应用场景:快速创建 被观察者对象(Observable)& 发送 10 个以下事件

    //2.just
    public void justDo() {Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.d("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Integer integer) {Log.d("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {Log.d("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.d("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:

3. fromArray(数组发送)

  • 快速创建 1 个被观察者对象(Observable)
  • 发送事件的特点:直接发送 传入的数组数据
  • 会将数组中的数据转换为 Observable 对象

应用场景:
1. 快速创建 被观察者对象(Observable)& 发送 10 个以上事件(数组形式)
2. 数组元素遍历


    //3.fromArray
    public void fromArrayDo() {Integer[] items = {0, 1, 2, 3, 4};
        Observable.fromArray(items).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Integer integer) {Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:

4. fromIterable(集合发送)

  • 快速创建 1 个被观察者对象(Observable)
  • 发送事件的特点:直接发送 传入的集合 List 数据
  • 会将数组中的数据转换为 Observable 对象

应用场景:
1. 快速创建 被观察者对象(Observable)& 发送 10 个以上事件(集合形式)
2. 集合元素遍历


    //4.fromIterable
    public void fromIterableDo(){List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        Observable.fromIterable(list).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Integer integer) {Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:

// 下列方法一般用于测试使用

<-- empty()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送 Complete 事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即观察者接收后会直接调用 onCompleted()<-- error()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送 Error 事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用 onError()<-- never()  -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用

5. defer(获取最新数据)

  • 直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable)& 发送事件
  • 通过 Observable 工厂方法创建被观察者对象(Observable)
  • 每次订阅后,都会得到一个刚创建的最新的 Observable 对象,这可以确保 Observable 对象里的数据是最新的

应用场景:
动态创建被观察者对象(Observable)& 获取最新的 Observable 对象数据

   //5.defer
    Integer i = 10; // 第一次赋值

    public void deferDo() {Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> call() throws Exception {return Observable.just(i);
            }
        });

        i = 15; // 第二次赋值
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Integer integer) {Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:得到最新赋值的数字,说明取到了最新的数据

6. timer(延迟发送)

  • 快速创建 1 个被观察者对象(Observable)
  • 发送事件的特点:延迟指定时间后,发送 1 个数值 0(Long 类型)
  • 本质 = 延迟指定时间后,调用一次 onNext(0)

应用场景:
延迟指定事件,发送一个 0,一般用于检测


    //6.timer
    public void timerDo() {
        // 注:timer 操作符默认运行在一个新线程上
        // 也可自定义线程调度器(第 3 个参数):timer(long,TimeUnit,Scheduler)
        //TimeUnit.SECONDS 延迟 2s 后,发送一个 0
     
        /**
         * timer(long delay, TimeUnit unit)
         * delay 数值
         * unit 单位
         * 下面就是 2 数值,单位为秒,所以是 2 秒
         * */
        Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Long aLong) {
                /*
                * 得到的结果为 0 一般用于检测
                * */
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:

7. interval(周期发送,无限)

  • 快速创建 1 个被观察者对象(Observable)
  • 发送事件的特点:每隔指定时间 就发送 事件
  • 发送的事件序列 = 从 0 开始、无限递增 1 的的整数序列

    //7.interval
    public void intervalDo() {
        /**
         * 从 0 开始递增
         *
         * @param initialDelay(Long)*          初始延迟时间(第一次延迟时间)* @param period(Long)*          后续数字发射之间的时间间隔(一个周期时间)* @param unit
         *          时间单位
         * */
        Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Long aLong) {Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:

8. intervalRange(周期发送,有限,指定数据)

  • 作用类似于 interval(),但可指定发送的数据的数量
     //8.intervalRange
    public void intervalRangeDo() {
        /**
         *
         * @param start 起始值
         * @param count 总共要发送的值的数量,如果为零,则运算符将在初始延迟后发出 onComplete
         * @param initialDelay 发出第一个值(开始)之前的初始延迟
         * @param period 后续值之间的时间段
         * @param unit 时间单位
         * */
        Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Long aLong) {Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:3-12 经历 10 个数

9. range (无延迟,Integer 类型指定数据)

  • 作用类似于 intervalRange(),但区别在于:无延迟发送事件
    //9.range
    public void rangeDo(){
        /**
         * @param start
         *            序列中第一个 Integer 的值
         * @param count
         *           要生成的顺序整数的数量
         * */
        Observable.range(3,5).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Integer integer) {Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }    

结果:

10. rangeLong(无延迟,Long 类型指定数据)


    //10.rangeLong
    public void rangeLongDo(){
        /**
         * @param start
         *            Long 类型,序列中第一个 Integer 的值
         * @param count
         *           Long 类型,要生成的顺序整数的数量
         * */
        Observable.rangeLong(3,8).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {Log.v("lanjiabinRx", "开始采用 subscribe 连接");
            }

            @Override
            public void onNext(Long aLong) {Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

结果:

编程中我们会遇到多少挫折?表放弃,沙漠尽头必是绿洲。

正文完
 0