0.1、索引
https://waterflow.link/articles/1665853719750
当咱们编写 HTTP 应用程序时,您能够应用 HTTP 中间件包装特定于路由的应用程序处理程序,能够在执行应用程序处理程序之前和之后执行一些常见的逻辑。 咱们通常应用中间件来编写跨畛域组件,例如受权、日志记录、缓存等。在 gRPC 中能够应用称为拦截器的概念来实现雷同的性能。
通过应用拦截器,咱们能够在客户端和服务器上拦挡 RPC 办法的执行。 在客户端和服务器上,都有两种类型的拦截器:
- UnaryInterceptor(一元拦截器)
- StreamInterceptor(流式拦截器)
UnaryInterceptor 拦挡一元 RPC,而 StreamInterceptor 拦挡流式 RPC。
在一元 RPC 中,客户端向服务器发送单个申请并返回单个响应。 在流式 RPC 中,客户端或服务器,或单方(双向流式传输),获取一个流读取一系列音讯返回,而后客户端或服务器从返回的流中读音讯,直到没有更多音讯为止。
1、在 gRPC 客户端中编写拦截器
咱们能够在 gRPC 客户端应用程序中编写两种类型的拦截器:
- UnaryClientInterceptor:UnaryClientInterceptor 拦挡客户端上一元 RPC 的执行。
- StreamClientInterceptor:StreamClientInterceptor拦挡ClientStream的创立。 它可能会返回一个自定义的 ClientStream 来拦挡所有 I/O 操作。
1、UnaryClientInterceptor
为了创立 UnaryClientInterceptor,能够通过提供 UnaryClientInterceptor 函数值调用 WithUnaryInterceptor 函数,该函数返回一个 grpc.DialOption 指定一元 RPC 的拦截器:
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
而后将返回的 grpc.DialOption 值用作调用 grpc.Dial 函数以将拦截器利用于一元 RPC 的参数。
UnaryClientInterceptor func 类型的定义如下:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts …CallOption) error
参数调用者是实现 RPC 的处理程序,调用它是拦截器的责任。 UnaryClientInterceptor 函数值提供拦截器逻辑。 这是一个实现 UnaryClientInterceptor 的示例拦截器:
clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now().Unix() err := invoker(ctx, method, req, reply, cc, opts...) end := time.Now().Unix() // 获取调用grpc的申请时长 fmt.Println("request time duration: ", end - start) return err }
上面的函数返回一个 grpc.DialOption 值,它通过提供 UnaryClientInterceptor 函数值来调用 WithUnaryInterceptor 函数:
func WithUnaryInterceptorCustom() grpc.DialOption { clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now().Unix() err := invoker(ctx, method, req, reply, cc, opts...) end := time.Now().Unix() // 获取调用grpc的申请时长 fmt.Println("request time duration: ", end - start) return err } return grpc.WithUnaryInterceptor(clientInterceptor)}
返回的 grpc.DialOption 值用作调用 grpc.Dial 函数以利用拦截器的参数:
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom())
搭建简略grpc服务能够参考这篇文章:https://waterflow.link/articl...
2、StreamClientInterceptor
为了创立 StreamClientInterceptor,能够通过提供 StreamClientInterceptor 函数值调用 WithStreamInterceptor 函数,该函数返回一个 grpc.DialOption 指定流 RPC 的拦截器:
func WithStreamInterceptor(f StreamClientInterceptor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.streamInt = f })}
而后将返回的 grpc.DialOption 值用作调用 grpc.Dial 函数的参数,以将拦截器利用于流式 RPC。
上面是 StreamClientInterceptor func 类型的定义:
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
上面是StreamClientInterceptor的具体实现:
// 包装下stream// 构造体内嵌接口,初始化的时候须要赋值对象实现了该接口的所有办法type wrappedStream struct { grpc.ClientStream}// 实现接管音讯办法,并自定义打印func (w *wrappedStream) RecvMsg(m interface{}) error { log.Printf("====== [Client Stream Interceptor] " + "Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ClientStream.RecvMsg(m)}// 实现发送音讯办法,并自定义打印func (w *wrappedStream) SendMsg(m interface{}) error { log.Printf("====== [Client Stream Interceptor] " + "Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ClientStream.SendMsg(m)}// 初始化包装streamfunc newWrappedStream(s grpc.ClientStream) grpc.ClientStream { return &wrappedStream{s}}func WithStreamInterceptorCustom() grpc.DialOption { clientInterceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { clientStream, err := streamer(ctx, desc, cc, method, opts...) if err != nil { return nil, err } // 返回包装后的stream // 这里clientStream实现了grpc.ClientStream接口 return newWrappedStream(clientStream), err } return grpc.WithStreamInterceptor(clientInterceptor)}
这里须要留神:
- 定义一个包装stream构造体wrappedStream,这里用到了构造体内嵌接口的形式,间接实现了接口的所有办法,具体能够看正文
- 重写RecvMsg和SendMsg办法
- WithStreamInterceptorCustom拦截器中染回包装后的clientStream
为了将 StreamClientInterceptor 利用于流式 RPC,只需将 WithStreamInterceptor 函数返回的 grpc.DialOption 值作为调用 grpc.Dial 函数的参数传递。 您能够将 UnaryClientInterceptor 和 StreamClientInterceptor 值传递给 grpc.Dial 函数。
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom(), WithStreamInterceptorCustom())
残缺的客户端代码像,上面这样:
package mainimport ( "context" "fmt" "google.golang.org/grpc" "grpcdemo/helloservice" "io" "log" "time")func main() { // 连贯grpc服务端,退出拦截器 conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom(), WithStreamInterceptorCustom()) if err != nil { log.Fatal(err) } defer conn.Close() // 一元rpc unaryRpc(conn) // 流式rpc streamRpc(conn)}// 一元拦截器func WithUnaryInterceptorCustom() grpc.DialOption { clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now().Unix() err := invoker(ctx, method, req, reply, cc, opts...) end := time.Now().Unix() fmt.Println("invoker request time duration: ", end - start) return err } return grpc.WithUnaryInterceptor(clientInterceptor)}type wrappedStream struct { grpc.ClientStream}func (w *wrappedStream) RecvMsg(m interface{}) error { log.Printf("====== [Client Stream Interceptor] " + "Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ClientStream.RecvMsg(m)}func (w *wrappedStream) SendMsg(m interface{}) error { log.Printf("====== [Client Stream Interceptor] " + "Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ClientStream.SendMsg(m)}func newWrappedStream(s grpc.ClientStream) grpc.ClientStream { return &wrappedStream{s}}// 流式拦截器func WithStreamInterceptorCustom() grpc.DialOption { clientInterceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { clientStream, err := streamer(ctx, desc, cc, method, opts...) if err != nil { return nil, err } return newWrappedStream(clientStream), err } return grpc.WithStreamInterceptor(clientInterceptor)}func unaryRpc(conn *grpc.ClientConn) { client := helloservice.NewHelloServiceClient(conn) reply, err := client.Hello(context.Background(), &helloservice.String{Value: "hello"}) if err != nil { log.Fatal(err) } log.Println("unaryRpc recv: ", reply.Value)}func streamRpc(conn *grpc.ClientConn) { client := helloservice.NewHelloServiceClient(conn) stream, err := client.Channel(context.Background()) if err != nil { log.Fatal(err) } go func() { for { if err := stream.Send(&helloservice.String{Value: "hi"}); err != nil { log.Fatal(err) } time.Sleep(time.Second) } }() for { recv, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println("streamRpc recv: ", recv.Value) }}
能够联合我之前的文章,把本期的代码加进去运行调试
(搭建简略grpc服务能够参考这篇文章:https://waterflow.link/articl...)
运行成果如下:
go run helloclient/main.goinvoker request time duration: 12022/10/14 23:17:35 unaryRpc recv: hello:hello2022/10/14 23:17:35 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:35+08:002022/10/14 23:17:35 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:35+08:002022/10/14 23:17:36 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:36+08:00streamRpc recv: hello:hi2022/10/14 23:17:36 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:36+08:002022/10/14 23:17:37 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:37+08:00streamRpc recv: hello:hi2022/10/14 23:17:37 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:37+08:002022/10/14 23:17:38 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:38+08:00streamRpc recv: hello:hi2022/10/14 23:17:38 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:38+08:00
2、在 gRPC 客户端中编写拦截器
和 gRPC 客户端应用程序一样,gRPC 服务器应用程序提供两种类型的拦截器:
- UnaryServerInterceptor:提供了一个钩子来拦挡服务器上一元 RPC 的执行。
- StreamServerInterceptor:提供了一个钩子来拦挡服务器上流式 RPC 的执行。
1、UnaryServerInterceptor
为了创立 UnaryServerInterceptor,能够通过提供 UnaryServerInterceptor 函数值作为参数调用 UnaryInterceptor 函数,该参数返回为服务器设置 UnaryServerInterceptor 的 grpc.ServerOption 值。
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.unaryInt != nil { panic("The unary server interceptor was already set and may not be reset.") } o.unaryInt = i })}
而后应用返回的 grpc.ServerOption 值作为参数提供给 grpc.NewServer 函数以注册为 UnaryServerInterceptor。
UnaryServerInterceptor 函数的定义如下:
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)
参数 info 蕴含了这个 RPC 的所有信息,拦截器能够对其进行操作。 而handler是服务办法实现的包装器。 拦截器负责调用处理程序来实现 RPC。
1、定义一个服务端的鉴权一元拦截器
func ServerUnaryInterceptorCustom() grpc.ServerOption { serverInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { start := time.Now() // 如果是非登录申请,须要验证token if info.FullMethod != "/helloservice.HelloService/Login" { if err := authorize(ctx); err != nil { return nil, err } } h, err := handler(ctx, req) log.Printf("Request - Method:%s\tDuration:%s\tError:%v\n", info.FullMethod, time.Since(start), err) return h, err } return grpc.UnaryInterceptor(serverInterceptor)}// authorize 从Metadata中获取token并校验是否非法func authorize(ctx context.Context) error { // 从context中获取metadata md, ok := metadata.FromIncomingContext(ctx) if !ok { return status.Errorf(codes.InvalidArgument, "Retrieving metadata is failed") } authHeader, ok := md["authorization"] if !ok { return status.Errorf(codes.Unauthenticated, "Authorization token is not supplied") } token := authHeader[0] // validateToken function validates the token err := validateToken(token) if err != nil { return status.Errorf(codes.Unauthenticated, err.Error()) } return nil}func validateToken(token string) error { // 校验token return nil}
咱们能够看下咱们定义的一元拦截器的执行流程:
- 首先进来之后咱们判断如果是登录申请,间接转发申请,并打印日志
- 如果是非登录申请,须要验证token,调用authorize办法
- 在authorize办法中,会从context中获取metadata元数据,而后解析获取token并验证
请留神,后面代码块中的拦截器逻辑应用包 google.golang.org/grpc/codes 和 google.golang.org/grpc/status。
2、grpc客户端传入token
gRPC 反对在客户端和服务器之间应用 Context 值发送元数据。 google.golang.org/grpc/metadata 包提供了元数据的性能。
其中MD类型是一个k-v的map,想上面这样
type MD map[string][]string
上面咱们在客户端编写向服务端发送token的代码,咱们批改下客户端的unaryRpc,结构蕴含authorization的metadata:
func unaryRpc(conn *grpc.ClientConn) { client := helloservice.NewHelloServiceClient(conn) ctx := context.Background() // 结构元数据,并返回MD类型的构造 md := metadata.Pairs("authorization", "mytoken") // 元数据塞入context并返回新的context ctx = metadata.NewOutgoingContext(ctx, md) reply, err := client.Hello(ctx, &helloservice.String{Value: "hello"}) if err != nil { log.Fatal(err) } log.Println("unaryRpc recv: ", reply.Value)}
这样元数据的信息就会跟着context发送到grpc服务端。
接着咱们在服务端grpc中批改如下代码,退出一行日志:
func validateToken(token string) error { log.Printf("get the token: %s \n", token) // 校验token return nil}
3、运行服务
咱们从新执行下grpc服务端程序,而后运行下客户端代码,能够看到token传过来了:
go run helloservice/main/main.go2022/10/15 20:36:04 server started...2022/10/15 20:36:08 get the token: mytoken 2022/10/15 20:36:09 Request - Method:/helloservice.HelloService/Hello Duration:1.001216763s Error:<nil>
2、StreamServerInterceptor
为了创立 StreamServerInterceptor,通过提供 StreamServerInterceptor func 值作为参数调用 StreamInterceptor 函数,该参数返回为服务器设置 StreamServerInterceptor 的 grpc.ServerOption 值。
func StreamInterceptor(i StreamServerInterceptor) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.streamInt != nil { panic("The stream server interceptor was already set and may not be reset.") } o.streamInt = i })}
而后应用返回的 grpc.ServerOption 值作为参数提供给 grpc.NewServer 函数以注册为 UnaryServerInterceptor。
上面是 StreamServerInterceptor func 类型的定义:
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
咱们看下服务端流式拦截器的具体例子:
type wrappedStream struct { grpc.ServerStream}func (w *wrappedStream) RecvMsg(m interface{}) error { log.Printf("====== [Server Stream Interceptor] " + "Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ServerStream.RecvMsg(m)}func (w *wrappedStream) SendMsg(m interface{}) error { log.Printf("====== [Server Stream Interceptor] " + "Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ServerStream.SendMsg(m)}func newWrappedStream(s grpc.ServerStream) grpc.ServerStream { return &wrappedStream{s}}func ServerStreamInterceptorCustom() grpc.ServerOption { serverInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return handler(srv, newWrappedStream(ss)) } return grpc.StreamInterceptor(serverInterceptor)}
下面服务端流式拦截器代码可参考客户端流式拦截器的代码,根本差不多。
3、多拦截器
go-grpc在v1.28.0之前是不反对多个拦截器。然而能够应用一些第三方的包,拦截器链接允利用多个拦截器。
v1.28.0之后曾经能够反对多个拦截器,咱们批改下服务端代码如下:
...unaryInterceptors := []grpc.ServerOption { ServerUnaryInterceptorCustom(), ServerStreamInterceptorCustom(), }grpcServer := grpc.NewServer(unaryInterceptors...)...