前言
上一篇文章(https://segmentfault.com/a/11…)我们在 springboot2.1.3 上集成了 lcn5.0.2 并简单做了一个 lcn 模式的 demo。LCN 官网将源码都给了出来,但是分析源码的部分目前还不是很多,这篇文章主要分析一下 LCN 模式源码
事务控制原理
分析源码之前,我们首先看一下 LCN 整体的框架模型:
TX-LCN 由两大模块组成, TxClient、TxManager。TxClient 作为模块的依赖框架,提供 TX-LCN 的标准支持,TxManager 作为分布式事务的控制放。事务发起方或者参与反都由 TxClient 端来控制。
原理图:
lcn 模式
不难发现,开启处理的地方在拦截器(com.codingapi.txlcn.tc.aspect.TransactionAspect)里面
@Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
"&& !tccTransactionPointcut() && !txTransactionPointcut()")
public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
// 将执行分布式事务的方法放在 DTXInfo 对象里面
DTXInfo dtxInfo = DTXInfo.getFromCache(point);
LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
dtxInfo.setTransactionType(Transactions.LCN);
dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
// 调用方法,正式开启 (或继续,这里取决于是否是事务发起方) 分布式事务
return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
}
走进 runTransaction 方法,我们可以看到一下内容(伪代码,方便分析)
public class DTXLogicWeaver {
// 执行分布式事务的核心方法
public Object runTransaction(){
//1. 拿到当前模块的事务上下文和全局事务上下文
DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
TxContext txContext;
// ---------- 保证每个模块在一个 DTX 下只会有一个 TxContext ----------
if (globalContext.hasTxContext()) {
// 有事务上下文的获取父上下文
txContext = globalContext.txContext();
dtxLocalContext.setInGroup(true);// 加入事务组
log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
} else {
// 没有的开启本地事务上下文
txContext = globalContext.startTx();// 下层创建了事务组}
//2. 设置本地事务上下文的一些参数
if (Objects.nonNull(dtxLocalContext.getGroupId())) {dtxLocalContext.setDestroy(false);
}
dtxLocalContext.setUnitId(dtxInfo.getUnitId());
dtxLocalContext.setGroupId(txContext.getGroupId());// 从全局上下文获取
dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
//3. 设置分布式事务参数
TxTransactionInfo info = new TxTransactionInfo();
info.setBusinessCallback(business);// 业务执行器(核心)
info.setGroupId(txContext.getGroupId());// 从全局上下文获取
info.setUnitId(dtxInfo.getUnitId());
info.setPointMethod(dtxInfo.getBusinessMethod());
info.setPropagation(dtxInfo.getTransactionPropagation());
info.setTransactionInfo(dtxInfo.getTransactionInfo());
info.setTransactionType(dtxInfo.getTransactionType());
info.setTransactionStart(txContext.isDtxStart());
//4.LCN 事务处理器
try {return transactionServiceExecutor.transactionRunning(info);
} finally {
// 线程执行业务完毕清理本地数据
if (dtxLocalContext.isDestroy()) {
// 通知事务执行完毕
synchronized (txContext.getLock()) {txContext.getLock().notifyAll();}
// TxContext 生命周期是?和事务组一样(不与具体模块相关的)if (!dtxLocalContext.isInGroup()) {globalContext.destroyTx();
}
DTXLocalContext.makeNeverAppeared();
TracingContext.tracing().destroy();
}
log.debug("<---- TxLcn end ---->");
}
}
}
执行业务操作
/**
* 事务业务执行
*
* @param info info
* @return Object
* @throws Throwable Throwable
*/
public Object transactionRunning(TxTransactionInfo info) throws Throwable {
// 1. 获取事务类型
String transactionType = info.getTransactionType();
// 2. 获取事务传播状态
DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);
// 2.1 如果不参与分布式事务立即终止
if (propagationState.isIgnored()) {return info.getBusinessCallback().call();}
// 3. 获取本地分布式事务控制器
DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);
// 4. 织入事务操作
try {
// 4.1 记录事务类型到事务上下文
Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
transactionTypeSet.add(transactionType);
dtxLocalControl.preBusinessCode(info);
// 4.2 业务执行前
txLogger.txTrace(info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);
// 4.3 执行业务
Object result = dtxLocalControl.doBusinessCode(info);
// 4.4 业务执行成功
txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
dtxLocalControl.onBusinessCodeSuccess(info, result);
return result;
} catch (TransactionException e) {txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
throw e;
} catch (Throwable e) {
// 4.5 业务执行失败
txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
"business code error");
dtxLocalControl.onBusinessCodeError(info, e);
throw e;
} finally {
// 4.6 业务执行完毕
dtxLocalControl.postBusinessCode(info);
}
}