关于java:Springboot微服务整合Seata分布式事务二微服务接入

45次阅读

共计 10639 个字符,预计需要花费 27 分钟才能阅读完成。

Seata 分布式事务微服务接入

一、初始化 SQL

波及到业务库都要新建:undo_log 表

CREATE TABLE IF NOT EXISTS `undo_log`
(`branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

二、我的项目配置

依赖,目前和 Seata Server 服务端放弃版本统一:1.3.0

<!--SpringCloud alibaba seata-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
    <version>2.2.0.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.3.0</version>
</dependency>

application 配置

seata:
  # 开启 Springboot 主动拆卸 默认 true
  enabled: true
  # 开启数据源主动代理 默认 true
  enable-auto-data-source-proxy: true
  # app id
  application-id: ${spring.application.name}
  # 事务组(能够每个利用独立取名, 也能够应用雷同的名字, 留神跟配置文件保持一致)
  tx-service-group: ${spring.application.name}
  # 配置核心
  config:
    type: apollo
    apollo:
      app-id: ${spring.application.name}
      #apollo-meta: http://10.0.17.92:9083
      apollo-meta: ${apollo.meta}
      namespace: seata
  # 注册核心
  registry:
    type: eureka
    eureka:
      weight: 1
      application: seata-server
      #service-url: http://10.0.17.92:9001/eureka/
      # eureka.service.ip-address 通过配置注入
      service-url: ${eureka.service.ip-address}

Apollo seata命名空间配置

# transport 配置
transport.type = TCP
transport.server = NIO
transport.heartbeat = true
# 客户端事务音讯申请合并发送 默认 true
transport.enableClientBatchSendRequest = true
transport.compressor = none
transport.shutdown.wait = 3
transport.serialization = seata
transport.threadFactory.bossThreadPrefix = NettyBoss
transport.threadFactory.workerThreadPrefix = NettyServerNIOWorker
transport.threadFactory.serverExecutorThread-prefix = NettyServerBizHandler
transport.threadFactory.shareBossWorker = false
transport.threadFactory.clientSelectorThreadPrefix = NettyClientSelector
transport.threadFactory.clientSelectorThreadSize = 1
transport.threadFactory.clientWorkerThreadPrefix = NettyClientWorkerThread
transport.threadFactory.bossThreadSize = 1
transport.threadFactory.workerThreadSize = default

# client 配置
# 异步提交缓存队列长度(默认 10000)
client.rm.asyncCommitBufferLimit = 10000
# 一阶段后果上报 TC 重试次数(默认 5)
client.rm.reportRetryCount = 5
# 主动刷新缓存中的表构造(默认 false)
client.rm.tableMetaCheckEnable = false
# 是否上报一阶段胜利(默认 false),true 用于放弃分支事务生命周期残缺
client.rm.reportSuccessEnable = false
# 校验或占用全局锁重试距离(默认 10ms)
client.rm.lock.retryInterval = 10
# 校验或占用全局锁重试次数(默认 30)
client.rm.lock.retryTimes = 30
# 分支事务与其它全局事务回滚事务抵触时策略(优先开释本地锁让回滚)
client.rm.lock.retryPolicyBranchRollbackOnConflict = true
# 一阶段全局回滚上报 TC 重试次数(默认 1 次,倡议大于 1)
client.tm.rollbackRetryCount = 3
# 一阶段全局提交上报 TC 重试次数(默认 1 次,倡议大于 1)
client.tm.commitRetryCount = 3
# undo 序列化形式(默认 jackson)
client.undo.logSerialization = jackson
# 自定义 undo_log 表明(默认 undo_log)
client.undo.logTable = undo_log
# 二阶段回滚镜像校验(默认 true 开启)
client.undo.dataValidation = false
# 日志异样输入概率(默认 100)
client.log.exceptionRate = 100

# service 配置
# seata-server 服务端地址 仅在 store.mode=file 时失效
service.default.grouplist = 10.0.17.92:8091
# 各自我的项目需配置本人的 service.vgroupMapping. 事务组 , seata-server 为服务端事务组
service.vgroupMapping.claim-workflow = seata-server
# 开启降级,业务间断出错则不应用事务。默认 false
service.enableDegrade = false
# 禁用全局事务(默认 false)
service.disableGlobalTransaction = false

三、手动注入数据源

Seata 1.3.0 版本,曾经实现主动数据代理注入。

  • 参数:seata.enable-auto-data-source-proxy=true , 默认为 true
  • Seata 1.3.0 版本后,不须要手动注入数据源(即以下代码)。
@Configuration
public class DataSourceProxyConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){return new DruidDataSource();
    }
}
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class DataSourceProxyConfig {//@Value("${mybatis-plus.mapper-locations}")
    //private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){return new DruidDataSource();
    }

    //@Primary
    //@Bean
    //public DataSourceProxy dataSourceProxy(DataSource dataSource) {//    return new DataSourceProxy(dataSource);
    //}

    // 代理 mybaits-plus
    //@Bean
    //public MybatisSqlSessionFactoryBean sqlSessionFactoryBean(DataSource dataSourceProxy) throws Exception {//    MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
    //    sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    //    sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    //    sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    //    return sqlSessionFactoryBean;
    //}

    // 代理 mybatis / mybatis-config
    //@Bean
    //public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {//    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    //    sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    //    sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    //    sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    //    return sqlSessionFactoryBean.getObject();
    //}

}

四、事务应用

@GlobalTransactional能够自定义名称和超时工夫,默认超时工夫:60 秒

@GlobalTransactional(timeoutMills = 60000,rollbackFor = Exception.class)
public Result submitById(ReportDispatchingCommand command, UserLoginDTO userLoginDTO){// 业务逻辑}
  • 注意事项:业务流程中如果波及到 先插入 批改插入数据 的流程,要保障所有的 改库动作 都加 @GlobalTransactional 注解

像这样,test2的接口加了 @GlobalTransactionaltest3 的接口没有加 @GlobalTransactional
如果执行程序是: test2执行更新 -> test3执行更新 -> test2回滚
这样就会因为 test3 执行了更新,导致回滚 test 的时候,seata认为有脏数据,就不回滚了。
要防止这种状况就须要给 test3 的办法也加上 @GlobalTransactional的注解

// service A
@GetMapping("test2")
@GlobalTransactional
public void test2() {bService.doInsertAndUpdate();
}

// service A
// 留神这里没有加 @GlobalTransactional
@GetMapping("test3")
public void test3() {bService.doInsertAndUpdate();
}

五、注意事项

1. pom.xml版本抵触问题

spring-cloud-alibaba-seata中主动依赖了seata-spring-boot-starter。然而版本对应不上,所以要解决版本不兼容问题

2. Could not register branch into global session异样

  • 异步调用不不退出全局事务,异步改为 MQ 调用
这里很重要
异样:Could not register branch into global session xid = status = Rollbacked(还有              Rollbacking、AsyncCommitting 等等二阶段状态)while expecting Begin

形容:分支事务注册时,全局事务状态需是一阶段状态 begin,非 begin 不容许注册。属于 seata 框架层面失常的解决,用户能够从本身业务层面解决。呈现场景(可持续补充)1. 分支事务是异步,全局事务无奈感知它的执行进度,全局事务已进入二阶段,该异步分支才来注册
2. 服务 a rpc 服务 b 超时(dubbo、feign 等默认 1 秒超时),a 上抛异样给 tm,tm 告诉 tc 回滚,然而 b 还是收到了申请(网络提早或 rpc 框架重试),而后去 tc 注册时发现全局事务已在回滚
3. tc 感知全局事务超时(@GlobalTransactional(timeoutMills = 默认 60 秒)),被动变更状态并告诉各分支事务    回滚,此时有新的分支事务来注册
# 设置 RPC 调用的超时工夫
ribbon:
  # 连贯超时
  ConnectTimeout: 5000
  # 响应超时  设置超时工夫设置长一点。应为回滚须要工夫
  ReadTimeout: 10000

3. @GlobalTransactional 全局事务不失效

事务是基于接口或基于类的代理被创立的。事务注解,没有加在办法入口,事务无奈失效

@ Service
public class Service1 {
  @ Autowired
  private Service1 self;

  public void m1() {
      // 类外部间接调用,不会触发 AOP
      //this.m2(); 
      // 通过保护一个本人的 bean 来调用,从而使事务失效
      self.m2();}

  @ GlobalTransactional
  publiv void m2() {}
}

4. 引入 Seata后,spring本地事务注解 @Transactional 有效

  • 是否所有 sql 都走 DataSourceProxy 代理;而 DataSourceProxy 代理是间接 commit 代码;所以只能应用 @GlobalTransactional 能力执行 undo 回滚
  • 所以 Seata 就不反对 spring @Transactional的本地事务了?

​ 待验证。。。

六、原理剖析

Spring 事务个性

  • 事务的隔离级别:是指若干个并发的事务之间的隔离水平
1. @Transactional(isolation = Isolation.READ_UNCOMMITTED):读取未提交数据(会呈现脏读, 不可反复读) 根本不应用
2. @Transactional(isolation = Isolation.READ_COMMITTED):读取已提交数据(会呈现不可反复读和幻读)
3. @Transactional(isolation = Isolation.REPEATABLE_READ):可反复读(会呈现幻读)
4. @Transactional(isolation = Isolation.SERIALIZABLE):串行化
  • 事务流传行为:如果在开始以后事务之前,一个事务上下文曾经存在,此时有若干选项能够指定一个事务性办法的执行行为
1. TransactionDefinition.PROPAGATION_REQUIRED:   如果以后存在事务,则退出该事务;如果以后没有事务,则创立一个新的事务。这是默认值。2. TransactionDefinition.PROPAGATION_REQUIRES_NEW:   创立一个新的事务,如果以后存在事务,则把以后事务挂起。3. TransactionDefinition.PROPAGATION_SUPPORTS:   如果以后存在事务,则退出该事务;如果以后没有事务,则以非事务的形式持续运行。4. TransactionDefinition.PROPAGATION_NOT_SUPPORTED:   以非事务形式运行,如果以后存在事务,则把以后事务挂起。5. TransactionDefinition.PROPAGATION_NEVER:   以非事务形式运行,如果以后存在事务,则抛出异样。6. TransactionDefinition.PROPAGATION_MANDATORY:   如果以后存在事务,则退出该事务;如果以后没有事务,则抛出异样。7. TransactionDefinition.PROPAGATION_NESTED:   如果以后存在事务,则创立一个事务作为以后事务的嵌套事务来运行;如果以后没有事务,则该取值等价于 TransactionDefinition.PROPAGATION_REQUIRED。

Seata 框架设计

Seata 框架的次要角色

  • (Transaction ID)XID 全局惟一事务 ID
  • (Transaction Manager) TM 事务管理器
  • (Resource Manager) RM 资源管理器
  • (Transaction Coordinator) TC 事务协调器

事务管理器:外围模块为全局事务管理器,次要负责:全局事务的开启、提交、回滚。

资源管理器:执行具体的分支事务,由对应的执行器去执行。包含重做日志,SQL 的执行;

​ 理论的 SQL 由连贯代理 ConnectionProxy 执行,连贯代理 ConnectProxy 会注册事务分支到事务 协调器 TC,执行完理论 SQL 后,上报分支事务的状态给事务协调器 TC。

事务协调器:依据事务管理器 TM 的全局事务的开启、提交、回滚申请,创立全局事务,驱动分支事务的提交 和回滚,并保护全局事务和分支事务的状态。

  • 执行流程

全局事务与分支事务

  • 全局事务:通过 XID(全局惟一事务 ID) 来标识

xid = 10.0.18.202:8905:138030304412573696

  • 分支事务:通过分支事务 ID 和资源 ID 来标识,每个分支事务都有 XID 来标识属于哪个全局事务

branchId = 138030306228707329

Register branch successfully, xid = 10.0.18.202:8905:138030304412573696, branchId = 138030306228707329, resourceId = jdbc:mysql://10.0.18.200:3306/claim_auprocess ,lockKeys = c_au_image_detail_info:340489328012288
[2m2021-05-18 13:23:09.581[0;39m [32m INFO[0;39m [2m---[0;39m [2m[LoggerPrint_1_1][0;39m [36mi.s.c.r.p.server.BatchLogHandler        [0;39m [2m:[0;39m SeataMergeMessage xid=10.0.18.202:8905:138030304412573696,branchType=AT,resourceId=jdbc:mysql://10.0.18.200:3306/claim_auprocess,lockKey=c_au_image_detail_info:340489328012289

1. Seata-clientSeata-server 是如何通信的

1. seata-servaer 向注册核心注册
2. seata-client 向注册核心注册
3. seata-client 通过注册核心返回的 seata-server 服务列表,找到 seata-server
  • 注册核心的分类
1. file 
  文件类型:用于做概念验证,目标是通过配置文件,让 client 疾速找到 seata-server, 从而免去依赖第三方注册核心

2. 非 file 类型
   nacos
   apollo
   zookeeper
   redis
   etcd 
   ....

2. Seata 分布式事务的根底 DataSource

  • 所有的外围都在 io.seata.rm.datasource.DataSourceProxy
  • 从 0.9 版本后,曾经实现主动数据代理注入,默认开启。

    seata:
    # 开启 Springboot 主动拆卸 默认 true
    enabled: true
    # 开启数据源主动代理 默认 true
    enable-auto-data-source-proxy: true
咱们只有把 DataSourceProxy 注册成默认的 java.sql.DataSource 实现,并提供给其它框架 (mybatis\jdbctemplate) 拆卸,就达到目标。所以 client 端的所有配置都围绕着把 DataSourceProxy 注册成默认的  java.sql.DataSource,或者把数据库拜访框架的 Datasource 配置成 DataSourceProxy 来进行的。

3. 锁管理器 LockManager

锁管理器的顶级接口:LockManager,锁管理器必须实现该接口。

Lock_key 格局:数据库资源 ^^^ 表名 ^^^ 主键 ID

示例:jdbc:mysql://10.0.17.xxx:3306/xxxx^^^act_ge_bytearray^^^341269355654272

七、参考文档

  • 参数配置文档

spring cloud seata 参数配置

Seata 解析 -TC 端 file.conf 文件各配置作用总结

  • 回滚失败

被调用方存在两个本地事务,在调用方抛异样时,被调用方反复呈现 Branch Rollbacked faild

压力测试一段时间后,tm 端 rpc 调用 server 端超时 RPC timeout:RPC timeout

  • 源码剖析

Seata 源码解析

  • seata springboot 整合

微服务分布式事务解决方案 -springboot 整合分布式 seata1.3.0

分布式事务两阶段提交 Eureka+Seata 计划

springcloud-nacos-seata 实现分布式事务

  • 我的项目异样问题

Could not commit JDBC transaction; nested exception is io.seata.rm.datasource.exec.LockConflictException: get global lock fail

  • 狐疑是竞争锁导致的。所以批改 seata 服务的参数

    • client.rm.lock.retryInterval 20 校验或占用全局锁重试距离 默认 10,单位毫秒
    • client.rm.lock.retryTimes 60 校验或占用全局锁重试次数 默认 30

seata 提交全局事务失败 Unable to commit against JDBC Connection

  • 博客分享

又见分布式事务之阿里开源 Seata

  • 事务不失效

seata 分布式事务不失效

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0