背景

框架之前实现了多数据源的动静切换及事务的解决,想更近一步提供一个简略的跨库事务处理性能,通过网上的搜寻调研,大抵有XA事务/SEGA事务/TCC事务等计划,因为业务次要波及政府及企业且并发量不大,所以采纳XA事务,尽管性能有所损失,然而能够保证数据的强一致性

方案设计

针对注册的数据源拷贝一份用于XA事务,使得本地事务和XA全局事务互相独立可抉择的应用

Maven配置

引入atomikos第三方组件

        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-jta-atomikos</artifactId>        </dependency>

注册XA数据源

应用Druid连接池,须要应用DruidXADataSource数据源对象,再应用AtomikosDataSourceBean进行包装

注册数据源时针对同一个连贯注册两份,一份失常数据源,一份用于XA事务的数据源,数据源标识辨别并关联

因为spring默认注册了XA事务管理器后,所有事务操作不再走本地事务,咱们通过切换不同的数据源决定走本地事务还是XA事务

 //主数据源xa模式    @Bean    @Qualifier("masterXADataSource")    public DataSource masterXADataSource() {        DruidXADataSource datasource = new DruidXADataSource();        if(driverClassName.equals("com.mysql.cj.jdbc.Driver")){            if(!dbUrl.contains("useOldAliasMetadataBehavior")){                dbUrl += "&useOldAliasMetadataBehavior=true";            }            if(!dbUrl.contains("useAffectedRows")){                dbUrl += "&useAffectedRows=true";            }        }        datasource.setUrl(this.dbUrl);        datasource.setUsername(username);        datasource.setPassword(password);        datasource.setDriverClassName(driverClassName);        //configuration        datasource.setInitialSize(1);        datasource.setMinIdle(3);        datasource.setMaxActive(20);        datasource.setMaxWait(60000);        datasource.setTimeBetweenEvictionRunsMillis(60000);        datasource.setMinEvictableIdleTimeMillis(60000);        datasource.setValidationQuery("select 'x'");        datasource.setTestWhileIdle(true);        datasource.setTestOnBorrow(false);        datasource.setTestOnReturn(false);        datasource.setPoolPreparedStatements(true);        datasource.setMaxPoolPreparedStatementPerConnectionSize(20);        datasource.setLogAbandoned(false); //移除泄露连贯产生是是否记录日志        try {            datasource.setFilters("stat,slf4j");        } catch (SQLException e) {            logger.error("druid configuration initialization filter", e);        }        datasource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");//connectionProperties);        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");        atomikosDataSourceBean.setUniqueResourceName("master-xa");        atomikosDataSourceBean.setXaDataSource(datasource);        atomikosDataSourceBean.setPoolSize(5);        atomikosDataSourceBean.setMaxPoolSize(20);        atomikosDataSourceBean.setTestQuery("select 1");        return atomikosDataSourceBean;    }

注册XA事务管理器

应用spring内置的JtaTransactionManager事务管理器对象,设置AllowCustomIsolationLevels为true,否则指定自定义事务隔离级别会报错

    //xa模式全局事务管理器    @Bean(name = "jtaTransactionManager")    public PlatformTransactionManager transactionManager() throws Throwable {        UserTransactionManager userTransactionManager = new UserTransactionManager();        UserTransaction userTransaction = new UserTransactionImp();        JtaTransactionManager jtaTransactionManager =  new JtaTransactionManager(userTransaction, userTransactionManager);        jtaTransactionManager.setAllowCustomIsolationLevels(true);        return jtaTransactionManager;    }

定义XA事务切面

自定义注解@GlobalTransactional并定义对应切面,应用指定注解时在ThreadLocal变量值进行标识,组合

@Transactional注解指定XA事务管理器,在切换数据原时判断以后是否在XA事物中,从而切换不同的数据源

@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inherited@Documented@Transactional(rollbackFor = Exception.class,isolation = Isolation.READ_UNCOMMITTED,transactionManager = "jtaTransactionManager")public @interface GlobalTransactional {}
@Aspect@Component@Order(value = 99)public class GlobalTransitionAspect {    private static Logger logger = LoggerFactory.getLogger(GlobalTransitionAspect.class);    @Autowired    private DynamicDataSource dynamicDataSource;    /**     * 切面点 指定注解     */    @Pointcut("@annotation(com.code2roc.fastkernel.datasource.GlobalTransactional) " +            "|| @within(com.code2roc.fastkernel.datasource.GlobalTransactional)")    public void dataSourcePointCut() {    }    /**     * 拦挡办法指定为 dataSourcePointCut     */    @Around("dataSourcePointCut()")    public Object around(ProceedingJoinPoint point) throws Throwable {        MethodSignature signature = (MethodSignature) point.getSignature();        Method method = signature.getMethod();        GlobalTransactional methodAnnotation = method.getAnnotation(GlobalTransactional.class);        if (methodAnnotation != null) {            DataSourceContextHolder.tagGlobal();            logger.info("标记全局事务");        }        try {            return point.proceed();        } finally {            logger.info("革除全局事务");            DataSourceContextHolder.clearGlobal();        }    }}
public class DataSourceContextHolder {    private static Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);    // 对以后线程的操作-线程平安的    private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();    private static final ThreadLocal<String> contextGlobalHolder = new ThreadLocal<String>();    // 调用此办法,切换数据源    public static void setDataSource(String dataSource) {        contextHolder.set(dataSource);        log.debug("已切换到数据源:{}", dataSource);    }    // 获取数据源    public static String getDataSource() {        String value = contextHolder.get();        if (StringUtil.isEmpty(value)) {            value = "master";        }        if (!StringUtil.isEmpty(getGlobal())) {            value = value + "-xa";        }        return value;    }    // 删除数据源    public static void clearDataSource() {        contextHolder.remove();        log.debug("已切换到主数据源");    }    //====================全局事务标记================    public static void tagGlobal() {        contextGlobalHolder.set("1");    }    public static String getGlobal() {        String value = contextGlobalHolder.get();        return value;    }    public static void clearGlobal() {        contextGlobalHolder.remove();    }    //===================================================}

配置XA事务日志

通过在resource文件夹下创立transactions.properties文件能够指定XA事务日志的存储门路

com.atomikos.icatch.log_base_dir= tempfiles/transition/