关于springboot:Reactive-Spring实战-WebFlux使用教程

56次阅读

共计 11842 个字符,预计需要花费 30 分钟才能阅读完成。

WebFlux 是 Spring 5 提供的响应式 Web 利用框架。
它是齐全非阻塞的,能够在 Netty,Undertow 和 Servlet 3.1+ 等非阻塞服务器上运行。
本文次要介绍 WebFlux 的应用。

FluxWeb vs noFluxWeb

WebFlux 是齐全非阻塞的。
在 FluxWeb 前,咱们能够应用 DeferredResult 和 AsyncRestTemplate 等形式实现非阻塞的 Web 通信。
咱们先来比拟一下这两者。

留神:对于同步阻塞与异步非阻塞的性能差别,本文不再论述。
阻塞即节约。咱们通过异步实现非阻塞。只有存在阻塞时,异步能力进步性能。如果不存在阻塞,应用异步反而可能因为线程调度等开销导致性能降落。

上面例子模仿一种业务场景。
订单服务提供接口查找订单信息,同时,该接口实现还须要调用仓库服务查问仓库信息,商品服务查问商品信息,并过滤,取前 5 个商品数据。

OrderService 提供如下办法

public void getOrderByRest(DeferredResult<Order> rs, long orderId) {// [1]
    Order order = mockOrder(orderId);
    // [2]
    ListenableFuture<ResponseEntity<User>> userLister = asyncRestTemplate.getForEntity("http://user-service/user/mock/" + 1, User.class);
    ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =
                    asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),
                            HttpMethod.GET,  null, new ParameterizedTypeReference<List<Goods>>(){});
    // [3]
    CompletableFuture<ResponseEntity<User>> userFuture = userLister.completable().exceptionally(err -> {logger.warn("get user err", err);
        return new ResponseEntity(new User(), HttpStatus.OK);
    });
    CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {logger.warn("get goods err", err);
        return new ResponseEntity(new ArrayList<>(), HttpStatus.OK);
    });
    // [4]
    warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes)-> {order.setWarehouse(warehouseRes.getBody());
            List<Goods> goods = goodsRes.getBody().stream()
                    .filter(g -> g.getPrice() > 10).limit(5)
                    .collect(Collectors.toList());
            order.setGoods(goods);
        return order;
    }).whenCompleteAsync((o, err)-> {// [5]
        if(err != null) {logger.warn("err happen:", err);
        }
        rs.setResult(o);
    });
}
  1. 加载订单数据,这里 mack 了一个数据。
  2. 通过 asyncRestTemplate 获取仓库,产品信息,失去 ListenableFuture。
  3. 设置 ListenableFuture 异样解决,防止因为某个申请报错导致接口失败。
  4. 合并仓库,产品申请后果,组装订单数据
  5. 通过 DeferredResult 设置接口返回数据。

能够看到,代码较繁琐,通过 DeferredResult 返回数据的形式也与咱们同步接口通过办法返回值返回数据的形式天壤之别。

这里理论存在两处非阻塞

  1. 应用 AsyncRestTemplate 实现发送异步 Http 申请,也就是说通过其余线程调用仓库服务和产品服务,并返回 CompletableFuture,所以不阻塞 getOrderByRest 办法线程。
  2. DeferredResult 负责异步返回 Http 响应。

getOrderByRest 办法中并不阻塞期待 AsyncRestTemplate 返回,而是间接返回,等到 AsyncRestTemplate 返回后通过回调函数设置 DeferredResult 的值将数据返回给 Http,可比照以下阻塞期待的代码

ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();
ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();
order.setWarehouse(warehouseRes.getBody());
order.setGoods(goodsRes.getBody());
return order;

上面咱们应用 WebFlux 实现。
pom 引入依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

服务启动类 OrderServiceReactive

@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceReactive
{public static void main( String[] args )
    {
        new SpringApplicationBuilder(OrderServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

WebApplicationType.REACTIVE 启动 WebFlux。

OrderController 实现如下

@GetMapping("/{id}")
public Mono<Order> getById(@PathVariable long id) {return service.getOrder(id);
}

留神返回一个 Mono 数据,Mono 与 Flux 是 Spring Reactor 提供的异步数据流。
WebFlux 中通常应用 Mono,Flux 作为数据输出,输入值。
当接口返回 Mono,Flux,Spring 晓得这是一个异步申请后果。
对于 Spring Reactor,可参考了解 Reactor 的设计与实现

OrderService 实现如下

public Mono<Order> getOrder(long orderId) {// [1]
    Mono<Order> orderMono = mockOrder(orderId);
    // [2]
    return orderMono.flatMap(o -> {// [3]
        Mono<User> userMono =  getMono("http://user-service/user/mock/" + o.getUserId(), User.class).onErrorReturn(new User());
        Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +
                StringUtils.join(o.getGoodsIds(), ","), Goods.class)
                .filter(g -> g.getPrice() > 10)
                .take(5)
                .onErrorReturn(new Goods());
        // [4]
        return userMono.zipWith(goodsFlux.collectList(), (u, gs) -> {o.setUser(u);
            o.setGoods(gs);
            return o;
        });
    });
}

private <T> Mono<T> getMono(String url, Class<T> resType) {return webClient.get().uri(url).retrieve().bodyToMono(resType);
}

// getFlux
  1. 加载订单数据,这里 mock 了一个 Mono 数据
  2. flatMap 办法能够将 Mono 中的数据转化类型,这里转化后的后果还是 Order。
  3. 获取仓库,产品数据。这里能够看到,对产品过滤,取前 5 个的操作能够间接增加到 Flux<Goods> 上。
  4. zipWith 办法能够组合两个 Mono,并返回新的 Mono 类型,这里组合仓库、产品数据,最初返回 Mono<Order>。

能够看到,代码整洁不少,并且接口返回 Mono<Order>,与咱们在同步接口中间接数据的做法相似,不须要借助 DeferredResult 这样的工具类。
咱们通过 WebClient 发动异步申请,WebClient 返回 Mono 后果,尽管它并不是真正的数据(它是一个数据发布者,等申请数据返回后,它才把数据送过来),但咱们能够通过操作符办法对他增加逻辑,如过滤,排序,组合,就如同同步操作时曾经拿到数据那样。
而在 AsyncRestTemplate,则所有的逻辑都要写到回调函数中。

WebFlux 是齐全非阻塞的。
Mono、Flux 的组合函数十分有用。
下面办法中先获取订单数据,再同时获取仓库,产品数据,
如果接口参数同时传入了订单 id,仓库 id,产品 id,咱们也能够同时获取这三个数据,再组装起来

public Mono<Order> getOrder(long orderId, long warehouseId, List<Long> goodsIds) {Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWith(getMono("http://warehouse-service/warehouse/mock/" + warehouseId, Warehouse.class), (o,w) -> {o.setWarehouse(w);
        return o;
    }).zipWith(getFlux("http://goods-service/goods/mock/list?ids=" +
            StringUtils.join(goodsIds, ","), Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {o.setGoods(gs);
        return o;
    });
}

如果咱们须要串行获取订单,仓库,商品这三个数据,实现如下

public Mono<Order> getOrderInLabel(long orderId) {Mono<Order> orderMono = mockOrderMono(orderId);

    return orderMono.zipWhen(o -> getMono("http://warehouse-service/warehouse/mock/" + o.getWarehouseId(), Warehouse.class), (o, w) -> {o.setWarehouse(w);
        return o;
    }).zipWhen(o -> getFlux("http://goods-service/goods/mock/list?ids=" +
                    StringUtils.join(o.getGoodsIds(), ",") + "&label=" + o.getWarehouse().getLabel() , Goods.class)
            .filter(g -> g.getPrice() > 10).take(5).collectList(), (o, gs) -> {o.setGoods(gs);
        return o;
    });
}

zipWith 办法会同时申请待合并的两个 Mono 数据,而 zipWhen 办法则会阻塞期待第一个 Mono 数据达到在申请第二个 Mono 数据。
orderMono.zipWhen(...).zipWhen(...)
第一个 zipWhen 办法会阻塞期待 orderMono 数据返回再应用 order 数据结构新的 Mono 数据,第二个 zipWhen 办法也会期待后面 zipWhen 构建的 Mono 数据返回再构建新 Mono,
所以在第二个 zipWhen 办法中,能够调用 o.getWarehouse().getLabel(),因为第一个 zipWhen 曾经获取到仓库信息。

上面说一个 WebFlux 的应用。
分为两局部,WebFlux 服务端与 WebClient。

WebFlux 服务端

底层容器切换

WebFlux 默认应用 Netty 实现服务端异步通信,能够通过更换依赖包切换底层容器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
    <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-netty</artifactId>
    </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

注解

WebFlux 反对 SpringMvc 大部分的注解,如
映射:@Controller,@GetMapping,@PostMapping,@PutMapping,@DeleteMapping
参数绑定:@PatchMapping,@RequestParam,@RequestBody,@RequestHeader,@PathVariable,@RequestAttribute,@SessionAttribute
后果解析:@ResponseBody,@ModelAttribute
这些注解的应用形式与 springMvc 雷同

命令式映射

WebFlux 反对应用命令式编程指定映射关系

@Bean
public RouterFunction<ServerResponse> monoRouterFunction(InvoiceHandler invoiceHandler) {return route()
            .GET("/invoice/{orderId}",  accept(APPLICATION_JSON), invoiceHandler::get)
            .build();}

调用 ”/invoice/{orderId}”,申请会转发到 invoiceHandler#get 办法

invoiceHandler#get 办法实现如下

public Mono<ServerResponse> get(ServerRequest request) {Invoice invoice = new Invoice();
    invoice.setId(999L);
    invoice.setOrderId(Long.parseLong(request.pathVariable("orderId")));
    return ok().contentType(APPLICATION_JSON).body(Mono.just(invoice), Warehouse.class);
}

Filter

能够通过实现 WebFilter 接口增加过滤器

@Component
public class TokenCheckFilter implements WebFilter {public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {if(!exchange.getRequest().getHeaders().containsKey("token")) {ServerHttpResponse response =  exchange.getResponse();
            response.setStatusCode(HttpStatus.FORBIDDEN);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
            return response.writeWith(Mono.just(response.bufferFactory().wrap("{\"msg\":\"no token\"}".getBytes())));
        } else {exchange.getAttributes().put("auth", "true");
            return chain.filter(exchange);
        }
    }
}

下面实现的是前置过滤器,在调用逻辑办法前的查看申请 token

实现后置过滤器代码如下

@Component
public class LogFilter  implements WebFilter {private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {// [1]
        logger.info("request before, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
        return chain.filter(exchange)
            .doFinally(s -> {// [2]
                logger.info("request after, url:{}, statusCode:{}", exchange.getRequest().getURI(), exchange.getResponse().getStatusCode());
            });
    }
}

留神,[1]处 exchange.getResponse()返回的是初始化状态的 response,并不是申请解决后返回的 response。

异样解决

通过 @ExceptionHandler 注解定义一个全局的异样处理器

@ControllerAdvice
public class ErrorController {private static final Logger logger = LoggerFactory.getLogger(ErrorController.class);

    @ResponseBody
    @ExceptionHandler({NullPointerException.class})
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public String nullException(NullPointerException e) {logger.error("global err handler", e);
        return "{\"msg\":\"There is a problem\"}";
    }
}

WebFluxConfigurer

WebFlux 中能够通过 WebFluxConfigurer 做自定义配置,如配置自定义的后果解析

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {public void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {configurer.addCustomResolver(new HandlerMethodArgumentResolver() {...});
    }

    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {configurer.customCodecs().register(new HttpMessageWriter() {...});
    }
}

configureArgumentResolvers 办法配置参数绑定处理器
configureHttpMessageCodecs 办法配置 Http 申请报文,响应报文解析器

@EnableWebFlux 要求 Spring 从 WebFluxConfigurationSupport 引入 Spring WebFlux 配置。如果你的依赖中引入了 spring-boot-starter-webflux,Spring WebFlux 将主动配置,不须要增加该注解。
但如果你只应用 Spring WebFlux 而没有应用 Spring Boot,这是须要增加 @EnableWebFlux 启动 Spring WebFlux 自动化配置。

Spring Flux 反对 CORS,Spring Security,HTTP/2,更多内容不再列出,请参考官网文档。

WebClient

WebClient 能够发送异步 Web 申请,并反对响应式编程。
上面说一个 WebClient 的应用。

底层框架

WebClient 底层应用的 Netty 实现异步 Http 申请,咱们能够切换底层库,如 Jetty

@Bean
public JettyResourceFactory resourceFactory() {return new JettyResourceFactory();
}

@Bean
public WebClient webClient() {HttpClient httpClient = HttpClient.create();
    ClientHttpConnector connector =
            new JettyClientHttpConnector(httpClient, resourceFactory());
    return WebClient.builder().clientConnector(connector).build();}

连接池

WebClient 默认是每个申请创立一个连贯。
咱们能够配置连接池复用连贯,以进步性能。

ConnectionProvider provider = ConnectionProvider.builder("order")
    .maxConnections(100)
    .maxIdleTime(Duration.ofSeconds(30))
    .pendingAcquireTimeout(Duration.ofMillis(100))  
    .build();
return WebClient
    .builder().clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)));

maxConnections:容许的最大连接数
pendingAcquireTimeout:没有连贯可用时,申请期待的最长工夫
maxIdleTime:连贯最大闲置工夫

超时

底层应用 Netty 时,能够如下配置超时工夫

import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

HttpClient httpClient = HttpClient.create()
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(10))
                .addHandlerLast(new WriteTimeoutHandler(10)));

或者间接应用 responseTimeout

HttpClient httpClient = HttpClient.create()
        .responseTimeout(Duration.ofSeconds(2));
Post Json

WebClient 能够发送 json,form,文件等申请报文,
看一个最罕用的 Post Json 申请

webClient.post().uri("http://localhost:9004/order/")
    .contentType(MediaType.APPLICATION_JSON)
    .body(Mono.just(order), Order.class)
    .retrieve().bodyToMono(String.class)

异样解决

能够在 ResponseSpec 中指定异样解决

private <T> Mono<T> getMono(String url, Class<T> resType) {
return webClient
    .get().uri(url).retrieve()
    .onStatus(HttpStatus::is5xxServerError, clientResponse -> {return Mono.error(...);
    })
    .onStatus(HttpStatus::is4xxClientError, clientResponse -> {return Mono.error(...);
    })
    .onStatus(HttpStatus::isError, clientResponse -> {return Mono.error(...);
    })
    .bodyToMono(resType)
}

也能够在 HttpClient 上配置

HttpClient httpClient = HttpClient.create()
        .doOnError((req, err) -> {log.error("err on request:{}", req.uri(), err);
        }, (res, err) -> {log.error("err on response:{}", res.uri(), err);
        })

同步返回后果

应用 block 办法能够阻塞线程,期待申请返回

private <T> T syncGetMono(String url, Class<T> resType) {
    return webClient
            .get().uri(url).retrieve()
            .bodyToMono(resType).block();}

获取响应信息

exchangeToMono 能够获取到响应的 header,statusCode 等信息

private <T> Mono<T> getMonoWithInfo(String url, Class<T> resType) {
    return webClient
            .get()
            .uri(url)
            .exchangeToMono(response -> {logger.info("request url:{},statusCode:{},headers:{}", url, response.statusCode(), response.headers());
                return response.bodyToMono(resType);
            });
}

注册核心与 Ribbon

教训证,WebClient 反对 Eureka 注册核心与 Ribbon 转发,应用形式与 restTemplate 雷同。
不过 @LoadBalanced 须要增加在 WebClient.Builder 上

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {return WebClient.builder();
}

官网文档:https://docs.spring.io/spring…
文章残缺代码:https://gitee.com/binecy/bin-…

理论我的项目中,线程阻塞场景往往不只有 Http 申请阻塞,还有 Mysql 申请,Redis 申请,Kafka 申请等等导致的阻塞。从这些数据源中获取数据时,大多数都是阻塞直到数据源返回数据。
而 Reactive Spring 弱小在于,它也反对这些数据源的非阻塞响应式编程。
下一篇文章,咱们来看一个如何实现 Redis 的非阻塞响应式编程。

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!

正文完
 0