关于spring-cloud:SpringCloud升级之路20200x版33-实现重试断路器以及线程隔离源码

10次阅读

共计 13664 个字符,预计需要花费 35 分钟才能阅读完成。

本系列代码地址:https://github.com/JoJoTec/sp…

在后面两节,咱们梳理了实现 Feign 断路器以及线程隔离的思路,并阐明了如何优化目前的负载平衡算法。然而如何更新负载平衡的数据缓存,以及实现重试、断路器以及线程隔离的源码还没提,这一节咱们会详细分析。

首先,从 spring.factories 引入,减少咱们自定义 OpenFeign 配置的加载:

spring.factories

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webmvc.auto.OpenFeignAutoConfiguration

主动配置类是 OpenFeignAutoConfiguration,其内容是:

OpenFeignAutoConfiguration.java

// 设置 `@Configuration(proxyBeanMethods=false)`,因为没有 @Bean 的办法相互调用须要每次返回同一个 Bean,没必要代理,敞开减少启动速度
@Configuration(proxyBeanMethods = false)
// 加载配置,CommonOpenFeignConfiguration
@Import(CommonOpenFeignConfiguration.class)
// 启用 OpenFeign 注解扫描和配置,默认配置为 DefaultOpenFeignConfiguration,其实就是 Feign 的 NamedContextFactory(即 FeignContext)的默认配置类是 DefaultOpenFeignConfiguration
@EnableFeignClients(value = "com.github.jojotech", defaultConfiguration = DefaultOpenFeignConfiguration.class)
public class OpenFeignAutoConfiguration {}

为何要加这一层而不是间接应用 Import 的 CommonOpenFeignConfiguration?应用 @AutoConfigurationBefore@AutoConfigurationAfter 配置和其余 AutoConfiguration 加载的前后程序。@AutoConfigurationBefore@AutoConfigurationAfter 是 spring-boot 的注解,只对于 spring.factories 加载的 AutoConfiguration 失效。所以 在设计上要加上这一层,避免咱们将来可能会用到这些注解

CommonOpenFeignConfiguration 中蕴含所有 OpenFeign 的共用的一些 Bean,这些 Bean 是单例被所有 FeignClient 专用的,包含:

  1. FeignClient 要用的 Client 的底层 HTTP Client,咱们这里应用 Apache HttpClient
  2. 将 Apache HttpClient 封装成 FeignClient 要用的 Client 的 ApacheHttpClient
  3. spring-cloud-openfeign 的 FeignClient 用的 Client 的负载平衡实现外围类是 FeignBlockingLoadBalancerClient,咱们须要将其封装代理从而实现断路器和线程隔离以及负载平衡数据采集,封装类是咱们本人实现的 FeignBlockingLoadBalancerClientDelegate。外围实现断路器和线程隔离逻辑的类是 Resilience4jFeignClient。

CommonOpenFeignConfiguration.java

@Configuration(proxyBeanMethods = false)
public class CommonOpenFeignConfiguration {
    // 创立 Apache HttpClient,自定义一些配置
    @Bean
    public HttpClient getHttpClient() {
        // 长连贯放弃 5 分钟
        PoolingHttpClientConnectionManager pollingConnectionManager = new PoolingHttpClientConnectionManager(5, TimeUnit.MINUTES);
        // 总连接数
        pollingConnectionManager.setMaxTotal(1000);
        // 同路由的并发数
        pollingConnectionManager.setDefaultMaxPerRoute(1000);
        HttpClientBuilder httpClientBuilder = HttpClients.custom();
        httpClientBuilder.setConnectionManager(pollingConnectionManager);
        // 放弃长连贯配置,须要在头增加 Keep-Alive
        httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
        return httpClientBuilder.build();}

    // 创立应用 HttpClient 实现的 OpenFeign 的 Client 接口的 Bean
    @Bean
    public ApacheHttpClient apacheHttpClient(HttpClient httpClient) {return new ApacheHttpClient(httpClient);
    }

    //FeignBlockingLoadBalancerClient 的代理类,也是实现 OpenFeign 的 Client 接口的 Bean
    @Bean
    // 应用 Primary 让 FeignBlockingLoadBalancerClientDelegate 成为所有 FeignClient 理论应用的 Bean
    @Primary
    public FeignBlockingLoadBalancerClientDelegate feignBlockingLoadBalancerCircuitBreakableClient(
            ServiceInstanceMetrics serviceInstanceMetrics,
            // 咱们下面创立的 ApacheHttpClient Bean
            ApacheHttpClient apacheHttpClient,
            // 为何应用 ObjectProvider 请参考 FeignBlockingLoadBalancerClientDelegate 源码的正文
            ObjectProvider<LoadBalancerClient> loadBalancerClientProvider,
            //resilience4j 的线程隔离
            ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
            //resilience4j 的断路器
            CircuitBreakerRegistry circuitBreakerRegistry,
            //Sleuth 的 Tracer,用于获取申请上下文
            Tracer tracer,
            // 负载平衡属性
            LoadBalancerProperties properties,
            // 为何应用这个不间接用 FeignBlockingLoadBalancerClient 请参考 FeignBlockingLoadBalancerClientDelegate 的正文
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        return new FeignBlockingLoadBalancerClientDelegate(
                // 咱们本人封装的外围 Client 实现,退出了断路器,线程隔离以及负载平衡数据采集
                new Resilience4jFeignClient(
                        serviceInstanceMetrics, apacheHttpClient,
                        threadPoolBulkheadRegistry,
                        circuitBreakerRegistry,
                        tracer
                ),
                loadBalancerClientProvider,
                properties,
                loadBalancerClientFactory
        );
    }
}

其中,Resilience4jFeignClient 粘合断路器,线程隔离的外围代码,同时也记录了负载平衡的理论调用数据

Resilience4jFeignClient.java

public class Resilience4jFeignClient implements Client {
    private final ServiceInstanceMetrics serviceInstanceMetrics;
    private final ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final Tracer tracer;
    private ApacheHttpClient apacheHttpClient;


    public Resilience4jFeignClient(
            ServiceInstanceMetrics serviceInstanceMetrics, ApacheHttpClient apacheHttpClient,
            ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
            CircuitBreakerRegistry circuitBreakerRegistry,
            Tracer tracer
    ) {
        this.serviceInstanceMetrics = serviceInstanceMetrics;
        this.apacheHttpClient = apacheHttpClient;
        this.threadPoolBulkheadRegistry = threadPoolBulkheadRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.tracer = tracer;
    }

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        // 获取定义 FeignClient 的接口的 FeignClient 注解
        FeignClient annotation = request.requestTemplate().methodMetadata().method().getDeclaringClass().getAnnotation(FeignClient.class);
        // 和 Retry 保持一致,应用 contextId,而不是微服务名称
        //contextId 会作为咱们前面读取断路器以及线程隔离配置的 key
        String contextId = annotation.contextId();
        // 获取实例惟一 id
        String serviceInstanceId = getServiceInstanceId(contextId, request);
        // 获取实例 + 办法惟一 id
        String serviceInstanceMethodId = getServiceInstanceMethodId(contextId, request);

        ThreadPoolBulkhead threadPoolBulkhead;
        CircuitBreaker circuitBreaker;
        try {
            // 每个实例一个线程池
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(serviceInstanceId, contextId);
        } catch (ConfigurationNotFoundException e) {threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(serviceInstanceId);
        }
        try {
            // 每个服务实例具体方法一个 resilience4j 熔断记录器,在服务实例具体方法维度做熔断,所有这个服务的实例具体方法共享这个服务的 resilience4j 熔断配置
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceInstanceMethodId, contextId);
        } catch (ConfigurationNotFoundException e) {circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceInstanceMethodId);
        }
        // 放弃 traceId
        Span span = tracer.currentSpan();
        ThreadPoolBulkhead finalThreadPoolBulkhead = threadPoolBulkhead;
        CircuitBreaker finalCircuitBreaker = circuitBreaker;
        Supplier<CompletionStage<Response>> completionStageSupplier = ThreadPoolBulkhead.decorateSupplier(threadPoolBulkhead,
                OpenfeignUtil.decorateSupplier(circuitBreaker, () -> {try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {log.info("call url: {} -> {}, ThreadPoolStats({}): {}, CircuitBreakStats({}): {}",
                                request.httpMethod(),
                                request.url(),
                                serviceInstanceId,
                                JSON.toJSONString(finalThreadPoolBulkhead.getMetrics()),
                                serviceInstanceMethodId,
                                JSON.toJSONString(finalCircuitBreaker.getMetrics())
                        );
                        Response execute = apacheHttpClient.execute(request, options);
                        log.info("response: {} - {}", execute.status(), execute.reason());
                        return execute;
                    } catch (IOException e) {throw new CompletionException(e);
                    }
                })
        );
        ServiceInstance serviceInstance = getServiceInstance(request);
        try {serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
            Response response = Try.ofSupplier(completionStageSupplier).get().toCompletableFuture().join();
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
            return response;
        } catch (BulkheadFullException e) {
            // 线程池限流异样
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
            return Response.builder()
                    .request(request)
                    .status(SpecialHttpStatus.BULKHEAD_FULL.getValue())
                    .reason(e.getLocalizedMessage())
                    .requestTemplate(request.requestTemplate()).build();} catch (CompletionException e) {serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
            // 外部抛出的所有异样都被封装了一层 CompletionException,所以这里须要取出外面的 Exception
            Throwable cause = e.getCause();
            // 对于断路器关上,返回对应非凡的错误码
            if (cause instanceof CallNotPermittedException) {return Response.builder()
                        .request(request)
                        .status(SpecialHttpStatus.CIRCUIT_BREAKER_ON.getValue())
                        .reason(cause.getLocalizedMessage())
                        .requestTemplate(request.requestTemplate()).build();}
            // 对于 IOException,须要判断是否申请曾经发送进来了
            // 对于 connect time out 的异样,则能够重试,因为申请没收回去,然而例如 read time out 则不行,因为申请曾经收回去了
            if (cause instanceof IOException) {boolean containsRead = cause.getMessage().toLowerCase().contains("read");
                if (containsRead) {log.info("{}-{} exception contains read, which indicates the request has been sent", e.getMessage(), cause.getMessage());
                    // 如果是 read 异样,则代表申请曾经发了进来,则不能重试(除非是 GET 申请或者有 RetryableMethod 注解,这个在 DefaultErrorDecoder 判断)return Response.builder()
                            .request(request)
                            .status(SpecialHttpStatus.NOT_RETRYABLE_IO_EXCEPTION.getValue())
                            .reason(cause.getLocalizedMessage())
                            .requestTemplate(request.requestTemplate()).build();} else {return Response.builder()
                            .request(request)
                            .status(SpecialHttpStatus.RETRYABLE_IO_EXCEPTION.getValue())
                            .reason(cause.getLocalizedMessage())
                            .requestTemplate(request.requestTemplate()).build();}
            }
            throw e;
        }
    }

    private ServiceInstance getServiceInstance(Request request) throws MalformedURLException {URL url = new URL(request.url());
        DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
        defaultServiceInstance.setHost(url.getHost());
        defaultServiceInstance.setPort(url.getPort());
        return defaultServiceInstance;
    }

    // 获取微服务实例 id,格局为:FeignClient 的 contextId:host:port,例如:test1Client:10.238.45.78:8251
    private String getServiceInstanceId(String contextId, Request request) throws MalformedURLException {
        // 解析 URL
        URL url = new URL(request.url());
        // 拼接微服务实例 id
        return contextId + ":" + url.getHost() + ":" + url.getPort();
    }

    // 获取微服务实例办法 id,格局为:FeignClient 的 contextId:host:port:methodName,例如:test1Client:10.238.45.78:8251:
    private String getServiceInstanceMethodId(String contextId, Request request) throws MalformedURLException {URL url = new URL(request.url());
        // 通过微服务名称 + 实例 + 办法的形式,获取惟一 id
        String methodName = request.requestTemplate().methodMetadata().method().toGenericString();
        return contextId + ":" + url.getHost() + ":" + url.getPort() + ":" + methodName;
    }
}

在下面,咱们定义了几种非凡的 HTTP 返回码,次要目标是想将一些异样封装成响应返回,而后通过咱们前面 Feign 谬误解码器解码成对立的 RetryableException,这样 在 resilience4j 的重试配置中,咱们就不必配置很简单的异样重试,仅针对 RetryableException 进行重试即可

咱们想让 spring-cloud-openfeign 的外围负载平衡 Client,在实现调用 LoadBalancer 抉择实例并替换 url 之后,调用的 client 间接是 ApacheHttpClient 而是咱们下面这个类,所以退出了 FeignBlockingLoadBalancerClientDelegate 封装:

/**
 * 因为初始化 FeignBlockingLoadBalancerClient 须要 LoadBalancerClient
 * 然而因为 Spring Cloud 2020 之后,Spring Cloud LoadBalancer BlockingClient 的加载,强制退出了程序
 * @see org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration
 * 这个主动配置退出了 @AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
 * 导致咱们在初始化的 FeignClient 的时候,无奈拿到 BlockingClient
 * 所以,须要通过 ObjectProvider 封装 LoadBalancerClient,在真正调用 FeignClient 的时候通过 ObjectProvider 拿到 LoadBalancerClient 来创立 FeignBlockingLoadBalancerClient
 */
public class FeignBlockingLoadBalancerClientDelegate implements Client {
    private FeignBlockingLoadBalancerClient feignBlockingLoadBalancerClient;

    private final Client delegate;
    private final ObjectProvider<LoadBalancerClient> loadBalancerClientObjectProvider;
    private final LoadBalancerProperties properties;
    private final LoadBalancerClientFactory loadBalancerClientFactory;

    public FeignBlockingLoadBalancerClientDelegate(
            Client delegate,
            ObjectProvider<LoadBalancerClient> loadBalancerClientObjectProvider,
            LoadBalancerProperties properties,
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        this.delegate = delegate;
        this.loadBalancerClientObjectProvider = loadBalancerClientObjectProvider;
        this.properties = properties;
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {if (feignBlockingLoadBalancerClient == null) {synchronized (this) {if (feignBlockingLoadBalancerClient == null) {
                    feignBlockingLoadBalancerClient = new FeignBlockingLoadBalancerClient(
                            this.delegate,
                            this.loadBalancerClientObjectProvider.getIfAvailable(),
                            this.properties,
                            this.loadBalancerClientFactory
                    );
                }
            }
        }
        return feignBlockingLoadBalancerClient.execute(request, options);
    }
}

咱们指定的 FeignClient 的 NamedContextFactory(即 FeignContext)的默认配置 DefaultOpenFeignConfiguration 中,次要粘合了重试逻辑,以及谬误解码器:

@Configuration(proxyBeanMethods = false)
public class DefaultOpenFeignConfiguration {

    @Bean
    public ErrorDecoder errorDecoder() {return new DefaultErrorDecoder();
    }

    @Bean
    public Feign.Builder resilience4jFeignBuilder(
            List<FeignDecoratorBuilderInterceptor> feignDecoratorBuilderInterceptors,
            FeignDecorators.Builder builder
    ) {feignDecoratorBuilderInterceptors.forEach(feignDecoratorBuilderInterceptor -> feignDecoratorBuilderInterceptor.intercept(builder));
        return Resilience4jFeign.builder(builder.build());
    }

    @Bean
    public FeignDecorators.Builder defaultBuilder(Environment environment, RetryRegistry retryRegistry) {String name = environment.getProperty("feign.client.name");
        Retry retry = null;
        try {retry = retryRegistry.retry(name, name);
        } catch (ConfigurationNotFoundException e) {retry = retryRegistry.retry(name);
        }
        // 笼罩其中的异样判断,只针对 feign.RetryableException 进行重试,所有须要重试的异样咱们都在 DefaultErrorDecoder 以及 Resilience4jFeignClient 中封装成了 RetryableException
        retry = Retry.of(name, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {return throwable instanceof feign.RetryableException;}).build());
        return FeignDecorators.builder().withRetry(retry);
    }
}

谬误解码器即把下面能够重试的异样响应码,以及咱们想重试的申请封装成 RetryableException,代码就不赘述了。这样咱们就实现了自定义的实现重试、断路器以及线程隔离的 FeignClient。能够通过如下形式进行配置应用:

application.yml 配置:

################ feign 配置 ################
feign:
  hystrix:
    enabled: false
  client:
    config:
      default:
        # 链接超时
        connectTimeout: 500
        # 读取超时
        readTimeout: 8000
      test1-client:
        # 链接超时
        connectTimeout: 500
        # 读取超时
        readTimeout: 60000
################ resilience 配置 ################
resilience4j.circuitbreaker:
  configs:
    default:
      registerHealthIndicator: true
      slidingWindowSize: 10
      minimumNumberOfCalls: 5
      slidingWindowType: TIME_BASED
      permittedNumberOfCallsInHalfOpenState: 3
      automaticTransitionFromOpenToHalfOpenEnabled: true
      waitDurationInOpenState: 2s
      failureRateThreshold: 30
      eventConsumerBufferSize: 10
      recordExceptions:
        - java.lang.Exception
resilience4j.retry:
  configs:
    default:
      maxRetryAttempts: 2
    test1-client:
      maxRetryAttempts: 3
resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 64
      coreThreadPoolSize: 32
      queueCapacity: 32
    

定义 Feignclient:

// 这个会用到所有 key 为 test1-client 的配置,如果对应的配置中没有 test1-client,就用 default
@FeignClient(name = "service1", contextId = "test1-client")
public interface TestService1Client {@GetMapping("/anything")
    HttpBinAnythingResponse anything();}
// 这个会用到所有 key 为 test2-client 的配置,因为咱们这里没有 test2-client 的独自配置,所以用的全是 default 配置
@FeignClient(name = "service1", contextId = "test2-client")
public interface TestService1Client2 {@GetMapping("/anything")
    HttpBinAnythingResponse anything();}

下一节开始,咱们会对这里实现的 FeignClient 封装进行单元测试,验证咱们的正确性。

微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer

正文完
 0