咱们目前在工作中遇到一个性能问题,咱们有个定时工作须要解决大量的数据,为了晋升吞吐量,所以部署了很多台机器,但这个工作在运行前须要从别的服务那拉取大量的数据,随着数据量的增大,如果同时多台机器并发拉取数据,会对上游服务产生十分大的压力。之前曾经减少了单机限流,但无奈解决问题,因为这个数据工作运行中只有不到10%的工夫拉取数据,如果单机限流限度太狠,尽管集群总的申请量管制住了,但工作吞吐量又降下来。如果限流阈值太高,多机并发的时候,还是有可能压垮上游。 所以目前惟一可行的解决方案就是分布式限流

  我目前是抉择间接应用Redisson库中的RRateLimiter实现了分布式限流,对于Redission可能很多人都有所耳闻,它其实是在Redis能力上构建的开发库,除了反对Redis的根底操作外,还封装了布隆过滤器、分布式锁、限流器……等工具。明天要说的RRateLimiter及时其实现的限流器。接下来本文将具体介绍下RRateLimiter的具体应用形式、实现原理还有一些注意事项,最初简略谈谈我对分布式限流底层原理的了解。

RRateLimiter应用

  RRateLimiter的应用形式异样的简略,参数也不多。只有创立出RedissonClient,就能够从client中获取到RRateLimiter对象,间接看代码示例。

RedissonClient redissonClient = Redisson.create();RRateLimiter rateLimiter = redissonClient.getRateLimiter("xindoo.limiter");rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS); 

  rateLimiter.trySetRate就是设置限流参数,RateType有两种,OVERALL是全局限流 ,PER_CLIENT是单Client限流(能够认为就是单机限流),这里咱们只探讨全局模式。而前面三个参数的作用就是设置在多长时间窗口内(rateInterval+IntervalUnit),许可总量不超过多少(rate),下面代码中我设置的值就是1小时内总许可数不超过100个。而后调用rateLimiter的tryAcquire()或者acquire()办法即可获取许可。

rateLimiter.acquire(1); // 申请1份许可,直到胜利boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS); // 申请1份许可,如果5s内未申请到就放弃

  应用起来还是很简略的嘛,以上代码中的两种形式都是同步调用,但Redisson还同样提供了异步办法acquireAsync()和tryAcquireAsync(),应用其返回的RFuture就能够异步获取许可。

RRateLimiter的实现

  接下来咱们顺着tryAcquire()办法来看下它的实现形式,在RedissonRateLimiter类中,咱们能够看到最底层的tryAcquireAsync()办法。

    private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {        byte[] random = new byte[8];        ThreadLocalRandom.current().nextBytes(random);        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                "——————————————————————————————————————"                + "这里是一大段lua代码"                + "____________________________________",                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),                value, System.currentTimeMillis(), random);    }

  映入眼帘的就是一大段lua代码,其实这段Lua代码就是限流实现的外围,我把这段lua代码摘出来,并加了一些正文,咱们来具体看下。

local rate = redis.call("hget", KEYS[1], "rate")  # 100 local interval = redis.call("hget", KEYS[1], "interval")  # 3600000local type = redis.call("hget", KEYS[1], "type")  # 0assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")local valueName = KEYS[2]      # {xindoo.limiter}:value 用来存储残余许可数量local permitsName = KEYS[4]    # {xindoo.limiter}:permits 记录了所有许可收回的工夫戳  # 如果是单实例模式,name信息前面就须要拼接上clientId来辨别进去了if type == "1" then    valueName = KEYS[3]        # {xindoo.limiter}:value:b474c7d5-862c-4be2-9656-f4011c269d54    permitsName = KEYS[5]      # {xindoo.limiter}:permits:b474c7d5-862c-4be2-9656-f4011c269d54end# 对参数校验 assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")# 获取以后还有多少许可 local currentValue = redis.call("get", valueName)   local res# 如果有记录以后还残余多少许可 if currentValue ~= false then    # 回收已过期的许可数量    local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)    local released = 0    for i, v in ipairs(expiredValues) do        local random, permits = struct.unpack("Bc0I", v)        released = released + permits    end    # 清理已过期的许可记录    if released > 0 then        redis.call("zremrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)        if tonumber(currentValue) + released > tonumber(rate) then            currentValue = tonumber(rate) - redis.call("zcard", permitsName)        else            currentValue = tonumber(currentValue) + released        end        redis.call("set", valueName, currentValue)    end    # ARGV  permit  timestamp  random, random是一个随机的8字节    # 如果残余许可不够,须要在res中返回下个许可须要期待多长时间     if tonumber(currentValue) < tonumber(ARGV[1]) then        local firstValue = redis.call("zrange", permitsName, 0, 0, "withscores")        res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]))    else        redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))        # 减小可用许可量         redis.call("decrby", valueName, ARGV[1])        res = nil    endelse # 反之,记录到还有多少许可,阐明是首次应用或者之前已记录的信息曾经过期了,就将配置rate写进去,并缩小许可数     redis.call("set", valueName, rate)    redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))    redis.call("decrby", valueName, ARGV[1])    res = nilendlocal ttl = redis.call("pttl", KEYS[1])# 重置if ttl > 0 then    redis.call("pexpire", valueName, ttl)    redis.call("pexpire", permitsName, ttl)endreturn res

  即使是加了正文,置信你还是很难一下子看懂这段代码的,接下来我就以其在Redis中的数据存储模式,然辅以流程图让大家彻底理解其实现实现原理。

  首先用RRateLimiter有个name,在我代码中就是xindoo.limiter,用这个作为KEY你就能够在Redis中找到一个map,外面存储了limiter的工作模式(type)、可数量(rate)、工夫窗口大小(interval),这些都是在limiter创立时写入到的redis中的,在下面的lua代码中也应用到了。

  其次还俩很重要的key,valueName和permitsName,其中在我的代码实现中valueName是{xindoo.limiter}:value ,它存储的是以后可用的许可数量。我代码中permitsName的具体值是{xindoo.limiter}:permits,它是一个zset,其中存储了以后所有的许可受权记录(含有许可受权工夫戳),其中SCORE间接应用了工夫戳,而VALUE中蕴含了8字节的随机值和许可的数量,如下图:


  {xindoo.limiter}:permits这个zset中存储了所有的历史受权记录,直到了这些信息,置信你也就了解了RRateLimiter的实现原理,咱们还是将下面的那大段Lua代码的流程图绘制进去,整个执行的流程会更直观。

  看到这大家应该能了解这段Lua代码的逻辑了,能够看到Redis用了多个字段来存储限流的信息,也有各种各样的操作,那Redis是如何保障在分布式下这些限流信息数据的一致性的?答案是不须要保障,在这个场景下,信息人造就是一致性的。起因是Redis的单过程数据处理模型,在同一个Key下,所有的eval申请都是串行的,所有不须要思考数据并发操作的问题。在这里,Redisson也应用了HashTag,保障所有的限流信息都存储在同一个Redis实例上。

RRateLimiter应用时注意事项

  理解了RRateLimiter的底层原理,再联合Redis本身的个性,我想到了RRateLimiter应用的几个局限点(问题点)。

RRateLimiter是非偏心限流器

  这个是我查阅材料得悉,并且在本人代码实际的过程中也失去了验证,具体表现就是如果多个实例(机器)取竞争这些许可,很可能某些实例会获取到大部分,而另外一些实例可怜巴巴仅获取到大量的许可,也就是说容易呈现旱的旱死 涝的涝死的状况。在应用过程中,你就必须思考你是否承受这种状况,如果不能承受就得思考用某些形式尽可能让其变偏心。

Rate不要设置太大

  从RRateLimiter的实现原理你也看出了,它采纳的是滑动窗口的模式来限流的,而且记录了所有的许可受权信息,所以如果你设置的Rate值过大,在Redis中存储的信息(permitsName对应的zset)也就越多,每次执行那段lua脚本的性能也就越差,这对Redis实例也是一种压力。集体倡议如果你是想设置较大的限流阈值,偏向于小Rate+小工夫窗口的形式,而且这种设置形式申请也会更平均一些。

限流的下限取决于Redis单实例的性能

  从原理上看,RRateLimiter在Redis上所存储的信息都必须在一个Redis实例上,所以它的限流QPS的下限就是Redis单实例的下限,比方你Redis实例就是1w QPS,你想用RRateLimiter实现一个2w QPS的限流器,必然实现不了。 那有没有冲破Redis单实例性能下限的形式?单限流器必定是实现不了的,咱们能够拆分多个限流器,比方我搞10个限流器,名词用不一样的,而后每台机器随机应用一个限流器限流,理论的流量不就被扩散到不同的限流器上了吗,总的限流上线不也就上来了。

分布式限流的实质

   分布式限流的实质实际上就是协同,协同的实质就是信息替换,信息替换最重要的的就是信息的准确性和一致性。 更简略粗犷了解,分布式限流的实质原理其实还是分布式数据一致性的原理,而限流只是数据后果的一种决策。所以只有以任何形式能让信息同步,且保障信息的正确性就能够实现一个分布式限流器了,这就是我了解的实质思路。

   其实从下面的RRateLimiter的实现原理也能够看进去,它不就是存储了一些信息吗! 那我不必Redis,而是应用mysql行不行。理论必定是能够的,只有将的下面Lua代码中的所有操作都放到一个事务里,且事务的级别改成串行化,仍旧能实现RRateLimiter同样的性能。如果你具备Mysql相干常识的话,必定也能基于Mysql将RRateLimiter的API封装进去,然而封装进去的限流器,其限流的下限就取决于Mysql实例的性能下限。

   最近chatGPT比拟火,我也问了下它对分布式限流实质原理的了解,上面是它的答复,大家感觉怎么样?

分布式限流的实质原理是通过在分布式系统中共享限流状态来限度零碎中单位工夫内的申请数量,从而防止零碎因流量过大而解体。

这是通过应用一些共享的存储组件,如数据库,缓存,分布式锁等来实现的。在每次申请时,零碎会查看以后的申请数量是否超过了事后设定的限度,如果超过了限度,申请就会被回绝;如果未超过限度,申请就会被容许。

通过应用分布式限流技术,零碎能够在高并发状况下保持稳定的性能,并防止因流量过大而导致的零碎解体。