共计 9410 个字符,预计需要花费 24 分钟才能阅读完成。
@[toc]
Seata 一共提供了四种分布式事务的解决模式:
- AT
- TCC
- XA
- Saga
后面三种松哥都和大家介绍过了,明天咱们来看看 Saga 这种模式。如果大家对于前三种还不太熟悉,能够先看看之前的文章,传送门:
- 五分钟带你体验一把分布式事务!so easy!
- 看了那么多博客,还是不懂 TCC,无妨看看这个案例!
- XA 事务水很深,小伙子我怕你把握不住!
好啦,开始明天的注释吧。
1. 什么是 Saga 事务模式
Saga 模式是 Seata 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当呈现某一个参与者失败则弥补后面曾经胜利的参与者, 一阶段正向服务和二阶段弥补服务都由业务开发实现 。
留神最初一句话很要害,阐明 Saga 模式的回滚其实和 AT、TCC 的回滚一样,都是反向弥补操作(区别于 XA 模式)。
官网给了上面一张流程图,咱们一起来看下:
能够看到,T1、T2、T3 始终到 Tn 别离代表分布式事务中的分支事务,这条线都是事务的失常状态,如果在执行的过程中,有某一个抛出异样,则执行 C3、C2 始终到 C1 进行事务的回滚,这里的回滚实际上就是反向弥补操作。
一般来说,Saga 模式实用于业务流程长、业务流程多的分布式事务,就像下面的流程图这样,不过当业务流程比拟长的时候,如何去定义每一个事务的状态也就成了问题。
这里就波及到 Saga 分布式事务的状态机。
2. Saga 的状态图
状态图这个货色,如果小伙伴们用过 Activiti 流程引擎,那么基本上就晓得什么是状态图,Saga 的状态图跟那个也差不多。
Saga 中的状态图是这样:
- 首先咱们须要定义一个状态流程图,像上面这样:
这个流程图官网提供了绘制工具,地址如下:
https://seata.io/saga_designer/index.html
官网还为此提供了一个视频教程,松哥看了下,录视频的人预计也是第一次录视频,没啥教训,视频各种问题没法看,所以我就不放链接了,小伙伴们在工作中如果须要绘制状态图,能够参考这个文档:
https://help.aliyun.com/document_detail/172550.html
流程图上记录了每一个分支事务的状态以及相干的弥补操作,流程图画好之后,会主动生成 JSON 状态语言定义文件,把这个文件未来拷贝到我的项目中。
- 状态图中的每一个节点能够调用一个服务,每一个节点都能够配置它的弥补节点,当节点出现异常时状态引擎反向执行已胜利节点对应的弥补节点将事务回滚(是否回滚可由用户自行决定)。
- 状态图能够实现服务编排需要,反对单项抉择、并发、子流程、参数转换、参数映射、服务执行状态判断、异样捕捉等性能。
3. Saga 模式案例
咱们来看一个 Saga 模式的案例,看完案例大家就懂什么是 Saga 模式了。
3.1 筹备工作
咱们还是应用官网的案例。不过还是松哥之前说的,官网的案例容易导入失败,并且里边有的中央有问题,所以小伙伴们能够间接在公众号后盾回复 seata-demo
下载本文案例。
Saga 的例子咱们用这个:
如果小伙伴们间接应用官网的案例,须要做如下批改:
- 批改 Dubbo 的版本为 2.7.3,本来默认的 3.0.1 这个版本运行时候有问题。
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
</exclusion>
</exclusions>
<version>2.7.3</version>
</dependency>
- 官网默认提供的数据库脚本少一个字段,不晓得咋回事,这么显著的 BUG。这个须要咱们在
src/main/resources/sql/h2_init.sql
文件中,为 seata_state_inst 表增加一个gmt_updated timestamp(3) not null
字段。
筹备工作就算实现啦。
3.2 测试运行
接下来咱们来测试运行。
首先咱们先来执行 src/main/java/io/seata/samples/saga/starter/DubboSagaProviderStarter.java
中的 main 办法,启动服务端。
而后关上 src/main/java/io/seata/samples/saga/starter/DubboSagaTransactionStarter.java
类,这个类须要批改一下能力运行。
public static void main(String[] args) {AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext(new String[] {"spring/seata-saga.xml", "spring/seata-dubbo-reference.xml"});
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
transactionCommittedDemo(stateMachineEngine);
transactionCompensatedDemo(stateMachineEngine);
new ApplicationKeeper(applicationContext).keep();}
能够看到,这个 main 办法中有两个测试方法:
- transactionCommittedDemo
- transactionCompensatedDemo
第一个办法是二阶段提交的测试,第二个办法是二阶段弥补的测试,咱们正文掉其中一个,每次执行的时候只有执行其中一个就能够了。
另外,对于 transactionCommittedDemo 办法,它里边提供了两种状态的获取形式:同步和异步,咱们须要正文掉其中一种而后进行测试,像上面这样:
private static void transactionCommittedDemo(StateMachineEngine stateMachineEngine) {Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
//sync test
//StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
//Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID:" + inst.getId());
//System.out.println("saga transaction commit succeed. XID:" + inst.getId());
//async test
businessKey = String.valueOf(System.currentTimeMillis());
StateMachineInstance inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
waittingForFinish(inst);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID:" + inst.getId());
System.out.println("saga transaction commit succeed. XID:" + inst.getId());
}
正文掉同步的代码块或者正文掉异步的代码块,正文掉之后,执行 main 办法进行测试。
如果测试 transactionCommittedDemo 办法,控制台打印日志如下:
saga transaction commit succeed. XID: 192.168.1.105:8091:2612256553007833092
如果测试 transactionCompensatedDemo 办法,控制台打印日志如下:
saga transaction compensate succeed. XID: 192.168.1.105:8091:2612256553007833094
能看到如上两个日志,阐明案例运行没问题了。
接下来咱们就来剖析一下,这个案例到底讲了个啥!
3.3 案例剖析
3.3.1 JSON 状态形容剖析
这个案例并没有一个明确的业务,就单纯是一个案例。
首先定义了两个 Action:
- InventoryAction
- BalanceAction
这两个 Action 中各自定义了两个办法:
- reduce
- compensateReduce
从办法名就能看出,reduce 办法就是失常的执行逻辑,compensateReduce 办法则是代码弥补逻辑,即回滚的时候须要执行的代码。
具体到这两个办法的实现上,并没有啥,都是打印日志,所以这个我的项目咱们只须要认真察看打印进去的日志,就能晓得事务是提交了还是回滚了。
在 src/main/resources/statelang/reduce_inventory_and_balance.json
文件中定义了各个事务的状态,咱们能够大略看一下,因为残缺 JSON 文件比拟长,我就分段贴出来。
{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
...
...
下面这段定义了状态机的名称为 reduceInventoryAndBalance,在一个我的项目中,咱们能够同时存在多个这样的 JSON 文件,每一个都有一个 name 属性,这样在 Java 代码调用的时候就能够通过具体的名字去指定须要调用哪一个流程了。StartState 则定义了整个流程从 ReduceInventory 开始,ReduceInventory 是前面定义的节点。
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": ["$.[businessKey]",
"$.[count]"
],
"Output": {"reduceInventoryResult": "$.#root"},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
}
这是整个流程图中的第一步,我挑几个要害的点说下。
- ServiceName:这个是服务的名字,也就是由哪个对象解决这里的申请,inventoryAction 是咱们通过 dubbo 获取到的对象。
- ServiceMethod:这个指定了要执行的办法,也就是失常执行的办法。
- CompensateState:这个是指定了负责弥补服务的节点,它的取值是这个 JSON 文件中定义的另外一个节点。
- Next:这是以后节点走完后,下一步要去的节点。
- Input/Output/Status:别离示意输出参数 / 输入参数以及各个状态的取值。
下面这个节点执行完,就会进入到上面这个节点中:
"ChoiceState":{
"Type": "Choice",
"Choices":[
{"Expression":"[reduceInventoryResult] == true",
"Next":"ReduceBalance"
}
],
"Default":"Fail"
},
这个很好懂,就是定义了期待下面一个节点的返回值为 true,如果上个节点返回值不为 true,那就是执行失败,要筹备弥补操作了;如果上个节点执行后果是为 true,那就进入下个节点 ReduceBalance。
前面节点的定义也都差不多,我就不一一列进去了,小伙伴们公号后盾回复 seata-demo
下载文章案例后,能够自行查看。
这是状态图。
3.3.2 代码剖析
接下来再简略看下代码。
官网提供了两个测试方法,一个用来测试二阶段提交,一个用来测试二阶段回滚。
先来看这个这个测试二阶段提交的办法:
private static void transactionCommittedDemo(StateMachineEngine stateMachineEngine) {Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
businessKey = String.valueOf(System.currentTimeMillis());
StateMachineInstance inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
waittingForFinish(inst);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID:" + inst.getId());
System.out.println("saga transaction commit succeed. XID:" + inst.getId());
}
官网这个办法中有同步和异步的案例,我这里把同步的那几行代码删了,咱们就来看异步的。
首先后面 startParams 就是我的项目的参数,在下面 JSON 剖析中,每个办法(reduce、compensateReduce)都有参数,参数就是这。
接下来调用状态机的 startWithBusinessKeyAsync 办法开始各个流程的执行,这个办法的第一个参数就是流程的名字,也就是咱们后面说的 JSON 中的 name,通过这个名字就能够确定是执行哪一个流程,startParams 也是在这里传进去。
waittingForFinish 是一个自定的阻塞办法,目标是使流程执行完,以便获取事务的执行后果,这就是根本的线程常识,我这里就不强调了,大家能够自行下载源码查看。
最初通过断言判断事务的执行状态(inst.getStatus()
)并打印相干日志。
接下来咱们再来看二阶段回滚的办法:
private static void transactionCompensatedDemo(StateMachineEngine stateMachineEngine) {Map<String, Object> startParams = new HashMap<>(4);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("count", 10);
startParams.put("amount", new BigDecimal("100"));
startParams.put("mockReduceBalanceFail", "true");
//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "saga transaction compensate failed. XID:" + inst.getId());
System.out.println("saga transaction compensate succeed. XID:" + inst.getId());
}
和上个办法相比,这里就是多了一个 mockReduceBalanceFail 参数,在后面所说的那个 JSON 文件的定义中,定义了这个输出参数,大家看上面这段 JSON:
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": ["$.[businessKey]",
"$.[amount]",
{"throwException" : "$.[mockReduceBalanceFail]"
}
],
"Output": {"compensateReduceBalanceResult": "$.#root"},
能够看到这个输出参数中有 mockReduceBalanceFail,不过这里不是间接将其作为输出参数,而是将之转为了一个 Map,这个 Map 的 key 是 throwException,所以在 BalanceActionImpl#reduce
办法中会有如下一段代码:
public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {if(params != null && "true".equals(params.get("throwException"))){throw new RuntimeException("reduce balance failed");
}
LOGGER.info("reduce balance succeed, amount:" + amount + ", businessKey:" + businessKey);
return true;
}
如果 throwException 的值为 true,就会抛出异样,此时就会触发事务的回滚。
再回到二阶段回滚的办法中,最初通过 inst.getCompensationStatus()
办法获取事务弥补操作的状态,如果该办法返回 true,示意事务的弥补操作执行胜利。
案例中波及到一些 Dubbo 的知识点我这里就不赘述了,这不是咱们本文的宗旨。
好啦,通过下面的剖析,大家应该大抵上明确了这个 Saga 到底是怎么玩的了。
4. 设计教训
4.1 容许空弥补
空弥补就是原服务未执行,后果弥补服务执行了,当原服务呈现超时、丢包等状况时或者在收到原服务申请之前先收到弥补申请,就可能会呈现空弥补。
因而咱们在服务设计时须要容许空弥补, 即没有找到要弥补的业务主键时返回弥补胜利并将原业务主键记录下来,这也是案例中,无论是原服务还是弥补服务都有 businessKey 参数的起因。
4.2 防悬挂管制
悬挂就是弥补服务比原服务先执行,呈现的起因和后面说的差不多,所以咱们须要在执行原服务时,要先检查一下以后业务主键是否曾经在空弥补记录下来,如果曾经被记录下来,阐明弥补曾经先执行了,此时咱们能够进行原服务的执行。
4.3 幂等管制
原服务与弥补服务都须要保障幂等性, 因为网络可能超时, 所以咱们可能会设置重试策略,重试产生时要通过幂等管制,防止业务数据反复更新。如何保障幂等性,松哥之前公众号的文章中和大家聊过,这里就不再赘述了。
4.4 不足隔离性的应答
因为 Saga 事务不保障隔离性, 在极其状况下可能因为脏写无奈实现回滚操作。
举一个极其的例子, 分布式事务内先给用户 A 充值, 而后给用户 B 扣减余额, 如果在给 A 用户充值胜利, 在事务提交以前, A 用户把余额生产掉了, 如果事务产生回滚, 这时则没有方法进行弥补了。这就是不足隔离性造成的典型的问题。
对于这种问题,咱们能够通过如下形式来尝试解决:
业务流程设计时遵循“宁肯长款, 不可短款”的准则, 长款意思是客户少了钱机构多了钱, 以机构信用能够给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上肯定是先扣款。
有些业务场景能够容许让业务最终胜利, 在回滚不了的状况下能够持续重试实现前面的流程, 所以状态机引擎除了提供“回滚”能力还须要提供“向前”复原上下文继续执行的能力, 让业务最终执行胜利, 达到最终一致性的目标。
4.5 性能优化
配置客户端参数 client.rm.report.success.enable=false
,能够在当分支事务执行胜利时不上报分支状态到 server,从而晋升性能。
当上一个分支事务的状态还没有上报的时候,下一个分支事务已注册,能够认为上一个理论已胜利
5. 小结
这就是 Seata 分布式事务中的 Saga 模式。至此,Seata 中的四种分布式事务模式松哥就都和大家扯完了,前面我在整一篇文章比拟下这四种模式。
另外三篇传送门:
- 五分钟带你体验一把分布式事务!so easy!
- 看了那么多博客,还是不懂 TCC,无妨看看这个案例!
- XA 事务水很深,小伙子我怕你把握不住!
公号后盾回复 seata-demo
能够下载本文案例。
参考资料:
- https://www.sofastack.tech/bl…
- https://seata.io/zh-cn/docs/u…