关于go:用-Go-写一个简单消息队列一定义消息和基础工具

39次阅读

共计 1667 个字符,预计需要花费 5 分钟才能阅读完成。

概述

音讯队列置信大家都不生疏,平时在一些须要解耦、高并发的场景下常常能看见它们的身影,Kafka、RabbitMQ 这些罕用的音讯队列当初甚至曾经成为后端程序员的必须技能了。那么一个音讯队列的根底性能有哪些,是如何实现的?当初咱们就用 Go 语言来实现一个简略的单机版音讯队列,借以理解音讯队列的原理。

这个音讯队列的实现参考了 Go 语言中最受欢迎的音讯队列 nsq 的实现,取名就叫 smq(simple message queue),代码曾经上传到 Github 上:https://github.com/yhao1206/SMQ。为了尽可能简洁明了,这个音讯队列的性能都很根底。因为工夫紧迫,加上自己也是 Go 语言的菜鸟,程度无限,有什么谬误或是倡议,欢送一起探讨。

次要组件

  • topic:一个 topic 就是程序公布音讯的一个逻辑键,当程序第一次公布音讯时就会创立 topic。
  • channel:channel 与消费者相干,每当一个发布者发送一条音讯到一个 topic,音讯会被复制到 topic 上面所有的 channel 上,消费者通过 channel 读取音讯。同一个 channel 能够由多个消费者同时连贯,以达到负载平衡的成果。
  • message:s 音讯是数据流的形象,消费者能够抉择完结音讯,表明它们已被失常解决,或者从新将它们排队待到前面再进行解决。
  • smqd:smqd 是一个守护过程,负责接管,排队,投递音讯给客户端。

topic、channel、consumer 之间的关系能够参考下图:

话不多说,让咱们直入主题,首先第一步就是定义音讯。

音讯

咱们定义的音讯就是一般的字节数组,前 16 位是 uuid,用作音讯的惟一标识,前面实现和重排音讯时须要用到。前面就是音讯自身的内容,还提供了几个导出的封装办法。

message.go

package message

type Message struct {data []byte
}

func NewMessage(data []byte) *Message {
    return &Message{data: data,}
}

func (m *Message) Uuid() []byte {return m.data[:16]
}

func (m *Message) Body() []byte {return m.data[16:]
}

func (m *Message) Data() []byte {return m.data}

工具库

音讯体须要 uuid 作为惟一标识,那么咱们须要一个 uuid 生成器,咱们间接应用一个工厂,一直地朝一个 chan 中写入 uuid,代码如下:

uuid.go

package util

import (
    "crypto/rand"
    "fmt"
    "io"
    "log"
)

var UuidChan = make(chan []byte, 1000)

func UuidFactory() {
    for {UuidChan <- uuid()
    }
}

func uuid() []byte {b := make([]byte, 16)
    _, err := io.ReadFull(rand.Reader, b)
    if err != nil {log.Fatal(err)
    }
    return b
}

func UuidToStr(b []byte) string {return fmt.Sprintf("%x-%x-%x-%x-%x", b[:4], b[4:6], b[6:8], b[8:10], b[10:])
}

另外,因为 channel 和 topic 的实现中须要大量应用 Go 语言中的管道,甚至须要两个 goroutine 之间通过同一个管道来传递信息,多以咱们当时定义好一个构造体不便后续 goroutine 之间通信:

chan_req.go

package util

type ChanReq struct {Variable interface{}
    RetChan  chan interface{}}

type ChanRet struct {
    Err      error
    Variable interface{}}

当前目录构造如下:

因为本文只是开篇,内容比较简单,只是一些“地基”,不过咱们后续会在这块地基上添砖加瓦,实现一个可用的音讯队列。

正文完
 0