本篇为【写给 go 开发者的 gRPC 教程系列】第五篇
第一篇:protobuf 根底
第二篇:通信模式
第三篇:拦截器
第四篇:错误处理
第五篇:metadata 👈
本系列将继续更新,欢送关注👏获取实时告诉
导语
和在一般
HTTP
申请中一样,gRPC 提供了在每一次 RPC 中携带的上下文构造:metadata
。在 Go 语言中,它与context.Context
紧密结合,帮忙咱们实现服务端与客户端之间相互传递信息
什么是metadata
?
gRPC 的 metadata
简略了解,就是 HTTP Header
中的 key-value 对
metadata
是以key-value
的模式存储数据的,其中 key 是 string 类型,而 value 是 []string,即一个字符串数组类型metadata
使得 client 和 server 可能为对方提供对于本次调用的一些信息,就像一次 HTTP 申请的Request Header
和Response Header
一样HTTP Header
的生命周期是一次 HTTP 申请,那么metadata
的生命周期就是一次 RPC 调用
Metadata 创立
🌲 应用 New():
md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
🌲 应用 Pairs():
要留神如果有雷同的 key 会主动合并
md := metadata.Pairs(
"key1", "value1",
"key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}
"key2", "value2",
)
🌲 合并多个 metadata
md1 := metadata.Pairs("k1", "v1", "k2", "v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
md := metadata.Join(md1, md2)
🌲 存储二进制数据
在 metadata 中,key 永远是 string 类型,然而 value 能够是 string 也能够是二进制数据。为了在 metadata 中存储二进制数据,咱们仅仅须要在 key 的前面加上一个 – bin 后缀。具备 – bin 后缀的 key 所对应的 value 在创立 metadata 时会被编码(base64),收到的时候会被解码:
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}),
)
metadata 构造自身也有一些操作方法,参考文档非常容易了解。这里不再赘述:https://pkg.go.dev/google.gol…
Metadata 发送与接管
让咱们再次回顾下 pb 文件和生成进去的 client 与 server 端的接口
service OrderManagement {rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
GetOrder(ctx context.Context,
in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
mustEmbedUnimplementedOrderManagementServer()}
能够看到相比 pb 中的接口定义,生成进去的 Go 代码除了减少了 error
返回值,还多了context.Context
和错误处理相似,gRPC 中的 context.Context
也合乎 Go 语言的应用习惯:通常状况下咱们在函数首个参数搁置context.Context
用来传递一次 RPC 中无关的上下文,借助 context.WithValue()
或ctx.Value()
往 context
增加变量或读取变量
metadata
就是 gRPC 中能够传递的上下文信息之一,所以 metadata
的应用形式就是:metadata
记录到 context
,从context
读取metadata
Clinet 发送 Server 接管
client
发送 metadata
,那就是把metadata
存储到contex.Context
server
接管 metadata
,就是从contex.Context
中读取Metadata
Clinet 发送 Metadata
把 Metadata
放到contex.Context
,有几种形式
🌲 应用NewOutgoingContext
将新创建的 metadata
增加到 context
中,这样会 笼罩 掉原来已有的metadata
// 将 metadata 增加到 context 中,获取新的 context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// unary RPC
response, err := client.SomeRPC(ctx, someRequest)
// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)
🌲 应用AppendToOutgoingContext
能够间接将 key-value 对增加到已有的 context
中
- 如果
context
中没有metadata
,那么就会 创立 一个 - 如果已有
metadata
,那么就将数据 增加 到原来的metadata
// 如果对应的 context 没有 metadata,那么就会创立一个
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 如果已有 metadata 了,那么就将数据增加到原来的 metadata (例如在拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 一般 RPC(unary RPC)response, err := client.SomeRPC(ctx, someRequest)
// 流式 RPC(streaming RPC)stream, err := client.SomeStreamingRPC(ctx)
Server 接管 Metedata
一般 RPC 与流式 RPC 的区别不大,都是从 contex.Context
中读取metadata
🌲 应用FromIncomingContext
一般 RPC(unary RPC)
//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
流式 RPC(streaming RPC)
//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}
Server 发送 Clinet 接管
服务端发送的 metadata
被分成了 header
和 trailer
两者,因此客户端也能够读取两者
Server 发送 Metadata
对于 一般 RPC(unary RPC)server 能够应用 grpc 包中提供的函数向 client 发送 header
和trailer
grpc.SendHeader()
grpc.SetHeader()
grpc.SetTrailer()
对于 ** 流式 RPC(streaming RPC)server 能够应用 ServerStream 接口中定义的函数向 client 发送 header
和 trailer
ServerStream.SendHeader()
ServerStream.SetHeader()
ServerStream.SetTrailer()
🌲 一般 RPC(unary RPC)
应用 grpc.SendHeader()
和 grpc.SetTrailer()
办法,这两个函数将 context.Context
作为第一个参数
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创立并发送 header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// 创立并发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
如果不想立刻发送 header
,也能够应用grpc.SetHeader()
。grpc.SetHeader()
能够被屡次调用,在如下时机会把多个 metadata
合并发送进来
- 调用
grpc.SendHeader()
- 第一个响应被发送时
- RPC 完结时(蕴含胜利或失败)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创立 header,在适当时机会被发送
header := metadata.Pairs("header-key1", "val1")
grpc.SetHeader(ctx, header)
// 创立 header,在适当时机会被发送
header := metadata.Pairs("header-key2", "val2")
grpc.SetHeader(ctx, header)
// 创立并发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
🌲 流式 RPC(streaming RPC)
应用 ServerStream.SendHeader()
和 ServerStream.SetTrailer()
办法
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
如果不想立刻发送 header
,也能够应用ServerStream.SetHeader()
。ServerStream.SetHeader()
能够被屡次调用,在如下时机会把多个 metadata
合并发送进来
- 调用
ServerStream.SendHeader()
- 第一个响应被发送时
- RPC 完结时(蕴含胜利或失败)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SetHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
Client 接管 Metadata
🌲 一般 RPC(unary RPC)
一般 RPC(unary RPC)应用 grpc.Header()
和grpc.Trailer()
办法来接管 Metadata
// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {panic(err)
}
🌲 流式 RPC(streaming RPC)
流式 RPC(streaming RPC)通过调用返回的 ClientStream
接口的 Header()
和 Trailer()
办法接管 metadata
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
stream.CloseAndRecv()
// retrieve trailer
trailer := stream.Trailer()
Header
和 Trailer
区别
基本区别:发送的机会不同!
✨ headers
会在上面三种场景下被发送
SendHeader()
被调用时(蕴含grpc.SendHeader
和stream.SendHeader
)- 第一个响应被发送时
- RPC 完结时(蕴含胜利或失败)
✨ trailer
会在 rpc 返回的时候,即这个申请完结的时候被发送
差别在流式 RPC(streaming RPC)中比拟显著:
因为 trailer
是在服务端发送完申请之后才发送的,所以 client 获取 trailer
的时候须要在 stream.CloseAndRecv
或者 stream.Recv
返回非 nil 谬误 (蕴含 io.EOF) 之后
如果 stream.CloseAndRecv
之前调用 stream.Trailer()
获取的是空
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer
// `trailer` 会在 rpc 返回的时候,即这个申请完结的时候被发送
// 因而此时调用 `stream.Trailer()` 获取的是空
trailer := stream.Trailer()
stream.CloseAndRecv()
// retrieve trailer
// `trailer` 会在 rpc 返回的时候,即这个申请完结的时候被发送
// 因而此时调用 `stream.Trailer()` 才能够获取到值
trailer := stream.Trailer()
应用场景
既然咱们把 metadata
类比成 HTTP Header
,那么metadata
的应用场景也能够借鉴 HTTP
的Header
。如传递用户 token
进行用户认证,传递 trace
进行链路追踪等
拦截器中的 metadata
在拦截器中,咱们岂但能够获取或批改 接管 到的 metadata
,甚至还能够截取并批改要 发送 进来的metadata
还记得拦截器如何实现么?如果曾经忘了快快回顾一下吧:
🌰 举个例子:
咱们在客户端拦截器中从要发送给服务端的 metadata
中读取一个工夫戳字段,如果没有则补充这个工夫戳字段
留神这里用到了一个上文没有提到的 FromOutgoingContext(ctx)
函数
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var s string
// 获取要发送给服务端的 `metadata`
md, ok := metadata.FromOutgoingContext(ctx)
if ok && len(md.Get("time")) > 0 {s = md.Get("time")[0]
} else {
// 如果没有则补充这个工夫戳字段
s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
}
log.Printf("call timestamp: %s", s)
// Invoking the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
return err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithChainUnaryInterceptor(orderUnaryClientInterceptor,),
)
if err != nil {panic(err)
}
c := pb.NewOrderManagementClient(conn)
ctx = metadata.AppendToOutgoingContext(context.Background(), "time",
"raw"+strconv.FormatInt(time.Now().UnixNano(), 10))
// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{
Id: "101",
Items: []string{"iPhone XS", "Mac Book Pro"},
Destination: "San Jose, CA",
Price: 2300.00,
}
res, err := c.AddOrder(ctx, &order)
if err != nil {panic(err)
}
}
以上的思路在 server 同样实用。基于以上原理咱们能够实现链路追踪、用户认证等性能
错误信息
还记得错误处理一文中留下的问题么:gRPC 中如何传递谬误音讯 Status
的呢?没错!也是应用的 metadata
或者说 http2.0
的header
。Status
的三种信息别离应用了三个 header
头
Grpc-Status
: 传递Status
的code
Grpc-Message
: 传递Status
的message
Grpc-Status-Details-Bin
: 传递Status
的details
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
// ...
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
if m := st.Message(); m != "" {h.Set("Grpc-Message", encodeGrpcMessage(m))
}
if p := st.Proto(); p != nil && len(p.Details) > 0 {stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
panic(err)
}
h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
}
// ...
}
总结
一张图总结下整个 metadata
的应用办法(公众号发送:metadata 总结 获取高清原文件)
举荐浏览
- 写给 go 开发者的 gRPC 教程 - 拦截器
- 写给 go 开发者的 gRPC 教程 - 错误处理
参考资料
- gRPC 中的 Metadata
- pkg.go.dev/grpc@v1.44.0/metadata
- concept of metadata
- Documentation/grpc-metadata
✨ 微信公众号【凉凉的知识库】同步更新,欢送关注获取最新最有用的后端常识 ✨