共计 5557 个字符,预计需要花费 14 分钟才能阅读完成。
手撸 golang GO 与微服务 grpc
缘起
最近浏览 [Go 微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop
GRPC
gRPC 是跨平台、跨语言并且效率非常高的 RPC 方式。gRPC 默认使用 protobuf。可以用 proto files 创建 gRPC 服务,用 protobuf 消息类型来定义方法参数和返回类型。建议在 gRPC 里使用 proto3,因为这样可以使用 gRPC 支持的全部语言,并且能避免 proto2 客户端与 proto3 服务端交互时出现兼容性问题。在实战项目中使用 gRPC 时,一定要注意服务器的防火墙必须支持 HTTP/2,因为 gRPC 是基于 HTTP/2 设计的。
目标
-
测试验证 gRPC 的四种通信模式:
- 请求 - 应答模式
- 客户端推流模式
- 客户端拉流模式
- 双向流模式
设计
- hello.proto: 定义 gRPC 通信协议
- HelloServer.go: gRPC 服务端的实现
- pb/hello.pb.go: protoc 生成的代码,源码略
- logger/logger.go: 收集运行日志以便诊断,源码略
单元测试
grpc_test.go,依次在四种 gRPC 通信模式下发送 / 接收 1000 条消息,并对所有消息日志进行校验
package grpc
import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
g "learning/gooop/grpc"
"learning/gooop/grpc/logger"
"learning/gooop/grpc/pb"
"strconv"
"sync"
"testing"
"time"
)
// Test_HelloServer starts a HelloServer on a local TCP port, connects a gRPC
// client to it, and exercises the four rpc methods in turn: SimpleRequest,
// ClientStream, ServerStream and DualStream. Each phase sends or receives
// iTotalMsgCount messages and then verifies the outcome, either by checking
// the reply directly or by counting the matching lines recorded by logger.
func Test_HelloServer(t *testing.T) {
	fnAssertTrue := func(b bool, msg string) {
		if !b {
			t.Fatal(msg)
		}
	}

	logger.Verbose(false)
	serverPort := 3333
	serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
	iTotalMsgCount := 1000

	// start server
	srv := new(g.HelloServer)
	err := srv.BeginServeTCP(serverPort)
	if err != nil {
		t.Fatal(err)
	}
	// BeginServeTCP serves in a background goroutine; give it a moment to start.
	time.Sleep(100 * time.Millisecond)

	// connect to grpc server
	conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	// create grpc client
	client := pb.NewHelloServerClient(conn)

	// test SimpleRequest
	ctx := context.Background()
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("SimpleRequest %d", i),
		}
		reply, err := client.SimpleRequest(ctx, msg)
		if err != nil {
			t.Fatal(err)
		}
		fnAssertTrue(reply.Msg == "reply"+msg.Msg, "invalid SimpleRequest response")
	}
	t.Log("passed SimpleRequest")

	// test ClientStream
	clientStream, err := client.ClientStream(ctx)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("ClientStream %08d", i),
		}
		err = clientStream.Send(msg)
		if err != nil {
			t.Fatal(err)
		}
	}
	reply, err := clientStream.CloseAndRecv()
	if err != nil {
		t.Fatal(err)
	}
	fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")

	// The server logs every received message as
	// "HelloServer.ClientStream, recv <msg>"; each one must appear exactly once.
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ClientStream")

	// test ServerStream
	serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
	if err != nil {
		t.Fatal(err)
	}
	for {
		msg, err := serverStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		logger.Logf("ServerStream.Recv %s", msg.Msg)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ServerStream")

	// test DualStream
	// A cancellable context lets a failing goroutine unblock its peer instead
	// of leaving it stuck in Send/Recv forever.
	dualCtx, cancelDual := context.WithCancel(ctx)
	defer cancelDual()
	dualStream, err := client.DualStream(dualCtx)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Sender: t.Fatal must only be called from the test goroutine, so failures
	// are reported with t.Error and the goroutine simply returns.
	go func() {
		defer wg.Done()
		for i := 0; i < iTotalMsgCount; i++ {
			msg := &pb.HelloMessage{
				Msg: fmt.Sprintf("DualStream.Send %08d", i),
			}
			if err := dualStream.Send(msg); err != nil {
				t.Error(err)
				cancelDual()
				return
			}
		}
		if err := dualStream.CloseSend(); err != nil {
			t.Error(err)
			cancelDual()
		}
	}()

	// Receiver: logs every reply until the server closes the stream.
	go func() {
		defer wg.Done()
		for {
			msg, err := dualStream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				t.Error(err)
				cancelDual()
				return
			}
			logger.Logf("DualStream.Recv %s", msg.Msg)
		}
	}()

	wg.Wait()
	// Abort here, on the test goroutine, if either worker reported an error.
	if t.Failed() {
		t.FailNow()
	}

	// Every sent message is expected to come back as "reply <msg>", which the
	// receiver goroutine logged as "DualStream.Recv <reply>".
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed DualStream")
}
测试输出
$ go test -v grpc_test.go
=== RUN Test_HelloServer
grpc_test.go:60: passed SimpleRequest
grpc_test.go:87: passed ClientStream
grpc_test.go:110: passed ServerStream
grpc_test.go:159: passed DualStream
--- PASS: Test_HelloServer (0.79s)
PASS
ok command-line-arguments 0.791s
hello.proto
定义四种通信模式的 rpc 接口
syntax = "proto3";
package pb;
option go_package="./pb";
// HelloServer declares one rpc per gRPC communication mode.
// SimpleRequest: unary call, one HelloMessage in and one HelloMessage out.
service HelloServer {rpc SimpleRequest(HelloMessage) returns (HelloMessage);
// ClientStream: the client sends a stream of messages and gets a single reply.
rpc ClientStream(stream HelloMessage) returns (HelloMessage);
// ServerStream: the client sends one message and gets a stream of replies.
rpc ServerStream(HelloMessage) returns (stream HelloMessage);
// DualStream: both sides exchange streams of messages.
rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
}
// HelloMessage is the only message type; it carries a single text field and is
// used as both request and response by every rpc above.
message HelloMessage {string msg = 1;}
HelloServer.go
gRPC 服务端的实现
package grpc
import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
"learning/gooop/grpc/logger"
"learning/gooop/grpc/pb"
"net"
"strconv"
)
// HelloServer is the receiver type for the gRPC HelloServer service handlers
// (SimpleRequest, ClientStream, ServerStream, DualStream) and BeginServeTCP.
// None of its methods read or write the underlying int value.
type HelloServer int
// SimpleRequest handles the unary rpc. It prepends "reply" to the text of the
// incoming message, modifying that message in place, and returns the same
// message object as the response. It never returns an error.
func (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
	// Per-request logging is left disabled:
	// logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)
	const prefix = "reply"
	msg.Msg = prefix + msg.Msg
	return msg, nil
}
// ClientStream handles the client-streaming rpc. It reads messages until the
// client closes its side of the stream (io.EOF), logging each one, and then
// answers with a single "reply ClientStream" message.
//
// A receive error or a failure to send the final reply is logged and returned
// so that the rpc terminates with that error instead of reporting success.
func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			logger.Logf("HelloServer.ClientStream, EOF")
			break
		}
		if err != nil {
			logger.Logf("HelloServer.ClientStream, err=%v", err)
			return err
		}
		logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
	}

	err := stream.SendAndClose(&pb.HelloMessage{Msg: "reply ClientStream"})
	if err != nil {
		logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
		// Propagate the failure; returning nil here would hide it from gRPC.
		return err
	}
	return nil
}
// ServerStream handles the server-streaming rpc. The request text is parsed as
// a decimal count; the handler then sends that many messages named
// "ServerStream-%08d" (numbered from 0) and returns. A parse failure or the
// first send failure is returned as-is.
func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
	total, convErr := strconv.Atoi(msg.Msg)
	if convErr != nil {
		return convErr
	}

	for seq := 0; seq < total; seq++ {
		text := fmt.Sprintf("ServerStream-%08d", seq)
		if sendErr := stream.Send(&pb.HelloMessage{Msg: text}); sendErr != nil {
			return sendErr
		}
	}
	return nil
}
// DualStream handles the bidirectional-streaming rpc. For every message it
// receives it logs the text and immediately sends back "reply " followed by
// that text. It returns nil once the client closes its side of the stream
// (io.EOF); a receive or send error is logged and returned.
func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			logger.Logf("HelloServer.DualStream, recv err=%v", err)
			return err
		}
		logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)

		// The separating space matters: the client-side check looks for
		// "DualStream.Recv reply DualStream.Send NNNNNNNN".
		ret := &pb.HelloMessage{Msg: "reply " + msg.Msg}
		err = stream.Send(ret)
		if err != nil {
			logger.Logf("HelloServer.DualStream, send err=%v", err)
			return err
		}
	}
}
// BeginServeTCP opens a TCP listener on 0.0.0.0:port, registers me as the
// HelloServer service on a new gRPC server, and starts serving in a background
// goroutine. It returns as soon as the listener is open; only the listen error
// is reported to the caller.
//
// A non-nil error from Serve is logged rather than raised as a panic, so a
// serving failure (or a nil return after a stop) does not crash the process.
func (me *HelloServer) BeginServeTCP(port int) error {
	listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
	if err != nil {
		return err
	}

	server := grpc.NewServer()
	pb.RegisterHelloServerServer(server, me)

	go func() {
		if err := server.Serve(listener); err != nil {
			logger.Logf("HelloServer.BeginServeTCP, serve err=%v", err)
		}
	}()
	return nil
}
(end)
正文完