原文链接:# 微服务架构下的熔断框架:hystrix-go

背景

随同着微服务架构被宣传得如火如茶,一些概念也被推到了咱们的背后。一提到微服务,就离不开这几个字:高内聚低耦合;微服务的架构设计最终目标也就是实现这几个字。在微服务架构中,微服务就是实现一个繁多的业务性能,每个微服务能够独立演进,一个利用可能会有多个微服务组成,微服务之间的数据交能够通过近程调用来实现,这样在一个微服务架构下就会造成这样的依赖关系:

微服务A调用微服务C、D,微服务B又依赖微服务B、E,微服务D依赖于服务F,这只是一个简略的小例子,理论业务中服务之间的依赖关系比这还简单,这样在调用链路上如果某个微服务的调用响应工夫过长或者不可用,那么对上游服务(按调用关系命名)的调用就会占用越来越多的系统资源,进而引起零碎解体,这就是微服务的雪蹦效应。

为了解决微服务的雪蹦效应,提出来应用熔断机制为微服务链路提供爱护机制。熔断机制大家应该都不生疏,电路的中保险丝就是一种熔断机制,在微服务中的熔断机制是什么样的呢?

当链路中的某个微服务不可用或者响应的工夫太长时,会进行服务的降级,进而熔断该节点微服务的调用,疾速返回谬误的响应信息,当检测到该节点微服务调用响应失常后,复原调用链路。

本文咱们就介绍一个开源熔断框架:hystrix-go。

熔断框架(hystrix-go)

Hystrix是一个提早和容错库,旨在隔离对近程零碎、服务和第三方服务的拜访点,进行级联故障并在故障不可避免的简单分布式系统中实现弹性。hystrix-go 旨在容许 Go 程序员轻松构建具备与基于 Java 的 Hystrix 库相似的执行语义的应用程序。所以本文就从应用开始到源码剖析一下hystrix-go。

疾速装置

go get -u github.com/afex/hystrix-go/hystrix

疾速应用

hystrix-go真的是开箱即用,应用还是比较简单的,次要分为两个步骤:

  • 配置熔断规定,否则将应用默认配置。能够调用的办法
func Configure(cmds map[string]CommandConfig) func ConfigureCommand(name string, config CommandConfig)

Configure办法外部也是调用的ConfigureCommand办法,就是传参数不一样,依据本人的代码格调抉择。

  • 定义依赖于内部零碎的利用程序逻辑 - runFunc 和服务中断期间执行的逻辑代码 - fallbackFunc,能够调用的办法:
func Go(name string, run runFunc, fallback fallbackFunc) // 外部调用Goc办法func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) func Do(name string, run runFunc, fallback fallbackFunc) // 外部调用的是Doc办法func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) // 外部调用Goc办法,解决了异步过程

GoDo的区别在于异步还是同步,Do办法在调用Doc办法内解决了异步过程,他们最终都是调用的Goc办法。前面咱们进行剖析。

举一个例子:咱们在Gin框架上加一个接口级的熔断中间件

// 代码已上传github: 文末查看地址var CircuitBreakerName = "api_%s_circuit_breaker"func CircuitBreakerWrapper(ctx *gin.Context){    name := fmt.Sprintf(CircuitBreakerName,ctx.Request.URL)    hystrix.Do(name, func() error {        ctx.Next()        code := ctx.Writer.Status()        if code != http.StatusOK{            return errors.New(fmt.Sprintf("status code %d", code))        }        return nil    }, func(err error) error {        if err != nil{            // 监控上报(未实现)            _, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n",err.Error())) //写入文件(字符串)            fmt.Printf("circuitBreaker and err is %s\n",err.Error())            // 返回熔断谬误            ctx.JSON(http.StatusServiceUnavailable,gin.H{                "msg": err.Error(),            })        }        return nil    })}func init()  {    hystrix.ConfigureCommand(CircuitBreakerName,hystrix.CommandConfig{        Timeout:                int(3*time.Second), // 执行command的超时工夫为3s        MaxConcurrentRequests:  10, // command的最大并发量        RequestVolumeThreshold: 100, // 统计窗口10s内的申请数量,达到这个申请数量后才去判断是否要开启熔断        SleepWindow:            int(2 * time.Second), // 当熔断器被关上后,SleepWindow的工夫就是管制过多久后去尝试服务是否可用了        ErrorPercentThreshold:  20, // 谬误百分比,申请数量大于等于RequestVolumeThreshold并且错误率达到这个百分比后就会启动熔断    })    if checkFileIsExist(filename) { //如果文件存在        f, errfile = os.OpenFile(filename, os.O_APPEND, 0666) //关上文件    } else {        f, errfile = os.Create(filename) //创立文件    }}func main()  {    defer f.Close()    hystrixStreamHandler := hystrix.NewStreamHandler()    hystrixStreamHandler.Start()    go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)    r := gin.Default()    r.GET("/api/ping/baidu", func(c *gin.Context) {        _, err := http.Get("https://www.baidu.com")        if err != nil {            c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()})            return        }        c.JSON(http.StatusOK, gin.H{"msg": "success"})    }, CircuitBreakerWrapper)    r.Run()  // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")}func checkFileIsExist(filename string) bool {    if _, err := os.Stat(filename); os.IsNotExist(err) {        return false    }    return true}

指令:wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/pin...

运行后果:

circuitBreaker and err is status code 500circuitBreaker and err is status code 500..... circuitBreaker and err is hystrix: max concurrencycircuitBreaker and err is hystrix: max concurrency.....circuitBreaker and err is hystrix: circuit opencircuitBreaker and err is hystrix: circuit open.....

对谬误进行剖析:

  • circuitBreaker and err is status code 500:因为咱们敞开了网络,所以申请是没有响应的
  • circuitBreaker and err is hystrix: max concurrency:咱们设置的最大并发量MaxConcurrentRequests10,咱们的压测工具应用的是100并发,所有会触发这个熔断
  • circuitBreaker and err is hystrix: circuit open:咱们设置熔断开启的申请数量RequestVolumeThreshold100,所以当10s内的申请数量大于100时就会触发熔断。

简略对下面的例子做一个解析:

  • 增加接口级的熔断中间件
  • 初始化熔断相干配置
  • 开启dashboard 可视化hystrix的上报信息,浏览器关上http://localhost:81,能够看到如下后果:

hystrix-go流程剖析

原本想对源码进行剖析,代码量有点大,所以就针对流程来剖析,顺便看一些外围代码。

配置熔断规定

既然是熔断,就要有熔断规定,咱们能够调用两个办法配置熔断规定,不会最终调用的都是ConfigureCommand,这里没有特地的逻辑,如果咱们没有配置,零碎将应用默认熔断规定:

var (    // DefaultTimeout is how long to wait for command to complete, in milliseconds    DefaultTimeout = 1000    // DefaultMaxConcurrent is how many commands of the same type can run at the same time    DefaultMaxConcurrent = 10    // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health    DefaultVolumeThreshold = 20    // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery    DefaultSleepWindow = 5000    // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests    DefaultErrorPercentThreshold = 50    // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.    DefaultLogger = NoopLogger{})

配置规定如下:

  • Timeout:定义执行command的超时工夫,工夫单位是ms,默认工夫是1000ms
  • MaxConcurrnetRequests:定义command的最大并发量,默认值是10并发量;
  • SleepWindow:熔断器被关上后应用,在熔断器被关上后,依据SleepWindow设置的工夫管制多久后尝试服务是否可用,默认工夫为5000ms
  • RequestVolumeThreshold:判断熔断开关的条件之一,统计10s(代码中写死了)内申请数量,达到这个申请数量后再依据错误率判断是否要开启熔断;
  • ErrorPercentThreshold:判断熔断开关的条件之一,统计谬误百分比,申请数量大于等于RequestVolumeThreshold并且错误率达到这个百分比后就会启动熔断 默认值是50

这些规定依据command的name进行辨别寄存到一个map中。

执行command

执行command次要能够调用四个办法,别离是:

func Go(name string, run runFunc, fallback fallbackFunc)func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) func Do(name string, run runFunc, fallback fallbackFunc)func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)

Do外部调用的Doc办法,Go外部调用的是Goc办法,在Doc办法外部最终调用的还是Goc办法,只是在Doc办法内做了同步逻辑:

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {  ..... 省略局部封装代码  var errChan chan error    if fallback == nil {        errChan = GoC(ctx, name, r, nil)    } else {        errChan = GoC(ctx, name, r, f)    }    select {    case <-done:        return nil    case err := <-errChan:        return err    }}

因为他们最终都是调用的Goc办法,所以咱们执行剖析Goc办法的外部逻辑;代码有点长,咱们分逻辑来剖析:

创立command对象
    cmd := &command{        run:      run,        fallback: fallback,        start:    time.Now(),        errChan:  make(chan error, 1),        finished: make(chan bool, 1),    }    // 获取熔断器    circuit, _, err := GetCircuit(name)    if err != nil {        cmd.errChan <- err        return cmd.errChan    }

介绍一下command的数据结构:

type command struct {    sync.Mutex    ticket      *struct{}    start       time.Time    errChan     chan error    finished    chan bool    circuit     *CircuitBreaker    run         runFuncC    fallback    fallbackFuncC    runDuration time.Duration    events      []string}

字段介绍:

  • ticket:用来做最大并发量管制,这个就是一个令牌
  • start:记录command执行的开始工夫
  • errChan:记录command执行谬误
  • finished:标记command执行完结,用来做协程同步
  • circuit:存储熔断器相干信息
  • run:应用程序
  • fallback:应用程序执行失败后要执行的函数
  • runDuration:记录command执行耗费工夫
  • eventsevents次要是存储事件类型信息,比方执行胜利的success,或者失败的timeoutcontext_canceled

上段代码重点是GetCircuit办法,这一步的目标就是获取熔断器,应用动静加载的形式,如果没有就创立一个熔断器,熔断器构造如下:

type CircuitBreaker struct {    Name                   string    open                   bool    forceOpen              bool    mutex                  *sync.RWMutex    openedOrLastTestedTime int64    executorPool *executorPool    metrics      *metricExchange}

解释一下这几个字段:

  • name:熔断器的名字,其实就是创立的command名字
  • open:判断熔断器是否关上的标记
  • forceopen:手动触发熔断器的开关,单元测试应用
  • mutex:应用读写锁保障并发平安
  • openedOrLastTestedTime:记录上一次关上熔断器的工夫,因为要依据这个工夫和SleepWindow工夫来做复原尝试
  • executorPool:用来做流量管制,因为咱们有一个最大并发量管制,就是依据这个来做的流量管制,每次申请都要获取令牌
  • metrics:用来上报执行状态的事件,通过它把执行状态信息存储到理论熔断器执行各个维度状态 (胜利次数,失败次数,超时……) 的数据汇合中。

前面会独自剖析executorPoolmetrics的实现逻辑。

定义令牌相干的办法和变量

因为咱们有一个条件是最大并发管制,采纳的是令牌的形式进行流量管制,每一个申请都要获取一个令牌,应用结束要把令牌还回去,先看一下这段代码:

    ticketCond := sync.NewCond(cmd)    ticketChecked := false    // When the caller extracts error from returned errChan, it's assumed that    // the ticket's been returned to executorPool. Therefore, returnTicket() can    // not run after cmd.errorWithFallback().    returnTicket := func() {        cmd.Lock()        // Avoid releasing before a ticket is acquired.        for !ticketChecked {            ticketCond.Wait()        }        cmd.circuit.executorPool.Return(cmd.ticket)        cmd.Unlock()    }

应用sync.NewCond创立一个条件变量,用来协调告诉你能够偿还令牌了。

而后定义一个返回令牌的办法,调用Return办法偿还令牌。

定义上报执行事件的办法

后面咱们也提到了,咱们的熔断器会上报执行状态的事件,通过它把执行状态信息存储到理论熔断器执行各个维度状态 (胜利次数,失败次数,超时……) 的数据汇合中。所以要定义一个上报的办法:

    reportAllEvent := func() {        err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)        if err != nil {            log.Printf(err.Error())        }    }
开启协程一:执行利用程序逻辑 - runFunc

协程一的次要目标就是执行利用程序逻辑:

go func() {        defer func() { cmd.finished <- true }() // 标记协程一的command执行完结,同步到协程二        // 当最近执行的并发数量超过阈值并且错误率很高时,就会关上熔断器。       // 如果熔断器关上,间接回绝拒绝请求并返回令牌,当感觉衰弱状态复原时,熔断器将容许新的流量。        if !cmd.circuit.AllowRequest() {            cmd.Lock()            // It's safe for another goroutine to go ahead releasing a nil ticket.            ticketChecked = true            ticketCond.Signal() // 告诉开释ticket信号            cmd.Unlock()      // 应用sync.Onece保障只执行一次。            returnOnce.Do(func() {        // 返还令牌                returnTicket()        // 执行fallback逻辑                cmd.errorWithFallback(ctx, ErrCircuitOpen)        // 上报状态事件                reportAllEvent()            })            return        }   // 管制并发        cmd.Lock()        select {    // 获取到令牌        case cmd.ticket = <-circuit.executorPool.Tickets:      // 发送开释令牌信号            ticketChecked = true            ticketCond.Signal()            cmd.Unlock()        default:         // 没有令牌可用了, 也就是达到最大并发数量则间接解决fallback逻辑            ticketChecked = true            ticketCond.Signal()            cmd.Unlock()            returnOnce.Do(func() {                returnTicket()                cmd.errorWithFallback(ctx, ErrMaxConcurrency)                reportAllEvent()            })            return        }        // 执行利用程序逻辑        runStart := time.Now()        runErr := run(ctx)        returnOnce.Do(func() {            defer reportAllEvent() // 状态事件上报      // 统计应用程序执行时长            cmd.runDuration = time.Since(runStart)      // 返还令牌            returnTicket()      // 如果应用程序执行失败执行fallback函数            if runErr != nil {                cmd.errorWithFallback(ctx, runErr)                return            }            cmd.reportEvent("success")        })    }()

总结一下这个协程:

  • 判断熔断器是否关上,如果关上了熔断器间接进行熔断,不在进行前面的申请
  • 运行利用程序逻辑
开启协程二:同步协程一并监听谬误

先看代码:

go func() {    //  应用定时器来做超时管制,这个超时工夫就是咱们配置的,默认1000ms        timer := time.NewTimer(getSettings(name).Timeout)        defer timer.Stop()        select {      // 同步协程一        case <-cmd.finished:            // returnOnce has been executed in another goroutine          // 是否收到context勾销信号        case <-ctx.Done():            returnOnce.Do(func() {                returnTicket()                cmd.errorWithFallback(ctx, ctx.Err())                reportAllEvent()            })            return    // command执行超时了        case <-timer.C:            returnOnce.Do(func() {                returnTicket()                cmd.errorWithFallback(ctx, ErrTimeout)                reportAllEvent()            })            return        }    }()

这个协程的逻辑比拟清晰明了,目标就是监听业务执行被勾销以及超时。

画图总结command执行流程

下面咱们都是通过代码来进行剖析的,看起来还是有点乱,最初画个图总结一下:

下面咱们剖析了整个具体流程,接下来咱们针对一些外围点就行剖析

上报状态事件

hystrix-go为每一个Command设置了一个默认统计控制器,用来保留熔断器的所有状态,包含调用次数、失败次数、被回绝次数等,存储指标构造如下:

type DefaultMetricCollector struct {    mutex *sync.RWMutex    numRequests *rolling.Number    errors      *rolling.Number    successes               *rolling.Number    failures                *rolling.Number    rejects                 *rolling.Number    shortCircuits           *rolling.Number    timeouts                *rolling.Number    contextCanceled         *rolling.Number    contextDeadlineExceeded *rolling.Number    fallbackSuccesses *rolling.Number    fallbackFailures  *rolling.Number    totalDuration     *rolling.Timing    runDuration       *rolling.Timing}

应用rolling.Number构造保留状态指标,应用rolling.Timing保留工夫指标。

最终监控上报都依附metricExchange来实现,数据结构如下:

type metricExchange struct {    Name    string    Updates chan *commandExecution    Mutex   *sync.RWMutex    metricCollectors []metricCollector.MetricCollector}

上报command的信息结构:

type commandExecution struct {    Types            []string      `json:"types"` // 辨别事件类型,比方success、failure....    Start            time.Time     `json:"start_time"` // command开始工夫    RunDuration      time.Duration `json:"run_duration"` // command完结工夫    ConcurrencyInUse float64       `json:"concurrency_inuse"` // command 线程池使用率}

说了这么多,大家还是有点懵,其实用一个类图就能表明他们之间的关系:

咱们能够看到类mertricExchange提供了一个Monitor办法,这个办法次要逻辑就是监听状态事件,而后写入指标,所以整个上报流程就是这个样子:

流量管制

hystrix-go对流量管制采纳的是令牌算法,能失去令牌的就能够执行后继的工作,执行完后要返还令牌。
构造体executorPool就是hystrix-go 流量管制的具体实现。字段Max就是每秒最大的并发值。

type executorPool struct {    Name    string    Metrics *poolMetrics // 上报执行数量指标    Max     int // 最大并发数量    Tickets chan *struct{} // 代表令牌}

这里还有一个上报指标,这个又独自实现一套办法用来统计执行数量,比方执行的总数量、最大并发数等,咱们依赖画一个类图来示意:

上报执行数量逻辑与上报状态事件的逻辑是一样的,应用channel进行数据通信的,上报与返还令牌都在Return办法中:

func (p *executorPool) Return(ticket *struct{}) {    if ticket == nil {        return    }    p.Metrics.Updates <- poolMetricsUpdate{        activeCount: p.ActiveCount(),    }    p.Tickets <- ticket}

次要逻辑两步:

  • 上报以后可用的令牌数
  • 返回令牌

熔断器

咱们最初来剖析熔断器中一个比拟重要的办法:AllowRequest,咱们在执行Command是会依据这个办法来判断是否能够执行command,接下来咱们就来看一下这个判断的次要逻辑:

func (circuit *CircuitBreaker) AllowRequest() bool {    return !circuit.IsOpen() || circuit.allowSingleTest()}

外部就是调用IsOpen()allowSingleTest这两个办法:

  • IsOpen()
func (circuit *CircuitBreaker) IsOpen() bool {    circuit.mutex.RLock()    o := circuit.forceOpen || circuit.open    circuit.mutex.RUnlock()    // 熔断曾经开启    if o {        return true    }    // 判断10s内的并发数是否超过设置的最大并发数,没有超过时,不须要开启熔断器    if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {        return false    }    // 此时10s内的并发数曾经超过设置的最大并发数了,如果此时零碎错误率超过了预设值,那就开启熔断器    if !circuit.metrics.IsHealthy(time.Now()) {        //         circuit.setOpen()        return true    }    return false}
  • allowSingleTest()

先解释一下为什么要有这个办法,还记得咱们之前设置了一个熔断规定中的SleepWindow吗,如果在开启熔断的状况下,在SleepWindow工夫后进行尝试,这个办法的目标就是干这个的:

func (circuit *CircuitBreaker) allowSingleTest() bool {    circuit.mutex.RLock()    defer circuit.mutex.RUnlock()      // 获取以后工夫戳    now := time.Now().UnixNano()    openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)  // 以后熔断器是开启状态,以后的工夫曾经大于 (上次开启熔断器的工夫 +SleepWindow 的工夫)    if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {    // 替换openedOrLastTestedTime        swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)        if swapped {            log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)        }        return swapped    }

这里只看到了熔断器被开启的设置了,然而没有敞开熔断器的逻辑,因为敞开熔断器的逻辑是在上报状态指标的办法ReportEvent内实现,咱们最初再看一下ReportEvent的实现:

func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {    if len(eventTypes) == 0 {        return fmt.Errorf("no event types sent for metrics")    }        circuit.mutex.RLock()    o := circuit.open    circuit.mutex.RUnlock()  // 上报的状态事件是success 并且以后熔断器是开启状态,则阐明上游服务失常了,能够敞开熔断器了    if eventTypes[0] == "success" && o {        circuit.setClose()    }    var concurrencyInUse float64    if circuit.executorPool.Max > 0 {        concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)    }    select {    // 上报状态指标,与上文的monitor响应    case circuit.metrics.Updates <- &commandExecution{        Types:            eventTypes,        Start:            start,        RunDuration:      runDuration,        ConcurrencyInUse: concurrencyInUse,    }:    default:        return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}    }    return nil}

可视化hystrix的上报信息

通过下面的剖析咱们晓得hystrix-go上报了状态事件、执行数量事件,那么这些指标咱们能够怎么查看呢?

设计者早就想到了这个问题,所以他们做了一个dashborad,能够查看hystrix的上报信息,应用办法只需在服务启动时增加如下代码:

hystrixStreamHandler := hystrix.NewStreamHandler()hystrixStreamHandler.Start()go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)

而后关上浏览器:http://127.0.0.1:81/hystrix-d...,进行观测吧。

总结

故事终于靠近序幕了,一个熔断机制的实现的确不简略,要思考的因素也是方方面面,尤其在微服务架构下,熔断机制是必不可少的,不仅要在框架层面实现熔断机制,还要依据具体业务场景应用熔断机制,这些都是值得咱们三思而行的。本文介绍的熔断框架实现的还是比拟完满的,这种优良的设计思路值得咱们学习。

文中代码已上传github:https://github.com/asong2020/...,欢送star

欢送关注公众号:【Golang梦工厂】

举荐往期文章:

  • 学习channel设计:从入门到放弃
  • 详解内存对齐
  • [[警觉] 请勿滥用goroutine](https://mp.weixin.qq.com/s/JC...)
  • 源码分析panic与recover,看不懂你打我好了!
  • 面试官:小松子来聊一聊内存逃逸
  • [面试官:你能聊聊string和[]byte的转换吗?](https://mp.weixin.qq.com/s/jz...)
  • 面试官:两个nil比拟后果是什么?
  • 并发编程包之 errgroup