单体秒杀服务转dubbo框架+分布式事务实现
计划和技术架构
计划:秒杀计划(
)+ 分布式事务解决方案 (为了让领取、扣减库存和订单状态一致性解决方案,见下图)
目标: dubbo 微服务化 实现订单领取分布式事务,对立提交和回滚
技术架构:
docker + nacos 架构 (舍去zookeeper,nacos更香)
docker 版本 2.7
springboot 版本 2.6.1
分布式事务(tcc -transaction 1.73)
github地址:https://github.com/luozijing/...
我的项目体验地址:http://81.69.254.72:9082/logi...
tcc-transaction
TCC(Try Confirm Cancel)计划是一种利用层面侵入业务的两阶段提交。是目前最火的一种柔性事务计划,其核心思想是:针对每个操作,都要注册一个与其对应的确认和弥补(撤销)操作。
TCC分为两个阶段,别离如下:
- 第一阶段:Try(尝试),次要是对业务零碎做检测及资源预留 (加锁,预留资源)
- 第二阶段:本阶段依据第一阶段的后果,决定是执行confirm还是cancel
- Confirm(确认):执行真正的业务(执行业务,开释锁)
- Cancle(勾销):是预留资源的勾销(出问题,开释锁)
最终一致性保障,
- TCC 事务机制以初步操作(Try)为核心的,确认操作(Confirm)和勾销操作(Cancel)都是围绕初步操作(Try)而开展。因而,Try 阶段中的操作,其保障性是最好的,即便失败,依然有勾销操作(Cancel)能够将其执行后果撤销。
- Try阶段执行胜利并开始执行
Confirm
阶段时,默认Confirm
阶段是不会出错的。也就是说只有Try
胜利,Confirm
肯定胜利(TCC设计之初的定义) 。 - Confirm与Cancel如果失败,由TCC框架进行==重试==弥补
- 存在极低概率在CC环节彻底失败,则须要定时工作或人工染指、
TCC 事务机制绝对于传统事务机制,有以下长处:
- 性能晋升:具体业务来实现管制资源锁的粒度变小,不会锁定整个资源。
- 数据最终一致性:基于 Confirm 和 Cancel 的幂等性,保障事务最终实现确认或者勾销,保证数据的一致性。
- 可靠性:解决了 XA 协定的协调者单点故障问题,由主业务方发动并管制整个业务流动,业务流动管理器也变成多点,引入集群。
毛病:
- TCC 的 Try、Confirm 和 Cancel 操作性能要按具体业务来实现,业务耦合度较高,进步了开发成本。
dubbo+TCC实现
样例实现了领取、更新订单和库存的程序一致性。联合TCC-Transaction的代码和业务代码,分下下要害流程。
tcc源码剖析
更具体的流程能够看这
https://blog.csdn.net/tianjin...
启动类通过@EnableTccTransaction注解,真正起作用的是上面的import,在同一个上下文注入相干的类,包含transactionRepository、springBeanFactory、recoveryLock、TransactionManager等类,TransactionManager在整个tcc事务中是一个事务管理类,作用包含begin、commit、rollback等。
@Import(TccTransactionConfigurationSelector.class)
除此之外,还注入上述两个切面,切面CompensableTransactionInterceptor的切点为@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)"),作用赋予事务角色和初始化状态。分事务角色和事务以后状态解决分布式事务。
当角色使root,即根事务,进入rootMethodProceed处理事务上下文(分布式上下文,微服务),来判断本身事务进入confirm、cancel还是try分支。
当角色使provider,即分支事务,进入providerMethodProceed处理事务上下文。
若没有事务角色,则间接执行切面的代理办法。
public Object interceptCompensableMethod(TransactionMethodJoinPoint pjp) throws Throwable { Transaction transaction = transactionManager.getCurrentTransaction(); CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction); // if method is @Compensable and no transaction context and no transaction, then root // else if method is @Compensable and has transaction context and no transaction ,then provider switch (compensableMethodContext.getParticipantRole()) { case ROOT: return rootMethodProceed(compensableMethodContext); case PROVIDER: return providerMethodProceed(compensableMethodContext); default: return compensableMethodContext.proceed(); } }
rootMethodProceed办法,很明确,由事务管理器创立事务transaction,而后执行服务调用,try catch住调用的服务的状态,若异样则,进行回滚解决执行cancel办法,若失常执行提交,执行confirm办法。
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Object returnValue = null; Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity()); try { returnValue = compensableMethodContext.proceed(); } catch (Throwable tryingException) { transactionManager.rollback(asyncCancel); throw tryingException; } transactionManager.commit(asyncConfirm); } finally { transactionManager.cleanAfterCompletion(transaction); } return returnValue; }
providerMethodProceed办法当上游事务的状态是try 或者是confirm是别离执行try 和commit,当是cancel时,会通过Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());先去找之前的事务,因为执行cancel阐明大概率之前事务时存在的,因而有了前面的判断,判断之前事务时try胜利、try失败或者勾销,都有可能须要回滚,另外其余状况就时走定时回复工作去回滚。
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) { case TRYING: transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext()); Object result = null; try { result = compensableMethodContext.proceed(); //TODO: need tuning here, async change the status to tuning the invoke chain performance //transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, asyncSave); transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, true); } catch (Throwable e) { transactionManager.changeStatus(TransactionStatus.TRY_FAILED); throw e; } return result; case CONFIRMING: try { transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); transactionManager.commit(asyncConfirm); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. logger.info("no existed transaction found at CONFIRMING stage, will ignore and confirm automatically. transaction:" + JSON.toJSONString(transaction)); } break; case CANCELLING: try { //The transaction' status of this branch transaction, passed from consumer side. int transactionStatusFromConsumer = compensableMethodContext.getTransactionContext().getParticipantStatus(); transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); // Only if transaction's status is at TRY_SUCCESS、TRY_FAILED、CANCELLING stage we can call rollback. // If transactionStatusFromConsumer is TRY_SUCCESS, no mate current transaction is TRYING or not, also can rollback. // transaction's status is TRYING while transactionStatusFromConsumer is TRY_SUCCESS may happen when transaction's changeStatus is async. if (transaction.getStatus().equals(TransactionStatus.TRY_SUCCESS) || transaction.getStatus().equals(TransactionStatus.TRY_FAILED) || transaction.getStatus().equals(TransactionStatus.CANCELLING) || transactionStatusFromConsumer == ParticipantStatus.TRY_SUCCESS.getId()) { transactionManager.rollback(asyncCancel); } else { //in this case, transaction's Status is TRYING and transactionStatusFromConsumer is TRY_FAILED // this may happen if timeout exception throws during rpc call. throw new IllegalTransactionStatusException("Branch transaction status is TRYING, cannot rollback directly, waiting for recovery job to rollback."); } } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. logger.info("no existed transaction found at CANCELLING stage, will ignore and cancel automatically. transaction:" + JSON.toJSONString(transaction)); } break; } } finally { transactionManager.cleanAfterCompletion(transaction); } Method method = compensableMethodContext.getMethod(); return ReflectionUtils.getNullValue(method.getReturnType()); }
另外一个切面是ConfigurableCoordinatorAspect,这个切面的作用是制作参加事务,参加事务的作用的是当Commit时所有的参加事务一起提交,回滚时,所有的参加事务一起回滚,来管制分布式服务。
public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable { Transaction transaction = transactionManager.getCurrentTransaction(); if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) { Participant participant = enlistParticipant(pjp); if (participant != null) { Object result = null; try { result = pjp.proceed(pjp.getArgs()); participant.setStatus(ParticipantStatus.TRY_SUCCESS); } catch (Throwable e) { participant.setStatus(ParticipantStatus.TRY_FAILED); //if root transaction, here no need persistent transaction // because following stage is rollback, transaction's status is changed to CANCELING and save// transactionManager.update(participant);// throw e; } return result; } } return pjp.proceed(pjp.getArgs()); }
业务实现
业务要实现订单、库存和领取的一致性事务,首先起一个根事务,该服务负责给前端返回后果,上面有两个子事务,一个是领取,另外一个是扣减库存和订单。
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false) public void makePayment(MiaoShaUser user, PaymentVo paymentVo) { log.info("start tcc transaction try: {}", JSONObject.toJSONString(paymentVo)); // 领取 miaoShaUserService.pay( user, paymentVo); // 扣减库存和更新订单 miaoshaService.completeOrder(user, paymentVo.getOrderId()); } public void confirmMakePayment(MiaoShaUser user, PaymentVo paymentVo) { log.info("start tcc transaction confirm: {}", JSONObject.toJSONString(paymentVo)); //check if the trade order status is PAYING, if no, means another call confirmMakePayment happened, return directly, ensure idempotency. } public void cancelMakePayment(MiaoShaUser user, PaymentVo paymentVo) { log.info("start tcc transaction cancel: {}", JSONObject.toJSONString(paymentVo)); }
以领取为例子,领取的办法都做了幂等管制管制,通过领取记录和领取记录的状态来管制。
还须要特地留神的中央,避免事务悬挂,tcc-transaction默认反对悬挂,利用定时工作来回复悬挂事务。见上面源码
@Override @Compensable(confirmMethod = "confirmPay", cancelMethod = "cancelPay", transactionContextEditor = DubboTransactionContextEditor.class) @Transactional /** * 领取订单 预留扣款资源 */ public ResultGeekQ<MiaoShaUserVo> pay(MiaoShaUser user, PaymentVo paymentVo) { ResultGeekQ<MiaoShaUserVo> resultGeekQ = ResultGeekQ.build(); log.info("start tcc pay try, user:{}, paymentVo:{}", user, paymentVo); MiaoshaPayment miaoshaPaymentDb = miaoshaPaymentDao.selectByOrderID(paymentVo.getOrderId()); MiaoshaUserAccount miaoshaUserAccountDb = miaoshaUserAccountDao.selectByUserID(user.getId()); // 金额是否足够用 if (miaoshaUserAccountDb.getBalanceAmount() == null || miaoshaUserAccountDb.getBalanceAmount().compareTo(paymentVo.getPayAmount()) < 0) { throw new AccountException("领取金额有余"); } // 判断领取记录是否存在,try具备重试机制,须要幂等性 if (miaoshaPaymentDb == null) { // 账户欲扣款 MiaoshaUserAccount miaoshaUserAccount = new MiaoshaUserAccount(); miaoshaUserAccount.setUserId(user.getId()); miaoshaUserAccount.setTransferAmount(paymentVo.getPayAmount()); miaoshaUserAccountDao.updateByUserID(miaoshaUserAccount); // 插入欲扣款记录 MiaoshaPayment miaoshaPayment = new MiaoshaPayment(); miaoshaPayment.setAmount(paymentVo.getPayAmount()); miaoshaPayment.setMiaoshaOrderId(paymentVo.getOrderId()); miaoshaPayment.setUserId(user.getId()); miaoshaPayment.setCreateTime(new Date()); miaoshaPayment.setUpdateTime(new Date()); miaoshaPayment.setStatus(Constants.PAY_DEALING); miaoshaPayment.setVersion(1); miaoshaPaymentDao.insertSelective(miaoshaPayment); } return resultGeekQ; }
//transaction type is BRANCH switch (transaction.getStatus()) { case CONFIRMING: commitTransaction(transactionRepository, transaction); break; case CANCELLING: case TRY_FAILED: rollbackTransaction(transactionRepository, transaction); break; case TRY_SUCCESS: if(transactionRepository.getRootDomain() == null) { break; } //check the root transaction Transaction rootTransaction = transactionRepository.findByRootXid(transaction.getRootXid()); if (rootTransaction == null) { // In this case means the root transaction is already rollback. // Need cancel this branch transaction. rollbackTransaction(transactionRepository, transaction); } else { switch (rootTransaction.getStatus()) { case CONFIRMING: commitTransaction(transactionRepository, transaction); break; case CANCELLING: rollbackTransaction(transactionRepository, transaction); break; default: break; } } break; default: // the transaction status is TRYING, ignore it. break; } }