乐趣区

关于后端:Seata-RM-TM-技术原理解说

原因

Spring Cloud 作为微服务框架,工作当中必用。起因也很简略,Spring cloud 依靠 Springboot,背靠 Spring Framework,又有 Eureka 和 Alibaba 两个大厂的反对,大多数的互联网公司会优先选择以此作为后端开发框架。微服务架构下每个服务都会有一个数据库,不同的数据库、不同的服务实例都会产生事务问题。

简述

微服务架构下的性能开发会遇到分布式事务问题,有 2PC 3PC TCC 等解决方案,基于性能思考,公司会思考应用 Seata 作为分布式事务解决方案,其中提供了基于 2PC 的 AT 模式、TCC 模式、Saga 模式等等,不过最罕用的还是 AT 模式,只须要一个 GlobalTransaction 注解即可,无业务侵入性。

引入 Seata

  1. pom 引入

         <dependency>
             <groupId>com.alibaba.cloud</groupId>
             <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
         </dependency>
  2. 服务配置

    seata.enabled=false
    seata.application-id=${spring.application.name}
    seata.tx-service-group=default_tx_group
    seata.registry.type=nacos
    seata.registry.nacos.application=seata-server
    seata.registry.nacos.server-addr=localhost:8848
    seata.registry.nacos.namespace=41eddc87-5fc2-4fdf-8d4f-e16118a143e0
    seata.registry.nacos.group=SEATA_GROUP
    
    seata.config.type=nacos
    seata.config.nacos.server-addr=localhost:8848
    seata.config.nacos.group=SEATA_GROUP
    seata.config.nacos.namespace=41eddc87-5fc2-4fdf-8d4f-e16118a143e0
    seata.config.nacos.data‐id=seataServer.properties
  3. nacos 配置

    配置项 内容
    Data ID service.vgroupMapping.default_tx_group
    Group SEATA_GROUP
    配置内容 default
    / /
    Data ID seataServer.properties
    Group SEATA_GROUP
    配置内容 default

    seataServer.properties 配置参考,不同手敲,文档中有脚本的执行形式,

    sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca
  4. 性能上开启 @GlobalTransaction 注解即可

SpringBoot 集成 Seata

剖析 Seata 的集成形式,还是要从 spring-cloud-starter-alibaba-seata 开始,此 maven 依赖引入:

    <dependency>
      <groupId>io.seata</groupId>
      <artifactId>seata-spring-boot-starter</artifactId>
      <version>1.3.0</version>
      <scope>compile</scope>
    </dependency>

Seata 的集成逻辑在 Seata-spring-boot-starter 当中。以下是依赖源文件中的 spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration

SeataAutoConfiguration

  1. 默认有 maven 依赖就会加载,能够通过配置敞开 Seata: seata.enabled=false.
  2. 引入默认实现的 FailureHandler。
  3. 引入 GlobalTransactionScanner。

GlobalTransactionScanner 实现了 InitializingBean ApplicationContextAware DisposableBean,在对应的 Bean 周期实现对 Seata 的操作,集成 AbstractAutoProxyCreator 实现 Aop 的逻辑,使得办法上标注 @GlobalTransaction 注解就能开启分布式事务性能。

  1. 实例化后执行客户端初始化 initClient,创立 TM 客户端 RM 客户端
  2. Bean 销毁时执行 Seata 扩大的销毁机制,能够触发 TMClient RMClient 中的敞开办法调用。
  3. 重写的 wrapIfNecessary() 办法中实现代理类的创立。能够解决 GlobalTransaction 注解,也是剖析的入口。

初始化 TM RM 客户端

初始化 TM 客户端,首先会创立一个 TmNettyRemotingClient 对象,创建对象须要一个线程池,能够通过参数配置线程池。初始化的逻辑比拟有意思,外面封装了 Netty 作为客户端连贯工具。
创立过程:

  1. 创立父类 AbstractNettyRemotingClient

    1. 创立父类 AbstractNettyRemoting,将客户端的 messageExecutor 与 Bean 销毁关联。
    2. AbstractNettyRemotingClient 构造方法中创立一些要害对象
    3. 创立 NettyClientBootstrap 对象,创立 Netty 客户端的细节就在其中。
    4. 创立 ChannelDuplexHandler 实现类 io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler,把 Netty 的 channel 办法比方 channelRead(),和 Seata 注册对应的处理器关联起来,有对应的音讯后,就会调用对应的处理器,channelInactive() exceptionCaught() userEventTriggered() 保护着 Seata 中的 channel 的存活和心跳,保护连接池中的有效性。
    5. 创立 NettyClientChannelManager 对象,应用 apache common pools 池化 netty Channel,防止每次与 Seata 服务交互都要建设连贯。连贯的建设和删除逻辑就在其中。
  2. seata 能够通过 SPI 扩大性能,比方 AuthSigner。
  3. 读取是否能够批量申请的配置。并设置监听器监听此配置
  4. 单例对象

初始化过程:

  1. 注册 TC 的响应处理器 ClientOnResponseProcessor
  2. 注册心跳音讯处理器 ClientHeartbeatProcessor
  3. 执行父类 AbstractNettyRemotingClient 初始化,定时刷新注册核心指定分组的 Seata 服务实例,将实例地址拿到后,建设连贯并放入连接池 NettyClientChannelManager。若容许批量申请,则创立合并申请的线程池,合并申请的解决逻辑在 MergedSendRunnable 中。
  4. 执行父类 AbstractNettyRemoting 初始化,单线程定时执行清理哪些超时的异步申请
  5. 启动 Netty。

初始化 RM 客户端,创立和初始化过程与 TM 基本一致。
初始化过程:

  1. 注册分支事务提交处理器 RmBranchCommitProcessor
  2. 注册分支事务回滚处理器 RmBranchRollbackProcessor
  3. 注册 undo log 处理器 RmUndoLogProcessor
  4. 注册 TC 的响应处理器 ClientOnResponseProcessor
  5. 注册心跳音讯处理器 ClientHeartbeatProcessor

GlobalTransaction 注解

Bean 初始化实现后调用初始化后的 postProcessors,applyBeanPostProcessorsAfterInitialization(),Seata 注册了一个 GlobalTransactionScanner,继承了 AbstractAutoProxyCreator,具备了创立代理的能力,每个 Bean 初始化后都有机会进入 GlobalTransactionScanner.wrapIfNecessary()。

进入 wrapIfNecessary()

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}
                interceptor = null;
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // 创立 TccActionInterceptor
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
                } else {Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    // 非 GlobalTransactional GlobalLock 标注的类或者办法 跳过解决
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {return bean;}

                    if (interceptor == null) {if (globalTransactionalInterceptor == null) {
                            // 创立 GlobalTransactionalInterceptor
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    // 调用父类的 wrapIfNecessary,创立代理类
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {throw new RuntimeException(exx);
        }
    }
// AbstractAutoProxyCreator
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // ...
        // Create proxy if we have advice.
        // 调用子类的 getAdvicesAndAdvisorsForBean 返回 GlobalTransactionalInterceptor 对象。// 将 seata 性能织入 Bean 代理类中
        Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
        if (specificInterceptors != DO_NOT_PROXY) {this.advisedBeans.put(cacheKey, Boolean.TRUE);
            Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
            this.proxyTypes.put(cacheKey, proxy.getClass());
            return proxy;
        }

        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }

标注 GlobalTransactional GlobalLock 注解的类或者办法的 Bean 曾经创立了代理,那么在 Bean 办法调用的时候就会进入 GlobalTransactionalInterceptor 的 invoke() 办法中执行 Seata 的逻辑。

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // 获取指标类
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // 获取指标办法,比方:Foo.bar()
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {// 获取原生办法 bar()
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            // 解析出注解
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // disable 代表是否禁用 service.disableGlobalTransaction,默认是启用状态
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        // 构建 AspectTransactional 对象,将注解中的参数解析到 对象中
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                    } else {transactional = this.aspectTransactional;}
                    // 进入 GlobalTransaction 解决流程
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    // 进入 GlobalLock 解决流程
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();}

以上办法是代理类的加强逻辑入口,次要是解析出以后办法的注解,进入 GlobalTransaction 的解决流程。

    Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable {
        boolean succeed = true;
        try {
            // 全局事务的执行是在 transactionalTemplate 对象中进行
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {return methodInvocation.proceed();
                }

                public String name() {String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {return name;}
                    return formatMethod(methodInvocation.getMethod());
                }

                // 把注解中的参数解析为 TransactionInfo 对象
                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}

                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {TransactionalExecutor.Code code = e.getCode();
            // 异样解决机制
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

以上会把 GlobalTransaction 注解中的参数转换为一个事务,另创立一个事务执行器,在事务模板中执行事务。

    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo 获取事务
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. 获取以后全局事务,若有空则是事务参与者
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation. 依照流传行文处理事务
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {switch (propagation) {
                case NOT_SUPPORTED:// 不反对事务,将以后事务挂起,以无事务的形式执行
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:// 须要新事务,挂起以后事务,开始一个新事务
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();}
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:// 反对事务或者无事务运行,没有事务则间接执行办法
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:// 须要事务,以后事务有则是五以后事务,若没有创立一个事务
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:// 不反对事务
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();}
                case MANDATORY: // 必须要一个事务
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
                // 发动全局事务注册
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business 执行本地办法,作为事务发起者,本地办法中产生的任何近程调用都将携带 XID
                    rs = business.execute();} catch (Throwable ex) {
                    // 发动回滚申请,TC 收到后,会一一告诉子事务回滚
                    // 3. The needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit. 提交事务
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear 清理阶段
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();}
        } finally {
            // If the transaction is suspended, resume it. 复原挂起的事务
            if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);
            }
        }
    }

GlobalTransactionRole 全局事务角色分为发起者和参与者,以上下文是否有 GlobalTransaction 对象来判断。
本地办法执行后,近程调用也全副胜利,发起者就向 Seata 发动提交事务的申请。

全局事务 ID 的传递

Seata 反对的近程调用:SpringMvc Dubbbo sofa motan hsf grpc dubbo brpc.

SpringMvc 中自定义一个 HandlerInterceptor,在调用 preHandle 是绑定 seata 的 rpcId。向 Spring 注入一个 WebMvcConfigurer 实现类将自定义的 HandlerInterceptor 放入 MVC 中。

Dubbo 会自定义一个 Filter 扩大,绑定 seata 的 rpcId。

其余的近程调用反对,能够自行查看,逻辑比较简单,调用前绑定 rpcId,产生异样须要革除 rpcId


以上就是全局事务管理器的大抵内容。具体事务的管制在 RM 端,TM 客户端的的操作大多数在下面应用到。


数据源代理

SeataDataSourceAutoConfiguration 会引入 SeataAutoDataSourceProxyCreator,此对象将会创立一个数据库代理,代理类中加强了 RM 的逻辑,事务提交前要解决 RM 的逻辑。

SeataAutoDataSourceProxyCreator 也是一个 AbstractAutoProxyCreator 子类,会被 Spring 作为 AOP 解决,重写的 wrapIfNecessary() 中,将数据库对象进行加强,创立代理类 DataSourceProxy , 具体的创立代理逻辑在 super.wrapIfNecessary() 中。

DataSourceProxy.getConnection() 会创立一个 ConnectionProxy,组合了 targetDataSource.getConnection(),不会影响原有数据库连贯的应用。

ConnectionProxy 代理的 commit 办法中,就会 GlobalTransaction 进行了逻辑解决:注册本地事务,创立 undo log 执行本地提交。TC 向 RM 客户端收回提交或者回滚申请后,解决 undolog 即可。

    @Override
    public void commit() throws SQLException {
        try {lockRetryPolicy.execute(() -> {
                // 执行提交
                doCommit();
                return null;
            });
        } catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {rollback();
            }
            throw e;
        } catch (Exception e) {throw new SQLException(e);
        }
    }

    // 依据上下文判断执行哪种提交
    private void doCommit() throws SQLException {if (context.inGlobalTransaction()) {processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();
        } else {targetConnection.commit();
        }
    }

    // 全局事务提交
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 注册分支事务
            register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {// undo log 出场 UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 真正的业务提交
            targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {report(true);
        }
        context.reset();}

总结

Seata 作为一款分布式的框架,提供了分布式事务的解决方案,其中的实现原理值得剖析,以上是对 Seata 客户端的局部进行理解说,Seata 服务端的内容同样精彩,敬请期待。

退出移动版