共计 9039 个字符,预计需要花费 23 分钟才能阅读完成。
本篇为【写给 go 开发者的 gRPC 教程系列】第二篇
- 第一篇:protobuf 根底
- 第二篇:通信模式
上一篇介绍了如何编写 protobuf 的 idl,并应用 idl 生成了 gRPC 的代码,当初来看看如何编写客户端和服务端的代码
Simple RPC (Unary RPC)
syntax = "proto3"; | |
package ecommerce; | |
import "google/protobuf/wrappers.proto"; | |
option go_package = "ecommerce/"; | |
message Order { | |
string id = 1; | |
repeated string items = 2; | |
string description = 3; | |
float price = 4; | |
string destination = 5; | |
} | |
service OrderManagement {rpc getOrder(google.protobuf.StringValue) returns (Order); | |
} |
定义如上的 idl,须要关注几个事项
- 应用
protobuf
最新版本syntax = "proto3";
protoc-gen-go
要求 pb 文件必须指定 go 包的门路。即option go_package = "ecommerce/";
- 定义的
method
仅能有一个入参和出参数。如果须要传递多个参数须要定义成message
- 应用
import
援用另外一个文件的 pb。google/protobuf/wrappers.proto
是 google 内置的类型
生成 go 和 grpc 的代码
$ protoc -I ./pb \ | |
--go_out ./ecommerce --go_opt paths=source_relative \ | |
--go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \ | |
./pb/product.proto |
ecommerce | |
├── product.pb.go | |
└── product_grpc.pb.go | |
pb | |
└── product.proto |
server 实现
1、由 pb 文件生成的 gRPC 代码中蕴含了 service 的接口定义,它和咱们定义的 idl 是吻合的
service OrderManagement {rpc getOrder(google.protobuf.StringValue) returns (Order); | |
} |
type OrderManagementServer interface {GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error) | |
mustEmbedUnimplementedOrderManagementServer()} |
2、咱们的业务逻辑就是实现这个接口
package main | |
import ( | |
"context" | |
"log" | |
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce" | |
"google.golang.org/grpc/codes" | |
"google.golang.org/grpc/status" | |
"google.golang.org/protobuf/types/known/wrapperspb" | |
) | |
var _ pb.OrderManagementServer = &OrderManagementImpl{} | |
var orders = make(map[string]pb.Order) | |
type OrderManagementImpl struct {pb.UnimplementedOrderManagementServer} | |
// Simple RPC | |
func (s *OrderManagementImpl) GetOrder(ctx context.Context, orderId *wrapperspb.StringValue) (*pb.Order, error) {ord, exists := orders[orderId.Value] | |
if exists {return &ord, status.New(codes.OK, "").Err()} | |
return nil, status.Errorf(codes.NotFound, "Order does not exist. :", orderId) | |
} |
3、在实现完业务逻辑之后,咱们能够创立并启动服务
package main | |
import ( | |
"net" | |
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce" | |
"google.golang.org/grpc" | |
) | |
func main() {s := grpc.NewServer() | |
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{}) | |
lis, err := net.Listen("tcp", ":8009") | |
if err != nil {panic(err) | |
} | |
if err := s.Serve(lis); err != nil {panic(err) | |
} | |
} |
服务端代码实现的流程如下
client 实现
1、由 pb 文件生成的 gRPC 代码中蕴含了 client 的实现,它和咱们定义的 idl 也是吻合的
service OrderManagement {rpc getOrder(google.protobuf.StringValue) returns (Order); | |
} |
type orderManagementClient struct {cc grpc.ClientConnInterface} | |
func NewOrderManagementClient(cc grpc.ClientConnInterface) OrderManagementClient {return &orderManagementClient{cc} | |
} | |
func (c *orderManagementClient) GetOrder(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error) {out := new(Order) | |
err := c.cc.Invoke(ctx, "/ecommerce.OrderManagement/getOrder", in, out, opts...) | |
if err != nil {return nil, err} | |
return out, nil | |
} |
2、间接应用 client 来进行 rpc 调用
package main | |
import ( | |
"context" | |
"log" | |
"time" | |
pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce" | |
"google.golang.org/grpc" | |
"google.golang.org/protobuf/types/known/wrapperspb" | |
) | |
func main() {conn, err := grpc.Dial("127.0.0.1:8009", grpc.WithInsecure()) | |
if err != nil {panic(err) | |
} | |
defer conn.Close() | |
client := pb.NewOrderManagementClient(conn) | |
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | |
defer cancel() | |
// Get Order | |
retrievedOrder, err := client.GetOrder(ctx, &wrapperspb.StringValue{Value: "101"}) | |
if err != nil {panic(err) | |
} | |
log.Print("GetOrder Response -> :", retrievedOrder) | |
} |
客户端代码实现的流程如下
小总结
✨ 前文提到过 protobuf 协定
是平台无关的。演示的客户端和服务端都是 golang 的,即便客户端和服务端不同语言也是相似的能够通信的
✨ 对于下面介绍的的这种相似于 http1.x
的模式:客户端发送申请,服务端响应申请,一问一答的模式在 gRPC 里叫做Simple RPC
(也称Unary RPC)
。gRPC 同时也反对其余类型的交互方式。
Server-Streaming RPC 服务器端流式 RPC
服务器端流式 RPC
,显然是单向流,并代指 Server 为 Stream 而 Client 为一般 RPC 申请
简略来讲就是客户端发动一次一般的 RPC 申请,服务端通过流式响应屡次发送数据集,客户端 Recv 接收数据集。大抵如图:
pb 定义
syntax = "proto3"; | |
package ecommerce; | |
option go_package = "ecommerce/"; | |
import "google/protobuf/wrappers.proto"; | |
message Order { | |
string id = 1; | |
repeated string items = 2; | |
string description = 3; | |
float price = 4; | |
string destination = 5; | |
} | |
service OrderManagement {rpc searchOrders(google.protobuf.StringValue) returns (stream Order); | |
} |
server 实现
✨ 留神与 Simple RPC
的区别:因为咱们的服务端是流式响应的,因而对于服务端来说函数入参 多了一个 stream OrderManagement_SearchOrdersServer
参数用来写入多个响应,能够把它看作是客户端的对象
✨ 能够通过调用这个流对象的Send(...)
,来往客户端写入数据
✨ 通过返回 nil
或者 error
来示意全副数据写完了
func (s *server) SearchOrders(query *wrapperspb.StringValue, | |
stream pb.OrderManagement_SearchOrdersServer) error { | |
for _, order := range orders { | |
for _, str := range order.Items {if strings.Contains(str, query.Value) {err := stream.Send(&order) | |
if err != nil {return fmt.Errorf("error send: %v", err) | |
} | |
} | |
} | |
} | |
return nil | |
} |
client 实现
✨ 留神与 Simple RPC
的区别:因为咱们的服务端是流式响应的,因而 RPC 函数返回值 stream
是一个流,能够把它看作是服务端的对象
✨ 应用 stream
的Recv
函数来一直从服务端接收数据
✨ 当 Recv
返回 io.EOF
代表流曾经完结
c := pb.NewOrderManagementClient(conn) | |
ctx, cancelFn := context.WithCancel(context.Background()) | |
defer cancelFn() | |
stream, err := c.SearchOrders(ctx, &wrapperspb.StringValue{Value: "Google"}) | |
if err != nil{panic(err) | |
} | |
for{order, err := stream.Recv() | |
if err == io.EOF{break} | |
log.Println("Search Result:", order) | |
} |
小总结
Client-Streaming RPC 客户端流式 RPC
客户端流式 RPC
,显然也是单向流,客户端通过流式发动 屡次 RPC 申请给服务端,服务端发动 一次 响应给客户端,大抵如图:
服务端没有必要等到客户端发送完所有申请再响应,能够在收到局部申请之后就响应
pb 定义
syntax = "proto3"; | |
package ecommerce; | |
option go_package = "ecommerce/"; | |
import "google/protobuf/wrappers.proto"; | |
message Order { | |
string id = 1; | |
repeated string items = 2; | |
string description = 3; | |
float price = 4; | |
string destination = 5; | |
} | |
service OrderManagement {rpc updateOrders(stream Order) returns (google.protobuf.StringValue); | |
} |
server 实现
✨ 留神与 Simple RPC
的区别:因为咱们的客户端是流式申请的,因而申请参数 stream OrderManagement_UpdateOrdersServer
就是流对象
✨ 能够从 stream OrderManagement_UpdateOrdersServer
的Recv
函数读取音讯
✨ 当 Recv
返回 io.EOF
代表流曾经完结
✨ 应用 stream OrderManagement_UpdateOrdersServer
的SendAndClose
函数敞开并发送响应
// 在这段程序中,咱们对每一个 Recv 都进行了解决 | |
// 当发现 io.EOF (流敞开) 后,须要将最终的响应后果发送给客户端,同时敞开正在另外一侧期待的 Recv | |
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error { | |
ordersStr := "Updated Order IDs :" | |
for {order, err := stream.Recv() | |
if err == io.EOF { | |
// Finished reading the order stream. | |
return stream.SendAndClose(&wrapperspb.StringValue{Value: "Orders processed" + ordersStr}) | |
} | |
// Update order | |
orders[order.Id] = *order | |
log.Println("Order ID", order.Id, ": Updated") | |
ordersStr += order.Id + "," | |
} | |
} |
Client 实现
✨ 留神与 Simple RPC
的区别:因为咱们的客户端是流式响应的,因而 RPC 函数返回值 stream
是一个流
✨ 能够通过调用这个流对象的Send(...)
,来往这个对象写入数据
✨ 应用 stream
的CloseAndRecv
函数敞开并发送响应
c := pb.NewOrderManagementClient(conn) | |
ctx, cancelFn := context.WithCancel(context.Background()) | |
defer cancelFn() | |
stream, err := c.UpdateOrders(ctx) | |
if err != nil {panic(err) | |
} | |
if err := stream.Send(&pb.Order{ | |
Id: "00", | |
Items: []string{"A", "B"}, | |
Description: "A with B", | |
Price: 0.11, | |
Destination: "ABC", | |
}); err != nil {panic(err) | |
} | |
if err := stream.Send(&pb.Order{ | |
Id: "01", | |
Items: []string{"C", "D"}, | |
Description: "C with D", | |
Price: 1.11, | |
Destination: "ABCDEFG", | |
}); err != nil {panic(err) | |
} | |
res, err := stream.CloseAndRecv() | |
if err != nil {panic(err) | |
} | |
log.Printf("Update Orders Res : %s", res) |
小总结
Bidirectional-Streaming RPC 双向流式 RPC
双向流式 RPC
,顾名思义是双向流。由客户端以流式的形式发动申请,服务端同样以流式的形式响应申请
首个申请肯定是 Client 发动,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候敞开)依据程序编写的形式来确定(能够联合协程)
假如该双向流是 按程序发送 的话,大抵如图:
pb 定义
syntax = "proto3"; | |
package ecommerce; | |
option go_package = "ecommerce/"; | |
import "google/protobuf/wrappers.proto"; | |
message Order { | |
string id = 1; | |
repeated string items = 2; | |
string description = 3; | |
float price = 4; | |
string destination = 5; | |
} | |
message CombinedShipment { | |
string id = 1; | |
string status = 2; | |
repeated Order orderList = 3; | |
} | |
service OrderManagement {rpc processOrders(stream google.protobuf.StringValue) | |
returns (stream CombinedShipment); | |
} |
server 实现
✨ 函数入参 OrderManagement_ProcessOrdersServer
是用来写入多个响应和读取多个音讯的对象援用
✨ 能够通过调用这个流对象的Send(...)
,来往这个对象写入响应
✨ 能够通过调用这个流对象的 Recv(...)
函数读取音讯,当 Recv
返回 io.EOF
代表流曾经完结
✨ 通过返回 nil
或者 error
示意全副数据写完了
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error { | |
batchMarker := 1 | |
var combinedShipmentMap = make(map[string]pb.CombinedShipment) | |
for {orderId, err := stream.Recv() | |
log.Printf("Reading Proc order : %s", orderId) | |
if err == io.EOF {log.Printf("EOF : %s", orderId) | |
for _, shipment := range combinedShipmentMap {if err := stream.Send(&shipment); err != nil {return err} | |
} | |
return nil | |
} | |
if err != nil {log.Println(err) | |
return err | |
} | |
destination := orders[orderId.GetValue()].Destination | |
shipment, found := combinedShipmentMap[destination] | |
if found {ord := orders[orderId.GetValue()] | |
shipment.OrderList = append(shipment.OrderList, &ord) | |
combinedShipmentMap[destination] = shipment | |
} else {comShip := pb.CombinedShipment{Id: "cmb -" + (orders[orderId.GetValue()].Destination), Status: "Processed!"} | |
ord := orders[orderId.GetValue()] | |
comShip.OrderList = append(shipment.OrderList, &ord) | |
combinedShipmentMap[destination] = comShip | |
log.Print(len(comShip.OrderList), comShip.GetId()) | |
} | |
if batchMarker == orderBatchSize { | |
for _, comb := range combinedShipmentMap {log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrderList)) | |
if err := stream.Send(&comb); err != nil {return err} | |
} | |
batchMarker = 0 | |
combinedShipmentMap = make(map[string]pb.CombinedShipment) | |
} else {batchMarker++} | |
} | |
} |
Client 实现
✨ 函数返回值 OrderManagement_ProcessOrdersClient
是用来获取多个响应和写入多个音讯的对象援用
✨ 能够通过调用这个流对象的Send(...)
,来往这个对象写入响应
✨ 能够通过调用这个流对象的 Recv(...)
函数读取音讯,当 Recv
返回 io.EOF
代表流曾经完结
c := pb.NewOrderManagementClient(conn) | |
ctx, cancelFn := context.WithCancel(context.Background()) | |
defer cancelFn() | |
stream, err := c.ProcessOrders(ctx) | |
if err != nil {panic(err) | |
} | |
go func() {if err := stream.Send(&wrapperspb.StringValue{Value: "101"}); err != nil {panic(err) | |
} | |
if err := stream.Send(&wrapperspb.StringValue{Value: "102"}); err != nil {panic(err) | |
} | |
if err := stream.CloseSend(); err != nil {panic(err) | |
} | |
}() | |
for {combinedShipment, err := stream.Recv() | |
if err == io.EOF {break} | |
log.Println("Combined shipment :", combinedShipment.OrderList) | |
} |
小总结
双向流绝对还是比较复杂的,大部分场景都是应用事件机制进行异步交互,须要精心的设计
示例代码
https://github.com/liangwt/gr…