前言在 SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所在的团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件 Fescar 的解决方案,并详细了解了 Fescar 内部的工作原理,解决在使用 Fescar 中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定理论基础。
目前分布式事务解决方案基本是围绕两阶段提交模式来设计的,按对业务是有侵入分为:对业务无侵入的基于 XA 协议的方案,但需要数据库支持 XA 协议并且性能较低;对业务有侵入的方案包括:TCC 等。Fescar 就是基于两阶段提交模式设计的,以高效且对业务零侵入的方式,解决微服务场景下面临的分布式事务问题。Fescar 设计上将整体分成三个大模块,即 TM、RM、TC,具体解释如下:
TM(Transaction Manager):全局事务管理器,控制全局事务边界,负责全局事务开启、全局提交、全局回滚。RM(Resource Manager):资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。TC(Transaction Coordinator):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
本文将深入到 Fescar 的 RM 模块源码去介绍 Fescar 是如何在完成分支提交和回滚的基础上又做到零侵入,进而极大方便业务方进行业务系统开发。
一、从配置开始解读
上图是 Fescar 源码 examples 模块 dubbo-order-service.xml 内的配置,数据源采用 druid 的 DruidDataSource,但实际 jdbcTemplate 执行时并不是用该数据源,而用的是 Fescar 对 DruidDataSource 的代理 DataSourceProxy,所以,与 RM 相关的代码逻辑基本上都是从 DataSourceProxy 这个代理数据源开始的。
Fescar 采用 2PC 来完成分支事务的提交与回滚,具体怎么做到的呢,下面就分别介绍 Phase1、Phase2 具体做了些什么。
二、Phase1—分支(本地)事务执行 Fescar 将一个本地事务做为一个分布式事务分支,所以若干个分布在不同微服务中的本地事务共同组成了一个全局事务,结构如下。
那么,一个本地事务中 SQL 是如何执行呢?在 Spring 中,本质上都是从 jdbcTemplate 开始的,比如下面的 SQL 语句:
jdbcTemplate.update(“update storage_tbl set count = count – ? where commodity_code = ?”, new Object[] {count, commodityCode}); 一般 JdbcTemplate 执行流程如下图所示:
由于在配置中,JdbcTemplate 数据源被配置成了 Fescar 实现 DataSourceProxy,进而控制了后续的数据库连接使用的是 Fescar 提供的 ConnectionProxy,Statment 使用的是 Fescar 实现的 StatmentProxy,最终 Fescar 就顺理成章地实现了在本地事务执行前后增加所需要的逻辑,比如:完成分支事务的快照记录和分支事务执行状态的上报等等。
DataSourceProxy 获取 ConnectionProxy:
ConnectionProxy 获取 StatmentProxy:
在获取到 StatmentProxy 后,可以调用 excute 方法执行 sql 了
而真正 excute 实现逻辑如下:
首先会检查当前本地事务是否处于全局事务中,如果不处于,直接使用默认的 Statment 执行,避免因引入 Fescar 导致非全局事务中的 SQL 执行性能下降。解析 Sql,有缓存机制,因为有些 sql 解析会比较耗时,可能会导致在应用启动后刚开始的那段时间里处理全局事务中的 sql 执行效率降低。对于 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 这四种类型的 sql 会专门实现的 SQL 执行器进行处理,其它 SQL 直接是默认的 Statment 执行。返回执行结果,如有异常则直接抛给上层业务代码进行处理。再来看一下关键的 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 这四种类型的 sql 如何执行的,先看一下具体类图结构:
为结省篇幅,选择 UpdateExecutor 实现源码看一下,先看入口 BaseTransactionalExecutor.execute,该方法将 ConnectionProxy 与 Xid(事务 ID)进行绑定, 这样后续判断当前本地事务是否处理全局事务中只需要看 ConnectionProxy 中 Xid 是否为空。
然后,执行 AbstractDMLBaseExecutor 中实现的 doExecute 方法
基本逻辑如下:
先判断是否为 Auto-Commit 模式如果非 Auto-Commit 模式,则先查询 Update 前对应行记录的快照 beforeImage,再执行 Update 语句,完成后再查询 Update 后对应行记录的快照 afterImage,最后将 beforeImage、afterImage 生成 UndoLog 追加到 Connection 上下文 ConnectionContext 中。(注:获取 beforeImage、afterImage 方法在 UpdateExecutor 类下,一般是构造一条 Select…For Update 语句获取执行前后的行记录,同时会检查是否有全局锁冲突,具体可参考源码)如果是 Auto-Commit 模式,先将提交模式设置成非自动 Commit,再执行 2 中的逻辑,再执行 connectionProxy.commit() 方法,由于执行 2 过程和 commit 时都可能会出现全局锁冲突问题,增加了一个循环等待重试逻辑,最后将 connection 的模式设置成 Auto-Commit 模式如果本地事务执行过程中发生异常,业务上层会接收到该异常,至于是给 TM 模块返回成功还是失败,由业务上层实现决定,如果返回失败,则 TM 裁决对全局事务进行回滚;如果本地事务执行过程未发生异常,不管是非 Auto-Commit 还是 Auto-Commit 模式,最后都会调用 connectionProxy.commit() 对本地事务进行提交,在这里会创建分支事务、上报分支事务的状态以及将 UndoLog 持久化到 undo_log 表中,具体代码如下图:
基本逻辑:
判断当前本地事务是否处于全局事务中(也就判断 ConnectionContext 中的 xid 是否为空)。如果不处于全局事务中,则调用 targetConnection 对本地事务进行 commit。如果处于全局事务中,首先创建分支事务,再将 ConnectionContext 中的 UndoLog 写入到 undo_log 表中,然后调用 targetConnection 对本地事务进行 commit,将 UndoLog 与业务 SQL 一起提交,最后上报分支事务的状态(成功 or 失败),并将 ConnectionContext 上下文重置。综上所述,RM 模块通过对 JDBC 数据源进行代理,干预业务 SQL 执行过程,加入了很多流程,比如业务 SQL 解析、业务 SQL 执行前后的数据快照查询并组织成 UndoLog、全局锁检查、分支事务注册、UndoLog 写入并随本地事务一起 Commit、分支事务状态上报等。通过这种方式,Fescar 真正做到了对业务代码无侵入,只需要通过简单的配置,业务方就可以轻松享受 Fescar 所带来的功能。Phase1 整体流程引用 Fescar 官方图总结如下:
三、Phase2- 分支事务提交或回滚阶段 2 完成的是全局事物的最终提交或回滚,当全局事务中所有分支事务全部完成并且都执行成功,这时 TM 会发起全局事务提交,TC 收到全全局事务提交消息后,会通知各分支事务进行提交;同理,当全局事务中所有分支事务全部完成并且某个分支事务失败了,TM 会通知 TC 协调全局事务回滚,进而 TC 通知各分支事务进行回滚。
在业务应用启动过程中,由于引入了 Fescar 客户端,RmRpcClient 会随应用一起启动,该 RmRpcClient 采用 Netty 实现,可以接收 TC 消息和向 TC 发送消息,因此 RmRpcClient 是与 TC 收发消息的关键模块。
public class RMClientAT {
public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
AsyncWorker asyncWorker = new AsyncWorker();
asyncWorker.init();
DataSourceManager.init(asyncWorker);
rmRpcClient.setResourceManager(DataSourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
rmRpcClient.init();
}
} 上述代码展示是的 RmRpcClient 初始化过程,有三个关键类 RMHandlerAT、AsyncWorker 和 DataSourceManager。RMHandlerAT 具有了分支提交和回滚两个方法,分支提交或回滚的逻辑可以从这里开始看;AsyncWorker 是一个异步 Worker, 主要是完成分支事务异步提交的功能,具有失败重试功能;DataSourceManager 对数据源管理和维护。
下面分成两部分来讲:分支事务提交、分去事务回滚。
3.1、分支事务提交在接收到 TC 发起的全局提交消息后,经 RmRpcClient 对通信协议的处理,再交由 RMHandlerAT 来完成对分支事务的提交,分支事务提交从 RMHandlerAT.doBranchCommit() 开始,但最后由 AsyncWorker 异步 Worker 完成,直接看 AsyncWorker 中的代码实现:
分支事务提交关键逻辑在 doBranchCommits 方法中:
该方法主要是批量删除 UndoLog 日志,但并未使用 ConnectionProxy 去执行删除 SQL,可能原因是:1、完全没必要 2、考虑效率优先
同样,对于分支事务提交也引用 Fescar 官方一张图来结尾:
3.2、分支事务回滚同样,分支事务回滚是从 RMHandlerAT.doBranchRollback 开始的,然后到了 dataSourceManager.branchRollback,最后完成分支事务回滚逻辑的是 UndoLogManager.undo 方法。
@Override
protected void RMHandlerAT:doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info(“AT Branch rolling back: ” + xid + ” ” + branchId + ” ” + resourceId);
BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info(“AT Branch rollback result: ” + status);
}
@Override
public BranchStatus DataSourceManager:branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
UndoLogManager.undo 方法源码如下:
从上图可以看出,整个回滚到全局事务之前状态的代码逻辑集中在如下代码中:
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);undoExecutor.executeOn(conn); 首先通过 UndoExecutorFactory 获取到对应的 UndoExecutor,然后再执行 UndoExecutor 的 executeOn 方法完成回滚操作。目前三种类型的 UndoExecutor 结构如下:
undoExecutor.executeOn 源码如下:
至此,整个分支事务回滚就结束了,分支事务回滚整体时序图如下:
引入 Fescar 官方对分支事务回滚原理介绍图作为结尾:
综合上述,Fescar 在 Phase2 通过 UndoLog 自动完成分支事务提交与回滚,在这个过程中不需要业务方做任何处理,业务方无感知,因些在该阶段对业务代码也是无侵入的。
四、总结本文主要介绍了 RM 模块的相关代码,将 RM 模块按 2PC 模式分成 Phase1 和 Phase2 分别进行介绍,从 Fescar 源码上看,整个源码结构清晰,有利于研发人员快速学习 Fescar 的原理。在使用方面,只需进行简单的配置,就可以享受 Fescar 带来的便捷功能,对业务做到了无侵入;同时在性能方面,Fescar 在分支事务提交过程中采用异步模式,减少了全局锁的占用时间,进而提升了整体性能。后续,将继续学习 Fescar 的其它模块(TM、TC)与全局锁的实现逻辑,并做相关总结介绍。
本文作者:中间件小哥
阅读原文
本文为云栖社区原创内容,未经允许不得转载。