关于json:Go-语言实现-WebSocket-推送

5次阅读

共计 3539 个字符,预计需要花费 9 分钟才能阅读完成。

作者:周慧婷

写在后面

零碎开发的过程中,咱们常常须要实现音讯推送的需要。单端单实例的状况下很好解决(网上有许多教程这里不做开展),但在分布式系统及多端须要推送的状况下该如何解决呢?

在分布式系统中,音讯推送服务端是多实例的。某零碎中一个服务生成一条音讯,这条音讯须要实时推送到多个终端,此时该如何进行无效的 WebSocket 推送呢?首先一起看看如下场景:

假如推送音讯由音讯实例 2 产生,然而终端真正连贯的音讯实例是实例 1 和实例 3,并没有连贯到生产音讯的实例 2,零碎是如何将实例 2 的音讯同步推送到终端 1 和终端 2 的呢?下文将详细描述。

基本原理

为了满足需要咱们采纳 redis 做协同中间件,用于存储用户信息、生成用户连贯的唯一性标识以及 pod address,音讯的生产者实例通过订阅 redis 获取终端连贯的唯一性标识和 pod address,并告诉到对应的音讯实例,最终由相应连贯终端的音讯实例通过 WebSocket 将音讯发推送到用户终端。具体流程如下图:

服务端实现

Client

Client 组件的作用,是当用户与音讯服务中某个实例建设连贯后,治理这个连贯的信息,这里通过一个 Golang 构造体来定义:

type Client struct {
    UUID   string 
    UserID string
    Socket *websocket.Conn
    Send   chan []byte}

构造体中的数据类型阐明如下:

  • UUID:对连贯进行唯一性的标识,通过此标识能够查找到连贯信息。
  • UserID:用户 ID。
  • Socket:连贯对象。
  • Send:音讯数据 channel。

咱们为 Client 构造体实现了两个办法:Read、Write 来解决音讯的承受和发送。

Read 办法

Read 办法比较简单,从终端接管申请音讯后,音讯实例通过 WebSocket 回应接管音讯状态,并不返回申请后果。后果通过 Write 办法返回。

func (c *Client) Read(close, renewal chan *Client) {defer func() {close <- c}()

for {_, message, err := c.Socket.ReadMessage()
    if err != nil {break}
  // ...
     // message logic
}
}

Write 办法

Write 办法将申请后果返回给终端。Client 会监听 send channel,当 channel 有数据时,通过 socket 连贯将音讯发送给终端。

func (c *Client) Write(close chan *Client) {
    for {
        select {
        case message, ok := <-c.Send:
            if !ok {return}
            c.Socket.WriteMessage(websocket.TextMessage, message)
        case <-c.Ctx.Done():
      return
        }
    }
}

ClientManger

ClientManager 组件相当于连接池,能够治理所有的终端连贯,并提供注册、登记、续期性能。

type ClientManager struct {
    sync.RWMutex
    Clients    map[string]*Client 
    Register   chan *Client
    Unregister chan *Client
    Renewal    chan *Client
}

构造体的数据类型阐明如下:

  • Clients:是一个汇合,用于存储创立的 Client 对象。
  • Register:注册的 channel。

    • 把连贯注册到 Clients 中,并通过 key-value 退出 Client 汇合中,key 是连贯的唯一性标识,value 是连贯自身。
    • 把连贯的唯一性标识和用户的 ID 以及建设连贯的 pod address 信息,存储到 redis 中。
  • Unregister:登记的 channel。

    • 从 ClientManager 组件的 Clients 汇合中移除连贯对象。
    • 删除 redis 对应的缓存信息。
  • Renewal:续期的 channel,用于对 redis 的键续期。

ClientManager 只提供了一个 Start 办法,Start 办法提供监听注册、登记以及续期的 channel,通过监听这些 channel 来治理创立的连贯对象。当这些 channel 有数据时,执行对应的操作。

func (manager *ClientManager) Start(ctx context.Context) {
    for {
        select {
        case conn := <-manager.Register:
            manager.Lock()
            manager.Clients[conn.UUID] = conn
            manager.Unlock()
            _, err := manager.affair.Register(ctx, &RegisterReq{
                UserID: conn.UserID,
                UUID:   conn.UUID,
                IP:     manager.IP,
            })
        case conn := <-manager.Unregister:
            _, err := manager.affair.Unregister(ctx, &UnregisterReq{
                UserID: conn.UserID,
                UUID:   conn.UUID,
            })
            conn.Socket.Close()
            close(conn.Send)
            delete(manager.Clients, conn.UUID)
        case conn := <-manager.Renewal:
                    //...
            // Key renewal to redis
        }
    }
}

音讯推送

当一个音讯服务实例生产用户的音讯,须要推送音讯给终端时,推送步骤如下:

  1. 依据 userID 从 redis 读取数据,失去连贯唯一性标识和 pod address 地址,这些信息是在终端第一次与服务端建设连贯的时候写入 redis 的。
  2. 此时依据 pod address,向对应的服务器发送申请。
  3. 相应的音讯服务实例接管到申请。

服务端接管申请的解决逻辑如下:

  1. 依据传递过去连贯唯一性标识的参数,找到标识对应的连贯。咱们为 ClientManager 提供了一个 Write 办法。
func (manager *ClientManager) Write(message *Message) error {manager.RLock()
  client, ok := manager.Clients[message.Recipient]
  manager.RUnlock()
  if !ok {return errors.New("client miss [" + message.Recipient + "]")
  }
  return client.SendOut(message)
}

此办法用到 ClientManager 组件的 Clients 汇合,依据唯一性标识找到对应的 Client。再利用 Client 的 SendOut 办法,写出数据到终端。
2. 定义 Client 的 SendOut 办法。此办法只负责:把接管到的音讯转换为字节数组后,发送 Client 的 Send Channel 中。

func (c *Client) SendOut(message *Message) error {content, err := json.Marshal(message.Content)
    if err != nil {return err}
    c.Send <- content
    return nil
}
  1. 发送数据给终端。在前文介绍 Client 组件中,已阐明 Client 组件的 send channel 有数据时,会读取 channel 产生的数据,通过连贯对象发送给对应的终端。

总结

以上是 Web Socket 推送音讯给终端的次要思路:通过 redis 把用户的信息以及连贯的标识和 pod address 存储起来,当某个音讯服务实例产生音讯,从 redis 读取信息,告诉连贯着终端的音讯服务实例,再由这些服务实例通过 WebSocket 对象给终端发送音讯。全象云低代码平台也集成了音讯的实时推送,用户应用平台时能及时获取最新消息状态。

下期咱们将为大家带来 Knative Serving 自定义弹性伸缩,请大家继续关注。

对于全象云

全象云平台(https://portal.clouden.io)是青云科技自主研发的低代码平台,是基于云原生、用于辅助构建企业各类数字化利用的工具和集成平台。

平台目前提供云上无代码和低代码两种利用开发模式,屏蔽了技术的复杂度。反对可视化设计器,让开发人员和业务用户可能通过简略的拖拽、参数配置等形式疾速实现利用开发。同时集成了 IDaaS 身份认证能力、容器 DevOps 能力,反对企业存量业务与全象云业务交融。平台还蕴含丰盛的开发接口和弱小的插件机制,开发者可依据须要一直拓展平台的利用能力。

全象云的愿景是:在企业生产经营的各个象限、各个环节提供软件构件或反对服务。

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0