关于java:分布式事务解决方案

9次阅读

共计 7953 个字符,预计需要花费 20 分钟才能阅读完成。

数据不会平白无故失落,也不会莫名其妙减少

一、概述

1、曾几何时,知了在一家小公司做我的项目的时候,都是一个服务打天下,所以波及到数据一致性的问题,都是间接用本地事务处理。

2、随着工夫的推移,用户量增大了,发现一个 Java 服务扛不住了,于是技术大佬决定对于零碎进行降级。依据零碎的业务对于单体的一个服务进行拆分,而后对于开发人员也进行划分,一个开发人员只开发和保护一个或几个服务中的问题,大家各司其职,分工合作。

3、当然服务拆分不是欲速不达的,这是一个耗时耗力的宏大工程,大多数零碎都是进行多轮拆分,而后缓缓造成一个稳固的零碎。恪守一个核心思想:先按总体业务进行一轮拆分,前面再依据拆分后的服务模块,进行一个粗疏的拆分。

4、随着服务拆分之后,用户量是抗住了,然而发现数据都在不同的服务中存取,这就引出了一个新的问题:跨服务器,如何保证数据的一致性? 当然,跨服务的分布式系统中不仅仅这个问题,还有其余的一些列问题,如:服务可用性、服务容错性、服务间调用的网络问题等等,这里只探讨数据一致性问题。

5、说到数据一致性,大抵分为三种:强一致性、弱一致性、最终一致性。

  • 强一致性:数据一旦写入,在任一时刻都能读取到最新的值。
  • 弱一致性:当写入一个数据的时候,其余中央去读这些数据,可能查到的数据不是最新的
  • 最终一致性:它是弱一致性的一个变种,不谋求零碎任意时刻数据要达到统一,然而在肯定工夫后,数据最终要达到统一。

从这三种统一型的模型上来说,咱们能够看到,弱一致性和最终一致性一般来说是异步冗余的,而强一致性是同步冗余的,异步解决带来了更好的性能,但也须要解决数据的弥补。同步意味着简略,但也必然会升高零碎的性能。

二、实践

上述说的数据一致性问题,其实也就是在说分布式事务的问题,当初有一些解决方案,置信大家多多少少都看到过,这里带大家回顾下。

2.1、二阶段提交

2PC 是一种强一致性设计方案,通过引入一个 事务协调器 来协调各个本地事务(也称为事务参与者)的提交和回滚。
2PC 次要分为 2 个阶段:

1、第一阶段 :事务协调器会向每个事务参与者发动一个开启事务的命令,每个事务参与者执行筹备操作,而后再向事务协调器回复是否筹备实现。
然而 不会提交本地事务, 然而这个阶段资源是须要被锁住的。

2、第二阶段: 事务协调器收到每个事务参与者的回复后,统计每个参与者的回复,如果每个参与者都回复“能够提交”,那么事务协调器会发送提交命令,参与者正式提交本地事务,开释所有资源,完结全局事务。然而有一个参与者回复“回绝提交”,那么事务协调器发送回滚命令,所有参与者都回滚本地事务,待全副回滚实现,开释资源,勾销全局事务。

事务提交流程

事务回滚流程

当然 2PC 存在的问题这里也提一下,一个是 同步阻塞 ,这个会耗费性能。另一个是 协调器故障 问题,一旦协调器产生故障,那么所有的参与者解决资源锁定状态,那么所有参与者都会被阻塞。

2.2、三阶段提交

3PC 次要是在 2PC 的根底上做了改良,次要为了解决 2PC 的阻塞问题。它次要是将 2PC 的第一阶段分为 2 个步骤,先筹备,再锁定资源,并且引入了超时机制(这也意味着会造成数据不统一)。

3PC 的三个阶段包含:CanCommitPreCommit DoCommit

具体细节就不开展赘述了,就一个外围观点:在 CanCommit 的时候并不锁定资源,除非所有参与者都批准了,才开始锁资源

2.3、TCC 柔性事务

相比拟后面的 2PC 和 3PC,TCC 和那哥俩的本质区别就是它是业务层面的分布式事务,而 2PC 和 3PC 是数据库层面的。TCC 是三个单词的缩写:TryConfirmCancel,也分为这三个流程。

Try:尝试,即尝试预留资源,锁定资源

Confirm:确认,即执行预留的资源,如果执行失败会重试

Cancel:勾销,撤销预留的资源,如果执行失败会重试

从上图可知,TCC 对于业务的侵入是很大的,而且紧紧的耦合在一起。TCC 相比拟 2PC 和 3PC,试用范畴更广,可实现跨库,跨不同零碎去实现分布式事务。毛病是要在业务代码中去开发大量的逻辑实现这三个步骤,须要和代码耦合在一起,进步开发成本。

事务日志:在 TCC 模式中,事务发起者和事务参与者都会去记录事务日志(事务状态、信息等)。这个事务日志是整个分布式事务出现意外状况(宕机、重启、网络中断等),实现提交和回滚的要害。

幂等性:在 TCC 第二阶段,confirm 或者 cancel 的时候,这两个操作都须要保障幂等性。一旦因为网络等起因导致执行失败,就会发动一直重试。

防悬挂:因为网络的不可靠性,有异常情况的时候,try 申请可能比 cancel 申请更晚达到。cancel 可能会执行空回滚,然而 try 申请被执行的时候也不会预留资源。

2.4、Seata

对于 seata 这里就不多提了,用的最多的是 AT 模式,上回知了逐渐剖析过,配置完后只须要在事务发动的办法上增加 @GlobalTransactional 注解就能够开启全局事务,对于业务无侵入,低耦合。感兴趣的话请参考之前探讨 Seata 的内容。

三、利用场景

知了之前在一家公司遇到过这样的业务场景;用户通过页面投保,提交一笔订单过去,这个订单通过上游服务,解决保单相干的业务逻辑,最初流入上游服务,解决业绩、人员降职、分润解决等等业务。对于这个场景,两边解决的业务逻辑不在同一个服务中,接入的是不同的数据库。波及到数据一致性问题,须要用到分布式事务。

对于下面介绍的几种计划,只是探讨了实践和思路,上面我来总结下这个业务场景中使用的一种实现计划。采纳了本地音讯表 +MQ 异步音讯的计划实现了事务最终一致性,也合乎过后的业务场景,绝对强一致性,实现的性能较高。上面是该计划的思路图

  1. 实在业务解决的状态可能会有多种,因而须要明确哪种状态须要定时工作弥补
  2. 如果某条单据始终无奈解决完结,定时工作也不能无限度下发,所以本地音讯表须要减少轮次的概念,重试多少次后告警,人工染指解决
  3. 因为 MQ 和定时工作的存在,难免会呈现反复申请,因而上游要做好幂等防重,否则会呈现反复数据,导致数据不统一

对于落地实现,话不多说,间接上代码。先定义两张表 tb_order 和 tb_notice_message,别离存订单信息和本地事务信息

CREATE TABLE `tb_order` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键 id',
  `user_id` int(11) NOT NULL COMMENT '下单人 id',
  `order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT '订单编号',
  `insurance_amount` decimal(16,2) NOT NULL COMMENT '保额',
  `order_amount` decimal(16,2) DEFAULT NULL COMMENT '保费',
  `create_time` datetime DEFAULT NULL COMMENT '创立工夫',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新工夫',
  `is_delete` tinyint(4) DEFAULT '0' COMMENT '删除标识:0- 不删除;1- 删除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tb_notice_message` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键 id',
  `type` tinyint(4) NOT NULL COMMENT '业务类型:1- 下单',
  `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态:1- 待处理,2- 已解决,3- 预警',
  `data` varchar(255) NOT NULL COMMENT '信息',
  `retry_count` tinyint(4) DEFAULT '0' COMMENT '重试次数',
  `create_time` datetime NOT NULL COMMENT '创立工夫',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新工夫',
  `is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删除标识:0- 不删除;1- 删除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;

解决订单 service,这里能够用到咱们之前说过的 装璜器模式 ,去装璜这个 service。把保留本地事务,发送 mq 音讯,交给装璜器类去做,而 service 只须要关怀业务逻辑即可,也合乎 开闭准则

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/13 10:58
 * @description
 */
@Service
@Slf4j
@AllArgsConstructor
public class OrderService implements BaseHandler<Object, Order> {

    private final OrderMapper orderMapper;

    /**
     * 订单解决办法:只解决订单关联逻辑
     * @param o
     * @return
     */
    @Override
    public Order handle(Object o) {
        // 订单信息
        Order order = Order.builder()
                .orderNo("2345678")
                .createTime(LocalDateTime.now())
                .userId(1)
                .insuranceAmount(new BigDecimal(2000000))
                .orderAmount(new BigDecimal(5000))
                .build();
        orderMapper.insert(order);
        return order;
    }
}

新增 OrderService 的装璜类 OrderServiceDecorate,负责对订单逻辑的扩大,这里是增加本地事务音讯,以及发送 MQ 信息,扩大办法增加了Transactional 注解,确保订单逻辑和本地事务音讯的数据在同一个事务中进行,确保原子性。其中事务音讯标记解决中,待上游服务解决完业务逻辑,再更新解决实现。

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/14 18:48
 * @description
 */
@Slf4j
@AllArgsConstructor
@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
public class OrderServiceDecorate extends AbstractHandler {

    private final NoticeMessageMapper noticeMessageMapper;

    private final RabbitTemplate rabbitTemplate;

    /**
     * 装璜办法:对订单解决逻辑进行扩大
     * @param o
     * @return
     */
    @Override
    @Transactional
    public Object handle(Object o) {
        // 调用 service 办法,实现保单逻辑
        Order order = (Order) service.handle(o);
        // 扩大:1、保留事务音讯,2、发送 MQ 音讯
        // 本地事务音讯
        String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}";
        NoticeMessage noticeMessage = NoticeMessage.builder()
                .retryCount(0)
                .data(data)
                .status(1)
                .type(1)
                .createTime(LocalDateTime.now())
                .build();
        noticeMessageMapper.insert(noticeMessage);
        // 发送 mq 音讯
        log.info("发送 mq 音讯....");
        rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
        return null;
    }
}

对于这个装璜者模式,之前有讲到过,能够看下之前公布的内容。

上游服务监听音讯,解决完本人的业务逻辑后(如:业绩、分润、降职等),须要发送 MQ,上游服务监听音讯,更新本地事务状态为已解决。这须要留神的是上游服务须要做幂等解决,避免异常情况下,上游服务数据的重试。

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/13 18:07
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.queue")
public class FenRunListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    public void orderHandler(String msg) {log.info("监听到订单音讯:{}", msg);
        // 须要留神幂等,幂等逻辑
        log.info("上游服务业务逻辑。。。。。");
        JSONObject json = JSONUtil.parseObj(msg);
        rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id"));
    }
}

这里插个题外话,对于幂等的解决,我这里大抵有两种思路
1、比方依据订单号查一下记录是否存在,存在就间接返回胜利。
2、redis 存一个惟一的申请号,解决完再删除,不存在申请号的间接返回胜利,能够写个 AOP 去解决,与业务隔离。
言归正传,上游服务音讯监听,上游发送 MQ 音讯,更新本地事务音讯为已解决,分布式事务流程完结。

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/13 18:29
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {

    @Autowired
    private NoticeMessageMapper noticeMessageMapper;

    @RabbitHandler
    public void updateOrder(Integer msgId) {log.info("监听音讯,更新本地事务音讯,音讯 id:{}", msgId);
        NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
        noticeMessageMapper.updateById(msg);
    }
}

存在异常情况时,会通过定时工作,轮询的往 MQ 中发送音讯,尽最大致力去让上游服务达到数据统一,当然重试也要设置下限;若达到下限当前还始终是失败,那不得不思考是上游服务本身存在问题了(有可能就是代码逻辑存在问题)。

/**
 * @author 往事如风
 * @version 1.0
 * @date 2022/12/14 10:25
 * @description
 */
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {

    private final RabbitTemplate rabbitTemplate;

    private final NoticeMessageMapper noticeMessageMapper;

    /**
     * 最大主动重试次数
     */
    private final Integer MAX_RETRY_COUNT = 5;

    @Scheduled(cron = "0/20 * * * * ?")
    public void retry() {log.info("定时工作,重试异样订单");
        LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
        wrapper.eq(NoticeMessage::getStatus, 1);
        List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
        for (NoticeMessage noticeMessage : noticeMessages) {
            // 从新发送 mq 音讯
            rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
            // 重试次数 +1
            noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
            noticeMessageMapper.updateById(noticeMessage);
            // 判断重试次数,等于最长限度次数,间接更新为报警状态
            if (MAX_RETRY_COUNT.equals(noticeMessage.getRetryCount())) {noticeMessage.setStatus(3);
                noticeMessageMapper.updateById(noticeMessage);
                // 发送告警,告诉对应人员
                // 告警逻辑(短信、邮件、企微群,等等)....
            }
        }
    }
}

其实这里有个问题,一个上游服务对应多个上游服务的时候。这个时候往往不能存一条本地音讯记录。

  1. 这里能够在音讯表多加个字段 next_server_count,示意一个订单发起方,须要调用的上游服务数量。上游服务监听的时候,每次会与上游的回调都减去 1,直到数值是 0 的时候,再更新状态是已解决。然而要管制并发,这个字段是被多个上游服务共享的。
  2. 还有一种解决计划是为每个上游服务,都记录一条事务音讯,用 type 字段去辨别,标记类型。实现上游和上游对于事务音讯的一对一关系。
  3. 最初,达到最大重试次数当前,能够将音讯退出到一个告警列表,这个告警列表能够展现在治理后盾或其余监控零碎中,展现一些必要的信息,去供公司内部人员去人工染指,解决这种异样的数据,使得数据达到最终一致性。

四、总结

其实分布式事务没有一个完满的解决计划,只能说是尽量去满足业务需要,满足数据统一。如果程序不能解决了,最初由人工去兜底,做数据的弥补计划。

五、参考源码

编程文档:https://gitee.com/cicadasmile/butte-java-note

利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent
正文完
 0