共计 28705 个字符,预计需要花费 72 分钟才能阅读完成。
转载请注明出处: 翻译:Hystrix – How To Use
Hello World!
下面的代码展示了 HystrixCommand 版的 Hello World:
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.name = name;
}
@Override
protected String run() {
// a real example would do work like a network call here
return “Hello ” + name + “!”;
}
}
查看源码
HystrixObservableCommand 的同等实现如下:
public class CommandHelloWorld extends HystrixObservableCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.name = name;
}
@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()) {
// a real example would do work like a network call here
observer.onNext(“Hello”);
observer.onNext(name + “!”);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribeOn(Schedulers.io());
}
}
Synchronous Execution
可以通过调用 HystrixCommand.execute()方法实现同步执行, 示例如下:
String s = new CommandHelloWorld(“World”).execute();
测试如下:
@Test
public void testSynchronous() {
assertEquals(“Hello World!”, new CommandHelloWorld(“World”).execute());
assertEquals(“Hello Bob!”, new CommandHelloWorld(“Bob”).execute());
}
HystrixObservableCommand 不提供同步执行方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:
HystrixObservableCommand.observe().observe().toBlocking().toFuture().get()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture().get()
如果实际上产生了多个值, 上述的代码将会抛出 java.lang.IllegalArgumentException: Sequence contains too many elements.
Asynchronous Execution
可以通过调用 HystrixCommand.queue()方法实现异步执行, 示例如下:
Future<String> fs = new CommandHelloWorld(“World”).queue();
此时可以通过 Future.get()方法获取 command 执行结果:
String s = fs.get();
测试代码如下:
@Test
public void testAsynchronous1() throws Exception {
assertEquals(“Hello World!”, new CommandHelloWorld(“World”).queue().get());
assertEquals(“Hello Bob!”, new CommandHelloWorld(“Bob”).queue().get());
}
@Test
public void testAsynchronous2() throws Exception {
Future<String> fWorld = new CommandHelloWorld(“World”).queue();
Future<String> fBob = new CommandHelloWorld(“Bob”).queue();
assertEquals(“Hello World!”, fWorld.get());
assertEquals(“Hello Bob!”, fBob.get());
}
下面的两种实现是等价的:
String s1 = new CommandHelloWorld(“World”).execute();
String s2 = new CommandHelloWorld(“World”).queue().get();
HystrixObservableCommand 不提供 queue 方法, 但是如果确定其只会产生一个值, 那么也可以用如下方式实现:
HystrixObservableCommand.observe().observe().toBlocking().toFuture()
HystrixObservableCommand.toObservable().observe().toBlocking().toFuture()
如果实际上产生了多个值, 上述的代码将会抛出 java.lang.IllegalArgumentException: Sequence contains too many elements.
Reactive Execution
你也可以将 HystrixCommand 当做一个可观察对象 (Observable) 来观察 (Observe) 其产生的结果, 可以使用以下任意一个方法实现:
observe(): 一旦调用该方法, 请求将立即开始执行, 其利用 ReplaySubject 特性可以保证不会丢失任何 command 产生的结果, 即使结果在你订阅之前产生的也不会丢失.
toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.
Observable<String> ho = new CommandHelloWorld(“World”).observe();
// or Observable<String> co = new CommandHelloWorld(“World”).toObservable();
然后你可以通过订阅到这个 Observable 来取得 command 产生的结果:
ho.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// value emitted here
}
});
测试如下:
@Test
public void testObservable() throws Exception {
Observable<String> fWorld = new CommandHelloWorld(“World”).observe();
Observable<String> fBob = new CommandHelloWorld(“Bob”).observe();
// blocking
assertEquals(“Hello World!”, fWorld.toBlockingObservable().single());
assertEquals(“Hello Bob!”, fBob.toBlockingObservable().single());
// non-blocking
// – this is a verbose anonymous inner-class approach and doesn’t do assertions
fWorld.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
// nothing needed here
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println(“onNext: ” + v);
}
});
// non-blocking
// – also verbose anonymous inner-class
// – ignore errors and onCompleted signal
fBob.subscribe(new Action1<String>() {
@Override
public void call(String v) {
System.out.println(“onNext: ” + v);
}
});
}
使用 Java 8 的 Lambda 表达式可以使代码更简洁:
fWorld.subscribe((v) -> {
System.out.println(“onNext: ” + v);
})
// – or while also including error handling
fWorld.subscribe((v) -> {
System.out.println(“onNext: ” + v);
}, (exception) -> {
exception.printStackTrace();
})
关于 Observable 的信息可以在这里查阅
Reactive Commands
相比将 HystrixCommand 使用上述方法转换成一个 Observable, 你也可以选择创建一个 HystrixObservableCommand 对象. HystrixObservableCommand 包装的 Observable 允许产生多个结果(译者注: Subscriber.onNext 可以调用多次), 而 HystrixCommand 即使转换成了 Observable 也只能产生一个结果.
使用 HystrixObservableCommnad 时, 你需要重载 construct 方法来实现你的业务逻辑, 而不是重载 run 方法, contruct 方法将会返回你需要包装的 Observable.
使用下面任意一个方法可以从 HystrixObservableCommand 中获取 Observable 对象:
observe(): 一旦调用该方法, 请求将立即开始执行, 其利用 ReplaySubject 特性可以保证不会丢失任何 command 产生的结果, 即使结果在你订阅之前产生的也不会丢失.
toObservable(): 调用该方法后不会立即执行请求, 而是当有订阅者订阅时才会执行.
Fallback
大多数情况下, 我们都希望 command 在执行失败时能够有一个候选方法来处理, 如: 返回一个默认值或执行其他失败处理逻辑, 除了以下几个情况:
执行写操作的 command: 当 command 的目标是执行写操作而不是读操作, 那么通常需要将写操作失败的错误交给调用者处理.
批处理系统 / 离线计算: 如果 command 的目标是做一些离线计算、生成报表、填充缓存等, 那么同样应该将失败交给调用者处理.
无论 command 是否实现了 getFallback()方法, command 执行失败时, Hystrix 的状态和断路器 (circuit-breaker) 的状态 / 指标都会进行更新.
HystrixCommand 可以通过实现 getFallback()方法来实现降级处理, run()方法异常、执行超时、线程池或信号量已满拒绝提供服务、断路器短路时, 都会调用 getFallback():
public class CommandHelloFailure extends HystrixCommand<String> {
private final String name;
public CommandHelloFailure(String name) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.name = name;
}
@Override
protected String run() {
throw new RuntimeException(“this command always fails”);
}
@Override
protected String getFallback() {
return “Hello Failure ” + name + “!”;
}
}
查看源码
这个命令的 run()方法总是会执行失败, 但是调用者总是能收到 getFallback()方法返回的值, 而不是收到一个异常:
@Test
public void testSynchronous() {
assertEquals(“Hello Failure World!”, new CommandHelloFailure(“World”).execute());
assertEquals(“Hello Failure Bob!”, new CommandHelloFailure(“Bob”).execute());
}
HystrixObservableCommand 可以通过重载 resumeWithFallback 方法实现原 Observable 执行失败时返回回另一个 Observable, 需要注意的是, 原 Observable 有可能在发出多个结果之后才出现错误, 因此在 fallback 实现的逻辑中不应该假设订阅者只会收到失败逻辑中发出的结果.
Hystrix 内部使用了 RxJava 的 onErrorResumeNext 操作符来实现 Observable 之间的无缝转移.
Error Propagation
除 HystrixBadRequestException 异常外, run 方法中抛出的所有异常都会被认为是执行失败且会触发 getFallback()方法和断路器的逻辑.
你可以在 HystrixBadRequestException 中包装想要抛出的异常, 然后通过 getCause()方法获取. HystrixBadRequestException 使用在不应该被错误指标 (failure metrics) 统计和不应该触发 getFallback()方法的场景, 例如报告参数不合法或者非系统异常等.
对于 HystrixObservableCommand, 不可恢复的错误都会在通过 onError 方法通知, 并通过获取用户实现的 resumeWithFallback()方法返回的 Observable 来完成回退机制.
执行异常类型
Failure Type
Exception class
Exception.cause
FAILURE
HystrixRuntimeException
underlying exception(user-controlled)
TIMEOUT
HystrixRuntimeException
j.u.c.TimeoutException
SHORT_CIRCUITED
HystrixRuntimeException
j.l.RuntimeException
THREAD_POOL_REJECTED
HystrixRuntimeException
j.u.c.RejectedExecutionException
SEMAPHORE_REJECTED
HystrixRuntimeException
j.l.RuntimeException
BAD_REQUEST
HystrixBadRequestException
underlying exception(user-controller)
Command Name
默认的 command name 是从类名中派生的:
getClass().getSimpleName()
可以通过 HystrixCommand 或 HystrixObservableCommand 的构造器来指定 command name:
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“HelloWorld”)));
this.name = name;
}
可以通过如下方式来重用 Setter:
private static final Setter cachedSetter =
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“HelloWorld”));
public CommandHelloWorld(String name) {
super(cachedSetter);
this.name = name;
}
HystrixCommandKey 是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个 Factory 类来构建帮助构建内部实例, 使用方式如下:
HystrixCommandKey.Factory.asKey(“Hello World”);
Command Group
Hystrix 使用 command group 来为分组, 分组信息主要用于报告、警报、仪表盘上显示, 或者是标识团队 / 库的拥有者.
默认情况下, 除非已经用这个名字定义了一个信号量, 否则 Hystrix 将使用这个名称来定义 command 的线程池.
HystrixCommandGroupKey 是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个 Factory 类来构建帮助构建内部实例, 使用方式如下:
HystrixCommandGroupKey.Factory.asKey(“Example Group”)
Command Thread-pool
thread-pool key 主要用于在监控、指标发布、缓存等类似场景中标识一个 HystrixThreadPool, 一个 HystrixCommand 于其构造函数中传入的 HystrixThreadPoolKey 指定的 HystrixThreadPool 相关联, 如果未指定的话, 则使用 HystrixCommandGroupKey 来获取 / 创建 HystrixThreadPool.
可以通过 HystrixCommand 或 HystrixObservableCommand 的构造器来指定其值:
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“HelloWorld”))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“HelloWorldPool”)));
this.name = name;
}
HystrixCommandThreadPoolKey 是一个接口, 因此可以将其实现为一个枚举或者常规的类, 但是它已经内置了一个 Factory 类来构建帮助构建内部实例, 使用方式如下:
HystrixThreadPoolKey.Factory.asKey(“Hello World Pool”)
使用 HystrixThreadPoolKey 而不是使用不同的 HystrixCommandGroupKey 的原因是: 可能会有多条 command 在逻辑功能上属于同一个组(group), 但是其中的某些 command 需要和其他 command 隔离开, 例如:
两条用于访问视频元数据的 command
两条 command 的 group name 都是 VideoMetadata
command A 与资源 #1 互斥
command B 与资源 #2 互斥
如果 command A 由于延迟等原因导致其所在的线程池资源耗尽, 不应该影响 command B 对 #2 的执行, 因为他们访问的是不同的后端资源.
因此, 从逻辑上来说, 我们希望这两条 command 应该被分到同一个分组, 但是我们同样系统将这两条命令的执行隔离开来, 因此我们使用 HystrixThreadPoolKey 将其分配到不同的线程池.
Request Cache
可以通过实现 HystrixCommand 或 HystrixObservableCommand 的 getCacheKey()方法开启用对请求的缓存功能:
public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
private final int value;
protected CommandUsingRequestCache(int value) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.value = value;
}
@Override
protected Boolean run() {
return value == 0 || value % 2 == 0;
}
@Override
protected String getCacheKey() {
return String.valueOf(value);
}
}
由于该功能依赖于请求的上下文信息, 因此我们必须初始化一个 HystrixRequestContext, 使用方式如下:
@Test
public void testWithoutCacheHits() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
assertTrue(new CommandUsingRequestCache(2).execute());
assertFalse(new CommandUsingRequestCache(1).execute());
assertTrue(new CommandUsingRequestCache(0).execute());
assertTrue(new CommandUsingRequestCache(58672).execute());
} finally {
context.shutdown();
}
}
通常情况下, 上下文信息 (HystrixRequestContext) 应该在持有用户请求的 ServletFilter 或者其他拥有生命周期管理功能的类来初始化和关闭.
下面的例子展示了 command 如何从缓存中获取数据, 以及如何查询一个数据是否是从缓存中获取到的:
@Test
public void testWithCacheHits() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
assertTrue(command2a.execute());
// this is the first time we’ve executed this command with
// the value of “2” so it should not be from cache
assertFalse(command2a.isResponseFromCache());
assertTrue(command2b.execute());
// this is the second time we’ve executed this command with
// the same value so it should return from cache
assertTrue(command2b.isResponseFromCache());
} finally {
context.shutdown();
}
// start a new request context
context = HystrixRequestContext.initializeContext();
try {
CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
assertTrue(command3b.execute());
// this is a new request context so this
// should not come from cache
assertFalse(command3b.isResponseFromCache());
} finally {
context.shutdown();
}
}
Request Collapsing
请求合并可以用于将多条请求绑定到一起, 由同一个 HystrixCommand 实例执行.
collapser 可以通过 batch size 和 batch 创建以来的耗时来自动将请求合并执行.
Hystrix 支持两个请求合并方式: 请求级的合并和全局级的合并. 默认是请求范围的合并, 可以在构造 collapser 时指定值.
请求级 (request-scoped) 的 collapser 只会合并每一个 HystrixRequestContext 中的请求, 而全局级 (globally-scoped) 的 collapser 则可以跨 HystrixRequestContext 合并请求. 因此, 如果你下游的依赖者无法再一个 command 中处理多个 HystrixRequestContext 的话, 那么你应该使用请求级的合并.
在 Netflix, 我们只会使用请求级的合并, 因为我们当前所有的系统都是基于一个 command 对应一个 HystrixRequestContext 的设想下构建的. 因此, 当一个 command 使用不同的参数在一个请求中并发执行时, 合并是有效的.
下面的代码展示了如何实现请求级的 HystrixCollapser:
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public CommandCollapserGetValueForKey(Integer key) {
this.key = key;
}
@Override
public Integer getRequestArgument() {
return key;
}
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
return new BatchCommand(requests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
request.setResponse(batchResponse.get(count++));
}
}
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“GetValueForKey”)));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
for (CollapsedRequest<String, Integer> request : requests) {
// artificial response for each argument received in the batch
response.add(“ValueForKey: ” + request.getArgument());
}
return response;
}
}
}
下面的代码展示了如果使用 collapser 自动合并 4 个 CommandCollapserGetValueForKey 到一个 HystrixCommand 中执行:
@Test
public void testCollapser() throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
assertEquals(“ValueForKey: 1”, f1.get());
assertEquals(“ValueForKey: 2”, f2.get());
assertEquals(“ValueForKey: 3”, f3.get());
assertEquals(“ValueForKey: 4”, f4.get());
// assert that the batch command ‘GetValueForKey’ was in fact
// executed and that it executed only once
assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
// assert the command is the one we’re expecting
assertEquals(“GetValueForKey”, command.getCommandKey().name());
// confirm that it was a COLLAPSED command execution
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
// and that it was successful
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
} finally {
context.shutdown();
}
}
Request Context Setup
使用请求级的特性时 (如: 请求缓存、请求合并、请求日志) 你必须管理 HystrixRequestContext 的生命周期(或者实现 HystrixConcurrencyStategy).
这意味着你必须在请求之前执行如下代码:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
并在请求结束后执行如下代码:
context.shutdown();
在标准的 Java web 应用中, 你可以使用 Setvlet Filter 实现的如下的过滤器来管理:
public class HystrixRequestContextServletFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
}
可以在 web.xml 中加入如下代码实现对所有的请求都使用该过滤器:
<filter>
<display-name>HystrixRequestContextServletFilter</display-name>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
Common Patterns
以下是 HystrixCommand 和 HystrixObservableCommand 的一般用法和使用模式.
Fail Fast
最基本的使用是执行一条只做一件事情且没有实现回退方法的 command, 这样的 command 在发生任何错误时都会抛出异常:
public class CommandThatFailsFast extends HystrixCommand<String> {
private final boolean throwException;
public CommandThatFailsFast(boolean throwException) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.throwException = throwException;
}
@Override
protected String run() {
if (throwException) {
throw new RuntimeException(“failure from CommandThatFailsFast”);
} else {
return “success”;
}
}
下面的代码演示了上述行为:
@Test
public void testSuccess() {
assertEquals(“success”, new CommandThatFailsFast(false).execute());
}
@Test
public void testFailure() {
try {
new CommandThatFailsFast(true).execute();
fail(“we should have thrown an exception”);
} catch (HystrixRuntimeException e) {
assertEquals(“failure from CommandThatFailsFast”, e.getCause().getMessage());
e.printStackTrace();
}
}
HystrixObservableCommand 需要重载 resumeWithFallback()方法来实现同样的行为:
@Override
protected Observable<String> resumeWithFallback() {
if (throwException) {
return Observable.error(new Throwable(“failure from CommandThatFailsFast”));
} else {
return Observable.just(“success”);
}
}
Fail Silent
静默失败等同于返回一个空的响应或者移除功能. 可以是返回 null、空 Map、空 List, 或者其他类似的响应.
可以通过实现 HystrixCommand.getFallback()方法实现该功能:
public class CommandThatFailsSilently extends HystrixCommand<String> {
private final boolean throwException;
public CommandThatFailsSilently(boolean throwException) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.throwException = throwException;
}
@Override
protected String run() {
if (throwException) {
throw new RuntimeException(“failure from CommandThatFailsFast”);
} else {
return “success”;
}
}
@Override
protected String getFallback() {
return null;
}
}
@Test
public void testSuccess() {
assertEquals(“success”, new CommandThatFailsSilently(false).execute());
}
@Test
public void testFailure() {
try {
assertEquals(null, new CommandThatFailsSilently(true).execute());
} catch (HystrixRuntimeException e) {
fail(“we should not get an exception as we fail silently with a fallback”);
}
}
或者返回一个空 List 的实现如下:
@Override
protected List<String> getFallback() {
return Collections.emptyList();
}
HystrixObservableCommand 可以通过重载 resumeWithFallback()方法实现同样的行为:
@Override
protected Observable<String> resumeWithFallback() {
return Observable.empty();
}
Fallback: Static
Fallback 可以返回代码里设定的默认值, 这种方式可以通过默认行为来有效避免于静默失败带来影响.
例如, 如果一个应返回 true/false 的用户认证的 command 执行失败了, 那么其默认行为可以如下:
@Override
protected Boolean getFallback() {
return true;
}
对于 HystrixObservableCommand 可以通过重载 resumeWithFallback()方法实现同样的行为:
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.just(true);
}
Fallback: Stubbed
当 command 返回的是一个包含多个字段的复合对象, 且该对象的一部分字段值可以通过其他请求状态获得, 另一部分状态可以通过设置默认值获得时, 你通常需要使用存根 (stubbed) 模式.
你可能可以从存根值 (stubbed values) 中得到适当的值的情况如下:
cookies
请求参数和请求头
当前失败请求的前一个服务请求的响应
在 fallback 代码块内可以静态地获取请求范围内的存根 (stubbed) 值, 但是通常我们更推荐在构建 command 实例时注入这些值, 就像下面实例的代码中的 countryCodeFromGeoLookup 一样:
public class CommandWithStubbedFallback extends HystrixCommand<UserAccount> {
private final int customerId;
private final String countryCodeFromGeoLookup;
/**
* @param customerId
* The customerID to retrieve UserAccount for
* @param countryCodeFromGeoLookup
* The default country code from the HTTP request geo code lookup used for fallback.
*/
protected CommandWithStubbedFallback(int customerId, String countryCodeFromGeoLookup) {
super(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”));
this.customerId = customerId;
this.countryCodeFromGeoLookup = countryCodeFromGeoLookup;
}
@Override
protected UserAccount run() {
// fetch UserAccount from remote service
// return UserAccountClient.getAccount(customerId);
throw new RuntimeException(“forcing failure for example”);
}
@Override
protected UserAccount getFallback() {
/**
* Return stubbed fallback with some static defaults, placeholders,
* and an injected value ‘countryCodeFromGeoLookup’ that we’ll use
* instead of what we would have retrieved from the remote service.
*/
return new UserAccount(customerId, “Unknown Name”,
countryCodeFromGeoLookup, true, true, false);
}
public static class UserAccount {
private final int customerId;
private final String name;
private final String countryCode;
private final boolean isFeatureXPermitted;
private final boolean isFeatureYPermitted;
private final boolean isFeatureZPermitted;
UserAccount(int customerId, String name, String countryCode,
boolean isFeatureXPermitted,
boolean isFeatureYPermitted,
boolean isFeatureZPermitted) {
this.customerId = customerId;
this.name = name;
this.countryCode = countryCode;
this.isFeatureXPermitted = isFeatureXPermitted;
this.isFeatureYPermitted = isFeatureYPermitted;
this.isFeatureZPermitted = isFeatureZPermitted;
}
}
}
下面的代码演示了上述行为:
@Test
public void test() {
CommandWithStubbedFallback command = new CommandWithStubbedFallback(1234, “ca”);
UserAccount account = command.execute();
assertTrue(command.isFailedExecution());
assertTrue(command.isResponseFromFallback());
assertEquals(1234, account.customerId);
assertEquals(“ca”, account.countryCode);
assertEquals(true, account.isFeatureXPermitted);
assertEquals(true, account.isFeatureYPermitted);
assertEquals(false, account.isFeatureZPermitted);
}
对于 HystrixObservableCommand 可以通过重载 resumeWithFallback()方法实现同样的行为:
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.just(new UserAccount(customerId, “Unknown Name”,
countryCodeFromGeoLookup, true, true, false) );
}
如果你想要从 Observable 中发出多个值, 那么当失败发生时, 原本的 Observable 可能已经发出的一部分值, 此时你或许更希望能够只从 fallback 逻辑中发出另一部分未被发出的值, 下面的例子就展示了如何实现这一个目的: 它通过追踪原 Observable 发出的最后一个值来实现 fallback 逻辑中的 Observable 应该从什么地方继续发出存根值(stubbed value) :
@Override
protected Observable<Integer> construct() {
return Observable.just(1, 2, 3)
.concatWith(Observable.<Integer> error(new RuntimeException(“forced error”)))
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer t1) {
lastSeen = t1;
}
})
.subscribeOn(Schedulers.computation());
}
@Override
protected Observable<Integer> resumeWithFallback() {
if (lastSeen < 4) {
return Observable.range(lastSeen + 1, 4 – lastSeen);
} else {
return Observable.empty();
}
}
Fallback: Cache via Network
有时后端的服务异常也会引起 command 执行失败, 此时我们也可以从缓存中 (如: memcached) 取得相关的数据.
由于在 fallback 的逻辑代码中访问网络可能会再次失败, 因此必须构建新的 HystrixCommand 或 HystrixObservableCommand 来执行:
很重要的一点是执行 fallback 逻辑的 command 需要在一个不同的线程池中执行, 否则如果原 command 的延迟变高且其所在线程池已经满了的话, 执行 fallback 逻辑的 command 将无法在同一个线程池中执行.
下面的代码展示了 CommandWithFallbackViaNetwork 如何在 getFallback()方法中执行 FallbackViaNetwork.
注意, FallbackViaNetwork 同样也具有回退机制, 这里通过返回 null 来实现 fail silent.
FallbackViaNetwork 默认会从 HystrixCommandGroupKey 中继承线程池的配置 RemoteServiceX, 因此需要在其构造器中注入 HystrixThreadPoolKey.Factory.asKey(“RemoteServiceXFallback”)来使其在不同的线程池中执行.
这样, CommandWithFallbackViaNetwork 会在名为 RemoteServiceX 的线程池中执行, 而 FallbackViaNetwork 会在名为 RemoteServiceXFallback 的线程池中执行.
public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
private final int id;
protected CommandWithFallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“RemoteServiceX”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“GetValueCommand”)));
this.id = id;
}
@Override
protected String run() {
// RemoteServiceXClient.getValue(id);
throw new RuntimeException(“force failure for example”);
}
@Override
protected String getFallback() {
return new FallbackViaNetwork(id).execute();
}
private static class FallbackViaNetwork extends HystrixCommand<String> {
private final int id;
public FallbackViaNetwork(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“RemoteServiceX”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“GetValueFallbackCommand”))
// use a different threadpool for the fallback command
// so saturating the RemoteServiceX pool won’t prevent
// fallbacks from executing
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“RemoteServiceXFallback”)));
this.id = id;
}
@Override
protected String run() {
MemCacheClient.getValue(id);
}
@Override
protected String getFallback() {
// the fallback also failed
// so this fallback-of-a-fallback will
// fail silently and return null
return null;
}
}
}
Primary + Secondary with Fallback
有些系统可能具有是以双系统模式搭建的 — 主从模式或主备模式.
有时从系统或备用系统会被认为是失败状态的一种, 仅在执行 fallback 逻辑是才使用它; 这种场景和 Cache via Network 一节中描述的场景是一样的.
然而, 如果切换到从系统是一个很正常时, 例如发布新代码时(这是有状态的系统发布代码的一种方式), 此时每当切换到从系统使用时, 主系统都是处于不可用状态, 断路器将会打开且发出警报.
这并不是我们期望发生的事, 这种狼来了式的警报可能会导致真正发生问题的时候我们却把它当成正常的误报而忽略了.
因此, 我们可以通过在其前面放置一个门面 HystrixCommand(见下文), 将主 / 从系统的切换视为正常的、健康的状态.
主从 HystrixCommand 都是需要访问网络且实现了特定的业务逻辑, 因此其实现上应该是线程隔离的. 它们可能具有显著的性能差距(通常从系统是一个静态缓存), 因此将两个 command 隔离的另一个好处是可以针对性地调优.
你不需要将这两个 command 都公开发布, 只需要将它们隐藏在另一个由信号量隔离的 HystrixCommand 中(称之为门面 HystrixCommand), 在这个 command 中去实现主系统还是从系统的调用选择. 只有当主从系统都失败了, 才会去执行这个门面 command 的 fallback 逻辑.
门面 HystrixCommand 可以使用信号量隔离的, 因为其业务逻辑仅仅是调用另外两个线程隔离的 HystrixCommand, 它不涉及任何的网络访问、重试等容易出错的事, 因此没必要将这部分代码放到其他线程去执行.
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty(“primarySecondary.usePrimary”, true);
private final int id;
public CommandFacadeWithPrimarySecondary(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“SystemX”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“PrimarySecondaryCommand”))
.andCommandPropertiesDefaults(
// we want to default to semaphore-isolation since this wraps
// 2 others commands that are already thread isolated
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
if (usePrimary.get()) {
return new PrimaryCommand(id).execute();
} else {
return new SecondaryCommand(id).execute();
}
}
@Override
protected String getFallback() {
return “static-fallback-” + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
private static class PrimaryCommand extends HystrixCommand<String> {
private final int id;
private PrimaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“SystemX”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“PrimaryCommand”))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“PrimaryCommand”))
.andCommandPropertiesDefaults(
// we default to a 600ms timeout for primary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
this.id = id;
}
@Override
protected String run() {
// perform expensive ‘primary’ service call
return “responseFromPrimary-” + id;
}
}
private static class SecondaryCommand extends HystrixCommand<String> {
private final int id;
private SecondaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“SystemX”))
.andCommandKey(HystrixCommandKey.Factory.asKey(“SecondaryCommand”))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(“SecondaryCommand”))
.andCommandPropertiesDefaults(
// we default to a 100ms timeout for secondary
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
this.id = id;
}
@Override
protected String run() {
// perform fast ‘secondary’ service call
return “responseFromSecondary-” + id;
}
}
public static class UnitTest {
@Test
public void testPrimary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty(“primarySecondary.usePrimary”, true);
assertEquals(“responseFromPrimary-20”, new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
@Test
public void testSecondary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty(“primarySecondary.usePrimary”, false);
assertEquals(“responseFromSecondary-20”, new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
}
}
Client Doesn’t Perform Network Access
当你使用 HystrixCommand 实现的业务逻辑不涉及到网络访问、对延迟敏感且无法接受多线程带来的开销时, 你需要设置 executionIsolationStrategy)属性的值为 ExecutionIsolationStrategy.SEMAPHORE, 此时 Hystrix 会使用信号量隔离代替线程隔离.
下面的代码展示了如何为 command 设置该属性(也可以在运行时动态改变这个属性的值):
public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {
private final int id;
public CommandUsingSemaphoreIsolation(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”))
// since we’re doing an in-memory cache lookup we choose SEMAPHORE isolation
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
// a real implementation would retrieve data from in memory data structure
return “ValueFromHashMap_” + id;
}
}
Get-Set-Get with Request Cache Invalidation
Get-Set-Get 是指: Get 请求的结果被缓存下来后, 另一个 command 对同一个资源发出了 Set 请求, 此时由 Get 请求缓存的结果应该失效, 避免随后的 Get 请求获取到过时的缓存结果, 此时可以通过调用 HystrixRequestCache.clear())方法来使缓存失效.
public class CommandUsingRequestCacheInvalidation {
/* represents a remote data store */
private static volatile String prefixStoredOnRemoteDataStore = “ValueBeforeSet_”;
public static class GetterCommand extends HystrixCommand<String> {
private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey(“GetterCommand”);
private final int id;
public GetterCommand(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(“GetSetGet”))
.andCommandKey(GETTER_KEY));
this.id = id;
}
@Override
protected String run() {
return prefixStoredOnRemoteDataStore + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
/**
* Allow the cache to be flushed for this object.
*
* @param id
* argument that would normally be passed to the command
*/
public static void flushCache(int id) {
HystrixRequestCache.getInstance(GETTER_KEY,
HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));
}
}
public static class SetterCommand extends HystrixCommand<Void> {
private final int id;
private final String prefix;
public SetterCommand(int id, String prefix) {
super(HystrixCommandGroupKey.Factory.asKey(“GetSetGet”));
this.id = id;
this.prefix = prefix;
}
@Override
protected Void run() {
// persist the value against the datastore
prefixStoredOnRemoteDataStore = prefix;
// flush the cache
GetterCommand.flushCache(id);
// no return value
return null;
}
}
}
@Test
public void getGetSetGet() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
assertEquals(“ValueBeforeSet_1”, new GetterCommand(1).execute());
GetterCommand commandAgainstCache = new GetterCommand(1);
assertEquals(“ValueBeforeSet_1”, commandAgainstCache.execute());
// confirm it executed against cache the second time
assertTrue(commandAgainstCache.isResponseFromCache());
// set the new value
new SetterCommand(1, “ValueAfterSet_”).execute();
// fetch it again
GetterCommand commandAfterSet = new GetterCommand(1);
// the getter should return with the new prefix, not the value from cache
assertFalse(commandAfterSet.isResponseFromCache());
assertEquals(“ValueAfterSet_1”, commandAfterSet.execute());
} finally {
context.shutdown();
}
}
}
Migrating a Library to Hystrix
如果你要迁移一个已有的客户端库到 Hystrix, 你应该将所有的服务方法 (service methods) 替换成 HystrixCommand.
服务方法 (service methods) 转而调用 HystrixCommand 且不在包含任何额外的业务逻辑.
因此, 在迁移之前, 一个服务库可能是这样的:
迁移完成之后, 服务库的用户要能直接访问到 HystrixCommand, 或者通过服务门面 (service facade) 的代理间接访问到 HystrixCommand.