引言中后台仪表盘是一个非常复杂,特别是当需要全面屏运用时,数据的实时性需求非常高。WebSocket 不管在什么环境中使用其实都是非常简单,各现代浏览器实现标准都很统一,而且接口也足够简单。即便是在 Angular 也是如此,只需要简单几行代码就能使用 WebSocket。const ws = new WebSocket(‘wss://echo.websocket.org’);ws.onmessage = (e) => { console.log(‘message’, e);}若需要向服务端发送消息,则:ws.send(content
);在 Angular 里绝大多数的人都会根据上述代码进一步拓展,比如统一消息解析、错误处理、多路复用等,并最终将其封装成一个服务类。事实上,RxJS 也包裹了一个 WebSocket Subject,位于 rxjs/websocket。如何使用假如将上面的示例使用 RxJS 来写,则:import { webSocket, WebSocketSubject } from ‘rxjs/webSocket’;const ws = webSocket(‘wss://echo.websocket.org’);ws.subscribe(res => { console.log(‘message’, res);});ws.next(content
);webSocket 是一个工厂函数,所生产出来的 WebSocketSubject 对象可被多次订阅,若未订阅或取消最后一个订阅时都会导致 WebSocket 连接中断,当再一次订阅时会重新自动连接。WebSocketSubjectConfigwebSocket 除了接收字符串(WebSocket服务远程地址)外,还允许指定更复杂的配置项。默认情况下,消息是使用 JSON.parse 和 JSON.stringify 对消息格式序列化和反序列化操作,所以不管消息发送或接收都以 JSON 为准,可通过 serializer、deserializer 属性来改变。若需要关心 WebSocket 什么时候开始或结束(closeObserver),则:const open$ = new Subject();const ws = webSocket({ url: ‘wss://echo.websocket.org’, openObserver: open$});// 订阅打开事件open$.subscribe(() => {});消息WebSocketSubject 也是 Subject 的变体之一,因此订阅它表示接收消息,反之则利用 next、complete、error 来维护消息的推送。使用 next 来发送消息使用 complete 会尝试检测是否最后一个订阅,若是将会关闭连接使用 error 相当于原始 close 方法且必须提供 { code: number, reason?: string} 参数,注意 code 务必遵守取值范围可被重放调用 next 发送消息时若 WebSocket 连接中断(例如:没人订阅时),消息会被缓存当下一次重新连接以后会按顺序发送。这对于异步世界里非常方便,我们只需要确保 Angular 启动前初始化好 WebSocket 不管什么时候订阅接收消息,都可以随时发送也无须等待。事实上这一点是 RxJS WebSocket 默认情况下是通过 webSocket 所生产的 WebSocketSubject 其本质上是 ReplaySubject 的“重放”能力。当然你可以通过 webSocket 的第二个参数改变这种行为。多路复用一般来说我们不太可能只会一个 Web Socket 服务完成所有的事,然而也不太可能针对每一个业务实例创建一个 webSocket。往往我们会增加一层网关并将这些业务 WebSocket 进行汇总,对于前端始终只需要一个连接,这就是多路复用存在的意义。而核心是必须要让后端知道,什么时候发送什么消息给什么样的服务。首先必须先使用 multiplex 方法来创建 Observable 以便订阅某一路消息,它有三个参数来帮助我们区分消息:subMsg 告知正在订阅哪一路消息unsubMsg 告知取消订阅哪一路消息messageFilter 过滤消息,使订阅者只接收哪一路消息const ws = webSocket(‘wss://echo.websocket.org’);const user$ = this.ws.multiplex( () => ({ type: ‘subscribe’, tag: ‘user’ }), () => ({ type: ‘unsubscribe’, tag: ‘user’ }), message => message.type === ‘user’);user$.subscribe(message => console.log(message));const todo$ = this.ws.multiplex( () => ({ type: ‘subscribe’, tag: ’todo’ }), () => ({ type: ‘unsubscribe’, tag: ’todo’ }), message => message.type === ’todo’);todo$.subscribe(message => console.log(message));user$ 流和 todo$ 流他们共用一个 WebSocket 连接,这便是多路复用。虽然订阅是通过 multiplex 创建的,然后消息的推送依然还是需要使用 ws.next()。总结这原本是对内部一个简单培训,然而我发现竟然极少人会讨论 RxJS 里面 Web Socket 的实现。其实一直有想着要给 ng-alain 内置 WebSocket,只是就封装角度来讲完全没有价值,因为已经足够优雅。