乐趣区

从观察者模式到迭代器模式系统讲解-RxJS-Observable一

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。RxJS 是 Reactive Extensions 在 JavaScript 上的实现。

Reactive Extensions(Rx)是对 LINQ 的一种扩展,他的目标是对异步的集合进行操作,也就是说,集合中的元素是异步填充的,比如说从 Web
或者云端获取数据然后对集合进行填充。LINQ(Language Integrated Query)语言集成查询是一组用于 C# 和
Visual Basic 语言的扩展。它允许编写 C# 或者 Visual Basic 代码以操作内存数据的方式,查询数据库。

RxJS 的主要功能是利用响应式编程的模式来实现 JavaScript 的 异步式编程(现前端主流框架 Vue React Angular 都是响应式的开发框架)。

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。学习 RxJS 之前我们需要先了解观察者模式和迭代器模式,还要对 Stream 流的概念有所认识。下面我们将对其逐一进行介绍,准备好了吗?让我们现在就开始吧。

RxJS 前置知识点

观察者模式

观察者模式又叫发布订阅模式(Publish/Subscribe),它是一种一对多的关系,让多个观察者(Obesver)同时监听一个主题(Subject),这个主题也就是被观察者(Observable),被观察者的状态发生变化时就会通知所有的观察者,使得它们能够接收到更新的内容。

观察者模式主题和观察者是分离的,不是主动触发而是被动监听。

举个常见的例子,例如微信公众号关注者和微信公众号之间的信息订阅。当微信用户关注微信公众号 webinfoq就是一个订阅过程,webinfoq负责发布内容和信息,webinfoq有内容推送时,webinfoq的关注者就能收到最新发布的内容。这里,关注公众号的朋友就是观察者的角色,公众号 webinfoq 就是被观察者的角色。

示例代码:

// 定义一个主题类(被观察者 / 发布者)class Subject {constructor() {this.observers = [];   // 记录订阅者(观察者)的集合
    this.state = 0;        // 发布的初始状态
  }
  getState() {return this.state;}
  setState(state) {        
    this.state = state;    // 推送新信息
    this.notify();         // 通知订阅者有更新了}
  attach(observer) {this.observers.push(observer);   // 对观察者进行登记
  }
  notify() {
    // 遍历观察者集合,一一进行通知
    this.observers.forEach(observer = {observer.update();   
    })
  }
}

// 定义一个观察者(订阅)类
class Observer {constructor(name, subject) {  
    this.name = name;   // name 表示观察者的标识
    this.subject = subject;  // 观察者订阅主题
    this.subject.attach(this);  // 向登记处传入观察者实体
  }
  update() {console.log(`${this.name} update, state: ${this.subject.getState()}`);
  }
}

// 创建一个主题
let subject = new Subject();

// 创建三个观察者:observer$1 observer$2 observer$3
let observer$1 = new Observer("observer$1", subject);
let observer$2 = new Observer("observer$2", subject);
let observer$3 = new Observer("observer$3", subject);

// 主题有更新
subject.setState(1);
subject.setState(2);
subject.setState(3);

// 输出结果
// observer$1 update, state: 1
// observer$1 update, state: 1
// observer$1 update, state: 1
// observer$2 update, state: 2
// observer$2 update, state: 2
// observer$2 update, state: 2
// observer$3 update, state: 3
// observer$3 update, state: 3
// observer$3 update, state: 3

迭代器模式

迭代器(Iterator)模式又叫游标(Sursor)模式,迭代器具有 next 方法,可以顺序访问一个聚合对象中的各个元素,而不需要暴露该对象的内部表现。

迭代器模式可以把迭代的过程从从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即使不了解对象的内部构造,也可以通过迭代器提供的方法顺序访问其每个元素。

了解更多可迭代对象:「JS 篇」你不知道的 JS 知识点总结(一)

使用 ES5 创建一个迭代器

// 创建一个迭代类,传入目标对象
function Iterator(container) {
  this.list = container.list;
  this.index = 0;

  // 定义私有的 next 方法,执行迭代
  this.next = function() {if(this.hasNext()) { // 判断是否迭代完毕
      return {value: this.list[this.index++],
        done: false
      }
    }
    return {value: null, done: true}
  }
  this.hasNext = function() {if(this.index >= this.list.length) {return false;}
    return true;
  }
}

// 定义目标对象
function Container(list) {
  this.list = list;
  this.getIterator = function() {return new Iterator(this); // 用户返回一个迭代器
  }
}

// 调用
var container = new Container([1, 2, 3, 4, 5]);
var iterator = container.getIterator();
iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

使用 ES6 构造一个迭代器

class Iterator {constructor(container) {
    this.list = container.list;
    this.index = 0;
  }
  next() {if(this.hasNext()) {
      return {value: this.list[this.index++],
        done: false
      }
    }
    return {value: null, done: true}
  }
  hasNext() {if(this.index >= this.list.length) {return false;}
    return true;
  }
}

class Container {constructor(list) {this.list = list;}
  getIterator() {return new Iterator(this);
  }
}

let container = new Container([1, 2, 3, 4, 5]);
let iterator = container.getIterator();

iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

使用 ES6 的 Symbol.iterator 创建一个迭代器

var list = [1, 2, 3, 4, 5];
var iterator = list[Symbol.iterator]();

iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

通过上边的示例代码我们可以得知,我们不了解对象的内部构造,但是可以通过调用迭代器提供的 next() 方法就能顺序访问其每个元素。

Stream 流

在这里可以将一系列的鼠标点击、键盘点击产生的事件和将要处理的元素集合看作一种流,流的特点是数据源的本身是无限的,流在 管道 中传输,并且可以在 管道 的节点上进行处理,比如筛选,排序,聚合等。

流数据源(source)经过数据转换等中间操作的处理,最后由最终操作得到前面处理的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道。

为了对 stream 有一个更感性的认识,我们说点击事件可以看作一种 stream,在介绍 RxJS 之前,我们不妨先看一个 RxJS 官网上的例子(列举官方的例子更能充分体现 RxJS 是基于可观测数据流 Stream 的)。

通常,注册一个事件侦听器是这样的。

document.addEventListener('click', () => console.log('Clicked!'));

使用 RxJS 可以创建一个 observable

import {fromEvent} from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Clicked!')); 

自定义源创建一个 Observable

结合上边讲到流和设计模式,为了方便我们对 RxJS 有进一步的认识,我们就用代码自己来实现一个 Obserable

其实 Observable 实际上就是一个函数,它接收一个 Observer 对象作为参数,返回一个函数用来取消订阅。Observer 对象可以声明 next、err、complete 方法来处理流的不同状态。

首先我们定义数据源 Source

// 创建数据源
class Source {constructor() {
    this.state = 0;
    this.data = setInterval(() => this.emit(this.state++), 200);
  }
  
  emit(state) {
    const limit = 10;  // 定义数据上限
    if (this.onData) {this.onData(state);  // 产生数据
    }
    if (state === limit) {if (this.onComplete) {this.onComplete();  // 数据终止
      }
      this.destroy();}
  }
  
  destroy() {  // 停止定时器,清除数据
    clearInterval(this.data);
  }
}

创建一个 Observable

// 创建 Observable
class Observable {constructor() {}
  getStream() {return new Source();
  }
  subscribe(observer) {
    // 获取流数据源
    this.stream = this.getStream();  
    
    // 转换
    this.stream.onData = (e) => observer.next(e); // 处理流数据
    this.stream.onError = (err) => observer.error(err);  // 处理异常
    this.stream.onComplete = () => observer.complete();  // 处理流数据终止
    
    // 返回一个函数
    return () => {this.stream.destroy();        
    }
  }
}

调用 subscribe 进行订阅

const observable = new Observable();
// 订阅
let observer = {next(data) {console.log(data); },
  error(err) {console.error(err); },
  complete() { console.log('done')}
}
const unsubscribe = observable.subscribe(observer);

输出结果

我们可以调用 unsubscribe 取消订阅

//0.5 后取消订阅
setTimeout(unsubscribe, 500);

我们可以看到 Observable 作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。

RxJS(Reactive Extensions for JavaScript)

介绍完 RxJS 的一些前置知识点,下面就让我们一起来认识下什么是 RxJS 吧。

RxJS 中含有两个基本概念:ObservablesObserver。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

  • 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
  • 发布:Observable 通过回调 next 方法向 Observer 发布事件。

Observable 属于全新的 push 体系,让我们先了解下什么是 pull 体系和 push 体系吧。

Pull vs Push

PullPush 是两种不同的协议,描述了数据生产者如何与数据消费者进行通信。

生产者 消费者
pull 被请求的时候产生数据 决定何时请求数据
push 按自己的节奏生产数据 对接收的数据进行处理

Pull 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。

每一个 JavaScript 函数 都是一个 Pull 体系,函数是数据的生产者,调用函数的代码通过 ‘ 拉出 ’ 一个单一的返回值来消费该数据。

function add(x, y) {console.log('Hello');
  return x + y;
}
const x = add(4, 5); 

ES6 介绍了 Iterator 迭代器 和 Generator 生成器,另一种 Pull 体系,调用 iterator.next() 的代码是消费者,可从中拉取多个值。

Push 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。

Promise 是当今 JS 中最常见的 Push 体系,一个 Promise(数据的生产者)发送一个 resolved value(成功状态的值)来执行一个回调(数据消费者)。但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。

RxJS 引入了 Observable (可观察对象),一个全新的 Push 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 “ 推送给 ” Observer (观察者)。

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

MagicQ 单值 多值
拉取(Pull) Function Iterator
推送(Push) Promise Observable

ObservablePromise 之间的差异:

  • Promise:只能返回单个值,不可取消,要么 resolve 要么 reject 并且只响应一次。
  • Observable:随着时间的推移发出多个值,可以调用 unsubscribe() 取消订阅,支持 map、filter、reduce 等操作符,延迟执行,当订阅的时候才会开始执行,可以响应多次。

RxJS 之 Observable

使用 RxJS 我们可以使用 npm 进行安装(更多使用方法请参考 github):

npm install rxjs 

需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

import {Observable} from 'rxjs';
 
const observable = new Observable(subscriber => {subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();});
 
console.log('start');
observable.subscribe({next(x) {console.log(x); },
  error(err) {console.error(err); },
  complete() { console.log('done'); }
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
1
2
3
done
end

当然我们也可以用它处理异步行为:

import {Observable} from 'rxjs';
 
const observable = new Observable(subscriber => {subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {subscriber.next(4);
    subscriber.complete();}, 1000);
});
 
console.log('start');
observable.subscribe({next(x) {console.log(x); },
  error(err) {console.error(err); },
  complete() { console.log('done'); }
});
console.log('end');

代码运行后的输出结果为:

start
1
2
3
end
4
done

RxJS 使用创建类操作符创建 Observable

RxJS 中提供了很多操作符 Operators,下篇文章我们将对 Operators 进行介绍,创建类操作符(Creation Operator)用于创建 Observable 对象。

官网列举的一些创建类操作符如下:

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

最后我们简单的来看一下,如何使用 from 创建一个 Observable(关于操作符的介绍我们将在下篇文章进行详细介绍,保持关注哦)。

from:可以把数组、Promise、以及 Iterable 转化为 Observable。

// 将数组转换为 Observable:import {from} from 'rxjs';

const array = [10, 20, 30];
const result = from(array);

result.subscribe(x => console.log(x));

// Logs:
// 10
// 20
// 30

由于 RxJS 涉及到的概念和知识点比较宽泛和复杂,我们需要一步一步的去理解和掌握它,最终可以做到知其然亦知其所以然。接下来文章中会继续介绍 RxJS 中涉及到知识点,关注此公众号 webinfoq 不要错过哦。

退出移动版