原文:Comprehensive Guide to Higher-Order RxJs Mapping Operators: switchMap, mergeMap, concatMap (and exhaustMap)
咱们日常发现的一些最罕用的 RxJs 操作符是 RxJs 高阶映射操作符:switchMap、mergeMap、concatMap 和 exhaustMap。
例如,咱们程序中的大部分网络调用都将应用这些运算符之一实现,因而相熟它们对于编写简直所有反应式程序至关重要。
晓得在给定状况下应用哪个运算符(以及为什么)可能有点令人困惑,咱们常常想晓得这些运算符是如何真正工作的,以及为什么它们会这样命名。
这些运算符可能看起来不相干,但咱们真的很想一口气学习它们,因为抉择谬误的运算符可能会意外地导致咱们程序中的奥妙问题。
Why are the mapping operators a bit confusing?
这样做是有起因的:为了了解这些操作符,咱们首先须要理解每个外部应用的 Observable 组合策略。
与其试图本人了解 switchMap,不如先理解什么是 Observable 切换;咱们须要先学习 Observable 连贯等,而不是间接深刻 concatMap。
这就是咱们在这篇文章中要做的事件,咱们将按逻辑程序学习 concat、merge、switch 和 exhaust 策略及其对应的映射运算符:concatMap、mergeMap、switchMap 和 exhaustMap。
咱们将联合应用 marble 图和一些理论示例(包含运行代码)来解释这些概念。
最初,您将确切地晓得这些映射运算符中的每一个是如何工作的,何时应用,为什么应用,以及它们名称的起因。
The RxJs Map Operator
让咱们从头开始,介绍这些映射运算符的个别作用。
正如运算符的名称所暗示的那样,他们正在做某种映射:但到底是什么被映射了?咱们先来看看 RxJs Map 操作符的弹珠图:
How the base Map Operator works
应用 map 运算符,咱们能够获取输出流(值为 1、2、3),并从中创立派生的映射输入流(值为 10、20、30)。
底部输入流的值是通过获取输出流的值并将它们利用到一个函数来取得的:这个函数只是将这些值乘以 10。
所以 map 操作符就是映射输出 observable 的值。以下是咱们如何应用它来解决 HTTP 申请的示例:
const http$ : Observable<Course[]> = this.http.get('/api/courses');
http$
.pipe(tap(() => console.log('HTTP request executed')),
map(res => Object.values(res['payload']))
)
.subscribe(courses => console.log("courses", courses)
);
在这个例子中,咱们正在创立一个 HTTP observable 来进行后端调用,咱们正在订阅它。observable 将收回后端 HTTP 响应的值,它是一个 JSON 对象。
在这种状况下,HTTP 响应将数据包装在无效负载属性中,因而为了获取数据,咱们利用了 RxJs 映射运算符。而后映射函数将映射 JSON 响应负载并提取负载属性的值。
既然咱们曾经回顾了根本映射的工作原理,当初让咱们来谈谈高阶映射。
What is Higher-Order Observable Mapping?
在高阶映射中,咱们不是将像 1 这样的一般值映射到另一个像 10 这样的值,而是将一个值映射到一个 Observable 中!
后果是一个高阶的 Observable。它只是一个 Observable,但它的值自身也是 Observable,咱们能够独自订阅。
这听起来可能有些牵强,但实际上,这种类型的映射始终在产生。让咱们举一个这种类型映射的理论例子。假如例如,咱们有一个 Angular Reactive Form,它通过 Observable 随工夫收回无效的表单值:
@Component({
selector: 'course-dialog',
templateUrl: './course-dialog.component.html'
})
export class CourseDialogComponent implements AfterViewInit {
form: FormGroup;
course:Course;
@ViewChild('saveButton') saveButton: ElementRef;
constructor(
private fb: FormBuilder,
private dialogRef: MatDialogRef<CourseDialogComponent>,
@Inject(MAT_DIALOG_DATA) course:Course) {
this.course = course;
this.form = fb.group({
description: [course.description,
Validators.required],
category: [course.category, Validators.required],
releasedAt: [moment(), Validators.required],
longDescription: [course.longDescription,
Validators.required]
});
}
}
Reactive Form 提供了一个 Observable this.form.valueChanges,它在用户与表单交互时收回最新的表单值。这将是咱们的源 Observable。
咱们想要做的是在这些值随着工夫的推移收回时至多保留其中一些值,以实现表单草稿预保留性能。这样,随着用户填写表单,数据会逐步保留,从而防止因为意外从新加载而失落整个表单数据。
Why Higher-Order Observables?
为了实现表单草稿保留性能,咱们须要获取表单值,而后创立第二个执行后端保留的 HTTP observable,而后订阅它。
咱们能够尝试手动实现所有这些,然而咱们会陷入嵌套的订阅反模式:
this.form.valueChanges
.subscribe(
formValue => {
const httpPost$ =
this.http.put(`/api/course/${courseId}`, formValue);
httpPost$.subscribe(
res => ... handle successful save ...
err => ... handle save error ...
);
}
);
正如咱们所见,这会导致咱们的代码很快在多个级别嵌套,这是咱们在应用 RxJs 时首先要防止的问题之一。
让咱们称这个新的 httpPost$ Observable 为外部 Observable,因为它是在外部嵌套代码块中创立的。
Avoiding nested subscriptions
咱们心愿以更不便的形式实现所有这些过程:咱们心愿获取表单值,并将其映射到保留 Observable 中。这将无效地创立一个高阶 Observable,其中每个值对应一个保留申请。
而后咱们心愿通明地订阅这些网络 Observable 中的每一个,并且一次性间接接管网络响应,以防止任何嵌套。
如果咱们有某种更高阶的 RxJs 映射运算符,咱们就能够做到这所有!那为什么咱们须要四个不同的操作符呢?
为了了解这一点,设想一下如果 valueChanges observable 疾速间断收回多个表单值并且保留操作须要一些工夫来实现,会产生什么状况:
- 咱们应该期待一个保留申请实现后再进行另一次保留吗?
- 咱们应该并行进行屡次保留吗?
- 咱们应该勾销正在进行的保留并开始新的保留吗?
- 当一个曾经在进行中时,咱们应该疏忽新的保留尝试吗?
在摸索这些用例中的每一个之前,让咱们回到下面的嵌套订阅代码。
在嵌套订阅示例中,咱们实际上是并行触发保留操作,这不是咱们想要的,因为没有强有力的保障后端将按程序解决保留,并且最初一个无效的表单值的确是存储在 后端。
让咱们看看如何确保仅在上一次保留实现后才实现保留申请。
Understanding Observable Concatenation
为了实现程序保留,咱们将引入 Observable 连贯的新概念。在此代码示例中,咱们应用 concat() RxJs 函数连贯两个示例 observable:
const series1$ = of('a', 'b');
const series2$ = of('x', 'y');
const result$ = concat(series1$, series2$);
result$.subscribe(console.log);
在应用 of 创立函数创立了两个 Observables series1$ 和 series2$ 之后,咱们创立了第三个 result$ Observable,它是串联 series1$ 和 series2$ 的后果。
这是该程序的控制台输入,显示了后果 Observable 收回的值:
a
b
x
y
如咱们所见,这些值是将 series1$ 的值与 series2$ 的值连贯在一起的后果。但这里有一个问题:这个例子能工作的起因是因为这些 Observable 正在实现!!
of() 函数将创立 Observables,它收回传递给 of() 的值,而后在收回所有值后实现 Observables。
Observable Concatenation Marble Diagram
你留神到第一个 Observable 的值 b 前面的竖线了吗?这标记着第一个具备值 a 和 b (series1$) 的 Observable 实现的工夫点。
让咱们依照时间表逐渐合成这里产生的事件:
- 两个 Observables series1$ 和 series2$ 被传递给 concat() 函数
- concat() 而后将订阅第一个 Observable series1$,但不会订阅第二个 Observable series2$(这对于了解串联至关重要)
- source1$ 收回值 a,该值立刻反映在输入 result$ Observable 中
- 留神 source2$ Observable 还没有收回值,因为它还没有被订阅
- 而后 source1$ 将收回 b 值,该值反映在输入中
- 而后 source1$ 将实现,只有在此之后 concat() 当初订阅 source2$
- 而后 source2$ 值将开始反映在输入中,直到 source2$ 实现
- 当 source2$ 实现时,result$ Observable 也将实现
- 请留神,咱们能够将任意数量的 Observable 传递给 concat(),而不仅仅是本示例中的两个
The key point about Observable Concatenation
正如咱们所看到的,Observable 连贯就是对于 Observable 的实现!咱们取第一个 Observable 并应用它的值,期待它实现,而后咱们应用下一个 Observable,依此类推,直到所有 Observable 实现。
回到咱们的高阶 Observable 映射示例,让咱们看看串联的概念如何帮忙咱们。
Using Observable Concatenation to implement sequential saves
正如咱们所见,为了确保咱们的表单值按程序保留,咱们须要获取每个表单值并将其映射到 httpPost$ Observable。
而后咱们须要订阅它,但咱们心愿在订阅下一个 httpPost$ Observable 之前实现保留。
In order to ensure sequentiality, we need to concatenate the multiple httpPost$ Observables together!
而后咱们将订阅每个 httpPost$ 并按程序解决每个申请的后果。最初,咱们须要的是一个混合了以下内容的运算符:
- 一个高阶映射操作(获取表单值并将其转换为 httpPost$ Observable)
- 应用 concat() 操作,将多个 httpPost$ Observables 连贯在一起以确保在前一个正在进行的保留首先实现之前不会进行下一个 HTTP 保留。
咱们须要的是失当命名的 RxJs concatMap Operator,它混合了高阶映射和 Observable 连贯。
The RxJs concatMap Operator
代码如下:
this.form.valueChanges
.pipe(concatMap(formValue => this.http.put(`/api/course/${courseId}`,
formValue))
)
.subscribe(
saveResult => ... handle successful save ...,
err => ... handle save error ...
);
正如咱们所见,应用像 concatMap 这样的高阶映射运算符的第一个益处是当初咱们不再有嵌套订阅。
通过应用 concatMap,当初所有表单值都将按程序发送到后端,如 Chrome DevTools Network 选项卡中所示:
Breaking down the concatMap network log diagram
正如咱们所见,只有在上一次保留实现后才会启动一个保留 HTTP 申请。以下是 concatMap 运算符如何确保申请始终按程序产生:
- concatMap 正在获取每个表单值并将其转换为保留的 HTTP Observable,称为外部 Observable
- concatMap 而后订阅外部 Observable 并将其输入发送到后果 Observable
第二个表单值可能比在后端保留前一个表单值更快 - 如果产生这种状况,新的表单值将不会立刻映射到 HTTP 申请
- 相同,concatMap 将期待先前的 HTTP Observable 实现,而后将新值映射到 HTTP Observable,订阅它并因而触发下一次保留
Observable Merging
将 Observable 串联利用于一系列 HTTP 保留操作仿佛是确保保留按预期程序产生的好办法。
然而在其余状况下,咱们心愿并行运行,而不须要期待前一个外部 Observable 实现。
为此,咱们有合并 Observable 组合策略!与 concat 不同,Merge 不会在订阅下一个 Observable 之前期待 Observable 实现。
相同,merge 同时订阅每个合并的 Observable,而后随着多个值随着工夫的推移达到,它将每个源 Observable 的值输入到组合后果 Observable。
Practical Merge Example
为了明确合并不依赖于实现,让咱们合并两个从未实现的 Observables,因为它们是 interval Observables:
const series1$ = interval(1000).pipe(map(val => val*10));
const series2$ = interval(1000).pipe(map(val => val*100));
const result$ = merge(series1$, series2$);
result$.subscribe(console.log);
应用 interval() 创立的 Observable 将每隔一秒收回值 0、1、2 等,并且永远不会实现。
请留神,咱们对这些区间 Observable 利用了几个 map 运算符,只是为了更容易在控制台输入中辨别它们。
以下是控制台中可见的前几个值:
0
0
10
100
20
200
30
300
Merging and Observable Completion
正如咱们所见,合并的源 Observable 的值在收回时立刻显示在后果 Observable 中。如果合并的 Observable 之一实现,merge 将持续收回其余 Observable 随着工夫达到的值。
请留神,如果源 Observables 实现,合并仍会以雷同的形式工作。
The Merge Marble Diagram
看另一个例子:
正如咱们所见,合并的源 Observables 的值立刻显示在输入中。直到所有合并的 Observable 实现后,后果 Observable 才会实现。
当初咱们理解了合并策略,让咱们看看它如何在高阶 Observable 映射的上下文中应用。
回到咱们之前的表单草稿保留示例,很显著在这种状况下咱们须要 concatMap 而不是 mergeMap,因为咱们不心愿保留并行产生。
让咱们看看如果咱们不小心抉择了 mergeMap 会产生什么:
this.form.valueChanges
.pipe(
mergeMap(formValue =>
this.http.put(`/api/course/${courseId}`,
formValue))
)
.subscribe(
saveResult => ... handle successful save ...,
err => ... handle save error ...
);
当初假如用户与表单交互并开始相当快地输出数据。在这种状况下,咱们当初会在网络日志中看到多个并行运行的保留申请:
正如咱们所看到的,申请是并行产生的,在这种状况下是一个谬误!在高负载下,这些申请可能会被乱序解决。
Observable Switching
当初咱们来谈谈另一个 Observable 组合策略:切换。切换的概念更靠近于合并而不是串联,因为咱们不期待任何 Observable 终止。
然而在切换时,与合并不同,如果一个新的 Observable 开始收回值,咱们将在订阅新的 Observable 之前勾销订阅之前的 Observable。
Observable 切换就是为了确保未应用的 Observables 的勾销订阅逻辑被触发,从而能够开释资源!
Switch Marble Diagram
留神对角线,这些不是偶尔的!在切换策略的状况下,在图中示意高阶 Observable 很重要,它是图像的顶行。
这个高阶 Observable 收回的值自身就是 Observable。
对角线从高阶 Observable 顶线分叉的那一刻,是 Observable 值被 switch 收回和订阅的那一刻。
Breaking down the switch Marble Diagram
这是这张图中产生的事件:
- 高阶 Observable 收回它的第一个外部 Observable (a-b-c-d),它被订阅(通过 switch 策略实现)
- 第一个外部 Observable (a-b-c-d) 收回值 a 和 b,它们立刻反映在输入中
- 但随后第二个外部 Observable (e-f-g) 被发射,这触发了第一个外部 Observable (a-b-c-d) 的勾销订阅,这是切换的要害局部
- 而后第二个外部 Observable (e-f-g) 开始收回新值,这些值反映在输入中
- 但请留神,第一个外部 Observable (a-b-c-d) 同时仍在收回新值 c 和 d
- 然而,这些起初的值没有反映在输入中,那是因为咱们同时勾销了第一个外部 Observable (a-b-c-d) 的订阅
咱们当初能够了解为什么必须以这种不寻常的形式绘制图表,用对角线:这是因为咱们须要在每个外部 Observable 被订阅(或勾销订阅)时直观地示意,这产生在对角线从源高阶 Observable。
The RxJs switchMap Operator
而后让咱们采纳切换策略并将其利用于高阶映射。假如咱们有一个一般的输出流,它收回值 1、3 和 5。
而后咱们将每个值映射到一个 Observable,就像咱们在 concatMap 和 mergeMap 的状况下所做的那样,并取得一个更高阶的 Observable。
如果咱们当初在收回的外部 Observable 之间切换,而不是连贯或合并它们,咱们最终会失去 switchMap 运算符:
Breaking down the switchMap Marble Diagram
这是该运算符的工作原理:
- 源 observable 收回值 1、3 和 5
- 而后通过利用映射函数将这些值转换为 Observable
- 映射的外部 Observable 被 switchMap 订阅
- 当外部 Observables 收回一个值时,该值会立刻反映在输入中
- 然而如果在前一个 Observable 有机会实现之前收回了像 5 这样的新值,则前一个外部 Observable (30-30-30) 将被勾销订阅,并且它的值将不再反映在输入中
- 留神上图中红色的 30-30-30 外部 Observable:最初 30 个值没有收回,因为 30-30-30 外部 Observable 被勾销订阅
如咱们所见,Observable 切换就是确保咱们从未应用的 Observable 触发勾销订阅逻辑。当初让咱们看看 switchMap 的运行状况!
Search TypeAhead – switchMap Operator Example
switchMap 的一个十分常见的用例是搜寻 Typeahead。首先让咱们定义源 Observable,其值自身将触发搜寻申请。
这个源 Observable 将收回值,这些值是用户在输出中键入的搜寻文本:
const searchText$: Observable<string> =
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(map(event => event.target.value),
startWith('')
)
.subscribe(console.log);
此源 Observable 链接到用户键入其搜寻的输出文本字段。当用户输出单词“Hello World”作为搜寻时,这些是 searchText$ 收回的值:
Debouncing and removing duplicates from a Typeahead
请留神反复值,要么是因为应用两个单词之间的空格,要么是应用 Shift 键将字母 H 和 W 大写。
为了防止将所有这些值作为独自的搜寻申请发送到后端,让咱们应用 debounceTime 运算符期待用户输出稳固:
const searchText$: Observable<string> =
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(map(event => event.target.value),
startWith(''),
debounceTime(400)
)
.subscribe(console.log);
应用此运算符,如果用户以失常速度键入,则 searchText$ 的输入中当初只有一个值:Hello World
这曾经比咱们之前的要好得多,当初只有在稳固至多 400 毫秒时才会收回值!
然而如果用户在思考搜寻时输出迟缓,以至于两个值之间须要超过 400 毫秒,那么搜寻流可能如下所示:
此外,用户能够键入一个值,按退格键并再次键入,这可能会导致反复的搜寻值。咱们能够通过增加 distinctUntilChanged 操作符来避免反复搜寻的产生。
Cancelling obsolete searches in a Typeahead
但更重要的是,咱们须要一种办法来勾销以前的搜寻,因为新的搜寻开始了。
咱们在这里要做的是将每个搜寻字符串转换为后端搜寻申请并订阅它,并在两个间断的搜寻申请之间利用切换策略,如果触发新的搜寻,则勾销之前的搜寻。
这正是 switchMap 运算符将要做的!这是应用它的 Typeahead 逻辑的最终实现:
const searchText$: Observable<string> =
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(map(event => event.target.value),
startWith(''),
debounceTime(400),
distinctUntilChanged());
const lessons$: Observable<Lesson[]> = searchText$
.pipe(switchMap(search => this.loadLessons(search))
)
.subscribe();
function loadLessons(search:string): Observable<Lesson[]> {const params = new HttpParams().set('search', search);
return this.http.get(`/api/lessons/${coursesId}`, {params});
}
switchMap Demo with a Typeahead
当初让咱们看看 switchMap 操作符的作用!如果用户在搜寻栏上输出,而后犹豫并输出其余内容,咱们通常会在网络日志中看到以下内容:
正如咱们所看到的,之前的一些搜寻在进行时已被勾销,这很棒,因为这将开释可用于其余事件的服务器资源。
The Exhaust Strategy
switchMap 操作符是预输出场景的现实抉择,但在其余状况下,咱们想要做的是疏忽源 Observable 中的新值,直到前一个值被齐全解决。
例如,假如咱们正在触发后端保留申请以响应单击保留按钮。咱们可能首先尝试应用 concatMap 运算符来实现这一点,以确保保留操作按程序产生:
fromEvent(this.saveButton.nativeElement, 'click')
.pipe(concatMap(() => this.saveCourse(this.form.value))
)
.subscribe();
这确保保留按程序实现,然而如果用户屡次单击保留按钮会产生什么?以下是咱们将在网络日志中看到的内容:
正如咱们所见,每次点击都会触发本人的保留:如果咱们点击 20 次,咱们会失去 20 次保留!在这种状况下,咱们想要的不仅仅是确保按程序进行保留。
咱们还心愿可能疏忽点击,但前提是保留曾经在进行中。排气 Observable 组合策略将容许咱们做到这一点。
Exhaust Marble Diagram
就像以前一样,咱们在第一行有一个更高阶的 Observable,它的值自身就是 Observable,从第一行分叉进去。这是这张图中产生的事件:
- 就像 switch 的状况一样,exhaust 订阅第一个外部 Observable (a-b-c)
像平常一样,值 a、b 和 c 会立刻反映在输入中 - 而后收回第二个外部 Observable (d-e-f),而第一个 Observable (a-b-c) 仍在进行中
- 第二个 Observable 被排放策略抛弃,并且不会被订阅(这是排放的要害局部)
只有在第一个 Observable (a-b-c) 实现后,排气策略才会订阅新的 Observable - 当第三个 Observable (g-h-i) 收回时,第一个 Observable (a-b-c) 曾经实现,所以第三个 Observable 不会被抛弃,会被订阅
- 而后,第三个 Observable 的值 g-h-i 将显示在后果 Observable 的输入中,与输入中不存在的值 d-e-f 不同
就像 concat、merge 和 switch 的状况一样,咱们当初能够在高阶映射的上下文中利用 exhaust 策略。
The RxJs exhaustMap Operator
当初让咱们看看 exhaustMap 操作符的弹珠图。让咱们记住,与上图的第一行不同,源 Observable 1-3-5 收回的值不是 Observable。
相同,这些值能够是例如鼠标点击:
所以这是在排放地图图的状况下产生的事件:
- 收回值 1,并创立外部 Observable 10-10-10
- Observable 10-10-10 收回所有值并在源 Observable 中收回值 3 之前实现,因而所有 10-10-10 值在输入中收回
- 在输出中收回一个新值 3,触发一个新的 30-30-30 外部 Observable
- 然而当初,尽管 30-30-30 仍在运行,但咱们在源 Observable 中失去了一个新值 5
- 这个值 5 被排气策略抛弃,这意味着从未创立 50-50-50 Observable,因而 50-50-50 值从未呈现在输入中
A Practical Example for exhaustMap
当初让咱们将这个新的 exhaustMap Operator 利用到咱们的保留按钮场景中:
fromEvent(this.saveButton.nativeElement, 'click')
.pipe(exhaustMap(() => this.saveCourse(this.form.value))
)
.subscribe();
如果咱们当初点击保留,假如间断 5 次,咱们将取得以下网络日志:
正如咱们所看到的,咱们在保留申请仍在进行时所做的点击被忽略了,正如预期的那样!
请留神,如果咱们间断点击例如 20 次,最终正在进行的保留申请将实现,而后第二个保留申请将开始。
How to choose the right mapping Operator?
concatMap、mergeMap、switchMap 和 exhaustMap 的行为类似,因为它们都是高阶映射运算符。
但它在许多奥妙的方面也如此不同,以至于实际上没有一个运算符能够平安地指向默认值。
相同,咱们能够简略地依据用例抉择适合的运算符:
- 如果咱们须要在期待实现的同时按程序做事件,那么 concatMap 是正确的抉择
- 对于并行处理,mergeMap 是最好的抉择
- 如果咱们须要勾销逻辑,switchMap 是要走的路
- 为了在以后的 Observables 仍在进行时疏忽新的 Observables,exhaustMap 就是这样做的
总结
正如咱们所见,RxJ 的高阶映射运算符对于在响应式编程中执行一些十分常见的操作(例如网络调用)至关重要。
为了真正了解这些映射操作符及其名称,咱们首先须要重点理解底层的 Observable 组合策略 concat、merge、switch 和 exhaust。
咱们还须要意识到有一个更高阶的映射操作正在产生,其中值被转换成拆散的 Observables,并且这些 Observables 被映射运算符自身以暗藏的形式订阅。
抉择正确的算子就是抉择正确的外部 Observable 组合策略。抉择谬误的运算符通常不会导致程序立刻损坏,但随着工夫的推移可能会导致一些难以解决的问题。
更多 Jerry 的原创文章,尽在:” 汪子熙 ”: