gws 是一款由 golang
开发的高性能 websocket
库, 提供 websocket event api
:
type Event interface {OnOpen(socket *Conn)
OnError(socket *Conn, err error)
OnClose(socket *Conn, code uint16, reason []byte)
OnPing(socket *Conn, payload []byte)
OnPong(socket *Conn, payload []byte)
OnMessage(socket *Conn, message *Message)
}
反对大部分 RFC
规范:
- 接管分片音讯
- 发送敞开帧
- ping/pong
- deflate 数据压缩
- 应用 bufio 读写音讯
得益于高效的协定解析器, 相比其余库 gws
领有更高的 IOPS
, 更低的提早和 CPU
占用率. 然而因为 bufio
的应用, 内存占用会高一些.
$ tcpkali -c 1000 --connect-rate 500 -r 1000 -T 300s -f assets/1K.txt --ws 127.0.0.1:${port}/connect
个性方面, 次要有:
- 无依赖
- 反对连贯多路复用, 在一个连贯上并行处理多条音讯, 管制好了并发下限
- 反对异步非阻塞写入, 且不减少常驻协程
每个连贯上都有两个工作队列, 一个读一个写, 它们十分轻量, 并行度别离是 8(默认值) 和 1. 读队列用来并行处理申请, 写队列用来解决异步写入, 非常适合音讯播送场景. workerQueue
没有应用 channel
, 它依赖递归进行任务调度, 所有工作执行结束协程便会退出.
type (
workerQueue struct {
mu sync.Mutex // 锁
q []asyncJob // 工作队列
maxConcurrency int32 // 最大并发
curConcurrency int32 // 以后并发
capacity int // 容量
}
asyncJob func())
// 获取一个工作
func (c *workerQueue) getJob(delta int32) asyncJob {c.mu.Lock()
defer c.mu.Unlock()
c.curConcurrency += delta
if c.curConcurrency >= c.maxConcurrency {return nil}
if n := len(c.q); n == 0 {return nil}
var result = c.q[0]
c.q = c.q[1:]
c.curConcurrency++
return result
}
// 递归地执行工作
func (c *workerQueue) do(job asyncJob) {job()
if nextJob := c.getJob(-1); nextJob != nil {c.do(nextJob)
}
}
// Push 追加工作, 有资源闲暇的话会立刻执行
func (c *workerQueue) Push(job asyncJob) error {c.mu.Lock()
if n := len(c.q); n >= c.capacity {c.mu.Unlock()
return internal.ErrAsyncIOCapFull
} else {c.q = append(c.q, job)
c.mu.Unlock()}
if item := c.getJob(0); item != nil {go c.do(item)
}
return nil
}