原因
Spring Cloud 作为微服务框架,工作当中必用。起因也很简略,Spring cloud 依靠Springboot,
背靠Spring Framework,又有Eureka 和 Alibaba 两个大厂的反对,大多数的互联网公司会优先选择以此作为后端开发框架。微服务架构下每个服务都会有一个数据库,不同的数据库、不同的服务实例都会产生事务问题。
简述
微服务架构下的性能开发会遇到分布式事务问题,有2PC 3PC TCC等解决方案,基于性能思考,公司会思考应用Seata作为分布式事务解决方案,其中提供了基于2PC的AT模式、TCC模式、Saga模式等等,不过最罕用的还是AT模式,只须要一个GlobalTransaction注解即可,无业务侵入性。
前文对Seata的客户端 TM RM 进行理解说。本文持续对服务端TC进行讲解。
注释
Seata Server 是一个Springboot利用,较早的版本是一个并未应用Spring boot,1.5.2版本的代码曾经开始应用 Springboot了。Server端次要起到了TC的角色,对利用中的Seata客户端RM TM进行交互,同样应用Netty作为通信组件,引入了很多的注册核心比方Nacos就在其中,Seata TC就像利用一样,能够启动多个实例,TM RM客户端会在调用TC是进行负载平衡。
日志收集
反对File输入、Kafka、Logstash、Console输入,TC的日志收集能够作为基础设施的扩大进行保护,其稳定性的要求不亚于利用。
Server 启动
ServerRunner是一个启动入口,其中利用启动实现后会执行 Server.start(),启动服务端的Netty,另外设置一个Bean销毁回调,触发防止对注册核心的下线失败。
进入 Server 类,仅有一个start() 作为利用的入口。
- 创立一个线程池作为Netty的工作线程池,同样是可配置的.
- 初始化Session容器,反对 DB Redis File三种模式,能够通过SPI扩大.
- 初始化锁管理器工程的模式,未指定则应用存储模式,能够通过SPI扩大.
- 创立定时工作:删除UndoLog 查看事务超时 异步提交 重试提交 重试回滚.
- 获取本机Ip.
- 退出Spring销毁回调.
- 启动Netty.
- 向注册核心注册.
Netty 启动后,就能够接管RM TM的申请了。TC的性能在 TCInboundHandler 接口中定义:
public interface TCInboundHandler { // 开始全局事务 GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext); // 提交事务 GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext); // 回滚事务 GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext); // 注册分支事务 BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext); // 分支事务报告 BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext); // 全局锁查问 GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext); // 全局事务状态 GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext); // 全局事务报告 GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);}
TCInboundHandler 的实现类是DefaultCoordinator,此对象会作为一个Netty Handler,解决Netty申请。
TCInboundHandler 还有一个形象实现 AbstractTCInboundHandler,为TC的性能解决提供异样机制
Seata形象了一个事务管理器 TransactionManager 作为全局事务管理器,大多数状况下都会应用AT模式,也是Seata基于2PC开发的一种高性能模式,其TC的性能逻辑在 ATCore 中。
DefaultCoordinator 聚合了 DefaultCore。
public DefaultCore(RemotingServer remotingServer) { // SPI 加载数据所有的事务管理器实现 List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class, new Class[] {RemotingServer.class}, new Object[] {remotingServer}); if (CollectionUtils.isNotEmpty(allCore)) { for (AbstractCore core : allCore) { // 以分支事务枚举为key,存在Map中,后续可依据分支事务类型获取对应的事务管理器 coreMap.put(core.getHandleBranchType(), core); } } }
# META-INF/services/io.seata.server.coordinator.AbstractCoreio.seata.server.transaction.at.ATCoreio.seata.server.transaction.tcc.TccCoreio.seata.server.transaction.saga.SagaCoreio.seata.server.transaction.xa.XACore
DefaultCore对TransactionManager接口办法的实现,并没有做真正的业务解决,全副调用某个分支事务类的事务处理器进行解决。
@Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException { // 先获取 AbstractCore 的具体实现类,再进行业务解决 return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys); }
GlobalSession开启
全局事务的开启是第一步,TM向TC获取一个 xid,开启全局事务的逻辑是在 DefaultCore.begin(String applicationId, String transactionServiceGroup, String name, int timeout)中。
具体的实现就是 new GlobalSession(),设置session长久化形式(redis file db),开启Session,记录开始工夫,生成并返回 xid.
ipAddress + IP_PORT_SPLIT_CHAR + port + IP_PORT_SPLIT_CHAR + UUIDGenerator.generateUUID()
分支事务注册
SessionHelper.newBranchByGlobal()创立分支事务 branchSession,把 branchId 放入MDC,对分支事务和全局事务加锁,而后将分支事务放入全局事务(长久化和内存都会退出),响应分支事务Id。
branchSessionLock(globalSession, branchSession); // 加锁,只有AT模式才有加锁的实现,其余模式全是空实现
从 branchSession.getApplicationData()
获取 autoCommit skipCheckLock 两个参数进行加锁 branchSession.lock(autoCommit, skipCheckLock)
,lock的调用是 LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock)
,LockerManagerFactory.getLockManager()
获取的是Seata server启动时配置的锁实现(db file redis)
# AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean) @Override public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException { if (branchSession == null) { throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); } String lockKey = branchSession.getLockKey(); if (StringUtils.isNullOrEmpty(lockKey)) { // no lock return true; } // get locks of branch List<RowLock> locks = collectRowLocks(branchSession); if (CollectionUtils.isEmpty(locks)) { // no lock return true; } // 获取锁实现进行加锁 return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock); }
redis 的实现非常简单,db 和file,暂不做剖析。
@Override public boolean acquireLock(List<RowLock> rowLocks, boolean autoCommit, boolean skipCheckLock) { // rowLocks 是申请参数传递过去的 if (CollectionUtils.isEmpty(rowLocks)) { return true; } try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { if (ACQUIRE_LOCK_SHA != null && autoCommit) { return acquireLockByLua(jedis, rowLocks); } else { return acquireLockByPipeline(jedis, rowLocks, autoCommit, skipCheckLock); } } }
分支事务注册后会执行本地事务本地事务提交,会有一个锁的概念,提交本地事务前会对数据加锁,加锁胜利能力提交。
全局事务提交
globalSession.asyncCommit(); 提交事务
全局事务回滚
若是第一阶段失败,间接删除分支事务即可,若是第二阶段,则告诉RM进行回滚。
总结
Seata Server 局部的逻辑比RM TM的逻辑简单不少,本文仅波及AT模式,接下来会对Saga TCC模式进行剖析。分布式事务自身就是一个分布式架构中产生的一个难题,功能设计时尽可能的防止产生分布式事务,若不能防止,能够借鉴Seata框架的事务实现,理解其中的运行原理,逐渐的斟酌高性能的分布式事务解决方案,对我的项目的稳定性和扩展性至关重要。