本篇为【写给go开发者的gRPC教程系列】第二篇

  • 第一篇:protobuf根底
  • 第二篇:通信模式

上一篇介绍了如何编写 protobuf 的 idl,并应用 idl 生成了 gRPC 的代码,当初来看看如何编写客户端和服务端的代码

Simple RPC (Unary RPC)

syntax = "proto3";package ecommerce;import "google/protobuf/wrappers.proto";option go_package = "ecommerce/";message Order {  string id = 1;  repeated string items = 2;  string description = 3;  float price = 4;  string destination = 5;}service OrderManagement {  rpc getOrder(google.protobuf.StringValue) returns (Order);}

定义如上的 idl,须要关注几个事项

  • 应用protobuf最新版本syntax = "proto3";
  • protoc-gen-go要求 pb 文件必须指定 go 包的门路。即option go_package = "ecommerce/";
  • 定义的method仅能有一个入参和出参数。如果须要传递多个参数须要定义成message
  • 应用import援用另外一个文件的 pb。google/protobuf/wrappers.proto是 google 内置的类型

生成 go 和 grpc 的代码

$ protoc -I ./pb \  --go_out ./ecommerce --go_opt paths=source_relative \  --go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \  ./pb/product.proto
ecommerce├── product.pb.go└── product_grpc.pb.gopb└── product.proto

server 实现

1、由 pb 文件生成的 gRPC 代码中蕴含了 service 的接口定义,它和咱们定义的 idl 是吻合的

service OrderManagement {  rpc getOrder(google.protobuf.StringValue) returns (Order);}
type OrderManagementServer interface {    GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)    mustEmbedUnimplementedOrderManagementServer()}

2、咱们的业务逻辑就是实现这个接口

package mainimport (    "context"    "log"    pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"    "google.golang.org/grpc/codes"    "google.golang.org/grpc/status"    "google.golang.org/protobuf/types/known/wrapperspb")var _ pb.OrderManagementServer = &OrderManagementImpl{}var orders = make(map[string]pb.Order)type OrderManagementImpl struct {    pb.UnimplementedOrderManagementServer}// Simple RPCfunc (s *OrderManagementImpl) GetOrder(ctx context.Context, orderId *wrapperspb.StringValue) (*pb.Order, error) {    ord, exists := orders[orderId.Value]    if exists {        return &ord, status.New(codes.OK, "").Err()    }    return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)}

3、在实现完业务逻辑之后,咱们能够创立并启动服务

package mainimport (    "net"    pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"    "google.golang.org/grpc")func main() {    s := grpc.NewServer()    pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})    lis, err := net.Listen("tcp", ":8009")    if err != nil {        panic(err)    }    if err := s.Serve(lis); err != nil {        panic(err)    }}

服务端代码实现的流程如下

client 实现

1、由 pb 文件生成的 gRPC 代码中蕴含了 client 的实现,它和咱们定义的 idl 也是吻合的

service OrderManagement {  rpc getOrder(google.protobuf.StringValue) returns (Order);}
type orderManagementClient struct {    cc grpc.ClientConnInterface}func NewOrderManagementClient(cc grpc.ClientConnInterface) OrderManagementClient {    return &orderManagementClient{cc}}func (c *orderManagementClient) GetOrder(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error) {    out := new(Order)    err := c.cc.Invoke(ctx, "/ecommerce.OrderManagement/getOrder", in, out, opts...)    if err != nil {        return nil, err    }    return out, nil}

2、间接应用 client 来进行 rpc 调用

package mainimport (    "context"    "log"    "time"    pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"    "google.golang.org/grpc"    "google.golang.org/protobuf/types/known/wrapperspb")func main() {    conn, err := grpc.Dial("127.0.0.1:8009", grpc.WithInsecure())    if err != nil {        panic(err)    }    defer conn.Close()    client := pb.NewOrderManagementClient(conn)    ctx, cancel := context.WithTimeout(context.Background(), time.Second)    defer cancel()    // Get Order    retrievedOrder, err := client.GetOrder(ctx, &wrapperspb.StringValue{Value: "101"})    if err != nil {        panic(err)    }    log.Print("GetOrder Response -> : ", retrievedOrder)}

客户端代码实现的流程如下

小总结

✨ 前文提到过protobuf协定是平台无关的。演示的客户端和服务端都是 golang 的,即便客户端和服务端不同语言也是相似的能够通信的

✨ 对于下面介绍的的这种相似于http1.x的模式:客户端发送申请,服务端响应申请,一问一答的模式在 gRPC 里叫做Simple RPC (也称Unary RPC)。gRPC 同时也反对其余类型的交互方式。

Server-Streaming RPC 服务器端流式 RPC

服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为一般 RPC 申请

简略来讲就是客户端发动一次一般的 RPC 申请,服务端通过流式响应屡次发送数据集,客户端 Recv 接收数据集。大抵如图:

pb 定义

syntax = "proto3";package ecommerce;option go_package = "ecommerce/";import "google/protobuf/wrappers.proto";message Order {  string id = 1;  repeated string items = 2;  string description = 3;  float price = 4;  string destination = 5;}service OrderManagement {  rpc searchOrders(google.protobuf.StringValue) returns (stream Order);}

server 实现

✨ 留神与Simple RPC的区别:因为咱们的服务端是流式响应的,因而对于服务端来说函数入参了一个stream OrderManagement_SearchOrdersServer参数用来写入多个响应,能够把它看作是客户端的对象

✨ 能够通过调用这个流对象的Send(...),来往客户端写入数据

✨ 通过返回nil或者error来示意全副数据写完了

func (s *server) SearchOrders(query *wrapperspb.StringValue,                              stream pb.OrderManagement_SearchOrdersServer) error {    for _, order := range orders {        for _, str := range order.Items {            if strings.Contains(str, query.Value) {                err := stream.Send(&order)                if err != nil {                    return fmt.Errorf("error send: %v", err)                }            }        }    }    return nil}

client 实现

✨ 留神与Simple RPC的区别:因为咱们的服务端是流式响应的,因而 RPC 函数返回值stream是一个流,能够把它看作是服务端的对象

✨ 应用streamRecv函数来一直从服务端接收数据

✨ 当Recv返回io.EOF代表流曾经完结

c := pb.NewOrderManagementClient(conn)ctx, cancelFn := context.WithCancel(context.Background())defer cancelFn()stream, err := c.SearchOrders(ctx, &wrapperspb.StringValue{Value: "Google"})if err != nil{  panic(err)}for{  order, err := stream.Recv()  if err == io.EOF{    break  }  log.Println("Search Result: ", order)}

小总结

Client-Streaming RPC 客户端流式 RPC

客户端流式 RPC,显然也是单向流,客户端通过流式发动屡次 RPC 申请给服务端,服务端发动一次响应给客户端,大抵如图:

服务端没有必要等到客户端发送完所有申请再响应,能够在收到局部申请之后就响应

pb 定义

syntax = "proto3";package ecommerce;option go_package = "ecommerce/";import "google/protobuf/wrappers.proto";message Order {  string id = 1;  repeated string items = 2;  string description = 3;  float price = 4;  string destination = 5;}service OrderManagement {  rpc updateOrders(stream Order) returns (google.protobuf.StringValue);}

server 实现

✨ 留神与Simple RPC的区别:因为咱们的客户端是流式申请的,因而申请参数stream OrderManagement_UpdateOrdersServer就是流对象

✨ 能够从stream OrderManagement_UpdateOrdersServerRecv函数读取音讯

✨ 当Recv返回io.EOF代表流曾经完结

✨ 应用stream OrderManagement_UpdateOrdersServerSendAndClose函数敞开并发送响应

// 在这段程序中,咱们对每一个 Recv 都进行了解决// 当发现 io.EOF (流敞开) 后,须要将最终的响应后果发送给客户端,同时敞开正在另外一侧期待的 Recvfunc (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {    ordersStr := "Updated Order IDs : "    for {        order, err := stream.Recv()        if err == io.EOF {            // Finished reading the order stream.            return stream.SendAndClose(                &wrapperspb.StringValue{Value: "Orders processed " + ordersStr})        }        // Update order        orders[order.Id] = *order        log.Println("Order ID ", order.Id, ": Updated")        ordersStr += order.Id + ", "    }}

Client 实现

✨ 留神与Simple RPC的区别:因为咱们的客户端是流式响应的,因而 RPC 函数返回值stream是一个流

✨ 能够通过调用这个流对象的Send(...),来往这个对象写入数据

✨ 应用streamCloseAndRecv函数敞开并发送响应

c := pb.NewOrderManagementClient(conn)ctx, cancelFn := context.WithCancel(context.Background())defer cancelFn()stream, err := c.UpdateOrders(ctx)if err != nil {  panic(err)}if err := stream.Send(&pb.Order{  Id:          "00",  Items:       []string{"A", "B"},  Description: "A with B",  Price:       0.11,  Destination: "ABC",}); err != nil {  panic(err)}if err := stream.Send(&pb.Order{  Id:          "01",  Items:       []string{"C", "D"},  Description: "C with D",  Price:       1.11,  Destination: "ABCDEFG",}); err != nil {  panic(err)}res, err := stream.CloseAndRecv()if err != nil {  panic(err)}log.Printf("Update Orders Res : %s", res)

小总结

Bidirectional-Streaming RPC 双向流式 RPC

双向流式 RPC,顾名思义是双向流。由客户端以流式的形式发动申请,服务端同样以流式的形式响应申请

首个申请肯定是 Client 发动,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候敞开)依据程序编写的形式来确定(能够联合协程)

假如该双向流是按程序发送的话,大抵如图:

pb 定义

syntax = "proto3";package ecommerce;option go_package = "ecommerce/";import "google/protobuf/wrappers.proto";message Order {  string id = 1;  repeated string items = 2;  string description = 3;  float price = 4;  string destination = 5;}message CombinedShipment {  string id = 1;  string status = 2;  repeated Order orderList = 3;}service OrderManagement {  rpc processOrders(stream google.protobuf.StringValue)      returns (stream CombinedShipment);}

server 实现

✨ 函数入参OrderManagement_ProcessOrdersServer是用来写入多个响应和读取多个音讯的对象援用

✨ 能够通过调用这个流对象的Send(...),来往这个对象写入响应

✨ 能够通过调用这个流对象的Recv(...)函数读取音讯,当Recv返回io.EOF代表流曾经完结

✨ 通过返回nil或者error示意全副数据写完了

func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {    batchMarker := 1    var combinedShipmentMap = make(map[string]pb.CombinedShipment)    for {        orderId, err := stream.Recv()        log.Printf("Reading Proc order : %s", orderId)        if err == io.EOF {            log.Printf("EOF : %s", orderId)            for _, shipment := range combinedShipmentMap {                if err := stream.Send(&shipment); err != nil {                    return err                }            }            return nil        }        if err != nil {            log.Println(err)            return err        }        destination := orders[orderId.GetValue()].Destination        shipment, found := combinedShipmentMap[destination]        if found {            ord := orders[orderId.GetValue()]            shipment.OrderList = append(shipment.OrderList, &ord)            combinedShipmentMap[destination] = shipment        } else {            comShip := pb.CombinedShipment{Id: "cmb - " + (orders[orderId.GetValue()].Destination), Status: "Processed!"}            ord := orders[orderId.GetValue()]            comShip.OrderList = append(shipment.OrderList, &ord)            combinedShipmentMap[destination] = comShip            log.Print(len(comShip.OrderList), comShip.GetId())        }        if batchMarker == orderBatchSize {            for _, comb := range combinedShipmentMap {                log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrderList))                if err := stream.Send(&comb); err != nil {                    return err                }            }            batchMarker = 0            combinedShipmentMap = make(map[string]pb.CombinedShipment)        } else {            batchMarker++        }    }}

Client 实现

✨ 函数返回值OrderManagement_ProcessOrdersClient是用来获取多个响应和写入多个音讯的对象援用

✨ 能够通过调用这个流对象的Send(...),来往这个对象写入响应

✨ 能够通过调用这个流对象的Recv(...)函数读取音讯,当Recv返回io.EOF代表流曾经完结

c := pb.NewOrderManagementClient(conn)ctx, cancelFn := context.WithCancel(context.Background())defer cancelFn()stream, err := c.ProcessOrders(ctx)if err != nil {  panic(err)}go func() {  if err := stream.Send(&wrapperspb.StringValue{Value: "101"}); err != nil {    panic(err)  }  if err := stream.Send(&wrapperspb.StringValue{Value: "102"}); err != nil {    panic(err)  }  if err := stream.CloseSend(); err != nil {    panic(err)  }}()for {  combinedShipment, err := stream.Recv()  if err == io.EOF {    break  }  log.Println("Combined shipment : ", combinedShipment.OrderList)}

小总结

双向流绝对还是比较复杂的,大部分场景都是应用事件机制进行异步交互,须要精心的设计

示例代码

https://github.com/liangwt/gr...