关于golang:手撸golang-GO与微服务-grpc

11次阅读

共计 5557 个字符,预计需要花费 14 分钟才能阅读完成。

手撸 golang GO 与微服务 grpc

缘起

最近阅读 [Go 微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用 golang 练习之
gitee: https://gitee.com/ioly/learning.gooop

GRPC

gRPC 是跨平台、跨语言并且效率非常高的 RPC 方式。gRPC 默认使用 protobuf。可以用 proto files 创建 gRPC 服务,用 protobuf 消息类型来定义方法参数和返回类型。建议在 gRPC 里使用 proto3,因为这样可以使用 gRPC 支持的全部语言,并且能避免 proto2 客户端与 proto3 服务端交互时出现兼容性问题。在实战项目中使用 gRPC 时,一定要注意服务器的防火墙必须支持 HTTP/2,因为 gRPC 是基于 HTTP/2 设计的。

目标

  • 测试验证 gRPC 的四种通信模式:

    • 请求 - 应答模式
    • 客户端推流模式
    • 客户端拉流模式
    • 双向流模式

设计

  • hello.proto: 定义 gRPC 通信协议
  • HelloServer.go: gRPC 服务端的实现
  • pb/hello.pb.go: protoc 生成的代码,源码略
  • logger/logger.go: 收集运行日志以便诊断,源码略

单元测试

grpc_test.go,依次在四种 gRPC 通信模式下发送 / 接收 1000 条消息,并对所有消息日志进行校验

package grpc

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "io"
    g "learning/gooop/grpc"
    "learning/gooop/grpc/logger"
    "learning/gooop/grpc/pb"
    "strconv"
    "sync"
    "testing"
    "time"
)

// Test_HelloServer starts a HelloServer on a local TCP port, connects a gRPC
// client to it, and exercises the four rpc methods in turn (SimpleRequest,
// ClientStream, ServerStream, DualStream) with iTotalMsgCount messages each.
// Replies are checked directly where the call returns one; for the streaming
// modes the test verifies that each expected line was recorded exactly once
// by the logger package.
func Test_HelloServer(t *testing.T) {
    fnAssertTrue := func(b bool, msg string) {
        t.Helper()
        if !b {
            t.Fatal(msg)
        }
    }
    logger.Verbose(false)

    serverPort := 3333
    serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
    iTotalMsgCount := 1000

    // start server
    srv := new(g.HelloServer)
    err := srv.BeginServeTCP(serverPort)
    if err != nil {
        t.Fatal(err)
    }
    time.Sleep(100 * time.Millisecond)

    // connect to grpc server
    conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
    if err != nil {
        t.Fatal(err)
    }
    defer conn.Close()

    // create grpc client
    client := pb.NewHelloServerClient(conn)

    // test SimpleRequest
    ctx := context.Background()

    for i := 0; i < iTotalMsgCount; i++ {
        msg := &pb.HelloMessage{Msg: fmt.Sprintf("SimpleRequest %d", i)}
        reply, err := client.SimpleRequest(ctx, msg)
        if err != nil {
            t.Fatal(err)
        }
        fnAssertTrue(reply.Msg == "reply"+msg.Msg, "invalid SimpleRequest response")
    }
    t.Log("passed SimpleRequest")

    // test ClientStream
    clientStream, err := client.ClientStream(ctx)
    if err != nil {
        t.Fatal(err)
    }
    for i := 0; i < iTotalMsgCount; i++ {
        msg := &pb.HelloMessage{Msg: fmt.Sprintf("ClientStream %08d", i)}
        if err := clientStream.Send(msg); err != nil {
            t.Fatal(err)
        }
    }
    reply, err := clientStream.CloseAndRecv()
    if err != nil {
        t.Fatal(err)
    }
    fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")

    // The server logs each received message as
    // "HelloServer.ClientStream, recv <msg>"; every one must appear exactly once.
    for i := 0; i < iTotalMsgCount; i++ {
        log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
        fnAssertTrue(logger.Count(log) == 1, "expecting log"+log)
    }
    t.Log("passed ClientStream")

    // test ServerStream
    serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
    if err != nil {
        t.Fatal(err)
    }

    for {
        msg, err := serverStream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            t.Fatal(err)
        }
        logger.Logf("ServerStream.Recv %s", msg.Msg)
    }

    for i := 0; i < iTotalMsgCount; i++ {
        log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
        fnAssertTrue(logger.Count(log) == 1, "expecting log"+log)
    }
    t.Log("passed ServerStream")

    // test DualStream
    // A cancellable context lets either goroutine abort the stream on failure,
    // so the other one is not left blocked in Send/Recv forever.
    dualCtx, cancelDual := context.WithCancel(ctx)
    defer cancelDual()

    dualStream, err := client.DualStream(dualCtx)
    if err != nil {
        t.Fatal(err)
    }

    var wg sync.WaitGroup
    wg.Add(2)

    // Sender. t.Fatal must only be called from the goroutine running the test,
    // so failures here are reported with t.Errorf and the goroutine returns.
    go func() {
        defer wg.Done()
        for i := 0; i < iTotalMsgCount; i++ {
            msg := &pb.HelloMessage{Msg: fmt.Sprintf("DualStream.Send %08d", i)}
            if err := dualStream.Send(msg); err != nil {
                t.Errorf("DualStream.Send: %v", err)
                cancelDual()
                return
            }
        }

        if err := dualStream.CloseSend(); err != nil {
            t.Errorf("DualStream.CloseSend: %v", err)
            cancelDual()
        }
    }()

    // Receiver: logs every reply until the server ends the stream.
    go func() {
        defer wg.Done()
        for {
            msg, err := dualStream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                t.Errorf("DualStream.Recv: %v", err)
                cancelDual()
                return
            }

            logger.Logf("DualStream.Recv %s", msg.Msg)
        }
    }()

    wg.Wait()
    if t.Failed() {
        // Stop here, on the test goroutine, if either worker reported an error.
        t.FailNow()
    }

    for i := 0; i < iTotalMsgCount; i++ {
        // The server replies with "reply" + msg.Msg (no separator) and the
        // receiver above logs it as "DualStream.Recv <reply>".
        log := fmt.Sprintf("DualStream.Recv replyDualStream.Send %08d", i)
        fnAssertTrue(logger.Count(log) == 1, "expecting log"+log)
    }
    t.Log("passed DualStream")
}

测试输出

$ go test -v grpc_test.go 
=== RUN   Test_HelloServer
    grpc_test.go:60: passed SimpleRequest
    grpc_test.go:87: passed ClientStream
    grpc_test.go:110: passed ServerStream
    grpc_test.go:159: passed DualStream
--- PASS: Test_HelloServer (0.79s)
PASS
ok      command-line-arguments  0.791s

hello.proto

定义四种通信模式的 rpc 接口

syntax = "proto3";

package pb;

option go_package="./pb";

// HelloServer declares one rpc per gRPC communication mode. All four use
// HelloMessage for both request and response. SimpleRequest is the unary
// form: one request message, one response message.
service HelloServer {rpc SimpleRequest(HelloMessage) returns (HelloMessage);
  // ClientStream: the client sends a stream of messages and gets one response.
  rpc ClientStream(stream HelloMessage) returns (HelloMessage);
  // ServerStream: the client sends one message and gets a stream of responses.
  rpc ServerStream(HelloMessage) returns (stream HelloMessage);
  // DualStream: both sides send a stream of messages.
  rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
}

// HelloMessage is the only message type in this file; it carries a single
// string field, msg (field number 1).
message HelloMessage {string msg = 1;}

HelloServer.go

gRPC 服务端的实现

package grpc

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "io"
    "learning/gooop/grpc/logger"
    "learning/gooop/grpc/pb"
    "net"
    "strconv"
)

// HelloServer is the receiver type for the gRPC service methods in this file.
// None of those methods read its underlying int value.
type HelloServer int

// SimpleRequest handles the unary call. It prepends "reply" to the text of the
// incoming message in place and returns that same message as the response.
// The returned error is always nil.
func (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
    // The request object doubles as the response; no new message is allocated.
    reply := msg
    reply.Msg = "reply" + reply.Msg
    return reply, nil
}

// ClientStream handles the client-streaming call. It receives messages until
// the client signals end of stream (io.EOF), logging each one, and then sends
// a single "reply ClientStream" message and closes the stream.
//
// Any error from Recv or SendAndClose is logged and returned, so a failed
// final send is no longer reported to the caller as success.
func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            logger.Logf("HelloServer.ClientStream, EOF")
            break
        }

        if err != nil {
            logger.Logf("HelloServer.ClientStream, err=%v", err)
            return err
        }

        logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
    }

    err := stream.SendAndClose(&pb.HelloMessage{Msg: "reply ClientStream"})
    if err != nil {
        logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
        return err
    }
    return nil
}

// ServerStream handles the server-streaming call. The request text is parsed
// as a decimal message count; the handler then sends that many messages named
// "ServerStream-<8-digit index>", starting at index 0.
//
// It returns the strconv.Atoi error if the count does not parse, or the first
// error returned by stream.Send.
func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
    total, parseErr := strconv.Atoi(msg.Msg)
    if parseErr != nil {
        return parseErr
    }

    for seq := 0; seq < total; seq++ {
        out := &pb.HelloMessage{Msg: fmt.Sprintf("ServerStream-%08d", seq)}
        if sendErr := stream.Send(out); sendErr != nil {
            return sendErr
        }
    }

    return nil
}

// DualStream handles the bidirectional-streaming call. For every message it
// receives it logs the text and sends back a message whose text is "reply"
// followed by the received text. It returns nil when the client ends the
// stream (io.EOF); any other Recv or Send error is logged and returned.
func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
    for {
        in, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            // Client finished sending; end the rpc normally.
            return nil
        case recvErr != nil:
            logger.Logf("HelloServer.DualStream, recv err=%v", recvErr)
            return recvErr
        }
        logger.Logf("HelloServer.DualStream, recv msg=%v", in.Msg)

        out := &pb.HelloMessage{Msg: "reply" + in.Msg}
        if sendErr := stream.Send(out); sendErr != nil {
            logger.Logf("HelloServer.DualStream, send err=%v", sendErr)
            return sendErr
        }
    }
}

// BeginServeTCP opens a TCP listener on 0.0.0.0:<port>, registers me as the
// HelloServer implementation on a new gRPC server, and starts serving in a
// background goroutine. It returns as soon as the listener is open.
//
// It returns the net.Listen error if the port cannot be bound. A non-nil
// error later returned by Serve is logged rather than raised as a panic, so a
// serving failure in the background goroutine no longer takes down the whole
// process.
func (me *HelloServer) BeginServeTCP(port int) error {
    listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
    if err != nil {
        return err
    }

    server := grpc.NewServer()
    pb.RegisterHelloServerServer(server, me)
    go func() {
        // Serve blocks for the lifetime of the server; only a non-nil result
        // is worth reporting.
        if err := server.Serve(listener); err != nil {
            logger.Logf("HelloServer.BeginServeTCP, serve err=%v", err)
        }
    }()

    return nil
}

(end)

正文完
 0