概述

音讯队列置信大家都不生疏,平时在一些须要解耦、高并发的场景下常常能看见它们的身影,Kafka、RabbitMQ 这些罕用的音讯队列当初甚至曾经成为后端程序员的必须技能了。那么一个音讯队列的根底性能有哪些,是如何实现的?当初咱们就用 Go 语言来实现一个简略的单机版音讯队列,借以理解音讯队列的原理。

这个音讯队列的实现参考了 Go 语言中最受欢迎的音讯队列 nsq 的实现,取名就叫 smq (simple message queue),代码曾经上传到 Github 上:https://github.com/yhao1206/SMQ。为了尽可能简洁明了,这个音讯队列的性能都很根底。因为工夫紧迫,加上自己也是 Go 语言的菜鸟,程度无限,有什么谬误或是倡议,欢送一起探讨。

次要组件

  • topic:一个 topic 就是程序公布音讯的一个逻辑键,当程序第一次公布音讯时就会创立 topic。
  • channel:channel 与消费者相干,每当一个发布者发送一条音讯到一个 topic,音讯会被复制到 topic 上面所有的 channel 上,消费者通过 channel 读取音讯。同一个 channel 能够由多个消费者同时连贯,以达到负载平衡的成果。
  • message:s音讯是数据流的形象,消费者能够抉择完结音讯,表明它们已被失常解决,或者从新将它们排队待到前面再进行解决。
  • smqd:smqd 是一个守护过程,负责接管,排队,投递音讯给客户端。

topic、channel、consumer 之间的关系能够参考下图:

话不多说,让咱们直入主题,首先第一步就是定义音讯。

音讯

咱们定义的音讯就是一般的字节数组,前 16 位是 uuid,用作音讯的惟一标识,前面实现和重排音讯时须要用到。前面就是音讯自身的内容,还提供了几个导出的封装办法。

message.go

package messagetype Message struct {    data []byte}func NewMessage(data []byte) *Message {    return &Message{        data: data,    }}func (m *Message) Uuid() []byte {    return m.data[:16]}func (m *Message) Body() []byte {    return m.data[16:]}func (m *Message) Data() []byte {    return m.data}

工具库

音讯体须要 uuid 作为惟一标识,那么咱们须要一个 uuid 生成器,咱们间接应用一个工厂,一直地朝一个 chan 中写入uuid,代码如下:

uuid.go

package utilimport (    "crypto/rand"    "fmt"    "io"    "log")var UuidChan = make(chan []byte, 1000)func UuidFactory() {    for {        UuidChan <- uuid()    }}func uuid() []byte {    b := make([]byte, 16)    _, err := io.ReadFull(rand.Reader, b)    if err != nil {        log.Fatal(err)    }    return b}func UuidToStr(b []byte) string {    return fmt.Sprintf("%x-%x-%x-%x-%x", b[:4], b[4:6], b[6:8], b[8:10], b[10:])}

另外,因为 channel 和 topic 的实现中须要大量应用 Go 语言中的管道,甚至须要两个 goroutine 之间通过同一个管道来传递信息,多以咱们当时定义好一个构造体不便后续 goroutine 之间通信:

chan_req.go

package utiltype ChanReq struct {    Variable interface{}    RetChan  chan interface{}}type ChanRet struct {    Err      error    Variable interface{}}

当前目录构造如下:

因为本文只是开篇,内容比较简单,只是一些“地基”,不过咱们后续会在这块地基上添砖加瓦,实现一个可用的音讯队列。