咱们目前在工作中遇到一个性能问题,咱们有个定时工作须要解决大量的数据,为了晋升吞吐量,所以部署了很多台机器,但这个工作在运行前须要从别的服务那拉取大量的数据,随着数据量的增大,如果同时多台机器并发拉取数据,会对上游服务产生十分大的压力。之前曾经减少了单机限流,但无奈解决问题,因为这个数据工作运行中只有不到 10% 的工夫拉取数据,如果单机限流限度太狠,尽管集群总的申请量管制住了,但工作吞吐量又降下来。如果限流阈值太高,多机并发的时候,还是有可能压垮上游。所以目前惟一可行的解决方案就是 分布式限流。
我目前是抉择间接应用 Redisson 库中的 RRateLimiter 实现了分布式限流,对于 Redission 可能很多人都有所耳闻,它其实是在 Redis 能力上构建的开发库,除了反对 Redis 的根底操作外,还封装了布隆过滤器、分布式锁、限流器……等工具。明天要说的 RRateLimiter 及时其实现的限流器。接下来本文将具体介绍下 RRateLimiter 的具体应用形式、实现原理还有一些注意事项,最初简略谈谈我对分布式限流底层原理的了解。
RRateLimiter 应用
RRateLimiter 的应用形式异样的简略,参数也不多。只有创立出 RedissonClient,就能够从 client 中获取到 RRateLimiter 对象,间接看代码示例。
RedissonClient redissonClient = Redisson.create();
RRateLimiter rateLimiter = redissonClient.getRateLimiter("xindoo.limiter");
rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS);
rateLimiter.trySetRate 就是设置限流参数,RateType 有两种,OVERALL 是全局限流,PER_CLIENT 是单 Client 限流(能够认为就是单机限流),这里咱们只探讨全局模式。而前面三个参数的作用就是设置在多长时间窗口内(rateInterval+IntervalUnit),许可总量不超过多少(rate),下面代码中我设置的值就是 1 小时内总许可数不超过 100 个。而后调用 rateLimiter 的 tryAcquire()或者 acquire()办法即可获取许可。
rateLimiter.acquire(1); // 申请 1 份许可,直到胜利
boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS); // 申请 1 份许可,如果 5s 内未申请到就放弃
应用起来还是很简略的嘛,以上代码中的两种形式都是同步调用,但 Redisson 还同样提供了异步办法 acquireAsync()和 tryAcquireAsync(),应用其返回的 RFuture 就能够异步获取许可。
RRateLimiter 的实现
接下来咱们顺着 tryAcquire()办法来看下它的实现形式,在 RedissonRateLimiter 类中,咱们能够看到最底层的 tryAcquireAsync()办法。
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {byte[] random = new byte[8];
ThreadLocalRandom.current().nextBytes(random);
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"——————————————————————————————————————"
+ "这里是一大段 lua 代码"
+ "____________________________________",
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
value, System.currentTimeMillis(), random);
}
映入眼帘的就是一大段 lua 代码,其实这段 Lua 代码就是限流实现的外围,我把这段 lua 代码摘出来,并加了一些正文,咱们来具体看下。
local rate = redis.call("hget", KEYS[1], "rate") # 100
local interval = redis.call("hget", KEYS[1], "interval") # 3600000
local type = redis.call("hget", KEYS[1], "type") # 0
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
local valueName = KEYS[2] # {xindoo.limiter}:value 用来存储残余许可数量
local permitsName = KEYS[4] # {xindoo.limiter}:permits 记录了所有许可收回的工夫戳
# 如果是单实例模式,name 信息前面就须要拼接上 clientId 来辨别进去了
if type == "1" then
valueName = KEYS[3] # {xindoo.limiter}:value:b474c7d5-862c-4be2-9656-f4011c269d54
permitsName = KEYS[5] # {xindoo.limiter}:permits:b474c7d5-862c-4be2-9656-f4011c269d54
end
# 对参数校验
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
# 获取以后还有多少许可
local currentValue = redis.call("get", valueName)
local res
# 如果有记录以后还残余多少许可
if currentValue ~= false then
# 回收已过期的许可数量
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("Bc0I", v)
released = released + permits
end
# 清理已过期的许可记录
if released > 0 then
redis.call("zremrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
if tonumber(currentValue) + released > tonumber(rate) then
currentValue = tonumber(rate) - redis.call("zcard", permitsName)
else
currentValue = tonumber(currentValue) + released
end
redis.call("set", valueName, currentValue)
end
# ARGV permit timestamp random,random 是一个随机的 8 字节
# 如果残余许可不够,须要在 res 中返回下个许可须要期待多长时间
if tonumber(currentValue) < tonumber(ARGV[1]) then
local firstValue = redis.call("zrange", permitsName, 0, 0, "withscores")
res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]))
else
redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))
# 减小可用许可量
redis.call("decrby", valueName, ARGV[1])
res = nil
end
else # 反之,记录到还有多少许可,阐明是首次应用或者之前已记录的信息曾经过期了,就将配置 rate 写进去,并缩小许可数
redis.call("set", valueName, rate)
redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))
redis.call("decrby", valueName, ARGV[1])
res = nil
end
local ttl = redis.call("pttl", KEYS[1])
# 重置
if ttl > 0 then
redis.call("pexpire", valueName, ttl)
redis.call("pexpire", permitsName, ttl)
end
return res
即使是加了正文,置信你还是很难一下子看懂这段代码的,接下来我就以其在 Redis 中的数据存储模式,然辅以流程图让大家彻底理解其实现实现原理。
首先用 RRateLimiter 有个 name,在我代码中就是xindoo.limiter
,用这个作为 KEY 你就能够在 Redis 中找到一个 map,外面存储了 limiter 的工作模式(type)、可数量(rate)、工夫窗口大小(interval),这些都是在 limiter 创立时写入到的 redis 中的,在下面的 lua 代码中也应用到了。
其次还俩很重要的 key,valueName 和 permitsName,其中在我的代码实现中 valueName 是{xindoo.limiter}:value
,它存储的是以后可用的许可数量。我代码中 permitsName 的具体值是{xindoo.limiter}:permits
,它是一个 zset,其中存储了以后所有的许可受权记录(含有许可受权工夫戳),其中 SCORE 间接应用了工夫戳,而 VALUE 中蕴含了 8 字节的随机值和许可的数量,如下图:
{xindoo.limiter}:permits 这个 zset 中存储了所有的历史受权记录,直到了这些信息,置信你也就了解了 RRateLimiter 的实现原理,咱们还是将下面的那大段 Lua 代码的流程图绘制进去,整个执行的流程会更直观。
看到这大家应该能了解这段 Lua 代码的逻辑了,能够看到 Redis 用了多个字段来存储限流的信息,也有各种各样的操作,那 Redis 是如何保障在分布式下这些限流信息数据的一致性的?答案是不须要保障,在这个场景下,信息人造就是一致性的。起因是 Redis 的单过程数据处理模型,在同一个 Key 下,所有的 eval 申请都是串行的,所有不须要思考数据并发操作的问题。在这里,Redisson 也应用了 HashTag,保障所有的限流信息都存储在同一个 Redis 实例上。
RRateLimiter 应用时注意事项
理解了 RRateLimiter 的底层原理,再联合 Redis 本身的个性,我想到了 RRateLimiter 应用的几个局限点(问题点)。
RRateLimiter 是非偏心限流器
这个是我查阅材料得悉,并且在本人代码实际的过程中也失去了验证,具体表现就是如果多个实例 (机器) 取竞争这些许可,很可能某些实例会获取到大部分,而另外一些实例可怜巴巴仅获取到大量的许可,也就是说容易呈现旱的旱死 涝的涝死的状况。在应用过程中,你就必须思考你是否承受这种状况,如果不能承受就得思考用某些形式尽可能让其变偏心。
Rate 不要设置太大
从 RRateLimiter 的实现原理你也看出了,它采纳的是滑动窗口的模式来限流的,而且记录了所有的许可受权信息,所以如果你设置的 Rate 值过大,在 Redis 中存储的信息 (permitsName 对应的 zset) 也就越多,每次执行那段 lua 脚本的性能也就越差,这对 Redis 实例也是一种压力。集体倡议如果你是想设置较大的限流阈值,偏向于小 Rate+ 小工夫窗口的形式,而且这种设置形式申请也会更平均一些。
限流的下限取决于 Redis 单实例的性能
从原理上看,RRateLimiter 在 Redis 上所存储的信息都必须在一个 Redis 实例上,所以它的限流 QPS 的下限就是 Redis 单实例的下限,比方你 Redis 实例就是 1w QPS,你想用 RRateLimiter 实现一个 2w QPS 的限流器,必然实现不了。那有没有冲破 Redis 单实例性能下限的形式?单限流器必定是实现不了的,咱们能够拆分多个限流器,比方我搞 10 个限流器,名词用不一样的,而后每台机器随机应用一个限流器限流,理论的流量不就被扩散到不同的限流器上了吗,总的限流上线不也就上来了。
分布式限流的实质
分布式限流的实质实际上就是协同,协同的实质就是信息替换,信息替换最重要的的就是信息的准确性和一致性。 更简略粗犷了解,分布式限流的实质原理其实还是分布式数据一致性的原理,而限流只是数据后果的一种决策。所以只有以任何形式能让信息同步,且保障信息的正确性就能够实现一个分布式限流器了,这就是我了解的实质思路。
其实从下面的 RRateLimiter 的实现原理也能够看进去,它不就是存储了一些信息吗!那我不必 Redis,而是应用 mysql 行不行。理论必定是能够的,只有将的下面 Lua 代码中的所有操作都放到一个事务里,且事务的级别改成串行化,仍旧能实现 RRateLimiter 同样的性能。如果你具备 Mysql 相干常识的话,必定也能基于 Mysql 将 RRateLimiter 的 API 封装进去,然而封装进去的限流器,其限流的下限就取决于 Mysql 实例的性能下限。
最近 chatGPT 比拟火,我也问了下它对分布式限流实质原理的了解,上面是它的答复,大家感觉怎么样?
分布式限流的实质原理是通过在分布式系统中共享限流状态来限度零碎中单位工夫内的申请数量,从而防止零碎因流量过大而解体。
这是通过应用一些共享的存储组件,如数据库,缓存,分布式锁等来实现的。在每次申请时,零碎会查看以后的申请数量是否超过了事后设定的限度,如果超过了限度,申请就会被回绝;如果未超过限度,申请就会被容许。
通过应用分布式限流技术,零碎能够在高并发状况下保持稳定的性能,并防止因流量过大而导致的零碎解体。