First, add the reactive Redis dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    <version>2.6.2</version>
</dependency>
Next, define the dimension that requests are rate limited by, i.e. a KeyResolver bean:
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;

@Configuration
public class DemoConfiguration {

    @Bean
    public KeyResolver pathKeyResolver() {
        // Limit by request path: all requests hitting the same path share one token bucket.
        return exchange -> Mono.just(exchange.getRequest().getPath().toString());
    }
}
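This resolver keys the limit on the request path, which is what produces the request_rate_limiter.{/client1} keys seen in the monitor output later on. To limit per caller instead, a resolver keyed on the remote address is a common alternative; a minimal sketch (the bean name ipKeyResolver is illustrative and would sit in the same configuration class):

    @Bean
    public KeyResolver ipKeyResolver() {
        // Limit per client IP; fall back to a fixed key if the remote address is unavailable.
        return exchange -> Mono.just(
                exchange.getRequest().getRemoteAddress() != null
                        ? exchange.getRequest().getRemoteAddress().getAddress().getHostAddress()
                        : "unknown");
    }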
The filter can be configured in two forms. The first is in application.yml:
spring:
  redis:
    host: 127.0.0.1
    port: 6379
  cloud:
    gateway:
      routes:
        - id: routeB
          uri: http://www.b.com
          predicates:
            - Path=/pathB
          filters:
            - name: RequestRateLimiter
              args:
                # how many requests per second are allowed on average (the token refill rate)
                redis-rate-limiter.replenishRate: 2
                # the maximum number of requests allowed within a single second (the bucket capacity)
                redis-rate-limiter.burstCapacity: 2
                # reference the KeyResolver bean; by default the bean name is the @Bean method name
                key-resolver: "#{@pathKeyResolver}"
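As an aside, the two redis-rate-limiter.* arguments map directly onto the RedisRateLimiter class, so the same route could be declared with the Java routing DSL instead of properties. A minimal sketch (it would sit in a @Configuration class such as DemoConfiguration above; the bean method name is illustrative):

    @Bean
    public RouteLocator rateLimitedRoutes(RouteLocatorBuilder builder, KeyResolver pathKeyResolver) {
        return builder.routes()
                .route("routeB", r -> r.path("/pathB")
                        .filters(f -> f.requestRateLimiter(c -> {
                            c.setRateLimiter(new RedisRateLimiter(2, 2)); // replenishRate, burstCapacity
                            c.setKeyResolver(pathKeyResolver);
                        }))
                        .uri("http://www.b.com"))
                .build();
    }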
The second form is inside a RouteDefinitionRepository implementation, i.e. when routes are defined dynamically:
FilterDefinition filter3 = new FilterDefinition();
filter3.setName("RequestRateLimiter");
Map<String, String> map3 = new HashMap<>();
map3.put("redis-rate-limiter.replenishRate", "3");
map3.put("redis-rate-limiter.burstCapacity", "2");
map3.put("key-resolver", "#{@pathKeyResolver}");
filter3.setArgs(map3);
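filter3 above is only the filter part of a dynamic route; a minimal sketch of the RouteDefinition it might be attached to (the id, URI and path values are illustrative, and the finished definition would be returned from the repository's getRouteDefinitions()):

    RouteDefinition route = new RouteDefinition();
    route.setId("routeC");                             // illustrative route id
    route.setUri(URI.create("http://www.c.com"));      // illustrative downstream address

    // "Path=/pathC" is parsed into the predicate name and its shortcut argument.
    PredicateDefinition predicate = new PredicateDefinition("Path=/pathC");

    route.setPredicates(Collections.singletonList(predicate));
    route.setFilters(Collections.singletonList(filter3));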
You can run the monitor command against the local Redis instance to watch what the gateway executes:
127.0.0.1:6379> monitor
OK
Here is what two consecutive requests within the same second look like:
1643107233.014412 [0 127.0.0.1:52263] "EVALSHA" "76a566c51f92d8f643d16e0ef0101663d5380dba" "2" "request_rate_limiter.{/client1}.tokens" "request_rate_limiter.{/client1}.timestamp" "2" "2" "" "1"
1643107233.014451 [0 lua] "TIME"
1643107233.014462 [0 lua] "get" "request_rate_limiter.{/client1}.tokens"
1643107233.014469 [0 lua] "get" "request_rate_limiter.{/client1}.timestamp"
1643107233.014484 [0 lua] "setex" "request_rate_limiter.{/client1}.tokens" "2" "1"
1643107233.014495 [0 lua] "setex" "request_rate_limiter.{/client1}.timestamp" "2" "1643107233"
1643107233.326166 [0 127.0.0.1:52263] "EVALSHA" "76a566c51f92d8f643d16e0ef0101663d5380dba" "2" "request_rate_limiter.{/client1}.tokens" "request_rate_limiter.{/client1}.timestamp" "2" "2" "" "1"
1643107233.326219 [0 lua] "TIME"
1643107233.326228 [0 lua] "get" "request_rate_limiter.{/client1}.tokens"
1643107233.326235 [0 lua] "get" "request_rate_limiter.{/client1}.timestamp"
1643107233.326251 [0 lua] "setex" "request_rate_limiter.{/client1}.tokens" "2" "0"
1643107233.326262 [0 lua] "setex" "request_rate_limiter.{/client1}.timestamp" "2" "1643107233"
And now three consecutive requests within the same second:
1643107474.188667 [0 127.0.0.1:52263] "EVALSHA" "76a566c51f92d8f643d16e0ef0101663d5380dba" "2" "request_rate_limiter.{/client1}.tokens" "request_rate_limiter.{/client1}.timestamp" "2" "2" "" "1"
1643107474.188704 [0 lua] "TIME"
1643107474.188712 [0 lua] "get" "request_rate_limiter.{/client1}.tokens"
1643107474.188718 [0 lua] "get" "request_rate_limiter.{/client1}.timestamp"
1643107474.188733 [0 lua] "setex" "request_rate_limiter.{/client1}.tokens" "2" "1"
1643107474.188745 [0 lua] "setex" "request_rate_limiter.{/client1}.timestamp" "2" "1643107474"
1643107474.347245 [0 127.0.0.1:52263] "EVALSHA" "76a566c51f92d8f643d16e0ef0101663d5380dba" "2" "request_rate_limiter.{/client1}.tokens" "request_rate_limiter.{/client1}.timestamp" "2" "2" "" "1"
1643107474.347283 [0 lua] "TIME"
1643107474.347291 [0 lua] "get" "request_rate_limiter.{/client1}.tokens"
1643107474.347298 [0 lua] "get" "request_rate_limiter.{/client1}.timestamp"
1643107474.347313 [0 lua] "setex" "request_rate_limiter.{/client1}.tokens" "2" "0"
1643107474.347324 [0 lua] "setex" "request_rate_limiter.{/client1}.timestamp" "2" "1643107474"
1643107474.494758 [0 127.0.0.1:52263] "EVALSHA" "76a566c51f92d8f643d16e0ef0101663d5380dba" "2" "request_rate_limiter.{/client1}.tokens" "request_rate_limiter.{/client1}.timestamp" "2" "2" "" "1"
1643107474.494793 [0 lua] "TIME"
1643107474.494801 [0 lua] "get" "request_rate_limiter.{/client1}.tokens"
1643107474.494808 [0 lua] "get" "request_rate_limiter.{/client1}.timestamp"
1643107474.494823 [0 lua] "setex" "request_rate_limiter.{/client1}.tokens" "2" "0"
1643107474.494834 [0 lua] "setex" "request_rate_limiter.{/client1}.timestamp" "2" "1643107474"
The third request was rate limited and the gateway returned HTTP status code 429: the first two calls drained the bucket from 2 tokens to 1 and then to 0 (visible in the setex values), so the third call found an empty bucket and was rejected.
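The same behaviour can be reproduced without monitor by firing a few requests in a row and checking the status codes. A minimal WebClient sketch, assuming the gateway listens on localhost:8080 and /client1 is routed through the RequestRateLimiter filter:

    WebClient client = WebClient.create("http://localhost:8080");   // assumed gateway address

    for (int i = 1; i <= 3; i++) {
        HttpStatus status = client.get()
                .uri("/client1")                                     // the path doubles as the rate-limit key
                .exchangeToMono(resp -> Mono.just(resp.statusCode()))
                .block();
        System.out.println("request " + i + " -> " + status);
    }
    // Within one second the third line should print 429 TOO_MANY_REQUESTS;
    // the first two return whatever the downstream service answers.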
The Lua script being executed can be found in the Spring Cloud Gateway source (request_rate_limiter.lua):
redis.replicate_commands()
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key" .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = redis.call('TIME')[1]
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
--redis.log(redis.LOG_WARNING, "rate" .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity" .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now" .. now)
--redis.log(redis.LOG_WARNING, "requested" .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime" .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl" .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens" .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed" .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta" .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens" .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num" .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens" .. new_tokens)
if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end
-- return {allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens}
return {allowed_num, new_tokens}
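To make the arithmetic concrete, here is a plain-Java rendering of the same token-bucket logic (an illustration only, not gateway code); with replenishRate = 2 and burstCapacity = 2 it reproduces the allowed/allowed/denied pattern seen in the monitor output:

    class TokenBucket {
        final double rate;        // replenishRate: tokens refilled per second
        final double capacity;    // burstCapacity: maximum tokens the bucket can hold
        double tokens;
        long lastRefreshed;       // epoch seconds of the last refill

        TokenBucket(double rate, double capacity) {
            this.rate = rate;
            this.capacity = capacity;
            this.tokens = capacity;          // like the script, a fresh bucket starts full
        }

        boolean tryAcquire(long nowSeconds) {
            long delta = Math.max(0, nowSeconds - lastRefreshed);
            tokens = Math.min(capacity, tokens + delta * rate);   // refill since the last call
            lastRefreshed = nowSeconds;
            if (tokens >= 1) {               // requested = 1 token per request
                tokens -= 1;
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            TokenBucket bucket = new TokenBucket(2, 2);
            long now = System.currentTimeMillis() / 1000;
            System.out.println(bucket.tryAcquire(now));   // true  (2 -> 1 tokens)
            System.out.println(bucket.tryAcquire(now));   // true  (1 -> 0 tokens)
            System.out.println(bucket.tryAcquire(now));   // false (empty bucket -> 429 at the gateway)
        }
    }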