作者:binbin0325
背景介绍
随着微服务的风行,服务和服务之间的稳定性变得越来越重要。在 2020 年,Sentinel 社区推出 Sentinel Go 版本,朝着云原生方向演进。Sentinel Go 是一个流量治理组件,次要以流量为切入点,从流量路由、流量管制、流量整形、熔断降级、零碎自适应过载爱护、热点流量防护等多个维度来帮忙开发者保障微服务的稳定性。
无论是流量管制还是熔断降级,实现的核心思想都是通过统计一段时间内的指标数据(申请数 / 谬误数等),而后依据预选设定的阈值判断是否应该进行流量管控
那么如何存储并统计这一段时间内的指标数据则是外围要害,本文将揭秘 Sentienl-Go 是如何实现的 毫秒级指标数据存储与统计。
固定窗口
在正式介绍之前,先简略介绍一下固定窗口的算法(也叫计数器算法)是实现流量管制比较简单的一种形式。其余常见的还有很多例如滑动工夫窗口算法,漏桶算法,令牌桶算法等等。
固定窗口算法个别是通过原子操作将申请在统计周期内进行累加,而后当申请数大于阈值时进行限流。
实现代码:
var (
counter int64 // 计数
intervalMs int64 = 1000 // 窗口长度(1S)
threshold int64 = 2 // 限流阈值
startTime = time.Now().UnixMilli() // 窗口开始工夫
)
func main() {
for i := 0; i < 10; i++ {if tryAcquire() {fmt.Println("胜利申请", time.Now().Unix())
}
}
}
func tryAcquire() bool {if time.Now().UnixMilli()-atomic.LoadInt64(&startTime) > intervalMs {atomic.StoreInt64(&startTime, time.Now().UnixMilli())
atomic.StoreInt64(&counter, 0)
}
return atomic.AddInt64(&counter, 1) <= threshold
}
固定窗口的限流在实现上看起来比较简单容易,然而也有一些问题,最典型的就是“边界”问题。
如下图:统计周期为 1S,限流阈值是 2 的状况下,假如 4 次申请恰好“逾越”了固定的工夫窗口,如红色的 1SS 工夫窗口所示会有四次申请,显著不合乎限流的预期。
滑动工夫窗口
在滑动工夫窗口算法中能够解决固定窗口算法的边界问题,在滑动窗口算法中通常有两个比拟重要的概念
- 统计周期:例如想限度 5S 的申请数不能超过 100 次,那么 5S 就是统计周期
- 窗口 (格子) 的大小:一个周期内会有多个窗口 (格子) 进行指标(例如申请数)的统计,长度相等的统计周期,格子的数量越多,统计的越准确
如下所示:统计周期为 1S,每个周期内分为两个格子,每个格子的长度是 500ms。
在滑动窗口中统计周期以及窗口的大小,须要依据业务状况进行设定。
统计周期统一,窗口大小不统一:窗口越大统计精准度越低,但并发性能好,越小:统计精准度越高,并发性能随之升高;
统计周期不统一,窗口大小统一:周期越长抗流量脉冲状况越好。
统计构造
上面将具体介绍 Sentinel-Go 是如何应用滑动工夫窗口高效的存储和统计指标数据的。
窗口构造
在滑动工夫窗口中工夫很重要。每个窗口(BocketWrap)的组成是由一个开始工夫和一个形象的统计构造:
type BucketWrap struct {
// BucketStart represents start timestamp of this statistic bucket wrapper.
BucketStart uint64
// Value represents the actual data structure of the metrics (e.g. MetricBucket).
Value atomic.Value
}
开始工夫:以后格子的的起始工夫
统计构造:存储指标数据,原子操作并发平安
如下图:统计周期 1S,每个窗口的长度是 200ms。
指标数据:
- pass: 示意到来的数量,即此刻通过 Sentinel-Go 规定的流量数量
- block: 示意被拦挡的流量数量
- complete: 示意实现的流量数量,蕴含失常完结和异样完结的状况
- error: 示意谬误的流量数量(熔断场景应用)
- rt:单次申请的 request time
- total:临时无用
原子工夫轮
如上:整个统计周期内有多个工夫窗口,在 Sentinel-Go 中统计周期是由 slice 实现的,每个元素对应一个窗口。
在下面介绍了为了解决边界问题,滑动工夫窗口统计的过程须要向右滑动。随时工夫的推移,有限的向右滑动,势必会让 slice 继续的扩张,导致 slice 的容量“有限”增长。
为了解决这个问题,在 Sentinel-Go 中实现了一个 工夫轮 的概念,通过固定 slice 长度将过期的工夫窗口重置,节俭空间。
如下:原子工夫轮数据结构
type AtomicBucketWrapArray struct {
// The base address for real data array
base unsafe.Pointer // 窗口数组首元素地址
// The length of slice(array), it can not be modified.
length int // 窗口数组的长度
data []*BucketWrap // 窗口数组}
- 初始化
1: 依据以后工夫计算出以后工夫对应的窗口的 startime,并失去以后窗口对应的地位
// 计算开始工夫
func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {return now - (now % uint64(bucketLengthInMs))
}
// 窗口下标地位
idx := int((now / uint64(bucketLengthInMs)) % uint64(len))
2:初始化窗口数据结构(BucketWrap)
for i := idx; i <= len-1; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}
3:将窗口数组首元素地址设置到原子工夫轮
// calculate base address for real data array
sliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data))
ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data)))
如果对 unsafe.Pointer 和 slice 相熟的同学,对于这段代码不难理解。这里通过 unsafe.Pointer 将底层 slice 首元素(第一个窗口)地址设置到原子工夫轮中。这么做的起因次要是实现对工夫轮中的元素(窗口)进行 原子无锁的读取和更新。
- 窗口获取 & 窗口替换
如何在并发平安的状况下读取窗口和对窗口进行替换(工夫轮波及到对窗口更新操作)代码如下:
// 获取对应窗口
func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
// then convert to (*unsafe.Pointer)
if offset, ok := aa.elementOffset(idx); ok {return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
}
return nil
}
// 替换对应窗口
func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
// then convert to (*unsafe.Pointer)
// update secondary pointer
if offset, ok := aa.elementOffset(idx); ok {return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
}
return false
}
// 获取对应窗口的地址
func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
if idx >= aa.length || idx < 0 {logging.Error(errors.New("array index out of bounds"),
"array index out of bounds in AtomicBucketWrapArray.elementOffset()",
"idx", idx, "arrayLength", aa.length)
return nil, false
}
basePtr := aa.base
return unsafe.Pointer(uintptr(basePtr) + uintptr(idx)*unsafe.Sizeof(basePtr)), true
}
获取窗口:
- 在 get func 中接管依据以后工夫计算出的窗口对应下标地位
- 依据下标地位在 elementOffset func 中,首先将底层的 slice 首元素地址转换成 uintptr,而后将窗口对应下标 * 对应的指针字节大小即能够失去对应窗口元素的地址
- 将对应窗口地址转换成工夫窗口(*BucketWarp)即可
窗口更新:
和获取窗口一样,获取到对应下标地位的窗口地址,而后利用 atomic.CompareAndSwapPointer 进行 cas 更新,将新的窗口指针地址更新到底层数组中。
滑动窗口
在原子工夫轮中提供了对窗口读取以及更新的操作。那么 在什么机会触发更新以及如何滑动?
- 滑动
所谓滑动就是依据以后工夫找到整个统计周期的所有窗口中的数据。例如在限流场景下,咱们须要获取统计周期内的所有 pass 的流量,从而来判断以后流量是否应该被限流。
外围代码:
// 依据以后工夫获取周期内的所有窗口
func (m *SlidingWindowMetric) getSatisfiedBuckets(now uint64) []*BucketWrap {start, end := m.getBucketStartRange(now)
satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {return ws >= start && ws <= end})
return satisfiedBuckets
}
// 依据以后工夫获取整个周期对应的窗口的开始工夫和完结工夫
func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
end = curBucketStartTime
start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
return
}
// 匹配符合条件的窗口
func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {
if now <= 0 {return make([]*BucketWrap, 0)
}
ret := make([]*BucketWrap, 0, la.array.length)
for i := 0; i < la.array.length; i++ {ww := la.array.get(i)
if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) {continue}
ret = append(ret, ww)
}
return ret
}
如下图所示:统计周期 =1000ms(跨两个格子),now=1300 时 计算出 start=500,end=1000
那么在计算周期内的 pass 数量时,会依据如下条件遍历格子,也就会找到开始工夫是 500 和 1000 的两个格子,那么统计的时候 1000 的这个格子中的数据天然也会被统计到。(以后工夫 1300,在 1000 的这个格子中)
satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {return ws >= start && ws <= end})
- 更新
每次流量通过时都会进行相应的指标存储,在存储时会先获取对应的窗口,而后会依据窗口的开始工夫进行比照,如果过期则进行窗口重置。
如下图:依据窗口开始工夫匹配发现 0 号窗口已过期。
如下图:重置窗口的开始工夫和统计指标。
外围代码:
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
// 计算以后工夫对应的窗口下标
idx := la.calculateTimeIdx(now)
// 计算以后工夫对应的窗口的开始工夫
bucketStart := calculateStartTime(now, la.bucketLengthInMs)
for {
// 获取旧窗口
old := la.array.get(idx)
// 如果旧窗口 ==nil 则初始化(失常不会执行这部分代码)
if old == nil {
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {return newWrap, nil} else {runtime.Gosched()
}
// 如果本次计算的开始工夫等于旧窗口的开始工夫,则认为窗口没有过期,间接返回
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
return old, nil
// 如果本次计算的开始工夫大于旧窗口的开始工夫,则认为窗口过期尝试重置
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {if la.updateLock.TryLock() {old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {runtime.Gosched()
}
......
}
}
总结
通过下面的介绍能够理解到在 Sentienl-Go 中实现底层指标的统计代码量并不多,实质是通过“工夫轮”进行指标的数据统计和存储,在工夫轮中借鉴 slice 的底层实现利用 unsafe.Pointer 和 atomic 配合对工夫轮进行无锁的原子操作,极大的晋升了性能。
Sentinel-GO 整体的数据结构图:
作者介绍:
张斌斌(Github 账号:binbin0325,公众号:柠檬汁 Code),Sentinel-Golang Committer、ChaosBlade Committer、Nacos PMC、Apache Dubbo-Go Committer。目前次要关注于混沌工程、中间件以及云原生方向。
文章参考:
《golang unsafe.Pointer 应用准则以及 uintptr 暗藏的坑》
https://louyuting.blog.csdn.n…