LCN502-lcn模式源码分析二

20次阅读

共计 3974 个字符,预计需要花费 10 分钟才能阅读完成。

前言

上一篇文章(https://segmentfault.com/a/11…)我们在 springboot2.1.3 上集成了 lcn5.0.2 并简单做了一个 lcn 模式的 demo。LCN 官网将源码都给了出来,但是分析源码的部分目前还不是很多,这篇文章主要分析一下 LCN 模式源码

事务控制原理

分析源码之前,我们首先看一下 LCN 整体的框架模型:
TX-LCN 由两大模块组成, TxClient、TxManager。TxClient 作为模块的依赖框架,提供 TX-LCN 的标准支持,TxManager 作为分布式事务的控制放。事务发起方或者参与反都由 TxClient 端来控制。

原理图:

lcn 模式

不难发现,开启处理的地方在拦截器(com.codingapi.txlcn.tc.aspect.TransactionAspect)里面

@Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
            "&& !tccTransactionPointcut() && !txTransactionPointcut()")
    public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
        // 将执行分布式事务的方法放在 DTXInfo 对象里面
        DTXInfo dtxInfo = DTXInfo.getFromCache(point);
        LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
        dtxInfo.setTransactionType(Transactions.LCN);
        dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
        
        // 调用方法,正式开启 (或继续,这里取决于是否是事务发起方) 分布式事务
        return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
    }

走进 runTransaction 方法,我们可以看到一下内容(伪代码,方便分析)

public class DTXLogicWeaver {
    
    // 执行分布式事务的核心方法
    public Object runTransaction(){
    
        //1. 拿到当前模块的事务上下文和全局事务上下文
        DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
        TxContext txContext;
        // ---------- 保证每个模块在一个 DTX 下只会有一个 TxContext ---------- 
        if (globalContext.hasTxContext()) {
            // 有事务上下文的获取父上下文
            txContext = globalContext.txContext();
            dtxLocalContext.setInGroup(true);// 加入事务组
            log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
        } else {
            // 没有的开启本地事务上下文
            txContext = globalContext.startTx();// 下层创建了事务组}
        
        //2. 设置本地事务上下文的一些参数
        if (Objects.nonNull(dtxLocalContext.getGroupId())) {dtxLocalContext.setDestroy(false);
        }
        dtxLocalContext.setUnitId(dtxInfo.getUnitId());
        dtxLocalContext.setGroupId(txContext.getGroupId());// 从全局上下文获取
        dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
        
       
        //3. 设置分布式事务参数
        TxTransactionInfo info = new TxTransactionInfo();
        info.setBusinessCallback(business);// 业务执行器(核心)
        info.setGroupId(txContext.getGroupId());// 从全局上下文获取
        info.setUnitId(dtxInfo.getUnitId());
        info.setPointMethod(dtxInfo.getBusinessMethod());
        info.setPropagation(dtxInfo.getTransactionPropagation());
        info.setTransactionInfo(dtxInfo.getTransactionInfo());
        info.setTransactionType(dtxInfo.getTransactionType());
        info.setTransactionStart(txContext.isDtxStart());
        
        
        
        //4.LCN 事务处理器
        try {return transactionServiceExecutor.transactionRunning(info);
        } finally {
            // 线程执行业务完毕清理本地数据
            if (dtxLocalContext.isDestroy()) {
                // 通知事务执行完毕 
                synchronized (txContext.getLock()) {txContext.getLock().notifyAll();}

                // TxContext 生命周期是?和事务组一样(不与具体模块相关的)if (!dtxLocalContext.isInGroup()) {globalContext.destroyTx();
                }

                DTXLocalContext.makeNeverAppeared();
                TracingContext.tracing().destroy();
            }
            log.debug("<---- TxLcn end ---->");
        }      
    } 
}

执行业务操作


 /**
     * 事务业务执行
     *
     * @param info info
     * @return Object
     * @throws Throwable Throwable
     */
    public Object transactionRunning(TxTransactionInfo info) throws Throwable {

        // 1. 获取事务类型
        String transactionType = info.getTransactionType();

        // 2. 获取事务传播状态
        DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);

        // 2.1 如果不参与分布式事务立即终止
        if (propagationState.isIgnored()) {return info.getBusinessCallback().call();}

        // 3. 获取本地分布式事务控制器
        DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);

        // 4. 织入事务操作
        try {
            // 4.1 记录事务类型到事务上下文
            Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
            transactionTypeSet.add(transactionType);

            dtxLocalControl.preBusinessCode(info);

            // 4.2 业务执行前
            txLogger.txTrace(info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);

            // 4.3 执行业务
            Object result = dtxLocalControl.doBusinessCode(info);

            // 4.4 业务执行成功
            txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
            dtxLocalControl.onBusinessCodeSuccess(info, result);
            return result;
        } catch (TransactionException e) {txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
            throw e;
        } catch (Throwable e) {
            // 4.5 业务执行失败
            txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
                    "business code error");
            dtxLocalControl.onBusinessCodeError(info, e);
            throw e;
        } finally {
            // 4.6 业务执行完毕
            dtxLocalControl.postBusinessCode(info);
        }
    }        
    

正文完
 0