乐趣区

关于redis:基于Redis实现一套支持排队等待的限流器

一、背景

因为我的项目中调用了一个政府官网零碎,前段时间失去政府告诉说咱们调用频率太高,目前给咱们凋谢的接口调用频率是每秒一次。而后还发过来一个咱们申请通过与超频的比例报告,显示失败率高达 80%,也就是说咱们百分之 80 的申请都被拦挡了,这里应该会有有搭档疑难 80% 的异样申请你们零碎怎么开展业务的。实际上咱们零碎外部有做缓存,而后后盾有被动和被动两种形式去刷新缓存,所以这 80% 失败申请中绝大多数都是后盾刷新缓存的申请,并非客户端用户的申请所以呢对咱们的业务也没有实质性的影响。基于此我方也须要做申请限度,不然政府方面会思考以申请失败率过高而把咱们的接口权限下掉。

二、调研

对于限流的经典算法漏斗和令牌通这里就不多说了,这类算法介绍网上曾经很多内容了。我这里整顿下目前市面上开源的限流工具以及为什么咱们没抉择应用开源工具而要本人造轮子的起因。

Google Guava

首先就是谷歌的 Guava 工具类库,该类库提供很多比拟好用的工具类,其中就包含基于令牌通算法实现的限流器 RateLimiter

// 1、申明一个 qps 最大为 1 的限流器
RateLimiter limiter = RateLimiter.create(1);
// 2、尝试阻塞获取令牌
limiter.acquire();

Alibaba Sentinel

而后就是阿里巴巴的 Sentinel,这个就比拟弱小了,应该是目前市面上限流方面做的最全面的开源我的项目了。不仅反对流量管制,同时还反对分布式限流,熔断降级,系统监控等,还有比拟灵便的限流策略配置反对。这里我也没用过,可能须要花些工夫能力把握吧。

Redisson RRateLimiter

最初要介绍的是基于令牌通算法实现的 RRateLimiter, 它是 Redisson 类库的限流工具,反对分布式限流,应用起来也相当的不便

// 1、申明一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
 
// 2、设置速率,5 秒中产生 3 个令牌
rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
 
// 3、试图获取一个令牌,获取到返回 true
rateLimiter.tryAcquire(1)

选型

  1. 首先我的需要是限流器必须要反对分布式,那 Guava 首先能够排除了。
  2. 而后 Sentinel 对于咱们的需要来说有些轻便,太过于分量所以也排除了。
  3. 最初 RRateLimiter 尽管反对分布式,应用也比较简单,然而如同它不反对偏心排队(不太确定)。

三、造轮子

基于以上我决定本人手撸一个反对偏心排队的分布式限流器。实现计划是基于 Redis Lua 脚本而后配合业务层代码反对,间接上菜

限流器的主体代码

public class RedisRateLimiter {public static final GenericToStringSerializer argsSerializer = new GenericToStringSerializer<>(Object.class);
    public static final GenericToStringSerializer resultSerializer = new GenericToStringSerializer<>(Long.class);

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private RedisUtil redisUtil;

    public static final int DEFAULT_MAX_PERMIT_COUNT = 1;
    public static final float DEFAULT_INTERVAL_SECONDS = 1.3f;
    public static final int DEFAULT_TIMEOUT_SECONDS = 5;

    // TODO 目前不反对自定义该值
    /**
     * 一个周期内的最大许可数量
     */
    private int maxPermitCount;

    public RedisRateLimiter() {this.maxPermitCount = DEFAULT_MAX_PERMIT_COUNT;}

    public static DefaultRedisScript<Long> redisScript;

    static {redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimiter.lua")));
        redisScript.setResultType(Long.class);
    }

    /**
     *
     * @param redisKey
     * @param intervalSeconds 距离几秒创立一个许可
     * @param timeoutSeconds 获取许可超时工夫
     * @return
     * @throws InterruptedException
     */
    public boolean tryAcquire(String redisKey, float intervalSeconds, long timeoutSeconds) throws InterruptedException {
        try {if (redisKey == null) {throw new BusinessException(BusinessExceptionCode.REQUEST_PARAM_ERROR, "redisKey 不能为空!");
            }
            Preconditions.checkArgument(intervalSeconds > 0.0 && !Double.isNaN(intervalSeconds), "rate must be positive");

            long intervalMillis = (long) (intervalSeconds * 1000);
            long timeoutMillis = Math.max(TimeUnit.SECONDS.toMillis(timeoutSeconds), 0);

            long pttl = redisTemplate.execute(redisScript, argsSerializer, resultSerializer, Arrays.asList(redisKey), maxPermitCount, intervalMillis, timeoutMillis);

            if (pttl == 0) {log.info("---------------- 无需排队,间接通过, 以后许可数量 ={}", redisUtil.get(redisKey));
                return true;
            }else if(pttl < timeoutMillis) {Thread.sleep(pttl);
                log.info("---------------- 排队 {} 毫秒后,通过, 以后许可数量 ={}", pttl, redisUtil.get(redisKey));
                return true;
            }else {
                // 间接超时
                log.info("---------------- 需排队 {} 毫秒,间接超时, 以后许可数量 ={}", pttl, redisUtil.get(redisKey));
                return false;
            }
        }catch (Exception e) {log.error("限流异样", e);
        }
        return true;
    }
}

外围 Lua 脚本文件: rateLimiter.lua

local limit = tonumber(ARGV[1])
local interval = tonumber(ARGV[2])
local timeout = tonumber(ARGV[3])
local count = tonumber(redis.call('get', KEYS[1]) or "0")
local pttl = tonumber(redis.call('pttl', KEYS[1]) or "0")
if pttl < 0 then
 pttl = 0
end
-- 这个代表已被预约的令牌数
local currentCount = count - math.max(math.floor((count*interval - pttl)/interval), 0)
-- 新增一个令牌
local newCount = currentCount + 1
-- 所有令牌总的生效毫秒
local newPTTL = pttl + interval

if newCount <= limit then

 -- 无需排队间接通过
 redis.call("PSETEX", KEYS[1], newPTTL, newCount)
 return 0
elseif pttl < timeout then

 -- 排队 pttl 毫秒后可通过
 redis.call("PSETEX", KEYS[1], newPTTL, newCount)
else

 -- 超时
 redis.call("PSETEX", KEYS[1], pttl, currentCount)
end
-- 返回需期待毫秒数
return pttl
退出移动版