起源:如何疾速构建牢靠的分布式 IM 聊天零碎
tlnetim 聊天我的项目是一个分布式 im demo,基于 tlnet http 框架和 tldb 数据库。tldb 是一个高性能的分布式数据库,基于 tldb 能够疾速构建分布式系统。
tlnetim 的开源程序:
- https://github.com/donnie4w/tlnetim
- https://gitee.com/donnie4w/tlnetim
tlnetim 次要的性能:多聊天室多人聊天零碎,程度扩大多服务器部署构建分布式
im.tlnet.top 与 im2.tlnet.top 是分布式系统的两个不同的服务
用户能够连贯任意一个服务器相互通信
除去局部存储数据的实现,im 的逻辑代码理论只有几十行,基于 tldb mq 代码实现分布式的代码也只有几十行。
room, ok := wsmap.Get(ws)
if !ok {
if wa.ATYPE == LOGIN {if iu, ok := getUserInfo(wa.MSG); ok {room = strings.TrimSpace(wa.ROOM)
store(ws, iu, room)
// 记录登录日志
orm.Insert(&ImLog{UserId: iu.Id, Room: room, Time: TimeNow()})
ws.Send(wsack{ATYPE: wa.ATYPE, USERNAME: iu.Name, ICON: iu.Icon, TIME: TimeNow(), ROOM: room}.toJson())
immq.PubId(room, iu.Id)
// 返回好友列表
if *UseRobot {ws.Send(wsack{ATYPE: FRIEND, USERNAME: robot.Name, ICON: robot.Icon, LABEL: robot.Label}.toJson())
}
broadcastToSelf(&wsack{ATYPE: FRIEND}, ws, room)
// 告诉好友
broadcast(&wsack{ATYPE: FRIEND, USERNAME: iu.Name, TIME: TimeNow(), ICON: iu.Icon}, ws, room, true, true)
// 返回聊天室 最新 N 条数据
if id, _ := orm.SelectIdByIdx[ImMessage]("Room", room); id > 0 {
startid := id - 20
if startid < 0 {startid = 0}
if ims, _ := orm.SelectByIdxLimit[ImMessage](startid, 21, "Room", room); ims != nil {
for _, im := range ims {
var u *ImUser
if im.UserId > 1<<60 {u = robot} else {u, _ = orm.SelectById[ImUser](im.UserId)
}
if u != nil {ws.Send(wsack{ATYPE: MSG, USERNAME: u.Name, ICON: u.Icon, MSG: im.Content, TIME: im.Time}.toJson())
}
}
}
}
} else {ws.Send(wsack{ATYPE: NOPASS}.toJson())
}
}
} else if wa.ATYPE == MSG {iu, _ := getIu(room, ws)
t := TimeNow()
// 保留聊天信息
if _, err := orm.Insert(&ImMessage{UserId: iu.Id, Content: wa.MSG, Time: t, Room: room}); err == nil {
// 发送聊天数据
broadcast(&wsack{ATYPE: MSG, USERNAME: iu.Name, MSG: wa.MSG, TIME: t, ICON: iu.Icon}, nil, room, true, false)
}
}
tlnet 将服务器的 websocket 封装为 三个阶段:
- 关上连贯阶段:OnOpen
- 读到信息阶段:WS.Read()
-
链接敞开或出错阶段:OnError
这些封装,让 websocket 的用法与应用一般的 http 服务基本一致:wc = &tlnet.WebsocketConfig{} //websocket 断开时,触发 OnError。删除 wsmap 中的连贯 wc.OnError = func(self *tlnet.Websocket) {if r, ok := wsmap.Get(self); ok {if u, ok := getIu(r, self); ok { // 掉线告诉 broadcast(&wsack{ATYPE: LOGOUT, USERNAME: u.Name}, nil, r, true, true) } } } //wc.OnOpen 用在连贯胜利时调用 //hc.WS.Read() 读取 websocket 接管的数据 var wa wsack if err := json.Unmarshal(hc.WS.Read(), &wa); err == nil {parse(wa, hc.WS) // 解析并解决信息 }
基于 tldb 的 MQ 音讯订阅公布,简洁地实现分布式构建
this.mq = cli.NewMqClient("ws://127.0.0.1:5000", "mymq=123") //mq 服务器地址与用户名明码 if err := this.mq.Connect(); err != nil { //mq.Connect() 连贯服务器 panic("mq connect err:" + err.Error()) } this.mq.MergeOn(1) // 设置服务器信息聚合发送到客户端,1 示意数据包大小下限为 1MB this.mq.Sub("immsg") // 订阅 topic:immsg this.mq.Sub("id") // 订阅 topic:id this.mq.Sub(fmt.Sprint(nodeId)) // 订阅本节点信息 // 解决订阅信息,接管公布函数 PubMem()发送的数据, 不存储信息 this.mq.PubMemHandler(func(jmb *JMqBean) {defer MyRecover() var ms mqws json.Unmarshal([]byte(jmb.Msg), &ms) switch jmb.Topic { case "immsg": if ms.NodeId != nodeId {broadcast(ms.Wa[0], nil, ms.Room, false, false) } case "id": if m, ok := roomap.Get(ms.Room); ok {wss := make([]*wsack, 0) m.Range(func(_ *tlnet.Websocket, vu *ImUser) bool { if ms.UserId != vu.Id {wss = append(wss, &wsack{ATYPE: FRIEND, USERNAME: vu.Name, ICON: vu.Icon, LABEL: vu.Label}) } return true }) immq.PubInfo(ms.NodeId, ms.UserId, ms.Room, wss) } case fmt.Sprint(nodeId): if k, ok := wamap.Get(ms.UserId); ok { for _, v := range ms.Wa {k.Range(func(w *tlnet.Websocket, _ int8) bool {w.Send(v.toJson()) return true }) } } } }) // 解决订阅信息,这里应用 json 格局,接管公布函数 PubJson()发送的数据,也能够应用 PubByteHandler()对应 PubByte() this.mq.PubJsonHandler(func(jmb *JMqBean) {defer MyRecover() var ms mqws json.Unmarshal([]byte(jmb.Msg), &ms) switch jmb.Topic { case "immsg": if ms.NodeId != nodeId {broadcast(ms.Wa[0], nil, ms.Room, false, false) } } })
告诉好友函数:
broadcast(&wsack{ATYPE: FRIEND, USERNAME: iu.Name, TIME: TimeNow(), ICON: iu.Icon}, ws, room, true, true)
broadcast 理论是 tlnetim 实现的一个音讯路由性能,将状态(上线,下线等),收回信息等进行播送,其中包含路由给其余分布式节点。tlnetim 在发送给其余节点中的实现采纳 tldb MQ 的订阅公布,次要用 PubMem 与 PubJson,这两个公布函数也是有区别的,PubMem 不存储公布的信息,个别用于状态信息的公布,比方登录信息,下线信息等。PubJson 公布的信息会记录在 tldb 中,保障信息不失落。
针对在并发量比拟大的分布式系统。tldb MQ 对音讯解决提供一些解决方案
this.mq.MergeOn(1) // 服务器信息聚合发送到客户端,1 示意数据包大小下限为 1MB this.mq.SetZlib(true) // 服务器消息压缩
MQ 提供音讯的可靠性保障:客户端信息回执,拉取信息等:
this.mq.RecvAckOn(10) // 音讯回执与重发工夫的设置,10 示意如果客户端 10 内不回执服务器信息,则服务器会再次发送信息 this.mq.PullJsonSync("immsg",1) // 拉取 topic:immsg id 为 1 的信息 json 格局 this.mq.PullByteSync("immsg",1) // 拉取 topic:immsg id 为 1 的信息 二进制格局 this.mq.PullIdSync("immsg") // 拉取 topic:immsg 的最大信息 id
MQ 反对各个客户端对 MergeOn,SetZlib,RecvAckOn 这些性能 依据不同理论状况各自设定。
SetZlib 是用 zlib 压缩音讯发送,在大量应用 zlib 压缩时,服务器会耗费大量内存,所以并非每个音讯都适宜压缩发送,应该依据理论状况,如果音讯体比拟大,或采纳聚合信息,也就是多少个音讯聚合发送,总数据比拟大,这时压缩信息会有比拟好的成果,压缩比例较大,达到减小传输工夫,进步吞掉量的成果。相同,如果音讯体自身曾经较小,压缩成果不佳,这时压缩音讯反而减少服务器压力。
RecvAckOn 是音讯回执,保障音讯不失落。MQ 服务器给节点推送信息时,节点会发 Ack 给 MQ 告知音讯已收到,否则,当 MQ 服务器没有收到 Ack 时,会一直给节点推送信息;相似这样的性能,不论是 tldb MQ 还是其余 MQ,节点的状态应该实时监控,如果节点压力过大,比方,某个服务节点 cpu 被打满了,此时,节点可能无奈回复 ack 给 mq 服务器,mq 服务器因为收不到回执,会呈现大量信息积压。
不开启 RecvAckOn 的状况下,tldb mq 也提供了反对客户端信息不失落的办法。如果不开启 RecvAckOn,对于同一信息,mq 服务器只给节点发送一次。如果节点狐疑服务器信息没有达到,即信息丢了(须要客户端依据理论业务状况实现一个狐疑发现策略),能够通过拉取 id 函数 PullIdSync 拉取订阅主题的最大 id,与本地的 id 比拟,来判断本地是否有信息未读取到,通过拉取信息函数 PullJsonSync 或 PullByteSync 将 mq 服务器的信息拉取到本地。
即时通讯零碎,依据不同业务要求,有不同的实现。tlnetim 聊天 IM 在协定方面只是简略设计,只是 demo,不实用简单或残缺的 IM 零碎。残缺的 IM 聊天协定能够参考 xmpp 即时通讯协定。
有任何问题或倡议请 Email:donnie4w@gmail.com 或 http://tlnet.top/contact 发信给我,谢谢!