限流简述
限流算法在分布式畛域是一个常常被提起的话题,当零碎的解决能力无限时,如何阻止计划外的申请持续对系统施压,这是一个须要器重的问题。
除了管制流量,限流还有一个利用目标是用于管制用户行为,防止垃圾申请。比方在 UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,个别要严格限定某行为在规定工夫内容许的次数,超过了次数那就是非法行为。对非法行为,业务必须规定适当的表彰策略。
# 指定用户 user_id 的某个行为 action_key 在特定的工夫内 period 只容许产生肯定的次数 max_count
def is_action_allowed(user_id, action_key, period, max_count):
return True
# 调用这个接口 , 一分钟内只容许最多回复 5 个帖子
can_reply = is_action_allowed("110", "reply", 60, 5)
if can_reply:
do_reply()
else:
raise ActionThresholdOverflow()
计数限流
pipe 与 zset
如图所示,用一个 zset 构造记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保留下来。同一个用户同一种行为用一个 zset 记录。
为节俭内存,咱们只须要保留工夫窗口内的行为记录,同时如果用户是冷用户,滑动工夫窗口内的行为是空记录,那么这个 zset 就能够从内存中移除,不再占用空间。
通过统计滑动窗口内的行为数量与阈值 max\_count 进行比拟就能够得出以后的行为是否容许。用代码示意如下:
function isActionAllowed($userId, $action, $period, $maxCount)
{
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$key = sprintf('hist:%s:%s', $userId, $action);
$now = msectime(); # 毫秒工夫戳
$pipe=$redis->multi(Redis::PIPELINE); //应用管道晋升性能
$pipe->zadd($key, $now, $now); //value 和 score 都应用毫秒工夫戳
$pipe->zremrangebyscore($key, 0, $now - $period); //移除工夫窗口之前的行为记录,剩下的都是工夫窗口内的
$pipe->zcard($key); //获取窗口内的行为数量
$pipe->expire($key, $period + 1); //多加一秒过期工夫
$replies = $pipe->exec();
return $replies[2] <= $maxCount;
}
for ($i=0; $i<20; $i++){
var_dump(isActionAllowed("110", "reply", 60*1000, 5)); //执行能够发现只有前5次是通过的
}
//返回以后的毫秒工夫戳
function msectime() {
list($msec, $sec) = explode(' ', microtime());
$msectime = (float)sprintf('%.0f', (floatval($msec) + floatval($sec)) * 1000);
return $msectime;
}
应用lua脚本
10秒内只能拜访3次。 后续该脚本能够在nginx或者程序运行脚本中间接应用,判断返回是否为0,就0就不让其持续拜访。
-- redis-cli --eval ratelimiting.lua rate.limitingl:127.0.0.1 , 10 3
-- rate.limitingl + 1
local times = redis.call('incr',KEYS[1])
-- 第一次拜访的时候加上过期工夫10秒(10秒过后从新计数)
if times == 1 then
redis.call('expire',KEYS[1], ARGV[1])
end
-- 留神,从redis进来的默认为字符串,lua同种数据类型只能和同种数据类型比拟
if times > tonumber(ARGV[2]) then
return 0
end
return 1
漏桶限流
漏斗的容量是无限的,如果将漏嘴堵住,而后始终往里面灌水,它就会变满,直至再也装不进去。如果将漏嘴放开,水就会往下流,流走一部分之后,就又能够持续往里面灌水。如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就须要暂停并期待漏斗凌空。
所以,漏斗的残余空间就代表着以后行为能够继续进行的数量,漏嘴的流水速率代表着零碎容许该行为的最大频率。上面咱们应用代码来形容单机漏斗算法。
<?php
class Funnel {
private $capacity;
private $leakingRate;
private $leftQuote;
private $leakingTs;
public function __construct($capacity, $leakingRate)
{
$this->capacity = $capacity; //漏斗容量
$this->leakingRate = $leakingRate;//漏斗流水速率
$this->leftQuote = $capacity; //漏斗残余空间
$this->leakingTs = time(); //上一次漏水工夫
}
/**
* 漏斗算法的外围
* 其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。
* 能腾出多少空间取决于过来了多久以及流水的速率。
* Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量。
*/
public function makeSpace()
{
// 500毫秒
usleep(1000*1000);
$now = time();
//间隔上一次漏水过来了多久
$deltaTs = $now-$this->leakingTs;
//可腾出的空间 = 上次工夫 * 流水速率
$deltaQuota = $deltaTs * $this->leakingRate;
var_dump('deltaQuota ' . $deltaQuota);
if($deltaQuota < 1) {
return;
}
$this->leftQuote += $deltaQuota; //减少残余空间
$this->leakingTs = time(); //记录漏水工夫
if($this->leftQuote > $this->capacity){
$this->leftQuote - $this->capacity;
}
}
/**
* @param $quota 漏桶的空间
* @return bool
*/
public function watering($quota=1)
{
$this->makeSpace(); //漏水操作
if($this->leftQuote >= $quota) {
$this->leftQuote -= $quota;
return true;
}
return false;
}
}
function isActionAllowed($userId, $action, $capacity, $leakingRate)
{
static $funnels = [];
$key = sprintf("%s:%s", $userId, $action);
$funnel = $funnels[$key] ?? '';
if (!$funnel) {
$funnel = new Funnel($capacity, $leakingRate);
$funnels[$key] = $funnel;
}
return $funnel->watering();
}
for ($i=0; $i<20; $i++){
var_dump(isActionAllowed("110", "reply", 5, 0.5));
}
应用redis漏桶插件
Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也应用了漏斗算法,并提供了原子的限流指令。有了这个模块,限流问题就非常简单了。
该模块只有1条指令cl.throttle
,它的参数和返回值都略显简单,接下来让咱们来看看这个指令具体该如何应用。
> cl.throttle key 15(漏斗容量) 30 60(30 operations / 60 seconds 这是漏水速率) 1(need 1 quota,可选参数默认是1)
频率为每 60s 最多 30 次(漏水速率),漏斗的初始容量为 15,也就是说一开始能够间断回复 15 个帖子,而后才开始受漏水速率的影响。咱们看到这个指令中漏水速率变成了 2 个参数,代替了之前的单个浮点数。用两个参数相除的后果来表白漏水速率绝对单个浮点数要更加直观一些。
> cl.throttle key:reply 15 30 60
1) (integer) 0 # 0 示意容许,1示意回绝
2) (integer) 15 # 漏斗容量capacity
3) (integer) 14 # 漏斗残余空间left_quota
4) (integer) -1 # 如果回绝了,须要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗齐全空进去(left_quota==capacity,单位秒)
令牌桶限流
令牌桶算法(Token Bucket)和 Leaky Bucket 成果一样但方向相同的算法,更加容易了解.随着工夫流逝,零碎会按恒定1/QPS工夫距离(如果QPS=100,则距离是10ms)往桶里退出Token(设想和破绽漏水相同,有个水龙头在一直的加水),如果桶曾经满了就不再加了.新申请来长期,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
令牌桶的另外一个益处是能够不便的扭转速度. 一旦须要进步速率,则按需进步放入桶中的令牌的速率. 个别会定时(比方100毫秒)往桶中减少肯定数量的令牌, 有些变种算法则实时的计算应该减少的令牌的数量.
具体实现可参考php 基于redis应用令牌桶算法实现流量管制
<?php
class TrafficShaper
{
private $_config; // redis设定
private $_redis; // redis对象
private $_queue; // 令牌桶
private $_max; // 最大令牌数
/**
* 初始化
* @param Array $config redis连贯设定
*/
public function __construct($config , $queue , $max)
{
$this->_config = $config;
$this->_queue = $queue;
$this->_max = $max;
$this->_redis = $this->connect();
}
/**
* 退出令牌
* @param Int $num 退出的令牌数量
* @return Int 退出的数量
*/
public function add($num = 0)
{
// 以后残余令牌数
$curnum = intval($this->_redis->lSize($this->_queue));
// 最大令牌数
$maxnum = intval($this->_max);
// 计算最大可退出的令牌数量,不能超过最大令牌数
$num = $maxnum >= $curnum + $num ? $num : $maxnum - $curnum;
// 退出令牌
if ($num > 0) {
$token = array_fill(0 , $num , 1);
$this->_redis->lPush($this->_queue , ...$token);
return $num;
}
return 0;
}
/**
* 获取令牌
* @return Boolean
*/
public function get()
{
return $this->_redis->rPop($this->_queue) ? true : false;
}
/**
* 重设令牌桶,填满令牌
*/
public function reset()
{
$this->_redis->delete($this->_queue);
$this->add($this->_max);
}
private function connect()
{
try {
$redis = new Redis();
$redis->connect($this->_config['host'] , $this->_config['port'] , $this->_config['timeout'] , $this->_config['reserved'] , $this->_config['retry_interval']);
if (empty($this->_config['auth'])) {
$redis->auth($this->_config['auth']);
}
$redis->select($this->_config['index']);
} catch (\RedisException $e) {
throw new Exception($e->getMessage());
return false;
}
return $redis;
}
}
$config = [
'host' => '172.28.3.157' ,
'port' => 6379 ,
'index' => 0 ,
'auth' => '' ,
'timeout' => 1 ,
'reserved' => NULL ,
'retry_interval' => 100 ,
];
// 令牌桶容器
$queue = 'mycontainer';
// 最大令牌数
$max = 5;
// 创立TrafficShaper对象
$oTrafficShaper = new TrafficShaper($config , $queue , $max);
// 重设令牌桶,填满令牌
$oTrafficShaper->reset();
// 循环获取令牌,令牌桶内只有5个令牌,因而最初3次获取失败
for ($i = 0;$i < 8;$i ++) {
var_dump($oTrafficShaper->get());
}
// 退出10个令牌,最大令牌为5,因而只能退出5个
// 这里能够应用异步脚本来增加令牌。
$add_num = $oTrafficShaper->add(10);
var_dump($add_num);
// 循环获取令牌,令牌桶内只有5个令牌,因而最初1次获取失败
for ($i = 0;$i < 6;$i ++) {
var_dump($oTrafficShaper->get());
}
[info] 剖析下来应用lua脚本的计数器,最简略不便。
发表回复