乐趣区

Rxjava2的原理一步一步的看得懂Rxjava源码

1 Rxjava2 最简单使用方式拆解

    Observable p=Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {e.onNext("hello world");
                e.onComplete();}
        });
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

调用 create 方法之后实际上返回了一个 ObservableCreate 对象. 继承了 Observable, 是一个被观察者对象.

        p.subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) { }

            @Override
            public void onNext(Object value) { }

            @Override
            public void onError(Throwable e) { }

            @Override
            public void onComplete() {}
        });

我们看下 subscribe 方法.

   public final void subscribe(Observer<? super T> observer) {
            ...
            subscribeActual(observer);
            ...
    }

其他代码都删掉了, 剩下最核心的 subscribeActual(observer), 这个 observer 就是我们创建的匿名内部类对象.subscribeActual()方法是个抽象方法, 我们看下 ObservableCreate 中是怎么实现的.

    public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}

    @Override
    protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {source.subscribe(parent);
        } catch (Throwable ex) {Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

CreateEmitter 发射器, 在这里我们调用了 observer.onSubscribe(parent)也就是我们创建的匿名 observer 类的 onSubscribe 方法.

source.subscribe(parent)最重要的方法可能没有之一, 观察者和被观察者顺利会师, 事件开始执行,

            @Override
            public void subscribe(ObservableEmitter e) throws Exception {// 这里的 ObservableEmitter 就是 parent, 也就是 CreateEmitter 发射器对象
                e.onNext("hello world");
                e.onComplete();}

接下来看看 CreateEmitter 的 onNext 和 onComplete 方法.

         @Override
        public void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {observer.onNext(t);
            }
        }

我们看到在发射器的 onNext 方法中, 啥也没做, 就是当了个二传手, 调用了我们观察者的 onNext 方法.

        @Override
        public void onComplete() {if (!isDisposed()) {
                try {observer.onComplete();
                } finally {dispose();
                }
            }
        }

onComplete 方法中也就是调用了观察者的 onComplete 方法.
我们来缕缕这个过程
1 create 方法传返回了一个对象是 ObservableCreate,ObservableCreate 的构造方法中有一个 ObservableOnSubscribe 对象, 也就是我们使用 create 时候创建的匿名内部类对象.
2 p.subscribe(o) 实际上调用了 ObservableCreate 的 subscribeActual 方法
3 subscribeActual 中首先调用了 observer 的 onSubscribe 方法, 紧接着调用了 source.subscribe(parent) 也就是 ObservableOnSubscribe 的 subscribe 方法, 事件开始执行
4 subscribe 方法中调用 CreateEmitter 的 onNext 方法, 这个方法调用了 observer 的 onNext 方法, 观察者对事件进行反应.
5 subscribe 方法中调用 CreateEmitter 的 onComplete 方法, 这个方法调用了 observer 的 onComplete 方法, 整个流程结束.

2 MAP 操作符是怎么工作的

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

map 操作符把我们的 observable 对象变化成了具体的 ObservableMap, 参数是我们之前创建好的 observable 和 mapper function

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));
    }

注意注意: 这里形成了一个新的订阅关系
这里的 source 是我们 create 创建的 observable, 要不然会懵, 创建 ObservableMap 时候我们传进来的 this 是我们生成的 observable.
到这里我们会重新调用 onSubscribe() subscribeActual(), 这里就回到了我们最简单模式时候的调用步骤. 不同的是我们真正的调用 observer 的方法实在 MapObserver 对应的方法中.
具体流程是 发射器调用 onNext 方法 –>MapObserver 的 onNext 方法 –> 再到我们定义的 observer 的 onNext 方法

         @Override
        public void onNext(T t) {if (done) {return;}

            if (sourceMode != NONE) {actual.onNext(null);
                return;
            }

            U v;

            try {
            // 调用 mapper 改变数据
                **v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");**
            } catch (Throwable ex) {fail(ex);
                return;
            }
            //actual 我们定义的 observer
            actual.onNext(v);
        }

3 进阶 flatMap

  @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
     ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));

看看 ObservableFlatMap 代码

 public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {return;}

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

是不是和 MAP 超级像, 我们这几看 MergeObserver onNext 做了什么

@Override
        public void onNext(T t) {
             ...
               p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");

            ...

            subscribeInner(p);
        }
         @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {for (;;) {if (p instanceof Callable) { } else {InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

省略了很多代码, 我们看主要逻辑, 获取到 flatMap 生成的 observableSource, 然后 p.subscribe(inner);注意这里的 P 不是 observable
看 innerObserver 的 onNext 做了什么

    // 这里的 onNext 事件由 p.subscribe(inner)触发
  @Override
        public void onNext(U t) {if (fusionMode == QueueDisposable.NONE) {parent.tryEmit(t, this);
            } else {parent.drain();
            }
        }
        
        void tryEmit(U value, InnerObserver<T, U> inner) {if (get() == 0 && compareAndSet(0, 1)) {actual.onNext(value);
                if (decrementAndGet() == 0) {return;}
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {return;}
            }
            drainLoop();}

在这里我们终于看到我们定义的 observer 接收到了 onNext 事件

4 总结

Observable ObservableSource 要分清楚, 他们都有一个方法叫 subscribe()
Observer Emitter 分清楚, 他们有共同的方法 onNext() onError() onComplete()
否则话很容易晕头转向.

文章如有表述有错误, 请指出, 谢谢.

退出移动版