乐趣区

关于限流:谈谈限流算法以及Redisson实现

1. 限流的意义

明天谈谈限流。很早之前,在接触像 hystrix、resilience4j、sentinel 这类的熔断器组件时,就理解过其对于限流的性能。在理论开发利用中,超时、谬误熔断用的挺多,但限流熔断用的到不多。

究其原因,在公司外部微服务调用时,就算服务调用的上下游服务,不是同一个我的项目团队的服务,但至多是同一个公司的研发团队。当避免上游方被频繁调用,齐全能够和上游方约定好协同计划,而不是通过限流的策略给上游方抛错。

但如果上下游方比拟独立,则有必要通过限流来进行束缚,和自我爱护。

例如:咱们产品对外提供的服务端凋谢 API,如果不在文档中约定好调用频率限度,并做好自我爱护,很容易就被人歹意攻打。

再例如:咱们在对接钉钉、微信等生态服务时,也须要调用它们在开放平台的 API,同样也无限流要求。可参考 钉钉服务端 API 限流文档。因为一旦调用钉钉 API 频率超限,会触发至多 5 分钟的限流熔断,这 5 分钟内任何 API 调用都会报错。所以咱们作为服务调用方,更要将调用服务的申请限流。

2. 限流算法

在网上看了些限流算法,次要有 4 种。

2.1. 固定窗口算法

首先保护一个计数器,将单位时间段当做一个窗口,计数器记录这个窗口接管申请的次数。

  • 当次数少于限流阀值,就容许拜访,并且计数器 +1
  • 当次数大于限流阀值,就回绝拜访。
  • 以后的工夫窗口过来之后,计数器清零。

假如单位工夫是 1 秒,限流阀值为 3。在单位工夫 1 秒内,每来一个申请, 计数器就加 1,如果计数器累加的次数超过限流阀值 3,后续的申请全副回绝。等到 1s 完结后,计数器清 0,从新开始计数。

问题 1:窗口临界值,导致双倍阈值

假如限流阀值为 5 个申请,单位工夫窗口是 1s, 如果咱们在单位工夫内的前 0.8-1s 和 1 -1.2s,别离并发 5 个申请。尽管都没有超过阀值,然而如果算 0.8-1.2s, 则并发数高达 10,曾经超过单位工夫 1s 不超过 5 阀值的定义啦,通过的申请达到了阈值的两倍。

为了解决问题 2 中窗口临界值的问题,引入了滑动窗口限流。滑动窗口限流解决固定窗口临界值的问题,能够保障在任意工夫窗口内都不会超过阈值。

问题 2(两面性):集中流量,打满阈值,后续服务不可用

比方窗口大小为 1s,限流大小为 100,而后恰好在某个窗口的第 1ms 来了 100 个申请,而后第 2ms-999ms 的申请就都会被回绝,这段时间用户会感觉零碎服务不可用。

但这是两面性的问题,有毛病,也有长处,前面会说。

2.2. 滑动窗口算法

绝对于固定窗口,滑动窗口除了须要引入计数器之外,还须要记录时间窗口内每个申请达到的工夫点,因而对内存的占用会比拟多。

规定如下,假如工夫窗口为 1 秒:

  • 记录每次申请的工夫。
  • 统计每次申请的工夫 至 往前推 1 秒这个工夫窗口内申请数,并且 1 秒前的数据能够删除。
  • 统计的申请数小于阈值就记录这个申请的工夫,并容许通过,反之回绝。

滑动窗口算法就是固定窗口的升级版。将计时窗口划分成一个小窗口,滑动窗口算法就进化成了固定窗口算法。而滑动窗口算法其实就是对申请数进行了更细粒度的限流,窗口划分的越多,则限流越精准。

然而滑动窗口和固定窗口都无奈解决短时间之内集中流量的突击,就和后面介绍的一样。

接下来再说说漏桶,它能够解决工夫窗口类的痛点,使得流量更加的平滑。

2.3. 漏桶算法

漏桶算法面对限流,就更加的柔性,不存在间接的粗犷回绝。

它的原理很简略,能够认为就是注水漏水的过程。往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被抛弃。因为桶容量是不变的,保障了整体的速率。

  • 流入的水滴,能够看作是拜访零碎的申请,这个流入速率是不确定的。
  • 桶的容量个别示意零碎所能解决的申请数。
  • 如果桶的容量满了,就达到限流的阀值,就会抛弃水滴(拒绝请求)
  • 流出的水滴,是恒定过滤的,对应服务依照固定的速率解决申请。

看到这想到啥,是不是 音讯队列 思维有点像,削峰填谷。通过破绽这么一过滤,申请就能平滑的流出,看起来很像很挺完满的?实际上它的长处也即毛病。

问题 3: 无奈应答流量突发

面对突发申请,服务的处理速度和平时是一样的,这其实不是咱们想要的,在面对突发流量咱们心愿在零碎安稳的同时,晋升用户体验即能更快的解决申请,而不是和失常流量一样,安分守己的解决(看看,之前滑动窗口说流量不够平滑,当初太平滑了又不行,难搞啊)。

而接下来咱们要谈的令牌桶算法可能在肯定水平上解决流量突发的问题。

2.4. 令牌桶算法

令牌桶算法是对漏斗算法的一种改良,除了可能起到限流的作用外,还容许肯定水平的流量突发。

令牌桶算法原理:

  • 有一个令牌管理员,依据限流大小,定速往令牌桶里放令牌。
  • 如果令牌数量满了,超过令牌桶容量的限度,那就抛弃。
  • 零碎在承受到一个用户申请时,都会先去令牌桶要一个令牌。如果拿到令牌,那么就解决这个申请的业务逻辑;
  • 如果拿不到令牌,就间接回绝这个申请。

能够看出令牌桶在应答突发流量的时候,桶内如果有 100 个令牌,那么这 100 个令牌能够马上被取走,而不像漏桶那样匀速的生产。所以在应答突发流量的时候令牌桶体现的更佳。

2.5. 集体了解

依照我的了解,对这 4 种算法做上面的分类比拟。

2.5.1. 滑动窗口算法 > 固定窗口算法

固定窗口 算法实现简略,性能高。然而会有临界突发流量问题,刹时流量最大能够达到阈值的 2 倍。

为了解决临界突发流量,能够将窗口划分为多个更细粒度的单元,每次窗口向右挪动一个单元,于是便有了 滑动窗口 算法。

从算法成果上来讲

滑动窗口算法要优于固定窗口算法,毕竟能防止窗口临界值问题。

从施行性能上来讲

固定窗口算法实现起来要更简略,对性能资源要求更低。滑动窗口只须要引入计数器,但滑动窗口还须要记录时间窗口内每个申请达到的工夫点,因而对内存的占用会比拟多。

总结:滑动窗口算法优先

不过限流算法就是为了爱护线上服务器资源,防止被流量击溃。与这代价相比,滑动窗口算法的那些性能资源耗费算得了什么。所以目前市场上,简直看不到以固定窗口算法实现的限流组件。

2.5.2. 漏桶算法(MQ 音讯队列)

想要达到限流的目标,又不会掐断流量,使得流量更加平滑?能够思考漏桶算法。

我为啥在漏桶算法这节加上 MQ 生产队列 呢?因为在我的了解中,这种限流算法,就是 MQ 生产队列的利用办法。无论生产音讯的频率如何,MQ 的消费者的生产频率下限是固定的。

有差异吗?有。漏桶算法中定义的是“桶容量固定。当水超过桶的容量时,会被溢出抛弃”。而 MQ 的惯例用法是“削峰填谷”,音讯能够在队列中积压,而后满满生产,但不会轻易抛弃。其实这也合乎通常的理论利用场景。真要实现漏桶算法的要求也行,齐全给队列设置为固定长度。

总结,如果要用漏桶算法限流,用 MQ 音讯队列就是了。

2.5.3. 令牌桶算法、滑动窗口算法 类似

1. 类似

在我看来,这两种算法很类似。

滑动窗口算法,是在应用时,按速率(窗口单位工夫内的最大通过数量)计算计数器,没超过就放行。

令牌桶算法,是依照速率往固定容量桶内投放令牌,在应用时,只有桶内还有令牌就能够放行。

从这个角度来看,令牌桶算法是将统计令牌数(计数器),和判断是否能够放行,这两个环节“解耦”了。

2. 独特的长处

  • 都没有窗口临界值问题。
  • (两面性问题)都能应答流量突发。像滑动窗口算法,突发流量进来时,窗口工夫内不超过计数器阈值即可。

3. 独特的毛病

  • (两面性问题)突发流量会占据大量令牌(计数器计数),导致后续流量进入受限。

2.5.4. 依照需要选型

在我看来,限流算法选型有两种:

  • 滑动窗口算法、令牌桶算法,二者属于同一类
  • 漏桶算法

而二者的区别就在于后面始终提到的两面性问题。

两面性问题:突发流量

限流算法中,对于应答突发流量,在我看来是个两面性问题。

  • 长处:针对突发场景也有无效响应。
  • 毛病:当突发流量进来后,必然会对后续进来的其余流量造成影响,流量不够平滑。

并没有一种最好的限流算法,到底抉择哪种限流算法,还是要看理论需要场景,联合已有的资源,综合思考。

3. 限流组件摸索

3.1. Ratelimiter

在做对调用钉钉 API 限流时,有看到钉钉文档上举荐的限流形式,就是 Guava 的 RateLimiter。

RateLimiter 是基于令牌桶算法限流的。但 RateLimiter 对于继续生成令牌,采纳的不是定时工作的形式(过于消耗资源,不适宜高并发),而是应用提早计算的形式。即在获取令牌时计算上一次工夫 nextFreeTicketMicros 和以后工夫之间的差值,计算这段时间之内依照用户设定的速率能够生产多少令牌。

void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

针对令牌桶算法的这种实现形式比拟常见,前面在 Redisson 中也会见到。

我不了解为啥钉钉文档外面只官网举荐 RateLimiter,其实它是无奈满足需要的。因为它是基于 Java 线程实现的,是基于单机的限流。咱们的服务根本都是多节点服务器的,显著无奈实现总体限流。因而须要找一个能失效分布式限流的组件。

3.2. sentinel

再接着,我就想到了阿里本人的限流组件 sentinel,咱们部门也有现成的 sentinel 服务能够用。

我记得在 sentinel dashboard 上配置限流规定时,能够基于集群配置,于是我就入手试了一下。发现确实能够基于多个机器节点创立集群,而后基于集群来创立配置限流规定,以实现在整个集群的维度实现限流。

但问题来了,dashboard 上集群是须要手动抉择机器(ip+port)创立的。咱们单个微服务的服务器节点 ip、端口非固定的,而且反对弹性伸缩。一旦服务器发生变化,如何主动同步到 sentinel 集群信息上,就又是一个须要攻克的问题。

限流的算法不简单,要不罗唆本人写一个吧。基于 redis 存储,这样就能满足分布式限流。

3.3. Redisson

可当我开始基于 redis 本人写限流办法时,无心中发现 Redisson 本人就提供了封装好的限流办法 RRateLimiter

之前始终在用 Redisson 封装的分布式锁办法,都忘了看看其余的性能了。上面就具体介绍本人的应用,以及源码逻辑。

4. Redisson 限流算法

4.1. 示例

1. pom.xml
引入 redisson 的依赖:

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.15.5</version>
        </dependency>

2. application

配置文件中申明一些简略的 redisson 的连贯信息。我这边是本地起了一个 redis 库,没设置明码。

spring:
  redis:
    redisson:
      config:
        singleServerConfig:
          address: redis://127.0.0.1:6379
          database: 0

3. controller

@RestController
@RequestMapping("")
@Slf4j
public class DemoController {
    private final RedissonClient redissonClient;

    public DemoController(RedissonClient redissonClient) {this.redissonClient = redissonClient;}


    @GetMapping("require")
    public void hello(Integer num) {RRateLimiter rateLimiter = redissonClient.getRateLimiter("LIMITER_NAME");
        rateLimiter.trySetRate(RateType.OVERALL, 5, 10, RateIntervalUnit.SECONDS);
        rateLimiter.tryAcquire(num,1,TimeUnit.MINUTES);
        log.info("get!");
    }
}

这边写了一个 demo 示例,定义了一个叫 “LIMITER_NAME” 的限流器,设置每 10 秒钟生成 5 个令牌。而后依据接口传入参数 num,看看申请多少个令牌。当申请不到时阻塞,最大阻塞工夫为 1 分钟。

4.2. redis 数据结构

RRateLimiter 接口的实现类简直都在 RedissonRateLimiter 上,咱们看看后面调用 RRateLimier 办法时,这些办法的对应源码实现。

1. setRate

对应实现类中的源码是:

    public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "redis.call('hsetnx', KEYS[1],'rate', ARGV[1]);redis.call('hsetnx', KEYS[1],'interval', ARGV[2]);return redis.call('hsetnx', KEYS[1],'type', ARGV[3]);", Collections.singletonList(this.getRawName()), new Object[]{rate, unit.toMillis(rateInterval), type.ordinal()});
    }

外围是其中的 lua 脚本,摘出来看看:

redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);

发现基于一个 hash 类型的 redis key 设置了 3 个值。上面就着重讲讲这里限流算法中,一共用到的 3 个 redis key。

2. key 1:Hash 构造

就是后面 setRate 设置的 hash key。依照之前限流器命名“LIMITER_NAME”,这个 redis key 的名字就是 LIMITER_NAME。一共有 3 个值:

  1. rate:代表速率
  2. interval:代表多少工夫内产生的令牌
  3. type:代表单机还是集群

3. key 2:ZSET 构造

ZSET 记录获取令牌的工夫戳,用于工夫比照,redis key 的名字是 {LIMITER_NAME}:permits。上面讲讲 ZSET 中每个元素的 member 和 score:

  • member:蕴含两个内容:(1)一段 8 位随机字符串,为了惟一标志性当次获取令牌;(2)数字,即当次获取令牌的数量。不过这些是压缩后存储在 redis 中的,在工具上看时会发现乱码。
  • score:记录获取令牌的工夫戳,如:1667025166312(对应 2022-10-29 14:32:46)

4. key 3:string 构造

记录的是以后令牌桶中残余的令牌数。redis key 的名字是 {LIMITER_NAME}:value

4.3. 算法源码剖析

后面是铺垫,上面就着重讲讲获取令牌的源码吧。

对应办法是:

    private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {return this.commandExecutor.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1],'rate');local interval = redis.call('hget', KEYS[1],'interval');local type = redis.call('hget', KEYS[1],'type');assert(rate ~= false and interval ~= false and type ~= false,'RateLimiter is not initialized')local valueName = KEYS[2];local permitsName = KEYS[4];if type =='1'then valueName = KEYS[3];permitsName = KEYS[5];end;assert(tonumber(rate) >= tonumber(ARGV[1]),'Requested permits amount could not exceed defined rate'); local currentValue = redis.call('get', valueName); if currentValue ~= false then local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; for i, v in ipairs(expiredValues) do local random, permits = struct.unpack('fI', v);released = released + permits;end; if released > 0 then redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); currentValue = tonumber(currentValue) + released; redis.call('set', valueName, currentValue);end;if tonumber(currentValue) < tonumber(ARGV[1]) then local nearest = redis.call('zrangebyscore', permitsName,'('.. (tonumber(ARGV[2]) - interval),'+inf','withscores','limit', 0, 1); return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);else redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); return nil; end; else redis.call('set', valueName, rate); redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); return nil; end;", Arrays.asList(this.getRawName(), this.getValueName(), this.getClientValueName(), this.getPermitsName(), this.getClientPermitsName()), new Object[]{value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong()});
    }

咱们先看看执行 lua 脚本时,所有要传入的参数内容:

  • KEYS[1]:hash key name
  • KEYS[2]:全局 string(value) key name
  • KEYS[3]:单机 string(value) key name
  • KEYS[4]:全局 zset(permits) key name
  • KEYS[5]:单机 zset(permits) key name
  • ARGV[1]:以后申请令牌数量
  • ARGV[2]:以后工夫
  • ARGV[3]:8 位随机字符串

而后,咱们再将其中的 lua 局部提取进去,我再依据本人的了解,在其中各段代码加上了正文。

-- rate:间隔时间内产生令牌数量
-- interval:间隔时间
-- type:类型:0- 全局限流;1- 单机限
local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');
-- 如果 3 个参数存在空值,谬误提醒初始化未实现
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')
local valueName = KEYS[2];
local permitsName = KEYS[4];
-- 如果是单机限流,在全局 key 后拼接上机器惟一标识字符
if type == '1' then
    valueName = KEYS[3];
    permitsName = KEYS[5];
end ;
-- 如果:以后申请令牌数 < 窗口工夫内令牌产生数量,谬误提醒申请令牌不能超过 rate
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate');
-- currentValue = 以后残余令牌数量
local currentValue = redis.call('get', valueName);
-- 非第一次拜访,存储残余令牌数量的 string(value) key 存在,有值(包含 0)if currentValue ~= false then
    -- 以后工夫戳往前推一个间隔时间,属于工夫窗口以外。工夫窗口以外,签发过的令牌,都属于过期令牌,须要回收回来
    local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
    -- 统计能够回收的令牌数量
    local released = 0;
    for i, v in ipairs(expiredValues) do
        -- lua struct 的 pack/unpack 办法,能够了解为文本压缩 / 解压缩办法
        local random, permits = struct.unpack('fI', v);
        released = released + permits;
    end ;
    -- 移除 zset(permits) 中过期的令牌签发记录
    -- 将过期令牌回收回来,从新更新残余令牌数量
    if released > 0 then
        redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
        currentValue = tonumber(currentValue) + released;
        redis.call('set', valueName, currentValue);
    end ;
    -- 如果 残余令牌数量 < 以后申请令牌数量,返回揣测能够取得所需令牌数量的工夫
    --(1)最近一次签发令牌的开释工夫 = 最近一次签发令牌的签发工夫戳 + 间隔时间(interval)
    --(2)揣测可取得所需令牌数量的工夫 = 最近一次签发令牌的开释工夫 - 以后工夫戳
    --(3)"揣测" 可取得所需令牌数量的工夫,"揣测",是因为不确定最近一次签发令牌数量开释后,加上到时候的残余令牌数量,是否满足所需令牌数量
    if tonumber(currentValue) < tonumber(ARGV[1]) then
        local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1);
        return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);
        -- 如果 残余令牌数量 >= 以后申请令牌数量,可间接记录签发令牌,并从残余令牌数量中减去以后签发令牌数量
    else
        redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1]));
        redis.call('decrby', valueName, ARGV[1]);
        return nil;
    end ;
    -- 第一次拜访,存储残余令牌数量的 string(value) key 不存在,为 null,走初始化逻辑
else
    redis.call('set', valueName, rate);
    redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1]));
    redis.call('decrby', valueName, ARGV[1]);
    return nil;
end ;
退出移动版