简介: 本文次要剖析阿里巴巴团体开源的流量管制中间件 Sentinel,其原生反对了 Java/Go/C++ 等多种语言,本文仅仅剖析其 Go 语言实现。下文如无非凡阐明,sentinel 指代 Sentinel-Go。

作者 | 于雨  apache/dubbo-go 我的项目负责人

本文作者系 apache/dubbo-go 我的项目负责人,目前在 dubbogo 我的项目中已内置可用 sentinel-go,如果想独自应用可参考 在 dubbo-go 中应用 sentinel 一文,若有其余疑难可进 dubbogo社区【钉钉群 23331795】进行沟通。

导读:本文次要剖析阿里巴巴团体开源的流量管制中间件 Sentinel,其原生反对了 Java/Go/C++ 等多种语言,本文仅仅剖析其 Go 语言实现。下文如无非凡阐明,sentinel 指代 Sentinel-Go。

1 基本概念 Resource  和 Rule

1.1 Resource

 _// ResourceType represents classification of the resources_    type ResourceType int32    const (        ResTypeCommon ResourceType = iota        ResTypeWeb        ResTypeRPC    )    _// TrafficType describes the traffic type: Inbound or Outbound_    type TrafficType int32    const (        _// Inbound represents the inbound traffic (e.g. provider)_        Inbound TrafficType = iota        _// Outbound represents the outbound traffic (e.g. consumer)_        Outbound    )    _// ResourceWrapper represents the invocation_    type ResourceWrapper struct {        _// global unique resource name_        name string        _// resource classification_        classification ResourceType        _// Inbound or Outbound_        flowType TrafficType    }

Resource(ResourceWrapper) 存储了利用场景 ResourceType,以及指标流控的方向 FlowType(TrafficType)。

1.2 Entry

 _// EntryOptions represents the options of a Sentinel resource entry._    type EntryOptions struct {        resourceType base.ResourceType        entryType    base.TrafficType        acquireCount uint32        slotChain    *base.SlotChain    }    type EntryContext struct {        entry *SentinelEntry        _// Use to calculate RT_        startTime uint64        Resource *ResourceWrapper        StatNode StatNode        Input *SentinelInput        _// the result of rule slots check_        RuleCheckResult *TokenResult    }    type SentinelEntry struct {        res *ResourceWrapper        _// one entry bounds with one context_        ctx *EntryContext        sc *SlotChain    }

Entry 实体 SentinelEntry 关联了 Resource(ResourceWrapper) 以及其流控规定汇合 SlotChain。每个 Entry 实体有一个上下文环境 EntryContext,存储每个 Rule 检测时用到的一些流控参数和流控断定后果。

值得注意的是,SentinelEntry.sc 值来自于 EntryOptions.slotChainEntryOptions.slotChain 存储了全局 SlotChain 对象 api/slot_chain.go:globalSlotChain

至于何为 SlotChain,就是 sentinel 提供的所有的流控组件的汇合,能够简略地认为每个流控组件就是一个 Slot,其详细分析见[[3.5 SlotChain]](#3.5)。

sentinel 一些变量和函数命名的可读性较差,如 EntryOptions.acquireCount 切实无奈让人顾名思义,看过函数 core/api.go:WithAcquireCount() 的正文才明确:EntryOptions.acquireCount 是批量动作执行次数。如有的一次 RPC 申请中调用了服务端的一个服务接口,则取值 1【也是 EntryOptions.acquireCount 的默认取值】,如果调用了服务端的 3 个服务接口,则取值 3。所以倡议改名为 EntryOptions.batchCount 比拟好,思考到最小改变准则,能够在保留 core/api.go:WithAcquireCount() 的同时减少一个同样性能的 core/api.go:WithBatchCount() 接口。相干改良曾经提交到  pr 263。

1.3 Rule

 type TokenCalculateStrategy int32    const (        Direct TokenCalculateStrategy = iota        WarmUp    )    type ControlBehavior int32    const (        Reject ControlBehavior = iota        Throttling    )    _// Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric_    type Rule struct {        _// Resource represents the resource name._        Resource               string                 `json:"resource"`        ControlBehavior        ControlBehavior        `json:"controlBehavior"`        _// Threshold means the threshold during StatIntervalInMs_        _// If StatIntervalInMs is 1000(1 second), Threshold means QPS_        Threshold         float64          `json:"threshold"`        MaxQueueingTimeMs uint32           `json:"maxQueueingTimeMs"`        _// StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule._        _// If user doesn't set StatIntervalInMs, that means using default metric statistic of resource._        _// If the StatIntervalInMs user specifies can not reuse the global statistic of resource,_        _//         sentinel will generate independent statistic structure for this rule._        StatIntervalInMs uint32 `json:"statIntervalInMs"`    }

Rule 记录了某 Resource 的限流断定阈值 Threshold、限流工夫窗口计时长度 StatIntervalInMs 以及 触发限流后的判罚动作 ControlBehavior。

下面外围是 Rule 的接口 RuleCheckSlot,至于 StatSlot 则用于统计 sentinel 本身的运行 metrics。

1.4 Flow

以后章节次要剖析流控中的限流(core/flow),依据流控的解决流程梳理 sentinel 整体骨架。

1.4.1 TrafficShapingController

所谓 TrafficShapingController,顾名思义,就是 流量塑形控制器,是流控的具体实施者。

 _// core/flow/traffic_shaping.go_    _// TrafficShapingCalculator calculates the actual traffic shaping threshold_    _// based on the threshold of rule and the traffic shaping strategy._    type TrafficShapingCalculator interface {        CalculateAllowedTokens(acquireCount uint32, flag int32) float64    }    type DirectTrafficShapingCalculator struct {        threshold float64    }    func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(uint32, int32) float64 {        return d.threshold    }

TrafficShapingCalculator 接口用于计算限流的下限,如果不应用 warm-up 性能,能够不去深究其实现,其实体之一 DirectTrafficShapingCalculator 返回 Rule.Threshold【用户设定的限流下限】。

 _// TrafficShapingChecker performs checking according to current metrics and the traffic_    _// shaping strategy, then yield the token result._    type TrafficShapingChecker interface {        DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult    }    type RejectTrafficShapingChecker struct {        rule  *Rule    }    func (d *RejectTrafficShapingChecker) DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult {        metricReadonlyStat := d.BoundOwner().boundStat.readOnlyMetric        if metricReadonlyStat == nil {            return nil        }        curCount := float64(metricReadonlyStat.GetSum(base.MetricEventPass))        if curCount+float64(acquireCount) > threshold {            return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)        }        return nil    }

RejectTrafficShapingChecker 根据 Rule.Threshold 断定 Resource 在以后工夫窗口是否超限,其限流后果 TokenResultStatus 只可能是 Pass 或者 Blocked。

sentinel flow 还有一个匀速限流 ThrottlingChecker,它的目标是让申请匀速被执行,把一个工夫窗口【譬如 1s】依据 threshold 再细分为更细的微工夫窗口,在每个微工夫窗口最多执行一次申请,其限流后果 TokenResultStatus 只可能是 Pass 或者 Blocked 或者 Wait,其相干意义别离为:

  • Pass:在微工夫窗口内无超限,申请通过;
  • Wait:在微工夫窗口内超限,被滞后若干工夫窗口执行,在这段时间内申请须要期待;
  • Blocked:在微工夫窗口内超限,且等待时间超过用户设定的最大违心等待时间长度【Rule.MaxQueueingTimeMs】,申请被回绝。
 type TrafficShapingController struct {        flowCalculator TrafficShapingCalculator        flowChecker    TrafficShapingChecker        rule *Rule        _// boundStat is the statistic of current TrafficShapingController_        boundStat standaloneStatistic    }    func (t *TrafficShapingController) PerformChecking(acquireCount uint32, flag int32) *base.TokenResult {        allowedTokens := t.flowCalculator.CalculateAllowedTokens(acquireCount, flag)        return t.flowChecker.DoCheck(resStat, acquireCount, allowedTokens)    }

Direct + Reject 限流的场景下,这三个接口其实并无多大意义,其外围函数 TrafficShapingController.PerformChecking() 的次要流程是:

  • 1  从 TrafficShapingController.boundStat 中获取以后 Resource 的 metrics 值【curCount】;
  • 2 如果 curCount + batchNum(acquireCount) > Rule.Threshold,则 pass,否则就 reject。

在限流场景下, TrafficShapingController 四个成员的意义如下:

  • flowCalculator 计算限流下限;
  • flowChecker 执行限流 Check 动作;
  • rule 存储限流规定;
  • boundStat 存储限流的 Check 后果和工夫窗口参数,作为下次限流 Check 动作断定的根据。

1.4.2 TrafficControllerMap

在执行限流断定时,须要依据 Resource 名称获取其对应的 TrafficShapingController

 _// TrafficControllerMap represents the map storage for TrafficShapingController._   type TrafficControllerMap map[string][]*TrafficShapingController    _// core/flow/rule_manager.go_    tcMap        = make(TrafficControllerMap)

package 级别全局公有变量 tcMap 存储了所有的 Rule,其 key 为 Resource 名称,value 则是与 Resource 对应的 TrafficShapingController。

用户级别接口函数 core/flow/rule_manager.go:LoadRules() 会依据用户定义的 Rule 结构其对应的 TrafficShapingController 存入 tcMap,这个接口调用函数 generateStatFor(*Rule) 结构 TrafficShapingController.boundStat

限流场景下,函数 generateStatFor(*Rule) 的外围代码如下:

 func generateStatFor(rule *Rule) (*standaloneStatistic, error) {        resNode = stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon)        _// default case, use the resource's default statistic_        readStat := resNode.DefaultMetric()        retStat.reuseResourceStat = true        retStat.readOnlyMetric = readStat        retStat.writeOnlyMetric = nil        return &retStat, nil    }

2 Metrics

Resource 的指标 Metrics 是进行 Rule 断定的根底。

2.1 原子工夫轮 AtomicBucketWrapArray

Sentinel 库功能丰富,但无论是限流还是熔断,其存储根底都是滑动工夫窗口。其间蕴含了泛滥优化:如无锁定长时间轮。

滑动窗口实现有很多种,工夫轮算法是其中一种比较简单的实现,在工夫轮算法之上能够实现多种限流办法。工夫轮整体框图如下:

1 BucketWrap

工夫轮的最根本单元是一个桶【工夫窗口】。

 _// BucketWrap represent a slot to record metrics_    _// In order to reduce the usage of memory, BucketWrap don't hold length of BucketWrap_    _// The length of BucketWrap could be seen in LeapArray._    _// The scope of time is [startTime, startTime+bucketLength)_    _// The size of BucketWrap is 24(8+16) bytes_    type BucketWrap struct {        _// The start timestamp of this statistic bucket wrapper._        BucketStart uint64        _// The actual data structure to record the metrics (e.g. MetricBucket)._        Value atomic.Value    }

补充:这里之所以用指针,是因为以 BucketWrap 为根底的 AtomicBucketWrapArray 会被多个 sentinel 流控组件应用,每个组件的流控参数不一,例如:

  • 1 core/circuitbreaker/circuit_breaker.go:slowRtCircuitBreaker 应用的 slowRequestLeapArray 的底层参数 slowRequestCounter
 _// core/circuitbreaker/circuit_breaker.go_    type slowRequestCounter struct {        slowCount  uint64        totalCount uint64    }
  • 2 core/circuitbreaker/circuit_breaker.go:errorRatioCircuitBreaker 应用的 errorCounterLeapArray 的底层参数 errorCounter
 _// core/circuitbreaker/circuit_breaker.go_    type errorCounter struct {        errorCount uint64        totalCount uint64    }
1.1 MetricBucket

BucketWrap 能够认作是一种 工夫桶模板,具体的桶的实体是 MetricsBucket,其定义如下:

 _// MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span)._    _// Note that all operations of the MetricBucket are required to be thread-safe._    type MetricBucket struct {        _// Value of statistic_        counter [base.MetricEventTotal]int64        minRt   int64    }

MetricBucket 存储了五种类型的 metric:

 _// There are five events to record_    _// pass + block == Total_    const (        _// sentinel rules check pass_        MetricEventPass MetricEvent = iota        _// sentinel rules check block_        MetricEventBlock        MetricEventComplete        _// Biz error, used for circuit breaker_        MetricEventError        _// request execute rt, unit is millisecond_        MetricEventRt        _// hack for the number of event_        MetricEventTotal    )
2 AtomicBucketWrapArray

每个桶只记录了其起始工夫和 metric 值,至于每个桶的工夫窗口长度这种公共值则对立记录在 AtomicBucketWrapArray 内,AtomicBucketWrapArray 定义如下:

 _// atomic BucketWrap array to resolve race condition_    _// AtomicBucketWrapArray can not append or delete element after initializing_    type AtomicBucketWrapArray struct {        _// The base address for real data array_        base unsafe.Pointer        _// The length of slice(array), it can not be modified._        length int        data   []*BucketWrap    }

AtomicBucketWrapArray.base 的值是 AtomicBucketWrapArray.data slice 的 data 区域的首指针。因为 AtomicBucketWrapArray.data 是一个固定长度的 slice,所以 AtomicBucketWrapArray.base 间接存储数据内存区域的首地址,以减速访问速度。

其次,AtomicBucketWrapArray.data 中存储的是 BucketWrap 的指针,而不是 BucketWrap。

NewAtomicBucketWrapArrayWithTime() 函数会预热一下,把所有的工夫桶都生成进去。

2.2 工夫轮

1 leapArray
 _// Give a diagram to illustrate_    _// Suppose current time is 888, bucketLengthInMs is 200ms,_    _// intervalInMs is 1000ms, LeapArray will build the below windows_    _//   B0       B1      B2     B3      B4_    _//   |_______|_______|_______|_______|_______|_    _//  1000    1200    1400    1600    800    (1000)_    _//                                        ^_    _//                                      time=888_    type LeapArray struct {        bucketLengthInMs uint32        sampleCount      uint32        intervalInMs     uint32        array            *AtomicBucketWrapArray        _// update lock_        updateLock mutex    }

LeapArray 各个成员解析:

  • bucketLengthInMs 是漏桶长度,以毫秒为单位;
  • sampleCount 则是工夫漏桶个数;
  • intervalInMs 是工夫窗口长度,以毫秒为单位。

其正文中的 ASCII 图很好地解释了每个字段的含意。

LeapArray 外围函数是 LeapArray.currentBucketOfTime(),其作用是依据某个工夫点获取其做对应的工夫桶 BucketWrap,代码如下:

 func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {        if now <= 0 {            return nil, errors.New("Current time is less than 0.")        }        idx := la.calculateTimeIdx(now)        bucketStart := calculateStartTime(now, la.bucketLengthInMs)        for { _//spin to get the current BucketWrap_            old := la.array.get(idx)            if old == nil {                _// because la.array.data had initiated when new la.array_                _// theoretically, here is not reachable_                newWrap := &BucketWrap{                    BucketStart: bucketStart,                    Value:       atomic.Value{},                }                newWrap.Value.Store(bg.NewEmptyBucket())                if la.array.compareAndSet(idx, nil, newWrap) {                    return newWrap, nil                } else {                    runtime.Gosched()                }            } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {                return old, nil            } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {                _// current time has been next cycle of LeapArray and LeapArray dont't count in last cycle._                _// reset BucketWrap_                if la.updateLock.TryLock() {                    old = bg.ResetBucketTo(old, bucketStart)                    la.updateLock.Unlock()                    return old, nil                } else {                    runtime.Gosched()                }            } else if bucketStart < atomic.LoadUint64(&old.BucketStart) {                _// TODO: reserve for some special case (e.g. when occupying "future" buckets)._                return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))            }        }    }

其 for-loop 外围逻辑是:

  • 1 获取工夫点对应的工夫桶 old;
  • 2 如果 old 为空,则新建一个工夫桶,以原子操作的形式尝试存入工夫窗口的工夫轮中,存入失败则从新尝试;
  • 3 如果 old 就是以后工夫点所在的工夫桶,则返回;
  • 4 如果 old 的工夫终点小于以后工夫,则通过乐观锁尝试 reset 桶的起始工夫等参数值,加锁更新胜利则返回;
  • 5 如果 old 的工夫终点大于以后工夫,则零碎产生了工夫扭曲,返回谬误。
2 BucketLeapArray

leapArray 实现了滑动工夫窗口的所有主体,其对外应用接口则是 BucketLeapArray:

 _// The implementation of sliding window based on LeapArray (as the sliding window infrastructure)_    _// and MetricBucket (as the data type). The MetricBucket is used to record statistic_    _// metrics per minimum time unit (i.e. the bucket time span)._    type BucketLeapArray struct {        data     LeapArray        dataType string    }

从这个 struct 的正文可见,其工夫窗口 BucketWrap 的实体是 MetricBucket。

2.3 Metric 数据读写

SlidingWindowMetric
 _// SlidingWindowMetric represents the sliding window metric wrapper._    _// It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket_    _// SlidingWindowMetric is used for SentinelRules and BucketLeapArray is used for monitor_    _// BucketLeapArray is per resource, and SlidingWindowMetric support only read operation._    type SlidingWindowMetric struct {        bucketLengthInMs uint32        sampleCount      uint32        intervalInMs     uint32        real             *BucketLeapArray    }

SlidingWindowMetric 是对 BucketLeapArray 的一个封装,只提供了只读接口。

ResourceNode
 type BaseStatNode struct {        sampleCount uint32        intervalMs  uint32        goroutineNum int32        arr    *sbase.BucketLeapArray        metric *sbase.SlidingWindowMetric    }    type ResourceNode struct {        BaseStatNode        resourceName string        resourceType base.ResourceType    }    _// core/stat/node_storage.go_    type ResourceNodeMap map[string]*ResourceNode    var (        inboundNode = NewResourceNode(base.TotalInBoundResourceName, base.ResTypeCommon)        resNodeMap = make(ResourceNodeMap)        rnsMux     = new(sync.RWMutex)    )

BaseStatNode 对外提供了读写接口,其数据写入 BaseStatNode.arr,读取接口则依赖 BaseStatNode.metric。BaseStatNode.arr 是在 NewBaseStatNode() 中创立的,指针 SlidingWindowMetric.real 也指向它。

ResourceNode 则顾名思义,其代表了某资源和它的 Metrics 存储  ResourceNode.BaseStatNode

全局变量 resNodeMap 存储了所有资源的 Metrics 指标数据。

3 限流流程

本节只剖析 Sentinel 库提供的最根底的流量整形性能 -- 限流,限流算法多种多样,能够应用其内置的算法,用户本人也能够进行扩大。

限流过程有三步步骤:

  • 1 针对特定 Resource 结构其 EntryContext,存储其 Metrics、限流开始工夫等,Sentinel 称之为 StatPrepareSlot;
  • 2 根据 Resource 的限流算法断定其是否应该进行限流,并给出限流断定后果,Sentinel 称之为 RuleCheckSlot;

    • 补充:这个限流算法是一系列判断办法的合集(SlotChain);
  • 3 断定之后,除了用户本身依据断定后果执行相应的 action,Sentinel 也须要依据断定后果执行本身的 Action,以及把整个断定流程所应用的的工夫 RT 等指标存储下来,Sentinel 称之为 StatSlot。

整体流程如下图所示:

3.1 Slot

针对 Check 三个步骤,有三个对应的 Slot 别离定义如下:

 _// StatPrepareSlot is responsible for some preparation before statistic_    _// For example: init structure and so on_    type StatPrepareSlot interface {        _// Prepare function do some initialization_        _// Such as: init statistic structure、node and etc_        _// The result of preparing would store in EntryContext_        _// All StatPrepareSlots execute in sequence_        _// Prepare function should not throw panic._        Prepare(ctx *EntryContext)    }    _// RuleCheckSlot is rule based checking strategy_    _// All checking rule must implement this interface._    type RuleCheckSlot interface {        _// Check function do some validation_        _// It can break off the slot pipeline_        _// Each TokenResult will return check result_        _// The upper logic will control pipeline according to SlotResult._        Check(ctx *EntryContext) *TokenResult    }    _// StatSlot is responsible for counting all custom biz metrics._    _// StatSlot would not handle any panic, and pass up all panic to slot chain_    type StatSlot interface {        _// OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass_        _// StatSlots will do some statistic logic, such as QPS、log、etc_        OnEntryPassed(ctx *EntryContext)        _// OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute_        _// It may be inbound flow control or outbound cir_        _// StatSlots will do some statistic logic, such as QPS、log、etc_        _// blockError introduce the block detail_        OnEntryBlocked(ctx *EntryContext, blockError *BlockError)        _// OnCompleted function will be invoked when chain exits._        _// The semantics of OnCompleted is the entry passed and completed_        _// Note: blocked entry will not call this function_        OnCompleted(ctx *EntryContext)    }

抛却 Prepare 和 Stat,能够简略的认为:所谓的 slot,就是 sentinel 提供的某个流控组件。

值得注意的是,依据正文 StatSlot.OnCompleted 只有在 RuleCheckSlot.Check 通过才会执行,用于计算从申请开始到完结所应用的 RT 等 Metrics。

3.2 Prepare

 _// core/base/slot_chain.go_    _// StatPrepareSlot is responsible for some preparation before statistic_    _// For example: init structure and so on_    type StatPrepareSlot interface {        _// Prepare function do some initialization_        _// Such as: init statistic structure、node and etc_        _// The result of preparing would store in EntryContext_        _// All StatPrepareSlots execute in sequence_        _// Prepare function should not throw panic._        Prepare(ctx *EntryContext)    }    _// core/stat/stat_prepare_slot.go_    type ResourceNodePrepareSlot struct {    }    func (s *ResourceNodePrepareSlot) Prepare(ctx *base.EntryContext) {        node := GetOrCreateResourceNode(ctx.Resource.Name(), ctx.Resource.Classification())        _// Set the resource node to the context._        ctx.StatNode = node    }

如后面解释,Prepare 次要是结构存储 Resource Metrics 所应用的 ResourceNode。所有 Resource 的 StatNode 都会存储在 package 级别的全局变量 core/stat/node_storage.go:resNodeMap [type: map[string]*ResourceNode]中,函数 GetOrCreateResourceNode 用于依据 Resource Name 从 resNodeMap 中获取其对应的 StatNode,如果不存在则创立一个 StatNode 并存入 resNodeMap

3.3 Check

RuleCheckSlot.Check() 执行流程:

  • 1 依据 Resource 名称获取其所有的 Rule 汇合;
  • 2 遍历 Rule 汇合,对 Resource 顺次执行 Check,任何一个 Rule 断定 Resource 须要进行限流【Blocked】则返回,否则放行。
 type Slot struct {    }    func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {        res := ctx.Resource.Name()        tcs := getTrafficControllerListFor(res)        result := ctx.RuleCheckResult        _// Check rules in order_        for _, tc := range tcs {            r := canPassCheck(tc, ctx.StatNode, ctx.Input.AcquireCount)            if r == nil {                _// nil means pass_                continue            }            if r.Status() == base.ResultStatusBlocked {                return r            }            if r.Status() == base.ResultStatusShouldWait {                if waitMs := r.WaitMs(); waitMs > 0 {                    _// Handle waiting action._                    time.Sleep(time.Duration(waitMs) * time.Millisecond)                }                continue            }        }        return result    }    func canPassCheck(tc *TrafficShapingController, node base.StatNode, acquireCount uint32) *base.TokenResult {        return canPassCheckWithFlag(tc, node, acquireCount, 0)    }    func canPassCheckWithFlag(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {        return checkInLocal(tc, node, acquireCount, flag)    }    func checkInLocal(tc *TrafficShapingController, resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {        return tc.PerformChecking(resStat, acquireCount, flag)    }

3.4 Exit

sentinel 对 Resource 进行 Check 后,其后续逻辑执行程序是:

  • 1 如果 RuleCheckSlot.Check() 断定 pass 通过则执行 StatSlot.OnEntryPassed(),否则 RuleCheckSlot.Check() 断定 reject 则执行 StatSlot.OnEntryBlocked();
  • 2 如果 RuleCheckSlot.Check() 断定 pass 通过,则执行本次 Action;
  • 3 如果 RuleCheckSlot.Check() 断定 pass 通过,则执行 SentinelEntry.Exit() --> SlotChain.ext() --> StatSlot.OnCompleted() 。

第三步骤的调用链路如下:

StatSlot.OnCompleted()
 _// core/flow/standalone_stat_slot.go_    type StandaloneStatSlot struct {    }    func (s StandaloneStatSlot) OnEntryPassed(ctx *base.EntryContext) {        res := ctx.Resource.Name()        for _, tc := range getTrafficControllerListFor(res) {            if !tc.boundStat.reuseResourceStat {                if tc.boundStat.writeOnlyMetric != nil {                    tc.boundStat.writeOnlyMetric.AddCount(base.MetricEventPass, int64(ctx.Input.AcquireCount))                }            }        }    }    func (s StandaloneStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) {        _// Do nothing_    }    func (s StandaloneStatSlot) OnCompleted(ctx *base.EntryContext) {        _// Do nothing_    }
SlotChain.exit()
 _// core/base/slot_chain.go_    type SlotChain struct {    }    func (sc *SlotChain) exit(ctx *EntryContext) {        _// The OnCompleted is called only when entry passed_        if ctx.IsBlocked() {            return        }        for _, s := range sc.stats {            s.OnCompleted(ctx)        }    }
SentinelEntry.Exit()
 _// core/base/entry.go_    type SentinelEntry struct {        sc *SlotChain        exitCtl sync.Once    }    func (e *SentinelEntry) Exit() {        e.exitCtl.Do(func() {            if e.sc != nil {                e.sc.exit(ctx)            }        })    }

从下面执行可见,StatSlot.OnCompleted() 是在 Action 【如一次 RPC 的申请-响应 Invokation】实现之后调用的。如果有的组件须要计算一次 Action 的工夫消耗  RT,就在其对应的 StatSlot.OnCompleted() 中根据 EntryContext.startTime 实现工夫消耗计算。

[3.5 SlotChain]()

Sentinel 实质是一个流控包,不仅提供了限流性能,还提供了泛滥其余诸如自适应流量爱护、熔断降级、冷启动、全局流量 Metrics 后果等性能流控组件,Sentinel-Go 包定义了一个 SlotChain 实体存储其所有的流控组件。

 _// core/base/slot_chain.go_    _// SlotChain hold all system slots and customized slot._    _// SlotChain support plug-in slots developed by developer._    type SlotChain struct {        statPres   []StatPrepareSlot        ruleChecks []RuleCheckSlot        stats      []StatSlot    }    _// The entrance of slot chain_    _// Return the TokenResult and nil if internal panic._    func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult {        _// execute prepare slot_        sps := sc.statPres        if len(sps) > 0 {            for _, s := range sps {                s.Prepare(ctx)            }        }        _// execute rule based checking slot_        rcs := sc.ruleChecks        var ruleCheckRet *TokenResult        if len(rcs) > 0 {            for _, s := range rcs {                sr := s.Check(ctx)                if sr == nil {                    _// nil equals to check pass_                    continue                }                _// check slot result_                if sr.IsBlocked() {                    ruleCheckRet = sr                    break                }            }        }        if ruleCheckRet == nil {            ctx.RuleCheckResult.ResetToPass()        } else {            ctx.RuleCheckResult = ruleCheckRet        }        _// execute statistic slot_        ss := sc.stats        ruleCheckRet = ctx.RuleCheckResult        if len(ss) > 0 {            for _, s := range ss {                _// indicate the result of rule based checking slot._                if !ruleCheckRet.IsBlocked() {                    s.OnEntryPassed(ctx)                } else {                    _// The block error should not be nil._                    s.OnEntryBlocked(ctx, ruleCheckRet.blockErr)                }            }        }        return ruleCheckRet    }    func (sc *SlotChain) exit(ctx *EntryContext) {        if ctx == nil || ctx.Entry() == nil {            logging.Error(errors.New("nil EntryContext or SentinelEntry"), "")            return        }        _// The OnCompleted is called only when entry passed_        if ctx.IsBlocked() {            return        }        for _, s := range sc.stats {            s.OnCompleted(ctx)        }        _// relieve the context here_    }

倡议:Sentinel 包针对某个 Resource 无奈确知其应用了那个组件,在运行时会针对某个 Resource 的 EntryContext 顺次执行所有的组件的 Rule。Sentinel-golang 为何不给用户相干用户提供一个接口让其设置应用的流控组件汇合,以缩小上面函数 SlotChain.Entry() 中执行 RuleCheckSlot.Check() 执行次数?相干改良曾经提交到 pr 264【补充,代码已合并,据负责人压测后回复 sentinel-go 效率整体晋升 15%】。

globalSlotChain

Sentinel-Go 定义了一个 SlotChain 的 package 级别的全局公有变量 globalSlotChain 用于存储其所有的流控组件对象。相干代码示例如下。因本文只关注限流组件,所以上面只给出了限流组件的注册代码。

 _// api/slot_chain.go_    func BuildDefaultSlotChain() *base.SlotChain {        sc := base.NewSlotChain()        sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})        sc.AddRuleCheckSlotLast(&flow.Slot{})        sc.AddStatSlotLast(&flow.StandaloneStatSlot{})        return sc    }    var globalSlotChain = BuildDefaultSlotChain()
Entry

在 Sentinel-Go 对外的最重要的入口函数 api/api.go:Entry() 中,globalSlotChain 会作为 EntryOptions 的 SlotChain 参数被应用。

 _// api/api.go_    _// Entry is the basic API of Sentinel._    func Entry(resource string, opts ...EntryOption) (*base.SentinelEntry, *base.BlockError) {        options := entryOptsPool.Get().(*EntryOptions)        options.slotChain = globalSlotChain        return entry(resource, options)    }

Sentinel 的演进离不开社区的奉献。Sentinel Go 1.0 GA 版本行将在近期公布,带来更多云原生相干的个性。咱们十分欢送感兴趣的开发者参加奉献,一起来主导将来版本的演进。咱们激励任何模式的奉献,包含但不限于:

• bug fix
• new features/improvements
• dashboard
• document/website
• test cases

开发者能够在 GitHub 下面的 good first issue 列表上筛选感兴趣的 issue 来参加探讨和奉献。咱们会重点关注积极参与奉献的开发者,外围贡献者会提名为 Committer,一起主导社区的倒退。咱们也欢送大家有任何问题和倡议,都能够通过 GitHub issue、Gitter 或钉钉群(群号:30150716)等渠道进行交换。Now start hacking!

• Sentinel Go repo: https://github.com/alibaba/sentinel-golang
• 企业用户欢送进行注销:https://github.com/alibaba/Sentinel/issues/18

作者简介

于雨(github @AlexStocks),apache/dubbo-go 我的项目负责人,一个有十多年服务端基础架构研发一线工作教训的程序员,目前在蚂蚁金服可信原生部从事容器编排和 service mesh 工作。酷爱开源,从 2015 年给 Redis 奉献代码开始,陆续改良过 Muduo/Pika/Dubbo/Dubbo-go 等出名我的项目。

原文链接
本文为阿里云原创内容,未经容许不得转载。