记一次-Angular-基于-STOMP-over-WebSocket-实现流文本传输

3次阅读

共计 6300 个字符,预计需要花费 16 分钟才能阅读完成。

本文成文时间是 2019-08-18,文中提到的最新版本号是以 2019-08-18 为基准的。

前情摘要

在介绍正文之前需要先简单了解几个概念:STOMP 协议、STOMP over WebSocket 以及 RxJS。(关于这三点本文不会进行详细介绍)

什么是 STOMP?

STOMPSimple or Streaming Text Orientated Messageing Protocal,是一种简单(流)文本定向传输协议。

STOMP 是 WebSocket 更高级的子协议,它使用一个基于帧的格式来定义消息,与 HTTP 的 RequestResponse 类似。

STOMP 提供可互操作的连接格式,允许 STOMP 客户端与任意代理进行交互。STOMP 是一个非常简单易用的协议,服务器端实现起来会相对困难一些,编写客户端非常容易。

STOMP over WebSocket

STOMP over Websocket 即通过 WebSocket 建立 STOMP 连接,也就是说是在 WebSocket 连接的基础上再建立 STOMP 连接。

WebSocket 协议定义了两种类型的消息,文本和二进制,但它们的内容是未定义的。

如果说 Socket 是 C/S 架构 的 TCP 编程,那么同理 WebSocket 就是 B/ S 架构的 TCP 编程,所以需要在客户端与服务端之间定义一个机制去协商一个子协议 – 更高级别的消息协议,将它使用在 WebSocket 之上去定义每次发送消息的类别、格式和内容,等等。

子协议的使用是可选的,但无论哪种方式,客户端和服务器都需要就一些定义消息内容的协议达成一致。于是,通常选择在 WebSocket 协议上使用 STOMP 协议来定义内容格式。

RxJS

RxJS 是一个用于使用 Observables 进行反应式编程的库,可简化编写异步或基于回调的代码的过程。该项目是对 Reactive-Extensions / RxJS 的重写,具有更好的性能,更好的模块化,更好的可调试调用堆栈,同时主要保持向后兼容,并且进行了一些重大更改,从而减少了 API 操作。

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions(Rx),Rx 是对 LINQ 的一种扩展,他的目标是对异步的集合进行操作,也就是说,集合中的元素是异步填充的,比如说从 Web 或者云端获取数据然后对集合进行填充。

LINQ(Language Integrated Query)语言集成查询是一组用于 C# 和 Visual Basic 语言的扩展。它允许编写 C# 或者 Visual Basic 代码以操作内存数据的方式,查询数据库。

RxJS 的主要功能是利用响应式编程的模式来实现 JavaScript 的异步式编程(现前端主流框架 Vue React Angular 都是响应式的开发框架)。

RxJS 是基于 观察者模式 迭代器模式 以函数式编程思维来实现的。学习 RxJS 之前我们需要先了解观察者模式和迭代器模式,还要对 Stream 流的概念有所认识。

接下来我们就一起来看下如何在的 Angular 项目中是使用 STOMP over WebSocket 进行数据流传输的。

Angular 实战

本文案例是实际 Angular 项目中的一个小功能模块,Angular 是 8.0 版本,本文涉及的组件主要包括右键菜单项负责生产消息的 context-menu-component 动态组件,进度监控 app-progress-bar 组件和日志输出组件 app-console-area

项目中使用的 UI 框架为 ng-zorro-antd,下面是 tabs 组件中的相关伪代码(省略了组件间 Input Ouput 接口):

...
<nz-tab [nzType]="'card'"> 
  <ng-template #consoleArea> 控制台 </ng-template>    
  <app-progress-bar></app-progress-bar>    
  <app-console-area></app-console-area>  
</nz-tab>
...

代码与 UI 视图的对应关系如下:

项目中使用的 STOMP 客户端是 ng2-stompjs 库,ng2-stompjs 目前的版本是 7.xx,其底层的 @stomp/stompjs 已被重写,自此与 STOMP 标准具有严格的兼容性。

ng2-stompjs 是第一个可靠地支持二进制有效负载的 STOMP JS 客户端库。

安装 ng-stompjs

$ npm install @stomp/ng-stompjs

添加和注入 @stomp/ng2-stompjs

使用前需要定义配置文件,在目录 src/app/config/ 创建 stomp.config.js 文件:

import {InjectableRxStompConfig} from '@stomp/ng2-stompjs';
import {STOMP_SERVER_BASE_URL} from 'server.config';
const _window: any = window;

export const myRxStompConfig: InjectableRxStompConfig = {
  // Which server?
  brokerURL: _window.STOMP_SERVER_BASE_URL 
              ? _window.STOMP_SERVER_BASE_URL
              : STOMP_SERVER_BASE_URL

  // Headers
  // Typical keys: login, passcode, host
  connectHeaders: {
    login: 'guest',
    passcode: 'guest'
  },

  // How often to heartbeat?
  // Interval in milliseconds, set to 0 to disable
  heartbeatIncoming: 0, // Typical value 0 - disabled
  heartbeatOutgoing: 20000, // Typical value 20000 - every 20 seconds

  // Wait in milliseconds before attempting auto reconnect
  // Set to 0 to disable
  // Typical value 500 (500 milli seconds)
  reconnectDelay: 200,

  // Will log diagnostics on console
  // It can be quite verbose, not recommended in production
  // Skip this key to stop logging to console
  debug: (msg: string): void => {console.log(new Date(), msg);
  }
};

在创建实例时,此配置将由 Angular Dependency Injection 机制注入 RxStompService 服务,在 src/app/app.module.ts 文件中,添加以下内容。

import {InjectableRxStompConfig, RxStompService, rxStompServiceFactory} from '@stomp/ng2-stompjs';
import {myRxStompConfig} from './config/stomp.config';
...
@NgModule({declarations: [/* 声明模块内部成员的地方 */],
  imports: [/* 导入的其他 module */],  
  providers: [
    {
      provide: InjectableRxStompConfig,
      useValue: myRxStompConfig
    },
    {
      provide: RxStompService,
      useFactory: rxStompServiceFactory,
      deps: [InjectableRxStompConfig]
    }
  ],
  entryComponents: [/* 不会在模版中引用到的组件 */],
  bootstrap: [AppComponent]
})

export class AppModule {}

建立连接

我们现在将 RxStompService 依赖注入 app-progress-bar 组件中,为此我们将它添加到构造函数中,如下所示:

constructor(private rxStompService: RxStompService) {}

为了能实时接收服务器发送过来的消息,我们需要在 app-progress-bar 组件的生命周期函数 OnInit 中,使用 watch() 方法进行订阅:

ngOnInit() {
  // 订阅 STOMP 消息
  this.topicSubscription = this.rxStompService.watch('/topic/message').subscribe((message: Message) => {console.log(message.body);
}
  
this.errorSubscription = this.rxStompService.watch('/topic/error').subscribe((message: Message) => {this.progressInfo = message.body;});
}

注:app-message-bar 组件默认是不显示的,当有消息传递进来时,此组件才会显示在页面中,进度达到 100% 时,会自动隐藏。

STOMP 协议是如何将消息准确发送的目的地的呢?

文章开头提到,STOMP 是一种基于帧的协议,其帧在 HTTP 上建立模型。一个框架由一个命令,一组可选的标题和一个可选的主体组成。

STOMP 服务器被建模为可以向其发送消息的一组目标,STOMP 协议将目标视为不透明字符串,其语法是特定于服务器实现的。另外,STOMP 没有定义目的地 destination 的传递语义应该是什么。目的地的传递或「消息交换」语义可能因服务器而异,甚至从目的地到目的地也不同,这使得服务器可以使用 STOMP 支持的语义进行创作。

STOMP 客户端是一个用户代理,可以在两种(可能是同时的)模式下运行:

  • 作为生产者,通过 SEND 框架将消息发送到服务器上的目的地。
  • 作为消费者,发送 SUBSCRIBE 给定目的地的帧并从服务器接收消息作为 MESSAGE 帧。

我们的案例中两种模式同时存在,发送消息的是生产者(我们上文提到的 context-menu-component 动态组件),接收消息的是消费者(app-progress-bar 组件)。消

费者可以通过订阅不同的 destination,来获得不同的推送消息,不需要开发人员去管理这些订阅与推送目的地之前的关系。

接下来就介绍下作为生产者的 context-menu-component 组件,看看它都做了哪些事情吧。

发送消息

context-menu-component 组件是触发右键时动态产生的组件,它负责通过向不同的目的地 destination 下达不同的指令,进而来实现不同的功能需求。

使用 ng-zorro-antdDropdown 组件,动态生成:

public openProjectManagerContextMenu(context: ProjectManagerContext): void {this.contextMenuComponent = this.nzDropdownService.create(context.mouseEvent,          this.contextMenuTemplate);
}

当我们点击「运行用例」按钮时,它作为生产者会向 STOMP 服务端目的地 SEND 消息指令。

  // 运行用例
 public runProjectCases(): void {const streamTaskParam: StreamTaskParam = new StreamTaskParam();
   streamTaskParam.project = this.globalService.projectInfo.projectName;
   this.openTaskProgressModal('/app/run-project-cases', JSON.stringify(streamTaskParam));
 }

从代码得知,这会将消息发送到名为的 /app/run-project-cases 的目的地,STOMP 将此目标视为不透明字符串,并且目标名称不承担传递语义。

STOMP 定义了自己的消息传输体制。首先是通过一个后台绑定的连接点 endpoint 来建立 socket 连接,然后生产者通过 SEND 方法,绑定好发送的 destination。而 topic 和 app 则是一种消息处理手段的分支,走 app/url 的消息会被你设置到的 MassageMapping 拦截到,进行你自己定义的具体逻辑处理,而走 topic/url 的消息就不会被拦截,直接到 Simplebroker 节点中将消息推送出去。(其中 simplebroker 是 spring 的一种基于内存的消息队列,你也可以使用 activeMQ,rabbitMQ 代替)。

因此目的地 /app/run-project-cases 生产出来的消息会被拦截,最终会转发到消费者 app-progress-bar 组件 的 /topic/message

接收消息

app-progress-bar 组件作为消费者使用 watch() 方法启动与代理的订阅,this.rxStompService.watch('/topic/message') 将代理到目的地为 /topic/message 的订阅上,并返回 RxJS Observable。

ngOnInit() {
  // 订阅 STOMP 消息
  this.topicSubscription = this.rxStompService.watch('/topic/message').subscribe((message: Message) => {console.log(message.body);
    // do something
  }
}

app-progress-bar 组件都做了些什么事情呢?它负责建立 STOMP 连接,从服务器端接收文本流,并将这些流进行数据解析,解析出来的数据一部分用来控制进度条的数值变化,一部分用来控制 app-console-area 组件日志的输出节点。

也就是说 app-console-area 组件中打印的内容是由 app-progress-bar 组件解析和传递的。

取消订阅

我们知道 RxJS Observable 实际上就是一个函数,它接收一个 Observer 对象作为参数,返回一个函数用来取消订阅。所以我们可以在 app-progress-bar 组件销毁时,调用 unsubscribe() 方法取消订阅。

ngOnDestroy() {  this.topicSubscription.unsubscribe();}

本文主要目的是是结合案例展现 STOMP 协议的使用场景,所以不会着重介绍案例上的功能以及实现细节。

「记一次」系列文章:

  • 记一次 Vue 组件设计及 computed 动态引入组件
  • 记一次 <keep-alive> 缓存及其缓存优化原理

相关文献:

  • STOMP over WebSocket http://jmesnil.net/stomp-webs…
  • ng2-stompjs – https://github.com/stomp-js/n…
  • RxJS – https://rxjs-dev.firebaseapp….
  • ng-zorro-antd – https://ng.ant.design/docs/in…
正文完
 0