缘起
最近浏览 <<Go 微服务实战 >> (刘金亮, 2021.1)
本系列笔记拟采纳 golang 练习之
案例需要 (聊天服务器)
- 用户能够连贯到服务器。
- 用户能够设定本人的用户名。
- 用户能够向服务器发送音讯,同时服务器也会向其余用户播送该音讯。
指标
- 实现聊天服务端, 反对端口监听, 多个客户端的连入, 音讯收发, 播送, 断开, 并采集日志
- 革新已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以避免死锁
- 测试多个客户端的连入, 收发和断开, 并诊断服务端日志
设计
- IMsg: 定义音讯接口, 以及相干音讯的实现. 为不便任意音讯内容的解码, 音讯传输时, 采纳 base64 转码
- IMsgDecoder: 定义音讯解码器及其实现
- IChatClient: 定义聊天客户端接口. 本次增加敞开告诉办法, 以适配服务端.
- tChatClient: 聊天客户端, 实现 IChatClient 接口. 本次增加敞开告诉, 写缓冲和读超时管制, 修复写循环细节问题.
- IChatServer: 定义聊天服务器接口, 为不便测试, 提供日志采集办法
- tChatServer: 实现聊天服务器 IChatServer
单元测试
ChatServer_test.go
package chat_server
import (
"fmt"
cs "learning/gooop/chat_server"
"strings"
"testing"
"time"
)
func Test_ChatServer(t *testing.T) {fnAssertTrue := func(b bool, msg string) {
if !b {t.Fatal(msg)
}
}
port := 3333
server := cs.NewChatServer()
err := server.Open(port)
if err != nil {t.Fatal(err)
}
clientCount := 3
address := fmt.Sprintf("localhost:%v", port)
for i := 0;i < clientCount;i++ {err, client := cs.DialChatClient(address)
if err != nil {t.Fatal(err)
}
id := fmt.Sprintf("c%02d", i)
client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {t.Logf("%v recv: %v\n", id, msg)
})
go func() {client.SetName(id)
client.Send(&cs.NameMsg{id})
n := 0
for range time.Tick(time.Duration(1) * time.Second) {client.Send(&cs.ChatMsg{id, fmt.Sprintf("msg %02d from %v", n, id) })
n++
if n >= 3 {break}
}
client.Close()}()}
passedSeconds := 0
for range time.Tick(time.Second) {
passedSeconds++
t.Logf("%v seconds passed", passedSeconds)
if passedSeconds >= 5 {break}
}
server.Close()
logs := server.GetLogs()
fnHasLog := func(log string) bool {
for _,it := range logs {if strings.Contains(it, log) {return true}
}
return false
}
for i := 0;i < clientCount;i++ {msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i + 1)
fnAssertTrue(fnHasLog(msg), "expecting log:" + msg)
msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
fnAssertTrue(fnHasLog(msg), "expecting log:" + msg)
}
}
测试输入
$ go test -v ChatServer_test.go
=== RUN Test_ChatServer
tChatServer.handleIncomingConn, clientCount=1
tChatServer.handleIncomingConn, clientCount=2
tChatServer.handleIncomingConn, clientCount=3
ChatServer_test.go:59: 1 seconds passed
ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
ChatServer_test.go:59: 2 seconds passed
ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
tChatClient.postConnClosed, c00, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=false
tChatClient.postConnClosed, c01, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=true
tChatServer.handleClientClosed, c02
tChatServer.handleClientClosed, c02, clientCount=2
tChatClient.postConnClosed, c01, serverFlag=true
tChatServer.handleClientClosed, c01
tChatServer.handleClientClosed, c01, clientCount=1
ChatServer_test.go:59: 3 seconds passed
tChatClient.postConnClosed, c00, serverFlag=true
tChatServer.handleClientClosed, c00
tChatServer.handleClientClosed, c00, clientCount=0
ChatServer_test.go:59: 4 seconds passed
ChatServer_test.go:59: 5 seconds passed
--- PASS: Test_ChatServer (5.00s)
PASS
ok command-line-arguments 5.003s
IMsg.go
定义音讯接口, 以及相干音讯的实现. 为不便任意音讯内容的解码, 音讯传输时, 采纳 base64 转码
package chat_server
import (
"encoding/base64"
"fmt"
)
type IMsg interface {Encode() string
}
type NameMsg struct {Name string}
func (me *NameMsg) Encode() string {return fmt.Sprintf("NAME %s\n", base64.StdEncoding.EncodeToString([]byte(me.Name)))
}
type ChatMsg struct {
Name string
Words string
}
func (me *ChatMsg) Encode() string {
return fmt.Sprintf("CHAT %s %s\n",
base64.StdEncoding.EncodeToString([]byte(me.Name)),
base64.StdEncoding.EncodeToString([]byte(me.Words)),
)
}
IMsgDecoder.go
定义音讯解码器及其实现
package chat_server
import (
"encoding/base64"
"strings"
)
type IMsgDecoder interface {Decode(line string) (bool, IMsg)
}
type tMsgDecoder struct {
}
func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {items := strings.Split(line, " ")
size := len(items)
if items[0] == "NAME" && size == 2 {name, err := base64.StdEncoding.DecodeString(items[1])
if err != nil {return false, nil}
return true, &NameMsg{Name: string(name),
}
}
if items[0] == "CHAT" && size == 3 {name, err := base64.StdEncoding.DecodeString(items[1])
if err != nil {return false, nil}
words, err := base64.StdEncoding.DecodeString(items[2])
if err != nil {return false, nil}
return true, &ChatMsg{Name: string(name),
Words: string(words),
}
}
return false, nil
}
var MsgDecoder = &tMsgDecoder{}
IChatClient.go
定义聊天客户端接口. 本次增加敞开告诉办法, 以适配服务端.
package chat_server
type IChatClient interface {GetName() string
SetName(name string)
Send(msg IMsg)
RecvHandler(handler ClientRecvFunc)
CloseHandler(handler ClientCloseFunc)
Close()}
type ClientRecvFunc func(client IChatClient, msg IMsg)
type ClientCloseFunc func(client IChatClient)
tChatClient.go
聊天客户端, 实现 IChatClient 接口. 本次增加敞开告诉, 写缓冲和读超时管制, 修复写循环细节问题.
package chat_server
import (
"bufio"
"fmt"
"io"
"net"
"sync/atomic"
"time"
)
type tChatClient struct {
conn net.Conn
name string
openFlag int32
closeFlag int32
serverFlag bool
closeChan chan bool
sendChan chan IMsg
sendLogs []IMsg
dropLogs []IMsg
recvLogs []IMsg
pendingSend int32
recvHandler ClientRecvFunc
closeHandler ClientCloseFunc
}
var gMaxPendingSend int32 = 100
func DialChatClient(address string) (error, IChatClient) {conn, err := net.Dial("tcp", address)
if err != nil {return err, nil}
return nil, openChatClient(conn, false)
}
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
it := &tChatClient{
conn: conn,
openFlag: 0,
closeFlag: 0,
serverFlag: serverFlag,
closeChan: make(chan bool),
sendChan: make(chan IMsg, gMaxPendingSend),
name: "anonymous",
sendLogs: []IMsg{},
dropLogs: []IMsg{},
recvLogs: []IMsg{},
}
it.open()
return it
}
func (me *tChatClient) GetName() string {return me.name}
func (me *tChatClient) SetName(name string) {me.name = name}
func (me *tChatClient) open(){if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {return}
go me.beginWrite()
go me.beginRead()}
func (me *tChatClient) isClosed() bool {return me.closeFlag != 0}
func (me *tChatClient) isNotClosed() bool {return !me.isClosed()
}
func (me *tChatClient) Send(msg IMsg) {if me.isClosed() {return}
if me.pendingSend < gMaxPendingSend {atomic.AddInt32(&me.pendingSend, 1)
me.sendChan <- msg
} else {me.dropLogs = append(me.dropLogs, msg)
}
}
func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {if me.isNotClosed() {me.recvHandler = handler}
}
func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {if me.isNotClosed() {me.closeHandler = handler}
}
func (me *tChatClient) Close() {if me.isNotClosed() {me.closeConn()
}
}
func (me *tChatClient) beginWrite() {writer := io.Writer(me.conn)
for {
select {
case <- me.closeChan:
_ = me.conn.Close()
me.closeFlag = 2
me.postConnClosed()
return
case msg := <- me.sendChan:
atomic.AddInt32(&me.pendingSend, -1)
_,e := writer.Write([]byte(msg.Encode()))
if e != nil {me.closeConn()
break
} else {me.sendLogs = append(me.sendLogs, msg)
}
case <- time.After(time.Duration(10) * time.Second):
me.postRecvTimeout()
break
}
}
}
func (me *tChatClient) postRecvTimeout() {fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.name, me.serverFlag)
me.closeConn()}
func (me *tChatClient) beginRead() {reader := bufio.NewReader(me.conn)
for {line, err := reader.ReadString('\n')
if err != nil {me.closeConn()
break
}
ok, msg := MsgDecoder.Decode(line)
if ok {
fn := me.recvHandler
if fn != nil {fn(me, msg)
}
me.recvLogs = append(me.recvLogs, msg)
}
}
}
func (me *tChatClient) closeConn() {if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {return}
me.closeChan <- true
}
func (me *tChatClient) postConnClosed() {fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)
handler := me.closeHandler
if handler != nil {handler(me)
}
me.closeHandler = nil
me.recvHandler = nil
}
IChatServer.go
定义聊天服务器接口, 为不便测试, 提供日志采集办法
package chat_server
type IChatServer interface {Open(port int) error
Broadcast(msg IMsg)
Close()
GetLogs() []string
}
tChatServer.go
实现聊天服务器 IChatServer
package chat_server
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
)
type tChatServer struct {
openFlag int32
closeFlag int32
clients []IChatClient
clientCount int
clientLock *sync.RWMutex
listener net.Listener
recvLogs []IMsg
logs []string}
func NewChatServer() IChatServer {
it := &tChatServer{
openFlag: 0,
closeFlag: 0,
clients: []IChatClient{},
clientCount: 0,
clientLock: new(sync.RWMutex),
listener: nil,
recvLogs: []IMsg{},
}
return it
}
func (me *tChatServer) Open(port int) error {if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {return errors.New("server already opened")
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
if err != nil {return err}
me.listener = listener
go me.beginListening()
return nil
}
func (me *tChatServer) logf(f string, args... interface{}) {msg := fmt.Sprintf(f, args...)
me.logs = append(me.logs, msg)
fmt.Println(msg)
}
func (me *tChatServer) GetLogs() []string {return me.logs}
func (me *tChatServer) isClosed() bool {return me.closeFlag != 0}
func (me *tChatServer) isNotClosed() bool {return !me.isClosed()
}
func (me *tChatServer) beginListening() {for !me.isClosed() {conn, err := me.listener.Accept()
if err != nil {me.Close()
break
}
me.handleIncomingConn(conn)
}
}
func (me *tChatServer) Close() {if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {return}
_ = me.listener.Close()
me.closeAllClients()}
func (me *tChatServer) closeAllClients() {me.clientLock.Lock()
defer me.clientLock.Unlock()
for i,it := range me.clients {
if it != nil {it.Close()
me.clients[i] = nil
}
}
me.clientCount = 0
}
func (me *tChatServer) handleIncomingConn(conn net.Conn) {
// init client
client := openChatClient(conn, true)
client.RecvHandler(me.handleClientMsg)
client.CloseHandler(me.handleClientClosed)
// lock me.clients
me.clientLock.Lock()
defer me.clientLock.Unlock()
// append to me.clients
if len(me.clients) > me.clientCount {me.clients[me.clientCount] = client
} else {me.clients = append(me.clients, client)
}
me.clientCount++
me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
}
func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {me.recvLogs = append(me.recvLogs, msg)
if nameMsg,ok := msg.(*NameMsg);ok {client.SetName(nameMsg.Name)
} else if _, ok := msg.(*ChatMsg);ok {me.Broadcast(msg)
}
}
func (me *tChatServer) handleClientClosed(client IChatClient) {me.logf("tChatServer.handleClientClosed, %s", client.GetName())
me.clientLock.Lock()
defer me.clientLock.Unlock()
if me.clientCount <= 0 {return}
lastI := me.clientCount - 1
for i,it := range me.clients {
if it == client {
if i == lastI {me.clients[i] = nil
} else {me.clients[i], me.clients[lastI] = me.clients[lastI], nil
}
me.clientCount--
break
}
}
me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
}
func (me *tChatServer) Broadcast(msg IMsg) {me.clientLock.RLock()
defer me.clientLock.RUnlock()
for _,it := range me.clients {
if it != nil {it.Send(msg)
}
}
}
(未完待续)