TCC 开源项目源码学习(一)

44次阅读

共计 7001 个字符,预计需要花费 18 分钟才能阅读完成。

TCC 开源项目源码学习(一)
学习 TCC 分布式事务的知识是使用了 GIT 上的一个开源项目,之前有简单的看过一些,有了一个大概的了解,但是随着时间的‘清洗’,又开始变得‘浑浊不清’了,这次索性把这份源码从头看了下,又把流程推演了好几次,觉得已经懂了,想把理解的东西通过博客写出来。在这个过程中,再次梳理一遍,加深理解,同时也可以发现一些理解偏差和错误的地方。GIT: https://github.com/1755728288…
概要
在分布式系统中,为了保证执行指令的正确性,引入了事务的概念。一般意义上,事务主要突出的是 ACID,即原子性,一致性,隔离性和持久性。而在分布式事务中,最主要的原子性的保证,要保证一个业务操作在其分布式系统中的所有指令全部执行或不执行。因为各个指令的操作耗时以及顺序,所以原子性都对应了一定的时间窗口,比如单机系统下,这个时间窗口非常短,而且其原子性也由数据库事务来保证。而在分布式系统中,原子性就依赖于具体的应用设计了,主要的依据是业务上允许的时间窗口的长短。目前一般来说,若允许的时间窗口较短,就用 TCC,若允许的时间窗口较长,则使用最终一致性。最终一致性通常使用消息机制来设计,其核心是消息的安全送达与消费。
TCC 可以说是一种设计模式,将传统单机系统中的大事务进行拆分,通过小事务来拼装,并协调整体的事务提交和回滚。按字面意思理解,TCC 主要分 Try,Confirm,Cancel 三个阶段,Try 预留资源,Confirm 使用预留资源执行业务操作,Cancel 则是释放资源。貌似简单,但是在实际的业务场景中,往往会困惑我们在这三个阶段分别要做什么,这个是业务设计的一部分内容了,在此不展开叙述。不过要注意的一点时,任何一个阶段的结果都是可见的,比如一个库存子服务的入库方法,在 try 阶段就直接加到库存上去了,回滚的时候发现刚加上的库存已经有买家下单准备出库了,那就 GG 了。
代码结构
先放个图,这是项目的组件,上面绿色区域是框架的子模块,下面黄色的是样例模块。不要被黄色区域吓到,TCC 的代码不多,主要是 tcc-transaction-core.

组件名
说明

tcc-transaction-core
【重要】核心代码,主要的切面和 job

tcc-transaction-api
注解和常量

tcc-transaction-spring
spring 使用的扩展

tcc-transaction-dubbo
dubbo 使用时的扩展

tcc-transaction-server
tcc 事务管理控制台

tcc-transaction-unit-test
单元测试

tcc-transaction-tutorial-sample
订单场景示例工程,分 dubbo 和 spring 版本

所以重点只有一个模块, 就是 tcc-transaction-core,代码量不多,主要分成下面的模块

模块名
说明

interceptor
【重要】2 个切面,一个是根据事务状态进行 tcc 阶段方法的选择,一个是组织事务的参与者,用 XID 将各个参与者绑起来,以便事务的提交和回滚

recover
恢复在同步调用失败事务的定时处理

repository
事务对象持久 DAO 对象,提供了多种选择:缓存,文件系统,jdbc,zookeeper

context
传递事务上下文的方法类,一般会在远程方法调用的切面中,将上下文加入到参数列表中

common
方法枚举和事务枚举

serializer
事务对象的序列化器,使用了 kyto 序列化工具

support
实现了工厂类,管理 TCC 的类的实例化

utils
工具类

最重要的是 interceptor,是主要业务逻辑所在。
题外话,有人初看了一遍 TCC,拿来和数据库事务来比较,会说‘TCC 不就是业务补偿么,没什么难点啊’,貌似有道理,其实不然。还记得数据库事务的 ACID 吗?因为数据库提供了事务特性,ACID 由数据库保证,应用只需要一个简单的 @Transactional 注解就都搞定了。而分布式事务,就把 ACID 的特性从数据库里面拿了出来,由应用程序来保证。当然 ACID 也有了和之前不一样的含义。

特性
数据库
分布式事务

A[Atomicity]
所有数据库更新一起提交和回滚,数据库通过日志来实现
通过协调各个事务的参与方,通过框架来处理异常

C[Consistncy]
通过数据库约束来实现,比如非空,唯一,外键等

I[Isolation]
各个数据库实现方式不一样, 主要分读已提交,读未提交等
TCC 通过 try 阶段锁定资源来实现隔离,即业务资源的隔离

D[Durability]
数据库实现
数据库实现

工作流程
以项目中的例子来说明整个 TCC 的处理流程,这个例子是类似电商下单的场景,在支付的时候可以选择红包或本金,比如一个笔记本 4000,可以选择使用 3000 的本金和 1000 的红包来支付,有三个服务:订单服务(order),本金服务(capital)和红包服务(redPacket)。主调方是 Order,被调方 Capital 和 RedPacket.
下面是绞尽脑汁画的调用流程图,为了图的简便直观,省去了 cancel 的处理,只有 try 和 confirm 的调用(cancel 的处理和 confirm 基本一致)。红线是 try 的调用,蓝线是 confirm 的调用。粗红线是开始,粗蓝线是结束。在 try 阶段的 makePayment, 方法调用了两个子事务的 api 方法,其他的方法的调用均是全局事务切面决定的。业务方法只要按 TCC 框架的要求实现即可,其他的交给 TCC 框架。
主要代码分析
我们从 TCC 事务注解开始,先介绍几个基本的类。
/**
属性主要是事务传播属性,提交方法,回滚方法,上下文传递类,是否异步提交,是否异步回滚
propagation 属性比较重要,值有 required,support,mandatory 和 requireNEW,具体的含义和 spring 的事务传播类似。
**/
public @interface Compensable {
public Propagation propagation() default Propagation.REQUIRED;
public String confirmMethod() default “”;
public String cancelMethod() default “”;
public Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;
public boolean asyncConfirm() default false;
public boolean asyncCancel() default false;

/**
事务状态,在 TCC 的不通阶段进行事务状态的变更。
**/
public enum TransactionStatus {
TRYING(1), CONFIRMING(2), CANCELLING(3);
/**
ROOT: 事务启动方法,比如例子中 order 的 makePayment 方法
PROVIDER: 事务协同方法,比如例子中 redpacket 的 record 方法
Normal: 普通方法,比如 API 里面的 record 方法
**/
public enum MethodType {
ROOT, PROVIDER, NORMAL;
}
/**
ROOT: 主事务,一般 MethodType 为 ROOT 的启动
BRANCH: 分支事务
**/
public enum TransactionType {
ROOT(1),
BRANCH(2);
下面我们看一下事务持久的内容,能更好的帮助理解。每个服务都会有一张事务表,会记录所有的 ROOT 和 BRANCH 事务,在事务完成之后,会自动删除。

TRANSACTION_ID
DOMAIN
GLOBAL_TX_ID
BRANCH_QUALIFIER
CONTENT
STATUS
TRANSACTION_TYPE
RETRIED_COUNT
VERSION

2
ORDER
事务编号 001
分支标识 A
事务对象 A
2
1
0
11

2
CAPITAL
事务编号 001
分支标识 B
事务对象 B
2
2
0
11

2
REDPACKET
事务编号 001
分支标识 C
事务对象 C
2
2
0
11

domain 区分了服务 global_tx_id 串起了所有的事务 transaction_type 1 为主事务,2 为分支事务 content 用于恢复事务,主事务的 content 中包含了参与者
代码部分的最后是两个切面和恢复的 job,不进行特别的解释了,将我的理解放在代码的注释里面。
//CompensableTransactionInterceptor.java
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 获取所执行事务方法的设置属性
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
boolean asyncConfirm = compensable.asyncConfirm();
boolean asyncCancel = compensable.asyncCancel();

// 当前是否存在事务,
boolean isTransactionActive = transactionManager.isTransactionActive();

/**
* 判断方法类型
* MethodType 一共有三种,Root,Provider,Normal. 主要通过 propagation 和 isTransactionActive 来确定
*/
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
switch (methodType) {
case ROOT:
//ROOT:(1)propagation 为 require_new (2)propataion 为 require,且之前没有有事务存在
// 主事务处理方法
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
//PROVIDER:(1) propation 为 require 或者 mandatory, 且之前有事务
// 从事务处理方法
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
// 普通方法执行
return pjp.proceed();
}
}

private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Object returnValue = null;
Transaction transaction = null;
try {
// 开启新的主事务,使用新生成的 xid,状态为 trying,事务类型为 ROOT
transaction = transactionManager.begin();
try {
returnValue = pjp.proceed();
} catch (Throwable tryingException) {
if (!isDelayCancelException(tryingException)) {
logger.warn(String.format(“compensable transaction trying failed. transaction content:%s”, JSON.toJSONString(transaction)), tryingException);
// 异常回滚,将事务状态更新为 CANCELLING
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
// 执行 confirm 方法,将事务状态更新为 CONFIRMING。如果是异步,则 TM 会使用线程池异步执行,否则直接调用,会协调所有的参与者进行提交。并将事务记录删除
transactionManager.commit(asyncConfirm);
} finally {
// 清理事务数据
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Transaction transaction = null;
try {
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
// 开启一个子事务,并调用 TCC 的 try 方法
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
case CONFIRMING:
// 获取子事务 TRY 阶段的事务,并调用 TCC 的 commit 方法
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
// 获取子事务保存的事务数据,执行 cancel 方法

try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}

} finally {
// 清理事务数据
transactionManager.cleanAfterCompletion(transaction);
}
Method method = ((MethodSignature) (pjp.getSignature())).getMethod();

return ReflectionUtils.getNullValue(method.getReturnType());
}

private boolean isDelayCancelException(Throwable throwable) {

if (delayCancelExceptions != null) {
for (Class delayCancelException : delayCancelExceptions) {

Throwable rootCause = ExceptionUtils.getRootCause(throwable);

if (delayCancelException.isAssignableFrom(throwable.getClass())
|| (rootCause != null && delayCancelException.isAssignableFrom(rootCause.getClass()))) {
return true;
}
}
}

return false;
}

总结
代码分析的比较少,这份代码还是有很多值得称道的地方,工厂类,缓存,模板方法等设计模式的使用,下次可以从设计模式的角度来进行分析。除了 TCC,最近项目中还涉及了安全消息,等弄清楚了再来一发。

正文完
 0