本系列代码地址:https://github.com/JoJoTec/sp…
上一节咱们通过单元测试验证了重试的正确性,这一节咱们来验证咱们线程隔离的正确性,次要包含:
- 验证配置正确加载:即咱们在 Spring 配置(例如
application.yml
)中的退出的 Resilience4j 的配置被正确加载利用了。 - 雷同微服务调用不同实例的时候,应用的是不同的线程(池)。
验证配置正确加载
与之前验证重试相似,咱们能够定义不同的 FeignClient,之后查看 resilience4j 加载的线程隔离配置来验证线程隔离配置的正确加载。
并且,与重试配置不同的是,通过系列后面的源码剖析,咱们晓得 spring-cloud-openfeign 的 FeignClient 其实是懒加载的。所以咱们实现的线程隔离也是懒加载的,须要先调用,之后才会初始化线程池。所以这里咱们须要先进行调用之后,再验证线程池配置。
首先定义两个 FeignClient,微服务别离是 testService1 和 testService2,contextId 别离是 testService1Client 和 testService2Client
@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {@GetMapping("/anything")
HttpBinAnythingResponse anything();}
@FeignClient(name = "testService2", contextId = "testService2Client")
public interface TestService2Client {@GetMapping("/anything")
HttpBinAnythingResponse anything();}
而后,咱们减少 Spring 配置,并且给两个微服务都增加一个实例,应用 SpringExtension 编写单元测试类:
//SpringExtension 也蕴含了 Mockito 相干的 Extension,所以 @Mock 等注解也失效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
// 默认申请重试次数为 3
"resilience4j.retry.configs.default.maxAttempts=3",
// testService2Client 外面的所有办法申请重试次数为 2
"resilience4j.retry.configs.testService2Client.maxAttempts=2",
// 默认线程池配置
"resilience4j.thread-pool-bulkhead.configs.default.coreThreadPoolSize=10",
"resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10",
"resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" ,
//testService2Client 的线程池配置
"resilience4j.thread-pool-bulkhead.configs.testService2Client.coreThreadPoolSize=5",
"resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5",
"resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1",
})
@Log4j2
public class OpenFeignClientTest {
@SpringBootApplication
@Configuration
public static class App {
@Bean
public DiscoveryClient discoveryClient() {
// 模仿两个服务实例
ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class);
Map<String, String> zone1 = Map.ofEntries(Map.entry("zone", "zone1")
);
when(service1Instance1.getMetadata()).thenReturn(zone1);
when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
when(service1Instance1.getHost()).thenReturn("www.httpbin.org");
when(service1Instance1.getPort()).thenReturn(80);
when(service2Instance2.getInstanceId()).thenReturn("service1Instance2");
when(service2Instance2.getHost()).thenReturn("httpbin.org");
when(service2Instance2.getPort()).thenReturn(80);
DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
Mockito.when(spy.getInstances("testService1"))
.thenReturn(List.of(service1Instance1));
Mockito.when(spy.getInstances("testService2"))
.thenReturn(List.of(service2Instance2));
return spy;
}
}
}
编写测试代码,验证配置正确:
@Test
public void testConfigureThreadPool() {
// 避免断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
// 调用下这两个 FeignClient 确保对应的 NamedContext 被初始化
testService1Client.anything();
testService2Client.anything();
// 验证线程隔离的理论配置,合乎咱们的填入的配置
ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava()
.stream().filter(t -> t.getName().contains("service1Instance1")).findFirst().get();
Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 10);
Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 10);
threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava()
.stream().filter(t -> t.getName().contains("service1Instance2")).findFirst().get();
Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 5);
Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 5);
}
雷同微服务调用不同实例的时候,应用的是不同的线程(池)。
咱们须要确保,最初调用(也就是发送 http 申请)的执行的线程池,必须是对应的 ThreadPoolBulkHead 中的线程池。这个须要咱们对 ApacheHttpClient 做切面实现,增加注解 @EnableAspectJAutoProxy(proxyTargetClass = true)
:
//SpringExtension 也蕴含了 Mockito 相干的 Extension,所以 @Mock 等注解也失效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
// 默认申请重试次数为 3
"resilience4j.retry.configs.default.maxAttempts=3",
// testService2Client 外面的所有办法申请重试次数为 2
"resilience4j.retry.configs.testService2Client.maxAttempts=2",
// 默认线程池配置
"resilience4j.thread-pool-bulkhead.configs.default.coreThreadPoolSize=10",
"resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10",
"resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" ,
//testService2Client 的线程池配置
"resilience4j.thread-pool-bulkhead.configs.testService2Client.coreThreadPoolSize=5",
"resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5",
"resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1",
})
@Log4j2
public class OpenFeignClientTest {
@SpringBootApplication
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
public static class App {
@Bean
public DiscoveryClient discoveryClient() {
// 模仿两个服务实例
ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class);
Map<String, String> zone1 = Map.ofEntries(Map.entry("zone", "zone1")
);
when(service1Instance1.getMetadata()).thenReturn(zone1);
when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
when(service1Instance1.getHost()).thenReturn("www.httpbin.org");
when(service1Instance1.getPort()).thenReturn(80);
when(service2Instance2.getInstanceId()).thenReturn("service1Instance2");
when(service2Instance2.getHost()).thenReturn("httpbin.org");
when(service2Instance2.getPort()).thenReturn(80);
DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
Mockito.when(spy.getInstances("testService1"))
.thenReturn(List.of(service1Instance1));
Mockito.when(spy.getInstances("testService2"))
.thenReturn(List.of(service2Instance2));
return spy;
}
}
}
拦挡 ApacheHttpClient
的 execute
办法,这样能够拿到真正负责 http 调用的线程池,将线程其放入申请的 Header:
@Aspect
public static class ApacheHttpClientAop {
// 在最初一步 ApacheHttpClient 切面
@Pointcut("execution(* com.github.jojotech.spring.cloud.webmvc.feign.ApacheHttpClient.execute(..))")
public void annotationPointcut() {}
@Around("annotationPointcut()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
// 设置 Header,不能通过 Feign 的 RequestInterceptor,因为咱们要拿到最初调用 ApacheHttpClient 的线程上下文
Request request = (Request) pjp.getArgs()[0];
Field headers = ReflectionUtils.findField(Request.class, "headers");
ReflectionUtils.makeAccessible(headers);
Map<String, Collection<String>> map = (Map<String, Collection<String>>) ReflectionUtils.getField(headers, request);
HashMap<String, Collection<String>> stringCollectionHashMap = new HashMap<>(map);
stringCollectionHashMap.put(THREAD_ID_HEADER, List.of(String.valueOf(Thread.currentThread().getName())));
ReflectionUtils.setField(headers, request, stringCollectionHashMap);
return pjp.proceed();}
}
这样,咱们就能拿到具体承载申请的线程的名称,从名称中能够看出他所处于的线程池(格局为“bulkhead- 线程隔离名称 -n”,例如 bulkhead-testService1Client:www.httpbin.org:80-1
),接下来咱们就来看下不同的实例是否用了不同的线程池进行调用:
@Test
public void testDifferentThreadPoolForDifferentInstance() throws InterruptedException {
// 避免断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
Set<String> threadIds = Sets.newConcurrentHashSet();
Thread[] threads = new Thread[100];
// 循环 100 次
for (int i = 0; i < 100; i++) {threads[i] = new Thread(() -> {Span span = tracer.nextSpan();
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {HttpBinAnythingResponse response = testService1Client.anything();
// 因为 anything 会返回咱们发送的申请实体的所有内容,所以咱们能获取到申请的线程名称 header
String threadId = response.getHeaders().get(THREAD_ID_HEADER);
threadIds.add(threadId);
}
});
threads[i].start();}
for (int i = 0; i < 100; i++) {threads[i].join();}
// 确认实例 testService1Client:httpbin.org:80 线程池的线程存在
Assertions.assertTrue(threadIds.stream().anyMatch(s -> s.contains("testService1Client:httpbin.org:80")));
// 确认实例 testService1Client:httpbin.org:80 线程池的线程存在
Assertions.assertTrue(threadIds.stream().anyMatch(s -> s.contains("testService1Client:www.httpbin.org:80")));
}
这样,咱们就胜利验证了,实例调用的线程池隔离。
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: