原因

Spring Cloud 作为微服务框架,工作当中必用。起因也很简略,Spring cloud 依靠Springboot,
背靠Spring Framework,又有Eureka 和 Alibaba 两个大厂的反对,大多数的互联网公司会优先选择以此作为后端开发框架。微服务架构下每个服务都会有一个数据库,不同的数据库、不同的服务实例都会产生事务问题。

简述

微服务架构下的性能开发会遇到分布式事务问题,有2PC 3PC TCC等解决方案,基于性能思考,公司会思考应用Seata作为分布式事务解决方案,其中提供了基于2PC的AT模式、TCC模式、Saga模式等等,不过最罕用的还是AT模式,只须要一个GlobalTransaction注解即可,无业务侵入性。

前文对Seata的客户端 TM RM 进行理解说。本文持续对服务端TC进行讲解。

注释

Seata Server 是一个Springboot利用,较早的版本是一个并未应用Spring boot,1.5.2版本的代码曾经开始应用 Springboot了。Server端次要起到了TC的角色,对利用中的Seata客户端RM TM进行交互,同样应用Netty作为通信组件,引入了很多的注册核心比方Nacos就在其中,Seata TC就像利用一样,能够启动多个实例,TM RM客户端会在调用TC是进行负载平衡。

日志收集

反对File输入、Kafka、Logstash、Console输入,TC的日志收集能够作为基础设施的扩大进行保护,其稳定性的要求不亚于利用。

Server 启动

ServerRunner是一个启动入口,其中利用启动实现后会执行 Server.start(),启动服务端的Netty,另外设置一个Bean销毁回调,触发防止对注册核心的下线失败。

进入 Server 类,仅有一个start() 作为利用的入口。

  1. 创立一个线程池作为Netty的工作线程池,同样是可配置的.
  2. 初始化Session容器,反对 DB Redis File三种模式,能够通过SPI扩大.
  3. 初始化锁管理器工程的模式,未指定则应用存储模式,能够通过SPI扩大.
  4. 创立定时工作:删除UndoLog 查看事务超时 异步提交 重试提交 重试回滚.
  5. 获取本机Ip.
  6. 退出Spring销毁回调.
  7. 启动Netty.
  8. 向注册核心注册.

Netty 启动后,就能够接管RM TM的申请了。TC的性能在 TCInboundHandler 接口中定义:

public interface TCInboundHandler {    // 开始全局事务    GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);    // 提交事务    GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);    // 回滚事务    GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);    // 注册分支事务    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);    // 分支事务报告    BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);    // 全局锁查问    GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);    // 全局事务状态    GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);    // 全局事务报告    GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);}

TCInboundHandler 的实现类是DefaultCoordinator,此对象会作为一个Netty Handler,解决Netty申请。
TCInboundHandler 还有一个形象实现 AbstractTCInboundHandler,为TC的性能解决提供异样机制

Seata形象了一个事务管理器 TransactionManager 作为全局事务管理器,大多数状况下都会应用AT模式,也是Seata基于2PC开发的一种高性能模式,其TC的性能逻辑在 ATCore 中。

DefaultCoordinator 聚合了 DefaultCore。

    public DefaultCore(RemotingServer remotingServer) {        // SPI 加载数据所有的事务管理器实现        List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,            new Class[] {RemotingServer.class}, new Object[] {remotingServer});        if (CollectionUtils.isNotEmpty(allCore)) {            for (AbstractCore core : allCore) {                // 以分支事务枚举为key,存在Map中,后续可依据分支事务类型获取对应的事务管理器                coreMap.put(core.getHandleBranchType(), core);            }        }    }
# META-INF/services/io.seata.server.coordinator.AbstractCoreio.seata.server.transaction.at.ATCoreio.seata.server.transaction.tcc.TccCoreio.seata.server.transaction.saga.SagaCoreio.seata.server.transaction.xa.XACore

DefaultCore对TransactionManager接口办法的实现,并没有做真正的业务解决,全副调用某个分支事务类的事务处理器进行解决。

    @Override    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,                               String applicationData, String lockKeys) throws TransactionException {        // 先获取 AbstractCore 的具体实现类,再进行业务解决        return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,            applicationData, lockKeys);    }

GlobalSession开启

全局事务的开启是第一步,TM向TC获取一个 xid,开启全局事务的逻辑是在 DefaultCore.begin(String applicationId, String transactionServiceGroup, String name, int timeout)中。

具体的实现就是 new GlobalSession(),设置session长久化形式(redis file db),开启Session,记录开始工夫,生成并返回 xid.

ipAddress + IP_PORT_SPLIT_CHAR + port + IP_PORT_SPLIT_CHAR + UUIDGenerator.generateUUID()

分支事务注册

SessionHelper.newBranchByGlobal()创立分支事务 branchSession,把 branchId 放入MDC,对分支事务和全局事务加锁,而后将分支事务放入全局事务(长久化和内存都会退出),响应分支事务Id。

branchSessionLock(globalSession, branchSession); // 加锁,只有AT模式才有加锁的实现,其余模式全是空实现

branchSession.getApplicationData() 获取 autoCommit skipCheckLock 两个参数进行加锁 branchSession.lock(autoCommit, skipCheckLock),lock的调用是 LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock)LockerManagerFactory.getLockManager() 获取的是Seata server启动时配置的锁实现(db file redis)

# AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean)    @Override    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {        if (branchSession == null) {            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");        }        String lockKey = branchSession.getLockKey();        if (StringUtils.isNullOrEmpty(lockKey)) {            // no lock            return true;        }        // get locks of branch        List<RowLock> locks = collectRowLocks(branchSession);        if (CollectionUtils.isEmpty(locks)) {            // no lock            return true;        }        // 获取锁实现进行加锁        return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);    }

redis 的实现非常简单,db 和file,暂不做剖析。

    @Override    public boolean acquireLock(List<RowLock> rowLocks, boolean autoCommit, boolean skipCheckLock) {        // rowLocks 是申请参数传递过去的        if (CollectionUtils.isEmpty(rowLocks)) {            return true;        }        try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {            if (ACQUIRE_LOCK_SHA != null && autoCommit) {                return acquireLockByLua(jedis, rowLocks);            } else {                return acquireLockByPipeline(jedis, rowLocks, autoCommit, skipCheckLock);            }        }    }

分支事务注册后会执行本地事务本地事务提交,会有一个锁的概念,提交本地事务前会对数据加锁,加锁胜利能力提交。

全局事务提交

globalSession.asyncCommit(); 提交事务

全局事务回滚

若是第一阶段失败,间接删除分支事务即可,若是第二阶段,则告诉RM进行回滚。

总结

Seata Server 局部的逻辑比RM TM的逻辑简单不少,本文仅波及AT模式,接下来会对Saga TCC模式进行剖析。分布式事务自身就是一个分布式架构中产生的一个难题,功能设计时尽可能的防止产生分布式事务,若不能防止,能够借鉴Seata框架的事务实现,理解其中的运行原理,逐渐的斟酌高性能的分布式事务解决方案,对我的项目的稳定性和扩展性至关重要。