Seata 分布式事务微服务接入
一、初始化 SQL
波及到业务库都要新建:undo_log 表
CREATE TABLE IF NOT EXISTS `undo_log`
(`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
二、我的项目配置
依赖,目前和
Seata Server
服务端放弃版本统一:1.3.0
<!--SpringCloud alibaba seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.2.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
application
配置
seata:
# 开启 Springboot 主动拆卸 默认 true
enabled: true
# 开启数据源主动代理 默认 true
enable-auto-data-source-proxy: true
# app id
application-id: ${spring.application.name}
# 事务组(能够每个利用独立取名, 也能够应用雷同的名字, 留神跟配置文件保持一致)
tx-service-group: ${spring.application.name}
# 配置核心
config:
type: apollo
apollo:
app-id: ${spring.application.name}
#apollo-meta: http://10.0.17.92:9083
apollo-meta: ${apollo.meta}
namespace: seata
# 注册核心
registry:
type: eureka
eureka:
weight: 1
application: seata-server
#service-url: http://10.0.17.92:9001/eureka/
# eureka.service.ip-address 通过配置注入
service-url: ${eureka.service.ip-address}
Apollo
seata
命名空间配置
# transport 配置
transport.type = TCP
transport.server = NIO
transport.heartbeat = true
# 客户端事务音讯申请合并发送 默认 true
transport.enableClientBatchSendRequest = true
transport.compressor = none
transport.shutdown.wait = 3
transport.serialization = seata
transport.threadFactory.bossThreadPrefix = NettyBoss
transport.threadFactory.workerThreadPrefix = NettyServerNIOWorker
transport.threadFactory.serverExecutorThread-prefix = NettyServerBizHandler
transport.threadFactory.shareBossWorker = false
transport.threadFactory.clientSelectorThreadPrefix = NettyClientSelector
transport.threadFactory.clientSelectorThreadSize = 1
transport.threadFactory.clientWorkerThreadPrefix = NettyClientWorkerThread
transport.threadFactory.bossThreadSize = 1
transport.threadFactory.workerThreadSize = default
# client 配置
# 异步提交缓存队列长度(默认 10000)
client.rm.asyncCommitBufferLimit = 10000
# 一阶段后果上报 TC 重试次数(默认 5)
client.rm.reportRetryCount = 5
# 主动刷新缓存中的表构造(默认 false)
client.rm.tableMetaCheckEnable = false
# 是否上报一阶段胜利(默认 false),true 用于放弃分支事务生命周期残缺
client.rm.reportSuccessEnable = false
# 校验或占用全局锁重试距离(默认 10ms)
client.rm.lock.retryInterval = 10
# 校验或占用全局锁重试次数(默认 30)
client.rm.lock.retryTimes = 30
# 分支事务与其它全局事务回滚事务抵触时策略(优先开释本地锁让回滚)
client.rm.lock.retryPolicyBranchRollbackOnConflict = true
# 一阶段全局回滚上报 TC 重试次数(默认 1 次,倡议大于 1)
client.tm.rollbackRetryCount = 3
# 一阶段全局提交上报 TC 重试次数(默认 1 次,倡议大于 1)
client.tm.commitRetryCount = 3
# undo 序列化形式(默认 jackson)
client.undo.logSerialization = jackson
# 自定义 undo_log 表明(默认 undo_log)
client.undo.logTable = undo_log
# 二阶段回滚镜像校验(默认 true 开启)
client.undo.dataValidation = false
# 日志异样输入概率(默认 100)
client.log.exceptionRate = 100
# service 配置
# seata-server 服务端地址 仅在 store.mode=file 时失效
service.default.grouplist = 10.0.17.92:8091
# 各自我的项目需配置本人的 service.vgroupMapping. 事务组 , seata-server 为服务端事务组
service.vgroupMapping.claim-workflow = seata-server
# 开启降级,业务间断出错则不应用事务。默认 false
service.enableDegrade = false
# 禁用全局事务(默认 false)
service.disableGlobalTransaction = false
三、手动注入数据源
Seata
1.3.0 版本,曾经实现主动数据代理注入。
- 参数:seata.enable-auto-data-source-proxy=true , 默认为 true
Seata
1.3.0 版本后,不须要手动注入数据源(即以下代码)。
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource(){return new DruidDataSource();
}
}
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {//@Value("${mybatis-plus.mapper-locations}")
//private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource(){return new DruidDataSource();
}
//@Primary
//@Bean
//public DataSourceProxy dataSourceProxy(DataSource dataSource) {// return new DataSourceProxy(dataSource);
//}
// 代理 mybaits-plus
//@Bean
//public MybatisSqlSessionFactoryBean sqlSessionFactoryBean(DataSource dataSourceProxy) throws Exception {// MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
// sqlSessionFactoryBean.setDataSource(dataSourceProxy);
// sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
// sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
// return sqlSessionFactoryBean;
//}
// 代理 mybatis / mybatis-config
//@Bean
//public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {// SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
// sqlSessionFactoryBean.setDataSource(dataSourceProxy);
// sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
// sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
// return sqlSessionFactoryBean.getObject();
//}
}
四、事务应用
@GlobalTransactional
能够自定义名称和超时工夫,默认超时工夫:60 秒
@GlobalTransactional(timeoutMills = 60000,rollbackFor = Exception.class)
public Result submitById(ReportDispatchingCommand command, UserLoginDTO userLoginDTO){// 业务逻辑}
- 注意事项:业务流程中如果波及到
先插入
后批改插入数据
的流程,要保障所有的改库动作
都加@GlobalTransactional
注解
像这样,
test2
的接口加了@GlobalTransactional
,test3
的接口没有加@GlobalTransactional
如果执行程序是:test2
执行更新 ->test3
执行更新 ->test2
回滚
这样就会因为test3
执行了更新,导致回滚test
的时候,seata
认为有脏数据,就不回滚了。
要防止这种状况就须要给test3
的办法也加上@GlobalTransactional
的注解
// service A
@GetMapping("test2")
@GlobalTransactional
public void test2() {bService.doInsertAndUpdate();
}
// service A
// 留神这里没有加 @GlobalTransactional
@GetMapping("test3")
public void test3() {bService.doInsertAndUpdate();
}
五、注意事项
1. pom.xml
版本抵触问题
spring-cloud-alibaba-seata
中主动依赖了seata-spring-boot-starter
。然而版本对应不上,所以要解决版本不兼容问题
2. Could not register branch into global session
异样
- 异步调用不不退出全局事务,异步改为 MQ 调用
这里很重要
异样:Could not register branch into global session xid = status = Rollbacked(还有 Rollbacking、AsyncCommitting 等等二阶段状态)while expecting Begin
形容:分支事务注册时,全局事务状态需是一阶段状态 begin,非 begin 不容许注册。属于 seata 框架层面失常的解决,用户能够从本身业务层面解决。呈现场景(可持续补充)1. 分支事务是异步,全局事务无奈感知它的执行进度,全局事务已进入二阶段,该异步分支才来注册
2. 服务 a rpc 服务 b 超时(dubbo、feign 等默认 1 秒超时),a 上抛异样给 tm,tm 告诉 tc 回滚,然而 b 还是收到了申请(网络提早或 rpc 框架重试),而后去 tc 注册时发现全局事务已在回滚
3. tc 感知全局事务超时(@GlobalTransactional(timeoutMills = 默认 60 秒)),被动变更状态并告诉各分支事务 回滚,此时有新的分支事务来注册
# 设置 RPC 调用的超时工夫
ribbon:
# 连贯超时
ConnectTimeout: 5000
# 响应超时 设置超时工夫设置长一点。应为回滚须要工夫
ReadTimeout: 10000
3. @GlobalTransactional
全局事务不失效
事务是基于接口或基于类的代理被创立的。事务注解,没有加在办法入口,事务无奈失效
@ Service
public class Service1 {
@ Autowired
private Service1 self;
public void m1() {
// 类外部间接调用,不会触发 AOP
//this.m2();
// 通过保护一个本人的 bean 来调用,从而使事务失效
self.m2();}
@ GlobalTransactional
publiv void m2() {}
}
4. 引入 Seata
后,spring
本地事务注解 @Transactional
有效
- 是否所有 sql 都走
DataSourceProxy
代理;而DataSourceProxy
代理是间接 commit 代码;所以只能应用@GlobalTransactional
能力执行 undo 回滚- 所以 Seata 就不反对 spring
@Transactional
的本地事务了? 待验证。。。
六、原理剖析
Spring 事务个性
- 事务的隔离级别:是指若干个并发的事务之间的隔离水平
1. @Transactional(isolation = Isolation.READ_UNCOMMITTED):读取未提交数据(会呈现脏读, 不可反复读) 根本不应用
2. @Transactional(isolation = Isolation.READ_COMMITTED):读取已提交数据(会呈现不可反复读和幻读)
3. @Transactional(isolation = Isolation.REPEATABLE_READ):可反复读(会呈现幻读)
4. @Transactional(isolation = Isolation.SERIALIZABLE):串行化
- 事务流传行为:如果在开始以后事务之前,一个事务上下文曾经存在,此时有若干选项能够指定一个事务性办法的执行行为
1. TransactionDefinition.PROPAGATION_REQUIRED: 如果以后存在事务,则退出该事务;如果以后没有事务,则创立一个新的事务。这是默认值。2. TransactionDefinition.PROPAGATION_REQUIRES_NEW: 创立一个新的事务,如果以后存在事务,则把以后事务挂起。3. TransactionDefinition.PROPAGATION_SUPPORTS: 如果以后存在事务,则退出该事务;如果以后没有事务,则以非事务的形式持续运行。4. TransactionDefinition.PROPAGATION_NOT_SUPPORTED: 以非事务形式运行,如果以后存在事务,则把以后事务挂起。5. TransactionDefinition.PROPAGATION_NEVER: 以非事务形式运行,如果以后存在事务,则抛出异样。6. TransactionDefinition.PROPAGATION_MANDATORY: 如果以后存在事务,则退出该事务;如果以后没有事务,则抛出异样。7. TransactionDefinition.PROPAGATION_NESTED: 如果以后存在事务,则创立一个事务作为以后事务的嵌套事务来运行;如果以后没有事务,则该取值等价于 TransactionDefinition.PROPAGATION_REQUIRED。
Seata 框架设计
Seata 框架的次要角色
- (Transaction ID)XID 全局惟一事务 ID
- (Transaction Manager) TM 事务管理器
- (Resource Manager) RM 资源管理器
- (Transaction Coordinator) TC 事务协调器
事务管理器:外围模块为全局事务管理器,次要负责:全局事务的开启、提交、回滚。
资源管理器:执行具体的分支事务,由对应的执行器去执行。包含重做日志,SQL 的执行;
理论的 SQL 由连贯代理 ConnectionProxy 执行,连贯代理 ConnectProxy 会注册事务分支到事务 协调器 TC,执行完理论 SQL 后,上报分支事务的状态给事务协调器 TC。
事务协调器:依据事务管理器 TM 的全局事务的开启、提交、回滚申请,创立全局事务,驱动分支事务的提交 和回滚,并保护全局事务和分支事务的状态。
- 执行流程
全局事务与分支事务
- 全局事务:通过 XID(全局惟一事务 ID) 来标识
xid = 10.0.18.202:8905:138030304412573696
- 分支事务:通过分支事务 ID 和资源 ID 来标识,每个分支事务都有 XID 来标识属于哪个全局事务
branchId = 138030306228707329
Register branch successfully, xid = 10.0.18.202:8905:138030304412573696, branchId = 138030306228707329, resourceId = jdbc:mysql://10.0.18.200:3306/claim_auprocess ,lockKeys = c_au_image_detail_info:340489328012288
[2m2021-05-18 13:23:09.581[0;39m [32m INFO[0;39m [2m---[0;39m [2m[LoggerPrint_1_1][0;39m [36mi.s.c.r.p.server.BatchLogHandler [0;39m [2m:[0;39m SeataMergeMessage xid=10.0.18.202:8905:138030304412573696,branchType=AT,resourceId=jdbc:mysql://10.0.18.200:3306/claim_auprocess,lockKey=c_au_image_detail_info:340489328012289
1. Seata-client
与 Seata-server
是如何通信的
1. seata-servaer 向注册核心注册
2. seata-client 向注册核心注册
3. seata-client 通过注册核心返回的 seata-server 服务列表,找到 seata-server
- 注册核心的分类
1. file
文件类型:用于做概念验证,目标是通过配置文件,让 client 疾速找到 seata-server, 从而免去依赖第三方注册核心
2. 非 file 类型
nacos
apollo
zookeeper
redis
etcd
....
2. Seata 分布式事务的根底 DataSource
- 所有的外围都在
io.seata.rm.datasource.DataSourceProxy
从 0.9 版本后,曾经实现主动数据代理注入,默认开启。
seata: # 开启 Springboot 主动拆卸 默认 true enabled: true # 开启数据源主动代理 默认 true enable-auto-data-source-proxy: true
咱们只有把 DataSourceProxy 注册成默认的 java.sql.DataSource 实现,并提供给其它框架 (mybatis\jdbctemplate) 拆卸,就达到目标。所以 client 端的所有配置都围绕着把 DataSourceProxy 注册成默认的 java.sql.DataSource,或者把数据库拜访框架的 Datasource 配置成 DataSourceProxy 来进行的。
3. 锁管理器 LockManager
锁管理器的顶级接口:
LockManager
,锁管理器必须实现该接口。Lock_key 格局:数据库资源 ^^^ 表名 ^^^ 主键 ID
示例:
jdbc:mysql://10.0.17.xxx:3306/xxxx^^^act_ge_bytearray^^^341269355654272
七、参考文档
- 参数配置文档
spring cloud seata 参数配置
Seata 解析 -TC 端 file.conf 文件各配置作用总结
- 回滚失败
被调用方存在两个本地事务,在调用方抛异样时,被调用方反复呈现 Branch Rollbacked faild
压力测试一段时间后,tm 端 rpc 调用 server 端超时 RPC timeout:RPC timeout
- 源码剖析
Seata 源码解析
- seata springboot 整合
微服务分布式事务解决方案 -springboot 整合分布式 seata1.3.0
分布式事务两阶段提交 Eureka+Seata 计划
springcloud-nacos-seata 实现分布式事务
- 我的项目异样问题
Could not commit JDBC transaction; nested exception is io.seata.rm.datasource.exec.LockConflictException: get global lock fail
狐疑是竞争锁导致的。所以批改 seata 服务的参数
- client.rm.lock.retryInterval 20 校验或占用全局锁重试距离 默认 10,单位毫秒
- client.rm.lock.retryTimes 60 校验或占用全局锁重试次数 默认 30
seata 提交全局事务失败 Unable to commit against JDBC Connection
- 博客分享
又见分布式事务之阿里开源 Seata
- 事务不失效
seata 分布式事务不失效
本文由博客一文多发平台 OpenWrite 公布!