共计 6928 个字符,预计需要花费 18 分钟才能阅读完成。
本系列代码地址:https://github.com/JoJoTec/sp…
实现 WeClient 的 NamedContextFactory
咱们要实现的是不同微服务主动配置装载不同的 WebClient Bean,这样就能够通过 NamedContextFactory 实现。咱们先来编写下实现这个 NamedContextFactory 整个的加载流程的代码,其结构图如下所示:
spring.factories
# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webflux.auto.WebClientAutoConfiguration
在 spring.factories 定义了主动装载的主动配置类 WebClientAutoConfiguration
WebClientAutoConfiguration
@Import(WebClientConfiguration.class)
@Configuration(proxyBeanMethods = false)
public class WebClientAutoConfiguration {}
WebClientAutoConfiguration 这个主动配置类 Import 了 WebClientConfiguration
WebClientConfiguration
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WebClientConfigurationProperties.class)
public class WebClientConfiguration {
@Bean
public WebClientNamedContextFactory getWebClientNamedContextFactory() {return new WebClientNamedContextFactory();
}
}
WebClientConfiguration 中创立了 WebClientNamedContextFactory 这个 NamedContextFactory 的 Bean。在这个 NamedContextFactory 中,定义了默认配置 WebClientDefaultConfiguration。在这个默认配置中,次要是给每个微服务都定义了一个 WebClient
定义 WebClient 的配置类
咱们编写下上一节定义的配置,包含:
- 微服务名称
- 微服务地址,服务地址,不填写则为 http:// 微服务名称
- 连贯超时,应用 Duration,这样咱们能够用更直观的配置了,例如 5ms,6s,7m 等等
- 响应超时,应用 Duration,这样咱们能够用更直观的配置了,例如 5ms,6s,7m 等等
- 能够重试的门路,默认只对 GET 办法重试,通过这个配置减少针对某些非 GET 办法的门路的重试;同时,这些门路能够应用 * 等门路匹配符,即 Spring 中的 AntPathMatcher 进行门路匹配多个门路。例如
/query/order/**
WebClientConfigurationProperties
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "webclient")
public class WebClientConfigurationProperties {
private Map<String, WebClientProperties> configs;
@Data
@NoArgsConstructor
public static class WebClientProperties {private static AntPathMatcher antPathMatcher = new AntPathMatcher();
private Cache<String, Boolean> retryablePathsMatchResult = Caffeine.newBuilder().build();
/**
* 服务地址,不填写则为 http://serviceName
*/
private String baseUrl;
/**
* 微服务名称,不填写就是 configs 这个 map 的 key
*/
private String serviceName;
/**
* 能够重试的门路,默认只对 GET 办法重试,通过这个配置减少针对某些非 GET 办法的门路的重试
*/
private List<String> retryablePaths;
/**
* 连贯超时
*/
private Duration connectTimeout = Duration.ofMillis(500);
/**
* 响应超时
*/
private Duration responseTimeout = Duration.ofSeconds(8);
/**
* 是否匹配
* @param path
* @return
*/
public boolean retryablePathsMatch(String path) {if (CollectionUtils.isEmpty(retryablePaths)) {return false;}
return retryablePathsMatchResult.get(path, k -> {return retryablePaths.stream().anyMatch(pattern -> antPathMatcher.match(pattern, path));
});
}
}
}
粘合 WebClient 与 resilience4j
接下来粘合 WebClient 与 resilience4j 实现断路器以及重试逻辑,WebClient 基于 project-reactor 实现,resilience4j 官网提供了与 project-reactor 的粘合库:
<!-- 粘合 project-reactor 与 resilience4j,这个在异步场景常常会用到 -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
</dependency>
参考官网文档,咱们能够像上面这样给一般的 WebClient 减少相干组件:
减少重试器:
// 因为还是在后面弄好的 spring-cloud 环境下,所以还是能够这样获取配置对应的 retry
Retry retry;
try {retry = retryRegistry.retry(name, name);
} catch (ConfigurationNotFoundException e) {retry = retryRegistry.retry(name);
}
Retry finalRetry = retry;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {return exchangeFunction.exchange(clientRequest)
// 外围就是退出 RetryOperator
.transform(RetryOperator.of(finalRetry));
})
这个 RetryOperator 其实就是应用了 project-reactor 中的 retryWhen 办法实现了 resilience4j 的 retry 机制:
RetryOperator
@Override
public Publisher<T> apply(Publisher<T> publisher) {
// 对于 mono 的解决
if (publisher instanceof Mono) {Context<T> context = new Context<>(retry.asyncContext());
Mono<T> upstream = (Mono<T>) publisher;
return upstream.doOnNext(context::handleResult)
.retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
.doOnSuccess(t -> context.onComplete());
} else if (publisher instanceof Flux) {
// 对于 flux 的解决
Context<T> context = new Context<>(retry.asyncContext());
Flux<T> upstream = (Flux<T>) publisher;
return upstream.doOnNext(context::handleResult)
.retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
.doOnComplete(context::onComplete);
} else {
// 不可能是 mono 或者 flux 以外的其余的
throw new IllegalPublisherException(publisher);
}
}
能够看出,其实次要填充了:
doOnNext(context::handleResult)
: 在有响应之后调用,将响应后果传入 retry 的 Context,判断是否须要重试以及重试距离是多久,并且抛出异样RetryDueToResultException
retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
:捕获异样RetryDueToResultException
,依据其中的间隔时间,返回 reactor 的重试距离:Mono.delay(Duration.ofMillis(waitDurationMillis))
doOnComplete(context::onComplete)
:申请实现,没有异样之后,调用 retry 的 complete 进行清理
减少断路器:
// 因为还是在后面弄好的 spring-cloud 环境下,所以还是能够这样获取配置对应的 circuitBreaker
CircuitBreaker circuitBreaker;
try {circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}
CircuitBreaker finalCircuitBreaker = circuitBreaker;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {return exchangeFunction.exchange(clientRequest)
// 外围就是退出 CircuitBreakerOperator
.transform(CircuitBreakerOperator.of(finalCircuitBreaker));
})
相似的,CircuitBreakerOperator 其实也是粘合断路器与 reactor 的 publisher 中的一些 stage 办法,将后果的胜利或者失败记录入断路器,这里须要留神,可能有的链路能走到 onNext,可能有的链路能走到 onComplete,也有可能都走到,所以 这两个办法都要记录胜利,并且保障只记录一次:
CircuitBreakerSubscriber
class CircuitBreakerSubscriber<T> extends AbstractSubscriber<T> {
private final CircuitBreaker circuitBreaker;
private final long start;
private final boolean singleProducer;
private final AtomicBoolean successSignaled = new AtomicBoolean(false);
private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);
protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
CoreSubscriber<? super T> downstreamSubscriber,
boolean singleProducer) {super(downstreamSubscriber);
this.circuitBreaker = requireNonNull(circuitBreaker);
this.singleProducer = singleProducer;
this.start = circuitBreaker.getCurrentTimestamp();}
@Override
protected void hookOnNext(T value) {if (!isDisposed()) {
// 失常实现时,断路器也标记胜利,因为可能会触发屡次(因为 onComplete 也会记录),所以须要 successSignaled 标记只记录一次
if (singleProducer && successSignaled.compareAndSet(false, true)) {circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), value);
}
// 标记事件曾经收回,就是曾经执行完 WebClient 的申请,前面判断勾销的时候会用到
eventWasEmitted.set(true);
downstreamSubscriber.onNext(value);
}
}
@Override
protected void hookOnComplete() {
// 失常实现时,断路器也标记胜利,因为可能会触发屡次(因为 onNext 也会记录),所以须要 successSignaled 标记只记录一次
if (successSignaled.compareAndSet(false, true)) {circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
}
downstreamSubscriber.onComplete();}
@Override
public void hookOnCancel() {if (!successSignaled.get()) {
// 如果事件曾经收回,那么也记录胜利
if (eventWasEmitted.get()) {circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
} else {
// 否则勾销
circuitBreaker.releasePermission();}
}
}
@Override
protected void hookOnError(Throwable e) {
// 记录失败
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
downstreamSubscriber.onError(e);
}
}
咱们会应用这个库进行粘合,然而不会间接应用下面的代码,因为思考到:
- 须要在重试以及断路中加一些日志,便于日后的优化
- 须要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异样
- 须要在断路器相干的 Operator 中减少相似于 FeignClient 中的负载平衡的数据更新,使得负载平衡更加智能
在上面一节咱们会具体阐明咱们是如何实现的有断路器以及重试逻辑和负载平衡数据更新的 WebClient。
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: