共计 8133 个字符,预计需要花费 21 分钟才能阅读完成。
Spring Cloud Gateway 在有些场景中须要获取 request body 内容进行参数校验或参数批改,咱们通过在 GatewayFilter 中获取申请内容来获取和批改申请体,上面咱们就基于 ServerWebExchange 来实现:
ServerWebExchange 命名为服务网络交换器,寄存着重要的申请 - 响应属性、申请实例和响应实例等等,有点像 Context 的角色,其中有两个重要的接口办法:
// 获取 ServerHttpRequest 对象 | |
ServerHttpRequest getRequest(); | |
// 获取 ServerHttpResponse 对象 | |
ServerHttpResponse getResponse(); |
创立一个 GatewayFilter,必须实现 Ordered 接口,返回一个小于 - 1 的 order 值,这是因为 NettyWriteResponseFilter 的 order 值为 -1,咱们须要笼罩返回响应体的逻辑,自定义的 GlobalFilter 必须比 NettyWriteResponseFilter 优先执行。
public class RequestGatewayFilter implements GatewayFilter, Ordered { | |
@Override | |
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest(); | |
HttpHeaders headers = request.getHeaders(); | |
// 解决参数 | |
MediaType contentType = headers.getContentType(); | |
if (exchange.getRequest().getMethod().equals(HttpMethod.POST)) { | |
//Content-type 为“application/json”if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {return readBody(exchange, chain); | |
} | |
//Content-type 为“application/x-www-form-urlencoded”else if(MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)){GatewayContext gatewayContext = new GatewayContext(); | |
gatewayContext.setRequestHeaders(headers); | |
gatewayContext.getAllRequestData().addAll(request.getQueryParams()); | |
exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext); | |
return readFormData(exchange, chain, gatewayContext); | |
} | |
//Content-type 为“multipart/form-data”else if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {GatewayContext gatewayContext = new GatewayContext(); | |
gatewayContext.setRequestHeaders(headers); | |
gatewayContext.getAllRequestData().addAll(request.getQueryParams()); | |
exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext); | |
return readMultipartData(exchange, chain, gatewayContext); | |
} | |
} else {return readGetData(exchange, chain); | |
} | |
return chain.filter(exchange); | |
} | |
@Override | |
public int getOrder() {return -2;} | |
} |
解决 content-type 为 application/json 的办法:
/** | |
* ReadJsonBody | |
* | |
* @param exchange | |
* @param chain | |
* @return | |
*/ | |
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) { | |
/** | |
* join the body | |
*/ | |
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()]; | |
dataBuffer.read(bytes); | |
DataBufferUtils.release(dataBuffer); | |
/** | |
* validate request params or form data | |
*/ | |
Result checkResult = null; | |
try {String bodyString = new String(bytes, "utf-8"); | |
Map bodyMap = JSONObject.parseObject(bodyString,Map.class); | |
// 校验参数 | |
checkResult = validParam(exchange, bodyMap); | |
if(checkResult.getCode()!=0){return errorInfo(exchange, checkResult.getCode(), checkResult.getMessage()); | |
} | |
} catch (UnsupportedEncodingException e) {e.printStackTrace(); | |
} | |
Flux<DataBuffer> cachedFlux = Flux.defer(() -> {DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); | |
DataBufferUtils.retain(buffer); | |
return Mono.just(buffer); | |
}); | |
/** | |
* repackage ServerHttpRequest | |
*/ | |
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { | |
@Override | |
public Flux<DataBuffer> getBody() {return cachedFlux;} | |
}; | |
ServerHttpResponse originalResponse = exchange.getResponse(); | |
originalResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON); | |
DataBufferFactory bufferFactory = originalResponse.bufferFactory(); | |
ServerHttpResponseDecorator response = buildResponse(originalResponse, bufferFactory, (Map)checkResult.getResult()); | |
/** | |
* mutate exchage with new ServerHttpRequest | |
*/ | |
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).response(response).build(); | |
/** | |
* read body string with default messageReaders | |
*/ | |
return ServerRequest.create(mutatedExchange, MESSAGE_READERS).bodyToMono(String.class) | |
.doOnNext(objectValue -> {log.debug("[GatewayContext]Read JsonBody:{}", objectValue); | |
}).then(chain.filter(mutatedExchange)); | |
}); | |
} |
解决 content-type 为 application/x-www-form-urlencoded 的办法:
/** | |
* ReadFormData | |
* @param exchange | |
* @param chain | |
* @return | |
*/ | |
private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext){HttpHeaders headers = exchange.getRequest().getHeaders(); | |
return exchange.getFormData().switchIfEmpty(Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>())) | |
).flatMap(formDataMap -> {Charset charset = headers.getContentType().getCharset(); | |
charset = charset == null? StandardCharsets.UTF_8:charset; | |
String charsetName = charset.name(); | |
MultiValueMap<String, String> paramMap = exchange.getRequest().getQueryParams(); | |
Map map = convertMap(paramMap); | |
/* | |
* formData is empty just return | |
*/ | |
if((null == formDataMap || formDataMap.isEmpty()) && (null == map || map.isEmpty())){return chain.filter(exchange); | |
} | |
StringBuilder formDataBodyBuilder = new StringBuilder(); | |
String entryKey; | |
List<String> entryValue; | |
try { | |
/* | |
* repackage form data | |
*/ | |
for (Map.Entry<String, List<String>> entry : formDataMap.entrySet()) {entryKey = entry.getKey(); | |
entryValue = entry.getValue(); | |
if (entryValue.size() > 1) {for(String value : entryValue){formDataBodyBuilder.append(entryKey).append("=").append(value).append("&"); | |
} | |
} else {formDataBodyBuilder.append(entryKey).append("=").append(entryValue.get(0)).append("&"); | |
} | |
} | |
}catch (Exception e){e.printStackTrace(); | |
} | |
/* | |
* substring with the last char '&' | |
*/ | |
String formDataBodyString = ""; | |
if(formDataBodyBuilder.length()>0){formDataBodyString = formDataBodyBuilder.substring(0, formDataBodyBuilder.length() - 1); | |
} | |
/* | |
* get data bytes | |
*/ | |
byte[] bodyBytes = formDataBodyString.getBytes(charset); | |
int contentLength = bodyBytes.length; | |
HttpHeaders httpHeaders = new HttpHeaders(); | |
httpHeaders.putAll(exchange.getRequest().getHeaders()); | |
httpHeaders.remove(HttpHeaders.CONTENT_LENGTH); | |
/* | |
* in case of content-length not matched | |
*/ | |
httpHeaders.setContentLength(contentLength); | |
/** | |
* validate request params or form data | |
*/ | |
Map<String, Object> bodyMap = StringUtils.isEmpty(formDataBodyString)?new HashMap<String, Object>():decodeFormBody(formDataBodyString); | |
Result checkResult = validParam(exchange, bodyMap); | |
if(checkResult.getCode()!=0){return errorInfo(exchange, checkResult.getCode(), checkResult.getMessage()); | |
} | |
/* | |
* use BodyInserter to InsertFormData Body | |
*/ | |
BodyInserter<String, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromObject(formDataBodyString); | |
CachedBodyOutputMessage cachedBodyOutputMessage = new CachedBodyOutputMessage(exchange, httpHeaders); | |
log.debug("[GatewayContext]Rewrite Form Data :{}",formDataBodyString); | |
return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext()) | |
.then(Mono.defer(() -> { | |
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) { | |
@Override | |
public HttpHeaders getHeaders() {return httpHeaders;} | |
@Override | |
public Flux<DataBuffer> getBody() {return cachedBodyOutputMessage.getBody(); | |
} | |
}; | |
ServerHttpResponse originalResponse = exchange.getResponse(); | |
originalResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON); | |
DataBufferFactory bufferFactory = originalResponse.bufferFactory(); | |
ServerHttpResponseDecorator response = buildResponse(originalResponse, bufferFactory, (Map)checkResult.getResult()); | |
return chain.filter(exchange.mutate().request(decorator).response(response).build()); | |
})); | |
}); | |
} |
有时须要对返回的数据对立解决,那么能够通过封装 ServerHttpResponseDecorator 进行解决,ServerHttpResponse 装璜器 ServerHttpResponseDecorator,次要笼罩写入响应体数据缓冲区的局部。
private ServerHttpResponseDecorator buildResponse(ServerHttpResponse originalResponse, DataBufferFactory bufferFactory, Map result) {return new ServerHttpResponseDecorator(originalResponse) { | |
@Override | |
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {System.out.println("++++++++++++++++++++++++1"); | |
if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) {Flux<? extends DataBuffer> fluxBody = Flux.from(body); | |
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); | |
DataBuffer join = dataBufferFactory.join(dataBuffers); | |
byte[] content = new byte[join.readableByteCount()]; | |
join.read(content); | |
DataBufferUtils.release(join); | |
// 流转为字符串 | |
String responseData = new String(content, Charsets.UTF_8); | |
System.out.println(responseData); | |
Map map = JSON.parseObject(responseData); | |
// 解决返回的数据 | |
//To do | |
byte[] uppedContent = responseData.getBytes(Charsets.UTF_8); | |
originalResponse.getHeaders().setContentLength(uppedContent.length); | |
return bufferFactory.wrap(uppedContent); | |
})); | |
} else {System.out.println("----------"+getStatusCode()); | |
} | |
return super.writeWith(body); | |
} | |
@Override | |
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {return writeWith(Flux.from(body).flatMapSequential(p -> p)); | |
} | |
}; | |
} |
正文完