共计 7408 个字符,预计需要花费 19 分钟才能阅读完成。
在接入 Spring-Cloud-Gateway 时,可能有需求进行缓存 Json-Body 数据或者 Form-Urlencoded 数据的情况。由于 Spring-Cloud-Gateway 是以 WebFlux 为基础的响应式架构设计,所以在原有 Zuul 基础上迁移过来的过程中,传统的编程思路,并不适合于 Reactor Stream 的开发。网络上有许多缓存案例,但是在测试过程中出现各种 Bug 问题,在缓存 Body 时,需要考虑整体的响应式操作,才能更合理的缓存数据
下面提供缓存 Json-Body 数据或者 Form-Urlencoded 数据的具体实现方案,该方案经测试,满足各方面需求,以及避免了网络上其他缓存方案所出现的问题
定义一个 GatewayContext 类,用于存储请求中缓存的数据
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@Getter
@Setter
@ToString
public class GatewayContext {
public static final String CACHE_GATEWAY_CONTEXT = “cacheGatewayContext”;
/**
* cache json body
*/
private String cacheBody;
/**
* cache formdata
*/
private MultiValueMap<String, String> formData;
/**
* cache reqeust path
*/
private String path;
}
实现 GlobalFilter 和 Ordered 接口用于缓存请求数据
1 . 该示例只支持缓存下面 3 种 MediaType
APPLICATION_JSON–Json 数据
APPLICATION_JSON_UTF8–Json 数据
APPLICATION_FORM_URLENCODED–FormData 表单数据
2 . 经验总结:
在缓存 Body 时, 不能够在 Filter 内部直接进行缓存,需要按照响应式的处理方式,在异步操作路途上进行缓存 Body,由于 Body 只能读取一次,所以要读取完成后要重新封装新的 request 和 exchange 才能保证请求正常传递到下游
在缓存 FormData 时,FormData 也只能读取一次,所以在读取完毕后,需要重新封装 request 和 exchange。这里要注意,如果对 FormData 内容进行了修改,则必须重新设置 Header 中的 content-length,以保证与实际传输数据的大小一致
import com.choice.cloud.architect.usergate.option.FilterOrderEnum;
import com.choice.cloud.architect.usergate.support.GatewayContext;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@Slf4j
public class GatewayContextFilter implements GlobalFilter, Ordered {
/**
* default HttpMessageReader
*/
private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
/**
* save request path and serviceId into gateway context
*/
ServerHttpRequest request = exchange.getRequest();
String path = request.getPath().pathWithinApplication().value();
GatewayContext gatewayContext = new GatewayContext();
gatewayContext.getAllRequestData().addAll(request.getQueryParams());
gatewayContext.setPath(path);
/**
* save gateway context into exchange
*/
exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext);
HttpHeaders headers = request.getHeaders();
MediaType contentType = headers.getContentType();
long contentLength = headers.getContentLength();
if(contentLength>0){
if(MediaType.APPLICATION_JSON.equals(contentType) || MediaType.APPLICATION_JSON_UTF8.equals(contentType)){
return readBody(exchange, chain,gatewayContext);
}
if(MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)){
return readFormData(exchange, chain,gatewayContext);
}
}
log.debug(“[GatewayContext]ContentType:{},Gateway context is set with {}”,contentType, gatewayContext);
return chain.filter(exchange);
}
@Override
public int getOrder() {
return Integer.MIN_VALUE;
}
/**
* ReadFormData
* @param exchange
* @param chain
* @return
*/
private Mono<Void> readFormData(ServerWebExchange exchange,GatewayFilterChain chain,GatewayContext gatewayContext){
HttpHeaders headers = exchange.getRequest().getHeaders();
return exchange.getFormData()
.doOnNext(multiValueMap -> {
gatewayContext.setFormData(multiValueMap);
log.debug(“[GatewayContext]Read FormData:{}”,multiValueMap);
})
.then(Mono.defer(() -> {
Charset charset = headers.getContentType().getCharset();
charset = charset == null? StandardCharsets.UTF_8:charset;
String charsetName = charset.name();
MultiValueMap<String, String> formData = gatewayContext.getFormData();
/**
* formData is empty just return
*/
if(null == formData || formData.isEmpty()){
return chain.filter(exchange);
}
StringBuilder formDataBodyBuilder = new StringBuilder();
String entryKey;
List<String> entryValue;
try {
/**
* remove system param ,repackage form data
*/
for (Map.Entry<String, List<String>> entry : formData.entrySet()) {
entryKey = entry.getKey();
entryValue = entry.getValue();
if (entryValue.size() > 1) {
for(String value : entryValue){
formDataBodyBuilder.append(entryKey).append(“=”).append(URLEncoder.encode(value, charsetName)).append(“&”);
}
} else {
formDataBodyBuilder.append(entryKey).append(“=”).append(URLEncoder.encode(entryValue.get(0), charsetName)).append(“&”);
}
}
}catch (UnsupportedEncodingException e){
//ignore URLEncode Exception
}
/**
* substring with the last char ‘&’
*/
String formDataBodyString = “”;
if(formDataBodyBuilder.length()>0){
formDataBodyString = formDataBodyBuilder.substring(0, formDataBodyBuilder.length() – 1);
}
/**
* get data bytes
*/
byte[] bodyBytes = formDataBodyString.getBytes(charset);
int contentLength = bodyBytes.length;
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
exchange.getRequest()) {
/**
* change content-length
* @return
*/
@Override
public HttpHeaders getHeaders() {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, “chunked”);
}
return httpHeaders;
}
/**
* read bytes to Flux<Databuffer>
* @return
*/
@Override
public Flux<DataBuffer> getBody() {
return DataBufferUtils.read(new ByteArrayResource(bodyBytes),new NettyDataBufferFactory(ByteBufAllocator.DEFAULT),contentLength);
}
};
ServerWebExchange mutateExchange = exchange.mutate().request(decorator).build();
log.debug(“[GatewayContext]Rewrite Form Data :{}”,formDataBodyString);
return chain.filter(mutateExchange);
}));
}
/**
* ReadJsonBody
* @param exchange
* @param chain
* @return
*/
private Mono<Void> readBody(ServerWebExchange exchange,GatewayFilterChain chain,GatewayContext gatewayContext){
/**
* join the body
*/
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
/**
* read the body Flux<Databuffer>
*/
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
/**
* repackage ServerHttpRequest
*/
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
/**
* mutate exchage with new ServerHttpRequest
*/
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
/**
* read body string with default messageReaders
*/
return ServerRequest.create(mutatedExchange, messageReaders)
.bodyToMono(String.class)
.doOnNext(objectValue -> {
gatewayContext.setCacheBody(objectValue);
log.debug(“[GatewayContext]Read JsonBody:{}”,objectValue);
}).then(chain.filter(mutatedExchange));
});
}
}
在后续 Filter 中,可以直接从 ServerWebExchange 中获取 GatewayContext,从而获取到缓存的数据。如果需要缓存其他数据,可以根据自己的需求添加到 GatewayContext 中
// Look up the context under the same attribute key that GatewayContextFilter stores it with.
GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);