1 为什么要用 WebClient

刚开始尝试应用 Spring WebFlux 的时候,很多人都会应用 Mono.fromFuture() 将异步申请转成 Mono 对象,或者 Mono.fromSupplier() 将申请转成 MOno 对象,这两种形式在响应式编程中都是不倡议的,都会阻塞以后线程。

1.1 Mono.fromFuture() VS WebClient

Mono.fromFuture()办法和应用 WebClient 调用第三方接口之间存在以下区别:

  • 异步 vs. 非阻塞

Mono.fromFuture()办法实用于接管一个 java.util.concurrent.Future 对象,并将其转换为响应式的 Mono。这是一个阻塞操作,因为它会期待 Future 对象实现。而应用 WebClient 调用第三方接口是异步和非阻塞的,它不会间接阻塞应用程序的执行,而是应用事件驱动的形式解决响应。

可扩展性和灵活性:应用 WebClient 能够更灵便地进行配置和解决,例如设置超时工夫、申请头、重试机制等。WebClient 还能够与许多其余 Spring WebFlux 组件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是实用于单个 Future 对象转化为 Mono 的状况,可扩展性较差。

  • 错误处理

WebClient 提供了更丰盛的错误处理机制,能够通过 onStatus、onError 等办法来解决不同的 HTTP 状态码或异样。同时,WebClient 还提供了更灵便的重试和回退策略。Mono.fromFuture() 办法只能将 Future 对象的后果包装在 Mono 中,不提供特定的错误处理机制。

  • 阻塞操作

Mono.fromFuture() 会阻塞。当调用 Mono.fromFuture() 办法将 Future 转换为 Mono 时,它会期待 Future 对象的后果返回。在这个期待的过程中,Mono.fromFuture()办法会阻塞以后的线程。这意味着,如果 Future 的后果在运行过程中没有返回,则以后线程会始终阻塞,直到 Future 对象返回后果或者超时。因而,在应用 Mono.fromFuture() 时须要留神潜在的阻塞危险。另外,须要确保F uture 的工作在后盾线程中执行,免得阻塞应用程序的主线程。

1.2 Mono.fromFuture VS Mono.fromSupplier

Mono.fromSupplier() 和 Mono.fromFuture() 都是用于将异步执行的操作转换为响应式的 Mono 对象,但它们的区别在于:

Mono.fromSupplier() 实用于一个提供者/生产者,能够用来示意某个操作的后果,该操作是一些纯计算并且没有阻塞的办法。也就是说,Mono.fromSupplier() 将其参数 (Supplier) 所提供的操作异步执行,并将其后果打包成一个 Mono 对象。

Mono.fromFuture() 实用于一个 java.util.concurrent.Future 对象,将其封装成 Mono 对象。这意味着调用 Mono.fromFuture() 办法将阻塞以后线程,直到异步操作实现返回一个 Future 对象。

因而,Mono.fromSupplier() 与 Mono.fromFuture() 的次要区别在于:

Mono.fromSupplier() 是一个非阻塞的操作,不会阻塞以后线程。这个办法用于执行计算型的工作,返回一个封装了计算结果的 Mono 对象。
Mono.fromFuture() 是阻塞操作,会阻塞以后线程,直到异步操作结束并返回看,它实用于解决 java.util.concurrent.Future 对象。

须要留神的是,如果 Supplier 提供的操作是阻塞的,则 Mono.fromSupplier() 办法自身也会阻塞线程。但通常状况下,Supplier 提供的操作是纯计算型的,不会阻塞线程。

因而,能够应用 Mono.fromSupplier() 办法将一个纯计算型的操作转换为 Mono 对象,而将一个异步返回后果的操作转换为 Mono 对象时,能够应用 Mono.fromFuture() 办法。

2 定制化本人的 WebClient

2.1 初始化 WebClient

WebClient 反对建造者模式,应用 WebClient 建造者模式反对开发本人的个性化 WebClient,比方反对设置接口调用对立耗时、自定义底层 Http 客户端、调用链路、打印接口返回日志、监控接口耗时等等。

WebClient builder 反对以下办法

interface Builder {        /**         * 配置申请根底的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 抵触,如果有 uriBuilderFactory ,则疏忽 baseUrl         */        Builder baseUrl(String baseUrl);        /**         * URI 申请的默认变量。也和 uriBuilderFactory 抵触,如果有 uriBuilderFactory ,则疏忽 defaultUriVariables         */        Builder defaultUriVariables(Map<String, ?> defaultUriVariables);        /**         * 提供一个预配置的UriBuilderFactory实例         */        Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);        /**         * 默认 header         */        Builder defaultHeader(String header, String... values);        /**         * 默认cookie         */        Builder defaultCookie(String cookie, String... values);        /**         * 提供一个 consumer 来定制每个申请         */        Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);        /**         * 增加一个filter,能够增加多个         */        Builder filter(ExchangeFilterFunction filter);            /**         * 配置要应用的 ClientHttpConnector。这对于插入或自定义底层HTTP 客户端库(例如SSL)的选项十分有用。         */        Builder clientConnector(ClientHttpConnector connector);        /**         * Configure the codecs for the {@code WebClient} in the         * {@link #exchangeStrategies(ExchangeStrategies) underlying}         * {@code ExchangeStrategies}.         * @param configurer the configurer to apply         * @since 5.1.13         */        Builder codecs(Consumer<ClientCodecConfigurer> configurer);        /**         * 提供一个事后配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。这是对 clientConnector 的一种代替,并且无效地笼罩了它们。         */        Builder exchangeFunction(ExchangeFunction exchangeFunction);        /**         * Builder the {@link WebClient} instance.         */        WebClient build();          // 其余办法    }

2.2 日志打印及监控

  • 打印参数、url、返回
  • 参数和返回须要转成json
  • 须要打印失常返回日志和异样
  • 失常监控、异样监控、总监控以及响应工夫
.doOnSuccess(response-> {    log.info("get.success, url={}, response={}, param={}", url, response);}).doOnError(error-> {    log.info("get.error, url={}", url, error);    // 监控}).doFinally(res-> {  //监控})

2.3 返回解决

retrieve() // 申明如何提取响应。例如,提取一个ResponseEntity的状态,头部和身材:

.bodyToMono(clazz) 将返回body内容转成clazz对象,clazz 对象能够本人指定类型。如果碰到有问题的无奈转化的,也能够先转成String,而后本人实现一个工具类,将String转成 class 对象。

2.3.1 get

public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {long start = System.currentTimeMillis();return webClient.get()        .uri(url)        .accept(MediaType.APPLICATION_JSON)        .retrieve()        .bodyToMono(clazz)        .doOnSuccess(response-> {            log.info("get.success, url={}, response={}, param={}", url, response);        })        .doOnError(error-> {            log.info("get.param.error, url={}", url, error);        })        .onErrorReturn(defaultClass)        .doFinally(res-> {        })        .publishOn(customScheduler);}

2.3.2 get param 申请

public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {long start = System.currentTimeMillis();URI uri = UriComponentsBuilder.fromUriString(url)        .queryParams(param)        .build()        .toUri();return webClient.get()        .uri(uri)        .accept(MediaType.APPLICATION_JSON)        .retrieve()        .bodyToMono(clazz)        .doOnSuccess(response-> {            log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));        })        .doOnError(error-> {            log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);        })        .onErrorReturn(defaultClass)        .doFinally(res-> {        // 监控 or 打印日志 or 耗时        })        .publishOn(customScheduler);}

2.3.3 post json 申请

public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {final long start = System.currentTimeMillis();return webClient.post()        .uri(url)        .contentType(MediaType.APPLICATION_JSON)        .cookies(cookies -> cookies.setAll(parameter.getCookies()))        .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))        .headers(headers -> headers.setAll(parameter.getHeaders()))        .accept(MediaType.APPLICATION_JSON)        .retrieve()        .bodyToMono(clazz)        .doOnSuccess(response-> {            log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());        })        .doOnError(error-> {            log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);        })        .onErrorReturn(defaultClass)        .doFinally(res-> {        })        .publishOn(customScheduler);}

2.3.4 post form Data 申请

public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {    final long start = System.currentTimeMillis();    return webClient.post()            .uri(url)            .contentType(MediaType.APPLICATION_FORM_URLENCODED)            .cookies(cookies -> cookies.setAll(parameter.getCookies()))            .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))            .headers(headers -> headers.setAll(parameter.getMapHeaders()))            .accept(MediaType.APPLICATION_JSON)            .retrieve()            .bodyToMono(clazz)            .doOnSuccess(response-> {                log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));            })            .doOnError(error-> {                log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);            })            .onErrorReturn(defaultClass)            .doFinally(res-> {            })            .publishOn(customScheduler);}

2.4 异样解决

2.4.1 异样返回兜底

onErrorReturn 发现异常返回兜底数据

2.4.2 异样解决

状态码转成异样抛出

.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))

监控异样

.doOnError(error -> {    // log and monitor})

3 残缺的 WebClient

package com.geniu.reactor.webclient;import com.geniu.utils.JsonUtil;import io.netty.channel.ChannelOption;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;import lombok.extern.slf4j.Slf4j;import org.springframework.core.ParameterizedTypeReference;import org.springframework.http.HttpStatus;import org.springframework.http.MediaType;import org.springframework.http.client.reactive.ReactorClientHttpConnector;import org.springframework.util.MultiValueMap;import org.springframework.web.reactive.function.BodyInserters;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.util.UriComponentsBuilder;import reactor.core.publisher.Mono;import reactor.core.scheduler.Scheduler;import reactor.core.scheduler.Schedulers;import reactor.netty.http.client.HttpClient;import reactor.netty.resources.ConnectionProvider;import reactor.netty.resources.LoopResources;import reactor.netty.tcp.SslProvider;import reactor.netty.tcp.TcpClient;import java.net.URI;import java.time.Duration;import java.util.function.Function;/** * @Author: prepared * @Date: 2023/8/15 11:05 */@Slf4jpublic class CustomerWebClient {    public static final CustomerWebClient instance = new CustomerWebClient();    /**     * 限度并发数 100     */    Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100);    private final WebClient webClient;    private CustomerWebClient() {        final SslContextBuilder sslBuilder = SslContextBuilder.forClient()                .trustManager(InsecureTrustManagerFactory.INSTANCE);        final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder)                .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();        final int cpuCores = Runtime.getRuntime().availableProcessors();        final int selectorCount = Math.max(cpuCores / 2, 4);        final int workerCount = Math.max(cpuCores * 2, 8);        final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true);        final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp                .option(ChannelOption.TCP_NODELAY, true)                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)                .option(ChannelOption.SO_TIMEOUT, 10000)                .secure(ssl)                .runOn(pool);        ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider                .builder("HttpClientOfSWC")                .maxConnections(100_000)                .pendingAcquireTimeout(Duration.ofSeconds(6));        final ConnectionProvider connectionProvider = httpClientOfSWC.build();        final HttpClient hc = HttpClient.create(connectionProvider)                .tcpConfiguration(tcpMapper);        final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc                .compress(true);        final WebClient.Builder wcb = WebClient.builder()                .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc)));//                .filter(new TraceRequestFilter()); 能够通过Filter 减少trace追踪        this.webClient = wcb.build();    }    public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {        long start = System.currentTimeMillis();        return webClient.get()                .uri(url)                .accept(MediaType.APPLICATION_JSON)                .retrieve()                .onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))                .bodyToMono(clazz)                .doOnSuccess(response-> {                    log.info("get.success, url={}, response={}, param={}", url, response);                })                .doOnError(error-> {                    log.info("get.param.error, url={}", url, error);                })                .onErrorReturn(defaultClass)                .doFinally(res-> {                })                .publishOn(customScheduler);    }    public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {        long start = System.currentTimeMillis();        URI uri = UriComponentsBuilder.fromUriString(url)                .queryParams(param)                .build()                .toUri();        return webClient.get()                .uri(uri)                .accept(MediaType.APPLICATION_JSON)                .retrieve()                .bodyToMono(clazz)                .doOnSuccess(response-> {                    log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));                })                .doOnError(error-> {                    log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);                })                .onErrorReturn(defaultClass)                .doFinally(res-> {                })                .publishOn(customScheduler);    }    public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {        final long start = System.currentTimeMillis();        return webClient.post()                .uri(url)                .contentType(MediaType.APPLICATION_JSON)                .cookies(cookies -> cookies.setAll(parameter.getCookies()))                .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))                .headers(headers -> headers.setAll(parameter.getHeaders()))                .accept(MediaType.APPLICATION_JSON)                .retrieve()                .bodyToMono(clazz)                .doOnSuccess(response-> {                    log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());                })                .doOnError(error-> {                    log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);                })                .onErrorReturn(defaultClass)                .doFinally(res-> {                })                .publishOn(customScheduler);    }    public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {        final long start = System.currentTimeMillis();        return webClient.post()                .uri(url)                .contentType(MediaType.APPLICATION_FORM_URLENCODED)                .cookies(cookies -> cookies.setAll(parameter.getCookies()))                .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))                .headers(headers -> headers.setAll(parameter.getMapHeaders()))                .accept(MediaType.APPLICATION_JSON)                .retrieve()                .bodyToMono(clazz)                .doOnSuccess(response-> {                    log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));                })                .doOnError(error-> {                    log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);                })                .onErrorReturn(defaultClass)                .doFinally(res-> {                })                .publishOn(customScheduler);    }}