手撸golang GO与微服务 grpc
缘起
最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
GRPC
gRPC是跨平台、跨语言并且效率非常高的RPC方式。gRPC默认使用protobuf。可以用proto files创建gRPC服务,用protobuf消息类型来定义方法参数和返回类型。建议在gRPC里使用proto3,因为这样可以使用gRPC支持的全部语言,并且能避免proto2客户端与proto3服务端交互时出现兼容性问题。在实战项目中使用gRPC时,一定要注意服务器的防火墙必须支持HTTP2.0,因为gRPC是基于HTTP2.0设计的。
目标
测试验证gRPC的四种通信模式:
- 请求-应答模式
- 客户端推流模式
- 客户端拉流模式
- 双向流模式
设计
- hello.proto: 定义gRPC通信协议
- HelloServer.go: gRPC服务端的实现
- pb/hello.pb.go: protoc生成的代码,源码略
- logger/logger.go: 收集运行日志以便诊断, 源码略
单元测试
grpc_test.go,依次在四种gRPC通信模式下发送/接收1000条消息,并对所有消息日志进行校验
package grpc

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"testing"
	"time"

	"google.golang.org/grpc"

	g "learning/gooop/grpc"
	"learning/gooop/grpc/logger"
	"learning/gooop/grpc/pb"
)

// Test_HelloServer starts a HelloServer on a local TCP port and drives each of
// the four RPCs declared by the service (SimpleRequest, ClientStream,
// ServerStream, DualStream) with iTotalMsgCount messages, verifying replies
// directly or through the log lines recorded by the logger package.
func Test_HelloServer(t *testing.T) {
	fnAssertTrue := func(b bool, msg string) {
		t.Helper()
		if !b {
			t.Fatal(msg)
		}
	}

	logger.Verbose(false)
	serverPort := 3333
	serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
	iTotalMsgCount := 1000

	// start server
	srv := new(g.HelloServer)
	err := srv.BeginServeTCP(serverPort)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(100 * time.Millisecond)

	// connect to grpc server
	conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	// create grpc client
	client := pb.NewHelloServerClient(conn)

	// test SimpleRequest
	ctx := context.Background()
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("SimpleRequest %d", i),
		}
		reply, err := client.SimpleRequest(ctx, msg)
		if err != nil {
			t.Fatal(err)
		}
		fnAssertTrue(reply.Msg == "reply "+msg.Msg, "invalid SimpleRequest response")
	}
	t.Log("passed SimpleRequest")

	// test ClientStream
	clientStream, err := client.ClientStream(ctx)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("ClientStream %08d", i),
		}
		err = clientStream.Send(msg)
		if err != nil {
			t.Fatal(err)
		}
	}
	reply, err := clientStream.CloseAndRecv()
	if err != nil {
		t.Fatal(err)
	}
	fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")

	// The server logs each received message as:
	// logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ClientStream")

	// test ServerStream
	serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
	if err != nil {
		t.Fatal(err)
	}
	for {
		msg, err := serverStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		logger.Logf("ServerStream.Recv %s", msg.Msg)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ServerStream")

	// test DualStream
	// A cancellable context lets either goroutine abort the stream on failure,
	// so the other one cannot stay blocked in Send/Recv forever.
	dualCtx, cancelDual := context.WithCancel(ctx)
	defer cancelDual()
	dualStream, err := client.DualStream(dualCtx)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// Sender. t.Fatal must only be called from the goroutine running the test
	// function, so failures here are reported with t.Errorf and a return.
	go func() {
		defer wg.Done()
		for i := 0; i < iTotalMsgCount; i++ {
			msg := &pb.HelloMessage{
				Msg: fmt.Sprintf("DualStream.Send %08d", i),
			}
			if err := dualStream.Send(msg); err != nil {
				t.Errorf("DualStream.Send: %v", err)
				cancelDual()
				return
			}
		}
		if err := dualStream.CloseSend(); err != nil {
			t.Errorf("DualStream.CloseSend: %v", err)
			cancelDual()
		}
	}()

	// Receiver: logs every reply until the server closes the stream.
	go func() {
		defer wg.Done()
		for {
			msg, err := dualStream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				t.Errorf("DualStream.Recv: %v", err)
				cancelDual()
				return
			}
			logger.Logf("DualStream.Recv %s", msg.Msg)
		}
	}()

	wg.Wait()
	if t.Failed() {
		// Stop here, on the test goroutine, if either worker reported an error.
		t.FailNow()
	}

	for i := 0; i < iTotalMsgCount; i++ {
		// The server replies with Msg: "reply " + msg.Msg, and the receiver
		// above logs it via logger.Logf("DualStream.Recv %s", msg.Msg).
		log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed DualStream")
}
测试输出
$ go test -v grpc_test.go === RUN Test_HelloServer grpc_test.go:60: passed SimpleRequest grpc_test.go:87: passed ClientStream grpc_test.go:110: passed ServerStream grpc_test.go:159: passed DualStream--- PASS: Test_HelloServer (0.79s)PASSok command-line-arguments 0.791s
hello.proto
定义四种通信模式的rpc接口
// hello.proto declares the HelloServer service and its single message type.
syntax = "proto3";

package pb;

// Generated Go code is placed in the ./pb package.
option go_package="./pb";

// HelloServer exposes one RPC per gRPC communication style.
service HelloServer {
    // Unary: one request, one response.
    rpc SimpleRequest(HelloMessage) returns (HelloMessage);
    // Client streaming: a stream of requests, one response.
    rpc ClientStream(stream HelloMessage) returns (HelloMessage);
    // Server streaming: one request, a stream of responses.
    rpc ServerStream(HelloMessage) returns (stream HelloMessage);
    // Bidirectional streaming: a stream of requests and a stream of responses.
    rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
}

// HelloMessage is the only payload type; it carries a single text field.
message HelloMessage {
    string msg = 1;
}
HelloServer.go
gRPC服务端的实现
// Package grpc contains a small gRPC server used to exercise the four RPC
// styles declared in hello.proto.
package grpc

import (
	"context"
	"fmt"
	"io"
	"net"
	"strconv"

	"google.golang.org/grpc"

	"learning/gooop/grpc/logger"
	"learning/gooop/grpc/pb"
)

// HelloServer provides the handlers for the HelloServer service; it is the
// value registered with pb.RegisterHelloServerServer in BeginServeTCP.
type HelloServer int

// SimpleRequest answers a single request with a message whose text is the
// request text prefixed by "reply ".
func (s *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
	//logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)

	// Build a fresh reply instead of mutating the incoming request, matching
	// what DualStream does.
	return &pb.HelloMessage{
		Msg: "reply " + msg.Msg,
	}, nil
}

// ClientStream logs every message received from the client and, once the
// client finishes sending (io.EOF), responds with "reply ClientStream".
// Receive and send errors are logged and returned to the caller.
func (s *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			logger.Logf("HelloServer.ClientStream, EOF")
			break
		}
		if err != nil {
			logger.Logf("HelloServer.ClientStream, err=%v", err)
			return err
		}
		logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
	}

	err := stream.SendAndClose(&pb.HelloMessage{
		Msg: "reply ClientStream",
	})
	if err != nil {
		logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
		// Propagate the failure so the RPC is not reported as successful.
		return err
	}
	return nil
}

// ServerStream parses msg.Msg as a decimal message count and sends that many
// messages named "ServerStream-%08d" (zero-based index) to the client.
// It returns the parse error if msg.Msg is not an integer, or the first send
// error.
func (s *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
	iMsgCount, err := strconv.Atoi(msg.Msg)
	if err != nil {
		return err
	}

	for i := 0; i < iMsgCount; i++ {
		out := &pb.HelloMessage{
			Msg: fmt.Sprintf("ServerStream-%08d", i),
		}
		if err := stream.Send(out); err != nil {
			return err
		}
	}
	return nil
}

// DualStream echoes each received message back with a "reply " prefix until
// the client closes its side of the stream (io.EOF). Receive and send errors
// are logged and returned.
func (s *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			logger.Logf("HelloServer.DualStream, recv err=%v", err)
			return err
		}
		logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)

		ret := &pb.HelloMessage{
			Msg: "reply " + msg.Msg,
		}
		err = stream.Send(ret)
		if err != nil {
			logger.Logf("HelloServer.DualStream, send err=%v", err)
			return err
		}
	}
}

// BeginServeTCP listens on 0.0.0.0:port, registers the receiver with a new
// gRPC server and serves it on a background goroutine. It returns the listen
// error, if any; an error from Serve is logged rather than returned.
func (s *HelloServer) BeginServeTCP(port int) error {
	listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
	if err != nil {
		return err
	}

	server := grpc.NewServer()
	pb.RegisterHelloServerServer(server, s)

	go func() {
		// Log instead of panicking: a panic on this goroutine would take down
		// the whole process whenever Serve returns.
		if err := server.Serve(listener); err != nil {
			logger.Logf("HelloServer.BeginServeTCP, serve err=%v", err)
		}
	}()
	return nil
}
(end)