简介:在柔性事务的多种实现中,事务音讯是最为优雅易用的一种。基于阿里云 RocketMQ 高性能、高可用的特点,齐全能够胜任抢购业务这类高并发大流量的场景。但引入事务音讯机制在实现高性能的同时,也减少了整体的业务复杂度。咱们须要对业务场景进行充沛评估,抉择与本身业务特点最为匹配的一种,能力更好地施展柔性事务的价值。
前言
在电商畛域,抢购和秒杀是十分广泛业务模式,抢购类业务在疾速拉升用户流量并为音讯者带来实惠的同时,也给电商零碎带来了微小考验。在高并发、大流量的冲击下,零碎的性能和稳定性至关重要,任何一个环节呈现故障,都会影响整体的购物体验,甚至造成电商零碎的大面积解体。和电商畛域抢购场景极为相似的业务模式还有很多,比方大型赛事和在线教育的报名零碎,以及各类购票零碎等。
针对抢购类业务在技术上带来的挑战,业界有一系列解决方案,通过不同维度来晋升零碎的性能与稳定性,包含动静拆散、定时上架、异步解决、令牌队列、多级缓存、舞弊行为侦测、流量防护、全链路压测等。
本文重点聚焦在如何确保抢购类业务的一致性上,分布式事务始终是 IT 界老大难的问题,而抢购业务所具备的高并发、大流量特色,更是成倍增加了分布式一致性的实现难度。以下将介绍如何通过事务音讯构建满足抢购类业务要求的高性能高可用分布式一致性机制。
事务一致性原理回顾
事务是应用程序中一系列紧密的操作,这一系列操作是一个不可分割的工作单位,它们要么全副执行胜利,要么一个都不做。事务具备四个特色:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持续性(Durability),这四个个性简称为 ACID 个性。
在非并发状态下,保障事务的 ACID 个性是轻而易举的事件,如果某一个操作执行不胜利,把后面的操作全副回滚就 OK 了。而在并发状态下,因为有多个事务同时操作同一个资源,对于事务 ACID 个性的保障就会艰难一些,如果思考得不周全,就会遇到如下几个问题:
- 脏读:事务 A 读到了事务 B 还没有提交的数据。
- 不可反复读:在一个事务外面对某个数据读取了两次,读出来的数据不统一。
- 幻读:在一个事务对某个数据集用同样的形式读取了两次,数据集的条目数量不统一。
为了应答上述并发状况下呈现的问题,就须要通过肯定的事务隔离级别来解决。当事务的隔离级别越高的时候,上述问题产生的机会就越小,然而性能耗费也会越大。所以在理论生产过程中,要依据理论需要去确定隔离级别:
- READ_UNCOMMITTED(读未提交):最低的隔离级别,能够读到未提交的数据,无奈解决脏读、不可反复读、幻读中的任何一种。
- READ_COMMITED(读已提交):可能避免脏读,然而无奈解决不可反复读和幻读的问题。
- REPEATABLE_READ(反复读取):对同一条数据的多次重复读取能保持一致,解决了脏读、不可反复读的问题,然而幻读的问题还是无奈解决。
- SERLALIZABLE(串行化):最高的事务隔离级别,防止了事务的并行执行,解决了脏读、不可反复读和幻读的问题,但性能最低。
关系型数据库提供了对于事务的反对,可能通过不同隔离级别的设置,确保并发状态下事务的 ACID 个性。但关系型数据库提供的能力是基于单机事务的,一旦遇到分布式事务场景,就须要通过更多其余技术手段来解决问题。
抢购业务中的分布式事务
有如下三种状况可能会产生分布式事务:
- 一个事务操作蕴含对两个数据库的操作:数据库所提供的事务保障仅能局限在对于本身的操作上,无奈逾越到其余数据库。
2、一个事务蕴含对多个数据分片的操作:具体的分片规定由分库分表中间件或者分库分表 SDK 来实现,有可能逾越多个数据库或同一个数据库的多个表。对于业务逻辑而言,底层的数据分片状况是不通明的,因而也没有方法依赖于数据库提供的单机事务机制。
3、一个事务包含对多个服务的调用:在微服务畛域,这是极为常见的场景,不同的服务应用不同的数据资源,甚至波及到更为简单的调用链路。在这种状况下,数据库提供的单机事务机制,仅仅能保障其中繁多环节的 ACID 个性,没有方法延长到全局。
微服务技术在电商畛域的遍及水平是十分高的,比拟大型的电商利用还会通过中台思维将共性业务能力进行积淀,因而抢购业务中的很多环境都属于跨服务的分布式调用,会波及到上述第三种分布式事务状态。比方在订单领取胜利后,交易中心会调用订单核心的服务把订单状态更新,并调用物流核心的服务告诉商品发货,同时还要调用积分核心的服务为用户减少相应的积分。如何保障分布式事务一致性,成为了确保抢购业务稳固运行的外围诉求之一。
分布式事务的实现形式
传统分布式事务
传统的分布式事务通过 XA 模型实现,通过一个事务协调者,站在全局的角度将多个子事务合并成一个分布式事务。XA 模型之所以能在分布式事务畛域失去宽泛应用,是因为其具备如下两个方面的劣势:
- 提供了强一致性保障,在业务执行的任何工夫点都能确保事务一致性。
- 应用简略。常见的关系型数据库都提供了对 XA 协定的反对,通过引入事务协调器,业务代码跟应用单机事务相比基本上没有差异。
然而在互联网畛域,XA 模型的分布式事务实现存在很多局限性,在抢购业务这样的高并发大流量场景中更是被齐全弃用。咱们拿 XA 分布式协定中最广泛的两阶式提交计划,来阐明为什么 XA 模型并不适宜互联网场景。
- 性能问题。在两段式提交的执行过程中,所有参加节点都是事务阻塞型的,须要长时间锁定资源。这会导致系统整体的并发吞吐量变低,在抢购业务中是不可承受的。
- 单点故障问题。事务协调者在链路中有着至关重要的作用,一旦协调者产生故障,参与者会始终阻塞上来,整个零碎将无奈工作,因而须要投入微小的精力来保障事务协调者的高可用性。
- 数据不统一问题。在阶段二中,如果协调者向参与者发送 commit 申请之后,产生了网络异样,会导致只有一部分参与者接管到了 commit 申请,没有接管到 commit 申请的参与者最终会执行回滚操作,从而造成数据不统一景象。在抢购业务中,这样的数据不统一有可能会对企业或消费者造成微小的经济损失。
因而 XA 模型是一个理想化的分布式事务模型,并没有思考到高并发、网络故障等理论因素,即使是在两阶段提交的根底上,诞生了三阶段提交这样的实现形式,也没有方法从根本上解决性能和数据不统一的问题。
柔性事务
针对传统分布式事务计划在互联网畛域的局限性,业界提出了 CAP 实践以及 BASE 实践,在此基础上诞生了在大型互联网利用中宽泛应用的柔性事务。柔性事务的核心思想是放弃传统分布式事务中对于严格一致性的要求,容许在事务执行过程中存在数据不统一的中间状态,在业务上须要容忍中间状态的存在。柔性事务会提供欠缺的机制,保障在一段时间的中间状态后,零碎能走向最终统一状态。
遵循 BASE 实践的柔性事务放弃了隔离性,减小了事务中锁的粒度,使得利用可能更好的利用数据库的并发性能,实现吞吐量的线性扩大。异步执行形式能够更好地适应分布式环境,在网络抖动、节点故障的状况下可能尽量保障服务的可用性。因而在高并发、大流量的抢购业务中,柔性事务是最佳的抉择。
传统分布式事务
柔性事务
业务革新
无
有
一致性
强一致性
最终统一
回滚
反对
实现回退接口
隔离性
反对
放弃隔离性或实现资源锁定接口
并发性能
低
高
适宜场景
低并发、短事务
高并发、长事务
柔性事务有多种实现形式,包含 TCC、Saga、事务音讯、最大致力告诉等,本文将重点介绍通过事务音讯实现柔性事务。
事务音讯原理剖析
抢购业务场景拆解
咱们联合抢购业务的实在场景,剖析如何通过事务音讯实现分布式一致性。在抢购业务中,有两个十分要害的阶段,须要引入分布式事务机制,别离是订单创立阶段和付款胜利阶段。
从字面含意来看,抢购业务就隐含了一个重要的前提:库存是无限的。因而在订单创立的时候,须要事后查看库存状况,并绝对应的库存进行锁定,以避免商品超卖。如果库存锁定操作失败,代表库存有余,必须确保订单不能被胜利创立。在锁定库存后,如果因为某种异常情况导致订单创立失败,必须及时将之前锁定的库存进行开释操作,以便让其余用户能够从新抢夺对应的商品。
如果抢购零碎实现了购物车机制,在订单创立的同时,则须要从购买车中将相应的条目删除。
基于微服务架构的业务拆分,订单创立阶段的 3 个行为很有可能通过 3 个不同的微服务利用实现,因而须要通过分布式事务来保证数据一致性。
订单创立实现后,会期待用户付款,一旦付款胜利,就会触发付款胜利阶段的执行逻辑。这个阶段同样是通过分布式事务来实现,蕴含批改订单状态、扣减库存、告诉发货、减少积分这 4 个子事务,它们要么全副不执行,要么全副执行胜利。
当然,在实在的抢购业务中,状况有可能会更加的简单,本文列出的只是其中最具代表性的几类业务行为。
引入音讯异步告诉机制
传统的分布式事务存在一个很大的弊病是参加节点都是事务阻塞型的,须要长时间锁定资源。以锁定库存 -> 创立订单这个流程为例,借助于 Redis 等缓存零碎,单纯锁定库存的操作只须要破费毫秒级的工夫,能够承载十分高的并发量。但如果把创立订单的操作也思考进来,加上不同微服务利用之间互相通信的时候,整体耗时有可能超过 1 秒,导致性能急剧下降。
假如存在一种异步音讯机制,让分布式事务的第一个参与方在执行完本地事务后,通过触发一笔音讯告诉事务的其余参与方实现后续的操作,就能将大事务拆解为多个本地子事务来离开执行。在这种模式下,事务的多个参与方之间之间并不需要彼此阻塞和期待,就能极大水平地晋升并发吞吐能力。对于库存核心而言,在高并发场景下,只须要一直的执行锁定库存记录操作,并一直通过异步音讯告诉订单核心创立订单,只有异步音讯机制能确保音讯肯定送达,并失去正确处理,就可能实现分布式最终一致性。
先执行本地事务,还是先发送异步音讯?
在这个模型中,异步音讯的发送交给了分布式事务的第一个参与方来实现,这个参与方就领有了两个职责:执行本地事务 和发送异步音讯。到底应该先执行本地事务,还是先发异步音讯呢?
第一种计划是先发送异步音讯,再执行本地事务。这样做必定是不行的,如果本地事务没有执行胜利,异步音讯曾经收回去了,其余事务参与方收到音讯后会执行对应的近程事务,造成数据不统一。
第二种计划是先执行本地事务,再发送异步音讯。这样做可能解决本地事务执行失败导致的数据不统一问题,因为只有在本地事务执行胜利的状况下,才会发送异步音讯。但如果事务的参与方在执行本地事务胜利后,本人宕机了,就再有没有机会发送异步音讯了,因而这样做同样会造成数据不统一的问题。记住:在实在场景中,任何一个利用节点都不是 100% 牢靠的,都存在宕机的可能性。
一个可行的计划是引入能够处理事务音讯的音讯队列集群,用于异步音讯的直达。一个事务音讯蕴含两种状态:首先,事务的参与方发送一笔 半事务音讯 到音讯队列,示意本人行将执行本地事务,音讯队列集群在收到这个半事务音讯后,不会马上进行投递,而是进行暂存。在执行完本地事务后,事务的参考方再发送一笔 确认音讯 到音讯队列集群,告知本地事务的执行状态。如果本地事务执行胜利,音讯队列集群会将之前收到的半事务音讯进行投递;如果本地事务执行失败,音讯队列集群间接删除之前收到的半事务音讯,这样近程事务就不会被执行,从而保障了最终一致性。
同样,如果事务参与方在执行完本地事务后宕机了怎么办呢?这就须要音讯队列集群具备回查机制:如果收到半事务音讯后,在特定工夫内没有再收到确认音讯,会反过来申请事务参与方查问本地事务的执行状态,并给予反馈。这样,即使错过了确认音讯,音讯队列集群也有能力理解到本地事务的执行状态,从而决定是否将音讯进行投递。在一个微服务利用中,会存在多个对等的利用实例,这也就代表着即使一个事务参与方的实例在执行完本地事务后宕机了,音讯队列集群仍然能够通过这个实例的兄弟实例理解到本地事务执行的最终状态。
如何确保近程事务能执行胜利?
如果所有本地事务的执行,以及异步音讯的投递都一切顺利的话,接下来还会存在另外两种数据不统一的可能性:
- 音讯队列集群在将异步音讯投递到近程事务参与方的时候,因为网络不稳固,音讯没能投递胜利。
- 音讯投递胜利了,但近程事务参与方还没来得及执行近程事务,就宕机了。
这两种状况都会导致近程事务执行失败,所以须要建设一种 音讯重试机制,让近程事务参与方在实现工作后(实际上对近程事务参与方而言,这个工作是它要执行的本地工作),给予音讯队列集群一个反馈,告知异步音讯曾经失去了正确的解决。否则,音讯队列会在肯定工夫后,周期性的反复投递音讯,直到它收到了来自近程事务参与方的反馈,以确保近程事务肯定能执行胜利。
和事务回查机制相似,近程事务参与方也有多个对等的微服务实例,即使某个实例在没来得及执行近程事务的时候宕机,音讯队列也能够将工作交给这个实例的兄弟实例来实现。
残缺流程
事务音讯实战
理解到事务音讯的原理后,咱们不难得出一个论断:音讯队列集群在整个流程中起着至关重要的作用,如果音讯队列集群不可用,所有波及到分布式事务的业务都将停止!因而,咱们须要一个高可用的音讯队列集群,可能始终保持在工作状态,即使其某个组件呈现故障,也可能在短时间内主动复原,不会影响业务,还能确保接管到的音讯不失落。
音讯队列 RocketMQ
音讯队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低提早、高并发、高可用、高牢靠的分布式消息中间件。该产品最后由阿里巴巴自研并捐献给 Apache 基金会,服务于阿里团体 13 年,笼罩全团体所有业务,包含品种金融级场景。作为双十一交易外围链路的官网指定产品,撑持千万级并发、万亿级数据洪峰,历年刷新寰球最大的交易音讯流转记录。
阿里云音讯队列 RocketMQ 提供了对于事务音讯机制最残缺实现,包含半事务音讯、确认音讯、事务回查机制、音讯重试等重要性能。此外,音讯队列 RocketMQ 还提供了极强的高可用能力以及数据可靠性,能够确保在各种极其场景下都能提供稳固的服务,并确保音讯不失落。
对于开发者而言,应用云上的音讯队列 RocketMQ,能够罢黜音讯队列集群的搭建和保护工作,将更多的精力投入到实现业务逻辑的工作中。当音讯队列集群的性能不能满足要求时,还能够十分不便的进行集群一键扩容,以取得更高的并发吞吐量。
开明 RocketMQ 服务
在阿里云官方网站开明音讯队列服务前方可开始应用音讯队列 RocketMQ,如果应用 RAM 用户拜访 RocketMQ,还必须先为 RAM 用户进行受权。在实现阿里云账户注册以及实名认证后,关上音讯队列 RocketMQ 版产品页,点击 收费开明,页面跳转至音讯队列 RocketMQ 版控制台,在弹出的提醒对话框中,实现 RocketMQ 服务的开明。
接下来,登录 RAM 控制台,在左侧导航栏抉择 人员治理 > 用户,在 用户 页面,单击指标 RAM 用户 操作 列的 增加权限 ,在 增加权限 面板,单击须要授予 RAM 用户的权限策略,单击 确定 。音讯队列 RocketMQ 提供多种零碎策略,能够依据权限范畴为 RAM 用户授予相干权限。为了简略起见,咱们先开明AliyunMQFullAccess 权限策略,授予该 RAM 用户所有音讯收发权限和控制台所有性能操作权限。
创立资源
在调用 SDK 收发音讯前,需在音讯队列 RocketMQ 控制台创立相干资源,在调用 SDK 时需填写这些资源信息。首先,咱们要创立 RocketMQ 实例,实例是用于音讯队列 RocketMQ 服务的虚拟机资源,相当于一个独立的音讯队列集群,会存储音讯 Topic 和客户端 Group ID 信息。咱们还须要留神,只有在同一个地区下的同一个实例中的 Topic 和 Group ID 能力互通,例如,某 Topic 创立在华东 1(杭州)地区的实例 A 中,那么该 Topic 只能被在华东 1(杭州)地区的实例 A 中创立的 Group ID 对应的生产端和生产端拜访。
登录到音讯队列 RocketMQ 控制台,在左侧导航栏,单击 实例列表,在顶部菜单栏,抉择地区,如 华东 1(杭州),在 实例列表 页面,单击 创立实例,在 创立 RocketMQ 实例 面板,实现实例的创立。
接下来,在实例所在页面的左侧导航栏,单击 Topic 治理。在Topic 治理 页面,单击 创立 Topic,在 创立 Topic面板,输出 名称 和形容 ,抉择该 Topic 的 音讯类型 为事务音讯,实现 Topic 的创立。
Topic 是音讯队列 RocketMQ 版里对音讯的一级归类,例如创立 Topic_Trade 这一 Topic 来辨认交易类音讯,音讯生产者将音讯发送到 Topic_Trade,而音讯消费者则通过订阅该 Topic 来获取和生产音讯。
创立完实例和 Topic 后,须要为音讯的消费者和或生产者创立客户端 ID,即 Group ID 作为标识。在事务音讯的场景中,须要创立 2 个不同的 Group ID,别离代表本地事务参与方和近程事务参与方。在实例所在页面的左侧导航栏,单击 Group 治理, 在Group 治理 页面,抉择 TCP 协定 > 创立 Group ID, 在创立可用于 TCP 协定的 Group面板,实现本地事务客户端 Group ID 的创立。反复此操作,实现近程事务参与方 Group ID 的创立。
本地事务参与方的业务代码
本文将通过 Java 代码介绍如何实现事务音讯相干的业务逻辑,为了简化业务逻辑,咱们持续基于 锁定库存 – > 创立订单 这个流程来演示,在这个流程中,仅有 2 个事务参与方。
初始化 TransactionProducer
咱们先通过 Maven 引入音讯队列 RocketMQ 的 SDK,优先应用阿里云官网提供的 TCP 版 SDK。
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.2.Final</version>
</dependency>
顺利引入 Log4j2 用于日志输入。
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.13.1</version>
</dependency>
在库存核心的代码中,咱们须要初始化一个 TransactionProducer,用于异步音讯的发送,须要填入如下信息:
- Group ID:之前创立的用于本地事务参与方的 Group ID。
- Access key 和 Secret Key:RAM 用户对应的密钥信息,从 RAM 用户控制台取得。
- Nameserver Address:RocketMQ 实例的接入点信息,从 RocketMQ 控制台取得。
Properties properties = new Properties();
// 您在控制台创立的 Group ID。留神:事务音讯的 Group ID 不能与其余类型音讯的 Group ID 共用。properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID 阿里云身份验证,在阿里云 RAM 控制台创立。properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret 阿里云身份验证,在阿里云 RAM 控制台创立。properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置 TCP 接入域名,进入音讯队列 RocketMQ 版控制台的实例详情页面的 TCP 协定客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
// LocalTransactionCheckerImpl 本地事务回查类的实现
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();
TransactionProducer 是线程平安的,启动后能在多线程环境中复用。
获取全局惟一的交易流水号
在发送半事务音讯以及执行本地事务之前,咱们须要先获取一个全局惟一的交易流水号,订单与交易流水号一一对应,接下来的事务音讯机制都会依赖于这个这个交易流水号。咱们能够通过引入第三方 ID 生成组件,或者在本地通过 Snowflake 算法实现。
实现本地事务回查逻辑
创立一个实现了 LocalTransactionChecker 接口的
LocalTransactionCheckerImpl 类,实现其中的 check(Message) 办法,该办法返回本地事务的最终状态。至于具体的业务逻辑如何实现,不在本文探讨的范畴之前,咱们将其封装在 BusinessService 类中。
package transaction;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LocalTransactionCheckerImpl implements LocalTransactionChecker {private static Logger LOGGER = LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);
private static BusinessService businessService = new BusinessService();
@Override
public TransactionStatus check(Message msg) {
// 从音讯体中取得的交易 ID
String transactionKey = msg.getKey();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {boolean isCommit = businessService.checkbusinessService(transactionKey);
if (isCommit) {transactionStatus = TransactionStatus.CommitTransaction;} else {transactionStatus = TransactionStatus.RollbackTransaction;}
} catch (Exception e) {LOGGER.error("Transaction Key:{}", transactionKey, e);
}
LOGGER.warn("Transaction Key:{}transactionStatus:{}", transactionKey, transactionStatus.name());
return transactionStatus;
}
}
执行本地事务并发送异步音讯
咱们先组装一条异步音讯,其中蕴含了全局交易 ID,音讯将要发往的 Topic,以及音讯体。近程事务参与方将通过这个音讯体中获取执行近程事务所必须的数据信息。
接下来,将这条异步音讯连同一个实现了 LocalTransactionExecuter 接口的匿名类,通过 send 办法进行发送,这就是本地事务参与方所须要实现的所有业务代码了。当然,这个匿名类实现了 TransactionStatus execute.execute()办法,其中蕴含了对于本地事务的执行。残缺代码如下:
package transaction;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class TransactionProducerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);
private static final BusinessService businessService = new BusinessService();
private static final String TOPIC = "create_order";
private static final TransactionProducer producer = null;
static {Properties properties = new Properties();
// 您在控制台创立的 Group ID。留神:事务音讯的 Group ID 不能与其余类型音讯的 Group ID 共用。properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID 阿里云身份验证,在阿里云 RAM 控制台创立。properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret 阿里云身份验证,在阿里云 RAM 控制台创立。properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置 TCP 接入域名,进入音讯队列 RocketMQ 版控制台的实例详情页面的 TCP 协定客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
// LocalTransactionCheckerImpl 本地事务回查类的实现
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();}
public static void main(String[] args) throws InterruptedException {String transactionKey = getGlobalTransactionKey();
String messageContent = String.format("lock inventory for: %s", transactionKey);
Message message = new Message(TOPIC, null, transactionKey, messageContent.getBytes());
try {SendResult sendResult = producer.send(message, (msg, arg) -> {// 此处用 Lambda 示意,理论是实现 TransactionStatus execute(final Message msg, final Object arg)办法
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {boolean localTransactionOK = businessService.execbusinessService(transactionKey);
if (localTransactionOK) {transactionStatus = TransactionStatus.CommitTransaction;} else {transactionStatus = TransactionStatus.RollbackTransaction;}
} catch (Exception e) {LOGGER.error("Transaction Key:{}", transactionKey, e);
}
LOGGER.warn("Transaction Key:{}", transactionKey);
return transactionStatus;
}, null);
LOGGER.info("send message OK, Transaction Key:{}, result:{}", message.getKey(), sendResult);
} catch (Exception e) {LOGGER.info("send message failed, Transaction Key:{}", message.getKey());
}
// demo example 避免过程退出
TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
}
private static String getGlobalTransactionKey() {
// TODO
return "";
}
}
得益于 RocketMQ SDK 优良的封装,发送半事务音讯、发送确认音讯、事务回查等重要步骤都曾经残缺实现,不须要开发者再编写代码了,这将为用户带来特地顺畅开发体验。
近程事务参与方的业务代码
绝对本地事务参与方而言,近程事务参与方的代码更加简略,只须要从异步音讯中提取出对应信息,实现对近程事务的执行即可。
package transaction;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class TransactionConsumerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);
private static final BusinessService businessService = new BusinessService();
private static final String TOPIC = "create_order";
private static final Consumer consumer = null;
static {Properties properties = new Properties();
// 在控制台创立的 Group ID,不同于本地事务参与方应用的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID 阿里云身份验证,在阿里云 RAM 控制台创立。properties.put(PropertyKeyConst.AccessKey, "XXX");
// Accesskey Secret 阿里云身份验证,在阿里云服 RAM 控制台创立。properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协定客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.start();}
public static void main(String[] args) {consumer.subscribe(TOPIC, "*", (message, context) -> {LOGGER.info("Receive:" + message);
businessService.doBusiness(message);
// 返回 CommitMessage,代表给予音讯队列集群异步音讯曾经失去失常解决的回馈
return Action.CommitMessage;
}
);
}
}
事务回滚
是否存在这样的状况:当本地事务执行胜利后,因为近程事务没有方法执行,而导致本地事务须要进行回滚操作呢?在事务音讯原理剖析一节,咱们曾经介绍过如何通过音讯重试,确保近程事务可能执行胜利,这是不是曾经阐明只有异步音讯被确认,近程事务就肯定能够执行胜利,从而不存在对本地事务的回滚呢?
理论生产状况下,的确存在近程事务无奈失常执行的状况。比方在付款胜利阶段,当本地事务“批改订单状态”执行实现后,在执行近程事务“告诉发货”的时,因为订单地址有误而被物流公司回绝,这种状况下就必须对订单状态进行回退操作,并发动退款流程。
所以在执行近程事务的时候,咱们有必要辨别如下两种齐全不同的异样:
- 技术异样:近程事务参与方宕机、网络故障、数据库故障等。
- 业务异样:近程逻辑在业务上无奈执行、代码业务逻辑谬误等。
简略来讲,当近程事务执行失败的时候,可能通过音讯重试的形式解决问题的,属于技术异样;否则,属于业务异样。基于事务音讯的分布式事务机制不能实现主动回滚,当业务异样产生的时候,必须通过回退流程确保曾经实现的本地事务失去复原。比方在批改订单状态 -> 告诉发货这个场景中,如果因为业务异样导致无奈发货的时候,须要通过额定的回退流程,将订单状态设置为“已勾销”,并执行退款流程。
在事务音讯机制中,回退流程相当于近程事务参与方和本地事务参与方调换了角色,和失常流程一样,同样也能够通过事务音讯来实现分布式事务。因为失常流程和回退流程的业务逻辑是齐全不一样的,所以最现实的形式是建设另外一个 Topic 来实现。这也就阐明,咱们在创立事务音讯 Topic 的时候,要充分考虑到这个 Topic 背地的业务含意,并 在 Topic 命名上尽可能的与实在业务相匹配。
多个事务参与方
本节展现的示例中,都只波及到 2 个事务参与方,但在真实世界中,分布式事务往往波及到更多的事务参与方,比方之前提到的付款胜利环节,有批改订单状态 -> 扣减库存 -> 告诉发货 -> 减少积分这 4 个须要同时进行的操作,波及到 4 个事务参与方。这种状况下如何通过事务音讯来实现分布式事务呢?
咱们仍然能够持续应用之前的架构,只须要退出多个近程事务参与方就行了。能够通过 RocketMQ 的多生产组关联多个近程事务参与方,每一个参与方对应一个 Group ID,在这种状况下,同一个异步音讯会复制成多份投递给不同的事务参与方。
须要特地引起留神的是,当某个近程事务参与方遇到业务异样的时候,须要 告诉其余所有事务参与方执行回退流程,这无疑会减少业务逻辑的整体复杂度。为了简化事务音讯的执行流程,咱们能够对业务逻辑事后进行梳理,将子事务分为如下两类:
- 有可能产生业务异样的:比方锁定库存的操作,有可能因为库存有余而执行失败。又比方扣除积分的操作,有可能因为用户积分有余而无奈扣除。
- 不太可能产生业务异样的:比方删除购物车条目标操作,除非是技术类故障,肯定能够执行胜利,即使对应的条目并不存在,也没有关系。又比方积分减少的操作,只有对应的用户没有登记,是不可能遇到业务异样的。
咱们尽量将第一类事务作为本地事务而实现,将第二类事务作为近程事务而实现,这样就能够最大水平防止回退流程。
其余注意事项
音讯幂等
RocketMQ 能保障音讯不失落,但不能保障音讯不反复,所以消费者在接管到音讯后,有必要依据业务上的惟一 Key 对音讯做幂等解决。在抢购业务中,惟一 Key 当然就是全局惟一的交易流水号,具体幂等解决办法在互联网上有很多文章供读者参考。当然,不是每一种业务近程事务都须要确保音讯的幂等性,比方删除购物车指定条目这样的操作,在业务上是能够容忍屡次重复执行的,就没有必要引入额定的幂等解决了。
每日对账
不同于传统事务的强一致性保障,柔性事务须要经验一个中间状态,才到达成事务的最终一致性。有某些非凡状况下,这个中间状态会继续十分长的工夫,甚至须要人工被动染指能力实现最终一致性:
- 音讯重试屡次后,仍然不胜利:当消费者齐全无奈失常工作的时候,RocketMQ 不可能永无止境地重试音讯,事实上,如果 16 次重试后异步音讯仍然没有方法被失常解决,RocketMQ 会进行尝试,将音讯放到一个非凡的队列中。
- 未解决的业务异样:比方给某个账号加积分的时候,发现此账号被登记了,这是一个十分常见的业务景象,有可能当时对此并没有强壮的解决机制。
- 幂等校验失败:解决幂等所依赖的零碎比方 Redis 产生了故障,导致某些音讯被反复解决。
- 其余重大的系统故障:比方网络长时间中断,留下了大量执行到一半的事务。
- 其余漏网之鱼。
这些状况下,咱们都有须要通过定期对账机制来进行排查,在必要的时候发动人工被动染指流程,修复不统一的数据。事实上,在任何柔性事务的实现中,每日对账都是必不可少的数据安全保障性伎俩。
总结
在柔性事务的多种实现中,事务音讯是最为优雅易用的一种。基于阿里云 RocketMQ 高性能、高可用的特点,齐全能够胜任抢购业务这类高并发大流量的场景。在阿里巴巴本身的业务中,事务音讯也宽泛应用于双 11 这样的大型营销流动中,有着十分高的通用性。
但在 IT 畛域,没有任何一种技术是银弹,引入事务音讯机制须要针对性的批改业务逻辑,还须要借助于每日对账等额定的伎俩确保数据安全,在实现高性能的同时,也减少了整体的业务复杂度。咱们须要对业务场景进行充沛评估,比照多种不同的技术实现计划,从中筛选与本身业务特点最为匹配的一种,能力更好地施展柔性事务的价值。
作者:山猎,阿里云解决方案架构师
原文链接
本文为阿里云原创内容,未经容许不得转载