关于springboot:SpringBoot多数据源事务解决方案

5次阅读

共计 6672 个字符,预计需要花费 17 分钟才能阅读完成。

背景

之前有文章提供了 springboot 多数据源动静注册切换的整合计划,在后续应用过程中,发现在事务管制中有多种 bug 产生,决定对此问题进行剖析与解决

前情提要

多数据源切换流程结构图如下所示,蕴含几个组成元素

  • 自定义的数据源配置解决,通过 DruidDataSource 对象动静注册到零碎中
  • 自定义数据源标识注解与切面
  • 数据源切换时的上下文线程变量持有者
  • 自定义 AbstractRoutingDataSource,实现数据源路由切换

问题剖析

在 Controller 退出 @Transitional 注解后,数据源切换会生效,只会操作主库,查问材料后解决方案是将切面的 Order 设置为 - 1 使之执行程序在事务管制拦挡之前,批改后证实无效,然而后续再次切换别的库或者进行主库操作有效,拿到的 connection 始终是第一次切换后的库对应的连贯

剖析代码后发现 AbstractRoutingDataSource 只负责提供 getConnection 这一层级,然而后续对 connection 的操作无奈跟踪,我的项目框架 mybatis 和 jdbcTemplate 混合应用,后续操作在 spring 层面对于事务 / 数据源 / 连贯这三者的逻辑层面操作是雷同的,jdbcTemplate 代码较为简单,所以以此为切入点进一步剖析

通过断点调试会发现 sql 语句的执行最终会落到 execute 办法,办法中开始就是通过 DataSourceUtils.getConnection 获取连贯,这里就是咱们须要追踪的中央,点进去发现跳转到 doGetConnection 办法,这外面就是咱们须要剖析的具体逻辑

第一行获取的 ConnectionHolder 就是以后事务对应的线程持有对象,因为咱们晓得,事务的实质就是办法外部的 sql 执行时对应的是同一个数据库 connection,对于不同的嵌套业务办法,惟一雷同的是以后线程 ID 统一,所以咱们将 connection 与线程绑定就能够实现事务管制

点进 getResource 办法,发现 dataSource 是作为一个 key 去一个 Map 汇合里取出对应的 contextHolder

到这里咱们如同发现点什么,之前对 jdbcTemplatechu 实例化设定数据源间接赋值自定义的 DynamicDataSource,所以在事物中每次咱们获取 connection 根据就是 DynamicDataSource 这个对象作为 key,所以每次都会一样了!!

    @Bean
    public JdbcTemplate jdbcTemplate(){
        JdbcTemplate jdbcTemplate = null;
        try{jdbcTemplate = new JdbcTemplate(dynamicDataSource());
        }catch (Exception e){e.printStackTrace();
        }
        return jdbcTemplate;
    }

后续针对 mybatis 查找了相干材料,事务管制默认实现是 SpringManagedTransaction,源码查看后发现了相熟的 DataSourceUtils.getConnection,证实咱们的剖析方向是正确的

解决方案

jdbcTemplate

自定义操作类继承 jdbcTemplate 重写 getDataSource,将咱们获取的 DataSource 这个对应的 key 指定到理论切换库的数据源对象上即可

public class DynamicJdbcTemplate extends JdbcTemplate {
    @Override
    public DataSource getDataSource() {DynamicDataSource router =  (DynamicDataSource) super.getDataSource();
        DataSource acuallyDataSource = router.getAcuallyDataSource();
        return acuallyDataSource;
    }

    public DynamicJdbcTemplate(DataSource dataSource) {super(dataSource);
    }
}
    public DataSource getAcuallyDataSource() {Object lookupKey = determineCurrentLookupKey();
        if (null == lookupKey) {return this;}
        DataSource determineTargetDataSource = this.determineTargetDataSource();
        return determineTargetDataSource == null ? this : determineTargetDataSource;
    }

mybatis

自定义事务操作类,实现 Transaction 接口,替换 TransitionFactory,这里的实现与网上的解决方案略有不同,网上是定义三个变量,datasource(动静数据源对象)/connection(主连贯)/connections(从库连贯),然而框架须要 mybatis 和 jdbctemplate 进行对立,mybatis 是从 connection 层面管制,jdbctemplate 是从 datasource 层面管制,所以全副应用键值对存储

public class DynamicTransaction implements Transaction {
    private final DynamicDataSource dynamicDataSource;
    private ConcurrentHashMap<String, DataSource> dataSources;
    private ConcurrentHashMap<String, Connection> connections;
    private ConcurrentHashMap<String, Boolean> autoCommits;
    private ConcurrentHashMap<String, Boolean> isConnectionTransactionals;

    public DynamicTransaction(DataSource dataSource) {this.dynamicDataSource = (DynamicDataSource) dataSource;
        dataSources = new ConcurrentHashMap<>();
        connections = new ConcurrentHashMap<>();
        autoCommits = new ConcurrentHashMap<>();
        isConnectionTransactionals = new ConcurrentHashMap<>();}

    public Connection getConnection() throws SQLException {String dataBaseID = DBContextHolder.getDataSource();
        if (!dataSources.containsKey(dataBaseID)) {DataSource dataSource = dynamicDataSource.getAcuallyDataSource();
            dataSources.put(dataBaseID, dataSource);
        }
        if (!connections.containsKey(dataBaseID)) {Connection connection = DataSourceUtils.getConnection(dataSources.get(dataBaseID));
            connections.put(dataBaseID, connection);
        }
        if (!autoCommits.containsKey(dataBaseID)) {boolean autoCommit = connections.get(dataBaseID).getAutoCommit();
            autoCommits.put(dataBaseID, autoCommit);
        }
        if (!isConnectionTransactionals.containsKey(dataBaseID)) {boolean isConnectionTransactional = DataSourceUtils.isConnectionTransactional(connections.get(dataBaseID), dataSources.get(dataBaseID));
            isConnectionTransactionals.put(dataBaseID, isConnectionTransactional);
        }
        return connections.get(dataBaseID);
    }


    public void commit() throws SQLException {for (String dataBaseID : connections.keySet()) {Connection connection = connections.get(dataBaseID);
            boolean isConnectionTransactional = isConnectionTransactionals.get(dataBaseID);
            boolean autoCommit = autoCommits.get(dataBaseID);
            if (connection != null && !isConnectionTransactional && !autoCommit) {connection.commit();
            }
        }
    }

    public void rollback() throws SQLException {for (String dataBaseID : connections.keySet()) {Connection connection = connections.get(dataBaseID);
            boolean isConnectionTransactional = isConnectionTransactionals.get(dataBaseID);
            boolean autoCommit = autoCommits.get(dataBaseID);
            if (connection != null && !isConnectionTransactional && !autoCommit) {connection.rollback();
            }
        }
    }

    public void close() {for (String dataBaseID : connections.keySet()) {Connection connection = connections.get(dataBaseID);
            DataSource dataSource = dataSources.get(dataBaseID);
            DataSourceUtils.releaseConnection(connection, dataSource);
        }
    }

    public Integer getTimeout() {return null;}
}
public class DynamicTransactionFactory extends SpringManagedTransactionFactory {
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {return new DynamicTransaction(dataSource);
    }
}
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {//SpringBootExecutableJarVFS.addImplClass(SpringBootVFS.class);
        final PackagesSqlSessionFactoryBean sessionFactory = new PackagesSqlSessionFactoryBean();
        sessionFactory.setDataSource(dynamicDataSource());
        sessionFactory.setTransactionFactory(new DynamicTransactionFactory());
        sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:mybatis/**/*Mapper.xml"));
        // 敞开驼峰转换,避免带下划线的字段无奈映射
        sessionFactory.getObject().getConfiguration().setMapUnderscoreToCamelCase(false);
        return sessionFactory.getObject();}

事务管理器

事务中库动静切换的问题解决了,然而只针对了主库事务,如果从库操作也须要事务的个性该如何操作呢,这里就须要在注册数据源时针对每个数据源手动注册一个事务管理器

主库是固定的,能够间接在配置 Bean 中申明 masterTransitionManage 并设置为默认

    @Bean("masterTransactionManager")
    @Primary
    public DataSourceTransactionManager MasterTransactionManager() {return new DataSourceTransactionManager(masterDataSource());
    }

从库的事务管理器咱们能够拿到 dataSource 初始化对象,而后向 Spring 容器注册单例对象

 public static void registerSingletonBean(String beanName, Object singletonObject) {
        // 将 applicationContext 转换为 ConfigurableApplicationContext
        ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context;
        // 获取 BeanFactory
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getAutowireCapableBeanFactory();
        if(configurableApplicationContext.containsBean(beanName)) {defaultListableBeanFactory.destroySingleton(beanName);
        }
        // 动静注册 bean.
        defaultListableBeanFactory.registerSingleton(beanName, singletonObject);

    }
 SpringBootBeanUtil.registerSingletonBean(key + "TransactionManager", new DataSourceTransactionManager(druidDataSource));

在应用时只有对 @Transitional 注解指定 transitionFactory 名字即可

总结

解决这个问题破费了三天的工夫,查了很多材料和解决方案,很多都是只有参考性或者特异性的,所以还是需把握问题的外围加上局部源码的追踪,比方本文中须要清晰的意识到 Transition-Connection-LocalThread 三者的关联关系,能力找对排查的方向

后续实现了集成基于 JMS(atomikos)的 XA 两段式提交的全局事务,应用 DruidXADataSrouce 呈现了 druid 和 atomikos 两者线程池交互呈现泄露的状况放弃了,给小伙伴们避个坑

正文完
 0