Spring Cloud Gateway在有些场景中需要获取request body内容进行参数校验或参数修改,我们通过在GatewayFilter中获取请求内容来读取和修改请求体,下面我们就基于ServerWebExchange来实现:
ServerWebExchange命名为服务网络交换器,存放着重要的请求-响应属性、请求实例和响应实例等等,有点像Context的角色,其中有两个重要的接口方法:
// 获取ServerHttpRequest对象
ServerHttpRequest getRequest();
// 获取ServerHttpResponse对象
ServerHttpResponse getResponse();
创建一个GatewayFilter,必须实现Ordered接口,返回一个小于-1的order值,这是因为NettyWriteResponseFilter的order值为-1,我们需要覆盖返回响应体的逻辑,自定义的GatewayFilter必须比NettyWriteResponseFilter优先执行。
public class RequestGatewayFilter implements GatewayFilter, Ordered {@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); HttpHeaders headers = request.getHeaders(); // 解决参数 MediaType contentType = headers.getContentType(); if (exchange.getRequest().getMethod().equals(HttpMethod.POST)) { //Content-type为“application/json” if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) { return readBody(exchange, chain); } //Content-type为“application/x-www-form-urlencoded” else if(MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)){ GatewayContext gatewayContext = new GatewayContext(); gatewayContext.setRequestHeaders(headers); gatewayContext.getAllRequestData().addAll(request.getQueryParams()); exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext); return readFormData(exchange, chain, gatewayContext); }//Content-type为“multipart/form-data”else if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) { GatewayContext gatewayContext = new GatewayContext(); gatewayContext.setRequestHeaders(headers); gatewayContext.getAllRequestData().addAll(request.getQueryParams()); exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext); return readMultipartData(exchange, chain, gatewayContext); } } else { return readGetData(exchange, chain); } return chain.filter(exchange); } @Override public int getOrder() { return -2; }}
处理content-type为application/json的方法:
/** * ReadJsonBody * * @param exchange * @param chain * @return */ private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) { /** * join the body */ return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); /** * validate request params or form data */ Result checkResult = null; try { String bodyString = new String(bytes, "utf-8"); Map bodyMap = JSONObject.parseObject(bodyString,Map.class); //校验参数 checkResult = validParam(exchange, bodyMap); if(checkResult.getCode()!=0){ return errorInfo(exchange, checkResult.getCode(), checkResult.getMessage()); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); }); /** * repackage ServerHttpRequest */ ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } }; ServerHttpResponse originalResponse = exchange.getResponse(); originalResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON); DataBufferFactory bufferFactory = originalResponse.bufferFactory(); ServerHttpResponseDecorator response = buildResponse(originalResponse, bufferFactory, (Map)checkResult.getResult()); /** * mutate exchage with new ServerHttpRequest */ ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).response(response).build(); /** * read body string with default messageReaders */ return ServerRequest.create(mutatedExchange, MESSAGE_READERS).bodyToMono(String.class) .doOnNext(objectValue -> { log.debug("[GatewayContext]Read JsonBody:{}", objectValue); }).then(chain.filter(mutatedExchange)); }); }
处理content-type为application/x-www-form-urlencoded的方法:
/**
 * Reads URL-encoded form data, validates it, and forwards the request with a
 * freshly serialized form body and a matching Content-Length.
 *
 * <p>When both the form data and the query parameters are empty the exchange
 * is passed down the chain unchanged. Otherwise the form fields are
 * re-serialized, validated through {@code validParam} (a non-zero result code
 * short-circuits with {@code errorInfo}), and the chain continues with a
 * request decorator exposing the rewritten body and headers plus a response
 * decorator from {@code buildResponse}.
 *
 * @param exchange       the current server exchange
 * @param chain          the remaining gateway filter chain
 * @param gatewayContext context created by the caller; not read by this method
 * @return a {@code Mono} that completes when the request has been handled
 */
private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) {
    HttpHeaders headers = exchange.getRequest().getHeaders();
    return exchange.getFormData()
            .switchIfEmpty(Mono.defer(() -> Mono.just(new LinkedMultiValueMap<>())))
            .flatMap(formDataMap -> {
                // Fall back to UTF-8 when no Content-Type or no charset is declared.
                MediaType contentType = headers.getContentType();
                Charset declaredCharset = contentType == null ? null : contentType.getCharset();
                Charset charset = declaredCharset == null ? StandardCharsets.UTF_8 : declaredCharset;

                MultiValueMap<String, String> paramMap = exchange.getRequest().getQueryParams();
                Map map = convertMap(paramMap);

                // Nothing to validate or rewrite: continue unchanged.
                if (formDataMap.isEmpty() && (null == map || map.isEmpty())) {
                    return chain.filter(exchange);
                }

                // The parsed form values are already decoded, so the body sent
                // downstream must be URL-encoded again; otherwise values holding
                // '&', '=', '+', '%' or non-ASCII text corrupt the form.
                String formDataBodyString = joinFormPairs(formDataMap, charset, true);

                // Validation keeps receiving the unencoded text it received before.
                // NOTE(review): decodeFormBody is defined elsewhere - confirm whether
                // it expects URL-encoded input and switch to the encoded string if so.
                String rawFormBody = joinFormPairs(formDataMap, charset, false);

                byte[] bodyBytes = formDataBodyString.getBytes(charset);
                int contentLength = bodyBytes.length;

                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(exchange.getRequest().getHeaders());
                httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                // The rewritten body may differ in size from the original one.
                httpHeaders.setContentLength(contentLength);

                // Validate request params or form data.
                Map<String, Object> bodyMap = StringUtils.isEmpty(rawFormBody)
                        ? new HashMap<String, Object>()
                        : decodeFormBody(rawFormBody);
                Result checkResult = validParam(exchange, bodyMap);
                if (checkResult.getCode() != 0) {
                    return errorInfo(exchange, checkResult.getCode(), checkResult.getMessage());
                }

                // Use a BodyInserter to write the rewritten form body into a cache.
                BodyInserter<String, ReactiveHttpOutputMessage> bodyInserter =
                        BodyInserters.fromObject(formDataBodyString);
                CachedBodyOutputMessage cachedBodyOutputMessage =
                        new CachedBodyOutputMessage(exchange, httpHeaders);
                log.debug("[GatewayContext]Rewrite Form Data :{}", formDataBodyString);

                return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())
                        .then(Mono.defer(() -> {
                            ServerHttpRequestDecorator decorator =
                                    new ServerHttpRequestDecorator(exchange.getRequest()) {
                                        @Override
                                        public HttpHeaders getHeaders() {
                                            return httpHeaders;
                                        }

                                        @Override
                                        public Flux<DataBuffer> getBody() {
                                            return cachedBodyOutputMessage.getBody();
                                        }
                                    };
                            ServerHttpResponse originalResponse = exchange.getResponse();
                            originalResponse.getHeaders().setContentType(MediaType.APPLICATION_JSON);
                            DataBufferFactory bufferFactory = originalResponse.bufferFactory();
                            ServerHttpResponseDecorator response =
                                    buildResponse(originalResponse, bufferFactory, (Map) checkResult.getResult());
                            return chain.filter(exchange.mutate().request(decorator).response(response).build());
                        }));
            });
}

/**
 * Joins form fields into {@code key=value} pairs separated by {@code &}.
 *
 * <p>Keys with an empty value list contribute nothing. In encoded mode a
 * {@code null} value is written as the bare key; in raw mode it is appended
 * as-is, matching the text produced before encoding was introduced.
 *
 * @param formData field names mapped to their values
 * @param charset  charset used for URL-encoding
 * @param encode   whether keys and values are URL-encoded
 * @return the joined form body, empty when there are no values
 */
private static String joinFormPairs(Map<String, List<String>> formData, Charset charset, boolean encode) {
    StringBuilder joined = new StringBuilder();
    for (Map.Entry<String, List<String>> entry : formData.entrySet()) {
        for (String value : entry.getValue()) {
            if (joined.length() > 0) {
                joined.append('&');
            }
            if (!encode) {
                joined.append(entry.getKey()).append('=').append(value);
            } else {
                joined.append(urlEncode(entry.getKey(), charset));
                if (value != null) {
                    joined.append('=').append(urlEncode(value, charset));
                }
            }
        }
    }
    return joined.toString();
}

/** URL-encodes {@code text} with the given charset. */
private static String urlEncode(String text, Charset charset) {
    try {
        return java.net.URLEncoder.encode(text, charset.name());
    } catch (java.io.UnsupportedEncodingException e) {
        // The name comes from an existing Charset instance, so this is not expected.
        throw new IllegalStateException("Unsupported charset: " + charset, e);
    }
}
有时需要对返回的数据统一处理,那么可以通过封装ServerHttpResponseDecorator进行处理。ServerHttpResponseDecorator是ServerHttpResponse的装饰器,主要覆盖写入响应体数据缓冲区的部分。
private ServerHttpResponseDecorator buildResponse(ServerHttpResponse originalResponse, DataBufferFactory bufferFactory, Map result) { return new ServerHttpResponseDecorator(originalResponse) { @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { System.out.println("++++++++++++++++++++++++1"); if (getStatusCode().equals(HttpStatus.OK) && body instanceof Flux) { Flux<? extends DataBuffer> fluxBody = Flux.from(body); return super.writeWith(fluxBody.buffer().map(dataBuffers -> { DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); DataBuffer join = dataBufferFactory.join(dataBuffers); byte[] content = new byte[join.readableByteCount()]; join.read(content); DataBufferUtils.release(join); // 流转为字符串 String responseData = new String(content, Charsets.UTF_8); System.out.println(responseData); Map map = JSON.parseObject(responseData); //解决返回的数据 //To do byte[] uppedContent = responseData.getBytes(Charsets.UTF_8); originalResponse.getHeaders().setContentLength(uppedContent.length); return bufferFactory.wrap(uppedContent); })); } else { System.out.println("----------"+getStatusCode()); } return super.writeWith(body); } @Override public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { return writeWith(Flux.from(body).flatMapSequential(p -> p)); } }; }