Leaky Bucket 漏桶
漏桶可了解为是一个限定容量的申请队列。
设想有一个桶,有水(指申请或数据)从下面流进来,水从桶上面的一个孔流进去。水流进桶的速度能够是随机的,然而水流出桶的速度是恒定的。
当水流进桶的速度较慢,桶不会被填满,申请就能够被解决。
当水流进桶的速度过快时,桶会逐步被填满,当水超过桶的容量就会溢出,即被抛弃。
class LeakyBucketRateLimiter(object):
def __init__(self, capacity, leak_rate):
# 桶容量
self.capacity = capacity
# 水流出的速率 每秒 n 个单位
self.rate = leak_rate
# 上一次漏水的工夫,用来跟本次工夫做差值,乘以水流速率计算出这段时间内流出了多少水
self.last_leak_time = int(time.time())
# 桶中以后残余的水
self.remain_water = 0
def allow_request(self, require_units=1):
now = int(time.time())
leaked_water = (now - self.last_leak_time) * self.rate
self.remain_water = max(0, self.remain_water - leaked_water)
self.last_leak_time = now
print(f"刚刚流出 {leaked_water}, 桶容量已达 {self.remain_water}")
if self.remain_water + require_units <= self.capacity:
self.remain_water += require_units
return True
return False
if __name__ == "__main__":
leaky_bucket = LeakyBucketRateLimiter(3, 1)
for i in range(10):
print(f"限流后果:", leaky_bucket.allow_request())
print("----")
s = random.randint(1, 5) / 10
time.sleep(s)
后果
刚刚流出 0, 桶容量已达 0
限流后果: True
----
刚刚流出 0, 桶容量已达 1
限流后果: True
----
刚刚流出 1, 桶容量已达 1
限流后果: True
----
刚刚流出 0, 桶容量已达 2
限流后果: True
----
刚刚流出 1, 桶容量已达 2
限流后果: True
----
刚刚流出 0, 桶容量已达 3
限流后果: False
----
刚刚流出 0, 桶容量已达 3
限流后果: False
----
刚刚流出 1, 桶容量已达 2
限流后果: True
----
刚刚流出 0, 桶容量已达 3
限流后果: False
----
刚刚流出 0, 桶容量已达 3
限流后果: False
----
Token Bucket 令牌桶
在令牌桶算法中,零碎会以一个固定的速率向桶中增加令牌。
当有申请(或数据包)到来时,会从桶中删除肯定数量的令牌。如果桶中有足够的令牌,申请就能够立刻解决;
如果桶中没有足够的令牌,申请就会被阻塞或抛弃,具体行为取决于具体的实现。
桶有容量限度,如果增加令牌时桶已满,新的令牌就会被抛弃。
class TokenBucketRateLimiter(object):
def __init__(self, capacity, refill_rate):
# 容量
self.capacity = capacity
# 产生令牌的速率
self.refill_rate = refill_rate
# 以后已有的可用令牌数
self.current_tokens = capacity
# 上一次产生令牌的工夫,用来与以后工夫计算并乘以速率,得出这段时间内产生了多少令牌
self.last_refill_time = int(time.time())
def allow_request(self, token_needs=1):
now = int(time.time())
new_tokens = (now - self.last_refill_time) * self.refill_rate
self.current_tokens += min(self.capacity, new_tokens)
self.last_refill_time = now
print(f"刚刚新产生 {new_tokens}, 残余 {self.current_tokens}")
if self.current_tokens >= token_needs:
self.current_tokens -= token_needs
return True
return False
if __name__ == "__main__":
token_bucket = TokenBucketRateLimiter(3, 1)
for i in range(10):
print(f"限流后果:", token_bucket.allow_request())
print("----")
s = random.randint(1, 5) / 10
time.sleep(s)
后果
刚刚新产生 0, 残余 3
限流后果: True
----
刚刚新产生 0, 残余 2
限流后果: True
----
刚刚新产生 0, 残余 1
限流后果: True
----
刚刚新产生 1, 残余 1
限流后果: True
----
刚刚新产生 0, 残余 0
限流后果: False
----
刚刚新产生 0, 残余 0
限流后果: False
----
刚刚新产生 1, 残余 1
限流后果: True
----
刚刚新产生 0, 残余 0
限流后果: False
----
刚刚新产生 1, 残余 1
限流后果: True
----
刚刚新产生 0, 残余 0
限流后果: False
----
Fixed Window 固定窗口
在一个固定的工夫窗口内,只容许肯定数量的申请。
如果在这个工夫窗口内的申请曾经达到了限度,那么新的申请就会被回绝,过了以后工夫窗口后,会进入下一个工夫窗口,并重置窗口内的申请数量,从新计算。
class FixedWindowRateLimiter(object):
def __init__(self, max_requests, window_size):
# 单个窗口大小 单位秒
self.window_size = window_size
# 单个窗口中最大可解决的申请数量
self.max_requests = max_requests
# 以后窗口已解决申请
self.current_request = 0
# 窗口计算起始点
self.last_window_start = int(time.time())
def allow_request(self):
now = int(time.time())
if now - self.last_window_start >= self.window_size:
print("窗口刷新")
self.last_window_start = now
self.current_request = 0
print(f"以后窗口已解决申请 {self.current_request} 个")
if self.current_request < self.max_requests:
self.current_request += 1
return True
return False
if __name__ == "__main__":
limiter = FixedWindowRateLimiter(3, 2)
for i in range(10):
print(f"限流后果:", limiter.allow_request())
print("----")
s = random.randint(1, 5) / 10
time.sleep(s)
后果
以后窗口已解决申请 0 个
限流后果: True
----
以后窗口已解决申请 1 个
限流后果: True
----
以后窗口已解决申请 2 个
限流后果: True
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
窗口刷新
以后窗口已解决申请 0 个
限流后果: True
----
Sliding Window 滑动窗口
在固定窗口限流算法中,如果大量申请在一个工夫窗口的边界左近达到,可能会造成刹时的流量突增。
滑动窗口随着工夫的推移,动静统计申请量,防止了在窗口边界左近的流量突增。
class SlidingWindowRateLimiter(object):
def __init__(self, max_requests, window_size):
# 最大申请量
self.max_requests = max_requests
# 窗口大小
self.window_size = window_size
# 寄存每个申请的工夫
self.requests_list = collections.deque()
def allow_request(self):
now = int(time.time())
while self.requests_list and self.requests_list[0] <= (now - self.window_size):
self.requests_list.popleft()
print(f"以后窗口已解决申请 {len(self.requests_list)} 个")
if len(self.requests_list) < self.max_requests:
self.requests_list.append(now)
return True
return False
if __name__ == "__main__":
limiter = SlidingWindowRateLimiter(3, 2)
for i in range(10):
print(f"限流后果:", limiter.allow_request())
print('----')
s = random.randint(1, 5) / 10
time.sleep(s)
后果
以后窗口已解决申请 0 个
限流后果: True
----
以后窗口已解决申请 1 个
限流后果: True
----
以后窗口已解决申请 2 个
限流后果: True
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 2 个
限流后果: True
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 3 个
限流后果: False
----
以后窗口已解决申请 1 个
限流后果: True
----
以后窗口已解决申请 2 个
限流后果: True
----
还有一种形式,不须要记录具体每个申请的工夫点,而通过计算滑动窗口与固定窗口之间工夫的偏移,估算出滑动窗口中申请量,这个形式不太精确,但节俭了申请工夫点的存储老本。
图中所示,已知设窗口大小为 1 小时即 60 分钟,每个窗口内可解决 100 申请,以后工夫为 1:15。
在 [12:00–1:00) 的整个绿色窗口中一共有 84 个申请,在 [1:00 to 1:15) 的黄色窗口中,15 分钟内曾经解决了 36 个申请,如何计算以后窗口残余容量?
通过计算滑动窗口与前一窗口重叠局部占比,来估算前一窗口中被占用的容量,(60 分钟 -15 分钟)/60
分钟 示意滑动窗口减去黄色以后窗口后,与绿色窗口的重叠,占绿色窗口整体的百分比,也就估算出重叠局部的申请量在总共 84 个申请的占比,得出 63 个申请,再加上以后窗口的 36 申请,一共是 99 申请,那么以后滑动窗口的残余容量就是 100 – 99 = 1 个申请容量。
class SlidingWindowRateLimiterByEstimate(object):
def __init__(self, max_requests, window_size):
self.max_requests = max_requests
self.window_size = window_size
# 以后窗口起始工夫
self.current_window_start = time.time()
# 前一个窗口申请量
self.pre_count = 0
# 以后窗口申请量
self.current_count = 0
def allow_request(self):
now = time.time()
if (now - self.current_window_start) > self.window_size:
# 滑过残缺的一个窗口,重置
self.current_window_start = now
self.pre_count = self.current_count
self.current_count = 0
# 通过计算滑动窗口与前一个窗口重叠局部,占整个窗口的占比,估算重叠局部的申请量
# 再加上以后窗口的申请量
# 得出滑动窗口中的申请量
estimate_count = (self.pre_count * (self.window_size - (now - self.current_window_start)) / self.window_size) + self.current_count
print(f"估算申请量:{estimate_count}")
if estimate_count > self.max_requests:
return False
self.current_count += 1
return True
if __name__ == "__main__":
limiter = SlidingWindowRateLimiterByEstimate(3, 2)
for i in range(10):
print(f"限流后果:", limiter.allow_request())
print('----')
s = random.randint(1, 5) / 10
time.sleep(s)
后果
估算申请量:0.0
限流后果: True
----
估算申请量:1.0
限流后果: True
----
估算申请量:2.0
限流后果: True
----
估算申请量:3.0
限流后果: True
----
估算申请量:4.0
限流后果: False
----
估算申请量:4.0
限流后果: False
----
估算申请量:4.0
限流后果: False
----
估算申请量:3.3901939392089844
限流后果: False
----
估算申请量:2.775090217590332
限流后果: True
----
估算申请量:3.366755485534668
限流后果: False
----
参考:
- 图解 + 代码 | 常见限流算法以及限流在单机分布式场景下的思考:https://segmentfault.com/a/1190000023552181
- Design a Scalable Rate Limiting Algorithm — System Design:https://medium.com/@NlognTeam/design-a-scalable-rate-limiting…
- How to Design a Scalable Rate Limiting Algorithm with Kong API:https://konghq.com/blog/how-to-design-a-scalable-rate-limitin…
- 漏桶算法和令牌桶算法,区别到底在哪里?:https://xie.infoq.cn/article/4a0acdd12a0f6dd4a53e0472c
- Rate limiting using the Sliding Window algorithm:https://dev.to/satrobit/rate-limiting-using-the-sliding-window-algorithm-5fjn
本文由 mdnice 多平台公布