关于后端:Seata-ServerTC-技术原理解说

3次阅读

共计 5062 个字符,预计需要花费 13 分钟才能阅读完成。

原因

Spring Cloud 作为微服务框架,工作当中必用。起因也很简略,Spring cloud 依靠 Springboot,
背靠 Spring Framework,又有 Eureka 和 Alibaba 两个大厂的反对,大多数的互联网公司会优先选择以此作为后端开发框架。微服务架构下每个服务都会有一个数据库,不同的数据库、不同的服务实例都会产生事务问题。

简述

微服务架构下的性能开发会遇到分布式事务问题,有 2PC 3PC TCC 等解决方案,基于性能思考,公司会思考应用 Seata 作为分布式事务解决方案,其中提供了基于 2PC 的 AT 模式、TCC 模式、Saga 模式等等,不过最罕用的还是 AT 模式,只须要一个 GlobalTransaction 注解即可,无业务侵入性。

前文对 Seata 的客户端 TM RM 进行理解说。本文持续对服务端 TC 进行讲解。

注释

Seata Server 是一个 Springboot 利用,较早的版本是一个并未应用 Spring boot,1.5.2 版本的代码曾经开始应用 Springboot 了。Server 端次要起到了 TC 的角色,对利用中的 Seata 客户端 RM TM 进行交互,同样应用 Netty 作为通信组件,引入了很多的注册核心比方 Nacos 就在其中,Seata TC 就像利用一样,能够启动多个实例,TM RM 客户端会在调用 TC 是进行负载平衡。

日志收集

反对 File 输入、Kafka、Logstash、Console 输入,TC 的日志收集能够作为基础设施的扩大进行保护,其稳定性的要求不亚于利用。

Server 启动

ServerRunner 是一个启动入口,其中利用启动实现后会执行 Server.start(),启动服务端的 Netty,另外设置一个 Bean 销毁回调,触发防止对注册核心的下线失败。

进入 Server 类,仅有一个 start() 作为利用的入口。

  1. 创立一个线程池作为 Netty 的工作线程池,同样是可配置的.
  2. 初始化 Session 容器,反对 DB Redis File 三种模式,能够通过 SPI 扩大.
  3. 初始化锁管理器工程的模式,未指定则应用存储模式,能够通过 SPI 扩大.
  4. 创立定时工作:删除 UndoLog 查看事务超时 异步提交 重试提交 重试回滚.
  5. 获取本机 Ip.
  6. 退出 Spring 销毁回调.
  7. 启动 Netty.
  8. 向注册核心注册.

Netty 启动后,就能够接管 RM TM 的申请了。TC 的性能在 TCInboundHandler 接口中定义:

public interface TCInboundHandler {
    // 开始全局事务
    GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
    // 提交事务
    GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);
    // 回滚事务
    GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);
    // 注册分支事务
    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
    // 分支事务报告
    BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);
    // 全局锁查问
    GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);
    // 全局事务状态
    GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);
    // 全局事务报告
    GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}

TCInboundHandler 的实现类是 DefaultCoordinator,此对象会作为一个 Netty Handler,解决 Netty 申请。
TCInboundHandler 还有一个形象实现 AbstractTCInboundHandler,为 TC 的性能解决提供异样机制

Seata 形象了一个事务管理器 TransactionManager 作为全局事务管理器,大多数状况下都会应用 AT 模式,也是 Seata 基于 2PC 开发的一种高性能模式,其 TC 的性能逻辑在 ATCore 中。

DefaultCoordinator 聚合了 DefaultCore。

    public DefaultCore(RemotingServer remotingServer) {
        // SPI 加载数据所有的事务管理器实现
        List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,
            new Class[] {RemotingServer.class}, new Object[] {remotingServer});
        if (CollectionUtils.isNotEmpty(allCore)) {for (AbstractCore core : allCore) {
                // 以分支事务枚举为 key,存在 Map 中,后续可依据分支事务类型获取对应的事务管理器
                coreMap.put(core.getHandleBranchType(), core);
            }
        }
    }
# META-INF/services/io.seata.server.coordinator.AbstractCore
io.seata.server.transaction.at.ATCore
io.seata.server.transaction.tcc.TccCore
io.seata.server.transaction.saga.SagaCore
io.seata.server.transaction.xa.XACore

DefaultCore 对 TransactionManager 接口办法的实现,并没有做真正的业务解决,全副调用某个分支事务类的事务处理器进行解决。

    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        // 先获取 AbstractCore 的具体实现类,再进行业务解决
        return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,
            applicationData, lockKeys);
    }

GlobalSession 开启

全局事务的开启是第一步,TM 向 TC 获取一个 xid,开启全局事务的逻辑是在 DefaultCore.begin(String applicationId, String transactionServiceGroup, String name, int timeout) 中。

具体的实现就是 new GlobalSession(),设置 session 长久化形式(redis file db),开启 Session,记录开始工夫,生成并返回 xid.

ipAddress + IP_PORT_SPLIT_CHAR + port + IP_PORT_SPLIT_CHAR + UUIDGenerator.generateUUID()

分支事务注册

SessionHelper.newBranchByGlobal() 创立分支事务 branchSession,把 branchId 放入 MDC,对分支事务和全局事务加锁,而后将分支事务放入全局事务(长久化和内存都会退出),响应分支事务 Id。

branchSessionLock(globalSession, branchSession); // 加锁,只有 AT 模式才有加锁的实现,其余模式全是空实现

branchSession.getApplicationData() 获取 autoCommit skipCheckLock 两个参数进行加锁 branchSession.lock(autoCommit, skipCheckLock),lock 的调用是 LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock)LockerManagerFactory.getLockManager() 获取的是 Seata server 启动时配置的锁实现(db file redis)

# AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean)

    @Override
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        String lockKey = branchSession.getLockKey();
        if (StringUtils.isNullOrEmpty(lockKey)) {
            // no lock
            return true;
        }
        // get locks of branch
        List<RowLock> locks = collectRowLocks(branchSession);
        if (CollectionUtils.isEmpty(locks)) {
            // no lock
            return true;
        }
        // 获取锁实现进行加锁
        return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
    }

redis 的实现非常简单,db 和 file,暂不做剖析。

    @Override
    public boolean acquireLock(List<RowLock> rowLocks, boolean autoCommit, boolean skipCheckLock) {
        // rowLocks 是申请参数传递过去的
        if (CollectionUtils.isEmpty(rowLocks)) {return true;}
        try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {if (ACQUIRE_LOCK_SHA != null && autoCommit) {return acquireLockByLua(jedis, rowLocks);
            } else {return acquireLockByPipeline(jedis, rowLocks, autoCommit, skipCheckLock);
            }
        }
    }

分支事务注册后会执行本地事务本地事务提交,会有一个锁的概念,提交本地事务前会对数据加锁,加锁胜利能力提交。

全局事务提交

globalSession.asyncCommit(); 提交事务

全局事务回滚

若是第一阶段失败,间接删除分支事务即可,若是第二阶段,则告诉 RM 进行回滚。

总结

Seata Server 局部的逻辑比 RM TM 的逻辑简单不少,本文仅波及 AT 模式,接下来会对 Saga TCC 模式进行剖析。分布式事务自身就是一个分布式架构中产生的一个难题,功能设计时尽可能的防止产生分布式事务,若不能防止,能够借鉴 Seata 框架的事务实现,理解其中的运行原理,逐渐的斟酌高性能的分布式事务解决方案,对我的项目的稳定性和扩展性至关重要。

正文完
 0