共计 10665 个字符,预计需要花费 27 分钟才能阅读完成。
1. 应用场景
我们开发的接口服务系统很多都具有抗高并发,保证高可用的特性。现实条件下,随着流量的不断增加,在经费、硬件和资源受限的情况下,我们就需要为我们的系统服务制定有效的限流、分流策略来保护我们的系统了。
2. 算法简介和示例说明
业界比较流行的限流算法有漏桶算法和令牌桶算法。
2.1 漏桶算法
漏桶 (Leaky Bucket) 算法的实现思路比较简单,水 (请求) 先流入到桶中,然后桶以一定的速度出水 (接口有响应速率),当水流过大时(访问频率超过设置的阈值),系统服务就会拒绝请求。强行限制系统单位时间内访问的请求量。漏桶算法示意图如下:
漏桶算法有两个关键变量:桶的大小和出水速率,他们共同决定了单位时间内系统能接收的最大请求量。因为漏桶算法中桶的大小和出水速率是固定的参数。不能使流突发到端口,对存在突发特性的流量缺乏效率,什么意思呢?我们后边会使用使用 php 实现一个漏桶 demo,并对测试结果做详细说明。github 源码地址是:漏桶算法 demo
2.2 令牌桶算法
令牌桶 (Token Bucket) 和漏桶 (Leaky Bucket) 使用方向相反的算法,这种算法更加容易理解。随着时间的流逝,系统会按照恒定 1 /QPS(如果 QPS=1000,则时间间隔是 1ms)向桶中添加 Token(想象和漏洞漏水相反, 有个水龙头在不断的加水)。如果桶已经满了就不会添加了,请求到来时会尝试从桶中拿一个 Token, 如果拿不到 Token, 就阻塞或者拒绝服务,待下次有令牌时再去拿令牌。令牌桶的算法如下图所示例:
令牌桶的好处是显而易见的,我们可以通过提高放入桶中令牌的速率,改变请求的限制速度。令牌桶一般会定时的向桶中添加令牌(例如每隔 10ms 向桶中添加一枚令牌)。我们会使用 Go 语言实现一个令牌桶 demo,为了达到兼容分布式并发场景,我们会对令牌桶的 demo 做改进说明,我们在添加令牌时采用一种变种算法:等请求到达时根据令牌放入桶中的速率实时计算应该放入桶中令牌的数量。github 源码地址是:令牌桶算法 demo
2.3 示例说明
我们模拟实现的功能是限制一个公司下对某一个接口的访问频次,示例中是限制公司 org1 的员工列表接口 /user/list 在 1s 内能被外部访问 100 次。
3. 示例源码和压测结果
3.1 php 实现漏桶算法
Redis 中设置接口限制 1s 内访问 100 次的 hash:
hmset org1/user/list expire 1 limitReq 100
我们使用 Predis 连接 redis 进行操作,模拟接口比较简单,我们只获取两个参数,org 和 pathInfo,RateLimit 类中相关方法是:
<?php
/**
* Description: 漏桶限流
* User: guozhaoran<guozhaoran@cmcm.com>
* Date: 2019-06-13
*/
class RateLimit
{
private $conn = null; //redis 连接
private $org = ''; // 公司标识
private $pathInfo = ''; // 接口路径信息
/**
* RateLimit constructor.
* @param $org
* @param $pathInfo
* @param $expire
* @param $limitReq
*/
public function __construct($org, $pathInfo)
{$this->conn = $this->getRedisConn();
$this->org = $org;
$this->pathInfo = $pathInfo;
}
//...... 此处省略 getLuaScript 方法
/**
* 获取 redis 连接
* @return \Predis\Client
*/
private function getRedisConn()
{require_once('vendor/autoload.php');
$conn = new Predis\Client(['host' => '127.0.0.1',
'port' => 6379,]);
return $conn;
}
//...... 此处省略 isActionAllowed 方法
}
下边我们看看 Lua 脚本的设计:
/**
* 获取 lua 脚本
* @return string
*/
private function getLuaScript()
{
$luaScript = <<<LUA_SCRIPT
-- 限制接口访问频次
local times = redis.call('incr', KEYS[1]); -- 将 key 自增 1
if times == 1 then
redis.call('expire', KEYS[1], ARGV[1]) -- 给 key 设置过期时间
end
if times > tonumber(ARGV[2]) then
return 0
end
return 1
LUA_SCRIPT;
return $luaScript;
}
Lua 脚本可以打包到 Redis 服务端进行执行,因为 Redis 服务端 redis-server 在 2.6 版本默认内置了 Lua 解析器,php 的 Redis 客户端与 Lua 脚本交互主要传两个 KEYS 和 ARGV, 其中 KEYS 是对应 Redis 中操作的 key 值(示例中的 KEYS[1]就是 org1/user/list),ARGV 是要设置的属性参数。在 Lua 脚本中 Table 的索引是从 1 开始自增的,Lua 脚本执行 Redis 命令可以保证原子性 (因为 Redis 是单线程的),所以在并发竞态条件下也能保证 hash 的读写一致。 命令首先调用 incr 设置 org/user/list 记数,Redis 中的 list、set、hash、zset 这四种数据结构是容器型数据结构,他们共享下面两条通用规则:
- 1.create if not exists:如果容器不存在,那就创建一个再进行操作。比如 incr org/user/list 时,如果 org/user/list 不存在,就相当于设置了 org/user/list 为 1, 这就是为什么上边 Lua 脚本使用 expire 当 times 为 1 时设置 org/user/list 的过期时间
- 2.drop if no elements:如果容器里的元素没有了,那么立即删除容器,释放内存。比如 lpop 操作完一个 list 之后,list 中没有元素内容了,那么这个 list 也就不存在了
下边的逻辑就很明了了,就是看接口的调用累加次数有没有超限(限制频率通过 ARGV[2])进行判断,超限返回 0,否则返回 1.
下边我们就可以看看怎样 isActionAllowed 方法判断是否要进行限流了:
/**
* 判断接口是否限制访问
* @return bool
*/
public function isActionAllowed()
{
$pathInfo = $this->org . $this->pathInfo;
$config = $this->conn->hgetall($pathInfo);
// 配置中没有对接口进行限制
if (!$config) return true;
$pathInfoLimitKey = $this->org . '-' . $this->pathInfo;
try {$ret = $this->conn->evalsha(sha1($this->getLuaScript()), 1, $pathInfoLimitKey, $config['expire'], $config['limitReq']);
} catch (Exception $e) {$ret = $this->conn->eval($this->getLuaScript(), 1, $pathInfoLimitKey, $config['expire'], $config['limitReq']);
}
return boolval($ret);
}
Predis 使用 evalsha 打包 Lua 脚本发送到服务端执行。evalsha 的第一个参数是 sha1 编码后的 Lua 脚本。redis-server 可以对 Lua 脚本进行缓存,缓存的方法是 key:value 的形式,其中 key 是 sha1 后的 lua 脚本内容, 这样在 Lua 脚本比较大时,客户端只需要发送 sha1 后的值到 redis-server 就可以了,减小了每次发送命令内容的字节大小。如果 evalsha 报出错误信息可以改为 eval 函数,因为 redis-server 第一次接收到 lua 脚本,可能还没没有进行缓存。最好是使用 try…catch… 做一下兼容处理。evalsha 的第二个参数是 key 的个数,这里是一个,$pathInfoLimitKey,下边两个是从 Redis 中取出的配置值,标示 1s 内允许 $pathInfoLimitKey 被操作 100 次。如果没有对 $pathInfoLimitKey 做配置限制频率,默认不受限。
以上就是 rateLimit 类的全部内容了,思路比较简单,下边简单看一下入口文件,也比较简单,就是接收参数,然后将接口是否受限的信息写到 stat.log 日志文件中去。
<?php
/**
* Description: 漏斗限流入口文件
* User: guozhaoran<guozhaoran@cmcm.com>
* Date: 2019-06-16
*/
require_once('./RateLimit.php');
ini_set('display_errors', true);
$org = $_GET['org'];
$pathInfo = $_GET['path_info'];
$result = (new RateLimit($org, $pathInfo))->isActionAllowed();
$handler = fopen('./stat.log', 'a') or die('can not open file!');
if ($result) {fwrite($handler, 'request success!' . PHP_EOL);
} else {fwrite($handler, 'request failed!' . PHP_EOL);
}
fclose($handler);
我们通过 ab 工具压测一下接口信息,程序限制 1s 内允许 100 次访问,我们就开 10 个客户端同时请求 110 次,理论上应该是前一百次是成功的,后十次是失败的,命令为:
ab -n 110 -c 10 http://localhost/demo/rateLimit/index.php\?org\=org1\&path_info\=/user/list
stat.log 中的日志信息和我们预期中的一样,说明我们的接口频次设置达到了预期效果:
...// 此处省略 96 行
request success!
request success!
request success!
request success!
request failed!
request failed!
request failed!
request failed!
request failed!
request failed!
request failed!
request failed!
request failed!
request failed!
但是漏斗限流还是有一些缺点的,它不支持突发流量,我们接口设置 1s 内限制访问 100 次,假如说前 900 毫秒只有 80 次访问,突然在接下来的 100 毫秒来了 50 次访问,那么毫无疑问,后边 30 次访问是失败的。不过漏斗这种简单粗暴的限流处理方案对于流量集中性访问,比如 (1 分钟只允许访问 1000 次) 还是非常适合的。
3.2 go 语言实现令牌桶算法
我们首先不考虑竞态条件,用 go 语言实现一个 v1 版本的令牌桶来体会一下它的算法思想。我们新建一个 funnel 模块,定义一个结构体,包含了令牌桶需要的属性:
package funnel
import (
"math"
"time"
)
type Funnel struct {
Capacity int64 // 令牌桶容量
LeakingRate float64 // 令牌桶流水速率: 每毫秒向令牌桶中添加的令牌数
RemainingCapacity int64 // 令牌桶剩余空间
LastLeakingTime int64 // 上次流水 (放入令牌) 时间: 毫秒时间戳
Funnel 结构体支持导出,分别包含令牌桶的容量、向令牌桶中添加令牌的速率、令牌桶剩余空间
和上次放入令牌时间的四个属性。
我们采用请求进来时实时改变令牌桶状态的思路,改变令牌桶状态的方法如下:
// 有请求时更新令牌桶的状态, 主要是令牌桶剩余空间和记录取走 Token 的时间戳
func (rateLimit *Funnel) updateFunnelStatus() {nowTs := time.Now().UnixNano() / int64(time.Millisecond)
// 距离上一次取走令牌已经过去了多长时间
timeDiff := nowTs - rateLimit.LastLeakingTime
// 根据时间差和流水速率计算需要向令牌桶中添加多少令牌
needAddSpace := int64(math.Floor(rateLimit.LeakingRate * float64(timeDiff)))
// 不需要添加令牌
if needAddSpace < 1 {return}
rateLimit.RemainingCapacity += needAddSpace
// 添加的令牌不能大于令牌桶的剩余空间
if rateLimit.RemainingCapacity > rateLimit.Capacity {rateLimit.RemainingCapacity = rateLimit.Capacity}
// 更新上次令牌桶流水 (添加令牌) 时间戳
rateLimit.LastLeakingTime = nowTs
}
因为要改变令牌桶的状态,所以我们这里使用指针接收者为结构体 Funnel 定义方法。主要思路就是根据当前时间和上次放入令牌桶中令牌的时间戳,再结合每毫秒应该放入令牌桶中令牌,计算添加应该放入到令牌桶中的令牌,放入令牌后不能超过令牌桶本身容量的大小。然后取出令牌,更新上次添加令牌时间戳。
判断接口是否限流其实就是看能不能从令牌桶中取出令牌,方法如下:
// 判断接口是否被限流
func (rateLimit *Funnel) IsActionAllowed() bool {
// 更新令牌桶状态
rateLimit.updateFunnelStatus()
if rateLimit.RemainingCapacity < 1 {return false}
rateLimit.RemainingCapacity = rateLimit.RemainingCapacity - 1
return true
}
到了这里,相信读者已经对令牌桶算法有了一个比较清晰的认识了。我们再来说问题,因为限流最终还是要通过操作 Redis 来实现的,我们首先来在 Redis 里初始化好接口限流的配置:
hmset org2/user/list Capacity 100 LeakingRate 0.1 RemainingCapacity 0 LastLeakingTime 1560789716896
我们设置公司二 (org2) 的接口(/user/list)令牌桶容量 100,每隔 10ms 放入一令牌(计算方法 10/1000)。我们将 Funnel 对象内容的字段存储到一个 hash 结构中,我们在计算是否限流的时候需要从 hash 结构中取值,在内存中做运算,再回填到 hash 结构,尤其对于 go 语言这种天然并发的程序来讲,我们无法保证整个过程的原子化(这就是为什么要使用 Lua 脚本的原因,因为如果用程序来实现,就需要加锁,一旦加锁就有加锁失败的可能,失败只能选择重试或放弃,重试会导致性能下降,放弃会影响用户体验,代码复杂度会增加不少)。我们 V2 版本还是会选择使用 Lua 脚本来实现:具体调研过程如下:
方案 | 特点 |
---|---|
单服务对操作采用锁机制 | 文章有提到,这种只能保证单节点下串行且性能差 |
Redis 原子操作 incr | 这种方案我们在漏斗模型中有使用,它只能应对简单的场景,涉及到复杂场景就比较难处理 |
Redis 分布式事务 | 虽然 Redis 的分布式事务能保证原子操作,但是实现复杂,并且网络开销大,需要大量的网络传输 |
Redis+Lua | 这里就不得不夸一下这种方案了,Lua 脚本中运行在 Redis 中,redis 又是单线程的,因此能保证操作的串行。另外:减少网络开销,前边我们提到过,Lua 代码包装的命令不需要发送多次命令请求,Redis 可以对 Lua 脚本进行缓存,减少了网络传输, 另外其他的客户端也可以使用缓存 |
补充一点:Redis4.0 提供了一个限流模块 Redis 模块,它叫 Redis-Cell。该模块也使用了漏斗算法,并提供了原子的限流命令,重试机制也非常简单,有兴趣的可以研究一下。我们这里还是使用 Lua + Redis 解决方案,废话少说,上 V2 版本的代码:
const luaScript = `
-- 接口限流
-- last_leaking_time 最后访问时间的毫秒
-- remaining_capacity 当前令牌桶中可用请求令牌数
-- capacity 令牌桶容量
-- leaking_rate 令牌桶添加令牌的速率
-- 把发生数据变更的命令以事务的方式做持久化和主从复制(Redis4.0 支持)
redis.replicate_commands()
-- 获取令牌桶的配置信息
local rate_limit_info = redis.call("HGETALL", KEYS[1])
-- 获取当前时间戳
local timestamp = redis.call("TIME")
local now = math.floor((timestamp[1] * 1000000 + timestamp[2]) / 1000)
if rate_limit_info == nil then -- 没有设置限流配置, 则默认拿到令牌
return now * 10 + 1
end
local capacity = tonumber(rate_limit_info[2])
local leaking_rate = tonumber(rate_limit_info[4])
local remaining_capacity = tonumber(rate_limit_info[6])
local last_leaking_time = tonumber(rate_limit_info[8])
-- 计算需要补给的令牌数, 更新令牌数和补给时间戳
local supply_token = math.floor((now - last_leaking_time) * leaking_rate)
if (supply_token > 0) then
last_leaking_time = now
remaining_capacity = supply_token + remaining_capacity
if remaining_capacity > capacity then
remaining_capacity = capacity
end
end
local result = 0 -- 返回结果是否能够拿到令牌, 默认否
-- 计算请求是否能够拿到令牌
if (remaining_capacity > 0) then
remaining_capacity = remaining_capacity - 1
result = 1
end
-- 更新令牌桶的配置信息
redis.call("HMSET", KEYS[1], "RemainingCapacity", remaining_capacity, "LastLeakingTime", last_leaking_time)
return now * 10 + result
`
我们这段脚本返回一个 int64 类型的整数,最后一位 0 或 1 表示是否要对接口限流,前边的数字表示毫秒时间戳,将来记录到日志里进行压测统计使用。程序运行时当前时间戳我是调用 Redis 的 time 命令计算获得的,原因有二:
- Lua 命令获得当前时间戳只能精确到秒,而 Redis 确可以精确到纳秒。
- 如果时间戳作为脚本调用参数(go 程序)传进来会有问题,因为脚本传参到 Lua 在 Redis 中执行还有一段时间误差,不能保证最先被接收到的请求先被处理,而 Lua 中获取时间戳可以保证请求、时间串行
和以前一样,没有设置限流配置,就默认可以请求。
然后根据时间戳补给令牌,计算是否能够取到令牌,然后更新令牌状态,思路和 V1 版本一样,读者可自行阅读。说明一点,脚本开始处的 redis.replicate_commands()命令是因为 Redis 低版本不支持对 Redis 既读又写,所以这种方式还是存在版本兼容性,但是解决办法确是最完美的。
接下来我们看 go 逻辑代码:
func main() {http.HandleFunc("/user/list", handleReq)
http.ListenAndServe(":8082", nil)
}
// 初始化 redis 连接池
func newPool() *redis.Pool {
return &redis.Pool{
MaxIdle: 80,
MaxActive: 12000, // max number of connections
Dial: func() (redis.Conn, error) {c, err := redis.Dial("tcp", ":6379")
if err != nil {panic(err.Error())
}
return c, err
},
}
}
// 写入日志
func writeLog(msg string, logPath string) {fd, _ := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
defer fd.Close()
content := strings.Join([]string{msg, "\r\n"}, "")
buf := []byte(content)
fd.Write(buf)
}
// 处理请求函数, 根据请求将响应结果信息写入日志
func handleReq(w http.ResponseWriter, r *http.Request) {
// 获取 url 信息
pathInfo := r.URL.Path
// 获取 get 传递的公司信息 org
orgInfo, ok := r.URL.Query()["org"]
if !ok || len(orgInfo) < 1 {fmt.Println("Param org is missing!")
}
// 调用 lua 脚本原子性进行接口限流统计
conn := newPool().Get()
key := orgInfo[0] + pathInfo
lua := redis.NewScript(1, luaScript)
reply, err := redis.Int64(lua.Do(conn, key))
if err != nil {fmt.Println(err)
return
}
// 接口是否被限制访问
isLimit := bool(reply % 10 == 1)
reqTime := int64(math.Floor(float64(reply) / 10))
// 将统计结果写入日志当中
if !isLimit {successLog := strconv.FormatInt(reqTime, 10) + "request failed!"
writeLog(successLog, "./stat.log")
return
}
failedLog := strconv.FormatInt(reqTime, 10) + "request success!"
writeLog(failedLog, "./stat.log")
}
脚本监听本地 8082 端口,使用 go 的 redis 框架 redigo 来操作 redis,我们初始化了一个 redis 连接池,从连接池中取得连接进行操作。我们分析如下代码:
lua := redis.NewScript(1, luaScript)
reply, err := redis.Int64(lua.Do(conn, key))
NewScript 中第一个参数代表要操作 Redis 的 key 的个数,这点和 Predis 的 evalsha 第二个参数类似。然后采用 Do 方法执行脚本,返回值使用 redis.Int64 做处理,然后进行运算判断接口是否允许被访问,然后将访问时间和结果写入到 stat.log 日志文件中。
逻辑还是非常的简单,我们主要看压测结果,启动代码,使用 ab 压测命令执行:
ab -n 110 -c 10 http://127.0.0.1:8082/user/list\?org\=org2
然后我们分析 stat.log 日志兴许会有些惊讶:
1561263349294 request success! // 第一行日志
...// 省略 95 行
1561263349387 request success!
1561263349388 request success!
1561263349398 request success!
1561263349396 request success!
1561263349404 request success!
1561263349407 request success!
1561263349406 request success!
1561263349406 request success!
1561263349407 request success!
1561263349406 request success!
1561263349406 request success!
1561263349405 request success!
1561263349406 request success!
1561263349406 request success!
1561263349406 request success!
是的,都成功了,为什么呢?我们看统计时间会发现执行这 100 个请求总共用了 110 毫秒,在程序执行过程中,每隔 10ms 会向令牌桶中添加一个令牌,一共添加了 11 个令牌,所以 110 次请求都拿到了令牌。可以看出令牌桶适用于大流量下的限流,可以保证流量按照时间均匀分摊,避免出现流量的集中式爆发访问。
4. 简单总结
到此为止,已经给大家介绍了限流的必要性以及常用限流手段与程序实现。相信大家对分步式限流有了一个初步的了解。下面做一个简单的总结:
算法 | 场景 |
---|---|
令牌桶 | 适用于大流量下的访问,可以保证流量按时间均匀分摊,避免出现流量集中爆发式访问 |
漏桶 | 简单粗暴,对于大流量下限流有很好的效果,尤其适合于单位时间内限制请求的业务,对突发流量的不能有很好的应对 |