共计 1667 个字符,预计需要花费 5 分钟才能阅读完成。
概述
音讯队列置信大家都不生疏,平时在一些须要解耦、高并发的场景下常常能看见它们的身影,Kafka、RabbitMQ 这些罕用的音讯队列当初甚至曾经成为后端程序员的必须技能了。那么一个音讯队列的根底性能有哪些,是如何实现的?当初咱们就用 Go 语言来实现一个简略的单机版音讯队列,借以理解音讯队列的原理。
这个音讯队列的实现参考了 Go 语言中最受欢迎的音讯队列 nsq 的实现,取名就叫 smq(simple message queue),代码曾经上传到 Github 上:https://github.com/yhao1206/SMQ。为了尽可能简洁明了,这个音讯队列的性能都很根底。因为工夫紧迫,加上自己也是 Go 语言的菜鸟,程度无限,有什么谬误或是倡议,欢送一起探讨。
次要组件
- topic:一个 topic 就是程序公布音讯的一个逻辑键,当程序第一次公布音讯时就会创立 topic。
- channel:channel 与消费者相干,每当一个发布者发送一条音讯到一个 topic,音讯会被复制到 topic 上面所有的 channel 上,消费者通过 channel 读取音讯。同一个 channel 能够由多个消费者同时连贯,以达到负载平衡的成果。
- message:s 音讯是数据流的形象,消费者能够抉择完结音讯,表明它们已被失常解决,或者从新将它们排队待到前面再进行解决。
- smqd:smqd 是一个守护过程,负责接管,排队,投递音讯给客户端。
topic、channel、consumer 之间的关系能够参考下图:
话不多说,让咱们直入主题,首先第一步就是定义音讯。
音讯
咱们定义的音讯就是一般的字节数组,前 16 位是 uuid,用作音讯的惟一标识,前面实现和重排音讯时须要用到。前面就是音讯自身的内容,还提供了几个导出的封装办法。
message.go
package message
type Message struct {data []byte
}
func NewMessage(data []byte) *Message {
return &Message{data: data,}
}
func (m *Message) Uuid() []byte {return m.data[:16]
}
func (m *Message) Body() []byte {return m.data[16:]
}
func (m *Message) Data() []byte {return m.data}
工具库
音讯体须要 uuid 作为惟一标识,那么咱们须要一个 uuid 生成器,咱们间接应用一个工厂,一直地朝一个 chan 中写入 uuid,代码如下:
uuid.go
package util
import (
"crypto/rand"
"fmt"
"io"
"log"
)
var UuidChan = make(chan []byte, 1000)
func UuidFactory() {
for {UuidChan <- uuid()
}
}
func uuid() []byte {b := make([]byte, 16)
_, err := io.ReadFull(rand.Reader, b)
if err != nil {log.Fatal(err)
}
return b
}
func UuidToStr(b []byte) string {return fmt.Sprintf("%x-%x-%x-%x-%x", b[:4], b[4:6], b[6:8], b[8:10], b[10:])
}
另外,因为 channel 和 topic 的实现中须要大量应用 Go 语言中的管道,甚至须要两个 goroutine 之间通过同一个管道来传递信息,多以咱们当时定义好一个构造体不便后续 goroutine 之间通信:
chan_req.go
package util
type ChanReq struct {Variable interface{}
RetChan chan interface{}}
type ChanRet struct {
Err error
Variable interface{}}
当前目录构造如下:
因为本文只是开篇,内容比较简单,只是一些“地基”,不过咱们后续会在这块地基上添砖加瓦,实现一个可用的音讯队列。