关于golang:手撸golang-GO与微服务-ChatServer之2

41次阅读

共计 10447 个字符,预计需要花费 27 分钟才能阅读完成。

缘起

最近阅读《Go 微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之

案例需求 (聊天服务器)

  • 用户可以连接到服务器。
  • 用户可以设定自己的用户名。
  • 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。

目标

  • 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
  • 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以避免死锁
  • 测试多个客户端的连入, 收发和断开, 并诊断服务端日志

设计

  • IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用 base64 转码
  • IMsgDecoder: 定义消息解码器及其实现
  • IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
  • tChatClient: 聊天客户端, 实现 IChatClient 接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
  • IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
  • tChatServer: 实现聊天服务器 IChatServer

单元测试

ChatServer_test.go

package chat_server

import (
    "fmt"
    cs "learning/gooop/chat_server"
    "strings"
    "testing"
    "time"
)

// Test_ChatServer is an end-to-end check of the chat server. It opens a
// server, connects several clients that each announce a name, send a few chat
// messages one second apart and then disconnect, and finally asserts on the
// server's collected logs that every connection and every disconnect was
// recorded.
func Test_ChatServer(t *testing.T) {
	const (
		port           = 3333
		clientCount    = 3
		msgsPerClient  = 3
		observeSeconds = 5
	)

	server := cs.NewChatServer()
	if err := server.Open(port); err != nil {
		t.Fatal(err)
	}
	// Release the listener even when the test aborts early via t.Fatal.
	// The explicit Close further down still runs first on the happy path.
	defer server.Close()

	address := fmt.Sprintf("localhost:%v", port)
	for i := 0; i < clientCount; i++ {
		err, client := cs.DialChatClient(address)
		if err != nil {
			t.Fatal(err)
		}

		id := fmt.Sprintf("c%02d", i)
		client.RecvHandler(func(_ cs.IChatClient, msg cs.IMsg) {
			t.Logf("%v recv: %v\n", id, msg)
		})

		go func() {
			client.SetName(id)
			client.Send(&cs.NameMsg{Name: id})

			// Plain sleeps instead of time.Tick: nothing is left running
			// once the loop is done.
			for n := 0; n < msgsPerClient; n++ {
				time.Sleep(time.Second)
				client.Send(&cs.ChatMsg{
					Name:  id,
					Words: fmt.Sprintf("msg %02d from %v", n, id),
				})
			}

			client.Close()
		}()
	}

	// Give the clients time to finish their conversation and disconnect.
	for elapsed := 1; elapsed <= observeSeconds; elapsed++ {
		time.Sleep(time.Second)
		t.Logf("%v seconds passed", elapsed)
	}
	server.Close()

	logs := server.GetLogs()
	hasLog := func(want string) bool {
		for _, line := range logs {
			if strings.Contains(line, want) {
				return true
			}
		}
		return false
	}

	for i := 0; i < clientCount; i++ {
		expected := []string{
			fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i+1),
			fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i),
		}
		for _, want := range expected {
			if !hasLog(want) {
				t.Fatal("expecting log:" + want)
			}
		}
	}
}

测试输出

$ go test -v ChatServer_test.go 
=== RUN   Test_ChatServer
tChatServer.handleIncomingConn, clientCount=1
tChatServer.handleIncomingConn, clientCount=2
tChatServer.handleIncomingConn, clientCount=3
    ChatServer_test.go:59: 1 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:59: 2 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
tChatClient.postConnClosed, c00, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=false
tChatClient.postConnClosed, c01, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=true
tChatServer.handleClientClosed, c02
tChatServer.handleClientClosed, c02, clientCount=2
tChatClient.postConnClosed, c01, serverFlag=true
tChatServer.handleClientClosed, c01
tChatServer.handleClientClosed, c01, clientCount=1
    ChatServer_test.go:59: 3 seconds passed
tChatClient.postConnClosed, c00, serverFlag=true
tChatServer.handleClientClosed, c00
tChatServer.handleClientClosed, c00, clientCount=0
    ChatServer_test.go:59: 4 seconds passed
    ChatServer_test.go:59: 5 seconds passed
--- PASS: Test_ChatServer (5.00s)
PASS
ok      command-line-arguments  5.003s

IMsg.go

定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用 base64 转码

package chat_server

import (
    "encoding/base64"
    "fmt"
)

// IMsg is implemented by every chat protocol message.
type IMsg interface {
	// Encode returns the message rendered as a string.
	Encode() string
}

// NameMsg carries a user name.
type NameMsg struct {
	Name string
}

// Encode renders the message as one line: the keyword NAME followed by the
// base64 (standard encoding) form of Name and a trailing newline.
func (me *NameMsg) Encode() string {
	encodedName := base64.StdEncoding.EncodeToString([]byte(me.Name))
	return fmt.Sprintf("NAME %s\n", encodedName)
}

// ChatMsg is a line of chat text together with the name of its sender.
type ChatMsg struct {
	Name  string
	Words string
}

// Encode renders the message as one line: the keyword CHAT followed by the
// base64 (standard encoding) forms of Name and Words, separated by single
// spaces and terminated by a newline.
func (me *ChatMsg) Encode() string {
	encode := base64.StdEncoding.EncodeToString
	encodedName := encode([]byte(me.Name))
	encodedWords := encode([]byte(me.Words))
	return fmt.Sprintf("CHAT %s %s\n", encodedName, encodedWords)
}

IMsgDecoder.go

定义消息解码器及其实现

package chat_server

import (
    "encoding/base64"
    "strings"
)


// IMsgDecoder turns one line of text back into a message.
type IMsgDecoder interface {
	// Decode parses line. The boolean result reports whether a message was
	// recognized; the IMsg result is only meaningful when it is true.
	Decode(line string) (bool, IMsg)
}

// tMsgDecoder is the stateless IMsgDecoder implementation for the NAME and
// CHAT line formats.
type tMsgDecoder struct{}

// Decode parses one protocol line.
//
// Recognized forms are "NAME <base64>" and "CHAT <base64> <base64>", with the
// fields separated by exactly one space. A trailing line break may stay
// attached to the last field: the base64 decoder skips '\r' and '\n'.
//
// It returns (true, msg) on success and (false, nil) when the keyword, the
// field count or any base64 payload is invalid.
func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
	// strings.Split always yields at least one element, so fields[0] is safe.
	fields := strings.Split(line, " ")
	decode := base64.StdEncoding.DecodeString

	switch {
	case fields[0] == "NAME" && len(fields) == 2:
		rawName, err := decode(fields[1])
		if err != nil {
			return false, nil
		}
		return true, &NameMsg{Name: string(rawName)}

	case fields[0] == "CHAT" && len(fields) == 3:
		rawName, err := decode(fields[1])
		if err != nil {
			return false, nil
		}
		rawWords, err := decode(fields[2])
		if err != nil {
			return false, nil
		}
		return true, &ChatMsg{Name: string(rawName), Words: string(rawWords)}
	}

	return false, nil
}

// MsgDecoder is the shared decoder instance.
var MsgDecoder = &tMsgDecoder{}

IChatClient.go

定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.

package chat_server

// IChatClient is one endpoint of a chat connection.
type IChatClient interface {
	// GetName returns the name currently associated with the client.
	GetName() string
	// SetName replaces the name associated with the client.
	SetName(name string)

	// Send submits msg for transmission to the peer.
	Send(msg IMsg)
	// RecvHandler registers the callback invoked for received messages.
	RecvHandler(handler ClientRecvFunc)
	// CloseHandler registers the callback invoked when the client is closed.
	CloseHandler(handler ClientCloseFunc)

	// Close shuts the client down.
	Close()
}

// ClientRecvFunc is the callback type for messages received by a client.
type ClientRecvFunc func(client IChatClient, msg IMsg)

// ClientCloseFunc is the callback type for the closing of a client.
type ClientCloseFunc func(client IChatClient)

tChatClient.go

聊天客户端, 实现 IChatClient 接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.

package chat_server

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "sync/atomic"
    "time"
)

// tChatClient is the IChatClient implementation. It is used both for clients
// created by DialChatClient and for the server-side peer objects the server
// creates for accepted connections (see serverFlag).
type tChatClient struct {
    // conn is the underlying connection; beginRead reads from it and
    // beginWrite writes to it and closes it.
    conn net.Conn
    // name starts as "anonymous" and is changed through SetName.
    name string
    // openFlag is swapped from 0 to 1 by open so the loops start only once.
    openFlag int32
    // closeFlag is 0 while the client is open, becomes 1 when closeConn
    // requests a close, and is set to 2 by beginWrite once conn is closed.
    closeFlag int32
    // serverFlag is the value passed to openChatClient: false for dialed
    // clients, true for connections accepted by the server.
    serverFlag bool

    // closeChan signals beginWrite to shut the connection down.
    closeChan chan bool
    // sendChan queues outgoing messages between Send and beginWrite.
    sendChan chan IMsg

    // sendLogs holds the messages beginWrite wrote without error.
    sendLogs []IMsg
    // dropLogs holds the messages Send refused to queue.
    dropLogs []IMsg
    // recvLogs holds the messages beginRead decoded successfully.
    recvLogs []IMsg
    // pendingSend is incremented by Send and decremented by beginWrite when
    // it takes a message off sendChan.
    pendingSend int32

    // recvHandler and closeHandler are the callbacks registered through
    // RecvHandler and CloseHandler; postConnClosed resets both to nil.
    recvHandler ClientRecvFunc
    closeHandler ClientCloseFunc
}

// gMaxPendingSend is the capacity given to each client's sendChan, bounding
// how many messages may wait to be written.
var gMaxPendingSend int32 = 100

// DialChatClient opens a TCP connection to address and wraps it in a chat
// client whose read and write loops are already running.
//
// Note the result order: the error comes first. On failure the client result
// is nil.
func DialChatClient(address string) (error, IChatClient) {
	conn, dialErr := net.Dial("tcp", address)
	if dialErr == nil {
		return nil, openChatClient(conn, false)
	}
	return dialErr, nil
}

// openChatClient wraps conn in a tChatClient named "anonymous" and starts its
// loops via open before returning it. serverFlag is stored on the client as
// given.
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
	client := new(tChatClient)

	client.conn = conn
	client.serverFlag = serverFlag
	client.name = "anonymous"

	// openFlag, closeFlag and pendingSend keep their zero values.
	client.closeChan = make(chan bool)
	client.sendChan = make(chan IMsg, gMaxPendingSend)

	client.sendLogs = []IMsg{}
	client.dropLogs = []IMsg{}
	client.recvLogs = []IMsg{}

	client.open()
	return client
}


// GetName returns the client's current name.
func (me *tChatClient) GetName() string {
	return me.name
}

// SetName replaces the client's name.
func (me *tChatClient) SetName(name string) {
	me.name = name
}

// open starts the write loop and the read loop, each on its own goroutine.
// Only the first call does anything; openFlag records that it has happened.
func (me *tChatClient) open() {
	firstCall := atomic.CompareAndSwapInt32(&me.openFlag, 0, 1)
	if !firstCall {
		return
	}

	go me.beginWrite()
	go me.beginRead()
}


// isClosed reports whether a close has been requested or completed, i.e.
// whether closeFlag has left its initial value 0.
//
// closeFlag is modified with atomic operations from other goroutines, so it
// is read atomically here as well.
func (me *tChatClient) isClosed() bool {
	return atomic.LoadInt32(&me.closeFlag) != 0
}

// isNotClosed is the negation of isClosed.
func (me *tChatClient) isNotClosed() bool {
	return !me.isClosed()
}

// Send queues msg for the write loop. It never blocks: when the client is
// closed the message is ignored, and when the send queue is full the message
// is recorded in dropLogs instead of being queued.
//
// NOTE(review): dropLogs is appended without synchronization, so concurrent
// callers that both hit a full queue can still race on it.
func (me *tChatClient) Send(msg IMsg) {
	if me.isClosed() {
		return
	}

	// Count the message before it becomes visible to the write loop, which
	// decrements pendingSend as soon as it dequeues; undo the count if the
	// queue turns out to be full.
	atomic.AddInt32(&me.pendingSend, 1)
	select {
	case me.sendChan <- msg:
	default:
		atomic.AddInt32(&me.pendingSend, -1)
		me.dropLogs = append(me.dropLogs, msg)
	}
}

// RecvHandler installs handler as the callback for received messages. The
// call is ignored once the client is closed.
func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
	if me.isClosed() {
		return
	}
	me.recvHandler = handler
}


// CloseHandler installs handler as the callback for the closing of the
// client. The call is ignored once the client is closed.
func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
	if me.isClosed() {
		return
	}
	me.closeHandler = handler
}


// Close requests that the connection be shut down. Calls made after the
// client is already closed are ignored.
func (me *tChatClient) Close() {
	if me.isClosed() {
		return
	}
	me.closeConn()
}

// beginWrite is the client's write loop; open starts it on its own goroutine.
//
// Each iteration waits for one of three events:
//   - a close signal on closeChan: the connection is closed, closeFlag is
//     set to 2, postConnClosed runs and the loop ends;
//   - a message on sendChan: it is encoded and written to the connection and,
//     on success, appended to sendLogs;
//   - ten seconds passing with neither of the above: postRecvTimeout runs.
//
// This goroutine is the only receiver of closeChan. Anything that may signal
// closeChan (closeConn, postRecvTimeout) is therefore started on a separate
// goroutine instead of being called inline, so the loop always stays free to
// receive the signal.
func (me *tChatClient) beginWrite() {
	for {
		select {
		case <-me.closeChan:
			_ = me.conn.Close()
			// Other goroutines read closeFlag atomically.
			atomic.StoreInt32(&me.closeFlag, 2)
			me.postConnClosed()
			return

		case msg := <-me.sendChan:
			atomic.AddInt32(&me.pendingSend, -1)
			if _, err := io.WriteString(me.conn, msg.Encode()); err != nil {
				// Keep looping; the close signal ends the loop.
				go me.closeConn()
				continue
			}
			me.sendLogs = append(me.sendLogs, msg)

		case <-time.After(10 * time.Second):
			go me.postRecvTimeout()
		}
	}
}

// postRecvTimeout reports the timeout on standard output and then requests
// that the connection be closed.
func (me *tChatClient) postRecvTimeout() {
	name, serverSide := me.name, me.serverFlag
	fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", name, serverSide)

	me.closeConn()
}

// beginRead is the client's read loop; open starts it on its own goroutine.
//
// It reads the connection line by line. Every line that MsgDecoder accepts is
// passed to the registered receive handler (if any) and then appended to
// recvLogs; lines that do not decode are skipped. The first read error
// triggers closeConn and ends the loop.
func (me *tChatClient) beginRead() {
	lines := bufio.NewReader(me.conn)
	for {
		raw, readErr := lines.ReadString('\n')
		if readErr != nil {
			me.closeConn()
			return
		}

		decoded, msg := MsgDecoder.Decode(raw)
		if !decoded {
			continue
		}

		// Take one snapshot of the handler before testing and calling it.
		if onRecv := me.recvHandler; onRecv != nil {
			onRecv(me, msg)
		}
		me.recvLogs = append(me.recvLogs, msg)
	}
}

// closeConn requests that the connection be shut down. Only the first call
// has any effect: it moves closeFlag from 0 to 1 and signals the write loop
// through closeChan.
//
// The signal is given by closing the channel rather than sending on it.
// closeChan is unbuffered and the write loop is its only receiver, so a send
// issued from the write loop itself (after a write error or a timeout) would
// wait forever. Closing never blocks, and the compare-and-swap above
// guarantees the channel is closed only once.
func (me *tChatClient) closeConn() {
	if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
		return
	}
	close(me.closeChan)
}

// postConnClosed reports the closed connection on standard output, invokes
// the registered close handler (if any) with this client, and then clears
// both the close and the receive handler.
func (me *tChatClient) postConnClosed() {
	fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)

	if onClosed := me.closeHandler; onClosed != nil {
		onClosed(me)
	}

	me.closeHandler, me.recvHandler = nil, nil
}

IChatServer.go

定义聊天服务器接口, 为方便测试, 提供日志采集方法

package chat_server

// IChatServer is the chat server as seen by its users.
type IChatServer interface {
	// Open starts the server on the given TCP port.
	Open(port int) error
	// Broadcast delivers msg to the connected clients.
	Broadcast(msg IMsg)
	// Close shuts the server down.
	Close()
	// GetLogs returns the log lines the server has collected.
	GetLogs() []string
}

tChatServer.go

实现聊天服务器 IChatServer

package chat_server

import (
    "errors"
    "fmt"
    "net"
    "sync"
    "sync/atomic"
)

// tChatServer is the IChatServer implementation.
type tChatServer struct {
    // openFlag is swapped from 0 to 1 by Open.
    openFlag int32
    // closeFlag is swapped from 0 to 1 by Close.
    closeFlag int32

    // clients holds the connected peers. handleIncomingConn and
    // handleClientClosed keep the live entries in the first clientCount
    // slots; unused slots are nil.
    clients []IChatClient
    // clientCount is the number of live entries in clients.
    clientCount int
    // clientLock is taken by the methods that read or modify clients and
    // clientCount.
    clientLock *sync.RWMutex

    // listener is the TCP listener created by Open.
    listener net.Listener
    // recvLogs collects every message passed to handleClientMsg.
    recvLogs []IMsg

    // logs collects the lines written through logf; GetLogs returns them.
    logs []string}

// NewChatServer returns a server that is neither opened nor closed yet and
// has no clients.
func NewChatServer() IChatServer {
	server := new(tChatServer)

	// The flags, clientCount and listener keep their zero values.
	server.clients = []IChatClient{}
	server.clientLock = new(sync.RWMutex)
	server.recvLogs = []IMsg{}

	return server
}

// Open starts listening on the given TCP port (all interfaces) and begins
// accepting connections on a background goroutine.
//
// It returns an error when the server has already been opened, or the error
// from net.Listen when the port cannot be bound. A failed bind leaves the
// server un-opened, so Open may be called again.
func (me *tChatServer) Open(port int) error {
	if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
		return errors.New("server already opened")
	}

	listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
	if err != nil {
		// Nothing was opened: release the flag so a retry is not rejected
		// as "already opened".
		atomic.StoreInt32(&me.openFlag, 0)
		return err
	}

	me.listener = listener
	go me.beginListening()
	return nil
}

// logf formats a log line, stores it in me.logs and prints it on standard
// output followed by a newline.
func (me *tChatServer) logf(f string, args ...interface{}) {
	line := fmt.Sprintf(f, args...)
	fmt.Println(line)
	me.logs = append(me.logs, line)
}

// GetLogs returns a snapshot of the log lines collected so far, oldest
// first. The result is a copy taken under clientLock, so it is not affected
// by lines logged later and the caller may modify it freely.
func (me *tChatServer) GetLogs() []string {
	me.clientLock.RLock()
	defer me.clientLock.RUnlock()

	return append([]string(nil), me.logs...)
}

// isClosed reports whether Close has been called, i.e. whether closeFlag has
// left its initial value 0.
//
// Close changes closeFlag with an atomic compare-and-swap, possibly from
// another goroutine than the accept loop that polls this method, so the flag
// is read atomically.
func (me *tChatServer) isClosed() bool {
	return atomic.LoadInt32(&me.closeFlag) != 0
}

// isNotClosed is the negation of isClosed.
func (me *tChatServer) isNotClosed() bool {
	return !me.isClosed()
}

// beginListening is the accept loop; Open starts it on its own goroutine.
// It hands every accepted connection to handleIncomingConn until the server
// is closed. An Accept error closes the server and ends the loop.
func (me *tChatServer) beginListening() {
	for me.isNotClosed() {
		conn, acceptErr := me.listener.Accept()
		if acceptErr != nil {
			me.Close()
			return
		}

		me.handleIncomingConn(conn)
	}
}


// Close shuts the server down: it stops listening and closes every connected
// client. Only the first call has any effect.
//
// Close is safe to call on a server that was never successfully opened; in
// that case there is no listener to close.
func (me *tChatServer) Close() {
	if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
		return
	}

	if me.listener != nil {
		// Best effort: nothing useful can be done with a close error here.
		_ = me.listener.Close()
	}
	me.closeAllClients()
}

// closeAllClients closes every registered client, clears its slot and resets
// clientCount to zero, all under the write lock.
func (me *tChatServer) closeAllClients() {
	me.clientLock.Lock()
	defer me.clientLock.Unlock()

	for slot := range me.clients {
		client := me.clients[slot]
		if client == nil {
			continue
		}
		client.Close()
		me.clients[slot] = nil
	}
	me.clientCount = 0
}


// handleIncomingConn turns an accepted connection into a server-side client,
// wires the server's message and close callbacks into it, stores it in the
// first free slot of me.clients and logs the new client count.
func (me *tChatServer) handleIncomingConn(conn net.Conn) {
	peer := openChatClient(conn, true)
	peer.RecvHandler(me.handleClientMsg)
	peer.CloseHandler(me.handleClientClosed)

	me.clientLock.Lock()
	defer me.clientLock.Unlock()

	// Slots below clientCount are occupied. Reuse the next one if the slice
	// already has room, otherwise grow the slice.
	slot := me.clientCount
	if slot < len(me.clients) {
		me.clients[slot] = peer
	} else {
		me.clients = append(me.clients, peer)
	}
	me.clientCount = slot + 1

	me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
}

// handleClientMsg is the receive callback installed on every server-side
// client. It records msg in recvLogs and then acts on its type: a NameMsg
// renames the sending client, a ChatMsg is broadcast to all clients. Other
// message types are only recorded.
//
// Each client invokes the callback from its own read goroutine, so the append
// to the shared recvLogs is done under clientLock. The lock is released
// before Broadcast runs because Broadcast takes the same lock itself.
func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
	me.clientLock.Lock()
	me.recvLogs = append(me.recvLogs, msg)
	me.clientLock.Unlock()

	switch m := msg.(type) {
	case *NameMsg:
		client.SetName(m.Name)
	case *ChatMsg:
		me.Broadcast(m)
	}
}

// handleClientClosed is the close callback installed on every server-side
// client. It logs the event, removes client from me.clients and logs the
// remaining client count.
//
// Removal keeps the live clients packed in the first clientCount slots: the
// last live client is moved into the freed slot and the last slot is cleared.
// When the server holds no clients, only the first line is logged.
//
// Clients invoke this callback from their own goroutines, so the lock is
// taken before the first logf call: logf appends to the shared log slice.
func (me *tChatServer) handleClientClosed(client IChatClient) {
	me.clientLock.Lock()
	defer me.clientLock.Unlock()

	me.logf("tChatServer.handleClientClosed, %s", client.GetName())

	if me.clientCount <= 0 {
		return
	}

	lastI := me.clientCount - 1
	for i, it := range me.clients {
		if it != client {
			continue
		}
		// With i == lastI the first assignment is a no-op and the slot is
		// simply cleared.
		me.clients[i] = me.clients[lastI]
		me.clients[lastI] = nil
		me.clientCount--
		break
	}

	me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
}

// Broadcast passes msg to the Send method of every registered client. Empty
// slots in me.clients are skipped. The read lock is held for the whole
// iteration.
func (me *tChatServer) Broadcast(msg IMsg) {
	me.clientLock.RLock()
	defer me.clientLock.RUnlock()

	for slot := range me.clients {
		peer := me.clients[slot]
		if peer == nil {
			continue
		}
		peer.Send(msg)
	}
}

(未完待续)

正文完
 0