乐趣区

基于websocket的简单广播系统

在年初的时候,我们有点儿小迷茫,于是也跟风去做了一些轻娱乐类的小游戏。那时为了实战对战,想到需要一个实时性很强的技术实现,于是我去实现了一个 websocket server, 没想到后来这些小程序没有成,但是我们的这个 web socket server 演化得无处不在。下面介绍一下这个技术实现。
看理论肯定会有点拗口是不是,我们直接上代码就得了。我们现在假设有这么一个用户付款的逻辑,在写用户付款事件时,我们事先并不知道以后还需要加什么逻辑,于是我们先把这个行为广播出去。以下是伪代码:
req := httplib.Post(“https://ws.app.12zan.net/eventcast/user/5905e89db43fec42e3055df05ff72afe”)
text, er := zanjson.Encode(order)
if er != nil {
log.Println(ev)
return
}
req.Param(“data”, string(text))
resp,_ = req.Response()
好了,现在,每当有用户付款时,这个用户系统都会往 /eventcast/user/5905e89db43fec42e3055df05ff72afe 这个频道广播一条消息。但是很遗憾,目前没有客户端订阅这类消息,所有的消息都被丢弃了。
有一天,我们英明神武的老板决定要加一个通知,每当有一个新的用户付款时,都给公司的同胞们发一个邮件通知一下,我们获得了新的付费用户,好让大家小开心一把,尤其是第一个试用客户付费的时候,我们肯定都要开心地跳起来。这时我们如果去改线上运行好的付款系统,还是有点儿风险的,一旦有修改,我们就得走一下测试流程,不然万一有问题不是影响公司发财了吗。没关系,我们之前不是已经把付款事件广播出来了吗,我们现在用起来。写这么一段 js, 线上运行起来,就好了。
const webSocket = require(‘ws’);
let ws = new webSocket(“wss://ws.app.12zan.net/eventcast/user/5905e89db43fec42e3055df05ff72afe”);
ws.on(‘open’, function open() {
console.log(“connected”);
});
ws.on(‘message’, function incoming(data) {
let user = JSON.parse(data);
Mail.send(“ 一个叫 ”+user.name+” 的好心人支付了 ”+user.amount+” 元,让主赞美他!”);
});

好了,现在一旦有人付款,我们全公司都能收到一个邮件,及时得到这一好消息了。让我们小小地庆祝一下吧。
接下来又过了几天,我们想改进一下体验,用户一旦付款成功,就发送一条短信,告知用户他的有效期和我们的 24 小时客服电话;只需要这么一段代码部署起来运行就好了,之前的任何代码都不用动:
const webSocket = require(‘ws’);
let ws = new webSocket(“wss://ws.app.12zan.net/eventcast/user/5905e89db43fec42e3055df05ff72afe”);
ws.on(‘open’, function open() {
console.log(“connected”);
});
ws.on(‘message’, function incoming(data) {
let user = JSON.parse(data);
let expiresAt = (zan.Date.now().add(“+365 day”).format(“YYYY-mm-dd”));
SMS.send(user.Mobile,” 尊敬的 ”+user.name+”,您成功购买了十二赞旗舰版,有效期至 ”+expiresAt+”, 请登陆:https://www.12zan.cn 查看,如有任何疑问, 欢迎致电 4006681102″);
});
发送通知邮件和发送告知短信,都基于用户付款动作,但是发邮件和发短信的代码完全隔离,相互之间出完全不知道对方的存在。
是不是很赞?那我们接下来梳理一下逻辑。
概念及主要逻辑
也许我们来不及去翻看 websocket 的定义,但是我们可以简单地理解,Websocket 是对 HTTP 协议的一个扩展升级,在发起连接时,HTTP 部分都是有效的,只是连接成功以后,服务端和客户端的连接不断,双方可以双向数据传输,且服务端可以主动向客户端推送数据。
我们看一次 Websocket 发起连接的过程(来自维基百科):
客户端向服务端发起连接:
GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
服务端的返回:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/
在 HTTP 协议中常见的字段,如 Cookies,Host 等,依然有效。
但是具体到我们的应用上,十二赞的这个 websocket server 实现了两个小目标【多遗憾了,并没有赚到两个亿】:

我们实现的是一个广播系统,一个广播系统意味着一个地方去发送数据,n 多个接受端来接受数据。要支持非常多的客户端同时连上数据来实时接受数据。我们最终的 server 端的实现,全内存实现,没有用 redis 或是 MySQL 类似的数据库,就是为了实现超多客户端的支持。
我们希望采用最简单、最通用的文案,并且,非常高效,支持非常多的客户端同时连接,我们认为 http 协议更简单,所以在发送的时候,我们是走 http 协议来发送数据的。并且,没有任何安全上的设计,如果数据很重要,请自行加密之后发送。

当然我们也有一些遗憾:

允许数据丢失。有得必有失,我们允许一个比例的信息丢失。产生数据丢失时,不影响主逻辑。就像刚才的例子,发送邮件通知我们有新付款的这个事件没有触发并没有关系,我们到下午才发现有新用户付款,这时再去开香槟也不迟:(。
容忍时序错乱。像刚才的例子,有新用户付款时,是先告诉我们全体同事有新付款,还是先给用户发送一条短信,并不那么重要。

好了,回到我们的系统,我们给一点点总结。
我们定义,每个 websocket 的入口,都是一个 URL; 去掉协议和 HOST 部分, 剩下的 PATH 部分代表了不同的频道。比如,发起 websocket 时连接到 ws://ws.app.12zan.net/channel/hello, 那么这个频道地址就是 /channel/hello; 所有连接到 ws.app.12zan.net/channel/hello 的 websocket 客户端,他们会收到一模一样的消息,我们称之为订阅。
同时,为了简化发起数据的过程,我们还在 websocket server 中定义:当一个 http 的客户端,以 POST 方式请求某一个地址时,我们截取 URL 中的 PATH 部分,得到频道名,并取 POST 的数据中的 data 域,作为要广播的数据,将之广播到相应的频道。
在十二赞的应用:
这个广播系统,在十二赞的整个技术架构中,后来应用的特别广。比如,我们的部署系统 zeus, 在网页端实现了一个客户端,当服务端有应用重启、关闭、启动时,都会弹出消息通知。任何在打开了这个系统的网页的人都能看到。比如我和同事小王都正在 zeus 的网页上,我新建了一个 search 系统的一个节点,启动完毕的时候,我和小王会收到通知,在第三号服务器上新启了一个 search 系统的节点。我在操作,很关心这个,所心这时我可以放心去继续我的工作。小王正要在三号机器上新部署一个系统,他收到这个通知后,觉得这个机器可能会很忙,于是把自己的新实例部署在了四号机器上。
再比如,我们的日志服务器,担负着收集所有服务器上日志的使命。但是如果它挂掉了呢?于是我们在这个日志服务器上跑了一个定时器,每 5 秒钟向某个频道广播一条心跳消息,告诉世界自己还活着。然后另行跑了一个进程,收听这个频道的广播,如果连续 30 秒没有收到这个心跳包,证明这个日志服务器挂掉了,就发一条报警短信,通知同学去看看这个服务。
再比如,我们在日志服务上的应用,参见这里: 十二赞日志系统简介

退出移动版