1. 熔断器作用
熔断器是对于一段时间内申请失败数超过设定的阈值的客户端,之后不再申请后端服务,间接返回出错信息,以防申请工作沉积。过期之后的申请失常拜访后端服务。
2. grpc拦截器
grpc拦截器分为两类:
- 一元拦截器UnaryInterceptor, 能够拦挡一元rpc申请
- 流式拦截器StreamInterceptor, 能够拦挡服务端流式rpc、客户端流式、双向散失rpc申请
罕用来做日志,认证,metric 等等
3. go-zero 熔断器的应用
go-zero 应用grpc的一元和流式,总共有6个拦截器。
func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption { options = append(options, WithUnaryClientInterceptors( clientinterceptors.UnaryTracingInterceptor, clientinterceptors.DurationInterceptor, clientinterceptors.PrometheusInterceptor, clientinterceptors.BreakerInterceptor, //熔断器应用 clientinterceptors.TimeoutInterceptor(cliOpts.Timeout), ), WithStreamClientInterceptors( clientinterceptors.StreamTracingInterceptor, ), )}func (c *client) dial(server string, opts ...ClientOption) error { options := c.buildDialOptions(opts...) //结构options timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout) defer cancel() conn, err := grpc.DialContext(timeCtx, server, options...)//调用入口 if err != nil { service := server if errors.Is(err, context.DeadlineExceeded) { pos := strings.LastIndexByte(server, separator) // len(server) - 1 is the index of last char if 0 < pos && pos < len(server)-1 { service = server[pos+1:] } } return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is already started", server, err.Error(), service) } c.conn = conn return nil}// WithStreamClientInterceptors uses given client stream interceptors.func WithStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption { return grpc.WithChainStreamInterceptor(interceptors...)}// WithUnaryClientInterceptors uses given client unary interceptors.func WithUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.DialOption { return grpc.WithChainUnaryInterceptor(interceptors...)}
其余拦截器本篇暂不介绍,先重点看一下熔断器拦截器BreakerInterceptor
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { breakerName := path.Join(cc.Target(), method) //拦截器名是target+method return breaker.DoWithAcceptable(breakerName, func() error { return invoker(ctx, method, req, reply, cc, opts...)//发动一次grpc申请 }, codes.Acceptable)}//定义哪些错误码为须要拦挡的。// Acceptable checks if given error is acceptable.func Acceptable(err error) bool { switch status.Code(err) { case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss, codes.Unimplemented: return false default: return true }}// DoWithAcceptable calls Breaker.DoWithAcceptable on the Breaker with given name.func DoWithAcceptable(name string, req func() error, acceptable Acceptable) error { return do(name, func(b Breaker) error { return b.DoWithAcceptable(req, acceptable) })}//获取拦截器func do(name string, execute func(b Breaker) error) error { return execute(GetBreaker(name))}// GetBreaker returns the Breaker with the given name.func GetBreaker(name string) Breaker { lock.RLock() b, ok := breakers[name] lock.RUnlock() if ok { return b } lock.Lock() b, ok = breakers[name] if !ok { b = NewBreaker(WithName(name)) breakers[name] = b } lock.Unlock() return b}最终调用的是func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error { return cb.throttle.doReq(req, nil, acceptable)}
底层应用的是googleBreaker,其判断是否熔断的条件是:
//依照最近一段时间的申请数据计算是否熔断func (b *googleBreaker) accept() error { //获取最近一段时间的统计数据 accepts, total := b.history() //计算动静熔断概率 weightedAccepts := b.k * float64(accepts) // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101 dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1)) //概率为0,通过 if dropRatio <= 0 { return nil } //随机产生0.0-1.0之间的随机数与下面计算出来的熔断概率相比拟 //如果随机数比熔断概率小则进行熔断 if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } return nil}
参考文章:
grpc拦截器: https://zhuanlan.zhihu.com/p/...
go-zero服务治理-自适应熔断器:https://juejin.cn/post/702853...