共计 3555 个字符,预计需要花费 9 分钟才能阅读完成。
1. 熔断器作用
熔断器的作用是:当客户端在一段时间内的请求失败数超过设定的阈值时,后续请求不再访问后端服务,而是直接返回错误信息,以防请求堆积。熔断期过后,请求恢复正常访问后端服务。
2. grpc 拦截器
grpc 拦截器分为两类:
- 一元拦截器 UnaryInterceptor,能够拦截一元 rpc 请求
- 流式拦截器 StreamInterceptor,能够拦截服务端流式、客户端流式、双向流式 rpc 请求
常用来做日志、认证、metric 等等
3. go-zero 熔断器的使用
go-zero 使用了 grpc 的一元拦截器和流式拦截器,总共有 6 个拦截器(5 个一元拦截器、1 个流式拦截器)。
// buildDialOptions assembles the grpc.DialOption list used when dialing.
// The visible part appends two options: a chain of unary client interceptors
// (tracing, duration, prometheus, breaker, timeout) and a stream client
// interceptor chain containing the stream tracing interceptor.
// NOTE(review): this excerpt is abridged — the declarations of options and
// cliOpts and the final return statement are not shown here.
func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
options = append(options,
WithUnaryClientInterceptors(
clientinterceptors.UnaryTracingInterceptor,
clientinterceptors.DurationInterceptor,
clientinterceptors.PrometheusInterceptor,
clientinterceptors.BreakerInterceptor, // the circuit breaker is plugged in here
clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),
),
WithStreamClientInterceptors(clientinterceptors.StreamTracingInterceptor,),
)
}
// dial opens a grpc connection to server and stores it on the client.
//
// The dial options come from buildDialOptions, and the dial call receives a
// context that expires after dialTimeout. On success the connection is saved
// in c.conn and nil is returned.
//
// On failure the returned error wraps the underlying dial error with %w, so
// callers can still inspect it with errors.Is / errors.As, and names the
// service that should be running. When the failure is a deadline expiry that
// name is the part of server after the last separator (if there is a non-empty
// one); otherwise it is server itself.
func (c *client) dial(server string, opts ...ClientOption) error {
	options := c.buildDialOptions(opts...) // build the dial options
	timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	defer cancel()

	conn, err := grpc.DialContext(timeCtx, server, options...) // entry point of the dial
	if err != nil {
		service := server
		if errors.Is(err, context.DeadlineExceeded) {
			pos := strings.LastIndexByte(server, separator)
			// len(server) - 1 is the index of last char, so this requires
			// at least one character before and after the separator.
			if 0 < pos && pos < len(server)-1 {
				service = server[pos+1:]
			}
		}
		// %w keeps the message text unchanged while preserving the error chain.
		return fmt.Errorf("rpc dial: %s, error: %w, make sure rpc service %q is already started",
			server, err, service)
	}

	c.conn = conn
	return nil
}
// WithStreamClientInterceptors builds a grpc.DialOption by handing the given
// client stream interceptors to grpc.WithChainStreamInterceptor.
func WithStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption {
	chained := grpc.WithChainStreamInterceptor(interceptors...)
	return chained
}
// WithUnaryClientInterceptors builds a grpc.DialOption by handing the given
// client unary interceptors to grpc.WithChainUnaryInterceptor.
func WithUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.DialOption {
	chained := grpc.WithChainUnaryInterceptor(interceptors...)
	return chained
}
其余拦截器本篇暂不介绍,先重点看一下熔断器拦截器 BreakerInterceptor
// BreakerInterceptor is a unary client interceptor that runs the rpc call
// through a named circuit breaker. The breaker name is the connection target
// joined with the method, and codes.Acceptable is passed along to classify
// the call's error.
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// The breaker is keyed by target + method.
	name := path.Join(cc.Target(), method)

	// The guarded operation: issue the actual grpc request.
	call := func() error {
		return invoker(ctx, method, req, reply, cc, opts...)
	}

	return breaker.DoWithAcceptable(name, call, codes.Acceptable)
}
// Acceptable checks if given error is acceptable.
// It returns false when the status code of err is DeadlineExceeded, Internal,
// Unavailable, DataLoss or Unimplemented, and true for every other code.
func Acceptable(err error) bool {
	code := status.Code(err)
	rejected := code == codes.DeadlineExceeded ||
		code == codes.Internal ||
		code == codes.Unavailable ||
		code == codes.DataLoss ||
		code == codes.Unimplemented
	return !rejected
}
// DoWithAcceptable calls Breaker.DoWithAcceptable on the Breaker with given name,
// forwarding req and acceptable unchanged.
func DoWithAcceptable(name string, req func() error, acceptable Acceptable) error {
	run := func(b Breaker) error {
		return b.DoWithAcceptable(req, acceptable)
	}
	return do(name, run)
}
// do fetches the Breaker for name via GetBreaker and runs execute with it,
// returning whatever execute returns.
func do(name string, execute func(b Breaker) error) error {
	b := GetBreaker(name)
	return execute(b)
}
// GetBreaker returns the Breaker with the given name.
// It first looks the name up under the read lock; on a miss it takes the write
// lock, looks again, and only if the name is still absent creates a new Breaker
// with NewBreaker(WithName(name)) and stores it in breakers.
func GetBreaker(name string) Breaker {
	// Lookup under the read lock.
	lock.RLock()
	found, exists := breakers[name]
	lock.RUnlock()
	if exists {
		return found
	}

	// Miss: re-check under the write lock before creating the breaker.
	lock.Lock()
	found, exists = breakers[name]
	if exists {
		lock.Unlock()
		return found
	}
	found = NewBreaker(WithName(name))
	breakers[name] = found
	lock.Unlock()

	return found
}
最终调用的是
// DoWithAcceptable hands req and acceptable to the breaker's throttle via doReq
// and returns its error. The second doReq argument is nil.
// NOTE(review): presumably that nil means "no fallback" — confirm against doReq.
func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
	err := cb.throttle.doReq(req, nil, acceptable)
	return err
}
底层使用的是 googleBreaker,其判断是否熔断的条件是:
// accept decides from the recent request statistics whether the next request
// may proceed. It returns nil to let the request through and
// ErrServiceUnavailable to reject it.
func (b *googleBreaker) accept() error {
	// Statistics for the recent window.
	accepted, requests := b.history()

	// Drop ratio: max(0, (requests - protection - k*accepted) / (requests + 1)).
	// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
	weighted := b.k * float64(accepted)
	excess := float64(requests-protection) - weighted
	ratio := math.Max(0, excess/float64(requests+1))

	// A ratio of zero means nothing is dropped.
	if ratio <= 0 {
		return nil
	}

	// Otherwise the request is rejected when TrueOnProba(ratio) reports true.
	// NOTE(review): presumably true with probability ratio — confirm against proba.
	if !b.proba.TrueOnProba(ratio) {
		return nil
	}

	return ErrServiceUnavailable
}
参考文章:
grpc 拦截器: https://zhuanlan.zhihu.com/p/…
go-zero 服务治理 - 自适应熔断器:https://juejin.cn/post/702853…
正文完