关于springboot:SpringBoot整合atomikos实现跨库事务

26次阅读

共计 5097 个字符,预计需要花费 13 分钟才能阅读完成。

背景

框架之前实现了多数据源的动静切换及事务的解决,想更近一步提供一个简略的跨库事务处理性能,通过网上的搜寻调研,大抵有 XA 事务 /SEGA 事务 /TCC 事务等计划,因为业务次要波及政府及企业且并发量不大,所以采纳 XA 事务,尽管性能有所损失,然而能够保证数据的强一致性

方案设计

针对注册的数据源拷贝一份用于 XA 事务,使得本地事务和 XA 全局事务互相独立可抉择的应用

Maven 配置

引入 atomikos 第三方组件

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

注册 XA 数据源

应用 Druid 连接池,须要应用 DruidXADataSource 数据源对象,再应用 AtomikosDataSourceBean 进行包装

注册数据源时针对同一个连贯注册两份,一份失常数据源,一份用于 XA 事务的数据源,数据源标识辨别并关联

因为 spring 默认注册了 XA 事务管理器后,所有事务操作不再走本地事务,咱们通过切换不同的数据源决定走本地事务还是 XA 事务

 // 主数据源 xa 模式
    @Bean
    @Qualifier("masterXADataSource")
    public DataSource masterXADataSource() {DruidXADataSource datasource = new DruidXADataSource();
        if(driverClassName.equals("com.mysql.cj.jdbc.Driver")){if(!dbUrl.contains("useOldAliasMetadataBehavior")){dbUrl += "&useOldAliasMetadataBehavior=true";}
            if(!dbUrl.contains("useAffectedRows")){dbUrl += "&useAffectedRows=true";}
        }
        datasource.setUrl(this.dbUrl);
        datasource.setUsername(username);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        //configuration
        datasource.setInitialSize(1);
        datasource.setMinIdle(3);
        datasource.setMaxActive(20);
        datasource.setMaxWait(60000);
        datasource.setTimeBetweenEvictionRunsMillis(60000);
        datasource.setMinEvictableIdleTimeMillis(60000);
        datasource.setValidationQuery("select'x'");
        datasource.setTestWhileIdle(true);
        datasource.setTestOnBorrow(false);
        datasource.setTestOnReturn(false);
        datasource.setPoolPreparedStatements(true);
        datasource.setMaxPoolPreparedStatementPerConnectionSize(20);
        datasource.setLogAbandoned(false); // 移除泄露连贯产生是是否记录日志
        try {datasource.setFilters("stat,slf4j");
        } catch (SQLException e) {logger.error("druid configuration initialization filter", e);
        }
        datasource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");//connectionProperties);

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setUniqueResourceName("master-xa");
        atomikosDataSourceBean.setXaDataSource(datasource);
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setMaxPoolSize(20);
        atomikosDataSourceBean.setTestQuery("select 1");
        return atomikosDataSourceBean;
    }

注册 XA 事务管理器

应用 spring 内置的 JtaTransactionManager 事务管理器对象,设置 AllowCustomIsolationLevels 为 true,否则指定自定义事务隔离级别会报错

    //xa 模式全局事务管理器
    @Bean(name = "jtaTransactionManager")
    public PlatformTransactionManager transactionManager() throws Throwable {UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        JtaTransactionManager jtaTransactionManager =  new JtaTransactionManager(userTransaction, userTransactionManager);
        jtaTransactionManager.setAllowCustomIsolationLevels(true);
        return jtaTransactionManager;
    }

定义 XA 事务切面

自定义注解 @GlobalTransactional 并定义对应切面,应用指定注解时在 ThreadLocal 变量值进行标识,组合

@Transactional 注解指定 XA 事务管理器,在切换数据原时判断以后是否在 XA 事物中,从而切换不同的数据源

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Transactional(rollbackFor = Exception.class,isolation = Isolation.READ_UNCOMMITTED,transactionManager = "jtaTransactionManager")
public @interface GlobalTransactional {}
@Aspect
@Component
@Order(value = 99)
public class GlobalTransitionAspect {private static Logger logger = LoggerFactory.getLogger(GlobalTransitionAspect.class);
    @Autowired
    private DynamicDataSource dynamicDataSource;

    /**
     * 切面点 指定注解
     */
    @Pointcut("@annotation(com.code2roc.fastkernel.datasource.GlobalTransactional)" +
            "|| @within(com.code2roc.fastkernel.datasource.GlobalTransactional)")
    public void dataSourcePointCut() {}

    /**
     * 拦挡办法指定为 dataSourcePointCut
     */
    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        GlobalTransactional methodAnnotation = method.getAnnotation(GlobalTransactional.class);
        if (methodAnnotation != null) {DataSourceContextHolder.tagGlobal();
            logger.info("标记全局事务");
        }
        try {return point.proceed();
        } finally {logger.info("革除全局事务");
            DataSourceContextHolder.clearGlobal();}
    }
}
public class DataSourceContextHolder {private static Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);
    // 对以后线程的操作 - 线程平安的
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
    private static final ThreadLocal<String> contextGlobalHolder = new ThreadLocal<String>();

    // 调用此办法,切换数据源
    public static void setDataSource(String dataSource) {contextHolder.set(dataSource);
        log.debug("已切换到数据源:{}", dataSource);
    }

    // 获取数据源
    public static String getDataSource() {String value = contextHolder.get();
        if (StringUtil.isEmpty(value)) {value = "master";}
        if (!StringUtil.isEmpty(getGlobal())) {value = value + "-xa";}
        return value;
    }

    // 删除数据源
    public static void clearDataSource() {contextHolder.remove();
        log.debug("已切换到主数据源");
    }

    //==================== 全局事务标记 ================
    public static void tagGlobal() {contextGlobalHolder.set("1");
    }

    public static String getGlobal() {String value = contextGlobalHolder.get();
        return value;
    }

    public static void clearGlobal() {contextGlobalHolder.remove();
    }
    //===================================================
}

配置 XA 事务日志

通过在 resource 文件夹下创立 transactions.properties 文件能够指定 XA 事务日志的存储门路

com.atomikos.icatch.log_base_dir= tempfiles/transition/

正文完
 0