共计 14063 个字符,预计需要花费 36 分钟才能阅读完成。
@[toc]
前段时间松哥和大家分享了一篇文章和一个视频:
- 手把手教你玩多数据源动静切换!
- 网页上点一下,就能切换不同数据源?松哥手把手教你!
这个次要和大家讲了如何通过自定义注解实现多数据源的切换。
有小伙伴看完后就提出来问题了,既然这样,那事务怎么办呢?如果在一个 Service 办法中切换了数据源,那么传统的事务解决方案必然生效!特地是在微服务中,这种一个服务中调用多个数据源的事件还很常见。
怎么办?
对于这个问题,咱们能够依照分布式事务的思路去解决。松哥去年其实也写过分布式事务的文章,然而比拟毛糙,没有率领小伙伴们通过手写代码去体验分布式事务,这次因为要录制 TienChin 我的项目视频,而且刚好小伙伴们也提出来这个问题了,所以就认认真真写几篇文章,和大家讲一讲这个,同时前面也会录几个视频来和大家讲分布式事务,视频会放在 TienChin 我的项目中,如果小伙伴们对视频感兴趣,请戳戳戳这里 TienChin 我的项目配套视频来啦。
那么明天我就先来和小伙伴们剖析下如何应用 seata 中的 at 模式来解决分布式事务。
1. AT 模式原理
整体上来说,AT 模式是两阶段提交协定的演变:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,开释本地锁和连贯资源。
- 二阶段则分两种状况:
2.1 提交异步化,十分疾速地实现。
2.2 回滚通过一阶段的回滚日志进行反向弥补。
大抵上的逻辑就是下面这样,咱们通过一个具体的案例来看看 AT 模式是如何工作的:
假如有一个业务表 product,如下:
当初咱们想做如下一个更新操作:
update product set name = 'GTS' where name = 'TXC';
步骤如下:
一阶段:
- 解析 SQL:失去 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相干的信息。
- 查问前镜像:依据解析失去的条件信息,生成查问语句,定位数据(查找到更新之前的数据)。
- 执行下面的更新 SQL。
- 查问后镜像:依据前镜像的后果,通过 主键 定位数据。
- 插入回滚日志:把前后镜像数据以及业务 SQL 相干的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
- 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁。
- 本地事务提交:业务数据的更新和后面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的后果上报给 TC。
二阶段:
二阶段分两种状况,提交或者回滚。
先来看 回滚 步骤:
- 首先收到 TC 的分支回滚申请,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录(这条记录中保留了数据批改前后对应的镜像)。
- 数据校验:拿 UNDO LOG 中的后镜像与以后数据进行比拟,如果有不同,阐明数据被以后全局事务之外的动作做了批改。这种状况,须要依据配置策略来做解决。
- 依据 UNDO LOG 中的前镜像和业务 SQL 的相干信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1
; - 提交本地事务。并把本地事务的执行后果(即分支事务回滚的后果)上报给 TC。
再来看 提交 步骤:
- 收到 TC 的分支提交申请,把申请放入一个异步工作的队列中,马上返回提交胜利的后果给 TC。
- 异步工作阶段的分支提交申请将异步和批量地删除相应 UNDO LOG 记录。
大抵上就是这样一个步骤,思路还是比拟清晰的,就是当你要更新一条记录的时候,零碎会先依据这条记录本来的内容生成一个回滚日志存入 undo log 表中,未来要回滚的话,就依据 undo log 中的记录去更新数据(反向弥补),未来要是不回滚的话,就删除 undo log 中的记录。
实践看着简略,代码怎么写?咱们持续往下看。
2. AT 模式实际
2.1 案例介绍
咱们这里举一个商品下单的案例,一共有五个服务,我来和大家略微解释下:
- eureka:这是服务注册核心。
- account:这是账户服务,能够查问 / 批改用户的账户信息(次要是账户余额)。
- order:这是订单服务,能够下订单。
- storage:这是一个仓储服务,能够查问 / 批改商品的库存数量。
- bussiness:这是业务,用户下单操作将在这里实现。
这个案例讲了一个什么事呢?
当用户想要下单的时候,调用了 bussiness 中的接口,bussiness 中的接口又调用了它本人的 service,在 service 中,首先开启了全局分布式事务,而后通过 feign 调用 storage 中的接口去扣库存,而后再通过 feign 调用 order 中的接口去创立订单(order 在创立订单的时候,不仅会创立订单,还会扣除用户账户的余额),在这个过程中,如果有任何一个环节出错了(余额有余、库存有余等导致的问题),就会触发整体的事务回滚。
本案例具体架构如下图:
这个案例就是一个典型的分布式事务问题,storage、order 以及 account 中的事务分属于不同的微服务,然而咱们心愿他们同时胜利或者同时失败。
2.2 筹备工作
咱们先来把 Seata 服务端搭建起来。
Seata 下载地址:
- https://github.com/seata/seata/releases
目前最新版本是 1.4.2,咱们就应用最新版本来做。
这个工具在 Windows 或者 Linux 上部署差异不大,所以我这里就间接部署在 Windows 上了,不便一些。
咱们首先下载 1.4.2 版本的 zip 压缩包,下载之后解压,而后在 conf 目录中配置两个中央:
- 首先配置 file.conf 文件
file.conf 中配置 TC 的存储模式,TC 的存储模式有三种:
- file:适宜单机模式,全局事务会话信息在内存中读写,并长久化本地文件 root.data,性能较高。
- db:适宜集群模式,全局事务会话信息通过 db 共享,绝对性能差点。
- redis:适宜集群模式,全局事务会话信息通过 redis 共享,绝对性能好点,然而要留神,redis 模式在 Seata-Server 1.3 及以上版本反对,性能较高,不过存在事务信息失落的危险,所以须要开发者提前配置适宜以后场景的 redis 长久化配置。
这里咱们为了省事,配置为 file 模式,这样事务会话信息读写在内存中实现,长久化则写到本地 file,如下图:
如果配置 db 或者 redis 模式,大家记得填一下上面的相干信息。具体如下图:
题外话
留神,如果应用 db 模式,须要提前准备好数据库脚本,如下(小伙伴们能够间接在公众号江南一点雨后盾回复 seata-db 下载这个数据库脚本):
CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `seata2`;
/*Table structure for table `branch_table` */
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `branch_table` */
/*Table structure for table `global_table` */
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `global_table` */
/*Table structure for table `lock_table` */
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (`row_key` varchar(128) NOT NULL,
`xid` varchar(128) DEFAULT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
另外还须要留神的是本人的数据库版本信息,改数据库连贯的时候依照理论状况批改,Seata 针对 MySQL5.x 和 MySQL8.x 都提供了对应的数据库驱动(在 lib 目录下),咱们只须要把驱动改好就行了。
- 再配置 registry.conf 文件
registry.conf 次要配置 Seata 的注册核心,咱们这里采纳大家比拟相熟的 Eureka,配置如下:
能够看到,反对的配置核心比拟多,咱们抉择 Eureka,选好配置核心之后,记得批改配置核心相干的信息。
OK,当初就配置实现了,然而先别启动,还差一个 Eureka 注册核心。
2.3 工程搭建
首先咱们创立一个名为 seata-at 的 maven 工程,作为咱们的 parent 我的项目,微服务中的各个模块将在这个 maven 中创立。
搞过微服务的小伙伴应该晓得 Spring Cloud 体系中有一个让人特地头疼的版本抵触问题,特地是用到一些比拟有共性的组件的时候,这个版本抵触特地烦人。咱们在 Spring Cloud 中整合 seata 的时候一样也是存在版本抵触问题,一个比拟省事的解决办法是应用阿里云提供的 Spring Boot 构建地址,这个地址尽管不能应用目前最新版的 Spring Boot,然而却不存在版本抵触问题。松哥这里就采纳这种计划。
eureka
eureka 的创立其实不关涉版本问题,大家间接创立即可,引入 web 和 eureka server 依赖即可。
business
business 相当于我整个服务的入口,它里边须要用到 seata、feign,不过这里不必间接操作数据库。
创立形式如下,首先抉择 Initializr Service URL 地址为 https://start.aliyun.com
,如下图:
而后抉择咱们须要的依赖,如下:
order
接下来创立订单服务,订单服务也是基于 https://start.aliyun.com
地址来创立,相比于 business,订单服务中多了数据库操作依赖:
account
同 order 服务的创立,不再赘述。
storage
同 order 服务的创立,不再赘述。
最初创立好的工程构造如下:
2.4 工程配置
eureka
eureka 的配置比较简单,配置两个中央就行了:
- application.properties
eureka.client.fetch-registry=false
eureka.client.register-with-eureka=false
server.port=8761
这个 eureka 不仅仅是咱们一会微服务的注册地址,也是 seata-server 的注册地址,在 seata-server 的 registry.conf 配置文件中,eureka 的默认端口就是 8761,所以如果你这里不是 8761,那么记得批改一下 seata-server 的 registry.conf 配置文件。
- 启动类上加一个注解就完事:
@SpringBootApplication
@EnableEurekaServer
public class EurekaApplication {public static void main(String[] args) {SpringApplication.run(EurekaApplication.class, args);
}
}
business
business 不必操作数据库,所以配置次要是两方面。
在 seata 的应用过程中,seata-server 相当于是一个协调者的角色,波及到微服务的服务都须要注册到 seata-server 上,那么这里就波及到两个配置文件,别离是 file.conf 和 regsitry.conf。
file.conf 次要配置了微服务和 seata-server 之间的一些通信信息啥的,这个文件比拟长,小伙伴们文末下载我的项目源码间接拷贝即可。
registry.conf 则次要形容了一些注册信息,咱们这里都是注册到 eureka,所以配置一下注册到 eureka 即可。这个配置文件大家到时候也是间接下载源码拷贝过来就行了。反正这两个配置基本上也都是模版化的,并不需要做过多的批改。
如果须要理解这两个配置文件的具体含意,能够参考这个文档:
- https://seata.io/zh-cn/docs/user/configurations.html
最初再来配置一下 business 的 application.properties:
server.port=1112
spring.application.name=business
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
后面三行配置好说。第四行配置示意配置事务群组的名称为 my_test_tx_group
,也就是 TC 的集群名为 my_test_tx_group
,这个名字是在 file.conf 中配置的,这里依据 file.conf 中的配置状况去填写即可。
order
order 中的 file.conf 和 registry.conf 和 business 统一,不再赘述。这里就来看看它的 application.properties:
server.port=1113
spring.application.name=order
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.url=jdbc:mysql:///order?serverTimezone=Asia/Shanghai&useSSL=false
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
这个是具体的服务,所以要连贯 order 数据库。
order 数据库脚本如下:
CREATE TABLE `order_tbl` (`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT '0',
`money` int(11) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
account
account 中的 file.conf 和 registry.conf 和 business 统一,不再赘述。这里就来看看它的 application.properties:
server.port=1111
spring.application.name=account
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.url=jdbc:mysql:///account?serverTimezone=Asia/Shanghai&useSSL=false
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
这个是具体的服务,所以要连贯 account 数据库。
account 数据库脚本如下:
CREATE TABLE `account_tbl` (`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
storage
storage 中的 file.conf 和 registry.conf 和 business 统一,不再赘述。这里就来看看它的 application.properties:
server.port=1114
spring.application.name=storage
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.url=jdbc:mysql:///storage?serverTimezone=Asia/Shanghai&useSSL=false
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
这个是具体的服务,所以要连贯 storage 数据库。
storage 数据库脚本如下:
CREATE TABLE `storage_tbl` (`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
另外,因为在分布式事务操作的过程中,会波及到一个 undo log 表,就是咱们后面所说用来保留前后镜像的表,所以,以上三个库,再别离执行如下 SQL,各自增加一个 undo log 表。
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.5 模块开发
account
先来看看 account 模块的开发。
这个模块次要是提供扣款服务,如果扣款的时候没钱了,就抛出一个账户余额有余的异样。
具体操作如下:
首先创立 AccountMapper,为了省事,我这里就不创立 XML 文件了,间接用注解:
@Mapper
public interface AccountMapper {@Update("update account_tbl set money=money-#{money} where user_id=#{account}")
int updateAccount(@Param("account") String account, @Param("money") Double money);
@Select("select money from account_tbl where user_id=#{account}")
Double getMoneyByAccount(String account);
}
这里两个办法,一个是扣款,还有一个是查问账户还剩多少钱。
再来看 AccountService:
@Service
public class AccountService {
@Autowired
AccountMapper accountMapper;
public boolean deductAccount(String account, Double money) {accountMapper.updateAccount(account, money);
Double m = accountMapper.getMoneyByAccount(account);
if (m >= 0) {return true;}else{throw new RuntimeException("账户余额有余");
}
}
}
先去扣款,扣款实现后,再去查问账户余额,如果余额小于 0,就抛出异样。
最初再 AccountController 中调用这个 AccountService:
@RestController
public class AccountController {
@Autowired
AccountService accountService;
@PostMapping("/deductAccount")
public RespBean deductAccount(String account, Double money) {if (accountService.deductAccount(account, money)) {return RespBean.ok("扣款胜利");
}
return RespBean.error("扣款失败");
}
}
这块比较简单,没啥好说的。
order
再来看 order。order 这里就是下订单,下订单之前先扣款,扣款胜利的话就增加一条订单记录。所以咱们要在 order 服务中通过 OpenFeign 去调用 account 服务,先在启动类上开启 OpenFeign 的应用:
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);
}
}
接下来再定义一个 AccountFeign 用来调用 Account 服务:
@FeignClient("account")
public interface AccountFeign {@PostMapping("/deductAccount")
RespBean deductAccount(@RequestParam("account") String account, @RequestParam("money") Double money);
}
再来看看 OrderService:
@Service
public class OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
AccountFeign accountFeign;
public boolean createOrder(String account, String productId, Integer count) {
// 扣款,每件商品 100 块钱
RespBean respBean = accountFeign.deductAccount(account, count * 100.0);
int order = orderMapper.createOrder(account, productId, count);
return order == 1 && respBean.getStatus() == 200;}
}
商品价格这里间接硬编码,每件商品 100 块钱,先扣款,扣款胜利后增加一条订单记录。
最初在 Controller 中调用这个 OrderService:
@RestController
public class OrderController {
@Autowired
OrderService orderService;
@PostMapping("/createOrder")
public RespBean createOrder(@RequestParam("acount") String account, @RequestParam("count") Integer count, @RequestParam("productId") String productId) {if (orderService.createOrder(account, productId, count)) {return RespBean.ok("下单胜利");
}
return RespBean.error("下单失败");
}
}
storage
再来看 storage 模块,这个就是扣库存的,如下:
@RestController
public class StorageController {
@Autowired
StorageService storageService;
@PostMapping("/deduct")
public RespBean deduct(@RequestParam("productId") String productId, @RequestParam("count") Integer count) {if (storageService.deduct(productId, count)) {return RespBean.ok("扣库存胜利");
}
return RespBean.error("扣库存失败");
}
}
@Service
public class StorageService {
@Autowired
StorageMapper storageMapper;
public boolean deduct(String productId, Integer count) {int deduct = storageMapper.deduct(productId, count);
int c = storageMapper.getCountByProductId(productId);
if (c >= 0) {return true;}
throw new RuntimeException("库存有余,扣库存失败");
}
}
扣完库存后检查一下,如果库存总数小于 0,阐明库存有余,此时间接抛出异样即可。
business
business 是整个服务的入口,在 business 中调用 order 和 storage 两个服务,并且在 business 中开启全局事务,如果以上三个服务中,有任何一个服务抛出异样,都会导致全局事务回滚,咱们来看下 business 中的代码:
@RestController
public class BusinessController {
@Autowired
BusinessService businessService;
@GetMapping("/order")
public RespBean order(String account, Integer count, String productId) {
try {businessService.purchase(account, count, productId);
return RespBean.ok("下单胜利");
} catch (Exception e) {return RespBean.error(e.getMessage());
}
}
}
@Service
public class BusinessService {
@Autowired
StorageFeignClient storageFeignClient;
@Autowired
OrderFeignClient orderFeignClient;
@GlobalTransactional
public void purchase(String account, Integer count, String productId) {storageFeignClient.deduce(productId, count);
orderFeignClient.createOrder(account, count, productId);
}
}
@FeignClient("order")
public interface OrderFeignClient {@PostMapping("/createOrder")
RespBean createOrder(@RequestParam("acount") String account, @RequestParam("count") Integer count, @RequestParam("productId") String productId);
}
@FeignClient("storage")
public interface StorageFeignClient {@PostMapping("/deduct")
RespBean deduce(@RequestParam("productId") String productId, @RequestParam("count") Integer count);
}
大家留神,seata 中的 at 模式,在通过后面的配置之后,咱们在后续应用的时候,当初的工作就非常简单了,只须要在指标办法上增加一个 @GlobalTransactional
注解即可,就是这么 easy。
common
最初咱们再提供一个公共模块,这个公共模块被其余所有业务模块所所依赖,在公共模块中咱们来解决全局异样:
@RestControllerAdvice
public class GlobalException {@ExceptionHandler(RuntimeException.class)
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public RespBean runtimeException(RuntimeException e) {return RespBean.error(e.getMessage());
}
}
2.6 测试
最初,咱们来简略测试下。
先本人手动给 account 表和 storage 表加几条记录,比方我这里设置 zhangsan 有 10000 块钱:
设置编号为 1111 的商品有 100 件:
而后咱们来一个购买,如下:
zhangsan 想买 1000 件商品,显然库存不够,购买失败。此时去查看数据库,account 表、order 表 以及 storage 表都曾经回滚了。
而后咱们也能够批改表,设置 zhangsan 有 1 块钱,而后批改申请,如下:
大家看到,此时的异样就是账户余额有余了。
最初咱们还是设置 zhangsan 有 10000 块钱,而后来一个失常的测试,如下:
有小伙伴可能会说,咦!没看到 undo log 表的应用呀?其实在分布式事务中,undo log 是施展了作用的,只是当二阶段执行结束后,无论是提交还是回滚,都会删除 undo log 表中的记录,所以就没看到 undo log 中的数据了。
如果小伙伴们想看到 undo log 中的数据,那么简略,只须要在 business 的业务办法中 debug,在零碎运行的过程中暂停一下,此时关上数据库,就能看到 undo log 表中的数据了。这个我会在 TienChin 我的项目的视频中和大家具体来说。
参考资料:
- seata.io