背景
框架之前实现了多数据源的动静切换及事务的解决,想更近一步提供一个简略的跨库事务处理性能,通过网上的搜寻调研,大抵有 XA 事务 /SEGA 事务 /TCC 事务等计划,因为业务次要波及政府及企业且并发量不大,所以采纳 XA 事务,尽管性能有所损失,然而能够保证数据的强一致性
方案设计
针对注册的数据源拷贝一份用于 XA 事务,使得本地事务和 XA 全局事务互相独立可抉择的应用
Maven 配置
引入 atomikos 第三方组件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
注册 XA 数据源
应用 Druid 连接池,须要应用 DruidXADataSource 数据源对象,再应用 AtomikosDataSourceBean 进行包装
注册数据源时针对同一个连贯注册两份,一份失常数据源,一份用于 XA 事务的数据源,数据源标识辨别并关联
因为 spring 默认注册了 XA 事务管理器后,所有事务操作不再走本地事务,咱们通过切换不同的数据源决定走本地事务还是 XA 事务
// 主数据源 xa 模式
@Bean
@Qualifier("masterXADataSource")
public DataSource masterXADataSource() {DruidXADataSource datasource = new DruidXADataSource();
if(driverClassName.equals("com.mysql.cj.jdbc.Driver")){if(!dbUrl.contains("useOldAliasMetadataBehavior")){dbUrl += "&useOldAliasMetadataBehavior=true";}
if(!dbUrl.contains("useAffectedRows")){dbUrl += "&useAffectedRows=true";}
}
datasource.setUrl(this.dbUrl);
datasource.setUsername(username);
datasource.setPassword(password);
datasource.setDriverClassName(driverClassName);
//configuration
datasource.setInitialSize(1);
datasource.setMinIdle(3);
datasource.setMaxActive(20);
datasource.setMaxWait(60000);
datasource.setTimeBetweenEvictionRunsMillis(60000);
datasource.setMinEvictableIdleTimeMillis(60000);
datasource.setValidationQuery("select'x'");
datasource.setTestWhileIdle(true);
datasource.setTestOnBorrow(false);
datasource.setTestOnReturn(false);
datasource.setPoolPreparedStatements(true);
datasource.setMaxPoolPreparedStatementPerConnectionSize(20);
datasource.setLogAbandoned(false); // 移除泄露连贯产生是是否记录日志
try {datasource.setFilters("stat,slf4j");
} catch (SQLException e) {logger.error("druid configuration initialization filter", e);
}
datasource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");//connectionProperties);
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
atomikosDataSourceBean.setUniqueResourceName("master-xa");
atomikosDataSourceBean.setXaDataSource(datasource);
atomikosDataSourceBean.setPoolSize(5);
atomikosDataSourceBean.setMaxPoolSize(20);
atomikosDataSourceBean.setTestQuery("select 1");
return atomikosDataSourceBean;
}
注册 XA 事务管理器
应用 spring 内置的 JtaTransactionManager 事务管理器对象,设置 AllowCustomIsolationLevels 为 true,否则指定自定义事务隔离级别会报错
//xa 模式全局事务管理器
@Bean(name = "jtaTransactionManager")
public PlatformTransactionManager transactionManager() throws Throwable {UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, userTransactionManager);
jtaTransactionManager.setAllowCustomIsolationLevels(true);
return jtaTransactionManager;
}
定义 XA 事务切面
自定义注解 @GlobalTransactional 并定义对应切面,应用指定注解时在 ThreadLocal 变量值进行标识,组合
@Transactional 注解指定 XA 事务管理器,在切换数据原时判断以后是否在 XA 事物中,从而切换不同的数据源
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Transactional(rollbackFor = Exception.class,isolation = Isolation.READ_UNCOMMITTED,transactionManager = "jtaTransactionManager")
public @interface GlobalTransactional {}
@Aspect
@Component
@Order(value = 99)
public class GlobalTransitionAspect {private static Logger logger = LoggerFactory.getLogger(GlobalTransitionAspect.class);
@Autowired
private DynamicDataSource dynamicDataSource;
/**
* 切面点 指定注解
*/
@Pointcut("@annotation(com.code2roc.fastkernel.datasource.GlobalTransactional)" +
"|| @within(com.code2roc.fastkernel.datasource.GlobalTransactional)")
public void dataSourcePointCut() {}
/**
* 拦挡办法指定为 dataSourcePointCut
*/
@Around("dataSourcePointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
GlobalTransactional methodAnnotation = method.getAnnotation(GlobalTransactional.class);
if (methodAnnotation != null) {DataSourceContextHolder.tagGlobal();
logger.info("标记全局事务");
}
try {return point.proceed();
} finally {logger.info("革除全局事务");
DataSourceContextHolder.clearGlobal();}
}
}
public class DataSourceContextHolder {private static Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);
// 对以后线程的操作 - 线程平安的
private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
private static final ThreadLocal<String> contextGlobalHolder = new ThreadLocal<String>();
// 调用此办法,切换数据源
public static void setDataSource(String dataSource) {contextHolder.set(dataSource);
log.debug("已切换到数据源:{}", dataSource);
}
// 获取数据源
public static String getDataSource() {String value = contextHolder.get();
if (StringUtil.isEmpty(value)) {value = "master";}
if (!StringUtil.isEmpty(getGlobal())) {value = value + "-xa";}
return value;
}
// 删除数据源
public static void clearDataSource() {contextHolder.remove();
log.debug("已切换到主数据源");
}
//==================== 全局事务标记 ================
public static void tagGlobal() {contextGlobalHolder.set("1");
}
public static String getGlobal() {String value = contextGlobalHolder.get();
return value;
}
public static void clearGlobal() {contextGlobalHolder.remove();
}
//===================================================
}
配置 XA 事务日志
通过在 resource 文件夹下创立 transactions.properties 文件能够指定 XA 事务日志的存储门路
com.atomikos.icatch.log_base_dir= tempfiles/transition/