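TCC basics
Like Seata AT, TCC is a two-phase transaction model. Its main differences from AT are:
- TCC is far more invasive to business code
The data operations of every phase must be coded by hand; the transaction framework cannot handle them automatically.
- TCC is more efficient
It takes no global lock on the data, so multiple transactions may operate on the data at the same time.
Phase 1: Try
Take the account service as an example. Placing an order deducts money from the user's account:
if the user buys goods worth 100 yuan, 100 yuan must be deducted.
The TCC transaction first reserves the 100 yuan to be deducted; in other words, it freezes the 100 yuan.
Phase 2: Confirm
If phase 1 completes successfully, the "deduct amount" business operation (the branch transaction) is bound to succeed in the end. When the global transaction commits, the TC directs the current branch transaction to commit; if the commit fails, the TC retries until it succeeds.
When the global transaction commits, the frozen amount is used to complete the final business data operation.
Phase 2: Cancel
If the global transaction rolls back, the frozen amount is unfrozen and the data is restored to its previous state. The TC directs the current branch transaction to roll back; if the rollback fails, the TC retries until it completes.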
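The three steps can be sketched on a plain in-memory object. This is only an illustration of the bookkeeping, not Seata code: the class `TccAccountSketch` is hypothetical, and its field names `residue`, `frozen` and `used` are borrowed from the account columns that appear later in this article.

```java
import java.math.BigDecimal;

/** Hypothetical in-memory account used only to illustrate Try / Confirm / Cancel. */
final class TccAccountSketch {
    BigDecimal residue;                   // available balance
    BigDecimal frozen = BigDecimal.ZERO;  // reserved by Try, not yet spent
    BigDecimal used = BigDecimal.ZERO;    // spent for good

    TccAccountSketch(String initialBalance) {
        this.residue = new BigDecimal(initialBalance);
    }

    /** Phase 1 (Try): move the amount from the available balance into the frozen pool. */
    void tryFreeze(BigDecimal money) {
        if (residue.compareTo(money) < 0) {
            throw new IllegalStateException("insufficient balance");
        }
        residue = residue.subtract(money);
        frozen = frozen.add(money);
    }

    /** Phase 2 (Confirm): the frozen amount becomes spent. */
    void confirm(BigDecimal money) {
        frozen = frozen.subtract(money);
        used = used.add(money);
    }

    /** Phase 2 (Cancel): the frozen amount goes back to the available balance. */
    void cancel(BigDecimal money) {
        frozen = frozen.subtract(money);
        residue = residue.add(money);
    }

    public static void main(String[] args) {
        BigDecimal hundred = new BigDecimal("100");
        TccAccountSketch account = new TccAccountSketch("1000");
        account.tryFreeze(hundred);   // 100 is reserved
        account.confirm(hundred);     // or account.cancel(hundred) on rollback
        System.out.println(account.residue + " / " + account.frozen + " / " + account.used);
    }
}
```

Whichever second-phase method runs, the sum of `residue`, `frozen` and `used` stays the same; only the pool the money sits in changes.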
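Concurrent transactions
Multiple TCC global transactions may run concurrently. When they deduct money, each one only needs to freeze its own amount.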
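A minimal sketch of that idea, assuming each global transaction is identified by an xid string: every transaction freezes only its own amount, and the only lock involved is a short local one around the balance update, never one held for the lifetime of the global transaction. The class `ConcurrentFreezeSketch` and its methods are hypothetical names for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical account that tracks the frozen amount separately per global transaction. */
final class ConcurrentFreezeSketch {
    private long residue;                                              // available amount, whole yuan
    private final Map<String, Long> frozenByXid = new ConcurrentHashMap<>();

    ConcurrentFreezeSketch(long residue) {
        this.residue = residue;
    }

    /** Try: a short local critical section; nothing stays locked after it returns. */
    synchronized boolean tryFreeze(String xid, long amount) {
        if (residue < amount) {
            return false;
        }
        residue -= amount;
        frozenByXid.merge(xid, amount, Long::sum);
        return true;
    }

    /** Confirm: the frozen amount of this transaction is consumed. */
    synchronized void confirm(String xid) {
        frozenByXid.remove(xid);
    }

    /** Cancel: only the amount frozen by this transaction is given back. */
    synchronized void cancel(String xid) {
        Long amount = frozenByXid.remove(xid);
        if (amount != null) {
            residue += amount;
        }
    }

    synchronized long residue() {
        return residue;
    }

    long frozenTotal() {
        return frozenByXid.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentFreezeSketch account = new ConcurrentFreezeSketch(1000);
        Thread t1 = new Thread(() -> account.tryFreeze("xid-1", 100));
        Thread t2 = new Thread(() -> account.tryFreeze("xid-2", 250));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        account.cancel("xid-2");   // rolling back one transaction leaves the other untouched
        System.out.println(account.residue() + " available, " + account.frozenTotal() + " frozen");
    }
}
```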
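Seata TCC transaction mode
Seata supports the TCC transaction mode. As in AT mode, the following components are needed to control the global transaction:
- TC: transaction coordinator
- TM: transaction manager
- RM: resource manager
Preparing the order project example
Create the seata-tcc project
Create a new Empty Project.
Name the project seata-tcc and store it in the seata-samples folder, next to the seata-at project.
Import the order projects (version without transactions)
Download the project code
- Visit the git repository https://gitee.com/benwang6/seata-samples
- Open the project's tags
- Download the version without transactions
Extract into the seata-tcc directory
Extract the 7 project directories in the archive into the seata-tcc directory.
Import the projects
In IDEA, press shift twice, search for add maven projects, and open the Maven tool.
Then select the pom.xml files of the 7 projects under the seata-tcc project directory to import them.
order: start the global transaction and add the "save order" branch transaction
The order project performs the "add order" operation.
We need to add code for the following TCC operations:
Try - phase 1, the freeze phase: insert the order directly into the order table with its status set to 0 (frozen).
Confirm - phase 2, commit: change the order status to 1 (normal).
Cancel - phase 2, rollback: delete the order.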
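The order life cycle described above can be sketched without any database. This is an illustrative model only: `OrderStatusSketch` is a hypothetical class, and a `HashMap` stands in for the order table.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the order table, keyed by order id. */
final class OrderStatusSketch {
    static final int FROZEN = 0;   // written by Try
    static final int NORMAL = 1;   // written by Confirm

    private final Map<Long, Integer> statusByOrderId = new HashMap<>();

    /** Try: insert the order in the frozen state. */
    void tryCreate(long orderId) {
        statusByOrderId.put(orderId, FROZEN);
    }

    /** Confirm: switch an existing order to the normal state. */
    void confirm(long orderId) {
        statusByOrderId.replace(orderId, NORMAL);
    }

    /** Cancel: delete the order that Try inserted. */
    void cancel(long orderId) {
        statusByOrderId.remove(orderId);
    }

    /** Returns the status, or null when the order does not exist. */
    Integer status(long orderId) {
        return statusByOrderId.get(orderId);
    }
}
```

Because a frozen order is already visible in the table, queries that list orders for users would normally filter on the status; that filtering is outside this sketch.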
order-parent: add the seata dependency
Uncomment the seata dependency that is commented out in order-parent:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.tedu</groupId>
    <artifactId>order-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>order-parent</name>
    <properties>
        <mybatis-plus.version>3.3.2</mybatis-plus.version>
        <druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
        <seata.version>1.3.0</seata.version>
        <spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid-spring-boot-starter.version}</version>
        </dependency>
        <!-- seata dependency, now uncommented -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-seata</artifactId>
            <version>${spring-cloud-alibaba-seata.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>seata-all</artifactId>
                    <groupId>io.seata</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>${seata.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Configuration
application.yml
Set the name of the global transaction group:
spring:
  application:
    name: order
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    username: root
    password: root
  # transaction group setting
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group
......
registry.conf and file.conf
These are exactly the same as in the AT transaction configuration.
registry.conf:
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
file.conf:
transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

service {
  #transaction service group mapping
  # order_tx_group must match the "tx-service-group: order_tx_group" setting in the yml
  # "seata-server" must match the name the TC server registers under
  # the client looks up the seata-server address in eureka, then registers itself with seata-server and sets the group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}
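OrderMapper: add "update order status" and "delete order"
Based on the analysis above, there are three order data operations:
- insert the order
- update the order status
- delete the order
OrderMapper already has a method for inserting an order. We now need methods for updating the status and deleting the order (the delete method is inherited from BaseMapper):
package cn.tedu.order.mapper;

import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

// BaseMapper is parameterized with Order so that inherited methods such as deleteById are typed
public interface OrderMapper extends BaseMapper<Order> {
    void create(Order order);

    void updateStatus(@Param("orderId") Long orderId, @Param("status") Integer status);
}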
The corresponding SQL also has to be added to OrderMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="count" property="count" jdbcType="INTEGER" />
        <result column="money" property="money" jdbcType="DECIMAL" />
        <result column="status" property="status" jdbcType="INTEGER" />
    </resultMap>
    <insert id="create">
        INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
        VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money}, #{status});
    </insert>
    <update id="updateStatus" >
        UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId};
    </update>
    <delete id="deleteById">
        DELETE FROM `order` WHERE `id`=#{orderId}
    </delete>
</mapper>
Implementing the order TCC operations with Seata
- Phase 1: Try
- Phase 2: Confirm
- Phase 2: Cancel
To handle idempotency in phase 2, we first add a utility class, ResultHolder.
In the Confirm or Cancel step of phase 2 the same utility can also be used to tell whether phase 1 succeeded, so a marker has to be saved when phase 1 succeeds.
ResultHolder can store one marker per global transaction:
package cn.tedu.order.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    // action class -> (xid -> marker)
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);
        if (results == null) {
            synchronized (map) {
                // Re-read under the lock; another thread may have created the inner map already
                results = map.get(actionClass);
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }
        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }
        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}
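To see how such a marker makes a second-phase method idempotent, here is a small self-contained sketch. `MarkerStoreSketch` and its `commit` helper are hypothetical names, not part of Seata or of the project; the sketch uses `computeIfAbsent` as a compact way to create the inner map, and it follows the same order of steps as the implementation classes in this article: check the marker, do the work, then remove the marker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical marker store, keyed by action class and global transaction id (xid). */
final class MarkerStoreSketch {
    private static final Map<Class<?>, Map<String, String>> MARKERS = new ConcurrentHashMap<>();

    static void set(Class<?> action, String xid, String value) {
        // computeIfAbsent creates the inner map atomically on first use
        MARKERS.computeIfAbsent(action, k -> new ConcurrentHashMap<>()).put(xid, value);
    }

    static String get(Class<?> action, String xid) {
        Map<String, String> byXid = MARKERS.get(action);
        return byXid == null ? null : byXid.get(xid);
    }

    static void remove(Class<?> action, String xid) {
        Map<String, String> byXid = MARKERS.get(action);
        if (byXid != null) {
            byXid.remove(xid);
        }
    }

    /** Second-phase guard: runs the business update at most once per successful Try. */
    static boolean commit(Class<?> action, String xid, Runnable businessUpdate) {
        if (get(action, xid) == null) {
            return true;              // already committed, or Try never succeeded
        }
        businessUpdate.run();         // if this throws, the marker stays and a retry can run again
        remove(action, xid);
        return true;
    }
}
```

One limitation to keep in mind: the markers live in the memory of a single process, so they are lost on restart and are not shared between instances of the same service.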
Implementing TCC operations with Seata requires defining an interface. We add the following methods to it:
- Try - prepareCreateOrder()
- Confirm - commit()
- Cancel - rollback()
package cn.tedu.order.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface OrderTccAction {
    // T (try - reserve resources, freeze the order)
    /*
    Phase-1 method.
    The annotation names the two phase-2 methods.

    BusinessActionContext is the context object used to pass data between the two phases.
    Parameters annotated with @BusinessActionContextParameter are stored in the BusinessActionContext.
    The commit and rollback method names of @TwoPhaseBusinessAction may be omitted when they equal the defaults.
     */
    @TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareCreateOrder(BusinessActionContext businessActionContext,
                               @BusinessActionContextParameter(paramName = "orderId") Long orderId,
                               @BusinessActionContextParameter(paramName = "userId") Long userId,
                               @BusinessActionContextParameter(paramName = "productId") Long productId,
                               @BusinessActionContextParameter(paramName = "count") Integer count,
                               @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    // C (confirm - commit)
    boolean commit(BusinessActionContext businessActionContext);

    // C (cancel - roll back)
    boolean rollback(BusinessActionContext businessActionContext);
}
Implementation class:
package cn.tedu.order.tcc;

import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;

@Component
public class OrderTccActionImpl implements OrderTccAction {
    @Autowired
    private OrderMapper orderMapper;

    @Override
    public boolean prepareCreateOrder(BusinessActionContext ctx, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
        orderMapper.create(new Order(orderId, userId, productId, count, money, 0));
        // Save the marker that phase 1 succeeded
        ResultHolder.setResult(OrderTccAction.class, ctx.getXid(), "p");
        return true;
    }

    @Override
    public boolean commit(BusinessActionContext ctx) {
        // Check the marker; if it is missing, do not commit again
        String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
        if (p == null) {
            return true;
        }
        /*
         * When the context object travels from phase 1 to phase 2 it is first converted to JSON
         * and then restored. Small integers come back as Integer, so when a Long is needed,
         * convert the value to a string first and parse it with Long.valueOf().
         */
        Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
        orderMapper.updateStatus(orderId, 1);
        // Remove the marker once the commit is done
        ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
        return true;
    }

    @Override
    public boolean rollback(BusinessActionContext ctx) {
        // If phase 1 did not complete, there is nothing to roll back:
        // phase 1 runs in a local transaction, which has already rolled back on failure.
        // If phase 1 succeeded here but another participant of the global transaction failed, the rollback runs here.
        // Idempotency control: a repeated rollback returns immediately.
        // Check the marker; if it is missing, do not roll back again
        String p = ResultHolder.getResult(OrderTccAction.class, ctx.getXid());
        if (p == null) {
            return true;
        }
        Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
        orderMapper.deleteById(orderId);
        // Remove the marker once the rollback is done
        ResultHolder.removeResult(OrderTccAction.class, ctx.getXid());
        return true;
    }
}
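The comment in commit() about Integer and Long is easy to reproduce without Seata. In the sketch below a plain `Map<String, Object>` plays the role of the restored context (an assumption made for illustration; `ContextValueSketch` is a hypothetical class): a small number comes back as an `Integer`, a direct cast to `Long` fails, and going through the string form works for both types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo of reading a numeric value whose runtime type may be Integer or Long. */
final class ContextValueSketch {
    /** Same conversion as in commit(): parse the string form instead of casting. */
    static Long toLong(Object value) {
        return Long.valueOf(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> restoredContext = new HashMap<>();
        restoredContext.put("orderId", 42);   // a small number restored as Integer

        Object raw = restoredContext.get("orderId");
        try {
            Long broken = (Long) raw;         // Integer cannot be cast to Long
            System.out.println(broken);
        } catch (ClassCastException e) {
            System.out.println("direct cast fails for " + raw.getClass().getSimpleName());
        }
        System.out.println(toLong(raw));
    }
}
```

An alternative with the same effect is `((Number) value).longValue()`, which avoids the round trip through a string.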
Calling the Try method from business code
The business code no longer saves the order data directly. Instead it calls the TCC phase-1 method prepareCreateOrder(), and the global transaction annotation @GlobalTransactional is added:
package cn.tedu.order.service;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccAction;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

@Service
public class OrderServiceImpl implements OrderService {
    // @Autowired
    // private OrderMapper orderMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;
    @Autowired
    private AccountClient accountClient;
    @Autowired
    private StorageClient storageClient;
    @Autowired
    private OrderTccAction orderTccAction;

    @GlobalTransactional
    @Override
    public void create(Order order) {
        // Get an id from the globally unique id generator
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);

        // orderMapper.create(order);
        // Changed to call the TCC phase-1 method.
        // orderTccAction is a dynamic proxy with a pre-interceptor that creates the context object
        // under the hood, so passing null is enough.
        orderTccAction.prepareCreateOrder(
                null,
                order.getId(),
                order.getUserId(),
                order.getProductId(),
                order.getCount(),
                order.getMoney());

        // Update stock
        //storageClient.decrease(order.getProductId(), order.getCount());

        // Update account balance
        //accountClient.decrease(order.getUserId(), order.getMoney());
    }
}
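Why passing null for the context works can be illustrated with a JDK dynamic proxy. The sketch below is not Seata's interceptor; it is a simplified, hypothetical model (`ContextProxySketch`, `Ctx`, `Action` and `wrap` are made-up names) in which the proxy replaces a null first argument with a freshly created context before delegating to the real object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical model of a proxy that supplies the context argument for the caller. */
final class ContextProxySketch {
    /** Stand-in for the context object; it only carries the xid here. */
    static final class Ctx {
        final String xid;

        Ctx(String xid) {
            this.xid = xid;
        }
    }

    interface Action {
        String prepare(Ctx ctx, long orderId);
    }

    /** Wraps the target so that a null Ctx passed by the caller is replaced before the call. */
    static Action wrap(Action target, String xid) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (args != null && args.length > 0 && args[0] == null
                    && method.getParameterTypes()[0] == Ctx.class) {
                args[0] = new Ctx(xid);   // the "pre-interceptor" step
            }
            return method.invoke(target, args);
        };
        return (Action) Proxy.newProxyInstance(
                Action.class.getClassLoader(), new Class<?>[] {Action.class}, handler);
    }

    public static void main(String[] args) {
        Action real = (ctx, orderId) -> ctx.xid + ":" + orderId;
        Action proxied = wrap(real, "xid-1");
        // The caller passes null; the real object still receives a usable context.
        System.out.println(proxied.prepare(null, 7L));
    }
}
```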
Start order and test
Start the services in this order:
- Eureka
- Seata Server
- Easy Id Generator
- Order
Call "save order" at:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
Watch the console log.
Check the order data in the database table.
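storage: add the "decrease stock" branch transaction
The storage project performs the "decrease stock" operation.
We need to add code for the following TCC operations:
Try - phase 1, the freeze phase: freeze the stock quantity that is about to be deducted.
Confirm - phase 2, commit: use the frozen stock to complete the business data operation.
Cancel - phase 2, rollback: unfreeze the frozen stock and restore the previous stock level.
Configuration
Three files need to be configured:
- application.yml
- registry.conf
- file.conf
The settings in these three files are exactly the same as for the order project above; see the order configuration section.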
StorageMapper: add methods for freezing stock
Based on the analysis above, there are three stock data operations:
- freeze stock
- turn frozen stock into sold stock
- unfreeze stock
Add three methods to StorageMapper:
package cn.tedu.storage.mapper;

import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

public interface StorageMapper extends BaseMapper<Storage> {
    void decrease(Long productId, Integer count);

    // Freeze stock
    void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen);

    // On commit, move the frozen quantity to sold
    void updateFrozenToUsed(@Param("productId") Long productId, @Param("count") Integer count);

    // On rollback, move the frozen quantity back to available stock
    void updateFrozenToResidue(@Param("productId") Long productId, @Param("count") Integer count);
}
The corresponding SQL also has to be added to StorageMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="INTEGER" />
        <result column="used" property="used" jdbcType="INTEGER" />
        <result column="residue" property="residue" jdbcType="INTEGER" />
    </resultMap>
    <update id="decrease">
        UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId}
    </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM storage WHERE `product_id`=#{productId}
    </select>
    <update id="updateFrozen">
        UPDATE storage SET `residue`=#{residue},`frozen`=#{frozen} WHERE `product_id`=#{productId}
    </update>
    <update id="updateFrozenToUsed">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `used`=`used`+#{count} WHERE `product_id`=#{productId}
    </update>
    <update id="updateFrozenToResidue">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `residue`=`residue`+#{count} WHERE `product_id`=#{productId}
    </update>
</mapper>
Implementing the storage TCC operations with Seata
The utility class ResultHolder:
package cn.tedu.storage.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    // action class -> (xid -> marker)
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);
        if (results == null) {
            synchronized (map) {
                // Re-read under the lock; another thread may have created the inner map already
                results = map.get(actionClass);
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }
        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }
        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}
Add the TCC interface with the following methods:
- Try - prepareDecreaseStorage()
- Confirm - commit()
- Cancel - rollback()
package cn.tedu.storage.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface StorageTccAction {
    @TwoPhaseBusinessAction(name = "storageTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareDecreaseStorage(BusinessActionContext businessActionContext,
                                   @BusinessActionContextParameter(paramName = "productId") Long productId,
                                   @BusinessActionContextParameter(paramName = "count") Integer count);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);
}
Implementation class:
package cn.tedu.storage.tcc;

import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Slf4j
public class StorageTccActionImpl implements StorageTccAction {
    @Autowired
    private StorageMapper storageMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {
        log.info("Decrease stock, phase 1: lock the quantity to deduct, productId=" + productId + ", count=" + count);
        Storage storage = storageMapper.selectById(productId);
        if (storage.getResidue() - count < 0) {
            throw new RuntimeException("Insufficient stock");
        }
        /* Available stock decreases by count, frozen stock increases by count */
        storageMapper.updateFrozen(productId, storage.getResidue() - count, storage.getFrozen() + count);
        // Save the marker
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        log.info("Decrease stock, phase 2: commit, productId=" + productId + ", count=" + count);
        // Guard against a repeated commit
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }
        storageMapper.updateFrozenToUsed(productId, count);
        // Remove the marker
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }

    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        log.info("Decrease stock, phase 2: rollback, productId=" + productId + ", count=" + count);
        // Guard against a repeated rollback
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }
        storageMapper.updateFrozenToResidue(productId, count);
        // Remove the marker
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }
}
Calling the Try method from business code
The business code calls the TCC phase-1 method prepareDecreaseStorage(). The @GlobalTransactional annotation is not added here, because the global transaction is started by the order service:
package cn.tedu.storage.service;

import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StorageServiceImpl implements StorageService {
    // @Autowired
    // private StorageMapper storageMapper;
    @Autowired
    private StorageTccAction storageTccAction;

    @Override
    public void decrease(Long productId, Integer count) throws Exception {
        // storageMapper.decrease(productId,count);
        storageTccAction.prepareDecreaseStorage(null, productId, count);
    }
}
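Start storage and test
If the storageClient.decrease(...) call in OrderServiceImpl.create() is still commented out, uncomment it first so that the order service actually calls storage.
Start the services in this order:
- Eureka
- Seata Server
- Easy Id Generator
- Storage
- Order
Call "save order" at:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
Watch the storage console log.
Check the stock data in the database table.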
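account: add the "deduct amount" branch transaction
For the analysis of the deduct-amount TCC transaction, see "Distributed Transactions (6): Seata TCC Mode - Introduction to TCC Mode".
Configuration
Three files need to be configured:
- application.yml
- registry.conf
- file.conf
The settings in these three files are exactly the same as for the order project above; see the order configuration section.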
AccountMapper: add methods for freezing amounts
Based on the analysis above, there are three account data operations:
- freeze the amount
- turn the frozen amount into spent amount
- unfreeze the amount
Add three methods to AccountMapper:
package cn.tedu.account.mapper;

import cn.tedu.account.entity.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

import java.math.BigDecimal;

public interface AccountMapper extends BaseMapper<Account> {
    void decrease(Long userId, BigDecimal money);

    // Freeze the amount
    void updateFrozen(@Param("userId") Long userId, @Param("residue") BigDecimal residue, @Param("frozen") BigDecimal frozen);

    // On commit, move the frozen amount to spent
    void updateFrozenToUsed(@Param("userId") Long userId, @Param("money") BigDecimal money);

    // On rollback, move the frozen amount back to the available balance
    void updateFrozenToResidue(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
Add the corresponding SQL to AccountMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.account.mapper.AccountMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.account.entity.Account" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="DECIMAL" />
        <result column="used" property="used" jdbcType="DECIMAL" />
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
        <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
    </resultMap>
    <update id="decrease">
        UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId};
    </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM account WHERE `user_id`=#{userId}
    </select>
    <update id="updateFrozen">
        UPDATE account SET `residue`=#{residue},`frozen`=#{frozen} WHERE `user_id`=#{userId}
    </update>
    <update id="updateFrozenToUsed">
        UPDATE account SET `frozen`=`frozen`-#{money}, `used`=`used`+#{money} WHERE `user_id`=#{userId}
    </update>
    <update id="updateFrozenToResidue">
        UPDATE account SET `frozen`=`frozen`-#{money}, `residue`=`residue`+#{money} WHERE `user_id`=#{userId}
    </update>
</mapper>
Implementing the account TCC operations with Seata
The utility class ResultHolder:
package cn.tedu.account.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    // action class -> (xid -> marker)
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);
        if (results == null) {
            synchronized (map) {
                // Re-read under the lock; another thread may have created the inner map already
                results = map.get(actionClass);
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }
        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }
        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}
Add the TCC interface with the following methods:
- Try - prepareDecreaseAccount()
- Confirm - commit()
- Cancel - rollback()
package cn.tedu.account.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface AccountTccAction {
    @TwoPhaseBusinessAction(name = "accountTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareDecreaseAccount(BusinessActionContext businessActionContext,
                                   @BusinessActionContextParameter(paramName = "userId") Long userId,
                                   @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);
}
Implementation class:
package cn.tedu.account.tcc;

import cn.tedu.account.entity.Account;
import cn.tedu.account.mapper.AccountMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Component
@Slf4j
public class AccountTccActionImpl implements AccountTccAction {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("Deduct account amount, phase 1: lock the amount, userId=" + userId + ", money=" + money);
        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money) < 0) {
            throw new RuntimeException("Insufficient account balance");
        }
        /* balance - money, frozen + money */
        accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));
        // Save the marker
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }

    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
        BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());
        log.info("Deduct account amount, phase 2: commit, userId=" + userId + ", money=" + money);
        // Guard against a repeated commit
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }
        accountMapper.updateFrozenToUsed(userId, money);
        // Remove the marker
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }

    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());
        BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());
        // Guard against a repeated rollback
        if (ResultHolder.getResult(getClass(), businessActionContext.getXid()) == null) {
            return true;
        }
        log.info("Deduct account amount, phase 2: rollback, userId=" + userId + ", money=" + money);
        accountMapper.updateFrozenToResidue(userId, money);
        // Remove the marker
        ResultHolder.removeResult(getClass(), businessActionContext.getXid());
        return true;
    }
}
Calling the Try method from business code
The business code calls the TCC phase-1 method prepareDecreaseAccount(). The @GlobalTransactional annotation is not added here, because the global transaction is started by the order service:
package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import cn.tedu.account.tcc.AccountTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Service
public class AccountServiceImpl implements AccountService {
    // @Autowired
    // private AccountMapper accountMapper;
    @Autowired
    private AccountTccAction accountTccAction;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        // accountMapper.decrease(userId,money);
        accountTccAction.prepareDecreaseAccount(null, userId, money);
    }
}
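Start account and test
If the accountClient.decrease(...) call in OrderServiceImpl.create() is still commented out, uncomment it first so that the order service actually calls account.
Start the services in this order:
- Eureka
- Seata Server
- Easy Id Generator
- Storage
- Account
- Order
Call "save order" at:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
Watch the account console log.
Check the account data in the database table.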
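Testing global transaction rollback
Next we test what happens when the global transaction rolls back.
Phase 1 of order and storage succeeds while phase 1 of account fails; this triggers a rollback of the global transaction.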
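The scenario can be modelled in a few lines. The sketch below is a deliberately simplified, hypothetical coordinator (`GlobalRollbackSketch`, `Branch` and `runGlobal` are made-up names) and not how the Seata TC is implemented. It assumes that a branch is registered before its Try runs and that Cancel is then requested for every registered branch; each branch consults its own phase-1 marker, as the ResultHolder checks in this article do, so Cancel is a no-op for the branch whose Try failed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a global rollback across TCC branches. */
final class GlobalRollbackSketch {
    static final class Branch {
        final String name;
        final boolean failInTry;
        boolean tryDone;        // plays the role of the phase-1 marker
        boolean compensated;    // set when Cancel actually undid something

        Branch(String name, boolean failInTry) {
            this.name = name;
            this.failInTry = failInTry;
        }

        void tryPhase() {
            if (failInTry) {
                throw new IllegalStateException("simulated exception in " + name);
            }
            tryDone = true;
        }

        void cancel() {
            if (!tryDone) {
                return;         // Try never completed (or Cancel already ran): nothing to undo
            }
            compensated = true;
            tryDone = false;    // a repeated Cancel becomes a no-op
        }
    }

    /** Returns true when every Try succeeded; otherwise cancels all registered branches. */
    static boolean runGlobal(List<Branch> branches) {
        List<Branch> registered = new ArrayList<>();
        try {
            for (Branch branch : branches) {
                registered.add(branch);   // assumption: registered before its Try runs
                branch.tryPhase();
            }
            return true;
        } catch (RuntimeException e) {
            for (Branch branch : registered) {
                branch.cancel();
            }
            return false;
        }
    }
}
```

With branches order, storage and a failing account, `runGlobal` returns false, order and storage are compensated, and account is left untouched because its Try never completed.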
First add a simulated exception to the phase-1 code of account, in the prepareDecreaseAccount method of AccountTccActionImpl:
    @Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("Deduct account amount, phase 1: lock the amount, userId=" + userId + ", money=" + money);
        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money) < 0) {
            throw new RuntimeException("Insufficient account balance");
        }
        /* balance - money, frozen + money */
        accountMapper.updateFrozen(userId, account.getResidue().subtract(money), account.getFrozen().add(money));
        // Fail roughly half of the calls to simulate an error in phase 1
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated exception");
        }
        // Save the marker
        ResultHolder.setResult(getClass(), businessActionContext.getXid(), "p");
        return true;
    }
After restarting account, request an order:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
Check the consoles; when the simulated exception fires, rollback logs appear for both storage and order.