本篇咱们实现音讯队列的收尾工作,将咱们的服务器搭建起来。咱们的构想是消费者通过 tcp 连贯到服务端,生产者则是通过 http 协定间接发送音讯。

监听消费者连贯

在 server 目录下新建一个 tcp.go 文件,用于监听消费者的 tcp 连贯,代码如下:

package serverimport (    "context"    "log"    "net")func TcpServer(ctx context.Context, addr, port string) {    fqAddress := addr + ":" + port    listener, err := net.Listen("tcp", fqAddress)    if err != nil {        panic("tcp listen(" + fqAddress + ") failed")    }    log.Printf("listening for clients on %s", fqAddress)    for {        select {        case <-ctx.Done():            return        default:            conn, err := listener.Accept()            if err != nil {                panic("accept failed: " + err.Error())            }            client := NewClient(conn, conn.RemoteAddr().String())            go client.Handle(ctx)        }    }}

每当监听到 tcp 连贯,就新建一个 client 来解决,同时传入一个 context 不便对立进行退出治理。client 和 protocol 也都要同时加上对这个 context 的监听代码,具体可参考代码仓库。

HTTP Server

同样在 server 目录下新建一个 http.go 文件来编写 HTTP 服务器,咱们的 HTTP 服务器只提供三个办法:测试连贯、写入音讯和查看所有 topic,先来看一下这三个办法的代码:

http.gopackage serverimport (    "context"    "errors"    "fmt"    "github.com/yhao1206/SMQ/message"    "github.com/yhao1206/SMQ/protocol"    "io"    "log"    "net/http"    "net/url"    "strconv"    "time")type ReqParams struct {    params url.Values    body   []byte}func NewReqParams(req *http.Request) (*ReqParams, error) {    reqParams, err := url.ParseQuery(req.URL.RawQuery)    if err != nil {        return nil, err    }    data, err := io.ReadAll(req.Body)    if err != nil {        return nil, err    }    return &ReqParams{reqParams, data}, nil}func (r *ReqParams) Query(key string) (string, error) {    keyData := r.params[key]    if len(keyData) == 0 {        return "", errors.New("key not in query params")    }    return keyData[0], nil}func pingHandler(w http.ResponseWriter, req *http.Request) {    w.Header().Set("Content-Length", "2")    io.WriteString(w, "OK")}func putHandler(w http.ResponseWriter, req *http.Request) {    reqParams, err := NewReqParams(req)    if err != nil {        log.Printf("HTTP: error - %s", err.Error())        return    }    topicName, err := reqParams.Query("topic")    if err != nil {        log.Printf("HTTP: error - %s", err.Error())        return    }    conn := &FakeConn{}    client := NewClient(conn, "HTTP")    proto := &protocol.Protocol{}    resp, err := proto.Execute(client, "PUB", topicName, string(reqParams.body))    if err != nil {        log.Printf("HTTP: error - %s", err.Error())        return    }    w.Header().Set("Content-Length", strconv.Itoa(len(resp)))    w.Write(resp)}func statsHandler(w http.ResponseWriter, req *http.Request) {    for topicName, _ := range message.TopicMap {        io.WriteString(w, fmt.Sprintf("%s\n", topicName))    }}

为了对立治理,咱们的 putHandler 办法没有间接操作 topic,而是包装了一个假的客户端向协定中发送 PUB 指令,由协定与 topic 进行交互。fake_conn 和 PUB 办法的代码如下:

server/fake_conn.gopackage serverimport (    "io")type FakeConn struct {    io.ReadWriter}func (c *FakeConn) Close() error {    return nil}----------protocol/protocol.gofunc (p *Protocol) PUB(client StatefulReadWriter, params []string) ([]byte, error) {    var buf bytes.Buffer    var err error    //  fake clients don't get to ClientInit    if client.GetState() != -1 {        return nil, ClientErrInvalid    }    if len(params) < 3 {        return nil, ClientErrInvalid    }    topicName := params[1]    body := []byte(params[2])    _, err = buf.Write(<-util.UuidChan)    if err != nil {        return nil, err    }    _, err = buf.Write(body)    if err != nil {        return nil, err    }    topic := message.GetTopic(topicName)    topic.PutMessage(message.NewMessage(buf.Bytes()))    return []byte("OK"), nil}

再来就是启动一个 http server:

http.gofunc HttpServer(ctx context.Context, address string, port string, endChan chan struct{}) {    http.HandleFunc("/ping", pingHandler)    http.HandleFunc("/put", putHandler)    http.HandleFunc("/stats", statsHandler)    fqAddress := address + ":" + port    httpServer := http.Server{        Addr: fqAddress,    }    go func() {        log.Printf("listening for http requests on %s", fqAddress)        err := http.ListenAndServe(fqAddress, nil)        if err != nil {            log.Fatal("http.ListenAndServe:", err)        }    }()    <-ctx.Done()    log.Printf("HTTP server on %s is shutdowning...", fqAddress)    timeoutCtx, fn := context.WithTimeout(context.Background(), 10*time.Second)    defer fn()    if err := httpServer.Shutdown(timeoutCtx); err != nil {        log.Printf("HTTP server shutdown error: %v", err)    }    close(endChan)}

作为参数传入的 context 也是为了对立退出治理,当监听到退出信号(<-ctx.Done())后,咱们再生成一个带超时工夫的 context,让 http server 有足够的工夫实现清理工作,实现优雅退出。退出之后敞开 endChan,调用方能够依据从 endChan 接管到数据判断优雅退出已实现。

main 函数

终于到了最初一步,就是实现咱们的 main 函数,将咱们的服务都启动起来。在我的项目根目录下创立 smq.go 文件,写入以下代码:

package mainimport (    "context"    "flag"    "github.com/yhao1206/SMQ/message"    "github.com/yhao1206/SMQ/server"    "github.com/yhao1206/SMQ/util"    "os"    "os/signal"    "strconv")var bindAddress = flag.String("address", "", "address to bind to")var webPort = flag.Int("web-port", 5150, "port to listen on for HTTP connections")var tcpPort = flag.Int("tcp-port", 5151, "port to listen on for TCP connections")var memQueueSize = flag.Int("mem-queue-size", 10000, "number of messages to keep in memory (per topic)")func main() {    flag.Parse()    endChan := make(chan struct{})    signalChan := make(chan os.Signal, 1)    signal.Notify(signalChan, os.Interrupt)    ctx, fn := context.WithCancel(context.Background())    go func() {        <-signalChan        fn()    }()    go message.TopicFactory(ctx, *memQueueSize)    go util.UuidFactory(ctx)    go server.TcpServer(ctx, *bindAddress, strconv.Itoa(*tcpPort))    server.HttpServer(ctx, *bindAddress, strconv.Itoa(*webPort), endChan)    for _, topic := range message.TopicMap {        topic.Close()    }    <-endChan}

第一步按照常规都是解析命令行参数,让用户能够自主管制监听端口和内存队列的大小。

第二步是监听中断信号,接管到信号后通过 context 的个性向所有的后盾服务发送退出信号,而后敞开所有的 topic,期待 http server 退出实现即可。简略两步,咱们的音讯队列零碎就搭建起来了。

测试

咱们当初来测试一下咱们的音讯队列,为了不便测试,这边提供了一个简略的消费者连贯库:

client/conn.gopackage clientimport (    "bytes"    "encoding/binary"    "fmt"    "github.com/yhao1206/SMQ/message"    "io"    "net"    "strconv")type Client struct {    conn io.ReadWriteCloser}type Command struct {    name   []byte    params [][]byte}type Response struct {    FrameType int32    Data      interface{}}func NewClient(conn net.Conn) *Client {    return &Client{conn}}func (c *Client) Connect(address string, port int) error {    fqAddress := address + ":" + strconv.Itoa(port)    conn, err := net.Dial("tcp", fqAddress)    if err != nil {        return err    }    c.conn = conn    return nil}func (c *Client) Version(version string) error {    _, err := c.conn.Write([]byte(version))    return err}func (c *Client) Subscribe(topic string, channel string) *Command {    params := make([][]byte, 2)    params[0] = []byte(topic)    params[1] = []byte(channel)    return &Command{[]byte("SUB"), params}}func (c *Client) Ready(count int) *Command {    params := make([][]byte, 1)    params[0] = []byte(strconv.Itoa(count))    return &Command{[]byte("RDY"), params}}func (c *Client) Finish(uuid string) *Command {    params := make([][]byte, 1)    params[0] = []byte(uuid)    return &Command{[]byte("FIN"), params}}func (c *Client) Requeue(uuid string) *Command {    params := make([][]byte, 1)    params[0] = []byte(uuid)    return &Command{[]byte("REQ"), params}}func (c *Client) Get() *Command {    return &Command{[]byte("GET"), nil}}func (c *Client) WriteCommand(cmd *Command) error {    if len(cmd.params) > 0 {        _, err := fmt.Fprintf(c.conn, "%s %s\n", cmd.name, string(bytes.Join(cmd.params, []byte(" "))))        if err != nil {            return err        }    } else {        _, err := fmt.Fprintf(c.conn, "%s\n", cmd.name)        if err != nil {            return err        }    }    return nil}func (c *Client) ReadResponse() (*message.Message, error) {    err := c.WriteCommand(c.Get())    if err != nil {        return nil, err    }    var msgSize int32    err = binary.Read(c.conn, binary.BigEndian, &msgSize)    if err != nil {        return nil, err    }    buf := make([]byte, msgSize)    _, err = c.conn.Read(buf)    if err != nil {        return nil, err    }    msg := message.NewMessage(buf)    return msg, nil}

新建 examples/cousumer/consumer.go 文件,写下咱们的消费者测试代码:

examples/cousumer/consumer.gopackage mainimport (    "github.com/yhao1206/SMQ/client"    "github.com/yhao1206/SMQ/util"    "log")func main() {    consumeClient := client.NewClient(nil)    err := consumeClient.Connect("127.0.0.1", 5151)    if err != nil {        log.Fatal(err)    }    consumeClient.WriteCommand(consumeClient.Subscribe("test", "ch"))    for {        msg, err := consumeClient.ReadResponse()        if err != nil {            log.Fatal(err)        }        log.Printf("%s - %s", util.UuidToStr(msg.Uuid()), msg.Body())        consumeClient.WriteCommand(consumeClient.Finish(util.UuidToStr(msg.Uuid())))    }}

启动服务端:go run smq.go,输入如下:

服务已胜利启动。

再启动咱们的测试消费者:go run ./examples/consumer/consumer.go,此时服务端多了几行输入:

能够看出咱们的消费者已胜利连贯到服务器并执行了订阅申请,相应的 topic 也曾经创立结束。

通过 curl 客户端(shell 或者 postman 等)发送 curl 命令来公布一条音讯:

curl --location --request POST '127.0.0.1:5150/put?topic=test' --header 'Content-Type: text/plain' --data-raw 'hello'

服务端输入日志:

查看一下消费者客户端的输入:

能够看到咱们的消费者胜利拉取到了生产者的音讯。

到这里咱们整个系列就全副完结了,欢送大家提出贵重的意见。


我的项目地址:https://github.com/yhao1206/SMQ
相干浏览:

  • 用 Go 写一个简略音讯队列(一):定义音讯和根底工具
  • 用 Go 写一个简略音讯队列(二):客户端解决和 channel 设计
  • 用 Go 写一个简略音讯队列(三):减少实现确认和重入队列性能
  • 用 Go 写一个简略音讯队列(四):topic 设计
  • 用 Go 写一个简略音讯队列(五):协定与后盾队列实现