乐趣区

关于后端:多数据源事务处理涉及分布式事务

在作者之前的 十二条后端开发教训分享,纯干货 文章中介绍的 优雅得 Springboot + mybatis 配置多数据源形式 里有很多小伙伴在评论区留言询问多个数据源同时在一个办法中应用时,事务是否会失常无效,这里作者 实践 + 实际 给大家解答一波,老规矩,附作者 github 地址:

  • https://github.com/wayn111

一. 数据源跨库然而不跨 MySql 实例

这个模式就是数据源在同一个 MySQL 下,然而 jdbc-url 上的数据库配置不同,波及多个数据库时,如果办法中产生异样,只有开启事务的数据源会产生回滚,其余数据源不会回滚。看到这里可能有点蛊惑,什么是 只有开启事务的数据源会产生回滚,其余数据源不会回滚?

上面给出代码验证:

主数据源配置

@Slf4j
@EnableTransactionManagement
@EnableAspectJAutoProxy
@Configuration
@MapperScan(basePackages = "ltd.newbee.mall.core.dao", sqlSessionFactoryRef = "masterSqlSessionFactory")
public class Db1DataSourceConfig {

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource masterDataSource(DruidProperties druidProperties) {DruidDataSource build = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(build);
    }

    /**
     * @param datasource 数据源
     * @return SqlSessionFactory
     * @Primary 默认 SqlSessionFactory
     */
    @Primary
    @Bean(name = "masterSqlSessionFactory")
    public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource datasource,
                                                     Interceptor interceptor) throws Exception {MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(datasource);
        // mybatis 扫描 xml 所在位置
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:mapper/*.xml"));
        bean.setTypeAliasesPackage("ltd.**.core.entity");
        bean.setPlugins(interceptor);
        GlobalConfig globalConfig = new GlobalConfig();
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("isDeleted");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);
        bean.setGlobalConfig(globalConfig);
        log.info("masterDataSource 配置胜利");
        return bean.getObject();}

    @Primary
    @Bean(name = "masterTransactionManager")
    public DataSourceTransactionManager masterTransactionManager(@Qualifier("masterDataSource") DataSource dataSource) {return new DataSourceTransactionManager(dataSource);
    }

}

从数据源配置

@Slf4j
@ConditionalOnProperty(value = "transactional.mode", havingValue = "seata")
@EnableTransactionManagement
@EnableAspectJAutoProxy
@Configuration
@MapperScan(basePackages = "ltd.newbee.mall.slave.dao", sqlSessionFactoryRef = "slaveSqlSessionFactory")
public class Db2DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave")
    public DataSource slaveDataSource(DruidProperties druidProperties) {DruidDataSource build = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(build);
    }


    /**
     * @param datasource 数据源
     * @return SqlSessionFactory
     * @Primary 默认 SqlSessionFactory
     */
    @Bean(name = "slaveSqlSessionFactory")
    public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource datasource,
                                                    Interceptor interceptor) throws Exception {MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(datasource);
        // mybatis 扫描 xml 所在位置
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:slavemapper/*.xml"));
        bean.setTypeAliasesPackage("ltd.**.slave.entity");
        bean.setPlugins(interceptor);
        GlobalConfig globalConfig = new GlobalConfig();
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("isDeleted");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);
        bean.setGlobalConfig(globalConfig);
        log.info("slaveDataSource 配置胜利");
        return bean.getObject();}
    
    @Bean(name = "slaveTransactionManager")
    public DataSourceTransactionManager slaveTransactionManager(@Qualifier("slaveDataSource") DataSource dataSource) {return new DataSourceTransactionManager(dataSource);
    }

}

划重点 - 上述代码在每个数据源中都配置了 DataSourceTransactionManager(事务管理器),并且在主配置中增加 @Primary 注解,示意默认事务管理器优先应用主数据源的事务管理器。 上面给出测试代码:

/**
 *  Springboot 测试类
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void testRollback() {multiDataService.testRollback();
    }
}
/**
 *  MultiDataService 实现类
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Override
    public void testRollback() {DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
        try {TbTable1 tbTable1 = new TbTable1();
            tbTable1.setName("test1");
            // 插入 table1 表
            boolean save1 = tbTable1Service.save(tbTable1);
            TbTable2 tbTable2 = new TbTable2();
            tbTable2.setName("test2");
            // 插入 table2 表
            boolean save2 = tbTable2Service.save(tbTable2);
            int i = 1 / 0;
            transactionManager.commit(transaction);
            Assert.isTrue(save1 && save2);
        } catch (Exception e) {log.info(e.getMessage(), e);
            transactionManager.rollback(transaction);
        }
    }
}

执行后果:table1 表回滚胜利,table2 表回滚失败 。由此后果,对于 只有开启事务的数据源会产生回滚,其余数据源不会回滚? 咱们的解释就是 Spring 中默认应用的事务管理器是应用主数据源配置还是从数据源配置由咱们通过 @Primary 决定,当咱们把 @Primary 切换在从数据源配置上,执行后果:table2 表回滚胜利,table1 表回滚失败。那怎么解决这个问题?

当波及到跨库或者跨 MySQL 实例,想要保障事务操作,咱们这里先给出 XA 事务解决方案。附 XA 事务的阐明:

XA 是由 X/Open 组织提出的分布式事务标准,XA 标准次要定义了事务协调者(Transaction Manager)和资源管理器(Resource Manager)之间的接口。

事务协调者(Transaction Manager),因为 XA 事务是基于两阶段提交协定的,所以须要有一个协调者,来保障所有的事务参与者都实现了筹备工作,也就是 2PC 的第一阶段。如果事务协调者收到所有参与者都筹备好的音讯,就会告诉所有的事务都能够提交,也就是 2PC 的第二阶段。

资源管理器(Resource Manager),负责管制和治理理论资源,比方数据库。

(划重点)XA 的 MySQL 实现使 MySQL 服务器可能充当资源管理器,在全局事务中解决 XA 事务。连贯到 MySQL 服务器的客户端程序充当事务协调者

XA 事务的执行流程

XA 事务是两阶段提交的一种实现形式,依据 2PC 的标准,XA 将一次事务宰割成了两个阶段,即 Prepare 和 Commit 阶段。

Prepare 阶段,TM 向所有 RM 发送 prepare 指令,RM 承受到指令后,执行数据批改和日志记录等操作,而后返回能够提交或者不提交的音讯给 TM。如果事务协调者 TM 收到所有参与者都筹备好的音讯,会告诉所有的事务提交,而后进入第二阶段。

Commit 阶段,TM 承受到所有 RM 的 prepare 后果,如果有 RM 返回是不可提交或者超时,那么向所有 RM 发送 Rollback 命令;如果所有 RM 都返回能够提交,那么向所有 RM 发送 Commit 命令,实现一次事务操作。

上面给出两种基于 XA 事务的解决方案:

  • Springboot 我的项目中能够应用 jta,实现对 XA 协定的反对,毛病就是 jta 须要革新数据源配置
  • Springboot 我的项目引入 seataseata 反对 XA 协定,且引入 seata-spring-boot-starter 依赖对业务无侵入,毛病须要引入 seata-server 升高了零碎可用性

Springboot 我的项目中能够启用 jta

  1. 引入 spring-boot-starter-jta-atomikos

    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
  2. 批改 主从数据源 DataSource 配置,进行包装增加 XA 数据源反对,如下;

    
     @Primary
     @Bean
     @ConfigurationProperties("spring.datasource.druid.master")
     public DataSource dataSource(DruidProperties druidProperties) {DruidXADataSource dataSource = druidProperties.dataSource(new DruidXADataSource());
         dataSource.setUrl("jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8");
         dataSource.setUsername("root");
         dataSource.setPassword("");
         dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
         AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
         atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
         atomikosDataSourceBean.setUniqueResourceName("master-xa");
         atomikosDataSourceBean.setXaDataSource(dataSource);
         return atomikosDataSourceBean;
     }
  3. 增加 JtaTransactionManager

    @Bean
    public JtaTransactionManager transactionManager() throws Exception {JtaTransactionManager transactionManager = new JtaTransactionManager();
     UserTransactionManager userTransactionManager = new UserTransactionManager();
     userTransactionManager.setForceShutdown(true);
     userTransactionManager.setTransactionTimeout(3000);
     transactionManager.setUserTransaction(userTransactionManager);
     transactionManager.setAllowCustomIsolationLevels(true);
     return transactionManager;
    }
  4. 实现测试,代码如下:

    /**
     *  Springboot 测试类
     */
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class MultiDataSourceTest {
     @Autowired
     private MultiDataService multiDataService;
     @Test
     public void jtaTestRollback() {multiDataService.jtaTestRollback();
     }
    }
    /**
     *  MultiDataService 实现类
     */
    @Slf4j
    @Service
    public class MultiDataServiceImpl implements MultiDataService {
     @Autowired
     private TbTable1Service tbTable1Service;
     @Autowired
     private TbTable2Service tbTable2Service;
     @Autowired
     private JtaTransactionManager jtaTransactionManager;
     @Override
     public void jtaTestRollback() {DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
         TransactionStatus transaction = jtaTransactionManager.getTransaction(transactionDefinition);
         try {TbTable1 tbTable1 = new TbTable1();
             tbTable1.setName("test1");
             boolean save1 = tbTable1Service.save(tbTable1);
             TbTable2 tbTable2 = new TbTable2();
             tbTable2.setName("test2");
             boolean save2 = tbTable2Service.save(tbTable2);
             int i = 1 / 0;
             jtaTransactionManager.commit(transaction);
             Assert.isTrue(save1 && save2);
         } catch (Exception e) {log.info(e.getMessage(), e);
             jtaTransactionManager.rollback(transaction);
         }
     }
    }

    能够看到咱们应用的是 JtaTransactionManager, 执行后果:table1 表回滚胜利,table2 表回滚胜利。验证 OK

引入 seata,增加 XA 协定反对

  1. 下载安装启动 seata-server,这里给出官网教程:https://seata.io/zh-cn/docs/o…
  2. 在 Springboot 中引入 seata 最新依赖

    <dependency>
     <groupId>io.seata</groupId>
     <artifactId>seata-spring-boot-starter</artifactId>
     <version>1.5.2</version>
    </dependency>
  3. 在 yml 文件中增加 seata 配置

    seata:
      config:
     type: file
      registry:
     type: file
      application-id: newbeemall # Seata 利用编号,默认为 ${spring.application.name}
      tx-service-group: newbeemall-group # Seata 事务组编号,用于 TC 集群名
      # 服务配置项,对应 ServiceProperties 类
      service:
     # 虚构组和分组的映射
     vgroup-mapping:
       newbeemall-group: default
     # 分组和 Seata 服务的映射
     grouplist:
       default: 127.0.0.1:8091
      data-source-proxy-mode: XA
      enabled: true
  4. 实现测试,代码如下:

    /**
     *  Springboot 测试类
     */
    @Slf4j
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class MultiDataSourceTest {
     @Autowired
     private MultiDataService multiDataService;
     @Test
     public void seataTestRollback() {multiDataService.seataTestRollback();
     }
    }
    /**
     *  MultiDataService 实现类
     */
    @Slf4j
    @Service
    public class MultiDataServiceImpl implements MultiDataService {
     @Autowired
     private TbTable1Service tbTable1Service;
     @Autowired
     private TbTable2Service tbTable2Service;
     @GlobalTransactional
     @Override
     public void seataTestRollback() {log.info("以后 XID: {}", RootContext.getXID());
         TbTable1 tbTable1 = new TbTable1();
         tbTable1.setName("test1");
         boolean save1 = tbTable1Service.save(tbTable1);
         TbTable2 tbTable2 = new TbTable2();
         tbTable2.setName("test2");
         boolean save2 = tbTable2Service.save(tbTable2);
         int i = 1 / 0;
     }
    }

    如上代码,应用 seata 时须要启用 @GlobalTransactional 注解,并且在事务中传递 XIDRootContext.getXID()),执行后果:table1 表回滚胜利,table2 表回滚胜利。验证 OK

二. 数据源散布在不同 MySql 实例

当数据源散布在不同 MySql 实例时,这时候其实曾经进入分布式事务的领域,由上可知,XA 事务能够解决分布式环境下事务问题,也就是说上述最初两种解决方案都能够解决分布式事务问题,然而理论应用过程中,咱们倡议应用 seata,理由是他不仅反对 XA 事务还反对 AT、Saga、TCC事务模型。引入 seata 官网介绍

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简略易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

总结

对于多数据源事务的问题,不论跨不跨库其实都属于分布式事务的问题。举荐应用 seata 解决。

实际代码放在 newbeemall 我的项目:https://github.com/wayn111/ne… 分支下

欢送大家点赞、关注、评论,想要跟作者沟通技术问题的话能够加我微信【waynaqua】,欢送大家前来交换。

退出移动版