关于go-zero:gozero源码学习熔断器

51次阅读

共计 3555 个字符,预计需要花费 9 分钟才能阅读完成。

1. 熔断器作用

熔断器是对于 一段时间内申请失败数 超过 设定的阈值 客户端,之后不再申请后端服务,间接返回出错信息,以防申请工作沉积。过期之后的申请失常拜访后端服务。

2. grpc 拦截器

grpc 拦截器分为两类:

  1. 一元拦截器 UnaryInterceptor, 能够拦挡一元 rpc 申请
  2. 流式拦截器 StreamInterceptor, 能够拦挡服务端流式 rpc、客户端流式、双向散失 rpc 申请

罕用来做日志,认证,metric 等等

3. go-zero 熔断器的应用

go-zero 应用 grpc 的一元和流式,总共有 6 个拦截器。


func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
    options = append(options,
        WithUnaryClientInterceptors(
            clientinterceptors.UnaryTracingInterceptor,
            clientinterceptors.DurationInterceptor,
            clientinterceptors.PrometheusInterceptor,
            clientinterceptors.BreakerInterceptor, // 熔断器应用
            clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),
        ),
        WithStreamClientInterceptors(clientinterceptors.StreamTracingInterceptor,),
    )
}

func (c *client) dial(server string, opts ...ClientOption) error {options := c.buildDialOptions(opts...) // 结构 options
    timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
    defer cancel()
    conn, err := grpc.DialContext(timeCtx, server, options...)// 调用入口
    if err != nil {
        service := server
        if errors.Is(err, context.DeadlineExceeded) {pos := strings.LastIndexByte(server, separator)
            // len(server) - 1 is the index of last char
            if 0 < pos && pos < len(server)-1 {service = server[pos+1:]
            }
        }
        return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is already started",
            server, err.Error(), service)
    }

    c.conn = conn
    return nil
}

// WithStreamClientInterceptors uses given client stream interceptors.
func WithStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption {return grpc.WithChainStreamInterceptor(interceptors...)
}

// WithUnaryClientInterceptors uses given client unary interceptors.
func WithUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.DialOption {return grpc.WithChainUnaryInterceptor(interceptors...)
}

其余拦截器本篇暂不介绍,先重点看一下熔断器拦截器 BreakerInterceptor

func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
    cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {breakerName := path.Join(cc.Target(), method) // 拦截器名是 target+method 
    return breaker.DoWithAcceptable(breakerName, func() error {return invoker(ctx, method, req, reply, cc, opts...)// 发动一次 grpc 申请
    }, codes.Acceptable)
}

// 定义哪些错误码为须要拦挡的。// Acceptable checks if given error is acceptable.
func Acceptable(err error) bool {switch status.Code(err) {
    case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss, codes.Unimplemented:
        return false
    default:
        return true
    }
}

// DoWithAcceptable calls Breaker.DoWithAcceptable on the Breaker with given name.
func DoWithAcceptable(name string, req func() error, acceptable Acceptable) error {return do(name, func(b Breaker) error {return b.DoWithAcceptable(req, acceptable)
    })
}

// 获取拦截器
func do(name string, execute func(b Breaker) error) error {return execute(GetBreaker(name))
}


// GetBreaker returns the Breaker with the given name.
func GetBreaker(name string) Breaker {lock.RLock()
    b, ok := breakers[name]
    lock.RUnlock()
    if ok {return b}

    lock.Lock()
    b, ok = breakers[name]
    if !ok {b = NewBreaker(WithName(name)) 
        breakers[name] = b
    }
    lock.Unlock()

    return b
}


最终调用的是
func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {return cb.throttle.doReq(req, nil, acceptable)
}

底层应用的是 googleBreaker, 其判断是否熔断的条件是:

// 依照最近一段时间的申请数据计算是否熔断
func (b *googleBreaker) accept() error {
    // 获取最近一段时间的统计数据
    accepts, total := b.history()
    // 计算动静熔断概率
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    // 概率为 0,通过
    if dropRatio <= 0 {return nil}
    // 随机产生 0.0-1.0 之间的随机数与下面计算出来的熔断概率相比拟
    // 如果随机数比熔断概率小则进行熔断
    if b.proba.TrueOnProba(dropRatio) {return ErrServiceUnavailable}

    return nil
}

参考文章:
grpc 拦截器: https://zhuanlan.zhihu.com/p/…
go-zero 服务治理 - 自适应熔断器:https://juejin.cn/post/702853…

正文完
 0