缘起

最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之

案例需求(聊天服务器)

  • 用户可以连接到服务器。
  • 用户可以设定自己的用户名。
  • 用户可以向服务器发送消息, 同时服务器也会向其他用户广播该消息。

目标

  • 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
  • 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以避免死锁
  • 测试多个客户端的连入, 收发和断开, 并诊断服务端日志

设计

  • IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
  • IMsgDecoder: 定义消息解码器及其实现
  • IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
  • tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
  • IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
  • tChatServer: 实现聊天服务器IChatServer

单元测试

ChatServer_test.go

package chat_server

import (
	"fmt"
	cs "learning/gooop/chat_server"
	"strings"
	"testing"
	"time"
)

// Test_ChatServer starts a chat server on a local TCP port, connects several
// clients that each announce a name and send a few chat messages one second
// apart, and finally checks the server log for one "incoming connection" and
// one "client closed" entry per client.
func Test_ChatServer(t *testing.T) {
	fnAssertTrue := func(b bool, msg string) {
		// Report failures at the caller's line, not inside this helper.
		t.Helper()
		if !b {
			t.Fatal(msg)
		}
	}

	port := 3333
	server := cs.NewChatServer()
	if err := server.Open(port); err != nil {
		t.Fatal(err)
	}
	// Release the listener even when the test aborts early through t.Fatal.
	// The server is also closed explicitly further down, before the logs are
	// inspected; this assumes a second Close call is harmless — TODO confirm
	// against the IChatServer implementation in use.
	defer server.Close()

	clientCount := 3
	address := fmt.Sprintf("localhost:%v", port)
	for i := 0; i < clientCount; i++ {
		err, client := cs.DialChatClient(address)
		if err != nil {
			t.Fatal(err)
		}

		id := fmt.Sprintf("c%02d", i)
		client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
			t.Logf("%v recv: %v\n", id, msg)
		})

		go func() {
			defer client.Close()

			client.SetName(id)
			client.Send(&cs.NameMsg{Name: id})

			// One chat message per second, three in total. The ticker is
			// stopped explicitly so it does not outlive this goroutine.
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for n := 0; n < 3; n++ {
				<-ticker.C
				client.Send(&cs.ChatMsg{
					Name:  id,
					Words: fmt.Sprintf("msg %02d from %v", n, id),
				})
			}
		}()
	}

	// Give the clients time to finish sending and to disconnect.
	for passedSeconds := 1; passedSeconds <= 5; passedSeconds++ {
		time.Sleep(time.Second)
		t.Logf("%v seconds passed", passedSeconds)
	}

	server.Close()
	logs := server.GetLogs()
	fnHasLog := func(log string) bool {
		for _, it := range logs {
			if strings.Contains(it, log) {
				return true
			}
		}
		return false
	}

	for i := 0; i < clientCount; i++ {
		msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i+1)
		fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

		msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
		fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)
	}
}

测试输出

$ go test -v ChatServer_test.go === RUN   Test_ChatServertChatServer.handleIncomingConn, clientCount=1tChatServer.handleIncomingConn, clientCount=2tChatServer.handleIncomingConn, clientCount=3    ChatServer_test.go:59: 1 seconds passed    ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}    ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}    ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}    ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}    ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}    ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}    ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}    ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}    ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}    ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}    ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}    ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}    ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}    ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}    ChatServer_test.go:59: 2 seconds passed    ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}    ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}    ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}tChatClient.postConnClosed, c00, serverFlag=falsetChatClient.postConnClosed, c02, serverFlag=falsetChatClient.postConnClosed, c01, serverFlag=falsetChatClient.postConnClosed, c02, serverFlag=truetChatServer.handleClientClosed, c02tChatServer.handleClientClosed, c02, clientCount=2tChatClient.postConnClosed, c01, serverFlag=truetChatServer.handleClientClosed, c01tChatServer.handleClientClosed, c01, clientCount=1    ChatServer_test.go:59: 3 seconds passedtChatClient.postConnClosed, c00, serverFlag=truetChatServer.handleClientClosed, c00tChatServer.handleClientClosed, c00, clientCount=0    ChatServer_test.go:59: 4 seconds passed    ChatServer_test.go:59: 5 seconds passed--- PASS: Test_ChatServer (5.00s)PASSok     
 command-line-arguments  5.003s

IMsg.go

定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码

package chat_server

import (
	"encoding/base64"
	"fmt"
)

// IMsg is a chat protocol message that can be serialized to its wire form.
type IMsg interface {
	// Encode returns the message as a single text line ending in "\n".
	Encode() string
}

// NameMsg announces the sender's user name.
type NameMsg struct {
	Name string
}

// Encode renders the message as "NAME <base64(Name)>\n".
func (me *NameMsg) Encode() string {
	name := base64.StdEncoding.EncodeToString([]byte(me.Name))
	return fmt.Sprintf("NAME %s\n", name)
}

// ChatMsg carries the text Words said by the user called Name.
type ChatMsg struct {
	Name  string
	Words string
}

// Encode renders the message as "CHAT <base64(Name)> <base64(Words)>\n".
// Both fields are base64-encoded, so neither the space separator nor the
// line terminator can occur inside a field.
func (me *ChatMsg) Encode() string {
	name := base64.StdEncoding.EncodeToString([]byte(me.Name))
	words := base64.StdEncoding.EncodeToString([]byte(me.Words))
	return fmt.Sprintf("CHAT %s %s\n", name, words)
}

IMsgDecoder.go

定义消息解码器及其实现

package chat_server

import (
	"encoding/base64"
	"strings"
)

// IMsgDecoder turns one line of the text protocol back into a message.
type IMsgDecoder interface {
	// Decode parses line. The bool result reports whether a message was
	// recognized; when it is false the IMsg result is nil.
	Decode(line string) (bool, IMsg)
}

// tMsgDecoder is the stateless IMsgDecoder implementation.
type tMsgDecoder struct{}

// Decode splits line on single spaces and recognizes two shapes:
//
//	NAME <base64 name>
//	CHAT <base64 name> <base64 words>
//
// Any other command, field count or invalid base64 yields (false, nil).
// A line terminator left on the last field is tolerated because the
// standard base64 decoder ignores "\r" and "\n".
func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
	fields := strings.Split(line, " ")

	switch {
	case fields[0] == "NAME" && len(fields) == 2:
		name, ok := decodeField(fields[1])
		if !ok {
			return false, nil
		}
		return true, &NameMsg{Name: name}

	case fields[0] == "CHAT" && len(fields) == 3:
		name, ok := decodeField(fields[1])
		if !ok {
			return false, nil
		}
		words, ok := decodeField(fields[2])
		if !ok {
			return false, nil
		}
		return true, &ChatMsg{Name: name, Words: words}
	}

	return false, nil
}

// decodeField base64-decodes one protocol field; ok is false on malformed input.
func decodeField(field string) (text string, ok bool) {
	raw, err := base64.StdEncoding.DecodeString(field)
	if err != nil {
		return "", false
	}
	return string(raw), true
}

// MsgDecoder is the shared decoder instance.
var MsgDecoder = &tMsgDecoder{}

IChatClient.go

定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.

package chat_server

// IChatClient is one end of a chat connection. The same abstraction is used
// for a client that dialed a server and for the server-side handle of an
// accepted connection.
type IChatClient interface {
	// GetName returns the user name currently associated with the client.
	GetName() string

	// SetName changes the user name associated with the client.
	SetName(name string)

	// Send hands msg to the client for delivery to the peer.
	Send(msg IMsg)

	// RecvHandler registers the callback for messages received from the peer.
	RecvHandler(handler ClientRecvFunc)

	// CloseHandler registers the callback used to notify that the client
	// has been closed.
	CloseHandler(handler ClientCloseFunc)

	// Close shuts the client down.
	Close()
}

// ClientRecvFunc is called with the receiving client and the message it received.
type ClientRecvFunc func(client IChatClient, msg IMsg)

// ClientCloseFunc is called with the client that has been closed.
type ClientCloseFunc func(client IChatClient)

tChatClient.go

聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.

package chat_server

import (
	"bufio"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// tChatClient implements IChatClient on top of a net.Conn. It is used both
// for connections dialed by a client (serverFlag == false) and for
// connections accepted by the server (serverFlag == true).
//
// Each instance runs two goroutines: beginRead decodes incoming lines and
// beginWrite drains sendChan. beginWrite is the only place where the
// connection is closed and the close handler is invoked.
type tChatClient struct {
	conn net.Conn

	// stateLock guards name, dropLogs, recvHandler and closeHandler, which
	// are touched from the reader goroutine, the writer goroutine and callers.
	stateLock    sync.Mutex
	name         string
	dropLogs     []IMsg
	recvHandler  ClientRecvFunc
	closeHandler ClientCloseFunc

	// openFlag is 0 until open() has started the goroutines, then 1.
	openFlag int32

	// closeFlag is 0 while open, 1 once a close has been requested and 2
	// after the connection has been closed. It is only accessed atomically.
	closeFlag int32

	serverFlag bool

	// closeChan wakes the writer goroutine when a close is requested. It has
	// room for one value so that closeConn never blocks, even when it is
	// called from the writer goroutine itself.
	closeChan chan bool

	// sendChan is the bounded write buffer between Send and beginWrite.
	sendChan chan IMsg

	// pendingSend counts the messages queued in sendChan; updated atomically.
	pendingSend int32

	sendLogs []IMsg // written by the writer goroutine only
	recvLogs []IMsg // written by the reader goroutine only
}

// gMaxPendingSend is the capacity of each client's write buffer.
var gMaxPendingSend int32 = 100

// DialChatClient connects to a chat server at address (host:port) over TCP
// and returns a running client. On failure the error is non-nil and the
// client is nil.
func DialChatClient(address string) (error, IChatClient) {
	conn, err := net.Dial("tcp", address)
	if err != nil {
		return err, nil
	}
	return nil, openChatClient(conn, false)
}

// openChatClient wraps an established connection and starts its read and
// write goroutines. serverFlag marks connections accepted by the server.
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
	it := &tChatClient{
		conn:       conn,
		openFlag:   0,
		closeFlag:  0,
		serverFlag: serverFlag,
		closeChan:  make(chan bool, 1),
		sendChan:   make(chan IMsg, gMaxPendingSend),
		name:       "anonymous",
		sendLogs:   []IMsg{},
		dropLogs:   []IMsg{},
		recvLogs:   []IMsg{},
	}
	it.open()
	return it
}

// GetName returns the client's current user name.
func (me *tChatClient) GetName() string {
	me.stateLock.Lock()
	defer me.stateLock.Unlock()
	return me.name
}

// SetName changes the client's user name.
func (me *tChatClient) SetName(name string) {
	me.stateLock.Lock()
	defer me.stateLock.Unlock()
	me.name = name
}

// open starts the writer and reader goroutines; only the first call has an effect.
func (me *tChatClient) open() {
	if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
		return
	}
	go me.beginWrite()
	go me.beginRead()
}

// isClosed reports whether a close has been requested or completed.
func (me *tChatClient) isClosed() bool {
	return atomic.LoadInt32(&me.closeFlag) != 0
}

// isNotClosed is the negation of isClosed.
func (me *tChatClient) isNotClosed() bool {
	return !me.isClosed()
}

// Send queues msg for delivery. It never blocks: when the client is closed
// the message is ignored, and when the write buffer is full the message is
// recorded in dropLogs instead of being queued.
func (me *tChatClient) Send(msg IMsg) {
	if me.isClosed() {
		return
	}

	atomic.AddInt32(&me.pendingSend, 1)
	select {
	case me.sendChan <- msg:
	default:
		// Buffer full: undo the reservation and remember the dropped message.
		atomic.AddInt32(&me.pendingSend, -1)
		me.stateLock.Lock()
		me.dropLogs = append(me.dropLogs, msg)
		me.stateLock.Unlock()
	}
}

// RecvHandler installs the callback invoked for every decoded incoming
// message. It is ignored once the client is closed.
func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
	if me.isClosed() {
		return
	}
	me.stateLock.Lock()
	me.recvHandler = handler
	me.stateLock.Unlock()
}

// CloseHandler installs the callback invoked after the connection has been
// closed. It is ignored once the client is closed.
func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
	if me.isClosed() {
		return
	}
	me.stateLock.Lock()
	me.closeHandler = handler
	me.stateLock.Unlock()
}

// Close requests shutdown of the client. The connection itself is closed
// asynchronously by the writer goroutine. Calling Close again is a no-op.
func (me *tChatClient) Close() {
	me.closeConn()
}

// beginWrite is the writer goroutine. It writes queued messages to the
// connection, requests a close after a write error or after 10 seconds
// without anything to send, and performs the actual shutdown once a close
// has been requested.
func (me *tChatClient) beginWrite() {
	writer := io.Writer(me.conn)
	for {
		// A close requested by this goroutine (write error, idle timeout) or
		// by any other one takes priority over messages still in the queue.
		if me.isClosed() {
			me.finishClose()
			return
		}

		select {
		case <-me.closeChan:
			me.finishClose()
			return

		case msg := <-me.sendChan:
			atomic.AddInt32(&me.pendingSend, -1)
			if _, err := writer.Write([]byte(msg.Encode())); err != nil {
				me.closeConn()
				continue
			}
			me.sendLogs = append(me.sendLogs, msg)

		case <-time.After(time.Duration(10) * time.Second):
			me.postRecvTimeout()
		}
	}
}

// finishClose closes the connection, marks the client fully closed and
// notifies the close handler. It is only called by the writer goroutine,
// immediately before that goroutine exits.
func (me *tChatClient) finishClose() {
	// The connection is being discarded; a close error is not actionable.
	_ = me.conn.Close()
	atomic.StoreInt32(&me.closeFlag, 2)
	me.postConnClosed()
}

// postRecvTimeout logs the idle timeout and requests a close.
func (me *tChatClient) postRecvTimeout() {
	fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.GetName(), me.serverFlag)
	me.closeConn()
}

// beginRead is the reader goroutine. It reads newline-terminated lines,
// decodes them and passes recognized messages to the receive handler.
// Any read error (including the peer closing the connection) requests a
// close and ends the goroutine. Lines that do not decode are skipped.
func (me *tChatClient) beginRead() {
	reader := bufio.NewReader(me.conn)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			me.closeConn()
			return
		}

		ok, msg := MsgDecoder.Decode(line)
		if !ok {
			continue
		}

		// Copy the handler out of the lock: it may call back into the client.
		me.stateLock.Lock()
		fn := me.recvHandler
		me.stateLock.Unlock()
		if fn != nil {
			fn(me, msg)
		}
		me.recvLogs = append(me.recvLogs, msg)
	}
}

// closeConn requests a close exactly once. The compare-and-swap guarantees
// a single send on closeChan, and the channel's one-slot buffer guarantees
// that this send cannot block the caller.
func (me *tChatClient) closeConn() {
	if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
		return
	}
	me.closeChan <- true
}

// postConnClosed logs the shutdown, clears both handlers and then invokes
// the close handler that was installed, if any.
func (me *tChatClient) postConnClosed() {
	fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.GetName(), me.serverFlag)

	me.stateLock.Lock()
	handler := me.closeHandler
	me.closeHandler = nil
	me.recvHandler = nil
	me.stateLock.Unlock()

	// Called outside the lock: the handler may call GetName on this client.
	if handler != nil {
		handler(me)
	}
}

IChatServer.go

定义聊天服务器接口, 为方便测试, 提供日志采集方法

package chat_server

// IChatServer is a chat server that accepts client connections and relays
// chat messages between them.
type IChatServer interface {
	// Open starts the server on the given port. It returns an error when
	// the server cannot be started.
	Open(port int) error

	// Broadcast sends msg to the connected clients.
	Broadcast(msg IMsg)

	// Close shuts the server down.
	Close()

	// GetLogs returns the log lines the server has collected; provided to
	// make the server's behavior observable in tests.
	GetLogs() []string
}

tChatServer.go

实现聊天服务器IChatServer

package chat_server

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
)

// tChatServer implements IChatServer. It accepts TCP connections, wraps
// each one in a chat client, applies NAME messages to the sending client
// and relays CHAT messages to every connected client.
type tChatServer struct {
	openFlag  int32 // 0 until Open succeeds in claiming the server, then 1
	closeFlag int32 // 0 while running, 1 once Close has been called; accessed atomically

	// clients[:clientCount] holds the live clients; slots at or beyond
	// clientCount are nil and get reused. Both are guarded by clientLock.
	clients     []IChatClient
	clientCount int
	clientLock  *sync.RWMutex

	listener net.Listener

	// logLock guards recvLogs and logs, which are appended to from the
	// accept goroutine and from the goroutines of every client.
	logLock  sync.Mutex
	recvLogs []IMsg
	logs     []string
}

// NewChatServer returns a server that is not listening yet; call Open to start it.
func NewChatServer() IChatServer {
	it := &tChatServer{
		openFlag:    0,
		closeFlag:   0,
		clients:     []IChatClient{},
		clientCount: 0,
		clientLock:  new(sync.RWMutex),
		listener:    nil,
		recvLogs:    []IMsg{},
	}
	return it
}

// Open starts listening on the given TCP port (all interfaces) and accepts
// connections in a background goroutine. It fails when the server has
// already been opened or when the port cannot be bound; after a bind
// failure Open may be called again.
func (me *tChatServer) Open(port int) error {
	if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
		return errors.New("server already opened")
	}

	listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
	if err != nil {
		// Nothing was started, so allow a later retry.
		atomic.StoreInt32(&me.openFlag, 0)
		return err
	}

	me.listener = listener
	go me.beginListening()
	return nil
}

// logf formats a log line, records it for GetLogs and prints it to stdout.
func (me *tChatServer) logf(f string, args ...interface{}) {
	msg := fmt.Sprintf(f, args...)

	me.logLock.Lock()
	me.logs = append(me.logs, msg)
	me.logLock.Unlock()

	fmt.Println(msg)
}

// GetLogs returns a snapshot of the log lines recorded so far. The result
// is a copy, so the caller may keep it while the server continues to log.
func (me *tChatServer) GetLogs() []string {
	me.logLock.Lock()
	defer me.logLock.Unlock()
	return append([]string(nil), me.logs...)
}

// isClosed reports whether Close has been called.
func (me *tChatServer) isClosed() bool {
	return atomic.LoadInt32(&me.closeFlag) != 0
}

// isNotClosed is the negation of isClosed.
func (me *tChatServer) isNotClosed() bool {
	return !me.isClosed()
}

// beginListening is the accept loop. It runs until the server is closed;
// an Accept error (including the one caused by closing the listener)
// closes the server and ends the loop.
func (me *tChatServer) beginListening() {
	for !me.isClosed() {
		conn, err := me.listener.Accept()
		if err != nil {
			me.Close()
			break
		}
		me.handleIncomingConn(conn)
	}
}

// Close stops accepting connections and closes every connected client.
// Only the first call has an effect. It is safe to call on a server that
// was never opened.
func (me *tChatServer) Close() {
	if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
		return
	}

	if me.listener != nil {
		// The listener is being discarded; a close error is not actionable.
		_ = me.listener.Close()
	}
	me.closeAllClients()
}

// closeAllClients empties the client table and then closes every client
// that was in it. The clients are closed after the lock is released so that
// no client code runs while the table is locked.
func (me *tChatServer) closeAllClients() {
	me.clientLock.Lock()
	pending := make([]IChatClient, 0, me.clientCount)
	for i, it := range me.clients {
		if it != nil {
			pending = append(pending, it)
			me.clients[i] = nil
		}
	}
	me.clientCount = 0
	me.clientLock.Unlock()

	for _, it := range pending {
		it.Close()
	}
}

// handleIncomingConn wraps an accepted connection in a chat client, hooks
// up the server's message and close handlers and adds the client to the table.
func (me *tChatServer) handleIncomingConn(conn net.Conn) {
	// init client
	client := openChatClient(conn, true)
	client.RecvHandler(me.handleClientMsg)
	client.CloseHandler(me.handleClientClosed)

	me.clientLock.Lock()
	defer me.clientLock.Unlock()

	// Reuse a free slot left behind by a departed client before growing the slice.
	if len(me.clients) > me.clientCount {
		me.clients[me.clientCount] = client
	} else {
		me.clients = append(me.clients, client)
	}
	me.clientCount++

	me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
}

// handleClientMsg is the receive handler for every client: it records the
// message, applies a NAME message to the sending client and broadcasts a
// CHAT message to all clients (the sender included).
func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
	me.logLock.Lock()
	me.recvLogs = append(me.recvLogs, msg)
	me.logLock.Unlock()

	switch m := msg.(type) {
	case *NameMsg:
		client.SetName(m.Name)
	case *ChatMsg:
		me.Broadcast(msg)
	}
}

// handleClientClosed is the close handler for every client: it removes the
// client from the table by moving the last live client into its slot.
func (me *tChatServer) handleClientClosed(client IChatClient) {
	me.logf("tChatServer.handleClientClosed, %s", client.GetName())

	me.clientLock.Lock()
	defer me.clientLock.Unlock()

	// Nothing to remove, e.g. the table was already emptied by Close.
	if me.clientCount <= 0 {
		return
	}

	lastI := me.clientCount - 1
	for i, it := range me.clients {
		if it != client {
			continue
		}
		if i == lastI {
			me.clients[i] = nil
		} else {
			me.clients[i], me.clients[lastI] = me.clients[lastI], nil
		}
		me.clientCount--
		break
	}

	me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
}

// Broadcast sends msg to every client currently in the table.
func (me *tChatServer) Broadcast(msg IMsg) {
	me.clientLock.RLock()
	defer me.clientLock.RUnlock()

	for _, it := range me.clients {
		if it != nil {
			it.Send(msg)
		}
	}
}

(未完待续)