关于java:听说-TCC-不支持-OpenFeign这个坑松哥必须给大家填了

34次阅读

共计 14017 个字符,预计需要花费 36 分钟才能阅读完成。

情谊提醒:本文略长略简单,然而有配套的视频教程。

在后面的文章中,松哥和大家聊了分布式事务框架 seata 的 at 模式,而后有小伙伴评论说 seata 的 tcc 模式不反对 Spring Boot:

这。。。必然是反对的呀!

我始终感觉网上讲分布式事务的实践很多,案例代码很少,所以咱们明天就整一个例子,一起来把这个捋一捋。

1. 什么是 TCC 模式

相比于上篇文章所聊的 AT 模式,TCC(Try-Confirm-Cancel)模式就带一点手动的感觉了,它也是两阶段提交的演变,然而和 AT 又不太一样,咱们来看下流程。

官网上有一张 TCC 的流程图,咱们来看下:

能够看到,TCC 也是分为两阶段:

  • 第一阶段是 prepare,在这个阶段次要是做资源的检测和预留工作,例如银行转账,这个阶段就先去查看下用户的钱够不够,不够就间接抛异样,够就先给解冻上。
  • 第二阶段是 commit 或 rollback,这个次要是等各个分支事务的一阶段都执行结束,都执行结束后各自将本人的状况报告给 TC,TC 一统计,发现各个分支事务都没有异样,那么就告诉大家一起提交;如果 TC 发现有分支事务产生异样了,那么就告诉大家回滚。

那么小伙伴可能也发现了,下面这个流程中,一共波及到了三个办法,prepare、commit 以及 rollback,这三个办法都齐全是用户自定义的办法,都是须要咱们本人来实现的,所以我一开始就说 TCC 是一种手动的模式。

和 AT 相比,大家发现 TCC 这种模式其实是不依赖于底层数据库的事务反对的,也就是说,哪怕你底层数据库不反对事务也没关系,反正 prepare、commit 以及 rollback 三个办法都是开发者本人写的,咱们本人将这三个办法对应的流程捋顺就行了。

在上篇文章的中,咱们讲 AT 模式,每个数据库都须要有一个 undo log 表,这个表用来记录一条数据更改之前和更改之后的状态(前镜像和后镜像),如果所有分支事务最终都提交胜利,那么记录在 undo log 表中的数据就会主动删除;如果有一个分支事务执行失败,导致所有事务都须要回滚,那么就会以 undo log 表中的数据会根据,生成反向弥补语句,利用反向弥补语句将数据还原,执行实现后也会删除 undo log 表中的记录。

在这个流程中,大家看到,undo log 表表演了十分重要的角色。TCC 和 AT 最大的区别在于,TCC 中的提交和回滚逻辑都是开发者本人写的,而 AT 都是框架主动实现的。

为了不便大家了解,本文我就不从新搞案例了,咱们还用上篇文章那个下订单的案例来演示。

2. 案例回顾

这是一个商品下单的案例,一共有五个服务,我来和大家略微解释下:

  • eureka:这是服务注册核心。
  • account:这是账户服务,能够查问 / 批改用户的账户信息(次要是账户余额)。
  • order:这是订单服务,能够下订单。
  • storage:这是一个仓储服务,能够查问 / 批改商品的库存数量。
  • bussiness:这是业务,用户下单操作将在这里实现。

这个案例讲了一个什么事呢?

当用户想要下单的时候,调用了 bussiness 中的接口,bussiness 中的接口又调用了它本人的 service,在 service 中,首先开启了全局分布式事务,而后通过 feign 调用 storage 中的接口去扣库存,而后再通过 feign 调用 order 中的接口去创立订单(order 在创立订单的时候,不仅会创立订单,还会扣除用户账户的余额),在这个过程中,如果有任何一个环节出错了(余额有余、库存有余等导致的问题),就会触发整体的事务回滚。

本案例具体架构如下图:

这个案例就是一个典型的分布式事务问题,storage、order 以及 account 中的事务分属于不同的微服务,然而咱们心愿他们同时胜利或者同时失败。

这个案例的根本架构我这里就不反复搭建了,小伙伴们能够参考上篇文章,这里咱们次要来看 TCC 事务如何增加进来。

3. 从新设计数据库

首先咱们将上篇文章中的数据库来从新设计一下,不便咱们本文的应用。

账户表减少一个解冻金额的字段,如下:

订单表和前文保持一致,不变。

库存表也减少一个解冻库存数量的字段,如下:

另外,因为咱们这里不再应用 AT 模式,所以能够删除之前的 undo_log 表了(可能有小伙伴删除 undo_log 表之后,会报错,那是因为你 TCC 模式应用不对,留神看松哥前面的解说哦)。

相干的数据库脚本小伙伴们能够在文末下载,这里我就不列出来了。

4. 从新设计 Feign 接口

在 TCC 模式中,咱们的 Feign 换一种形式来配置。

小伙伴们都晓得,在上篇文章的案例中,咱们有一个 common 模块,用来寄存一些公共内容(实际上咱们只是存储了 RespBean),当初咱们把这里波及到的 OpenFeign 接口也存储进来,一共是三个 OpenFeign 接口,因为还要用到 seata 中的注解,所以咱们在 common 中引入 OpenFeign 和 seata 的依赖,如下:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>

而后在这里定义 OpenFeign 的三个接口,如下:

@LocalTCC
public interface AccountServiceApi {@PostMapping("/account/deduct/prepare")
    @TwoPhaseBusinessAction(name = "accountServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("money") @BusinessActionContextParameter(paramName = "money") Double money);

    @RequestMapping("/account/deduct/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/account/deduct/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface OrderServiceApi {@PostMapping("/order/create/prepare")
    @TwoPhaseBusinessAction(name = "orderServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("productId") @BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);

    @RequestMapping("/order/create/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/order/create/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface StorageServiceApi {@PostMapping("/storage/deduct/prepare")
    @TwoPhaseBusinessAction(name = "storageServiceApi",commitMethod = "commit",rollbackMethod = "rollback")
    boolean deduct(@RequestBody BusinessActionContext actionContext, @RequestParam("productId")@BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);

    @RequestMapping("/storage/deduct/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/storage/deduct/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}

这里一共有三个接口,然而只有大家搞懂其中一个,另外两个都很好懂了。我这里就以 AccountServiceApi 为例来和大家解说吧。

  • 首先接口的定义上,须要加一个注解 @LocalTCC,这个示意开启 seata 中的 TCC 模式。
  • 而后就是 @TwoPhaseBusinessAction 注解,两阶段提交的注解,这个注解有三个属性,第一个 name 就是解决两阶段提交的 bean 的名字,其实就是以后 bean 的名字,以后类名首字母小写。两阶段第一阶段就是 prepare 阶段,也就是执行 @TwoPhaseBusinessAction 注解所在的办法,第二阶段则分为两种状况,提交或者回滚,别离对应了两个不同的办法,commitMethod 和 rollbackMethod 就指明了相应的办法。
  • 一阶段的 prepare 须要开发者手动调用,二阶段的 commit 或者 rollback 则是零碎主动调用。prepare 中的办法是由开发者来传递的,而在二阶段的办法中,相干的参数咱们须要从 BusinessActionContext 中获取,@BusinessActionContextParameter 注解就是将对应的参数放入到 BusinessActionContext 中(留神须要给每一个参数取一个名字),未来能够从 BusinessActionContext 中取出对应的参数。
  • 另外须要留神,接口的返回值设计成 boolean,用以示意相应的操作执行胜利还是失败,返回 false 示意执行失败,默认会有重试机制进行重试。

这是 AccountServiceApi,另外两个接口的设计也是大同小异,这里我就不再赘述。

接下来看接口的实现。

5. Account

首先咱们来看看 Account 服务。AccountController 实现 AccountServiceApi。

咱们来看下 AccountController 的定义:

@RestController
public class AccountController implements AccountServiceApi {
    @Autowired
    AccountService accountService;

    @Override
    public boolean prepare(BusinessActionContext actionContext, String userId, Double money) {return accountService.prepareDeduct(userId, money);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {return accountService.commitDeduct(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {return accountService.rollbackDeduct(actionContext);
    }
}

因为接口的门路都定义在 AccountServiceApi 中了,所以这里只须要简略实现即可,外围的解决逻辑在 AccountService 中,咱们来看下 AccountService:

@Service
public class AccountService {private static final Logger logger = LoggerFactory.getLogger(AccountService.class);

    @Autowired
    AccountMapper accountMapper;

    /**
     * 预扣款阶段
     * 查看账户余额
     *
     * @param userId
     * @param money
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepareDeduct(String userId, Double money) {Account account = accountMapper.getAccountByUserId(userId);
        if (account == null) {throw new RuntimeException("账户不存在");
        }
        if (account.getMoney() < money) {throw new RuntimeException("余额有余,预扣款失败");
        }
        account.setFreezeMoney(account.getFreezeMoney() + money);
        account.setMoney(account.getMoney() - money);
        Integer i = accountMapper.updateAccount(account);
        logger.info("{} 账户预扣款 {} 元", userId, money);
        return i == 1;
    }

    /**
     * 理论扣款阶段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeduct(BusinessActionContext actionContext) {String userId = (String) actionContext.getActionContext("userId");
        Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
        Account account = accountMapper.getAccountByUserId(userId);
        if (account.getFreezeMoney() < money) {throw new RuntimeException("余额有余,扣款失败");
        }
        account.setFreezeMoney(account.getFreezeMoney() - money);
        Integer i = accountMapper.updateAccount(account);
        logger.info("{} 账户扣款 {} 元", userId, money);
        return i == 1;
    }

    /**
     * 账户回滚阶段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackDeduct(BusinessActionContext actionContext) {String userId = (String) actionContext.getActionContext("userId");
        Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
        Account account = accountMapper.getAccountByUserId(userId);
        if (account.getFreezeMoney() >= money) {account.setMoney(account.getMoney() + money);
            account.setFreezeMoney(account.getFreezeMoney() - money);
            Integer i = accountMapper.updateAccount(account);
            logger.info("{} 账户开释解冻金额 {} 元", userId, money);
            return i == 1;
        }
        logger.info("{} 账户资金已开释",userId);
        // 阐明 prepare 中抛出异样,未冻结资金
        return true;
    }
}
  • AccountService 里一共有三个办法,在整个两阶段提交中,一阶段执行 prepareDeduct 办法,二阶段执行 commitDeduct 或者 rollbackDeduct 办法。
  • 在 prepareDeduct 中,咱们次要检查一下账户是否存在,账户余额是否短缺,余额短缺就将本次生产的金额解冻起来,解冻的逻辑就是给 freezeMoney 字段减少本次生产金额,从 money 字段缩小本次生产金额。
  • 等到其余几个服务的一阶段办法都执行实现后,都没有抛出异样,此时就执行二阶段的提交办法,对应这里就是 commitDeduct 办法;如果其余服务的一阶段执行过程中,抛出了异样,那么就执行二阶段的回滚办法,对应这里的 rollbackDeduct。
  • 在 commitDeduct 办法中,首先从 BusinessActionContext 中提取进去咱们须要的参数(因为这个办法是零碎主动调用的,不是咱们手动调用,因而没法本人传参数进来,只能通过 BusinessActionContext 来获取),而后再检查一下余额是否短缺,没问题就把解冻的资金划掉,就算扣款实现了。
  • 在 rollbackDeduct 办法中,也是先从 BusinessActionContext 中获取相应的参数,检查一下解冻的金额,没问题就把解冻的金额复原到 money 字段上(如果没进入 if 分支,则阐明 prepare 中抛出异样,未冻结资金)。

好了,这就是从账户扣钱的两阶段操作,数据库操作比较简单,我这里就不列出来了,文末能够下载源码。

6. Order

再来看订单服务。

因为咱们是在 order 中调用 account 实现账户扣款的,所以须要先在 order 中退出 account 的 OpenFeign 调用,如下:

@FeignClient("account")
public interface AccountServiceApiImpl extends AccountServiceApi {}

这应该没啥好解释的。

接下来咱们来看 OrderController:

@RestController
public class OrderController implements OrderServiceApi {

    @Autowired
    OrderService orderService;

    @Override
    public boolean prepare(BusinessActionContext actionContext, String userId, String productId, Integer count) {return orderService.prepareCreateOrder(actionContext,userId, productId, count);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {return orderService.commitOrder(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {return orderService.rollbackOrder(actionContext);
    }
}

这个跟 AccountService 也基本一致,实现了 OrderServiceApi 接口,接口地址啥的都定义在 OrderServiceApi 中,这个类重点还是在 OrderService 中,如下:

@Service
public class OrderService {private static final Logger logger = LoggerFactory.getLogger(OrderService.class);

    @Autowired
    AccountServiceApi accountServiceApi;

    @Autowired
    OrderMapper orderMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean prepareCreateOrder(BusinessActionContext actionContext, String userId, String productId, Integer count) {
        // 先去扣款,假如每个产品 100 块钱
        boolean resp = accountServiceApi.prepare(actionContext, userId, count * 100.0);
        logger.info("{} 用户购买的 {} 商品共计 {} 件,预下单胜利", userId, productId, count);
        return resp;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean commitOrder(BusinessActionContext actionContext) {String userId = (String) actionContext.getActionContext("userId");
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        int i = orderMapper.addOrder(userId, productId, count, count * 100.0);
        logger.info("{} 用户购买的 {} 商品共计 {} 件,下单胜利", userId, productId, count);
        return i==1;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackOrder(BusinessActionContext actionContext) {String userId = (String) actionContext.getActionContext("userId");
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        logger.info("{} 用户购买的 {} 商品共计 {} 件,订单回滚胜利", userId, productId, count);
        return true;
    }
}

跟之前的 AccountService 一样,这里也是三个外围办法:

  • prepareCreateOrder:这里次要是调用了一下账户的办法,去查看下看下钱够不。一阶段就做个这事。
  • commitOrder:二阶段如果是提交的话,就向数据库中增加一条订单记录。
  • rollbackOrder:二阶段如果是回滚的话,就什么事件都不做,打个日志就行了。

好了,这就是下单的操作。

7. Storage

最初咱们再来看看扣库存的操作,这个跟扣款比拟像,一起来看下:

@RestController
public class StorageController implements StorageServiceApi {

    @Autowired
    StorageService storageService;

    @Override
    public boolean deduct(BusinessActionContext actionContext, String productId, Integer count) {return storageService.prepareDeduct(productId, count);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {return storageService.commitDeduct(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {return storageService.rollbackDeduct(actionContext);
    }
}

外围逻辑在 StorageService 中,如下:

@Service
public class StorageService {private static final Logger logger = LoggerFactory.getLogger(StorageService.class);

    @Autowired
    StorageMapper storageMapper;

    /**
     * 预扣库存
     *
     * @param productId
     * @param count
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepareDeduct(String productId, Integer count) {Storage storage = storageMapper.getStorageByProductId(productId);
        if (storage == null) {throw new RuntimeException("商品不存在");
        }
        if (storage.getCount() < count) {throw new RuntimeException("库存有余,预扣库存失败");
        }
        storage.setFreezeCount(storage.getFreezeCount() + count);
        storage.setCount(storage.getCount() - count);
        int i = storageMapper.updateStorage(storage);
        logger.info("{} 商品库存解冻 {} 个", productId, count);
        return i == 1;
    }

    /**
     * 扣库存
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeduct(BusinessActionContext actionContext) {String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageByProductId(productId);
        if (storage.getFreezeCount() < count) {throw new RuntimeException("库存有余,扣库存失败");
        }
        storage.setFreezeCount(storage.getFreezeCount() - count);
        int i = storageMapper.updateStorage(storage);
        logger.info("{} 商品库存扣除 {} 个", productId, count);
        return i == 1;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackDeduct(BusinessActionContext actionContext) {String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageByProductId(productId);
        if (storage.getFreezeCount() >= count) {storage.setFreezeCount(storage.getFreezeCount() - count);
            storage.setCount(storage.getCount() + count);
            int i = storageMapper.updateStorage(storage);
            logger.info("{} 商品开释库存 {} 个", productId, count);
            return i == 1;
        }
        // 阐明 prepare 阶段就没有解冻
        return true;
    }
}

这个跟 AccountService 的逻辑基本上是一样的,我就不多做解释了。

8. Business

最初再来看看调用的入口 Business。Business 中要调用 storage 和 order,所以先把这两个的 OpenFeign 整进来:

@FeignClient("order")
public interface OrderServiceApiImpl extends OrderServiceApi {
}
@FeignClient("storage")
public interface StorageServiceApiImpl extends StorageServiceApi {}

而后看下接口调用:

@RestController
public class BusinessController {
    @Autowired
    BusinessService businessService;

    @PostMapping("/order")
    public RespBean order(String account, String productId, Integer count) {
        try {businessService.purchase(account, productId, count);
            return RespBean.ok("下单胜利");
        } catch (Exception e) {e.printStackTrace();
            return RespBean.error("下单失败", e.getMessage());
        }
    }
}
@Service
public class BusinessService {
    @Autowired
    StorageServiceApi storageServiceApi;
    @Autowired
    OrderServiceApi orderServiceApi;

    @GlobalTransactional
    public void purchase(String account, String productId, Integer count) {String xid = RootContext.getXID();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        storageServiceApi.deduct(actionContext, productId, count);
        orderServiceApi.prepare(actionContext, account, productId, count);
    }
}

BusinessService 中通过 RootContext 获取全局事务 ID,而后结构一个 BusinessActionContext 对象,开始整个流程的调用。

好啦,功败垂成。

9. 测试

最初再来个简略测试,胜利的测试:

调用失败的测试:

好啦,这篇文章太长了,我就不啰嗦了,本文须要联合上篇文章一起食用成果更佳~

正文完
 0