从 new Observable 开始
import {Observable} from ‘rxjs’
const observable = new Observable<number>(subscriber => {
subscriber.next(1)
subscriber.next(2)
subscriber.complete()
})
observable.subscribe({
next: data => console.log(‘next data:’, data),
complete: () => {
console.log('complete')
}
})
输入如下:
// 开始输入
next data: 1
next data: 2
complete
// 完结输入
通过 new Observable() 办法创立了一个可察看对象 observable,而后通过 subscribe 办法订阅这个 observable,订阅的时候会执行在 new Observable 时候传入的函数参数,那么就来看下 new Observable 到底做了什么
// /src/internal/Observable.ts
export class Observable<T> implements Subscribable<T> {
// …
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {this._subscribe = subscribe;}
}
// …
}
Observable 的初始化办法很简略,就是将回调函数绑定到实例的 _subscribe 属性上
subscribe
Observable 实现 (implements) 了 Subscribable(订阅)接口
// /src/internal/types.ts
export interface Subscribable<T> {
subscribe(observer: Partial<Observer<T>>): Unsubscribable;
}
这个 subscribe 正是下一步要用于订阅的办法,在以后版本中 subscribe 的办法签名有三个,三个只是传参模式不同,最终都会解决成雷同的对象,着重看第一个
subscribe(observer?: Partial<Observer<T>>): Subscription;
对于第一个签名,接管的参数与 Observer 接口相干,这个接口有三个办法属性
export interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
subscribe 能够是一个对象,这个对象蕴含三个办法属性 next、error、complete,当你不关怀 error 和 complete 这两个属性的时候,那么能够依照第二个函数签名间接传入一个办法,这个办法就默认代表 next 办法属性
进入 subscribe 办法
subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
errorContext(() => {
// ...
});
return subscriber;
}
subscribe 的第一个参数能够是一个 subscriber(具备 next、error、complete 三个属性,所以类型非法),不过这种传参模式个别都是库外部应用,咱们失常写法还是传入一个纯正的对象或者办法,那么就意味着会执行 new SafeSubscriber(observerOrNext, error, complete)
// node_modules/rxjs/src/internal/Subscriber.ts
export class SafeSubscriber<T> extends Subscriber<T> {
// …
}
SafeSubscriber 继承了 Subscriber,次要作用是对 next、error、complete 三个办法属性进行了一层封装,保障可能更好地进行错误处理
subscriber.add(
operator
? // We're dealing with a subscription in the
// operator chain to one of our lifted operators.
operator.call(subscriber, source)
: source
? // If `source` has a value, but `operator` does not, something that
// had intimate knowledge of our API, like our `Subject`, must have
// set it. We're going to just call `_subscribe` directly.
this._subscribe(subscriber)
: // In all other cases, we're likely wrapping a user-provided initializer
// function, so we need to catch errors and handle them appropriately.
this._trySubscribe(subscriber)
);
errorContext 也是一个错误处理的包装办法,外面只调用了一个 subscriber.add 办法,这个办法的参数用了两个嵌套的三元表达式。
rxjs 内置的泛滥操作符(operator) 会调用 Observable,这个场景下,this.operator 就有值了,所以如果是操作符调用,就会走 operator.call(subscriber, source);rxjs 外部的一些 Subject 在某些状况下会执行到第二个逻辑 this._subscribe(subscriber);其余状况(即开发者失常应用的状况)会执行 this._trySubscribe(subscriber),前两个波及到 operator 和 Subject,而且最终的大略流程跟间接执行第三个是差不多的,所以这里只看第三个
this._subscribe 就是在最开始 new Observable 的时候传入的参数,所以只有有订阅操作(subscribe),就会执行这个办法
protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
// We don't need to return anything in this case,
// because it's just going to try to `add()` to a subscription
// above.
sink.error(err);
}
}
而在本文的例子里,new Observable 的函数参数里,调用了 subscriber.next 和 subscriber.complete
protected _next(value: T): void {
this.destination.next(value);
}
protected _error(err: any): void {
try {
this.destination.error(err);
} finally {
this.unsubscribe();
}
}
protected _complete(): void {
try {
this.destination.complete();
} finally {
this.unsubscribe();
}
}
this.destination 这个对象,在 new SafeSubscriber 的时候,被设置了 next、error、complete 三个办法属性,就是订阅的时候传入的三个自定义办法,在这里调用到了
// 简化后的代码
subscriber.add(this._trySubscribe(subscriber));
这个是为了收集 teardown,也就是订阅勾销 (unsubscribe) 的时候执行的收尾 / 清理办法,比方在订阅里启动了一个轮询办法,那么完结订阅的时候,你想同时也勾销掉这个轮询逻辑,那么就能够在 new Observable 的办法体里,最初返回一个勾销轮询的办法,那么在 unsubscribe 的时候就会主动调用这个 teardown 办法执行你定义的勾销轮询逻辑,相似于 React.useEffect 最初返回的那个办法
add(teardown: TeardownLogic): void {
// Only add the teardown if it’s not undefined
// and don’t add a subscription to itself.
if (teardown && teardown !== this) {
if (this.closed) {
// If this subscription is already closed,
// execute whatever teardown is handed to it automatically.
execTeardown(teardown);
} else {if (teardown instanceof Subscription) {
// We don't add closed subscriptions, and we don't add the same subscription
// twice. Subscription unsubscribe is idempotent.
if (teardown.closed || teardown._hasParent(this)) {return;}
teardown._addParent(this);
}
(this._teardowns = this._teardowns ?? []).push(teardown);
}
}
}
this.closed 的值用于标识以后 subscription 是否曾经勾销订阅了(complete、error、unsubscribe 都会将此值置为 true),this._teardowns 就是用于寄存与以后 subscription 所有无关的 teardown,能够看到,teardown 除了是一个自定义的清理办法外,还能够是一个 Subscription
一个 subscription(称为父 subscription)能够通过 add 连贯到另外一个 subscription(称为子 subscription),那么在父 subscription 调用 unsubscribe 办法勾销订阅的时候,因为会执行 this._teardowns 里所有的办法,也就会调用子 subscription 的 unsubscribe,勾销其下所有子孙 subscription 的订阅
这种关系看起来是一种父子关系,所以通过公有属性 _parentage 来表明这种关系,作用是防止 B subscription 被同一个 subscription 反复订阅的问题,Subscription 里定义了几个办法用于治理 _parentage 的数据,例如 _hasParent、_addParent、_removeParent
const observable1 = interval(100)
const observable2 = interval(200)
const subscription1 = observable1.subscribe(x => console.log(‘first: ‘ + x))
const subscription2 = observable2.subscribe(x => console.log(‘second: ‘ + x))
subscription2.add(subscription1)
setTimeout(() => {
subscription2.unsubscribe()
}, 400)
上述代码中,subscription2 通过 add 办法连贯到了 subscription1,那么在 subscription2 调用 unsubscribe 的时候,也会同时执行 subscription1 的 unsubscribe,所以输入为
// 开始输入
first: 0
first: 1
second: 0
first: 2
first: 3
second: 1
// 完结输入
unsubscribe
有订阅就有勾销订阅,unsubscribe 次要用作执行一些清理动作,例如执行在 subscribe 的时候收集到的 teardown,以及更新 _parentage 的数据
// node_modules/rxjs/src/internal/Subscription.ts
unsubscribe(): void {
// …
const {_parentage} = this;
if (_parentage) {
// 更新 _parentage
}
const {initialTeardown} = this;
if (isFunction(initialTeardown)) {
// 执行 initialTeardown
}
const {_teardowns} = this;
if (_teardowns) {
// ...
// 执行 teardown
}
// …
}
这里有个 initialTeardown 办法,能够了解为 Subscription 勾销订阅时会执行的函数,作为使用者个别不须要关怀这个,库外部会应用到
const subscription = new Subscription(() => {
console.log(‘ 勾销订阅时执行 initialTeardown’)
})
const observable = new Observable<number>(subscribe => {
subscribe.next(1)
return subscription
})
const subscription1 = observable.subscribe(d => console.log(d))
subscription1.unsubscribe()
// 开始输入
// 1
// 勾销订阅时执行 initialTeardown
// 完结输入
至此,由文章结尾例子所引申进去的源码逻辑都看完了,对于 Subscription 的也看得差不多,再回头看看 Observable 中没提到的中央
lift
lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
lift 通过 new Observable 返回新的 observable,并且标记了 source 和 operator,这是为了不便链式操作,在以后版本中,官网曾经不倡议开发者间接调用这个办法了,次要是供应 rxjs 外部泛滥的 operators 应用
forEach
forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void> {
promiseCtor = getPromiseCtor(promiseCtor);
return new promiseCtor<void>((resolve, reject) => {
// Must be declared in a separate statement to avoid a ReferenceError when
// accessing subscription below in the closure due to Temporal Dead Zone.
let subscription: Subscription;
subscription = this.subscribe((value) => {
try {next(value);
} catch (err) {reject(err);
subscription?.unsubscribe();}
},
reject,
resolve
);
}) as Promise<void>;
}
getPromiseCtor 能够了解为 js 中的 Promise 对象,次要看调用 this.subscribe 这一句
subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription;
subscribe 的函数定义后面曾经看过了,这里调用 subscribe 传入的三个参数与 next、error、complete 一一对应,next 会继续调用直到 complete 执行,这个 promise 才算是完结了,所以如果你想要应用这个办法,就必须确保所应用的 observable 最终会调用 complete 办法,否则意味着 promise 不会完结,forEach 也就始终处于 hung up 的状态
个别状况下,咱们是不会应用到这个办法的,因为很多须要 forEach 的场景齐全能够用操作符来代替,比方针对 forEach 源码中给的一个应用例子
import {interval} from ‘rxjs’;
import {take} from ‘rxjs/operators’;
const source$ = interval(1000).pipe(take(4));
async function getTotal() {
let total = 0;
await source$.forEach(value => {
total += value;
console.log('observable ->', value);
});
return total;
}
getTotal().then(
total => console.log(‘Total:’, total)
)
如果用 reduce 操作符来实现会更加直观
import {interval} from ‘rxjs’;
import {reduce} from ‘rxjs/operators’;
const source$ = interval(1000).pipe(take(4));
source$.pipe(
reduce((acc, value) => {
console.log('observable ->', value);
return acc + value;
}, 0)
).subscribe(total => console.log(‘Total:’, total));
pipe
pipe 的类型签名很多,实际上是为了辅助类型的主动推导,只有 pipe 传入的参数数量在 9 个及以内,则就能够正确推导出类型,而一旦超过 9 个,主动推导就生效了,必须使用者本人指定类型
// node_modules/rxjs/src/internal/Observable.ts
pipe(…operations: OperatorFunction<any, any>[]): Observable<any> {
return pipeFromArray(operations)(this);
}
// node_modules/rxjs/src/internal/util/identity.ts
export function identity<T>(x: T): T {
return x;
}
// node_modules/rxjs/src/internal/util/pipe.ts
/* @internal /
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {
return identity as UnaryFunction<any, any>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
};
}
pipe 调用了 pipeFromArray,pipeFromArray 的参数 fns 即所有传入 pipe 的参数,也就是操作符 operator
如果没有传入任何操作符办法,则间接返回 Observable 对象;如果只传入了一个操作符办法,则间接返回该操作符办法,否则返回一个函数,将在函数体里通过 reduce 办法顺次执行所有的操作符,执行的逻辑是将上一个操作符办法返回的值作为下一个操作符的参数,就像是一个管道串联起了所有的操作符,这里借鉴了函数式编程的思维,通过一个 pipe 函数将函数组合起来,上一个函数的输入成为下一个函数的输出参数
最初,不论是传入了几个操作符,最终返回的都是一个 Observable 的实例,所以能够接着调用 subscribe 办法
toPromise
// node_modules/rxjs/src/internal/Observable.ts
toPromise(promiseCtor?: PromiseConstructorLike): Promise<T | undefined> {
promiseCtor = getPromiseCtor(promiseCtor);
return new promiseCtor((resolve, reject) => {
let value: T | undefined;
this.subscribe((x: T) => (value = x),
(err: any) => reject(err),
() => resolve(value)
);
}) as Promise<T | undefined>;
}
toPromise 办法跟下面提到的 forEach 的实现很类似,将一个 Observable 对象转换成了一个 Promise 对象,会在 .then 的时候返回这个 Observable 最初一个值,这个办法曾经被标记为 deprecated 了,将会在 v8.x 中被移除,并且作者在源码正文里倡议咱们应用 firstValueFrom 和 lastValueFrom 来代替这个办法
const source$ = interval(100).pipe(take(4))
source$.toPromise().then(total => console.log(total))
// 相当于
const source$ = interval(100).pipe(take(4))
lastValueFrom(source$).then(total => console.log(total))
// 输入
// 3
用法上看着如同区别不大,实际上 lastValueFrom 的实现和 toPromise 也差不多,但从办法名上来说显然更加语义化
// node_modules/rxjs/src/internal/lastValueFrom.ts
export function lastValueFrom<T, D>(source: Observable<T>, config?: LastValueFromConfig<D>): Promise<T | D> {
const hasConfig = typeof config === ‘object’;
return new Promise<T | D>((resolve, reject) => {
let _hasValue = false;
let _value: T;
source.subscribe({next: (value) => {
_value = value;
_hasValue = true;
},
error: reject,
complete: () => {if (_hasValue) {resolve(_value);
} else if (hasConfig) {resolve(config!.defaultValue);
} else {reject(new EmptyError());
}
},
});
});
}
小结
Observable、Subscription 局部的代码还是比较简单的,并没有什么七拐八拐的逻辑,官网源码中的正文也十分具体(甚至在正文里写 example),几乎就是在文档里写代码,再加上 ts 的助攻,能够说源码看起来没啥难度,当然了,这只是 rxjs 零碎中两个最根底的概念,个别状况下应用 rxjs 是不会用到这两个概念的,Subject 和 operators 才是常客