本系列代码地址:https://github.com/JoJoTec/sp...

要想实现咱们上一节中提到的:

  • 须要在重试以及断路中加一些日志,便于日后的优化
  • 须要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异样
  • 须要在断路器相干的 Operator 中减少相似于 FeignClient 中的负载平衡的数据更新,使得负载平衡更加智能

咱们须要将 resilience4j 自身提供的粘合库做一些革新,其实次要就是对 resilience4j 实现的 project reactor 的 Operator 进行革新。

对于断路器的革新

首先,WebClient 的返回对象只可能是 ClientResponse 类型,所以咱们这里革新进去的 Operator 不用带上形参,只须要针对 ClientResponse 即可,即:

public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {    ...}

在原有的断路器逻辑中,咱们须要退出针对 GET 办法以及之前定义的能够重试的门路匹配配置能够重试的逻辑,这须要咱们拿到原有申请的 URL 信息。然而 ClientResponse 中并没有裸露这些信息的接口,其默认实现 DefaultClientResponse(咱们只有没有本人给 WebClient 退出非凡的革新逻辑,实现都是 DefaultClientResponse) 中的 request() 办法能够获取申请 HttpRequest,其中蕴含 url 信息。然而这个类还有办法都是 package-private 的,咱们须要反射进去:

ClientResponseCircuitBreakerSubscriber

private static final Class<?> aClass;private static final Method request;static {    try {        aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");        request = ReflectionUtils.findMethod(aClass, "request");        request.setAccessible(true);    } catch (Exception e) {        throw new RuntimeException(e);    }}

之后,在获取到 ClientResponse 之后记录断路器的逻辑中,须要退出下面提到的对于重试的革新,以及负载均衡器的记录:

ClientResponseCircuitBreakerSubscriber

protected void hookOnNext(ClientResponse clientResponse) {    if (!isDisposed()) {        if (singleProducer && successSignaled.compareAndSet(false, true)) {            int rawStatusCode = clientResponse.rawStatusCode();            HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode);            try {                HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse);                //判断办法是否为 GET,以及是否在可重试门路配置中,从而得出是否能够重试                if (httpRequest.getMethod() != HttpMethod.GET && !webClientProperties.retryablePathsMatch(httpRequest.getURI().getPath())) {                    //如果不能重试,则间接返回后果                    circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);                } else {                    if (httpStatus != null && httpStatus.is2xxSuccessful()) {                        //如果胜利,则间接返回后果                        circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);                    } else {                        /**                         * 如果异样,参考 DefaultClientResponse 的代码进行异样封装                         * @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException                         */                        Exception exception;                        if (httpStatus != null) {                            exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null);                        } else {                            exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null);                        }                        circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception);                        downstreamSubscriber.onError(exception);                        return;                    }                }            } catch (Exception e) {                log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e);                circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);            }        }        eventWasEmitted.set(true);        downstreamSubscriber.onNext(clientResponse);    }}

同样的,在原有的实现,勾销还有失败的记录逻辑中,也加上记录负载平衡数据:

ClientResponseCircuitBreakerSubscriber

@Overrideprotected void hookOnComplete() {    if (successSignaled.compareAndSet(false, true)) {        serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);        circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());    }    downstreamSubscriber.onComplete();}@Overridepublic void hookOnCancel() {    if (!successSignaled.get()) {        serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);        if (eventWasEmitted.get()) {            circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());        } else {            circuitBreaker.releasePermission();        }    }}@Overrideprotected void hookOnError(Throwable e) {    serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);    circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);    downstreamSubscriber.onError(e);}

粘合 WebClient 与 resilience4j 的同时笼罩重试逻辑

因为后面的断路器中,咱们针对能够重试的非 2XX 响应封装成为 WebClientResponseException。所以在重试器中,咱们须要加上针对这个异样的重试。

同时,须要将重试器放在负载均衡器之前,因为每次重试,都要从负载均衡器中获取一个新的实例。同时,断路器须要放在负载均衡器之后,因为只有在这个之后,能力获取到本次调用的实例,咱们的的断路器是针对实例办法级别的:

WebClientDefaultConfiguration.java

@Beanpublic WebClient getWebClient(        ReactorLoadBalancerExchangeFilterFunction lbFunction,        WebClientConfigurationProperties webClientConfigurationProperties,        Environment environment,        RetryRegistry retryRegistry,        CircuitBreakerRegistry circuitBreakerRegistry,        ServiceInstanceMetrics serviceInstanceMetrics) {    String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);    Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();    if (configs == null || configs.size() == 0) {        throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");    }    WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);    if (webClientProperties == null) {        throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);    }    String serviceName = webClientProperties.getServiceName();    //如果没填写微服务名称,就应用配置 key 作为微服务名称    if (StringUtils.isBlank(serviceName)) {        serviceName = name;    }    String baseUrl = webClientProperties.getBaseUrl();    //如果没填写 baseUrl,就应用微服务名称填充    if (StringUtils.isBlank(baseUrl)) {        baseUrl = "http://" + serviceName;    }    Retry retry = null;    try {        retry = retryRegistry.retry(serviceName, serviceName);    } catch (ConfigurationNotFoundException e) {        retry = retryRegistry.retry(serviceName);    }    //笼罩其中的异样判断    retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {        //WebClientResponseException 会重试,因为在这里能 catch 的 WebClientResponseException 只对能够重试的申请封装了 WebClientResponseException        //参考 ClientResponseCircuitBreakerSubscriber 的代码        if (throwable instanceof WebClientResponseException) {            log.info("should retry on {}", throwable.toString());            return true;        }        //断路器异样重试,因为申请没有收回去        if (throwable instanceof CallNotPermittedException) {            log.info("should retry on {}", throwable.toString());            return true;        }        if (throwable instanceof WebClientRequestException) {            WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;            HttpMethod method = webClientRequestException.getMethod();            URI uri = webClientRequestException.getUri();            //判断是否为响应超时,响应超时代表申请曾经收回去了,对于非 GET 并且没有标注能够重试的申请则不能重试            boolean isResponseTimeout = false;            Throwable cause = throwable.getCause();            //netty 的读取超时个别是 ReadTimeoutException            if (cause instanceof ReadTimeoutException) {                log.info("Cause is a ReadTimeoutException which indicates it is a response time out");                isResponseTimeout = true;            } else {                //对于其余一些框架,应用了 java 底层 nio 的个别是 SocketTimeoutException,message 为 read time out                //还有一些其余异样,然而 message 都会有 read time out 字段,所以通过 message 判断                String message = throwable.getMessage();                if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {                    log.info("Throwable message contains readtimeout which indicates it is a response time out");                    isResponseTimeout = true;                }            }            //如果申请是 GET 或者标注了重试,则直接判断能够重试            if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {                log.info("should retry on {}-{}, {}", method, uri, throwable.toString());                return true;            } else {                //否则,只针对申请还没有收回去的异样进行重试                if (isResponseTimeout) {                    log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());                } else {                    log.info("should retry on {}-{}, {}", method, uri, throwable.toString());                    return true;                }            }        }        return false;    }).build());    HttpClient httpClient = HttpClient            .create()            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis())            .doOnConnected(connection ->                    connection                            .addHandlerLast(new ReadTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))                            .addHandlerLast(new WriteTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))            );    Retry finalRetry = retry;    String finalServiceName = serviceName;    return WebClient.builder()            .exchangeStrategies(ExchangeStrategies.builder()            .codecs(configurer -> configurer                    .defaultCodecs()                    //最大 body 占用 16m 内存                    .maxInMemorySize(16 * 1024 * 1024))            .build())            .clientConnector(new ReactorClientHttpConnector(httpClient))            //Retry在负载平衡前            .filter((clientRequest, exchangeFunction) -> {                return exchangeFunction                        .exchange(clientRequest)                        .transform(ClientResponseRetryOperator.of(finalRetry));            })            //负载均衡器,改写url            .filter(lbFunction)            //实例级别的断路器须要在负载平衡获取真正地址之后            .filter((clientRequest, exchangeFunction) -> {                ServiceInstance serviceInstance = getServiceInstance(clientRequest);                serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);                CircuitBreaker circuitBreaker;                //这时候的url是通过负载均衡器的,是实例的url                //须要留神的一点是,应用异步 client 的时候,最好不要带门路参数,否则这里的断路器成果不好                //断路器是每个实例每个门路一个断路器                String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();                try {                    //应用实例id新建或者获取现有的CircuitBreaker,应用serviceName获取配置                    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);                } catch (ConfigurationNotFoundException e) {                    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);                }                log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instancId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));                return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));            }).baseUrl(baseUrl)            .build();}private ServiceInstance getServiceInstance(ClientRequest clientRequest) {    URI url = clientRequest.url();    DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();    defaultServiceInstance.setHost(url.getHost());    defaultServiceInstance.setPort(url.getPort());    return defaultServiceInstance;}

这样,咱们就实现了咱们封装的基于配置的 WebClient

微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种offer