DevUI是一支兼具设计视角和工程视角的团队,服务于华为云DevCloud平台和华为外部数个中后盾零碎,服务于设计师和前端工程师。
官方网站:devui.design
Ng组件库:ng-devui(欢送Star)
官网交换群:增加DevUI小助手(微信号:devui-official)进群
DevUIHelper插件:DevUIHelper-LSP(欢送Star)

本篇是 RxJS 源码解析的第三篇文章,应用源码的版本是 6.6.0 。本篇文章的内容可能会比拟多,请急躁浏览。为了不便浏览,文中的相干代码均通过裁剪和解决。如有不妥,还请斧正。

RxJS 中,Creation Operator 次要分为以下两类:

  • 执行个别创立操作的 Normal Creation Operator
  • 执行简单的创立操作的 Join Creation Operator

pipe 中应用的 operator ,我称之为 Pipe Operator ,它次要分为以下几类:

  • 用于数据映射的 Transformation Operators
  • 过滤用的 Filtering Operators
  • 将以后的 Observable 多播化的 Multicasting Operators
  • 处理错误的 Error Handling Operators
  • 工具操作函数 Utility Operators
  • Conditional and Boolean Operators
  • Mathematical and Aggregate Operators

限于篇幅本篇将先介绍 Normal Creation Operator ,它的次要作用是帮忙开发者疾速创立 Observable

of , empty & throwError

ofemptythrowError ,首先讲这三个 operator 的重要起因是,它提供了一系列根底的操作:next、complete、以及 error。

const observableA = of(1);const observableB = empty();const observableC = throwError(Error('test'));observableA.subscribe({    next: (v) => console.log('A: ' + v),    complete: () => console.log('A: complete');    error: (e) => console.log('A: error is ' + e);});observableB.subscribe({    next: (v) => console.log('B: ' + v),    complete: () => console.log('B: complete');    error: (e) => console.log('B: error is ' + e);});observableC.subscribe({    next: (v) => console.log('C: ' + v),    complete: () => console.log('C: complete'),    error: (e) => console.log(`C: error is (${e}).`),});// 打印后果// A: 1// A: complete// B: complete// C: error is Error: test

of source code

它的构建形式如下,其中,调度器是最初一个参数。

export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> {  let scheduler = args[args.length - 1] as SchedulerLike;  if (isScheduler(scheduler)) {    args.pop();    return scheduleArray(args as T[], scheduler);  } else {    return fromArray(args as T[]);  }}

of 由两个函数 fromArrayscheduleArrayfromArray 是一个简略循环的函数,它将数据循环发送给 Observable 的订阅者。

export function fromArray<T>(input: ArrayLike<T>) {  return new Observable<T>((subscriber: Subscriber<T>) => {      // 循环获取数据      for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {        subscriber.next(array[i]);      }          subscriber.complete();    });}

empty source code

这部分的代码很简略,scheduler 局部能够疏忽。实际上就是在 Observable 中调用 subscriber.complete()

export function empty(scheduler?: SchedulerLike) {  if (scheduler) {    return new Observable<never>(subscriber => scheduler.schedule(      () => subscriber.complete()    ));  } else {    return new Observable<never>(subscriber => subscriber.complete());  }}

throwError source code

throwErrorempty 的实现是统一的,只不过 complete 换成了 error 。

export function throwError(error: any, scheduler?: SchedulerLike): Observable<never> {  if (!scheduler) {    return new Observable(subscriber => subscriber.error(error));  } else {    return new Observable(subscriber => scheduler.schedule(      dispatch,       0,       { error, subscriber }    ));  }}// 以下是 调度器中想要执行的状态。interface DispatchArg {  error: any;  subscriber: Subscriber<any>;}// 最终执行的是 subcriber 的 error 办法。function dispatch({ error, subscriber }: DispatchArg) {  subscriber.error(error);}

iif & defer

iifdefer 的体现是统一的。

  • defer 的次要作用是延后了具体 Observable 的生成,是一个 Lazy Observable Factory
  • iif 则是放大了 defer 的表白范畴,次要作用是加强了Rx 的命令式的语义。
let test = false;const observableA = iif(  () => test,  of('1'),  of('2'),);const observableB = defer(function() {    return test ? of('1') : of('2');});

iif Source Code

看到 iif 的源码的那一刻我震惊了,什么叫大道至简(战术后仰)。

export function iif<T = never, F = never>(  condition: () => boolean,  trueResult: SubscribableOrPromise<T> = EMPTY,  falseResult: SubscribableOrPromise<F> = EMPTY): Observable<T|F> {  // 间接调用了 defer  return defer(() => condition() ? trueResult : falseResult);}

defer Source Code

defer 原理上比较简单:在结构 Observable 的时候,在传入的订阅函数中返回一个 Subscription。那么在这个传入的订阅函数中,defer 的过程分为以下三步:

  • 调用工厂,获取输出数据。
  • 调用 from 将数据转换成一个 observable
  • 返回这个 observable 的订阅。
export function defer<R extends ObservableInput<any> | void>(observableFactory: () => R): Observable<ObservedValueOf<R>> {  return new Observable<ObservedValueOf<R>>(subscriber => {    let input: R | void;    try {      // 调用工厂函数,获取输出的数据。      input = observableFactory();    } catch (err) {      subscriber.error(err);      return undefined;    }    // 通过 from 将 input 转换为 observable。    const source = input ? from(input as ObservableInput<ObservedValueOf<R>>) : empty();        // 返回一个订阅器到内部。    return source.subscribe(subscriber);  });}

其中的 ObservedValueOf 是这样定义的,应用了 ts 的 infer 来推导出 ObservableInput<T>T 的具体类型。

export type ObservedValueOf<OV> = OV extends ObservableInput<infer T> ? T : never;

from

from 提供了一种映射的性能,能够将传入的数据映射成 Observables 。它能够承受以下参数:

  • 原生数组 和 Iterable<T>
  • dom 迭代器
  • Promise<T>
  • Observable<T>

略微的修剪了一下,源码如下:

export function from<T>(input: ObservableInput<T>): Observable<T> { return new Observable<T>(subscribeTo(input));}

它间接创立一个新的 Observable,并且调用了 subscribeTo ,依据输出类型,对输出进行不同的解决。

  • 如果输出是 Observable,调用 subscribeToObservable
  • 如果输出是原生数组,调用 subscribeToArray
  • 如果输出是 Promise,调用 subscribeToPromise
  • 如果输出是生成器,调用 subscribeToIterable

subscribeToArray

如果输出是原生数组或者是实现了数组性能的数据结构,那么间接调用 subscriber.next 把所有数据顺次发送给订阅者。

export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {  for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {    subscriber.next(array[i]);  }  subscriber.complete();};

subscribeToObservable

如果输出是 Obervable,那么要通过一个特定的 Symbol 取出 Observable,而后再订阅它。

(基于 Symbol 的个性,以后很多我的项目都会应用一个固定的 Symbol 对特定数据取值,来验证这个数据是不是合乎类型)

export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>) => {  const obs = obj[Symbol_observable]();  if (typeof obs.subscribe !== 'function') {    throw new TypeError('Provided object does not correctly implement Symbol.observable');  } else {    return obs.subscribe(subscriber);  }};

subscribeToPromise

如果输出是一个 Promise,那么通过 then 获取到 Promise 的内容,并将内容发送给订阅者。

export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => {  promise.then(    (value) => {      if (!subscriber.closed) {        subscriber.next(value);        subscriber.complete();      }    },    (err: any) => subscriber.error(err)  );  return subscriber;};

subscribeToIterable

生成器跟数组的形式相似,也是通过循环的形式将数据发送给订阅者。

export const subscribeToIterable = <T>(iterable: Iterable<T>) => (subscriber: Subscriber<T>) => {  const iterator = (iterable as any)[Symbol_iterator]();  do {    let item: IteratorResult<T>;    try {      item = iterator.next();    } catch (err) {      subscriber.error(err);      return subscriber;    }    if (item.done) {      subscriber.complete();      break;    }    subscriber.next(item.value);    if (subscriber.closed) {      break;    }  } while (true);  return subscriber;};

generate

generate 能够让你用一种相似 for 循环的形式取得数据流。不过,我目前还没有遇到过十分须要这种形式生成流的形式,如果你遇到这种状况,欢送交换。一般来说,我习惯于这样调用它。

const observable = generate({    initialState: 1,    condition: x => x < 5,    iterate: x => x + 1,});observable.subscribe((value) => {    console.log(value);});// 打印后果// 1// 2// 3// 4

原来的源码蕴含了较多的参数判断,把外部逻辑梳理一下,实际上就是分为三个大步骤:

  • 判断完结条件, 如果为假代表曾经完结,则应该实现订阅,否则进行下一步。
  • 发送数据订阅给到订阅者。
  • 调用迭代办法,生成下一组数据,反复第一步。
export function generate<S>(options: GenerateOptions<S>): Observable<S> {  const initialState = options.initialState;  const condition = options.condition;  const iterate = options.iterate;  // 返回 Observable  return new Observable<S>(subscriber => {    let state = initialState;    try {      while (true) {        // 判断完结条件        if (condition && !condition(state)) {          subscriber.complete();          break;        }        // 发送数据给订阅者        subscriber.next(state);        // 调用迭代,获取下一组数据        state = iterate(state);                if (subscriber.closed) {          break;        }      }    } catch (err) {      subscriber.error(err);    }    return undefined;  });}

其中 GenerateOptions 蕴含了三个成员,initialStatecondition 以及 iterate

export interface GenerateOptions<S> {  // 初始状态  initialState: S;    // 完结条件  condition?: (x: S) => boolean;  // 迭代形式  iterate: (x: S) => S;}

range

range 能够创立一个给定范畴的数字流。这个次要就是提供了一个简略的语义化函数,次要就是通过循环给订阅者喂数据。

export function range(start: number = 0, count?: number): Observable<number> {  return new Observable<number>(subscriber => {    if (count === undefined) {      count = start;      start = 0;    }    for (let index = 0; index < count; ++index) {      subscriber.next(start + index);      if (subscriber.closed) {        break;      }    }    return undefined;  });}

fromEvent & fromEventPattern

fromEvent

fromEvent 是的 Observable 能够封装一系列的零碎事件。既能够承受 NodeJS EventEmitter,也能够承受 DOM EventTargetJQuery-like event target, NodeList 或者 HTMLCollection 等浏览器对象。

const clicksA = fromEvent(document, 'click');const clicksB = fromEvent($(document), 'click');clicksA.subscribe(x => console.log('A: ', x));clicksB.subscribe(x => console.log('B: ', x));// 每当点击一下页面,都会打印出 event 。

它的实现很简略,依据 target 的对象类型调用其对应的事件监听函数,而后通过 subscriber 调用 next 获取到订阅的输入。为了不便浏览,我略微的改了一下,让 fromEvent 只反对 DOM EventTarget。

export interface HasEventTargetAddRemove<E> {  addEventListener(type: string, listener: ((evt: E) => void) | null, options?: boolean | AddEventListenerOptions): void;  removeEventListener(type: string, listener?: ((evt: E) => void) | null, options?: EventListenerOptions | boolean): void;}// 一个只反对 DOM EventTarget 的 fromEventexport function fromEvent<T>(  target: HasEventTargetAddRemove<T>,  eventName: string,  options?: EventListenerOptions,): Observable<T> {  return new Observable<T>(subscriber => {    // 处理结果    const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);        // 调用 addEventListener,并让其在 handler 中解决。    target.addEventListener(eventName, handler, options);    // 勾销订阅的时候,间接调用 removeEventListener 对 dom 勾销订阅。    // 返回的是一个函数,这个函数负责了勾销订阅的时,所做的内容。    return () => {      target.removeEventListener(eventName, handler, options);    };  });}

下面的代码能够分解成这三个步骤:

  • 在闭包中创立一个 handler 函数,handler 函数最终会调用 subscriber.next
  • target 增加指定事件监听。
  • subscriber 增加一个销毁 target 事件监听的逻辑。

对于其余的事件监听,不再赘述,流程齐全是一样。

fromEventPattern

fromEventPattern 则是对 fromEvent 的泛化。

function addClickHandler(handler) {  document.addEventListener('click', handler);} function removeClickHandler(handler) {  document.removeEventListener('click', handler);} const clicks = fromEventPattern(  addClickHandler,  removeClickHandler);clicks.subscribe(x => console.log(x)); // 点击的时候,就会输入点击事件。

它的源码的与 fromEvent 相似。

export type NodeEventHandler = (...args: any[]) => void;export function fromEventPattern<T>(  addHandler: (handler: NodeEventHandler) => any,  removeHandler?: (handler: NodeEventHandler, signal?: any) => void,): Observable<T | T[]> {  return new Observable<T | T[]>(subscriber => {    const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);    // 有一点不同的中央在于,获取了返回值 addHandler 的返回值    let retValue: any;    try {      retValue = addHandler(handler);    } catch (err) {      subscriber.error(err);      return undefined;    }        if (!isFunction(removeHandler)) {      return undefined;    }        // 而后在这里传入 removeHandler 中    return () => removeHandler(handler, retValue);  });}

bindCallback, bindNodeCallback

它们都是一种非凡的 Operator ,思路应该是源于 Function.bind ,提供一种转换操作,将带有回调的函数转换成 Observable Factory

function setTimeoutWithCallback(callback: () => void) {  setTimeout(() => {    callback();  }, 2000);}const obfactory = bindCallback(setTimeoutWithCallback);const ob1 = obfactory();const ob2 = obfactory();const now = Date.now();ob1.subscribe(() => {  console.log('ob1' + (Date.now() - now));});setTimeout(() => {  ob1.subscribe(() => {    console.log('ob1 later: ' + (Date.now() - now));  });  ob2.subscribe(() => {    console.log('ob2: ' + (Date.now() - now));  });}, 3000);// 打印后果:// ob1: 2001// ob1 later: 3004// ob2: 5008

以下是 bindNodeCallback 的例子。

/*     file: ~/desktop/test.json        { "name": "Hello World" } */import * as fs from 'fs';const readerFactory = bindNodeCallback(fs.readFile);const reader$ = readerFactory('./src/person.json');reader$.subscribe({    next: (value) => console.log(value.toString()),    error: (err) => console.log(err),    complete: () => console.log('complete')});// 如果没有谬误,打印后果如下:// { name: 'Hello World' }// complete// 如果有谬误,打印后果如下:// [Error: ENOENT: no such file or directory, open './src/person.json'] {//     errno: -2,//     code: 'ENOENT',//     syscall: 'open',//     path: './src/person'// }

bindCallback 和 bindNodeCallback 的源码十分相似。

export function bindCallback<T>(  callbackFunc: Function): (...args: any[]) => Observable<T> {  return function (this: any, ...args: any[]): Observable<T> {    const context = this;    //     let subject: AsyncSubject<T>;    return new Observable<T>(subscriber => {      if (!subject) {           subject = new AsyncSubject<T>();        const handler = (...innerArgs: any[]) => {          subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);          subject.complete();        };                try {          callbackFunc.apply(context, [...args, handler]);        } catch (err) {          subject.error(err);        }      }      return subject.subscribe(subscriber);    });  };}

bindCallback 和 bindNodeCallback 的源码惟一不同的中央就是在于 handler 这个函数解决的内容不同,bindNodeCallback 传入的函数的回调,第一个参数为是错误信息。

const handler = (...innerArgs: any[]) => {    const err = innerArgs.shift();  // 如果第一个参数存在,阐明有问题。    if (err) {      subject.error(err);      return;    }  subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);  subject.complete();};

源码中比拟乏味的中央在于,创立的时候,返回的工厂函数蕴含了一个 AsyncSubject。这个 AsyncSubject 保留了曾经到来数据,能够看看例子中,ob1 被订阅了2次,第二次订阅后实际上是立即就能拿到返回值;而 ob2 仍要执行一次 setTimeoutWithCallback。这种设计与这个 bind 的语义相吻合。

interval & timer

下面的 operators 中,我曾经把 scheduler 相干的内容进行了裁剪,基本上与 scheduler 无关。而 intervaltimer 都必须通过 scheduler 来相应的定时操作,所以这部分放到了最初。它们是用于创立定时数据源的 operators

  • interval: 传入的参数示意每隔指定毫秒发送一条数据。
  • timer:传入的第一个参数是指第一条发送数据的工夫距离,第二个参数是指后续数据发送的距离。
const observableA = interval(1000).pipe(take(2));const observableB = timer(500, 1000).pipe(take(3));console.log('hello');observableA.subscribe(value => {  console.log('A: ' + value);});observableB.subscribe(value => {  console.log('B: ' + value);});// 打印后果// hello// B: 0// A: 0// B: 1// A: 1// B: 2

intervaltimer 都应用了一个默认的异步调度器,这个异步调度器次要是通过 setInterval 来实现相应的性能,实际上 Rx 把异步调度器通过 intervaltimer 转化成 Observable 的模式提供到给用户。

timer Source Code

timer 的实现如下图所示。它首先创立了一个 Observable ,而后在订阅函数中,返回调度器的订阅。在这里, schedulerschedule 函数返回了一个 Subscription

export function timer(    dueTime: number | Date = 0,   period: number,  scheduler: SchedulerLike = async): Observable<number> {  return new Observable(subscriber => {    let due = 0;    // 判断是不是 Date 类型    if (dueTime instanceof Date) {      due = +dueTime - scheduler.now();    }    // 判断是不是 number 类型    if (isNumeric(dueTime)) {      due = dueTime as number;    }        // 此处调用跟 interval 相似。    return scheduler.schedule(dispatch, due, {      index: 0, period, subscriber    });  });}

dispatch 实际上是一个递归函数,这个函数绑定了 SchedulerAction ,通过传入订阅者,使得 Action 外部的 setInterval 可能始终调用 subscriber.next

interface TimerState {  index: number;  period: number;  subscriber: Subscriber<number>;}function dispatch(this: SchedulerAction<TimerState>, state: TimerState) {  const { index, period, subscriber } = state;  subscriber.next(index);  if (subscriber.closed) {    return;  } else if (period === -1) {    return subscriber.complete();  }  state.index = index + 1;  this.schedule(state, period);}

interval Source Code

以下是 interval 的源码。

export function interval(period = 0): Observable<number> {    if (!isNumeric(period) || period < 0) {    period = 0;  }    const scheduler = async;    return new Observable<number>(subscriber => {    // 订阅器接管 scheduler 的订阅后果。    subscriber.add(      scheduler.schedule(dispatch, period, { subscriber, counter: 0, period })    );    return subscriber;  });}

认真的剖析下面的代码,我发现 interval 的实现实际上就是 timer 的一个束缚版本,它能够改写成这样。

export function interval(    period = 0,  scheduler: SchedulerLike = async,): Observable<number> {    if (!isNumeric(period) || period < 0) {    period = 0;  }    return timer(period, period, sch);}

本篇小结

整体而言,这部分的源码并没有写得很绕,删去了 scheduler 相干的内容后,逻辑立即就变得清晰了起来。同时,从源码的格调上能够看到它们由不同的人来编写。

最初,限于自己的程度,如果文章中有谬误的中央,欢送斧正。

退出咱们

咱们是DevUI团队,欢送来这里和咱们一起打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com。

作者:zcx(公众号:Coder写字的中央)

原文链接:https://mp.weixin.qq.com/s/6fVoI_JtSXu6YfZur1TDNw

往期文章举荐

《RxJS 源码解析(二)—— Muticasted Observable》

《Web界面深色模式和主题化开发》

《手把手教你搭建一个灰度公布环境》