Seata分布式事务微服务接入
一、初始化SQL
波及到业务库都要新建:undo_log表
CREATE TABLE IF NOT EXISTS `undo_log`( `branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id', `xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id', `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info', `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime', `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
二、我的项目配置
依赖 ,目前和Seata Server
服务端放弃版本统一:1.3.0
<!--SpringCloud alibaba seata--><dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> <version>2.2.0.RELEASE</version> <exclusions> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> </exclusion> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> </exclusion> </exclusions></dependency><dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.3.0</version></dependency>
application
配置
seata: # 开启Springboot主动拆卸 默认true enabled: true # 开启数据源主动代理 默认true enable-auto-data-source-proxy: true # app id application-id: ${spring.application.name} # 事务组(能够每个利用独立取名,也能够应用雷同的名字,留神跟配置文件保持一致) tx-service-group: ${spring.application.name} # 配置核心 config: type: apollo apollo: app-id: ${spring.application.name} #apollo-meta: http://10.0.17.92:9083 apollo-meta: ${apollo.meta} namespace: seata # 注册核心 registry: type: eureka eureka: weight: 1 application: seata-server #service-url: http://10.0.17.92:9001/eureka/ # eureka.service.ip-address 通过配置注入 service-url: ${eureka.service.ip-address}
Apollo
seata
命名空间配置
# transport配置transport.type = TCPtransport.server = NIOtransport.heartbeat = true# 客户端事务音讯申请合并发送 默认truetransport.enableClientBatchSendRequest = truetransport.compressor = nonetransport.shutdown.wait = 3transport.serialization = seatatransport.threadFactory.bossThreadPrefix = NettyBosstransport.threadFactory.workerThreadPrefix = NettyServerNIOWorkertransport.threadFactory.serverExecutorThread-prefix = NettyServerBizHandlertransport.threadFactory.shareBossWorker = falsetransport.threadFactory.clientSelectorThreadPrefix = NettyClientSelectortransport.threadFactory.clientSelectorThreadSize = 1transport.threadFactory.clientWorkerThreadPrefix = NettyClientWorkerThreadtransport.threadFactory.bossThreadSize = 1transport.threadFactory.workerThreadSize = default# client配置# 异步提交缓存队列长度(默认10000)client.rm.asyncCommitBufferLimit = 10000# 一阶段后果上报TC重试次数(默认5)client.rm.reportRetryCount = 5# 主动刷新缓存中的表构造(默认false)client.rm.tableMetaCheckEnable = false# 是否上报一阶段胜利(默认false),true用于放弃分支事务生命周期残缺client.rm.reportSuccessEnable = false# 校验或占用全局锁重试距离(默认10ms)client.rm.lock.retryInterval = 10# 校验或占用全局锁重试次数(默认30)client.rm.lock.retryTimes = 30# 分支事务与其它全局事务回滚事务抵触时策略(优先开释本地锁让回滚)client.rm.lock.retryPolicyBranchRollbackOnConflict = true# 一阶段全局回滚上报TC重试次数(默认1次,倡议大于1)client.tm.rollbackRetryCount = 3# 一阶段全局提交上报TC重试次数(默认1次,倡议大于1)client.tm.commitRetryCount = 3# undo序列化形式(默认jackson)client.undo.logSerialization = jackson# 自定义undo_log表明(默认undo_log)client.undo.logTable = undo_log# 二阶段回滚镜像校验(默认true开启)client.undo.dataValidation = false# 日志异样输入概率(默认100)client.log.exceptionRate = 100# service配置# seata-server服务端地址 仅在store.mode=file时失效service.default.grouplist = 10.0.17.92:8091# 各自我的项目需配置本人的 service.vgroupMapping.事务组 , seata-server为服务端事务组service.vgroupMapping.claim-workflow = seata-server# 开启降级,业务间断出错则不应用事务。默认falseservice.enableDegrade = false# 禁用全局事务(默认false)service.disableGlobalTransaction = false
三、手动注入数据源
Seata
1.3.0版本,曾经实现主动数据代理注入。
- 参数:seata.enable-auto-data-source-proxy=true ,默认为true
Seata
1.3.0版本后,不须要手动注入数据源(即以下代码)。
@Configurationpublic class DataSourceProxyConfig { @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ return new DruidDataSource(); }}
import com.alibaba.druid.pool.DruidDataSource;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configurationpublic class DataSourceProxyConfig { //@Value("${mybatis-plus.mapper-locations}") //private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ return new DruidDataSource(); } //@Primary //@Bean //public DataSourceProxy dataSourceProxy(DataSource dataSource) { // return new DataSourceProxy(dataSource); //} //代理mybaits-plus //@Bean //public MybatisSqlSessionFactoryBean sqlSessionFactoryBean(DataSource dataSourceProxy) throws Exception { // MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); // sqlSessionFactoryBean.setDataSource(dataSourceProxy); // sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); // sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); // return sqlSessionFactoryBean; //} //代理mybatis / mybatis-config //@Bean //public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { // SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); // sqlSessionFactoryBean.setDataSource(dataSourceProxy); // sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); // sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); // return sqlSessionFactoryBean.getObject(); //}}
四、事务应用
@GlobalTransactional
能够自定义名称和超时工夫,默认超时工夫:60秒
@GlobalTransactional(timeoutMills = 60000,rollbackFor = Exception.class)public Result submitById(ReportDispatchingCommand command, UserLoginDTO userLoginDTO){ //业务逻辑}
- 注意事项: 业务流程中如果波及到
先插入
后批改插入数据
的流程,要保障所有的改库动作
都加@GlobalTransactional
注解
像这样,test2
的接口加了@GlobalTransactional
,test3
的接口没有加@GlobalTransactional
如果执行程序是:test2
执行更新 ->test3
执行更新 ->test2
回滚
这样就会因为test3
执行了更新,导致回滚test
的时候,seata
认为有脏数据,就不回滚了。
要防止这种状况就须要给test3
的办法也加上@GlobalTransactional
的注解
// service A@GetMapping("test2")@GlobalTransactionalpublic void test2() { bService.doInsertAndUpdate();}// service A// 留神这里没有加@GlobalTransactional@GetMapping("test3")public void test3() { bService.doInsertAndUpdate();}
五、注意事项
1. pom.xml
版本抵触问题
spring-cloud-alibaba-seata
中主动依赖了seata-spring-boot-starter
。然而版本对应不上,所以要解决版本不兼容问题
2. Could not register branch into global session
异样
- 异步调用不不退出全局事务,异步改为MQ调用
这里很重要异样:Could not register branch into global session xid = status = Rollbacked(还有 Rollbacking、AsyncCommitting等等二阶段状态) while expecting Begin形容:分支事务注册时,全局事务状态需是一阶段状态begin,非begin不容许注册。 属于seata框架层面失常的解决,用户能够从本身业务层面解决。呈现场景(可持续补充)1. 分支事务是异步,全局事务无奈感知它的执行进度,全局事务已进入二阶段,该异步分支才来注册2. 服务a rpc 服务b超时(dubbo、feign等默认1秒超时),a上抛异样给tm,tm告诉tc回滚,然而b还是收到了申请 (网络提早或rpc框架重试),而后去tc注册时发现全局事务已在回滚3. tc感知全局事务超时(@GlobalTransactional(timeoutMills = 默认60秒)),被动变更状态并告诉各分支事务 回滚,此时有新的分支事务来注册
# 设置RPC调用的超时工夫ribbon: # 连贯超时 ConnectTimeout: 5000 # 响应超时 设置超时工夫设置长一点。应为回滚须要工夫 ReadTimeout: 10000
3. @GlobalTransactional
全局事务不失效
事务是基于接口或基于类的代理被创立的。事务注解,没有加在办法入口,事务无奈失效
@ Servicepublic class Service1 { @ Autowired private Service1 self; public void m1() { // 类外部间接调用,不会触发AOP //this.m2(); //通过保护一个本人的bean来调用,从而使事务失效 self.m2(); } @ GlobalTransactional publiv void m2() { }}
4. 引入 Seata
后,spring
本地事务注解@Transactional
有效
- 是否所有sql都走
DataSourceProxy
代理;而DataSourceProxy
代理是间接commit代码;所以只能应用@GlobalTransactional
能力执行undo 回滚- 所以Seata就不反对 spring
@Transactional
的本地事务了?待验证 。。。
六、原理剖析
Spring事务个性
- 事务的隔离级别:是指若干个并发的事务之间的隔离水平
1. @Transactional(isolation = Isolation.READ_UNCOMMITTED):读取未提交数据(会呈现脏读, 不可反复读) 根本不应用2. @Transactional(isolation = Isolation.READ_COMMITTED):读取已提交数据(会呈现不可反复读和幻读)3. @Transactional(isolation = Isolation.REPEATABLE_READ):可反复读(会呈现幻读)4. @Transactional(isolation = Isolation.SERIALIZABLE):串行化
- 事务流传行为:如果在开始以后事务之前,一个事务上下文曾经存在,此时有若干选项能够指定一个事务性办法的执行行为
1. TransactionDefinition.PROPAGATION_REQUIRED: 如果以后存在事务,则退出该事务;如果以后没有事务,则创立一个新的事务。这是默认值。2. TransactionDefinition.PROPAGATION_REQUIRES_NEW: 创立一个新的事务,如果以后存在事务,则把以后事务挂起。3. TransactionDefinition.PROPAGATION_SUPPORTS: 如果以后存在事务,则退出该事务;如果以后没有事务,则以非事务的形式持续运行。4. TransactionDefinition.PROPAGATION_NOT_SUPPORTED: 以非事务形式运行,如果以后存在事务,则把以后事务挂起。5. TransactionDefinition.PROPAGATION_NEVER: 以非事务形式运行,如果以后存在事务,则抛出异样。6. TransactionDefinition.PROPAGATION_MANDATORY: 如果以后存在事务,则退出该事务;如果以后没有事务,则抛出异样。7. TransactionDefinition.PROPAGATION_NESTED: 如果以后存在事务,则创立一个事务作为以后事务的嵌套事务来运行; 如果以后没有事务,则该取值等价于TransactionDefinition.PROPAGATION_REQUIRED。
Seata框架设计
Seata框架的次要角色
- (Transaction ID)XID 全局惟一事务ID
- (Transaction Manager) TM 事务管理器
- (Resource Manager ) RM 资源管理器
- (Transaction Coordinator) TC 事务协调器
事务管理器:外围模块为全局事务管理器,次要负责:全局事务的开启、提交、回滚。
资源管理器:执行具体的分支事务,由对应的执行器去执行。包含重做日志,SQL的执行;
理论的SQL由连贯代理ConnectionProxy执行,连贯代理ConnectProxy会注册事务分支到事务 协调器TC,执行完理论SQL后,上报分支事务的状态给事务协调器TC。
事务协调器:依据事务管理器TM的全局事务的开启、提交、回滚申请,创立全局事务,驱动分支事务的提交 和回滚,并保护全局事务和分支事务的状态。
- 执行流程
全局事务与分支事务
- 全局事务: 通过XID(全局惟一事务ID) 来标识
xid = 10.0.18.202:8905:138030304412573696
- 分支事务:通过分支事务ID和资源ID来标识,每个分支事务都有XID来标识属于哪个全局事务
branchId = 138030306228707329
Register branch successfully, xid = 10.0.18.202:8905:138030304412573696, branchId = 138030306228707329, resourceId = jdbc:mysql://10.0.18.200:3306/claim_auprocess ,lockKeys = c_au_image_detail_info:340489328012288[2m2021-05-18 13:23:09.581[0;39m [32m INFO[0;39m [2m---[0;39m [2m[LoggerPrint_1_1][0;39m [36mi.s.c.r.p.server.BatchLogHandler [0;39m [2m:[0;39m SeataMergeMessage xid=10.0.18.202:8905:138030304412573696,branchType=AT,resourceId=jdbc:mysql://10.0.18.200:3306/claim_auprocess,lockKey=c_au_image_detail_info:340489328012289
1. Seata-client
与 Seata-server
是如何通信的
1. seata-servaer 向注册核心注册2. seata-client 向注册核心注册3. seata-client 通过注册核心返回的seata-server服务列表,找到seata-server
- 注册核心的分类
1. file 文件类型:用于做概念验证,目标是通过配置文件,让client疾速找到seata-server,从而免去依赖第三方注册核心
2. 非file类型 nacos apollo zookeeper redis etcd ....
2. Seata分布式事务的根底 DataSource
- 所有的外围都在
io.seata.rm.datasource.DataSourceProxy
从 0.9 版本后,曾经实现主动数据代理注入,默认开启。
seata:# 开启Springboot主动拆卸 默认trueenabled: true# 开启数据源主动代理 默认trueenable-auto-data-source-proxy: true
咱们只有把DataSourceProxy注册成默认的 java.sql.DataSource实现,并提供给其它框架(mybatis\jdbctemplate)拆卸,就达到目标。所以client端的所有配置都围绕着把DataSourceProxy注册成默认的 java.sql.DataSource,或者把数据库拜访框架的Datasource配置成DataSourceProxy来进行的。
3. 锁管理器 LockManager
锁管理器的顶级接口:
LockManager
, 锁管理器必须实现该接口。Lock_key格局 :数据库资源^^^表名^^^主键ID
示例:
jdbc:mysql://10.0.17.xxx:3306/xxxx^^^act_ge_bytearray^^^341269355654272
七、参考文档
- 参数配置文档
spring cloud seata 参数配置
Seata解析-TC端file.conf文件各配置作用总结
- 回滚失败
被调用方存在两个本地事务,在调用方抛异样时,被调用方反复呈现Branch Rollbacked faild
压力测试一段时间后,tm端rpc调用server端超时 RPC timeout:RPC timeout
- 源码剖析
Seata源码解析
- seata springboot 整合
微服务分布式事务解决方案-springboot整合分布式seata1.3.0
分布式事务两阶段提交 Eureka+Seata计划
springcloud-nacos-seata 实现分布式事务
- 我的项目异样问题
Could not commit JDBC transaction; nested exception is io.seata.rm.datasource.exec.LockConflictException: get global lock fail
狐疑是竞争锁导致的。所以批改seata服务的参数
- client.rm.lock.retryInterval 20 校验或占用全局锁重试距离 默认10,单位毫秒
- client.rm.lock.retryTimes 60 校验或占用全局锁重试次数 默认30
seata 提交全局事务失败 Unable to commit against JDBC Connection
- 博客分享
又见分布式事务之阿里开源Seata
- 事务不失效
seata 分布式事务不失效
本文由博客一文多发平台 OpenWrite 公布!