共计 7600 个字符,预计需要花费 19 分钟才能阅读完成。
本篇咱们实现音讯队列的收尾工作,将咱们的服务器搭建起来。咱们的构想是消费者通过 tcp 连贯到服务端,生产者则是通过 http 协定间接发送音讯。
监听消费者连贯
在 server 目录下新建一个 tcp.go 文件,用于监听消费者的 tcp 连贯,代码如下:
package server | |
import ( | |
"context" | |
"log" | |
"net" | |
) | |
func TcpServer(ctx context.Context, addr, port string) { | |
fqAddress := addr + ":" + port | |
listener, err := net.Listen("tcp", fqAddress) | |
if err != nil {panic("tcp listen(" + fqAddress + ") failed") | |
} | |
log.Printf("listening for clients on %s", fqAddress) | |
for { | |
select {case <-ctx.Done(): | |
return | |
default: | |
conn, err := listener.Accept() | |
if err != nil {panic("accept failed:" + err.Error()) | |
} | |
client := NewClient(conn, conn.RemoteAddr().String()) | |
go client.Handle(ctx) | |
} | |
} | |
} |
每当监听到 tcp 连贯,就新建一个 client 来解决,同时传入一个 context 不便对立进行退出治理。client 和 protocol 也都要同时加上对这个 context 的监听代码,具体可参考代码仓库。
HTTP Server
同样在 server 目录下新建一个 http.go 文件来编写 HTTP 服务器,咱们的 HTTP 服务器只提供三个办法:测试连贯、写入音讯和查看所有 topic,先来看一下这三个办法的代码:
http.go | |
package server | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"github.com/yhao1206/SMQ/message" | |
"github.com/yhao1206/SMQ/protocol" | |
"io" | |
"log" | |
"net/http" | |
"net/url" | |
"strconv" | |
"time" | |
) | |
type ReqParams struct { | |
params url.Values | |
body []byte} | |
func NewReqParams(req *http.Request) (*ReqParams, error) {reqParams, err := url.ParseQuery(req.URL.RawQuery) | |
if err != nil {return nil, err} | |
data, err := io.ReadAll(req.Body) | |
if err != nil {return nil, err} | |
return &ReqParams{reqParams, data}, nil | |
} | |
func (r *ReqParams) Query(key string) (string, error) {keyData := r.params[key] | |
if len(keyData) == 0 {return "", errors.New("key not in query params") | |
} | |
return keyData[0], nil | |
} | |
func pingHandler(w http.ResponseWriter, req *http.Request) {w.Header().Set("Content-Length", "2") | |
io.WriteString(w, "OK") | |
} | |
func putHandler(w http.ResponseWriter, req *http.Request) {reqParams, err := NewReqParams(req) | |
if err != nil {log.Printf("HTTP: error - %s", err.Error()) | |
return | |
} | |
topicName, err := reqParams.Query("topic") | |
if err != nil {log.Printf("HTTP: error - %s", err.Error()) | |
return | |
} | |
conn := &FakeConn{} | |
client := NewClient(conn, "HTTP") | |
proto := &protocol.Protocol{} | |
resp, err := proto.Execute(client, "PUB", topicName, string(reqParams.body)) | |
if err != nil {log.Printf("HTTP: error - %s", err.Error()) | |
return | |
} | |
w.Header().Set("Content-Length", strconv.Itoa(len(resp))) | |
w.Write(resp) | |
} | |
func statsHandler(w http.ResponseWriter, req *http.Request) { | |
for topicName, _ := range message.TopicMap {io.WriteString(w, fmt.Sprintf("%s\n", topicName)) | |
} | |
} |
为了对立治理,咱们的 putHandler 办法没有间接操作 topic,而是包装了一个假的客户端向协定中发送 PUB 指令,由协定与 topic 进行交互。fake_conn 和 PUB 办法的代码如下:
server/fake_conn.go | |
package server | |
import ("io") | |
type FakeConn struct {io.ReadWriter} | |
func (c *FakeConn) Close() error {return nil} | |
---------- | |
protocol/protocol.go | |
func (p *Protocol) PUB(client StatefulReadWriter, params []string) ([]byte, error) { | |
var buf bytes.Buffer | |
var err error | |
// fake clients don't get to ClientInit | |
if client.GetState() != -1 {return nil, ClientErrInvalid} | |
if len(params) < 3 {return nil, ClientErrInvalid} | |
topicName := params[1] | |
body := []byte(params[2]) | |
_, err = buf.Write(<-util.UuidChan) | |
if err != nil {return nil, err} | |
_, err = buf.Write(body) | |
if err != nil {return nil, err} | |
topic := message.GetTopic(topicName) | |
topic.PutMessage(message.NewMessage(buf.Bytes())) | |
return []byte("OK"), nil | |
} |
再来就是启动一个 http server:
http.go | |
func HttpServer(ctx context.Context, address string, port string, endChan chan struct{}) {http.HandleFunc("/ping", pingHandler) | |
http.HandleFunc("/put", putHandler) | |
http.HandleFunc("/stats", statsHandler) | |
fqAddress := address + ":" + port | |
httpServer := http.Server{Addr: fqAddress,} | |
go func() {log.Printf("listening for http requests on %s", fqAddress) | |
err := http.ListenAndServe(fqAddress, nil) | |
if err != nil {log.Fatal("http.ListenAndServe:", err) | |
} | |
}() | |
<-ctx.Done() | |
log.Printf("HTTP server on %s is shutdowning...", fqAddress) | |
timeoutCtx, fn := context.WithTimeout(context.Background(), 10*time.Second) | |
defer fn() | |
if err := httpServer.Shutdown(timeoutCtx); err != nil {log.Printf("HTTP server shutdown error: %v", err) | |
} | |
close(endChan) | |
} |
作为参数传入的 context 也是为了对立退出治理,当监听到退出信号(<-ctx.Done()
)后,咱们再生成一个带超时工夫的 context,让 http server 有足够的工夫实现清理工作,实现优雅退出。退出之后敞开 endChan,调用方能够依据从 endChan 接管到数据判断优雅退出已实现。
main 函数
终于到了最初一步,就是实现咱们的 main 函数,将咱们的服务都启动起来。在我的项目根目录下创立 smq.go 文件,写入以下代码:
package main | |
import ( | |
"context" | |
"flag" | |
"github.com/yhao1206/SMQ/message" | |
"github.com/yhao1206/SMQ/server" | |
"github.com/yhao1206/SMQ/util" | |
"os" | |
"os/signal" | |
"strconv" | |
) | |
var bindAddress = flag.String("address", "","address to bind to") | |
var webPort = flag.Int("web-port", 5150, "port to listen on for HTTP connections") | |
var tcpPort = flag.Int("tcp-port", 5151, "port to listen on for TCP connections") | |
var memQueueSize = flag.Int("mem-queue-size", 10000, "number of messages to keep in memory (per topic)") | |
func main() {flag.Parse() | |
endChan := make(chan struct{}) | |
signalChan := make(chan os.Signal, 1) | |
signal.Notify(signalChan, os.Interrupt) | |
ctx, fn := context.WithCancel(context.Background()) | |
go func() { | |
<-signalChan | |
fn()}() | |
go message.TopicFactory(ctx, *memQueueSize) | |
go util.UuidFactory(ctx) | |
go server.TcpServer(ctx, *bindAddress, strconv.Itoa(*tcpPort)) | |
server.HttpServer(ctx, *bindAddress, strconv.Itoa(*webPort), endChan) | |
for _, topic := range message.TopicMap {topic.Close() | |
} | |
<-endChan | |
} |
第一步按照常规都是解析命令行参数,让用户能够自主管制监听端口和内存队列的大小。
第二步是监听中断信号,接管到信号后通过 context 的个性向所有的后盾服务发送退出信号,而后敞开所有的 topic,期待 http server 退出实现即可。简略两步,咱们的音讯队列零碎就搭建起来了。
测试
咱们当初来测试一下咱们的音讯队列,为了不便测试,这边提供了一个简略的消费者连贯库:
client/conn.go | |
package client | |
import ( | |
"bytes" | |
"encoding/binary" | |
"fmt" | |
"github.com/yhao1206/SMQ/message" | |
"io" | |
"net" | |
"strconv" | |
) | |
type Client struct {conn io.ReadWriteCloser} | |
type Command struct {name []byte | |
params [][]byte | |
} | |
type Response struct { | |
FrameType int32 | |
Data interface{}} | |
func NewClient(conn net.Conn) *Client {return &Client{conn} | |
} | |
func (c *Client) Connect(address string, port int) error {fqAddress := address + ":" + strconv.Itoa(port) | |
conn, err := net.Dial("tcp", fqAddress) | |
if err != nil {return err} | |
c.conn = conn | |
return nil | |
} | |
func (c *Client) Version(version string) error {_, err := c.conn.Write([]byte(version)) | |
return err | |
} | |
func (c *Client) Subscribe(topic string, channel string) *Command {params := make([][]byte, 2) | |
params[0] = []byte(topic) | |
params[1] = []byte(channel) | |
return &Command{[]byte("SUB"), params} | |
} | |
func (c *Client) Ready(count int) *Command {params := make([][]byte, 1) | |
params[0] = []byte(strconv.Itoa(count)) | |
return &Command{[]byte("RDY"), params} | |
} | |
func (c *Client) Finish(uuid string) *Command {params := make([][]byte, 1) | |
params[0] = []byte(uuid) | |
return &Command{[]byte("FIN"), params} | |
} | |
func (c *Client) Requeue(uuid string) *Command {params := make([][]byte, 1) | |
params[0] = []byte(uuid) | |
return &Command{[]byte("REQ"), params} | |
} | |
func (c *Client) Get() *Command {return &Command{[]byte("GET"), nil} | |
} | |
func (c *Client) WriteCommand(cmd *Command) error {if len(cmd.params) > 0 {_, err := fmt.Fprintf(c.conn, "%s %s\n", cmd.name, string(bytes.Join(cmd.params, []byte(" ")))) | |
if err != nil {return err} | |
} else {_, err := fmt.Fprintf(c.conn, "%s\n", cmd.name) | |
if err != nil {return err} | |
} | |
return nil | |
} | |
func (c *Client) ReadResponse() (*message.Message, error) {err := c.WriteCommand(c.Get()) | |
if err != nil {return nil, err} | |
var msgSize int32 | |
err = binary.Read(c.conn, binary.BigEndian, &msgSize) | |
if err != nil {return nil, err} | |
buf := make([]byte, msgSize) | |
_, err = c.conn.Read(buf) | |
if err != nil {return nil, err} | |
msg := message.NewMessage(buf) | |
return msg, nil | |
} |
新建 examples/cousumer/consumer.go 文件,写下咱们的消费者测试代码:
examples/cousumer/consumer.go | |
package main | |
import ( | |
"github.com/yhao1206/SMQ/client" | |
"github.com/yhao1206/SMQ/util" | |
"log" | |
) | |
func main() {consumeClient := client.NewClient(nil) | |
err := consumeClient.Connect("127.0.0.1", 5151) | |
if err != nil {log.Fatal(err) | |
} | |
consumeClient.WriteCommand(consumeClient.Subscribe("test", "ch")) | |
for {msg, err := consumeClient.ReadResponse() | |
if err != nil {log.Fatal(err) | |
} | |
log.Printf("%s - %s", util.UuidToStr(msg.Uuid()), msg.Body()) | |
consumeClient.WriteCommand(consumeClient.Finish(util.UuidToStr(msg.Uuid()))) | |
} | |
} |
启动服务端:go run smq.go
,输入如下:
服务已胜利启动。
再启动咱们的测试消费者:go run ./examples/consumer/consumer.go
,此时服务端多了几行输入:
能够看出咱们的消费者已胜利连贯到服务器并执行了订阅申请,相应的 topic 也曾经创立结束。
通过 curl 客户端(shell 或者 postman 等)发送 curl 命令来公布一条音讯:
curl --location --request POST '127.0.0.1:5150/put?topic=test' --header 'Content-Type: text/plain' --data-raw 'hello'
服务端输入日志:
查看一下消费者客户端的输入:
能够看到咱们的消费者胜利拉取到了生产者的音讯。
到这里咱们整个系列就全副完结了,欢送大家提出贵重的意见。
我的项目地址:https://github.com/yhao1206/SMQ
相干浏览:
- 用 Go 写一个简略音讯队列(一):定义音讯和根底工具
- 用 Go 写一个简略音讯队列(二):客户端解决和 channel 设计
- 用 Go 写一个简略音讯队列(三):减少实现确认和重入队列性能
- 用 Go 写一个简略音讯队列(四):topic 设计
- 用 Go 写一个简略音讯队列(五):协定与后盾队列实现