手撸golang GO与微服务 grpc

缘起

最近浏览 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采纳golang练习之
gitee: https://gitee.com/ioly/learning.gooop

GRPC

gRPC是跨平台、跨语言并且效率十分高的RPC形式。gRPC默认应用protobuf。能够用proto files创立gRPC服务,用protobuf音讯类型来定义方法参数和返回类型。倡议在gRPC里应用proto3,因为这样能够应用gRPC反对的全副语言,并且能防止proto2客户端与proto3服务端交互时呈现兼容性问题。在实战我的项目中应用gRPC时,肯定要留神服务器的防火墙必须反对HTTP2.0,因为gRPC是基于HTTP2.0设计的。

指标

  • 测试验证gRPC的四种通信模式:

    • 申请-应答模式
    • 客户端推流模式
    • 客户端拉流模式
    • 双向流模式

设计

  • hello.proto: 定义gRPC通信协定
  • HelloServer.go: gRPC服务端的实现
  • pb/hello.pb.go: protoc生成的代码,源码略
  • logger/logger.go: 收集运行日志以便诊断, 源码略

单元测试

grpc_test.go,顺次在四种gRPC通信模式下发送/接管1000条音讯,并对所有消息日志进行校验

package grpcimport (    "context"    "fmt"    "google.golang.org/grpc"    "io"    g "learning/gooop/grpc"    "learning/gooop/grpc/logger"    "learning/gooop/grpc/pb"    "strconv"    "sync"    "testing"    "time")func Test_HelloServer(t *testing.T) {    fnAssertTrue := func(b bool, msg string) {        if !b {            t.Fatal(msg)        }    }    logger.Verbose(false)    serverPort := 3333    serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)    iTotalMsgCount := 1000    // start server    srv := new(g.HelloServer)    err := srv.BeginServeTCP(serverPort)    if err != nil {        t.Fatal(err)    }    time.Sleep(100 * time.Millisecond)    // connect to grpc server    conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())    if err != nil {        t.Fatal(err)    }    defer conn.Close()    // create grpc client    client := pb.NewHelloServerClient(conn)    // test SimpleRequest    ctx := context.Background()    for i := 0; i < iTotalMsgCount; i++ {        msg := &pb.HelloMessage{            Msg: fmt.Sprintf("SimpleRequest %d", i),        }        reply, err := client.SimpleRequest(ctx, msg)        if err != nil {            t.Fatal(err)        }        fnAssertTrue(reply.Msg == "reply "+msg.Msg, "invalid SimpleRequest response")    }    t.Log("passed SimpleRequest")    // test ClientStream    clientStream, err := client.ClientStream(ctx)    if err != nil {        t.Fatal(err)    }    for i := 0; i < iTotalMsgCount; i++ {        msg := &pb.HelloMessage{            Msg: fmt.Sprintf("ClientStream %08d", i),        }        err = clientStream.Send(msg)        if err != nil {            t.Fatal(err)        }    }    reply, err := clientStream.CloseAndRecv()    if err != nil {        t.Fatal(err)    }    fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")    // logger.Logf("HelloServer.ClientStream, recv %s", msg.String())    for i := 0; i < iTotalMsgCount; i++ {        log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)        fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)    }    t.Log("passed ClientStream")    // test ServerStream    serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})    if err != nil {        t.Fatal(err)    }    for {        msg, err := serverStream.Recv()        if err == io.EOF {            break        }        if err != nil {            t.Fatal(err)        }        logger.Logf("ServerStream.Recv %s", msg.Msg)    }    for i := 0; i < iTotalMsgCount; i++ {        log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)        fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)    }    t.Log("passed ServerStream")    // test DualStream    dualStream, err := client.DualStream(ctx)    var wg sync.WaitGroup    wg.Add(1)    go func() {        defer wg.Done()        for i := 0; i < iTotalMsgCount; i++ {            msg := &pb.HelloMessage{                Msg: fmt.Sprintf("DualStream.Send %08d", i),            }            err := dualStream.Send(msg)            if err != nil {                t.Fatal(err)            }        }        err = dualStream.CloseSend()        if err != nil {            t.Fatal(err)        }    }()    wg.Add(1)    go func() {        defer wg.Done()        for {            msg, err := dualStream.Recv()            if err == io.EOF {                break            }            if err != nil {                t.Fatal(err)            }            logger.Logf("DualStream.Recv %s", msg.Msg)        }    }()    wg.Wait()    for i := 0; i < iTotalMsgCount; i++ {        // Msg: "reply " + msg.Msg,        // logger.Logf("DualStream.Recv %s", msg.Msg)        log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)        fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)    }    t.Log("passed DualStream")}

测试输入

$ go test -v grpc_test.go === RUN   Test_HelloServer    grpc_test.go:60: passed SimpleRequest    grpc_test.go:87: passed ClientStream    grpc_test.go:110: passed ServerStream    grpc_test.go:159: passed DualStream--- PASS: Test_HelloServer (0.79s)PASSok      command-line-arguments  0.791s

hello.proto

定义四种通信模式的rpc接口

syntax = "proto3";package pb;option go_package="./pb";service HelloServer {  rpc SimpleRequest(HelloMessage) returns (HelloMessage);  rpc ClientStream(stream HelloMessage) returns (HelloMessage);  rpc ServerStream(HelloMessage) returns (stream HelloMessage);  rpc DualStream(stream HelloMessage) returns (stream HelloMessage);}message HelloMessage {  string msg = 1;}

HelloServer.go

gRPC服务端的实现

package grpcimport (    "context"    "fmt"    "google.golang.org/grpc"    "io"    "learning/gooop/grpc/logger"    "learning/gooop/grpc/pb"    "net"    "strconv")type HelloServer intfunc (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {    //logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)    msg.Msg = "reply " + msg.Msg    return msg, nil}func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {    for {        msg, err := stream.Recv()        if err == io.EOF {            logger.Logf("HelloServer.ClientStream, EOF")            break        }        if err != nil {            logger.Logf("HelloServer.ClientStream, err=%v", err)            return err        }        logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)    }    err := stream.SendAndClose(&pb.HelloMessage{        Msg: "reply ClientStream",    })    if err != nil {        logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)    }    return nil}func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {    iMsgCount, err := strconv.Atoi(msg.Msg)    if err != nil {        return err    }    for i := 0; i < iMsgCount; i++ {        msg := &pb.HelloMessage{            Msg: fmt.Sprintf("ServerStream-%08d", i),        }        err := stream.Send(msg)        if err != nil {            return err        }    }    return nil}func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {    for {        msg, err := stream.Recv()        if err == io.EOF {            return nil        }        if err != nil {            logger.Logf("HelloServer.DualStream, recv err=%v", err)            return err        }        logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)        ret := &pb.HelloMessage{            Msg: "reply " + msg.Msg,        }        err = stream.Send(ret)        if err != nil {            logger.Logf("HelloServer.DualStream, send err=%v", err)            return err        }    }}func (me *HelloServer) BeginServeTCP(port int) error {    listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))    if err != nil {        return err    }    server := grpc.NewServer()    pb.RegisterHelloServerServer(server, me)    go func() {        panic(server.Serve(listener))    }()    return nil}

(end)