乐趣区

关于go:写给go开发者的gRPC教程metadata

本篇为【写给 go 开发者的 gRPC 教程系列】第五篇

第一篇:protobuf 根底

第二篇:通信模式

第三篇:拦截器

第四篇:错误处理

第五篇:metadata 👈

本系列将继续更新,欢送关注👏获取实时告诉


导语

和在一般 HTTP 申请中一样,gRPC 提供了在每一次 RPC 中携带的上下文构造:metadata。在 Go 语言中,它与 context.Context 紧密结合,帮忙咱们实现服务端与客户端之间相互传递信息

什么是metadata

gRPC 的 metadata 简略了解,就是 HTTP Header 中的 key-value 对

  • metadata 是以 key-value 的模式存储数据的,其中 key 是 string 类型,而 value 是 []string,即一个字符串数组类型
  • metadata 使得 client 和 server 可能为对方提供对于本次调用的一些信息,就像一次 HTTP 申请的 Request HeaderResponse Header一样
  • HTTP Header 的生命周期是一次 HTTP 申请,那么 metadata 的生命周期就是一次 RPC 调用

Metadata 创立

🌲 应用 New():

md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

🌲 应用 Pairs():

要留神如果有雷同的 key 会主动合并

md := metadata.Pairs(
    "key1", "value1",
    "key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}
    "key2", "value2",
)

🌲 合并多个 metadata

md1 :=  metadata.Pairs("k1", "v1", "k2", "v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

md := metadata.Join(md1, md2)

🌲 存储二进制数据

在 metadata 中,key 永远是 string 类型,然而 value 能够是 string 也能够是二进制数据。为了在 metadata 中存储二进制数据,咱们仅仅须要在 key 的前面加上一个 – bin 后缀。具备 – bin 后缀的 key 所对应的 value 在创立 metadata 时会被编码(base64),收到的时候会被解码:

md := metadata.Pairs(
    "key", "string value",
    "key-bin", string([]byte{96, 102}),
)

metadata 构造自身也有一些操作方法,参考文档非常容易了解。这里不再赘述:https://pkg.go.dev/google.gol…

Metadata 发送与接管

让咱们再次回顾下 pb 文件和生成进去的 client 与 server 端的接口

service OrderManagement {rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
    GetOrder(ctx context.Context, 
           in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
    mustEmbedUnimplementedOrderManagementServer()}

能够看到相比 pb 中的接口定义,生成进去的 Go 代码除了减少了 error 返回值,还多了context.Context

和错误处理相似,gRPC 中的 context.Context 也合乎 Go 语言的应用习惯:通常状况下咱们在函数首个参数搁置context.Context 用来传递一次 RPC 中无关的上下文,借助 context.WithValue()ctx.Value()context 增加变量或读取变量

metadata就是 gRPC 中能够传递的上下文信息之一,所以 metadata 的应用形式就是:metadata记录到 context,从context 读取metadata

Clinet 发送 Server 接管

client发送 metadata,那就是把metadata 存储到contex.Context

server接管 metadata,就是从contex.Context 中读取Metadata

Clinet 发送 Metadata

Metadata 放到contex.Context,有几种形式

🌲 应用NewOutgoingContext

将新创建的 metadata 增加到 context 中,这样会 笼罩 掉原来已有的metadata

// 将 metadata 增加到 context 中,获取新的 context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)

// unary RPC
response, err := client.SomeRPC(ctx, someRequest)

// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)

🌲 应用AppendToOutgoingContext

能够间接将 key-value 对增加到已有的 context

  • 如果 context 中没有 metadata,那么就会 创立 一个
  • 如果已有 metadata,那么就将数据 增加 到原来的metadata
// 如果对应的 context 没有 metadata,那么就会创立一个
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")

// 如果已有 metadata 了,那么就将数据增加到原来的 metadata  (例如在拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")

// 一般 RPC(unary RPC)response, err := client.SomeRPC(ctx, someRequest)

// 流式 RPC(streaming RPC)stream, err := client.SomeStreamingRPC(ctx)

Server 接管 Metedata

一般 RPC 与流式 RPC 的区别不大,都是从 contex.Context 中读取metadata

🌲 应用FromIncomingContext

一般 RPC(unary RPC)

//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

流式 RPC(streaming RPC)

//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
    // do something with metadata
}

Server 发送 Clinet 接管

服务端发送的 metadata 被分成了 headertrailer两者,因此客户端也能够读取两者

Server 发送 Metadata

对于 一般 RPC(unary RPC)server 能够应用 grpc 包中提供的函数向 client 发送 headertrailer

  • grpc.SendHeader()
  • grpc.SetHeader()
  • grpc.SetTrailer()

对于 ** 流式 RPC(streaming RPC)server 能够应用 ServerStream 接口中定义的函数向 client 发送 headertrailer

  • ServerStream.SendHeader()
  • ServerStream.SetHeader()
  • ServerStream.SetTrailer()

🌲 一般 RPC(unary RPC)

应用 grpc.SendHeader()grpc.SetTrailer() 办法,这两个函数将 context.Context 作为第一个参数

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 创立并发送 header
  header := metadata.Pairs("header-key", "val")
  grpc.SendHeader(ctx, header)
  
  // 创立并发送 trailer
  trailer := metadata.Pairs("trailer-key", "val")
  grpc.SetTrailer(ctx, trailer)
}

如果不想立刻发送 header,也能够应用grpc.SetHeader()grpc.SetHeader() 能够被屡次调用,在如下时机会把多个 metadata 合并发送进来

  • 调用grpc.SendHeader()
  • 第一个响应被发送时
  • RPC 完结时(蕴含胜利或失败)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 创立 header,在适当时机会被发送
  header := metadata.Pairs("header-key1", "val1")
  grpc.SetHeader(ctx, header)
    
  // 创立 header,在适当时机会被发送
  header := metadata.Pairs("header-key2", "val2")
  grpc.SetHeader(ctx, header)
  
  // 创立并发送 trailer
  trailer := metadata.Pairs("trailer-key", "val")
  grpc.SetTrailer(ctx, trailer)
}

🌲 流式 RPC(streaming RPC)

应用 ServerStream.SendHeader()ServerStream.SetTrailer() 办法

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key", "val")
  stream.SendHeader(header)
  
  // create and set trailer
  trailer := metadata.Pairs("trailer-key", "val")
  stream.SetTrailer(trailer)
}

如果不想立刻发送 header,也能够应用ServerStream.SetHeader()ServerStream.SetHeader() 能够被屡次调用,在如下时机会把多个 metadata 合并发送进来

  • 调用ServerStream.SendHeader()
  • 第一个响应被发送时
  • RPC 完结时(蕴含胜利或失败)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key", "val")
  stream.SetHeader(header)
  
  // create and set trailer
  trailer := metadata.Pairs("trailer-key", "val")
  stream.SetTrailer(trailer)
}

Client 接管 Metadata

🌲 一般 RPC(unary RPC)

一般 RPC(unary RPC)应用 grpc.Header()grpc.Trailer()办法来接管 Metadata

// RPC using the context with new metadata.
var header, trailer metadata.MD

// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {panic(err)
}

🌲 流式 RPC(streaming RPC)

流式 RPC(streaming RPC)通过调用返回的 ClientStream接口的 Header() Trailer()办法接管 metadata

stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

stream.CloseAndRecv()

// retrieve trailer
trailer := stream.Trailer()

HeaderTrailer 区别

基本区别:发送的机会不同!

headers会在上面三种场景下被发送

  • SendHeader() 被调用时(蕴含 grpc.SendHeaderstream.SendHeader)
  • 第一个响应被发送时
  • RPC 完结时(蕴含胜利或失败)

trailer会在 rpc 返回的时候,即这个申请完结的时候被发送

差别在流式 RPC(streaming RPC)中比拟显著:

因为 trailer 是在服务端发送完申请之后才发送的,所以 client 获取 trailer 的时候须要在 stream.CloseAndRecv 或者 stream.Recv 返回非 nil 谬误 (蕴含 io.EOF) 之后

如果 stream.CloseAndRecv 之前调用 stream.Trailer() 获取的是空

stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

// retrieve trailer 
// `trailer` 会在 rpc 返回的时候,即这个申请完结的时候被发送
// 因而此时调用 `stream.Trailer()` 获取的是空
trailer := stream.Trailer()

stream.CloseAndRecv()

// retrieve trailer 
// `trailer` 会在 rpc 返回的时候,即这个申请完结的时候被发送
// 因而此时调用 `stream.Trailer()` 才能够获取到值
trailer := stream.Trailer()

应用场景

既然咱们把 metadata 类比成 HTTP Header,那么metadata 的应用场景也能够借鉴 HTTPHeader。如传递用户 token 进行用户认证,传递 trace 进行链路追踪等

拦截器中的 metadata

在拦截器中,咱们岂但能够获取或批改 接管 到的 metadata,甚至还能够截取并批改要 发送 进来的metadata

还记得拦截器如何实现么?如果曾经忘了快快回顾一下吧:

🌰 举个例子:

咱们在客户端拦截器中从要发送给服务端的 metadata 中读取一个工夫戳字段,如果没有则补充这个工夫戳字段

留神这里用到了一个上文没有提到的 FromOutgoingContext(ctx) 函数

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
    cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {

    var s string

    // 获取要发送给服务端的 `metadata`
    md, ok := metadata.FromOutgoingContext(ctx)
    if ok && len(md.Get("time")) > 0 {s = md.Get("time")[0]
    } else {
        // 如果没有则补充这个工夫戳字段
        s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
        ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
    }

    log.Printf("call timestamp: %s", s)

    // Invoking the remote method
    err := invoker(ctx, method, req, reply, cc, opts...)

    return err
}

func main() {
    conn, err := grpc.Dial("127.0.0.1:8009",
        grpc.WithInsecure(),
        grpc.WithChainUnaryInterceptor(orderUnaryClientInterceptor,),
    )
    if err != nil {panic(err)
    }
    
    c := pb.NewOrderManagementClient(conn)

    ctx = metadata.AppendToOutgoingContext(context.Background(), "time",
        "raw"+strconv.FormatInt(time.Now().UnixNano(), 10))

    // RPC using the context with new metadata.
    var header, trailer metadata.MD

    // Add Order
    order := pb.Order{
        Id:          "101",
        Items:       []string{"iPhone XS", "Mac Book Pro"},
        Destination: "San Jose, CA",
        Price:       2300.00,
    }
    res, err := c.AddOrder(ctx, &order)
    if err != nil {panic(err)
    }
}

以上的思路在 server 同样实用。基于以上原理咱们能够实现链路追踪、用户认证等性能

错误信息

还记得错误处理一文中留下的问题么:gRPC 中如何传递谬误音讯 Status 的呢?没错!也是应用的 metadata 或者说 http2.0headerStatus 的三种信息别离应用了三个 header

  • Grpc-Status: 传递 Statuscode
  • Grpc-Message: 传递 Statusmessage
  • Grpc-Status-Details-Bin: 传递 Statusdetails
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
    // ...
        h := ht.rw.Header()
        h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
        if m := st.Message(); m != "" {h.Set("Grpc-Message", encodeGrpcMessage(m))
        }

        if p := st.Proto(); p != nil && len(p.Details) > 0 {stBytes, err := proto.Marshal(p)
            if err != nil {
                // TODO: return error instead, when callers are able to handle it.
                panic(err)
            }

            h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
        }
    // ...
}

总结

一张图总结下整个 metadata 的应用办法(公众号发送:metadata 总结 获取高清原文件)

举荐浏览

  • 写给 go 开发者的 gRPC 教程 - 拦截器
  • 写给 go 开发者的 gRPC 教程 - 错误处理

参考资料

  • gRPC 中的 Metadata
  • pkg.go.dev/grpc@v1.44.0/metadata
  • concept of metadata
  • Documentation/grpc-metadata

✨ 微信公众号【凉凉的知识库】同步更新,欢送关注获取最新最有用的后端常识 ✨

退出移动版