作者:杨奇龙
本文起源:原创投稿
*爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。
一、前言
作为热点频出的电商零碎,常常遇到高并发,热点秒杀的场景。咱们在开发设计高并发海量业务申请的零碎时,通常利用三板斧:缓存、降级和限流来保障系统稳定性。
缓存:使业务数据更凑近数据的使用者,晋升程序拜访数据速度和增大零碎QPS容量。降级:当零碎容量达到肯定衰弱阈值时依据以后业务状况及流量对一些服务和页面有策略的降级,以此开释服务器资源以保障外围工作的失常运行。
限流:通过对并发拜访/申请进行限速,或者对一个工夫窗口内的申请进行限速来爱护零碎稳固可用,一旦达到限度速率则能够拒绝服务、排队或期待、降级等解决。
本文聊聊限流的罕用算法,并且通过案例测试验证令牌桶算法。
二、限流算法
目前程序开发过程罕用的限流算法有两个:漏桶算法和令牌桶算法。
漏桶算法
漏桶算法的原理比较简单,申请进入到漏桶中,漏桶以肯定的速率漏水。当申请过多时,水间接溢出。能够看出,漏桶算法能够强制限度数据的传输速度。
如图所示,把申请比作是水滴,水先滴到桶里,通过破绽并以限定的速度出水,当水来得过猛而出水不够快时就会导致水间接溢出,即拒绝服务。
图片来自网络
漏桶的出水速度是恒定的,那么意味着如果刹时大流量的话,将有大部分申请被抛弃掉(也就是所谓的溢出)。
令牌桶算法
令牌桶算法的原理是零碎以肯定速率向桶中放入令牌,如果有申请时,申请会从桶中取出令牌,如果能取到令牌,则能够持续实现申请,否则期待或者拒绝服务。这种算法能够应答突发水平的申请,因而比漏桶算法好。
图片来自网络
漏桶算法和令牌桶算法的抉择
两者的次要区别 漏桶算法可能强行限度解决数据的速率,不管零碎是否闲暇。而令牌桶算法可能在限度数据的均匀解决速率的同时还容许某种程度的突发流量。
如何了解下面的含意呢?
漏桶算法 比方零碎吞吐量是 120/s,业务申请 130/s,应用漏斗限流 100/s,起到限流的作用,多余的申请将产生期待或者抛弃。
对于令牌桶算法,每秒产生 100 个令牌,零碎容量 200 个令牌。失常状况下,业务申请 100/s 时,申请能被失常被解决。当有突发流量过去比方 200 个申请时,因为零碎容量有 200 个令牌能够同一时刻解决掉这 200 个申请。如果是漏桶算法,则只能解决 100 个申请,其余的申请期待或者被抛弃。
三、代码实现
本案例应用 python 基于令牌桶算法进行测试。
# encoding: utf-8"""author: yangyi@youzan.comtime: 2020/9/9 10:43 PMfunc: """import timeimport multiprocessingTEST = { # 测试 {'test1': 20} 每秒产生的令牌数量 'all': { 'test1': {'test1': 20}, 'test2': {'test2': 50}, 'test3': {'test3': 80}, 'test4': {'test4': 100},#示意突发100个申请 'test5': {'test5': 200},#示意突发200个申请 'test6': {'test6': 20}, }}class TokenBucket(object): # rate是令牌发放速度,capacity是桶的大小 def __init__(self, rate, capacity): self._rate = rate self._capacity = capacity self._current_amount = 0 self._last_consume_time = int(time.time()) # token_amount是发送数据须要的令牌数 def consume(self, token_amount): time.sleep(1) # 计算从上次发送到这次发送,新发放的令牌数量 increment = (int(time.time()) - self._last_consume_time) * self._rate # 令牌数量不能超过桶的容量 self._current_amount = min( increment + self._current_amount, self._capacity) # 如果没有足够的令牌,则不能发送数据 if token_amount > self._current_amount: return False self._last_consume_time = int(time.time()) self._current_amount -= token_amount return Truedef job(): i = 100 while i>1: for result in result_dict.values(): key = tuple(result.keys())[0] rate = tuple(result.values())[0] i = i-1 if i <= 0: break if not limiter.consume(rate): print(key + ' 限流') else: print(key + ' 失常')def run(): threads = [multiprocessing.Process(target=job) for i in range(3)] for thread in threads: thread.start()if __name__ == '__main__': result_dict = TEST["all"] RATE = 30 CAPACITY = 120 limiter = TokenBucket(RATE, CAPACITY) run()
这段测试代码比较简单,大家能够联合本人的业务场景在编写工具时看看哪里能够应用到限流算法。
四、参考文章
https://github.com/titan-web/...
https://www.jianshu.com/p/c6b...
https://www.simpleapples.com/...