WebFlux是Spring 5提供的响应式Web利用框架。
它是齐全非阻塞的,能够在Netty,Undertow和Servlet 3.1+等非阻塞服务器上运行。
本文次要介绍WebFlux的应用。
FluxWeb vs noFluxWeb
WebFlux是齐全非阻塞的。
在FluxWeb前,咱们能够应用DeferredResult和AsyncRestTemplate等形式实现非阻塞的Web通信。
咱们先来比拟一下这两者。
留神:对于同步阻塞与异步非阻塞的性能差别,本文不再论述。
阻塞即节约。咱们通过异步实现非阻塞。只有存在阻塞时,异步能力进步性能。如果不存在阻塞,应用异步反而可能因为线程调度等开销导致性能降落。
上面例子模仿一种业务场景。
订单服务提供接口查找订单信息,同时,该接口实现还须要调用仓库服务查问仓库信息,商品服务查问商品信息,并过滤,取前5个商品数据。
OrderService提供如下办法
public void getOrderByRest(DeferredResult<Order> rs, long orderId) {
// [1]
Order order = mockOrder(orderId);
// [2]
ListenableFuture<ResponseEntity<User>> userLister = asyncRestTemplate.getForEntity("http://user-service/user/mock/" + 1, User.class);
ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =
asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),
HttpMethod.GET, null, new ParameterizedTypeReference<List<Goods>>(){});
// [3]
CompletableFuture<ResponseEntity<User>> userFuture = userLister.completable().exceptionally(err -> {
logger.warn("get user err", err);
return new ResponseEntity(new User(), HttpStatus.OK);
});
CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {
logger.warn("get goods err", err);
return new ResponseEntity(new ArrayList<>(), HttpStatus.OK);
});
// [4]
warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes)-> {
order.setWarehouse(warehouseRes.getBody());
List<Goods> goods = goodsRes.getBody().stream()
.filter(g -> g.getPrice() > 10).limit(5)
.collect(Collectors.toList());
order.setGoods(goods);
return order;
}).whenCompleteAsync((o, err)-> {
// [5]
if(err != null) {
logger.warn("err happen:", err);
}
rs.setResult(o);
});
}
- 加载订单数据,这里mack了一个数据。
- 通过asyncRestTemplate获取仓库,产品信息,失去ListenableFuture。
- 设置ListenableFuture异样解决,防止因为某个申请报错导致接口失败。
- 合并仓库,产品申请后果,组装订单数据
- 通过DeferredResult设置接口返回数据。
能够看到,代码较繁琐,通过DeferredResult返回数据的形式也与咱们同步接口通过办法返回值返回数据的形式天壤之别。
这里理论存在两处非阻塞
- 应用AsyncRestTemplate实现发送异步Http申请,也就是说通过其余线程调用仓库服务和产品服务,并返回CompletableFuture,所以不阻塞getOrderByRest办法线程。
- DeferredResult负责异步返回Http响应。
getOrderByRest办法中并不阻塞期待AsyncRestTemplate返回,而是间接返回,等到AsyncRestTemplate返回后通过回调函数设置DeferredResult的值将数据返回给Http,可比照以下阻塞期待的代码
ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();
ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();
order.setWarehouse(warehouseRes.getBody());
order.setGoods(goodsRes.getBody());
return order;
上面咱们应用WebFlux实现。
pom引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
服务启动类OrderServiceReactive
@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceReactive
{
public static void main( String[] args )
{
new SpringApplicationBuilder(
OrderServiceReactive.class)
.web(WebApplicationType.REACTIVE).run(args);
}
}
WebApplicationType.REACTIVE启动WebFlux。
OrderController实现如下
@GetMapping("/{id}")
public Mono<Order> getById(@PathVariable long id) {
return service.getOrder(id);
}
留神返回一个Mono数据,Mono与Flux是Spring Reactor提供的异步数据流。
WebFlux中通常应用Mono,Flux作为数据输出,输入值。
当接口返回Mono,Flux,Spring晓得这是一个异步申请后果。
对于Spring Reactor,可参考了解Reactor的设计与实现
OrderService实现如下
public Mono<Order> getOrder(long orderId) {
// [1]
Mono<Order> orderMono = mockOrder(orderId);
// [2]
return orderMono.flatMap(o -> {
// [3]
Mono<User> userMono = getMono("http://user-service/user/mock/" + o.getUserId(), User.class).onErrorReturn(new User());
Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +
StringUtils.join(o.getGoodsIds(), ","), Goods.class)
.filter(g -> g.getPrice() > 10)
.take(5)
.onErrorReturn(new Goods());
// [4]
return userMono.zipWith(goodsFlux.collectList(), (u, gs) -> {
o.setUser(u);
o.setGoods(gs);
return o;
});
});
}
private <T> Mono<T> getMono(String url, Class<T> resType) {
return webClient.get().uri(url).retrieve().bodyToMono(resType);
}
// getFlux
- 加载订单数据,这里mock了一个Mono数据
- flatMap办法能够将Mono中的数据转化类型,这里转化后的后果还是Order。
- 获取仓库,产品数据。这里能够看到,对产品过滤,取前5个的操作能够间接增加到Flux<Goods>上。
- zipWith办法能够组合两个Mono,并返回新的Mono类型,这里组合仓库、产品数据,最初返回Mono<Order>。
能够看到,代码整洁不少,并且接口返回Mono<Order>,与咱们在同步接口中间接数据的做法相似,不须要借助DeferredResult这样的工具类。
咱们通过WebClient发动异步申请,WebClient返回Mono后果,尽管它并不是真正的数据(它是一个数据发布者,等申请数据返回后,它才把数据送过来),但咱们能够通过操作符办法对他增加逻辑,如过滤,排序,组合,就如同同步操作时曾经拿到数据那样。
而在AsyncRestTemplate,则所有的逻辑都要写到回调函数中。
WebFlux是齐全非阻塞的。
Mono、Flux的组合函数十分有用。
下面办法中先获取订单数据,再同时获取仓库,产品数据,
如果接口参数同时传入了订单id,仓库id,产品id,咱们也能够同时获取这三个数据,再组装起来
public Mono<Order> getOrder(long orderId, long warehouseId, List<Long> goodsIds) {
Mono<Order> orderMono = mockOrderMono(orderId);
return orderMono.zipWith(getMono("http://warehouse-service/warehouse/mock/" + warehouseId, Warehouse.class), (o,w) -> {
o.setWarehouse(w);
return o;
}).zipWith(getFlux("http://goods-service/goods/mock/list?ids=" +
StringUtils.join(goodsIds, ","), Goods.class)
.filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
o.setGoods(gs);
return o;
});
}
如果咱们须要串行获取订单,仓库,商品这三个数据,实现如下
public Mono<Order> getOrderInLabel(long orderId) {
Mono<Order> orderMono = mockOrderMono(orderId);
return orderMono.zipWhen(o -> getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class), (o, w) -> {
o.setWarehouse(w);
return o;
}).zipWhen(o -> getFlux("http://goods-service/goods/mock/list?ids=" +
StringUtils.join(o.getGoodsIds(), ",") + "&label=" + o.getWarehouse().getLabel() , Goods.class)
.filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {
o.setGoods(gs);
return o;
});
}
zipWith办法会同时申请待合并的两个Mono数据,而zipWhen办法则会阻塞期待第一个Mono数据达到在申请第二个Mono数据。orderMono.zipWhen(...).zipWhen(...)
第一个zipWhen办法会阻塞期待orderMono数据返回再应用order数据结构新的Mono数据,第二个zipWhen办法也会期待后面zipWhen构建的Mono数据返回再构建新Mono,
所以在第二个zipWhen办法中,能够调用o.getWarehouse().getLabel(),因为第一个zipWhen曾经获取到仓库信息。
上面说一个WebFlux的应用。
分为两局部,WebFlux服务端与WebClient。
WebFlux服务端
底层容器切换
WebFlux默认应用Netty实现服务端异步通信,能够通过更换依赖包切换底层容器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
注解
WebFlux反对SpringMvc大部分的注解,如
映射:@Controller,@GetMapping,@PostMapping,@PutMapping,@DeleteMapping
参数绑定:@PatchMapping,@RequestParam,@RequestBody,@RequestHeader,@PathVariable,@RequestAttribute,@SessionAttribute
后果解析:@ResponseBody,@ModelAttribute
这些注解的应用形式与springMvc雷同
命令式映射
WebFlux反对应用命令式编程指定映射关系
@Bean
public RouterFunction<ServerResponse> monoRouterFunction(InvoiceHandler invoiceHandler) {
return route()
.GET("/invoice/{orderId}", accept(APPLICATION_JSON), invoiceHandler::get)
.build();
}
调用”/invoice/{orderId}”,申请会转发到invoiceHandler#get办法
invoiceHandler#get办法实现如下
public Mono<ServerResponse> get(ServerRequest request) {
Invoice invoice = new Invoice();
invoice.setId(999L);
invoice.setOrderId(Long.parseLong(request.pathVariable("orderId")));
return ok().contentType(APPLICATION_JSON).body(Mono.just(invoice), Warehouse.class);
}
Filter
能够通过实现WebFilter接口增加过滤器
@Component
public class TokenCheckFilter implements WebFilter {
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
if(!exchange.getRequest().getHeaders().containsKey("token")) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.FORBIDDEN);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"msg\":\"no token\"}".getBytes())));
} else {
exchange.getAttributes().put("auth", "true");
return chain.filter(exchange);
}
}
}
下面实现的是前置过滤器,在调用逻辑办法前的查看申请token
实现后置过滤器代码如下
@Component
public class LogFilter implements WebFilter {
private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
// [1]
logger.info("request before, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
return chain.filter(exchange)
.doFinally(s -> {
// [2]
logger.info("request after, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
});
}
}
留神,[1]
处exchange.getResponse()返回的是初始化状态的response,并不是申请解决后返回的response。
异样解决
通过@ExceptionHandler注解定义一个全局的异样处理器
@ControllerAdvice
public class ErrorController {
private static final Logger logger = LoggerFactory.getLogger(ErrorController.class);
@ResponseBody
@ExceptionHandler({NullPointerException.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public String nullException(NullPointerException e) {
logger.error("global err handler", e);
return "{\"msg\":\"There is a problem\"}";
}
}
WebFluxConfigurer
WebFlux中能够通过WebFluxConfigurer做自定义配置,如配置自定义的后果解析
@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {
public void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {
configurer.addCustomResolver(new HandlerMethodArgumentResolver() {
...
});
}
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.customCodecs().register(new HttpMessageWriter() {
...
});
}
}
configureArgumentResolvers办法配置参数绑定处理器
configureHttpMessageCodecs办法配置Http申请报文,响应报文解析器
@EnableWebFlux要求Spring从WebFluxConfigurationSupport引入Spring WebFlux 配置。如果你的依赖中引入了spring-boot-starter-webflux,Spring WebFlux 将主动配置,不须要增加该注解。
但如果你只应用Spring WebFlux而没有应用Spring Boot,这是须要增加@EnableWebFlux启动Spring WebFlux自动化配置。
Spring Flux反对CORS,Spring Security,HTTP/2,更多内容不再列出,请参考官网文档。
WebClient
WebClient能够发送异步Web申请,并反对响应式编程。
上面说一个WebClient的应用。
底层框架
WebClient底层应用的Netty实现异步Http申请,咱们能够切换底层库,如Jetty
@Bean
public JettyResourceFactory resourceFactory() {
return new JettyResourceFactory();
}
@Bean
public WebClient webClient() {
HttpClient httpClient = HttpClient.create();
ClientHttpConnector connector =
new JettyClientHttpConnector(httpClient, resourceFactory());
return WebClient.builder().clientConnector(connector).build();
}
连接池
WebClient默认是每个申请创立一个连贯。
咱们能够配置连接池复用连贯,以进步性能。
ConnectionProvider provider = ConnectionProvider.builder("order")
.maxConnections(100)
.maxIdleTime(Duration.ofSeconds(30))
.pendingAcquireTimeout(Duration.ofMillis(100))
.build();
return WebClient
.builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)));
maxConnections:容许的最大连接数
pendingAcquireTimeout:没有连贯可用时,申请期待的最长工夫
maxIdleTime:连贯最大闲置工夫
超时
底层应用Netty时,能够如下配置超时工夫
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10)));
或者间接应用responseTimeout
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
Post Json
WebClient能够发送json,form,文件等申请报文,
看一个最罕用的Post Json申请
webClient.post().uri("http://localhost:9004/order/")
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(order), Order.class)
.retrieve().bodyToMono(String.class)
异样解决
能够在ResponseSpec中指定异样解决
private <T> Mono<T> getMono(String url, Class<T> resType) {
return webClient
.get().uri(url).retrieve()
.onStatus(HttpStatus::is5xxServerError, clientResponse -> {
return Mono.error(...);
})
.onStatus(HttpStatus::is4xxClientError, clientResponse -> {
return Mono.error(...);
})
.onStatus(HttpStatus::isError, clientResponse -> {
return Mono.error(...);
})
.bodyToMono(resType)
}
也能够在HttpClient上配置
HttpClient httpClient = HttpClient.create()
.doOnError((req, err) -> {
log.error("err on request:{}", req.uri(), err);
}, (res, err) -> {
log.error("err on response:{}", res.uri(), err);
})
同步返回后果
应用block办法能够阻塞线程,期待申请返回
private <T> T syncGetMono(String url, Class<T> resType) {
return webClient
.get().uri(url).retrieve()
.bodyToMono(resType).block();
}
获取响应信息
exchangeToMono能够获取到响应的header,statusCode等信息
private <T> Mono<T> getMonoWithInfo(String url, Class<T> resType) {
return webClient
.get()
.uri(url)
.exchangeToMono(response -> {
logger.info("request url:{},statusCode:{},headers:{}", url, response.statusCode(), response.headers());
return response.bodyToMono(resType);
});
}
注册核心与Ribbon
教训证,WebClient反对Eureka注册核心与Ribbon转发,应用形式与restTemplate雷同。
不过@LoadBalanced须要增加在WebClient.Builder上
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
官网文档:https://docs.spring.io/spring…
文章残缺代码:https://gitee.com/binecy/bin-…
理论我的项目中,线程阻塞场景往往不只有Http申请阻塞,还有Mysql申请,Redis申请,Kafka申请等等导致的阻塞。从这些数据源中获取数据时,大多数都是阻塞直到数据源返回数据。
而Reactive Spring弱小在于,它也反对这些数据源的非阻塞响应式编程。
下一篇文章,咱们来看一个如何实现Redis的非阻塞响应式编程。
如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!
发表回复