乐趣区

关于websocket:gws-高性能websocket服务器新的选择

gws 是一款由 golang 开发的高性能 websocket 库, 提供 websocket event api :

type Event interface {OnOpen(socket *Conn)
    OnError(socket *Conn, err error)
    OnClose(socket *Conn, code uint16, reason []byte)
    OnPing(socket *Conn, payload []byte)
    OnPong(socket *Conn, payload []byte)
    OnMessage(socket *Conn, message *Message)
}

反对大部分 RFC 规范:

  • 接管分片音讯
  • 发送敞开帧
  • ping/pong
  • deflate 数据压缩
  • 应用 bufio 读写音讯

得益于高效的协定解析器, 相比其余库 gws 领有更高的 IOPS , 更低的提早和 CPU 占用率. 然而因为 bufio 的应用, 内存占用会高一些.

$ tcpkali -c 1000 --connect-rate 500 -r 1000 -T 300s -f assets/1K.txt --ws 127.0.0.1:${port}/connect

个性方面, 次要有:

  1. 无依赖
  2. 反对连贯多路复用, 在一个连贯上并行处理多条音讯, 管制好了并发下限
  3. 反对异步非阻塞写入, 且不减少常驻协程

每个连贯上都有两个工作队列, 一个读一个写, 它们十分轻量, 并行度别离是 8(默认值) 和 1. 读队列用来并行处理申请, 写队列用来解决异步写入, 非常适合音讯播送场景. workerQueue 没有应用 channel , 它依赖递归进行任务调度, 所有工作执行结束协程便会退出.

type (
    workerQueue struct {
        mu             sync.Mutex // 锁
        q              []asyncJob // 工作队列
        maxConcurrency int32      // 最大并发
        curConcurrency int32      // 以后并发
        capacity       int        // 容量
    }

    asyncJob func())

// 获取一个工作
func (c *workerQueue) getJob(delta int32) asyncJob {c.mu.Lock()
    defer c.mu.Unlock()

    c.curConcurrency += delta
    if c.curConcurrency >= c.maxConcurrency {return nil}
    if n := len(c.q); n == 0 {return nil}
    var result = c.q[0]
    c.q = c.q[1:]
    c.curConcurrency++
    return result
}

// 递归地执行工作
func (c *workerQueue) do(job asyncJob) {job()
    if nextJob := c.getJob(-1); nextJob != nil {c.do(nextJob)
    }
}

// Push 追加工作, 有资源闲暇的话会立刻执行
func (c *workerQueue) Push(job asyncJob) error {c.mu.Lock()
    if n := len(c.q); n >= c.capacity {c.mu.Unlock()
        return internal.ErrAsyncIOCapFull
    } else {c.q = append(c.q, job)
        c.mu.Unlock()}
    if item := c.getJob(0); item != nil {go c.do(item)
    }
    return nil
}
退出移动版