seata的1.4.1版本曾经公布,目前看到的最次要的变动如下:
1: 配置不须要应用registry.conf和file.conf了。而是能够通过Spring的配置参数配置。而且提供了默认值,只须要依照业务批改本人的配置即可。
2:DataSourceProxy 实例也不须要创立,seata会主动创立。
1:应用:
在Spring环境下,应用seata只须要引入 seata-spring-boot-starter 的JAR包即可。默认的主动配置加载类是:SeataAutoConfiguration。
2:在 SeataAutoConfiguration 里,会加载一堆的配置文件,文件比拟多,截图如下:
这一堆的配置信息,会寄存在 StarterConstants#PROPERTY_BEAN_MAP 这个map对象里。
3:数据源后置处理器:SeataDataSourceBeanPostProcessor。在SeataAutoConfiguration会创立 SeataDataSourceBeanPostProcessor 的实例,如下:
@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
该后置处理器次要实现了postProcessAfterInitialization接口,其目标是在bean初始化后,依据seata的模式创立不同的数据库代理对象,比方:非XA模式创立的是DataSourceProxy,XA模式创立的是DataSourceProxyXA。并将其保留到全局对象中。
这里有意思的是,postProcessAfterInitialization 接口返回的时候原始 DataSource 对象。所以其余服务援用的话,不会失去代理数据库代理对象。
通过这里代码,所以咱们没有必要本人创立类DataSourceProxy的bean对象。
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource) {
//When not in the excludes, put and init proxy.
if (!excludes.contains(bean.getClass().getName())) {
//Only put and init proxy, not return proxy.
DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
}
//If is SeataDataSourceProxy, return the original data source.
if (bean instanceof SeataDataSourceProxy) {
return ((SeataDataSourceProxy) bean).getTargetDataSource();
}
}
return bean;
}
4:代理创立器 SeataAutoDataSourceProxyCreator。在SeataAutoConfiguration会创立该类的实例,代码如下:
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
该类继承了AbstractAutoProxyCreator,其目标是创立对DataSource对象创立拦截器(SeataAutoDataSourceProxyAdvice)。通过上面代码可知,只有DataSrouce对象,而且是非SeataProxy对象才创立。这里在前面的代码里会用到。
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return !DataSource.class.isAssignableFrom(beanClass) ||
SeataProxy.class.isAssignableFrom(beanClass) ||
excludes.contains(beanClass.getName());
}
5:事务扫描器GlobalTransactionScanner。在SeataAutoConfiguration会创立该类的实例,代码如下:
其结构参数如下:
applicationId: 取配置seata.application-id的值,如果没有,则取spring.application.name配置
txServiceGroup:取配置seata.tx-service-group的值,如果没有,则值为 applicationId的值加字符串“-seata-service-group”
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
6: GlobalTransactionScanner继承了接口InitializingBean接口,所以会调用接口 afterPropertiesSet 进行初始化操作。在这里次要是会初始化netty客户端。前面会有问题专门介绍seata里的netty应用。代码如下:
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
RMClient.init(applicationId, txServiceGroup);
7:GlobalTransactionScanner 继承了AbstractAutoProxyCreator接口,其次要是实现代理。在办法 wrapIfNecessary 中。对注解@GlobalTransactional 和 @GlobalLock ,创立了拦截器 GlobalTransactionalInterceptor。如果办法有这两个两个钟的其中一个注解,那么理论失去的是一个拦截器。该拦截器的办法 initDefaultGlobalTransactionTimeout 中会初始化事务的超时工夫(读取的是配置 client.tm.defaultGlobalTransactionTimeout,默认值 60秒,这里没有革新得彻底,按理是能够依据Spring配置,)代码如下:
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) { //疏忽了局部代码
interceptor = null;
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
8:如果办法有注解 @GlobalTransactional,则调用的时候,会执行拦截器办法:GlobalTransactionalInterceptor#invoke。在这个办法里,会判断调用办法是有有注解 @GlobalTransactional 或者 @GlobalLock,对@GlobalTransactional注解,调用 handleGlobalTransaction 办法进行进去,代码如下:
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
9:在handleGlobalTransaction办法,有个重要的接口 TransactionalExecutor。他的默认实现如下:
name()办法:获取事务的名词,如果事务的 name 属性有值,则用该值。如果没有,则依据办法名加上参数进行拼接。
getTransactionInfo()办法就是解析注解 @GlobalTransactional。比方:事务的超时工夫,事务名词,事务的流传性(默认是是:REQUIRED),事务的重试配置,
事务的回滚异样和不回滚异样等。代码如下:
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {
int timeout = globalTrxAnno.timeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
10:在 handleGlobalTransaction 办法里,会调用 TransactionalTemplate 对象的excute 办法。该办法就是事务的外围实现。代码如下
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 获取事务信息,就是下面介绍的办法
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 获取以后的事务,如果是TM,则失去的值是null。 如果是RM,则失去的是 DefaultGlobalTransaction 对象(有xID)。
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 获取事务的流传性,默认是REQUIRED
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try { // 这里省略了事务流传性的解决
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
// 对于TM,创立DefaultGlobalTransaction,然而该对象没xID,角色是:Launcher
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// 将事务信息保留到以后线程
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
//else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx); // 提交事务
Object rs;
try {
// Do Your Business
rs = business.execute(); // 执行业务办法
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex); //异样后的事务处理
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx); //提交事务
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
11:开始事务就是调用 DefaultGlobalTransaction#begin办法。该办法次要的性能就是应用netty调用 seata-server,生成事务ID,及XID,并把这个保留在线程上下文中。代码如下:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
}
}
public void begin(int timeout, String name) throws TransactionException {
//疏忽不重要的代码
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
}
12:在办法 commitTransaction 里就是提交事务。该办法的次要作用就是应用 netty申请 seata-server 端,通知服务端,事务完结。并革除线程上下文外面的XID,代码如下:
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);
}
}
public void commit() throws TransactionException {
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
}
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
13:completeTransactionAfterThrowing 办法里,就是判断以后抛出的异样是否须要回滚。如果须要回滚,通过netty调用 seata-server,告诉事务回滚。如果不是须要回滚的异样,则解决和下面的事务提交统一。
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
14:seata的重要性能就是事务ID的流传,
发表回复