关于java:最终一致性事务解决方案

113次阅读

共计 10224 个字符,预计需要花费 26 分钟才能阅读完成。

单体秒杀服务转 dubbo 框架 + 分布式事务实现

计划和技术架构

计划:秒杀计划(

)+ 分布式事务解决方案(为了让领取、扣减库存和订单状态一致性解决方案,见下图)

目标:dubbo 微服务化 实现订单领取分布式事务,对立提交和回滚

技术架构:

docker + nacos 架构(舍去 zookeeper,nacos 更香)

docker 版本 2.7

springboot 版本 2.6.1

分布式事务(tcc -transaction 1.73)

github 地址:https://github.com/luozijing/…

我的项目体验地址:http://81.69.254.72:9082/logi…

tcc-transaction

TCC(Try Confirm Cancel)计划是一种利用层面侵入业务的两阶段提交。是目前最火的一种柔性事务计划,其核心思想是:针对每个操作,都要注册一个与其对应的确认和弥补(撤销)操作

TCC 分为两个阶段,别离如下:

  • 第一阶段:Try(尝试),次要是对业务零碎做检测及资源预留 (加锁,预留资源)
  • 第二阶段:本阶段依据第一阶段的后果,决定是执行 confirm 还是 cancel
    1. Confirm(确认):执行真正的业务(执行业务,开释锁)
    2. Cancle(勾销):是预留资源的勾销(出问题,开释锁)

最终一致性保障,

  • TCC 事务机制以初步操作(Try)为核心的,确认操作(Confirm)和勾销操作(Cancel)都是围绕初步操作(Try)而开展。因而,Try 阶段中的操作,其保障性是最好的,即便失败,依然有勾销操作(Cancel)能够将其执行后果撤销。
  • Try 阶段执行胜利并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。也就是说只有 Try 胜利,Confirm肯定胜利(TCC 设计之初的定义)。
  • Confirm 与 Cancel 如果失败,由 TCC 框架进行 == 重试 == 弥补
  • 存在极低概率在 CC 环节彻底失败,则须要定时工作或人工染指、

TCC 事务机制绝对于传统事务机制,有以下长处:

  • 性能晋升:具体业务来实现管制资源锁的粒度变小,不会锁定整个资源。
  • 数据最终一致性:基于 Confirm 和 Cancel 的幂等性,保障事务最终实现确认或者勾销,保证数据的一致性。
  • 可靠性:解决了 XA 协定的协调者单点故障问题,由主业务方发动并管制整个业务流动,业务流动管理器也变成多点,引入集群。

毛病:

  • TCC 的 Try、Confirm 和 Cancel 操作性能要按具体业务来实现,业务耦合度较高,进步了开发成本。

dubbo+TCC 实现

样例实现了领取、更新订单和库存的程序一致性。联合 TCC-Transaction 的代码和业务代码,分下下要害流程。

tcc 源码剖析

更具体的流程能够看这

https://blog.csdn.net/tianjin…

启动类通过 @EnableTccTransaction 注解,真正起作用的是上面的 import,在同一个上下文注入相干的类,包含 transactionRepository、springBeanFactory、recoveryLock、TransactionManager 等类,TransactionManager 在整个 tcc 事务中是一个事务管理类,作用包含 begin、commit、rollback 等。

@Import(TccTransactionConfigurationSelector.class)

除此之外,还注入上述两个切面,切面 CompensableTransactionInterceptor 的切点为 @Pointcut(“@annotation(org.mengyun.tcctransaction.api.Compensable)”),作用赋予事务角色和初始化状态。分事务角色和事务以后状态解决分布式事务。

当角色使 root,即根事务,进入 rootMethodProceed 处理事务上下文(分布式上下文,微服务),来判断本身事务进入 confirm、cancel 还是 try 分支。

当角色使 provider,即分支事务,进入 providerMethodProceed 处理事务上下文。

若没有事务角色,则间接执行切面的代理办法。

public Object interceptCompensableMethod(TransactionMethodJoinPoint pjp) throws Throwable {Transaction transaction = transactionManager.getCurrentTransaction();
        CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction);

        // if method is @Compensable and no transaction context and no transaction, then root
        // else if method is @Compensable and has transaction context and no transaction ,then provider
        switch (compensableMethodContext.getParticipantRole()) {
            case ROOT:
                return rootMethodProceed(compensableMethodContext);
            case PROVIDER:
                return providerMethodProceed(compensableMethodContext);
            default:
                return compensableMethodContext.proceed();}
    }

rootMethodProceed 办法,很明确,由事务管理器创立事务 transaction,而后执行服务调用,try catch 住调用的服务的状态,若异样则,进行回滚解决执行 cancel 办法,若失常执行提交,执行 confirm 办法。

private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Object returnValue = null;

        Transaction transaction = null;

        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();

        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        try {transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());

            try {returnValue = compensableMethodContext.proceed();
            } catch (Throwable tryingException) {transactionManager.rollback(asyncCancel);

                throw tryingException;
            }

            transactionManager.commit(asyncConfirm);

        } finally {transactionManager.cleanAfterCompletion(transaction);
        }

        return returnValue;
    }


providerMethodProceed 办法当上游事务的状态是 try 或者是 confirm 是别离执行 try 和 commit,当是 cancel 时,会通过 Transaction transaction = transactionRepository.findByXid(transactionContext.getXid()); 先去找之前的事务,因为执行 cancel 阐明大概率之前事务时存在的,因而有了前面的判断,判断之前事务时 try 胜利、try 失败或者勾销,都有可能须要回滚,另外其余状况就时走定时回复工作去回滚。

private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Transaction transaction = null;

        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();

        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        try {switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
                case TRYING:
                    transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());

                    Object result = null;

                    try {result = compensableMethodContext.proceed();

                        //TODO: need tuning here, async change the status to tuning the invoke chain performance
                        //transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, asyncSave);
                        transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, true);
                    } catch (Throwable e) {transactionManager.changeStatus(TransactionStatus.TRY_FAILED);
                        throw e;
                    }

                    return result;

                case CONFIRMING:
                    try {transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                        transactionManager.commit(asyncConfirm);
                    } catch (NoExistedTransactionException excepton) {
                        //the transaction has been commit,ignore it.
                        logger.info("no existed transaction found at CONFIRMING stage, will ignore and confirm automatically. transaction:" + JSON.toJSONString(transaction));
                    }
                    break;
                case CANCELLING:

                    try {

                        //The transaction' status of this branch transaction, passed from consumer side.
                        int transactionStatusFromConsumer = compensableMethodContext.getTransactionContext().getParticipantStatus();

                        transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());

                        // Only if transaction's status is at TRY_SUCCESS、TRY_FAILED、CANCELLING stage we can call rollback.
                        // If transactionStatusFromConsumer is TRY_SUCCESS, no mate current transaction is TRYING or not, also can rollback.
                        // transaction's status is TRYING while transactionStatusFromConsumer is TRY_SUCCESS may happen when transaction's changeStatus is async.
                        if (transaction.getStatus().equals(TransactionStatus.TRY_SUCCESS)
                                || transaction.getStatus().equals(TransactionStatus.TRY_FAILED)
                                || transaction.getStatus().equals(TransactionStatus.CANCELLING)
                                || transactionStatusFromConsumer == ParticipantStatus.TRY_SUCCESS.getId()) {transactionManager.rollback(asyncCancel);
                        } else {
                            //in this case, transaction's Status is TRYING and transactionStatusFromConsumer is TRY_FAILED
                            // this may happen if timeout exception throws during rpc call.
                            throw new IllegalTransactionStatusException("Branch transaction status is TRYING, cannot rollback directly, waiting for recovery job to rollback.");
                        }

                    } catch (NoExistedTransactionException exception) {
                        //the transaction has been rollback,ignore it.
                        logger.info("no existed transaction found at CANCELLING stage, will ignore and cancel automatically. transaction:" + JSON.toJSONString(transaction));
                    }
                    break;
            }

        } finally {transactionManager.cleanAfterCompletion(transaction);
        }

        Method method = compensableMethodContext.getMethod();

        return ReflectionUtils.getNullValue(method.getReturnType());
    }

另外一个切面是 ConfigurableCoordinatorAspect,这个切面的作用是制作参加事务,参加事务的作用的是当 Commit 时所有的参加事务一起提交,回滚时,所有的参加事务一起回滚,来管制分布式服务。

public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable {Transaction transaction = transactionManager.getCurrentTransaction();

        if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) {Participant participant = enlistParticipant(pjp);

            if (participant != null) {

                Object result = null;
                try {result = pjp.proceed(pjp.getArgs());
                    participant.setStatus(ParticipantStatus.TRY_SUCCESS);
                } catch (Throwable e) {participant.setStatus(ParticipantStatus.TRY_FAILED);

                    //if root transaction, here no need persistent transaction
                    // because following stage is rollback, transaction's status is changed to CANCELING and save
//                    transactionManager.update(participant);
//
                    throw e;
                }


                return result;
            }
        }

        return pjp.proceed(pjp.getArgs());
    }

业务实现

业务要实现订单、库存和领取的一致性事务,首先起一个根事务,该服务负责给前端返回后果,上面有两个子事务,一个是领取,另外一个是扣减库存和订单。

 @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false)
    public  void makePayment(MiaoShaUser user, PaymentVo paymentVo) {log.info("start tcc transaction try: {}", JSONObject.toJSONString(paymentVo));
        // 领取
        miaoShaUserService.pay(user, paymentVo);
        // 扣减库存和更新订单
        miaoshaService.completeOrder(user, paymentVo.getOrderId());

    }

    public void confirmMakePayment(MiaoShaUser user, PaymentVo paymentVo) {log.info("start tcc transaction confirm: {}", JSONObject.toJSONString(paymentVo));

        //check if the trade order status is PAYING, if no, means another call confirmMakePayment happened, return directly, ensure idempotency.
    }

    public void cancelMakePayment(MiaoShaUser user, PaymentVo paymentVo) {log.info("start tcc transaction cancel: {}", JSONObject.toJSONString(paymentVo));

    }

以领取为例子,领取的办法都做了幂等管制管制,通过领取记录和领取记录的状态来管制。

还须要特地留神的中央,避免事务悬挂,tcc-transaction 默认反对悬挂,利用定时工作来回复悬挂事务。见上面源码

  @Override
    @Compensable(confirmMethod = "confirmPay", cancelMethod = "cancelPay", transactionContextEditor = DubboTransactionContextEditor.class)
    @Transactional
    /**
     * 领取订单 预留扣款资源
     */
    public ResultGeekQ<MiaoShaUserVo> pay(MiaoShaUser user, PaymentVo paymentVo) {ResultGeekQ<MiaoShaUserVo> resultGeekQ  = ResultGeekQ.build();
        log.info("start tcc pay try, user:{}, paymentVo:{}", user, paymentVo);
        MiaoshaPayment miaoshaPaymentDb = miaoshaPaymentDao.selectByOrderID(paymentVo.getOrderId());
        MiaoshaUserAccount miaoshaUserAccountDb = miaoshaUserAccountDao.selectByUserID(user.getId());
        // 金额是否足够用
        if (miaoshaUserAccountDb.getBalanceAmount() == null
                || miaoshaUserAccountDb.getBalanceAmount().compareTo(paymentVo.getPayAmount()) < 0) {throw new AccountException("领取金额有余");
        }
        // 判断领取记录是否存在,try 具备重试机制,须要幂等性
        if (miaoshaPaymentDb == null) {
            // 账户欲扣款
            MiaoshaUserAccount miaoshaUserAccount = new MiaoshaUserAccount();
            miaoshaUserAccount.setUserId(user.getId());
            miaoshaUserAccount.setTransferAmount(paymentVo.getPayAmount());
            miaoshaUserAccountDao.updateByUserID(miaoshaUserAccount);
            // 插入欲扣款记录
            MiaoshaPayment miaoshaPayment = new MiaoshaPayment();
            miaoshaPayment.setAmount(paymentVo.getPayAmount());
            miaoshaPayment.setMiaoshaOrderId(paymentVo.getOrderId());
            miaoshaPayment.setUserId(user.getId());
            miaoshaPayment.setCreateTime(new Date());
            miaoshaPayment.setUpdateTime(new Date());
            miaoshaPayment.setStatus(Constants.PAY_DEALING);
            miaoshaPayment.setVersion(1);
            miaoshaPaymentDao.insertSelective(miaoshaPayment);
        }

        return resultGeekQ;
    }
//transaction type is BRANCH
                switch (transaction.getStatus()) {
                    case CONFIRMING:
                        commitTransaction(transactionRepository, transaction);
                        break;
                    case CANCELLING:
                    case TRY_FAILED:
                        rollbackTransaction(transactionRepository, transaction);
                        break;
                    case TRY_SUCCESS:

                        if(transactionRepository.getRootDomain() == null) {break;}

                        //check the root transaction
                        Transaction rootTransaction = transactionRepository.findByRootXid(transaction.getRootXid());

                        if (rootTransaction == null) {
                            // In this case means the root transaction is already rollback.
                            // Need cancel this branch transaction.
                            rollbackTransaction(transactionRepository, transaction);
                        } else {switch (rootTransaction.getStatus()) {
                                case CONFIRMING:
                                    commitTransaction(transactionRepository, transaction);
                                    break;
                                case CANCELLING:
                                    rollbackTransaction(transactionRepository, transaction);
                                    break;
                                default:
                                    break;
                            }
                        }
                        break;
                    default:
                        // the transaction status is TRYING, ignore it.
                        break;
                }

            }

正文完
 0