1. 前言
本文先通过分布式事务中tcc计划,衍生出seata的tcc模式,次要还是会通过代码示例来做介绍。github代码地址可提前下载,该我的项目中包含数据库、seata配置,以及所有分布式服务的全副代码。大家如果想练练手,能够先拉取该我的项目代码,再联合本文学习。外围配置环境如下:
环境类型 | 版本号 |
---|---|
jdk | 1.8.0_251 |
mysql | 8.0.22 |
seata server | 1.4.1 |
1.1. tcc
咱们后面有几篇文章都有介绍过分布式事务的计划,目前常见的分布式事务计划有:2pc、tcc和异步确保型。之前讲过用jta atomikos实现多数据源的 2pc
,用 异步确保型
计划实现领取业务的事务等等,就是没专门讲过 tcc
的利用。
因为tcc计划的操作难度还是比拟大的。不能单打独斗,最好须要依靠一个成熟的框架来实现。常见的tcc开源框架有tcc-transaction、Hmily和ByteTCC等,不过他们不像seata背靠大厂,无奈提供继续的保护,因而我更举荐seata的tcc计划。
1.2. seata
先说说seata吧,分布式事务的解决方案必定不局限于下面说的三种,实际上形形色色。因为它确实很让人头疼,各位大神都想研发出最好用的框架。本文的配角 - seata
,就是阿里的一个开源我的项目。
seata提供了AT、TCC、SAGA 和 XA,一共4种事务模式。像AT模式就很受欢迎,咱们在实现多数据源的事务一致性时,通常会选用 2PC
的计划,期待所有数据源的事务执行胜利,最初再一起提交事务。这个期待所有数据源事务执行的过程就比拟耗时,即影响性能,也不平安。
而seata AT模式的做法就很灵便,它学习数据库的 undo log,每个事务执行时立刻提交事务,但会把 undo 的回退sql记录下来。如果所有事务执行胜利,革除记录 undo sql的行记录,如果某个事务失败,则执行对应 undo sql 回滚数据。在保障事务的同时,并发量也大了起来。
但咱们明天要讲的是 seata TCC 模式,如果你对 Seata的其余模式感兴趣,能够上官网理解。
2. 业务
先讲一下示例的业务吧,咱们还是拿比拟经典的电商领取场景举例。假如领取胜利后,波及到三个零碎事务:
- 订单零碎(order):创立领取订单。
- 库存零碎(storage):对应商品扣除库存。
- 账户零碎(account):用户账户扣除响应金额。
2.1. tcc业务
依照tcc(try-confirm-cancel)的思路,这三个事务能够别离分解成上面的过程。
订单零碎 order
- try: 创立订单,然而订单状态设置一个长期状态(如:status=0)。
- confirm: try胜利,提交事务,将订单状态更新为齐全状态(如:status=1)。
- cancel: 回滚事务,删除该订单记录。
库存零碎 storage
- try: 将须要缩小的库存量解冻起来。
- confirm: try胜利,提交事务,应用解冻的库存扣除,实现业务数据处理。
- cancel: 回滚事务,解冻的库存冻结,复原以前的库存量。
账户零碎 account
- try: 将须要扣除的钱解冻起来。
- confirm: try胜利,提交事务,应用解冻的钱扣除,实现业务数据处理。
- cancel: 回滚事务,解冻的钱冻结,复原以前的账户余额。
2.2. 数据库
为了模仿分布式事务,上述的不同零碎业务,咱们通过在不同数据库中创立表构造来模仿。当然tcc的分布式事务不局限于数据库层面,还包含http接口调用和rpc调用等,然而殊途同归,能够作为示例参考。
上面先列出三张业务表的表构造,具体的sql可见最初附件。
表:order
列名 | 类型 | 备注 |
---|---|---|
id | int | 主键 |
order_no | varchar | 订单号 |
user_id | int | 用户id |
product_id | int | 产品id |
amount | int | 数量 |
money | decimal | 金额 |
status | int | 订单状态:0:创立中;1:已完结 |
表:storage
列名 | 类型 | 备注 |
---|---|---|
id | int | 主键 |
product_id | int | 产品id |
residue | int | 残余库存 |
frozen | int | TCC事务锁定的库存 |
表:account
列名 | 类型 | 备注 |
---|---|---|
id | int | 主键 |
user_id | int | 用户id |
residue | int | 残余可用额度 |
frozen | int | TCC事务锁定的金额 |
3. seata server
3.1. 下载
seata server 的安装包可间接从官网github下载,下载压缩包后,解压到本地或服务器上。
3.2. 配置
Seata Server 的配置文件有两个:
- seata/conf/registry.conf
- seata/conf/file.conf
registry.conf
Seata Server 要向注册核心进行注册,这样,其余服务就能够通过注册核心去发现 Seata Server,与 Seata Server 进行通信。Seata 反对多款注册核心服务:nacos 、eureka、redis、zk、consul、etcd3、sofa。咱们我的项目中要应用 eureka 注册核心,eureka服务的连贯地址、注册的服务名,这须要在 registry.conf 文件中对 registry
进行配置。
Seata 须要存储全局事务信息、分支事务信息、全局锁信息,这些数据存储到什么地位?针对存储地位的配置,反对放在配置核心,或者也能够放在本地文件。Seata Server 反对的配置核心服务有:nacos 、apollo、zk、consul、etcd3。这里咱们抉择最简略的,应用本地文件,这须要在 registry.conf 文件中对 config
进行配置。
file.conf
file.conf 中对事务信息的存储地位进行配置,存储地位反对:file、db、redis。
这里咱们抉择数据库作为存储地位,这须要在 file.conf 中进行配置。
3.3. 启动
执行 seata/bin/seata-server.sh(windows 是 seata-server.bat) 脚本即可启动seata server。还能够配置下列参数:
-h:注册到注册核心的ip-p:server rpc 监听端口,默认 8091-m:全局事务会话信息存储模式,file、db,优先读取启动参数-n:server node,多个server时,须要辨别各自节点,用于生成不同区间的transctionId,免得抵触-e:多环境配置
3.4. 常见问题
mysql 8
默认启动后会报mysql-connector-java-x.jar
驱动的谬误,是因为seata server 默认不反对mysql 8。
能够在seata server的 lib 文件夹下替换 mysql 的驱动 jar 包。lib 文件夹下,曾经有一个 jdbc 文件夹,把外面驱动版本为 8 的 mysql-connector-java-x.jar 包拷贝到里面 lib 文件夹下即可。
4. 代码
github示例我的项目中包含3个业务服务、1个注册核心,以及resources下的数据库脚本和seata server配置文件。依照服务的启动程序,如下分类:
- resources/database-sql:初始化数据库
- eureka-server:运行 注册核心
- resources/seata-server:下载、装置、配置、启动 seata server服务
- account-service:运行 用户账户服务
- storage-service:运行 商品库存服务
- order-service:运行 订单服务
- 测试:通过postman等工具,调用 order-server 的下订单接口
3个业务服务中,order订单服务
能够被称为“主事务”,当订单创立胜利后,再在订单服务中调用 account账号服务
和 storage库存服务
两个“副事务”。因而从 seata tcc代码层面上,能够分成上面两类。
下文中不会列举业务代码,残缺代码能够从github上查看,只会列出 seata 的相干代码和配置。
4.1. 主事务(order)
4.1.1. application 配置文件
配置文件中须要配置 tx-service-group
,须要留神的是,3个业务服务中都须要配置同样的值。
application.yml
spring: cloud: alibaba: seata: tx-service-group: order_tx_group
4.1.2. seata配置文件
在application.yml同级目录,即 resources 目录下,创立两个seata 的配置文件。还记得在seata server 启动的时候也有这两个文件,但内容不一样,不要混同了。
file.conf
transport { type = "TCP" server = "NIO" heartbeat = true enableClientBatchSendRequest = true threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" bossThreadSize = 1 workerThreadSize = "default" } shutdown { wait = 3 } serialization = "seata" compressor = "none"}service { vgroupMapping.order_tx_group = "seata-server" order_tx_group.grouplist = "127.0.0.1:8091" enableDegrade = false disableGlobalTransaction = false}client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" logTable = "undo_log" } log { exceptionRate = 100 }}
registry.conf
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka" eureka { serviceUrl = "http://localhost:8761/eureka" }}config { # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig type = "file" file { name = "file.conf" }}
4.1.3. @LocalTCC tcc服务
这是配置 TCC 子服务的外围代码,
- @LocalTCC:
该注解须要增加到下面形容的接口上,示意实现该接口的类被 seata 来治理,seata 依据事务的状态,主动调用咱们定义的办法,如果没问题则调用 Commit 办法,否则调用 Rollback 办法。
- @TwoPhaseBusinessAction:
该注解用在接口的 Try 办法上。
- @BusinessActionContextParameter:
该注解用来润饰 Try 办法的入参,被润饰的入参能够在 Commit 办法和 Rollback 办法中通过 BusinessActionContext 获取。
- BusinessActionContext:
在接口办法的实现代码中,能够通过 BusinessActionContext 来获取参数, BusinessActionContext 就是 seata tcc 的事务上下文,用于寄存 tcc 事务的一些要害数据。BusinessActionContext 对象能够间接作为 commit 办法和 rollbakc 办法的参数,Seata 会主动注入参数。
OrderTccAction.java
@LocalTCCpublic interface OrderTccAction { /** * try 尝试 * * BusinessActionContext 上下文对象,用来在两个阶段之间传递数据 * BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext * TwoPhaseBusinessAction 注解中commitMethod、rollbackMethod 属性有默认值,能够不写 * * @param businessActionContext * @param orderNo * @param userId * @param productId * @param amount * @param money * @return */ @TwoPhaseBusinessAction(name = "orderTccAction") boolean prepareCreateOrder(BusinessActionContext businessActionContext, @BusinessActionContextParameter(paramName = "orderNo") String orderNo, @BusinessActionContextParameter(paramName = "userId") Long userId, @BusinessActionContextParameter(paramName = "productId") Long productId, @BusinessActionContextParameter(paramName = "amount") Integer amount, @BusinessActionContextParameter(paramName = "money") BigDecimal money); /** * commit 提交 * @param businessActionContext * @return */ boolean commit(BusinessActionContext businessActionContext); /** * cancel 撤销 * @param businessActionContext * @return */ boolean rollback(BusinessActionContext businessActionContext);}
OrderTccActionImpl.java
@Slf4j@Componentpublic class OrderTccActionImpl implements OrderTccAction { private final OrderMapper orderMapper; public OrderTccActionImpl(OrderMapper orderMapper){ this.orderMapper=orderMapper; } /** * try 尝试 * * BusinessActionContext 上下文对象,用来在两个阶段之间传递数据 * BusinessActionContextParameter 注解的参数数据会被存入 BusinessActionContext * TwoPhaseBusinessAction 注解中commitMethod、rollbackMethod 属性有默认值,能够不写 * * @param businessActionContext * @param orderNo * @param userId * @param productId * @param amount * @param money * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean prepareCreateOrder(BusinessActionContext businessActionContext, String orderNo, Long userId, Long productId, Integer amount, BigDecimal money) { orderMapper.save(new OrderDO(orderNo,userId, productId, amount, money, 0)); ResultHolder.setResult(OrderTccAction.class, businessActionContext.getXid(), "p"); return true; } /** * commit 提交 * * @param businessActionContext * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean commit(BusinessActionContext businessActionContext) { //查看标记是否存在,如果标记不存在不反复提交 String p = ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()); if (p == null){ return true; } /** * 上下文对象从第一阶段向第二阶段传递时,先转成了json数据,而后还原成上下文对象 * 其中的整数比拟小的会转成Integer类型,所以如果须要Long类型,须要先转换成字符串在用Long.valueOf()解析。 */ String orderNo = businessActionContext.getActionContext("orderNo").toString(); orderMapper.updateStatusByOrderNo(orderNo, 1); //提交实现后,删除标记 ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid()); return true; } /** * cancel 撤销 * * 第一阶段没有实现的状况下,不用执行回滚。因为第一阶段有本地事务,事务失败时曾经进行了回滚。 * 如果这里第一阶段胜利,而其余全局事务参与者失败,这里会执行回滚 * 幂等性管制:如果反复执行回滚则间接返回 * * @param businessActionContext * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean rollback(BusinessActionContext businessActionContext) { //查看标记是否存在,如果标记不存在不反复提交 String p = ResultHolder.getResult(OrderTccAction.class, businessActionContext.getXid()); if (p == null){ return true; } String orderNo = businessActionContext.getActionContext("orderNo").toString(); orderMapper.deleteByOrderNo(orderNo); //提交实现后,删除标记 ResultHolder.removeResult(OrderTccAction.class, businessActionContext.getXid()); return true; }}
4.1.4. @GlobalTransactional 全局服务
@GlobalTransactional
注解是惟一作用到“主事务”的办法。该注解加在“主事务”调用“副事务”的办法上。
OrderServiceImpl.java
@Servicepublic class OrderServiceImpl implements OrderService { private final OrderTccAction orderTccAction; private final AccountFeign accountFeign; private final StorageFeign storageFeign; public OrderServiceImpl(OrderTccAction orderTccAction, AccountFeign accountFeign, StorageFeign storageFeign){ this.orderTccAction=orderTccAction; this.accountFeign=accountFeign; this.storageFeign=storageFeign; } /** * 创立订单 * @param orderDO */ @GlobalTransactional @Override public void createOrder(OrderDO orderDO) { String orderNo=this.generateOrderNo(); //创立订单 orderTccAction.prepareCreateOrder(null, orderNo, orderDO.getUserId(), orderDO.getProductId(), orderDO.getAmount(), orderDO.getMoney()); //扣余额 accountFeign.decreaseMoney(orderDO.getUserId(),orderDO.getMoney()); //扣库存 storageFeign.decreaseStorage(orderDO.getProductId(),orderDO.getAmount()); } private String generateOrderNo(){ return LocalDateTime.now() .format( DateTimeFormatter.ofPattern("yyMMddHHmmssSSS") ); }}
4.2. 副事务(account、storage)
account 和 storage 两个服务相比拟于 order,只少了 “4.1.4. @GlobalTransactional 全局服务”,其余的配置齐全一样。因而,这里就不再赘言了。
5. 总结
测试
通过调用“主事务” order-service 的创立订单接口,来模仿分布式事务。咱们能够通过在3个业务服务的不同代码处成心抛出谬误,看是否可能实现事务的统一回滚。
seata框架表构造
在 /resources/database-sql 的数据库脚本中,各自还有一些 seata 框架自身的表构造,用于存储分布式事务各自的中间状态。因为这个中间状态很短,一旦事务一致性达成,表数据就会主动删除,因而平时咱们无奈查看数据库。
因为seata tcc模式,会始终阻塞到所有的 try执行结束,再执行后续的。从而咱们能够通过在局部业务服务try的代码中加上Thread.sleep(10000)
,强制让事务过程变慢,从而就能够看到这些 seata 表数据。
幂等性
tcc模式中,Commit
和 Cancel
都是有主动重试性能的,处于事务一致性思考,重试性能很有必要。但咱们就肯定要慎重考虑办法的 幂等性
,示例代码中的ResultHolder类并不是个好计划,还是要在Commit、Cancel业务办法自身做幂等性要求。