缘起
最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之
案例需求(聊天服务器)
- 用户可以连接到服务器。
- 用户可以设定自己的用户名。
- 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。
目标
- 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
- 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以避免死锁
- 测试多个客户端的连入, 收发和断开, 并诊断服务端日志
设计
- IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
- IMsgDecoder: 定义消息解码器及其实现
- IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
- tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
- IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
- tChatServer: 实现聊天服务器IChatServer
单元测试
ChatServer_test.go
package chat_server

import (
	"fmt"
	cs "learning/gooop/chat_server"
	"strings"
	"testing"
	"time"
)

// Test_ChatServer starts a chat server on a local port and connects several
// clients to it. Each client announces its name, sends three chat messages one
// second apart and then closes. After five seconds the server is closed and
// its collected logs are checked for one "incoming connection" record and one
// "client closed" record per client.
func Test_ChatServer(t *testing.T) {
	fnAssertTrue := func(b bool, msg string) {
		if !b {
			t.Fatal(msg)
		}
	}

	port := 3333
	server := cs.NewChatServer()
	err := server.Open(port)
	if err != nil {
		t.Fatal(err)
	}

	clientCount := 3
	address := fmt.Sprintf("localhost:%v", port)
	for i := 0; i < clientCount; i++ {
		err, client := cs.DialChatClient(address)
		if err != nil {
			t.Fatal(err)
		}

		id := fmt.Sprintf("c%02d", i)
		client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
			t.Logf("%v recv: %v\n", id, msg)
		})

		go func() {
			client.SetName(id)
			// Keyed fields keep these literals valid if the message structs
			// gain or reorder fields (and keep `go vet` quiet).
			client.Send(&cs.NameMsg{Name: id})

			// A stoppable ticker replaces time.Tick, whose ticker cannot be
			// released once the loop is left.
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for n := 0; n < 3; n++ {
				<-ticker.C
				client.Send(&cs.ChatMsg{
					Name:  id,
					Words: fmt.Sprintf("msg %02d from %v", n, id),
				})
			}

			client.Close()
		}()
	}

	// Give the clients five seconds to finish their conversation.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for passedSeconds := 1; passedSeconds <= 5; passedSeconds++ {
		<-ticker.C
		t.Logf("%v seconds passed", passedSeconds)
	}

	server.Close()
	logs := server.GetLogs()

	fnHasLog := func(log string) bool {
		for _, it := range logs {
			if strings.Contains(it, log) {
				return true
			}
		}
		return false
	}

	for i := 0; i < clientCount; i++ {
		msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i+1)
		fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

		msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
		fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)
	}
}
测试输出
$ go test -v ChatServer_test.go
=== RUN   Test_ChatServer
tChatServer.handleIncomingConn, clientCount=1
tChatServer.handleIncomingConn, clientCount=2
tChatServer.handleIncomingConn, clientCount=3
    ChatServer_test.go:59: 1 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:59: 2 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
tChatClient.postConnClosed, c00, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=false
tChatClient.postConnClosed, c01, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=true
tChatServer.handleClientClosed, c02
tChatServer.handleClientClosed, c02, clientCount=2
tChatClient.postConnClosed, c01, serverFlag=true
tChatServer.handleClientClosed, c01
tChatServer.handleClientClosed, c01, clientCount=1
    ChatServer_test.go:59: 3 seconds passed
tChatClient.postConnClosed, c00, serverFlag=true
tChatServer.handleClientClosed, c00
tChatServer.handleClientClosed, c00, clientCount=0
    ChatServer_test.go:59: 4 seconds passed
    ChatServer_test.go:59: 5 seconds passed
--- PASS: Test_ChatServer (5.00s)
PASS
ok      command-line-arguments  5.003s
IMsg.go
定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
package chat_server

import (
	"encoding/base64"
	"fmt"
)

// IMsg is a chat message that can render itself as one newline-terminated
// line of text.
type IMsg interface {
	Encode() string
}

// NameMsg carries a user name.
type NameMsg struct {
	Name string
}

// Encode returns "NAME <base64 of Name>\n".
func (m *NameMsg) Encode() string {
	return fmt.Sprintf("NAME %s\n", encodeField(m.Name))
}

// ChatMsg carries a user name together with the words that user said.
type ChatMsg struct {
	Name  string
	Words string
}

// Encode returns "CHAT <base64 of Name> <base64 of Words>\n".
func (m *ChatMsg) Encode() string {
	name := encodeField(m.Name)
	words := encodeField(m.Words)
	return fmt.Sprintf("CHAT %s %s\n", name, words)
}

// encodeField base64-encodes one message field, so the encoded field contains
// no spaces or newlines regardless of the field's content.
func encodeField(s string) string {
	return base64.StdEncoding.EncodeToString([]byte(s))
}
IMsgDecoder.go
定义消息解码器及其实现
package chat_server

import (
	"encoding/base64"
	"strings"
)

// IMsgDecoder parses one line of text into an IMsg. The boolean result
// reports whether the line was recognized and decoded.
type IMsgDecoder interface {
	Decode(line string) (bool, IMsg)
}

// tMsgDecoder is the stateless IMsgDecoder implementation.
type tMsgDecoder struct{}

// Decode splits line on single spaces and recognizes two shapes:
//
//	NAME <base64 name>
//	CHAT <base64 name> <base64 words>
//
// Any other shape, or a field that is not valid base64, yields (false, nil).
// A line terminator left on the last field is tolerated because the standard
// base64 decoder skips "\r" and "\n".
func (d *tMsgDecoder) Decode(line string) (bool, IMsg) {
	fields := strings.Split(line, " ")

	switch {
	case fields[0] == "NAME" && len(fields) == 2:
		name, ok := decodeField(fields[1])
		if !ok {
			return false, nil
		}
		return true, &NameMsg{Name: name}

	case fields[0] == "CHAT" && len(fields) == 3:
		name, ok := decodeField(fields[1])
		if !ok {
			return false, nil
		}
		words, ok := decodeField(fields[2])
		if !ok {
			return false, nil
		}
		return true, &ChatMsg{Name: name, Words: words}
	}

	return false, nil
}

// decodeField base64-decodes one message field; ok is false when the field is
// not valid base64.
func decodeField(field string) (text string, ok bool) {
	raw, err := base64.StdEncoding.DecodeString(field)
	if err != nil {
		return "", false
	}
	return string(raw), true
}

// MsgDecoder is the shared decoder instance.
var MsgDecoder = &tMsgDecoder{}
IChatClient.go
定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
package chat_servertype IChatClient interface { GetName() string SetName(name string) Send(msg IMsg) RecvHandler(handler ClientRecvFunc) CloseHandler(handler ClientCloseFunc) Close()}type ClientRecvFunc func(client IChatClient, msg IMsg)type ClientCloseFunc func(client IChatClient)
tChatClient.go
聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
package chat_server

import (
	"bufio"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// tChatClient implements IChatClient on top of a net.Conn. It is used both for
// connections dialed with DialChatClient (serverFlag == false) and for
// connections wrapped through openChatClient with serverFlag == true.
//
// Two goroutines serve each client: beginWrite drains sendChan onto the
// connection and owns the shutdown sequence, beginRead decodes incoming lines
// and dispatches them to recvHandler.
//
// NOTE(review): name, recvHandler and closeHandler are still read and written
// from different goroutines without synchronization, as in the original.
type tChatClient struct {
	conn       net.Conn
	name       string
	openFlag   int32 // 0 until open() has started the goroutines, then 1
	closeFlag  int32 // 0 = open, 1 = close requested, 2 = connection closed
	serverFlag bool

	// closeChan is closed (never sent on) to request shutdown, so requesting
	// a close never blocks, whichever goroutine asks for it.
	closeChan chan bool
	sendChan  chan IMsg

	logLock     sync.Mutex // guards dropLogs, which Send may append to concurrently
	sendLogs    []IMsg     // appended by the write goroutine only
	dropLogs    []IMsg
	recvLogs    []IMsg // appended by the read goroutine only
	pendingSend int32  // messages accepted by Send and not yet taken by the writer

	recvHandler  ClientRecvFunc
	closeHandler ClientCloseFunc
}

// gMaxPendingSend is the capacity of each client's outgoing queue.
var gMaxPendingSend int32 = 100

// gSendIdleTimeout is how long the write loop waits for something to send
// before it closes the connection.
const gSendIdleTimeout = 10 * time.Second

// DialChatClient connects to address over TCP and returns a running client.
// The error comes first in the result list; on failure the client is nil.
func DialChatClient(address string) (error, IChatClient) {
	conn, err := net.Dial("tcp", address)
	if err != nil {
		return err, nil
	}
	return nil, openChatClient(conn, false)
}

// openChatClient wraps conn in a client named "anonymous" and starts its read
// and write goroutines.
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
	it := &tChatClient{
		conn:       conn,
		name:       "anonymous",
		serverFlag: serverFlag,
		closeChan:  make(chan bool),
		sendChan:   make(chan IMsg, gMaxPendingSend),
	}
	it.open()
	return it
}

// GetName returns the client's current name.
func (c *tChatClient) GetName() string {
	return c.name
}

// SetName replaces the client's name.
func (c *tChatClient) SetName(name string) {
	c.name = name
}

// open starts the write and read goroutines; only the first call has any
// effect.
func (c *tChatClient) open() {
	if !atomic.CompareAndSwapInt32(&c.openFlag, 0, 1) {
		return
	}
	go c.beginWrite()
	go c.beginRead()
}

// isClosed reports whether a close has been requested or completed.
func (c *tChatClient) isClosed() bool {
	return atomic.LoadInt32(&c.closeFlag) != 0
}

func (c *tChatClient) isNotClosed() bool {
	return !c.isClosed()
}

// Send queues msg for the write goroutine. It never blocks: when the client is
// closed the message is discarded, and when the queue is full the message is
// recorded in dropLogs instead.
func (c *tChatClient) Send(msg IMsg) {
	if c.isClosed() {
		return
	}

	atomic.AddInt32(&c.pendingSend, 1)
	select {
	case c.sendChan <- msg:
	default:
		atomic.AddInt32(&c.pendingSend, -1)
		c.logLock.Lock()
		c.dropLogs = append(c.dropLogs, msg)
		c.logLock.Unlock()
	}
}

// RecvHandler installs the callback invoked for every decoded incoming
// message. It is ignored once the client is closed.
func (c *tChatClient) RecvHandler(handler ClientRecvFunc) {
	if c.isNotClosed() {
		c.recvHandler = handler
	}
}

// CloseHandler installs the callback invoked once after the connection has
// been closed. It is ignored once the client is closed.
func (c *tChatClient) CloseHandler(handler ClientCloseFunc) {
	if c.isNotClosed() {
		c.closeHandler = handler
	}
}

// Close requests shutdown. It returns immediately; the write goroutine closes
// the connection and runs the close handler.
func (c *tChatClient) Close() {
	c.closeConn()
}

// beginWrite is the write goroutine. It returns, and thereby runs the
// shutdown sequence, when a close is requested, when a write fails, or when
// nothing was queued for gSendIdleTimeout.
func (c *tChatClient) beginWrite() {
	defer c.shutdown()

	for {
		select {
		case <-c.closeChan:
			return

		case msg := <-c.sendChan:
			atomic.AddInt32(&c.pendingSend, -1)
			if _, err := io.WriteString(c.conn, msg.Encode()); err != nil {
				c.closeConn()
				return
			}
			c.sendLogs = append(c.sendLogs, msg)

		case <-time.After(gSendIdleTimeout):
			c.postRecvTimeout()
			return
		}
	}
}

// shutdown closes the connection, marks the client fully closed and notifies
// the close handler. It runs exactly once, on the write goroutine.
func (c *tChatClient) shutdown() {
	// The connection is being discarded; there is nothing to do about a
	// failing Close.
	_ = c.conn.Close()
	atomic.StoreInt32(&c.closeFlag, 2)
	c.postConnClosed()
}

// postRecvTimeout logs the idle timeout and requests shutdown.
func (c *tChatClient) postRecvTimeout() {
	fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", c.name, c.serverFlag)
	c.closeConn()
}

// beginRead is the read goroutine. It decodes one message per line and hands
// it to recvHandler; lines that do not decode are skipped. Any read error,
// including the one caused by the connection being closed, requests shutdown
// and ends the goroutine.
func (c *tChatClient) beginRead() {
	reader := bufio.NewReader(c.conn)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			c.closeConn()
			return
		}

		ok, msg := MsgDecoder.Decode(line)
		if !ok {
			continue
		}
		if fn := c.recvHandler; fn != nil {
			fn(c, msg)
		}
		c.recvLogs = append(c.recvLogs, msg)
	}
}

// closeConn requests shutdown exactly once. Closing closeChan, rather than
// sending on it, keeps the request non-blocking, so the write goroutine can
// call this itself without waiting on a receiver that would be itself.
func (c *tChatClient) closeConn() {
	if !atomic.CompareAndSwapInt32(&c.closeFlag, 0, 1) {
		return
	}
	close(c.closeChan)
}

// postConnClosed logs the shutdown, invokes the close handler if one is set,
// and then clears both handlers.
func (c *tChatClient) postConnClosed() {
	fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", c.name, c.serverFlag)

	if handler := c.closeHandler; handler != nil {
		handler(c)
	}

	c.closeHandler = nil
	c.recvHandler = nil
}
IChatServer.go
定义聊天服务器接口, 为方便测试, 提供日志采集方法
package chat_servertype IChatServer interface { Open(port int) error Broadcast(msg IMsg) Close() GetLogs() []string}
tChatServer.go
实现聊天服务器IChatServer
package chat_server

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
)

// tChatServer implements IChatServer. One goroutine accepts connections; each
// accepted connection is wrapped in a chat client whose callbacks
// (handleClientMsg, handleClientClosed) are invoked by that client, so the
// server's state is touched from several goroutines.
//
// Live clients occupy clients[0:clientCount]; slots at or beyond clientCount
// are nil and get reused before the slice grows.
type tChatServer struct {
	openFlag  int32 // 0 until Open succeeds in claiming the server, then 1
	closeFlag int32 // 0 until Close is called, then 1

	clients     []IChatClient
	clientCount int
	clientLock  *sync.RWMutex // guards clients and clientCount

	listener net.Listener

	logLock  sync.Mutex // guards recvLogs and logs
	recvLogs []IMsg
	logs     []string
}

// NewChatServer returns a server that is not yet listening.
func NewChatServer() IChatServer {
	return &tChatServer{
		clients:    []IChatClient{},
		clientLock: new(sync.RWMutex),
		recvLogs:   []IMsg{},
	}
}

// Open starts listening on the given TCP port and accepting connections in
// the background. It fails if the server was already opened or the port
// cannot be bound; after a bind failure Open may be tried again.
func (s *tChatServer) Open(port int) error {
	if !atomic.CompareAndSwapInt32(&s.openFlag, 0, 1) {
		return errors.New("server already opened")
	}

	listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
	if err != nil {
		// Nothing is listening, so do not leave the server marked as opened.
		atomic.StoreInt32(&s.openFlag, 0)
		return err
	}
	s.listener = listener

	go s.beginListening()
	return nil
}

// logf formats a log line, records it for GetLogs and prints it to stdout.
func (s *tChatServer) logf(f string, args ...interface{}) {
	msg := fmt.Sprintf(f, args...)

	s.logLock.Lock()
	s.logs = append(s.logs, msg)
	s.logLock.Unlock()

	fmt.Println(msg)
}

// GetLogs returns a copy of the log lines recorded so far, so callers can
// read the result while the server keeps logging.
func (s *tChatServer) GetLogs() []string {
	s.logLock.Lock()
	defer s.logLock.Unlock()
	return append([]string(nil), s.logs...)
}

// isClosed reports whether Close has been called.
func (s *tChatServer) isClosed() bool {
	return atomic.LoadInt32(&s.closeFlag) != 0
}

func (s *tChatServer) isNotClosed() bool {
	return !s.isClosed()
}

// beginListening is the accept loop. An Accept error, including the one
// caused by Close closing the listener, closes the server and ends the loop.
func (s *tChatServer) beginListening() {
	for !s.isClosed() {
		conn, err := s.listener.Accept()
		if err != nil {
			s.Close()
			break
		}
		s.handleIncomingConn(conn)
	}
}

// Close stops accepting connections and closes every registered client. Only
// the first call has any effect, and it is safe to call on a server that was
// never opened.
func (s *tChatServer) Close() {
	if !atomic.CompareAndSwapInt32(&s.closeFlag, 0, 1) {
		return
	}

	if s.listener != nil {
		// The listener is being discarded; a failing Close changes nothing.
		_ = s.listener.Close()
	}
	s.closeAllClients()
}

// closeAllClients empties the registry and then closes the clients that were
// in it. The clients are closed after the lock is released, so a client's
// close callback never has to wait on a lock held across its own Close call.
func (s *tChatServer) closeAllClients() {
	s.clientLock.Lock()
	closing := make([]IChatClient, 0, s.clientCount)
	for i, it := range s.clients {
		if it != nil {
			closing = append(closing, it)
			s.clients[i] = nil
		}
	}
	s.clientCount = 0
	s.clientLock.Unlock()

	for _, it := range closing {
		it.Close()
	}
}

// handleIncomingConn wraps an accepted connection in a client wired to the
// server's callbacks and registers it. A connection that arrives while the
// server is closing is closed instead of being registered.
func (s *tChatServer) handleIncomingConn(conn net.Conn) {
	client := openChatClient(conn, true)
	client.RecvHandler(s.handleClientMsg)
	client.CloseHandler(s.handleClientClosed)

	if !s.register(client) {
		client.Close()
	}
}

// register adds client to the registry and logs the new client count. It
// returns false, without registering, when the server is already closed.
func (s *tChatServer) register(client IChatClient) bool {
	s.clientLock.Lock()
	defer s.clientLock.Unlock()

	if s.isClosed() {
		return false
	}

	// Reuse a free slot left by a departed client before growing the slice.
	if len(s.clients) > s.clientCount {
		s.clients[s.clientCount] = client
	} else {
		s.clients = append(s.clients, client)
	}
	s.clientCount++

	s.logf("tChatServer.handleIncomingConn, clientCount=%v", s.clientCount)
	return true
}

// handleClientMsg records every received message. A NameMsg renames the
// sending client; a ChatMsg is broadcast to all clients.
func (s *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
	s.logLock.Lock()
	s.recvLogs = append(s.recvLogs, msg)
	s.logLock.Unlock()

	switch m := msg.(type) {
	case *NameMsg:
		client.SetName(m.Name)
	case *ChatMsg:
		s.Broadcast(msg)
	}
}

// handleClientClosed removes a closed client from the registry by moving the
// last live client into its slot. The second log line, which carries the new
// client count, is only written while the registry is non-empty.
func (s *tChatServer) handleClientClosed(client IChatClient) {
	s.logf("tChatServer.handleClientClosed, %s", client.GetName())

	s.clientLock.Lock()
	defer s.clientLock.Unlock()

	if s.clientCount <= 0 {
		return
	}

	last := s.clientCount - 1
	for i, it := range s.clients[:s.clientCount] {
		if it == client {
			s.clients[i] = s.clients[last]
			s.clients[last] = nil
			s.clientCount--
			break
		}
	}

	s.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), s.clientCount)
}

// Broadcast sends msg to every registered client. The client list is copied
// under the read lock and the sends happen afterwards, so the lock is not
// held while calling into clients.
func (s *tChatServer) Broadcast(msg IMsg) {
	s.clientLock.RLock()
	targets := append([]IChatClient(nil), s.clients[:s.clientCount]...)
	s.clientLock.RUnlock()

	for _, it := range targets {
		if it != nil {
			it.Send(msg)
		}
	}
}
(未完待续)