seata 的 1.4.1 版本曾经公布,目前看到的最次要的变动如下:
1: 配置不须要应用 registry.conf 和 file.conf 了。而是能够通过 Spring 的配置参数配置。而且提供了默认值,只须要依照业务批改本人的配置即可。
2:DataSourceProxy 实例也不须要创立,seata 会主动创立。
1:应用:
在 Spring 环境下,应用 seata 只须要引入 seata-spring-boot-starter 的 JAR 包即可。默认的主动配置加载类是:SeataAutoConfiguration。
2:在 SeataAutoConfiguration 里,会加载一堆的配置文件,文件比拟多,截图如下:
这一堆的配置信息,会寄存在 StarterConstants#PROPERTY_BEAN_MAP 这个 map 对象里。
3:数据源后置处理器:SeataDataSourceBeanPostProcessor。在 SeataAutoConfiguration 会创立 SeataDataSourceBeanPostProcessor 的实例,如下:
@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
该后置处理器次要实现了 postProcessAfterInitialization 接口,其目标是在 bean 初始化后,依据 seata 的模式创立不同的数据库代理对象,比方:非 XA 模式创立的是 DataSourceProxy,XA 模式创立的是 DataSourceProxyXA。并将其保留到全局对象中。
这里有意思的是,postProcessAfterInitialization 接口返回的时候原始 DataSource 对象。所以其余服务援用的话,不会失去代理数据库代理对象。
通过这里代码,所以咱们没有必要本人创立类 DataSourceProxy 的 bean 对象。
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}
//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {return ((SeataDataSourceProxy) bean).getTargetDataSource();}
}
return bean;
}
4:代理创立器 SeataAutoDataSourceProxyCreator。在 SeataAutoConfiguration 会创立该类的实例,代码如下:
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
该类继承了 AbstractAutoProxyCreator,其目标是创立对 DataSource 对象创立拦截器(SeataAutoDataSourceProxyAdvice)。通过上面代码可知,只有 DataSrouce 对象,而且是非 SeataProxy 对象才创立。这里在前面的代码里会用到。
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {return !DataSource.class.isAssignableFrom(beanClass) ||
SeataProxy.class.isAssignableFrom(beanClass) ||
excludes.contains(beanClass.getName());
}
5:事务扫描器 GlobalTransactionScanner。在 SeataAutoConfiguration 会创立该类的实例,代码如下:
其结构参数如下:
applicationId: 取配置 seata.application-id 的值,如果没有,则取 spring.application.name 配置
txServiceGroup:取配置 seata.tx-service-group 的值,如果没有,则值为 applicationId 的值加字符串“-seata-service-group”
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
6: GlobalTransactionScanner 继承了接口 InitializingBean 接口,所以会调用接口 afterPropertiesSet 进行初始化操作。在这里次要是会初始化 netty 客户端。前面会有问题专门介绍 seata 里的 netty 应用。代码如下:
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
RMClient.init(applicationId, txServiceGroup);
7:GlobalTransactionScanner 继承了 AbstractAutoProxyCreator 接口,其次要是实现代理。在办法 wrapIfNecessary 中。对注解 @GlobalTransactional 和 @GlobalLock,创立了拦截器 GlobalTransactionalInterceptor。如果办法有这两个两个钟的其中一个注解,那么理论失去的是一个拦截器。该拦截器的办法 initDefaultGlobalTransactionTimeout 中会初始化事务的超时工夫(读取的是配置 client.tm.defaultGlobalTransactionTimeout,默认值 60 秒,这里没有革新得彻底,按理是能够依据 Spring 配置,)代码如下:
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {synchronized (PROXYED_SET) { // 疏忽了局部代码
interceptor = null;
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {return bean;}
if (interceptor == null) {if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
if (!AopUtils.isAopProxy(bean)) {bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {throw new RuntimeException(exx);
}
}
8:如果办法有注解 @GlobalTransactional,则调用的时候,会执行拦截器办法:GlobalTransactionalInterceptor#invoke。在这个办法里,会判断调用办法是有有注解 @GlobalTransactional 或者 @GlobalLock,对 @GlobalTransactional 注解,调用 handleGlobalTransaction 办法进行进去,代码如下:
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {if (globalTransactionalAnnotation != null) {return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();}
9:在 handleGlobalTransaction 办法,有个重要的接口 TransactionalExecutor。他的默认实现如下:
name() 办法:获取事务的名词,如果事务的 name 属性有值,则用该值。如果没有,则依据办法名加上参数进行拼接。
getTransactionInfo() 办法就是解析注解 @GlobalTransactional。比方:事务的超时工夫,事务名词,事务的流传性(默认是是:REQUIRED),事务的重试配置,
事务的回滚异样和不回滚异样等。代码如下:
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {return methodInvocation.proceed();
}
public String name() {String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {return name;}
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {int timeout = globalTrxAnno.timeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
10:在 handleGlobalTransaction 办法里,会调用 TransactionalTemplate 对象的 excute 办法。该办法就是事务的外围实现。代码如下
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 获取事务信息,就是下面介绍的办法
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 获取以后的事务,如果是 TM,则失去的值是 null。如果是 RM,则失去的是 DefaultGlobalTransaction 对象(有 xID)。GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 获取事务的流传性,默认是 REQUIRED
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try { // 这里省略了事务流传性的解决
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
// 对于 TM,创立 DefaultGlobalTransaction,然而该对象没 xID,角色是:Launcher
if (tx == null) {tx = GlobalTransactionContext.createNew();
}
// 将事务信息保留到以后线程
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
//else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx); // 提交事务
Object rs;
try {
// Do Your Business
rs = business.execute(); // 执行业务办法} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex); // 异样后的事务处理
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx); // 提交事务
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);
}
}
}
11:开始事务就是调用 DefaultGlobalTransaction#begin 办法。该办法次要的性能就是应用 netty 调用 seata-server,生成事务 ID,及 XID,并把这个保留在线程上下文中。代码如下:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
}
}
public void begin(int timeout, String name) throws TransactionException {
// 疏忽不重要的代码
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
}
12:在办法 commitTransaction 里就是提交事务。该办法的次要作用就是应用 netty 申请 seata-server 端,通知服务端,事务完结。并革除线程上下文外面的 XID,代码如下:
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {triggerBeforeCommit();
tx.commit();
triggerAfterCommit();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);
}
}
public void commit() throws TransactionException {
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {while (retry > 0) {
try {status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
retry--;
if (retry == 0) {throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {if (xid.equals(RootContext.getXID())) {suspend();
}
}
}
public GlobalStatus commit(String xid) throws TransactionException {GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();}
13:completeTransactionAfterThrowing 办法里,就是判断以后抛出的异样是否须要回滚。如果须要回滚,通过 netty 调用 seata-server,告诉事务回滚。如果不是须要回滚的异样,则解决和下面的事务提交统一。
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
14:seata 的重要性能就是事务 ID 的流传,