关于java:seata客户端-141版本源码一

2次阅读

共计 11212 个字符,预计需要花费 29 分钟才能阅读完成。

seata 的 1.4.1 版本曾经公布,目前看到的最次要的变动如下:
1: 配置不须要应用 registry.conf 和 file.conf 了。而是能够通过 Spring 的配置参数配置。而且提供了默认值,只须要依照业务批改本人的配置即可。
2:DataSourceProxy 实例也不须要创立,seata 会主动创立。

1:应用:
在 Spring 环境下,应用 seata 只须要引入 seata-spring-boot-starter 的 JAR 包即可。默认的主动配置加载类是:SeataAutoConfiguration。

2:在 SeataAutoConfiguration 里,会加载一堆的配置文件,文件比拟多,截图如下:


这一堆的配置信息,会寄存在 StarterConstants#PROPERTY_BEAN_MAP 这个 map 对象里。

3:数据源后置处理器:SeataDataSourceBeanPostProcessor。在 SeataAutoConfiguration 会创立 SeataDataSourceBeanPostProcessor 的实例,如下:

@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}

该后置处理器次要实现了 postProcessAfterInitialization 接口,其目标是在 bean 初始化后,依据 seata 的模式创立不同的数据库代理对象,比方:非 XA 模式创立的是 DataSourceProxy,XA 模式创立的是 DataSourceProxyXA。并将其保留到全局对象中。
这里有意思的是,postProcessAfterInitialization 接口返回的时候原始 DataSource 对象。所以其余服务援用的话,不会失去代理数据库代理对象。
通过这里代码,所以咱们没有必要本人创立类 DataSourceProxy 的 bean 对象。

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DataSource) {
        //When not in the excludes, put and init proxy.
        if (!excludes.contains(bean.getClass().getName())) {
            //Only put and init proxy, not return proxy.
            DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
        }
        //If is SeataDataSourceProxy, return the original data source.
        if (bean instanceof SeataDataSourceProxy) {return ((SeataDataSourceProxy) bean).getTargetDataSource();}
    }
    return bean;
}

4:代理创立器 SeataAutoDataSourceProxyCreator。在 SeataAutoConfiguration 会创立该类的实例,代码如下:

@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
 seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}

该类继承了 AbstractAutoProxyCreator,其目标是创立对 DataSource 对象创立拦截器(SeataAutoDataSourceProxyAdvice)。通过上面代码可知,只有 DataSrouce 对象,而且是非 SeataProxy 对象才创立。这里在前面的代码里会用到。

@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {return !DataSource.class.isAssignableFrom(beanClass) ||
        SeataProxy.class.isAssignableFrom(beanClass) ||
        excludes.contains(beanClass.getName());
}

5:事务扫描器 GlobalTransactionScanner。在 SeataAutoConfiguration 会创立该类的实例,代码如下:
其结构参数如下:
applicationId: 取配置 seata.application-id 的值,如果没有,则取 spring.application.name 配置
txServiceGroup:取配置 seata.tx-service-group 的值,如果没有,则值为 applicationId 的值加字符串“-seata-service-group”

@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Automatically configure Seata");
 }
    return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}

6: GlobalTransactionScanner 继承了接口 InitializingBean 接口,所以会调用接口 afterPropertiesSet 进行初始化操作。在这里次要是会初始化 netty 客户端。前面会有问题专门介绍 seata 里的 netty 应用。代码如下:

TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
RMClient.init(applicationId, txServiceGroup);

7:GlobalTransactionScanner 继承了 AbstractAutoProxyCreator 接口,其次要是实现代理。在办法 wrapIfNecessary 中。对注解 @GlobalTransactional 和 @GlobalLock,创立了拦截器 GlobalTransactionalInterceptor。如果办法有这两个两个钟的其中一个注解,那么理论失去的是一个拦截器。该拦截器的办法 initDefaultGlobalTransactionTimeout 中会初始化事务的超时工夫(读取的是配置 client.tm.defaultGlobalTransactionTimeout,默认值 60 秒,这里没有革新得彻底,按理是能够依据 Spring 配置,)代码如下:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {synchronized (PROXYED_SET) { // 疏忽了局部代码
           interceptor = null;
           Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
           Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
           if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {return bean;}
                if (interceptor == null) {if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                              (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                if (!AopUtils.isAopProxy(bean)) {bean = super.wrapIfNecessary(bean, beanName, cacheKey);
               } else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
           return bean;
       }
    } catch (Exception exx) {throw new RuntimeException(exx);
  }
}

8:如果办法有注解 @GlobalTransactional,则调用的时候,会执行拦截器办法:GlobalTransactionalInterceptor#invoke。在这个办法里,会判断调用办法是有有注解 @GlobalTransactional 或者 @GlobalLock,对 @GlobalTransactional 注解,调用 handleGlobalTransaction 办法进行进去,代码如下:

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
    boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
    if (!localDisable) {if (globalTransactionalAnnotation != null) {return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);
    }
        }
    }
    return methodInvocation.proceed();}

9:在 handleGlobalTransaction 办法,有个重要的接口 TransactionalExecutor。他的默认实现如下:
name() 办法:获取事务的名词,如果事务的 name 属性有值,则用该值。如果没有,则依据办法名加上参数进行拼接。
getTransactionInfo() 办法就是解析注解 @GlobalTransactional。比方:事务的超时工夫,事务名词,事务的流传性(默认是是:REQUIRED),事务的重试配置,
事务的回滚异样和不回滚异样等。代码如下:

return transactionalTemplate.execute(new TransactionalExecutor() {
    @Override
    public Object execute() throws Throwable {return methodInvocation.proceed();
    }
    public String name() {String name = globalTrxAnno.name();
        if (!StringUtils.isNullOrEmpty(name)) {return name;}
        return formatMethod(methodInvocation.getMethod());
    }
    @Override
    public TransactionInfo getTransactionInfo() {int timeout = globalTrxAnno.timeoutMills();
        if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}
    TransactionInfo transactionInfo = new TransactionInfo();
    transactionInfo.setTimeOut(timeout);
    transactionInfo.setName(name());
    transactionInfo.setPropagation(globalTrxAnno.propagation());
    transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
    transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));
    }
    for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));
    }
    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));
    }
    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));
    }
    transactionInfo.setRollbackRules(rollbackRules);
   return transactionInfo;
 }
});

10:在 handleGlobalTransaction 办法里,会调用 TransactionalTemplate 对象的 excute 办法。该办法就是事务的外围实现。代码如下

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. 获取事务信息,就是下面介绍的办法
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 获取以后的事务,如果是 TM,则失去的值是 null。如果是 RM,则失去的是 DefaultGlobalTransaction 对象(有 xID)。GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    // 1.2 获取事务的流传性,默认是 REQUIRED
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try { // 这里省略了事务流传性的解决
        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        // 对于 TM,创立 DefaultGlobalTransaction,然而该对象没 xID,角色是:Launcher
        if (tx == null) {tx = GlobalTransactionContext.createNew();
        }
        // 将事务信息保留到以后线程
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //else do nothing. Of course, the hooks will still be triggered. 
           beginTransaction(txInfo, tx); // 提交事务
           Object rs;
           try {
                // Do Your Business
                rs = business.execute(); // 执行业务办法} catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex); // 异样后的事务处理
                throw ex;
            }
            // 4. everything is fine, commit.
            commitTransaction(tx); // 提交事务
            return rs;
        } finally {
            //5. clear
           resumeGlobalLockConfig(previousConfig);
           triggerAfterCompletion();
          cleanUp();}
    } finally {
        // If the transaction is suspended, resume it.
       if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);
       }
    }
}

11:开始事务就是调用 DefaultGlobalTransaction#begin 办法。该办法次要的性能就是应用 netty 调用 seata-server,生成事务 ID,及 XID,并把这个保留在线程上下文中。代码如下:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {triggerBeforeBegin();
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
    }
}

public void begin(int timeout, String name) throws TransactionException {
    // 疏忽不重要的代码
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
}

12:在办法 commitTransaction 里就是提交事务。该办法的次要作用就是应用 netty 申请 seata-server 端,通知服务端,事务完结。并革除线程上下文外面的 XID,代码如下:

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {triggerBeforeCommit();
        tx.commit();
        triggerAfterCommit();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);
    }
}

public void commit() throws TransactionException {
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {while (retry > 0) {
            try {status = transactionManager.commit(xid);
                break; 
            } catch (Throwable ex) {
               retry--;
               if (retry == 0) {throw new TransactionException("Failed to report global commit", ex);
               }
            }
        }
    } finally {if (xid.equals(RootContext.getXID())) {suspend();
        }
    }
}


public GlobalStatus commit(String xid) throws TransactionException {GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();}

13:completeTransactionAfterThrowing 办法里,就是判断以后抛出的异样是否须要回滚。如果须要回滚,通过 netty 调用 seata-server,告诉事务回滚。如果不是须要回滚的异样,则解决和下面的事务提交统一。

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)    throws TransactionalExecutor.ExecutionException {if (txInfo != null && txInfo.rollbackOn(originalException)) {
        try {rollbackTransaction(tx, originalException);
        } catch (TransactionException txe) {
            // Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure,                            originalException);
        }
    } else {
        // not roll back on this exception, so commit
         commitTransaction(tx);
    }
}

14:seata 的重要性能就是事务 ID 的流传,

正文完
 0