本篇为【写给go开发者的gRPC教程系列】第五篇

第一篇:protobuf根底

第二篇:通信模式

第三篇:拦截器

第四篇:错误处理

第五篇:metadata

本系列将继续更新,欢送关注获取实时告诉


导语

和在一般HTTP申请中一样,gRPC提供了在每一次RPC中携带的上下文构造:metadata。在Go语言中,它与context.Context紧密结合,帮忙咱们实现服务端与客户端之间相互传递信息

什么是metadata

gRPC 的 metadata 简略了解,就是 HTTP Header 中的 key-value 对

  • metadata 是以 key-value 的模式存储数据的,其中 key 是 string 类型,而 value 是 []string,即一个字符串数组类型
  • metadata 使得 client 和 server 可能为对方提供对于本次调用的一些信息,就像一次HTTP申请的Request HeaderResponse Header一样
  • HTTP Header 的生命周期是一次 HTTP 申请,那么 metadata 的生命周期就是一次 RPC 调用

Metadata 创立

应用New():

md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

应用Pairs():

要留神如果有雷同的 key 会主动合并

md := metadata.Pairs(    "key1", "value1",    "key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}    "key2", "value2",)

合并多个metadata

md1 :=  metadata.Pairs("k1", "v1", "k2", "v2")md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})md := metadata.Join(md1, md2)

存储二进制数据

在 metadata 中,key 永远是 string 类型,然而 value 能够是 string 也能够是二进制数据。为了在 metadata 中存储二进制数据,咱们仅仅须要在 key 的前面加上一个 - bin 后缀。具备 - bin 后缀的 key 所对应的 value 在创立 metadata 时会被编码(base64),收到的时候会被解码:

md := metadata.Pairs(    "key", "string value",    "key-bin", string([]byte{96, 102}),)

metadata 构造自身也有一些操作方法,参考文档非常容易了解。这里不再赘述:https://pkg.go.dev/google.gol...

Metadata 发送与接管

让咱们再次回顾下pb文件和生成进去的client与server端的接口

service OrderManagement {    rpc getOrder(google.protobuf.StringValue) returns (Order);}
type OrderManagementClient interface {    GetOrder(ctx context.Context,            in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)}
type OrderManagementServer interface {    GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)    mustEmbedUnimplementedOrderManagementServer()}

能够看到相比pb中的接口定义,生成进去的Go代码除了减少了error返回值,还多了context.Context

和错误处理相似,gRPC中的context.Context 也合乎Go语言的应用习惯:通常状况下咱们在函数首个参数搁置context.Context用来传递一次RPC中无关的上下文,借助context.WithValue()ctx.Value()context增加变量或读取变量

metadata就是gRPC中能够传递的上下文信息之一,所以metadata的应用形式就是:metadata记录到context,从context读取metadata

Clinet发送Server接管

client发送metadata,那就是把metadata存储到contex.Context

server接管metadata,就是从contex.Context中读取Metadata

Clinet 发送 Metadata

Metadata放到contex.Context,有几种形式

应用NewOutgoingContext

将新创建的metadata增加到context中,这样会 笼罩 掉原来已有的metadata

// 将metadata增加到context中,获取新的contextmd := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")ctx := metadata.NewOutgoingContext(context.Background(), md)// unary RPCresponse, err := client.SomeRPC(ctx, someRequest)// streaming RPCstream, err := client.SomeStreamingRPC(ctx)

应用AppendToOutgoingContext

能够间接将 key-value 对增加到已有的context

  • 如果context中没有metadata,那么就会 创立 一个
  • 如果已有metadata,那么就将数据 增加 到原来的metadata
// 如果对应的 context 没有 metadata,那么就会创立一个ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")// 如果已有 metadata 了,那么就将数据增加到原来的 metadata  (例如在拦截器中)ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")// 一般RPC(unary RPC)response, err := client.SomeRPC(ctx, someRequest)// 流式RPC(streaming RPC)stream, err := client.SomeStreamingRPC(ctx)

Server 接管 Metedata

一般RPC与流式RPC的区别不大,都是从contex.Context中读取metadata

应用FromIncomingContext

一般RPC(unary RPC)

//Unary Callfunc (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {    md, ok := metadata.FromIncomingContext(ctx)    // do something with metadata}

流式RPC(streaming RPC)

//Streaming Callfunc (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {    md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream    // do something with metadata}

Server发送Clinet接管

服务端发送的metadata被分成了headertrailer两者,因此客户端也能够读取两者

Server 发送 Metadata

对于一般RPC(unary RPC)server能够应用grpc包中提供的函数向client发送 headertrailer

  • grpc.SendHeader()
  • grpc.SetHeader()
  • grpc.SetTrailer()

对于**流式RPC(streaming RPC)server能够应用ServerStream接口中定义的函数向client发送headertrailer

  • ServerStream.SendHeader()
  • ServerStream.SetHeader()
  • ServerStream.SetTrailer()

一般RPC(unary RPC)

应用 grpc.SendHeader()grpc.SetTrailer() 办法 ,这两个函数将context.Context作为第一个参数

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {  // 创立并发送header  header := metadata.Pairs("header-key", "val")  grpc.SendHeader(ctx, header)    // 创立并发送trailer  trailer := metadata.Pairs("trailer-key", "val")  grpc.SetTrailer(ctx, trailer)}

如果不想立刻发送header,也能够应用grpc.SetHeader()grpc.SetHeader()能够被屡次调用,在如下时机会把多个metadata合并发送进来

  • 调用grpc.SendHeader()
  • 第一个响应被发送时
  • RPC完结时(蕴含胜利或失败)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {  // 创立header,在适当时机会被发送  header := metadata.Pairs("header-key1", "val1")  grpc.SetHeader(ctx, header)      // 创立header,在适当时机会被发送  header := metadata.Pairs("header-key2", "val2")  grpc.SetHeader(ctx, header)    // 创立并发送trailer  trailer := metadata.Pairs("trailer-key", "val")  grpc.SetTrailer(ctx, trailer)}

流式RPC(streaming RPC)

应用 ServerStream.SendHeader()ServerStream.SetTrailer() 办法

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {  // create and send header  header := metadata.Pairs("header-key", "val")  stream.SendHeader(header)    // create and set trailer  trailer := metadata.Pairs("trailer-key", "val")  stream.SetTrailer(trailer)}

如果不想立刻发送header,也能够应用ServerStream.SetHeader()ServerStream.SetHeader()能够被屡次调用,在如下时机会把多个metadata合并发送进来

  • 调用ServerStream.SendHeader()
  • 第一个响应被发送时
  • RPC完结时(蕴含胜利或失败)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {  // create and send header  header := metadata.Pairs("header-key", "val")  stream.SetHeader(header)    // create and set trailer  trailer := metadata.Pairs("trailer-key", "val")  stream.SetTrailer(trailer)}

Client 接管 Metadata

一般RPC(unary RPC)

一般RPC(unary RPC)应用grpc.Header()grpc.Trailer()办法来接管 Metadata

// RPC using the context with new metadata.var header, trailer metadata.MD// Add Orderorder := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))if err != nil {  panic(err)}

流式RPC(streaming RPC)

流式RPC(streaming RPC)通过调用返回的 ClientStream接口的Header() Trailer()办法接管 metadata

stream, err := client.SomeStreamingRPC(ctx)// retrieve headerheader, err := stream.Header()stream.CloseAndRecv()// retrieve trailertrailer := stream.Trailer()

HeaderTrailer区别

基本区别:发送的机会不同!

headers会在上面三种场景下被发送

  • SendHeader() 被调用时(蕴含grpc.SendHeaderstream.SendHeader)
  • 第一个响应被发送时
  • RPC完结时(蕴含胜利或失败)

trailer会在rpc返回的时候,即这个申请完结的时候被发送

差别在流式RPC(streaming RPC)中比拟显著:

因为trailer是在服务端发送完申请之后才发送的,所以client获取trailer的时候须要在stream.CloseAndRecv或者stream.Recv 返回非nil谬误 (蕴含 io.EOF)之后

如果stream.CloseAndRecv之前调用stream.Trailer()获取的是空

stream, err := client.SomeStreamingRPC(ctx)// retrieve headerheader, err := stream.Header()// retrieve trailer // `trailer`会在rpc返回的时候,即这个申请完结的时候被发送// 因而此时调用`stream.Trailer()`获取的是空trailer := stream.Trailer()stream.CloseAndRecv()// retrieve trailer // `trailer`会在rpc返回的时候,即这个申请完结的时候被发送// 因而此时调用`stream.Trailer()`才能够获取到值trailer := stream.Trailer()

应用场景

既然咱们把metadata类比成HTTP Header,那么metadata的应用场景也能够借鉴HTTPHeader。如传递用户token进行用户认证,传递trace进行链路追踪等

拦截器中的metadata

在拦截器中,咱们岂但能够获取或批改接管到的metadata,甚至还能够截取并批改要发送进来的metadata

还记得拦截器如何实现么?如果曾经忘了快快回顾一下吧:

举个例子:

咱们在客户端拦截器中从要发送给服务端的metadata中读取一个工夫戳字段,如果没有则补充这个工夫戳字段

留神这里用到了一个上文没有提到的FromOutgoingContext(ctx)函数

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},    cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {    var s string    // 获取要发送给服务端的`metadata`    md, ok := metadata.FromOutgoingContext(ctx)    if ok && len(md.Get("time")) > 0 {        s = md.Get("time")[0]    } else {        // 如果没有则补充这个工夫戳字段        s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)        ctx = metadata.AppendToOutgoingContext(ctx, "time", s)    }    log.Printf("call timestamp: %s", s)    // Invoking the remote method    err := invoker(ctx, method, req, reply, cc, opts...)    return err}func main() {    conn, err := grpc.Dial("127.0.0.1:8009",        grpc.WithInsecure(),        grpc.WithChainUnaryInterceptor(            orderUnaryClientInterceptor,        ),    )    if err != nil {        panic(err)    }        c := pb.NewOrderManagementClient(conn)    ctx = metadata.AppendToOutgoingContext(context.Background(), "time",        "raw"+strconv.FormatInt(time.Now().UnixNano(), 10))    // RPC using the context with new metadata.    var header, trailer metadata.MD    // Add Order    order := pb.Order{        Id:          "101",        Items:       []string{"iPhone XS", "Mac Book Pro"},        Destination: "San Jose, CA",        Price:       2300.00,    }    res, err := c.AddOrder(ctx, &order)    if err != nil {        panic(err)    }}

以上的思路在server同样实用。基于以上原理咱们能够实现链路追踪、用户认证等性能

错误信息

还记得错误处理一文中留下的问题么:gRPC 中如何传递谬误音讯Status的呢?没错!也是应用的metadata或者说http2.0headerStatus的三种信息别离应用了三个header

  • Grpc-Status: 传递Statuscode
  • Grpc-Message: 传递Statusmessage
  • Grpc-Status-Details-Bin: 传递Statusdetails
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {    // ...        h := ht.rw.Header()        h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))        if m := st.Message(); m != "" {            h.Set("Grpc-Message", encodeGrpcMessage(m))        }        if p := st.Proto(); p != nil && len(p.Details) > 0 {            stBytes, err := proto.Marshal(p)            if err != nil {                // TODO: return error instead, when callers are able to handle it.                panic(err)            }            h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))        }    // ...}

总结

一张图总结下整个metadata的应用办法(公众号发送:metadata总结获取高清原文件)

举荐浏览

  • 写给go开发者的gRPC教程-拦截器
  • 写给go开发者的gRPC教程-错误处理

参考资料

  • gRPC 中的 Metadata
  • pkg.go.dev/grpc@v1.44.0/metadata
  • concept of metadata
  • Documentation/grpc-metadata

✨ 微信公众号【凉凉的知识库】同步更新,欢送关注获取最新最有用的后端常识 ✨