关于java:用Java轻松完成一个分布式事务TCC自动处理空补偿悬挂幂等

38次阅读

共计 5846 个字符,预计需要花费 15 分钟才能阅读完成。

什么是 TCC,TCC 是 Try、Confirm、Cancel 三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。

TCC 组成

TCC 分为 3 个阶段

  • Try 阶段:尝试执行,实现所有业务查看(一致性), 预留必须业务资源(准隔离性)
  • Confirm 阶段:如果所有分支的 Try 都胜利了,则走到 Confirm 阶段。Confirm 真正执行业务,不作任何业务查看,只应用 Try 阶段预留的业务资源
  • Cancel 阶段:如果所有分支的 Try 有一个失败了,则走到 Cancel 阶段。Cancel 开释 Try 阶段预留的业务资源。

TCC 分布式事务里,有 3 个角色,与经典的 XA 分布式事务一样:

  • AP/ 应用程序,发动全局事务,定义全局事务蕴含哪些事务分支
  • RM/ 资源管理器,负责分支事务各项资源的治理
  • TM/ 事务管理器,负责协调全局事务的正确执行,包含 Confirm,Cancel 的执行,并解决网络异样

如果咱们要进行一个相似于银行跨行转账的业务,转出(TransOut)和转入(TransIn)别离在不同的微服务里,一个胜利实现的 TCC 事务典型的时序图如下:

TCC 网络异样

TCC 在整个全局事务的过程中,可能产生各类网络异常情况,典型的是空回滚、幂等、悬挂。这里有一篇文章分布式事务的这些常见用法都有坑,来看看正确姿态,进行了具体的解说

TCC 实际

对于后面的跨行转账操作,最简略的做法是,在 Try 阶段调整余额,在 Cancel 阶段反向调整余额,Confirm 阶段则空操作。这么做带来的问题是,如果 A 扣款胜利,金额转入 B 失败,最初回滚,把 A 的余额调整为初始值。在这个过程中如果 A 发现自己的余额被扣减了,然而收款方 B 迟迟没有收到余额,那么会对 A 造成困扰。

更好的做法是,Try 阶段解冻 A 转账的金额,Confirm 进行理论的扣款,Cancel 进行资金冻结,这样用户在任何一个阶段,看到的数据都是清晰明了的。

上面咱们进行一个 TCC 事务的具体开发

咱们的例子采纳 Java 语言,应用的分布式事务框架为 https://github.com/yedf/dtm,它对分布式事务的反对十分优雅。上面来具体解说 TCC 的组成

咱们首先创立用户余额表,建表语句如下:

create table if not exists dtm_busi.user_account(id int(11) PRIMARY KEY AUTO_INCREMENT,
  user_id int(11) UNIQUE,
  balance DECIMAL(10, 2) not null default '0',
  trading_balance DECIMAL(10, 2) not null default '0',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time)
);

表中,trading_balance 记录正在交易的金额。

咱们先编写外围代码,解冻 / 冻结资金操作,会查看束缚 balance+trading_balance >= 0,如果束缚不成立,执行失败

public void adjustTrading(Connection connection, TransReq transReq) throws Exception {
    String sql = "update dtm_busi.user_account set trading_balance=trading_balance+?"
            + "where user_id=? and trading_balance + ? + balance >= 0";
    PreparedStatement preparedStatement = null;
    try {preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setInt(1, transReq.getAmount());
        preparedStatement.setInt(2, transReq.getUserId());
        preparedStatement.setInt(3, transReq.getAmount());
        if (preparedStatement.executeUpdate() > 0) {System.out.println("交易金额更新胜利");
        } else {throw new FailureException("交易失败");
        }
    } finally {if (null != preparedStatement) {preparedStatement.close();
        }
    }
    
}

而后是调整余额

public void adjustBalance(Connection connection, TransReq transReq) throws SQLException {
    PreparedStatement preparedStatement = null;
    try {
        String sql = "update dtm_busi.user_account set trading_balance=trading_balance-?,balance=balance+? where user_id=?";
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setInt(1, transReq.getAmount());
        preparedStatement.setInt(2, transReq.getAmount());
        preparedStatement.setInt(3, transReq.getUserId());
        if (preparedStatement.executeUpdate() > 0) {System.out.println("余额更新胜利");
        }
    } finally {if (null != preparedStatement) {preparedStatement.close();
        }
    }
}

上面咱们来编写具体的 Try/Confirm/Cancel 的处理函数

@RequestMapping("barrierTransOutTry")
public Object TransOutTry(HttpServletRequest request) throws Exception {BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap());
    logger.info("barrierTransOutTry branchBarrier:{}", branchBarrier);

    TransReq transReq = extracted(request);
    Connection connection = dataSourceUtil.getConnecion();
    branchBarrier.call(connection, (barrier) -> {System.out.println("用户: +" + transReq.getUserId() + ", 转出" + Math.abs(transReq.getAmount()) + "元筹备");
        this.adjustTrading(connection, transReq);
    });
    connection.close();
    return TransResponse.buildTransResponse(Constant.SUCCESS_RESULT);
}

@RequestMapping("barrierTransOutConfirm")
public Object TransOutConfirm(HttpServletRequest request) throws Exception {BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap());
    logger.info("barrierTransOutConfirm branchBarrier:{}", branchBarrier);
    Connection connection = dataSourceUtil.getConnecion();
    TransReq transReq = extracted(request);
    branchBarrier.call(connection, (barrier) -> {System.out.println("用户: +" + transReq.getUserId() + ", 转出" + Math.abs(transReq.getAmount()) + "元提交");
        adjustBalance(connection, transReq);
    });
    connection.close();
    return TransResponse.buildTransResponse(Constant.SUCCESS_RESULT);
}

@RequestMapping("barrierTransOutCancel")
public Object TransOutCancel(HttpServletRequest request) throws Exception {BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap());
    logger.info("barrierTransOutCancel branchBarrier:{}", branchBarrier);
    TransReq transReq = extracted(request);
    Connection connection = dataSourceUtil.getConnecion();
    branchBarrier.call(connection, (barrier) -> {System.out.println("用户: +" + transReq.getUserId() + ", 转出" + Math.abs(transReq.getAmount()) + "元回滚");
        this.adjustTrading(connection, transReq);
    });
    connection.close();
    return TransResponse.buildTransResponse(Constant.SUCCESS_RESULT);
}

// TransIn 相干函数与 TransOut 相似,这里省略 

到此各个子事务的处理函数曾经 OK 了,而后是开启 TCC 事务,进行分支调用

@RequestMapping("tccBarrier")
public String tccBarrier() {
    // 创立 dmt client
    DtmClient dtmClient = new DtmClient(ipPort);
    // 创立 tcc 事务
    try {dtmClient.tccGlobalTransaction(dtmClient.genGid(), TccTestController::tccBarrierTrans);
    } catch (Exception e) {log.error("tccGlobalTransaction error", e);
        return "fail";
    }
    return "success";
}

public static void tccBarrierTrans(Tcc tcc) throws Exception {
    // 用户 1 转出 30 元
    Response outResponse = tcc
            .callBranch(new TransReq(1, -30), svc + "/barrierTransOutTry", svc + "/barrierTransOutConfirm",
                    svc + "/barrierTransOutCancel");
    log.info("outResponse:{}", outResponse);

    // 用户 2 转入 30 元
    Response inResponse = tcc
            .callBranch(new TransReq(2, 30), svc + "/barrierTransInTry", svc + "/barrierTransInConfirm",
                    svc + "/barrierTransInCancel");
    log.info("inResponse:{}", inResponse);
}

至此,一个残缺的 TCC 分布式事务编写实现。

如果您想要残缺运行一个胜利的示例,那么依照 dtmcli-java-sample 我的项目的阐明搭建好环境启动之后,运行上面命令运行 tcc 的例子即可

curl http://localhost:8081/tccBarrier

TCC 的回滚

如果银行将金额筹备转出用户 2 时,发现用户 2 的账户异样,返回失败,会怎么样?咱们的例子中,可用户余额为 10000,发动一笔 100000 的转账会触发异样而失败:
curl http://localhost:8081/tccBarrier
这是事务失败交互的时序图

这个跟胜利的 TCC 差异就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的 Cancel 操作,保障全局事务全副回滚。

小结

在这篇文章里,咱们介绍了 TCC 的理论知识,也通过一个例子,残缺给出了编写一个 TCC 事务的过程,涵盖了失常胜利实现,以及胜利回滚的状况。置信读者通过这边文章,对 TCC 曾经有了深刻的了解。

对于分布式事务中须要解决的幂等、悬挂、空弥补,请参考另一篇文章:分布式事务的这些常见用法都有坑,来看看正确姿态

对于分布式事务更多更全面的常识,请参考分布式事务最经典的七种解决方案

文中应用的例子选自 yedf/dtmcli-java-sample。应用的分布式事务管理器为 https://github.com/yedf/dtm,反对多种事务模式:TCC、SAGA、XA、事务音讯 跨语言反对,已反对 golang、python、PHP、nodejs 等语言的客户端。提供子事务屏障性能,优雅解决幂等、悬挂、空弥补等问题。

浏览完此篇干货,欢送大家拜访 https://github.com/yedf/dtm 我的项目,给颗星星反对!

正文完
 0