简介:订单状态流转是交易系统的最为外围的工作,订单零碎往往都会存在状态多、链路长、逻辑简单的特点,还存在多场景、多类型、多业务维度等业务个性。在保障订单状态流转稳定性的前提下、可扩展性和可维护性是咱们须要重点关注和解决的问题。
作者 | 亮言
起源 | 阿里技术公众号
一 背景
订单状态流转是交易系统的最为外围的工作,订单零碎往往都会存在状态多、链路长、逻辑简单的特点,还存在多场景、多类型、多业务维度等业务个性。在保障订单状态流转稳定性的前提下、可扩展性和可维护性是咱们须要重点关注和解决的问题。
以高德打车业务的订单状态为例,订单状态就有乘客下单、司机接单、司机已达到乘车点、开始行程、行程完结、确认费用、领取胜利、订单勾销、订单敞开等;订单车型有专车、慢车、出租车等几种车型,而专车又分舒适型、豪华型、商务型等;业务场景接送机、企业用车、城际拼车等等场景。
当订单状态、类型、场景、以及其余一些维度组合时,每一种组合都可能会有不同的解决逻辑、也可能会存在共性的业务逻辑,这种状况下代码中各种 if-else 必定是不敢设想的。怎么解决这种 ” 多状态 + 多类型 + 多场景 + 多维度 ” 的简单订单状态流转业务,又要保障整个零碎的可扩展性和可维护性,本文的解决思路和计划同大家一起探讨。
二 实现计划
要解决 ” 多状态 + 多类型 + 多场景 + 多维度 ” 的简单订单状态流转业务,咱们从纵向和横向两个维度进行设计。纵向次要从业务隔离和流程编排的角度登程解决问题、而横向次要从逻辑复用和业务扩大的角度解决问题。
1 纵向解决业务隔离和流程编排
状态模式的利用
通常咱们解决一个多状态或者多维度的业务逻辑,都会采纳状态模式或者策略模式来解决,咱们这里不探讨两种设计模式的异同,其外围其实能够概括为一个词 ” 分而治之 ”,形象一个根底逻辑接口、每一个状态或者类型都实现该接口,业务解决时依据不同的状态或者类型调用对应的业务实现,以达到逻辑互相独立互不烦扰、代码隔离的目标。
这不仅仅是从可扩展性和可维护性的角度登程,其实咱们做架构做稳定性、隔离是一种缩小影响面的根本伎俩,相似的隔离环境做灰度、分批公布等,这里不做扩大。
/**
* 状态机处理器接口
*/
public interface StateProcessor {
/**
* 执行状态迁徙的入口
*/
void action(StateContext context) throws Exception;
}
/**
* 状态 A 对应的状态处理器
*/
public class StateAProcessor interface StateProcessor {
/**
* 执行状态迁徙的入口
*/
@Override
public void action(StateContext context) throws Exception {}}
繁多状态或类型能够通过下面的办法解决,那么 ” 多状态 + 多类型 + 多场景 + 多维度 ” 这种组合业务呢,当然也能够采纳这种模式或思路来解决。首先在开发阶段通过一个注解 @OrderPorcessor 将不同的维度予以组合、开发出多个对应的具体实现类,在零碎运行阶段,通过判断上下文来动静抉择具体应用哪一个实现类执行。@OrderPorcessor 中别离定义 state 代表以后处理器要解决的状态,bizCode 和 sceneId 别离代表业务类型和场景,这两个字段留给业务进行扩大,比方能够用 bizCode 代表产品或订单类型、sceneId 代表业务状态或起源场景等等,如果要扩大多个维度的组合、也能够用多个维度拼接后的字符串赋值到 bizCode 和 sceneId 上。
受限于 Java 枚举不能继承的标准,如果要开发通用的性能、注解中就不能应用枚举、所以此处只好应用 String。
/**
* 状态机引擎的处理器注解标识
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Component
public @interface OrderProcessor {
/**
* 指定状态,state 不能同时存在
*/
String[] state() default {};
/**
* 业务
*/
String[] bizCode() default {};
/**
* 场景
*/
String[] sceneId() default {};}
/**
* 创立订单状态对应的状态处理器
*/
@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5")
public class StateCreateProcessor interface StateProcessor {}
再想一下,因为波及到状态流转,不可能会是一个状态 A 只能流转到状态 B、状态 A 可能在不同的场景下流转到状态 B、状态 C、状态 D;还有尽管都是由状态 A 流转到状态 B、然而不同的场景解决流程也可能不一样,比方都是将订单从从待领取状态进行领取、用户被动发动领取和零碎免密领取的流程可能就不一样。针对下面这两种状况、咱们把这里的 ” 场景 ” 对立封装为 ” 事件(event)”,以 ” 事件驱动 ” 的形式来管制状态的流向,一个状态遇到一个特定的处理事件来决定该状态的业务解决流程和最终状态流向。咱们能够总结下,其实状态机模式简略说就是:基于某些特定业务和场景下,依据源状态和产生的事件,来执行下一步的流程解决逻辑,并设置一个指标状态。
这里有人可能有一些疑难,这个 ” 事件 ” 和下面说的 ” 多场景 ”、” 多维度 ” 有什么不一样。解释一下,咱们这里说的是 ” 事件 ” 是一个具体的业务要执行的动作,比方用户下单是一个业务事件、用户勾销订单是一个业务事件、用户领取订单也是一个业务事件。而 ” 多场景 ”、” 多维度 ” 则是可交由业务自行进行扩大的维度,比方自有规范模式起源的订单、通过开放平台 API 来的订单、通过第三方规范起源的订单,某某小程序、某某 APP 起源能够定义为不同场景,而接送机、企业用车、拼车等能够定义为维度。
public @interface OrderProcessor {
/**
* 指定状态
*/
String[] state() default {};
/**
* 订单操作事件
*/
String event();
......
}
/**
* 订单状态迁徙事件
*/
public interface OrderStateEvent {
/**
* 订单状态事件
*/
String getEventType();
/**
* 订单 ID
*/
String getOrderId();
/**
* 如果 orderState 不为空,则代表只有订单是以后状态才进行迁徙
*/
default String orderState() {return null;}
/**
* 是否要新创建订单
*/
boolean newCreate();}
状态迁徙流程的封装
在满足了下面说的多维度组合的业务场景、开发多个实现类来执行的状况,咱们思考执行这些实现类在流程上是否有再次形象和封装的中央、以缩小研发工作量和尽量的实现通用流程。咱们通过察看和形象,发现每一个订单状态流转的流程中,都会有三个流程:校验、业务逻辑执行、数据更新长久化;于是再次形象,能够将一个状态流转分为数据筹备(prepare)——> 校验(check)——> 获取下一个状态(getNextState)——> 业务逻辑执行(action)——> 数据长久化(save)——> 后续解决(after)这六个阶段;而后通过一个模板办法将六个阶段办法串联在一起、造成一个有程序的执行逻辑。这样一来整个状
清晰和简略了、可维护性上也失去的肯定的晋升。
![上传中...]()
/**
* 状态迁徙动作解决步骤
*/
public interface StateActionStep<T, C> {
/**
* 筹备数据
*/
default void prepare(StateContext<C> context) { }
/**
* 校验
*/
ServiceResult<T> check(StateContext<C> context);
/**
* 获取以后状态处理器处理完毕后,所处于的下一个状态
*/
String getNextState(StateContext<C> context);
/**
* 状态动作办法,次要状态迁徙逻辑
*/
ServiceResult<T> action(String nextState, StateContext<C> context) throws Exception;
/**
* 状态数据长久化
*/
ServiceResult<T> save(String nextState, StateContext<C> context) throws Exception;
/**
* 状态迁徙胜利,长久化后执行的后续解决
*/
void after(StateContext<C> context);
}
/**
* 状态机处理器模板类
*/
@Component
public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T, C>, StateActionStep<T, C> {
@Override
public final ServiceResult<T> action(StateContext<C> context) throws Exception {
ServiceResult<T> result = null;
try {
// 数据筹备
this.prepare(context);
// 串行校验器
result = this.check(context);
if (!result.isSuccess()) {return result;}
// getNextState 不能在 prepare 前,因为有的 nextState 是依据 prepare 中的数据转换而来
String nextState = this.getNextState(context);
// 业务逻辑
result = this.action(nextState, context);
if (!result.isSuccess()) {return result;}
// 长久化
result = this.save(nextState, context);
if (!result.isSuccess()) {return result;}
// after
this.after(context);
return result;
} catch (Exception e) {throw e;}
}
/**
* 状态 A 对应的状态处理器
*/
@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5")
public class StateCreateProcessor extends AbstractStateProcessor<String, CreateOrderContext> {......}
(1)校验器
下面提到了校验(check),咱们都晓得任何一个状态的流转甚至接口的调用其实都少不了一些校验规定,尤其是对于简单的业务、其校验规定和校验逻辑也会更加简单。那么对于这些校验规定怎么解耦呢,既要将校验逻辑从简单的业务流程中解耦进去、同时又须要把简单的校验规定简单化,使整个校验逻辑更具备可扩展性和可维护性。其实做法也比较简单、参考下面的逻辑,只须要形象一个校验器接口 checker、把简单的校验逻辑拆开、造成多个繁多逻辑的校验器实现类,状态处理器在调用 check 时只须要调用一个接口、由校验器执行多个 checker 的汇合就能够了。将校验器 checker 进行封装之后,发现要退出一个新的校验逻辑就非常简略了,只须要写一个新的 checker 实现类退出校验器就行、对其余代码根本没有改变。
/**
* 状态机校验器
*/
public interface Checker<T, C> {ServiceResult<T> check(StateContext<C> context);
/**
* 多个 checker 时的执行程序
*/
default int order() {return 0;}
}
逻辑简略了、扩展性和维护性解决了、性能问题就会显现出来。多个校验器 checker 串行执行性能必定性能比拟差,此时很简略的能够想到应用并行执行,是的、此处应用多线程并行执行多个校验器 checker 能显著进步执行效率。然而也应该意识到,有些校验器逻辑可能是有前后依赖的(其实不应该呈现),还有写业务流程中要求某些校验器的执行必须有前后程序,还有些流程不要求校验器的执行程序然而要求谬误时的返回程序、那么怎么在并行的前提下保障程序呢、此处就能够用 order+Future 实现了。通过一系列的思考和总结,咱们把校验器分为参数校验(paramChecker)、同步校验(syncChecker)、异步校验(asyncChecker)三种类型,其中参数校验 paramChecker 是须要在状态处理器最开始处执行的,为什么这么做、因为参数都不非法了必定没有持续向下执行的必要了。
/**
* 状态机校验器
*/
public interface Checkable {
/**
* 参数校验
*/
default List<Checker> getParamChecker() {return Collections.EMPTY_LIST;}
/**
* 需同步执行的状态查看器
*/
default List<Checker> getSyncChecker() {return Collections.EMPTY_LIST;}
/**
* 可异步执行的校验器
*/
default List<Checker> getAsyncChecker() {return Collections.EMPTY_LIST;}
}
/**
* 校验器的执行器
*/
public class CheckerExecutor {
/**
* 执行并行校验器,* 依照工作投递的程序判断返回。*/
public ServiceResult<T, C> parallelCheck(List<Checker> checkers, StateContext<C> context) {if (!CollectionUtils.isEmpty(checkers)) {if (checkers.size() == 1) {return checkers.get(0).check(context);
}
List<Future<ServiceResult>> resultList = Collections.synchronizedList(new ArrayList<>(checkers.size()));
checkers.sort(Comparator.comparingInt(Checker::order));
for (Checker c : checkers) {Future<ServiceResult> future = executor.submit(() -> c.check(context));
resultList.add(future);
}
for (Future<ServiceResult> future : resultList) {
try {ServiceResult sr = future.get();
if (!sr.isSuccess()) {return sr;}
} catch (Exception e) {log.error("parallelCheck executor.submit error.", e);
throw new RuntimeException(e);
}
}
}
return new ServiceResult<>();}
}
checkable 在模板办法中的应用。public interface StateActionStep<T, C> {Checkable getCheckable(StateContext<C> context);
....
}
public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T>, StateActionStep<T, C> {
@Resource
private CheckerExecutor checkerExecutor;
@Override
public final ServiceResult<T> action(StateContext<C> context) throws Exception {
ServiceResult<T> result = null;
Checkable checkable = this.getCheckable(context);
try {
// 参数校验器
result = checkerExecutor.serialCheck(checkable.getParamChecker(), context);
if (!result.isSuccess()) {return result;}
// 数据筹备
this.prepare(context);
// 串行校验器
result = checkerExecutor.serialCheck(checkable.getSyncChecker(), context);
if (!result.isSuccess()) {return result;}
// 并行校验器
result = checkerExecutor.parallelCheck(checkable.getAsyncChecker(), context);
if (!result.isSuccess()) {return result;}
......
}
checkable 在具体状态处理器中的代码利用举例。@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
@Resource
private CreateParamChecker createParamChecker;
@Resource
private UserChecker userChecker;
@Resource
private UnfinshChecker unfinshChecker;
@Override
public Checkable getCheckable(StateContext<CreateOrderContext> context) {return new Checkable() {
@Override
public List<Checker> getParamChecker() {return Arrays.asList(createParamChecker);
}
@Override
public List<Checker> getSyncChecker() {return Collections.EMPTY_LIST;}
@Override
public List<Checker> getAsyncChecker() {return Arrays.asList(userChecker, unfinshChecker);
}
};
}
......
checker 的定位是校验器,负责校验参数或业务的合法性,但理论编码过程中、checker 中可能会有一些长期状态类操作,比方在校验之前进行计数或者加锁操作、在校验实现后依据后果进行开释,这里就须要反对对立的开释性能。public interface Checker<T, C> {
......
/**
* 是否需要 release
*/
default boolean needRelease() {return false;}
/**
* 业务执行实现后的开释办法,
* 比方有些业务会在 checker 中加一些状态操作,等业务执行实现后依据后果抉择解决这些状态操作,
* 最典型的就是 checker 中加一把锁,release 依据后果开释锁.
*/
default void release(StateContext<C> context, ServiceResult<T> result) {}}
public class CheckerExecutor {
/**
* 执行 checker 的开释操作
*/
public <T, C> void releaseCheck(Checkable checkable, StateContext<C> context, ServiceResult<T> result) {List<Checker> checkers = new ArrayList<>();
checkers.addAll(checkable.getParamChecker());
checkers.addAll(checkable.getSyncChecker());
checkers.addAll(checkable.getAsyncChecker());
checkers.removeIf(Checker::needRelease);
if (!CollectionUtils.isEmpty(checkers)) {if (checkers.size() == 1) {checkers.get(0).release(context, result);
return;
}
CountDownLatch latch = new CountDownLatch(checkers.size());
for (Checker c : checkers) {executor.execute(() -> {
try {c.release(context, result);
} finally {latch.countDown();
}
});
}
try {latch.await();
} catch (InterruptedException e) {throw new RuntimeException(e);
}
}
}
}
2)上下文
从下面代码能够发现,整个状态迁徙的几个办法都是应用上下文 Context 对象串联的。Context 对象中一共有三类对象,(1)订单的根本信息(订单 ID、状态、业务属性、场景属性)、(2)事件对象(其参数根本就是状态迁徙行为的入参)、(3)具体处理器决定的泛型类。个别要将数据在多个办法中进行传递有两种计划:一个是包装应用 ThreadLocal、每个办法都能够对以后 ThreadLocal 进行赋值和取值;另一种是应用一个上下文 Context 对象做为每个办法的入参传递。这种计划都有一些优缺点,应用 ThreadLocal 其实是一种 ” 隐式调用 ”,尽管能够在 ” 随处 ” 进行调用、然而对应用方其实不显著的、在中间件中会大量应用、在开发业务代码中是须要尽量避免的;而应用 Context 做为参数在办法中进行传递、能够无效的缩小 ” 不可知 ” 的问题。
不论是应用 ThreadLocal 还是 Context 做为参数传递,对于理论承载的数据载体有两种计划,常见的是应用 Map 做为载体,业务在应用的时候能够依据须要随便的设置任何 kv,然而这种状况对代码的可维护性和可读性是极大的挑战,所以这里应用泛型类来固定数据格式,一个具体的状态解决流程到底须要对哪些数据做传递须要明确定义好。其实准则是一样的,业务开发尽量用用可见性防止不可知。
public class StateContext<C> {
/**
* 订单操作事件
*/
private OrderStateEvent orderStateEvent;
/**
* 状态机须要的订单根本信息
*/
private FsmOrder fsmOrder;
/**
* 业务可定义的上下文泛型对象
*/
private C context;
public StateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) {
this.orderStateEvent = orderStateEvent;
this.fsmOrder = fsmOrder;
}
......
/**
* 状态机引擎所需的订单信息基类信息
*/
public interface FsmOrder {
/**
* 订单 ID
*/
String getOrderId();
/**
* 订单状态
*/
String getOrderState();
/**
* 订单的业务属性
*/
String bizCode();
/**
* 订单的场景属性
*/
String sceneId();}
(3)迁徙到的状态断定
为什么要把下一个状态(getNextState)形象为独自一个步骤、而不是交由业务本人进行设置呢?是因为要迁徙到的下一个状态不肯定是固定的,就是说依据以后状态和产生的事件、再遇到更加细节的逻辑时也可能会流转到不同的状态。举个例子,以后状态是用户已下单实现、要产生的事件是用户勾销订单,此时依据不同的逻辑,订单有可能流转到勾销状态、也有可能流转到勾销待审核状态、甚至有可能流转到勾销待领取费用状态。当然这里要取决于业务系统对状态和事件定义的粗细和状态机的复杂程度,做为状态机引擎、这里把下一个状态的断定交由业务依据上下文对象本人来判断。
getNextState()应用及状态迁徙长久化举例:
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
........
@Override
public String getNextState(StateContext<CreateOrderContext> context) {// if (context.getOrderStateEvent().getEventType().equals("xxx")) {
// return OrderStateEnum.INIT;
// }
return OrderStateEnum.NEW;
}
@Override
public ServiceResult<String> save(String nextState, StateContext<CreateOrderContext> context) throws Exception {OrderInfo orderInfo = context.getContext().getOrderInfo();
// 更新状态
orderInfo.setOrderState(nextState);
// 长久化
// this.updateOrderInfo(orderInfo);
log.info("save BUSINESS order success, userId:{}, orderId:{}", orderInfo.getUserId(), orderInfo.getOrderId());
return new ServiceResult<>(orderInfo.getOrderId(), "business 下单胜利");
}
}
状态音讯
一般来说,所有的状态迁徙都应该收回对应的音讯,由上游生产方订阅进行相应的业务解决。
(1)状态音讯内容
对于状态迁徙音讯的发送内容通常有两种模式,一个是只发状态产生迁徙这个告诉、举例子就是只发送 ” 订单 ID、变更前状态、变更后状态 ” 等几个关键字段,具体上游业务须要哪些具体内容在调用相应的接口进行反查;还有一种是发送所有字段进来、相似于发一个状态变更后的订单内容快照,上游接到音讯后简直不须要在调用接口进行反查。
(2)状态音讯的时序
状态迁徙是有时序的,因而很多上游依赖方也须要判断音讯的程序。一种实现计划是应用程序音讯(rocketmq、kafka 等),但基于并发吞吐量的思考很少采纳这种计划;个别都是在音讯体中退出 ” 音讯发送工夫 ” 或者 ” 状态变更工夫 ” 字段,有生产方本人进行解决。
(3)数据库状态变更和音讯的一致性
状态变更须要和音讯保持一致吗?
很多时候是须要的,如果数据库状态变更胜利了、然而状态音讯没有发送进来、则会导致一些上游依赖方解决逻辑的缺失。而咱们晓得,数据库和音讯零碎是无奈保障 100% 统一的,咱们要保障的是次要数据库状态变更了、音讯就要尽量靠近 100% 的发送胜利。
那么怎么保障呢?
其实通常的确有几种计划:
a)应用 rocketmq 等反对的两阶段式音讯提交形式:
先向音讯服务器发送一条预处理音讯
当本地数据库变更提交之后、再向音讯服务器发送一条确认发送的音讯
如果本地数据库变更失败、则向音讯服务器发送一条勾销发送的音讯
如果长时间没有向音讯服务器产生确认发送的音讯,音讯零碎则会回调一个提前约定的接口、来查看本地业务是否胜利,以此决定是否真正产生音讯
b)应用数据库事务计划保障:
创立一个音讯发送表,将要发送的音讯插入到该表中,同本地业务在一个数据库事务中进行提交
之后在由一个定时工作来轮询发送、直到发送胜利后在删除以后表记录
c)还是应用数据库事务计划保障:
创立一个音讯发送表,将要发送的音讯插入到该表中,同本地业务在一个数据库事务中进行提交
向音讯服务器发送音讯
发送胜利则删除掉以后表记录
对于没有发送胜利的音讯(也就是表外面没有被删除的记录),再由定时工作来轮询发送
image.png
还有其余计划吗?有的。
d)数据对账、发现不统一时进行弥补解决、以此保证数据的最终统一。其实不论应用哪种计划来保障数据库状态变更和音讯的统一,数据对账的计划都是 ” 必须 ” 要有的一种兜底计划。
那么、还有其余计划吗?还是有的,对于数据库状态变更和音讯的一致性的问题,细节比拟多,每种计划又都有相应的优缺点,本文次要是介绍状态机引擎的设计,对于音讯一致性的问题就不过多介绍,前面兴许会有独自的文章对数据库变更和音讯的一致性的问题进行介绍和探讨。
2 横向解决逻辑复用和实现业务扩大
实现基于 ” 多类型 + 多场景 + 多维度 ” 的代码拆散治理、以及规范解决流程模板的状态机模型之后,其实在真正编码的时候会发现不同类型不同维度对于同一个状态的流程处理过程,有时多个解决逻辑中的一部分流程一样的或者是类似的,比方领取环节不论是采纳免密还是其余形式、其中核销优惠券的解决逻辑、设置发票金额的解决逻辑等都是一样的;甚至有些时候多个类型间的解决逻辑大部分是雷同的而差别是小局部,比方下单流程的解决逻辑根本逻辑都差不多,而出租车比照网约车可能就多了出租车红包、无预估价等个别流程的差别。
对于下面这种状况、其实就是要实现在纵向解决业务隔离和流程编排的根底上,须要反对小局部逻辑或代码段的复用、或者大部分流程的复用,缩小反复建设和开发。对此咱们在状态机引擎中反对两种解决方案:
基于插件化的解决方案
插件的次要逻辑是:能够在业务逻辑执行(action)、数据长久化(save)这两个节点前加载对应到的插件类进行执行,次要是对上下文 Context 对象进行操作、或者依据 Context 参数发动不同的流程调用,已达到扭转业务数据或流程的目标。
(1)规范流程 + 差异化插件
下面讲到同一个状态模型下、不同的类型或维度有些逻辑或解决流程是一样的小局部逻辑是不同的。于是咱们能够把一种解决流程定义为规范的或默认的解决逻辑,把差异化的代码写成插件,当业务执行到具体差异化逻辑时会调用到不同的插件进行解决,这样只须要为不同的类型或维度编写对应有差别逻辑的插件即可、规范的解决流程由默认的处理器执行就行。
(2)差别流程 + 专用插件
当然对于小局部逻辑和代码能够专用的场景,也能够用插件化的计划解决。比方对于同一个状态下多个培修下不同处理器中、咱们能够把雷同的逻辑或代码封装成一个插件,多个处理器中都能够辨认加载该插件进行执行,从而实现多个差别的流程应用想用插件的模式。
/**
* 插件注解
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Component
public @interface ProcessorPlugin {
/**
* 指定状态,state 不能同时存在
*/
String[] state() default {};
/**
* 订单操作事件
*/
String event();
/**
* 业务
*/
String[] bizCode() default {};
/**
* 场景
*/
String[] sceneId() default {};}
* 插件处理器
*/
public interface PluginHandler<T, C> extends StateProcessor<T, C> {
}
Plug 在处理器模板中的执行逻辑。public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T>, StateActionStep<T, C> {
@Override
public final ServiceResult<T> action(StateContext<C> context) throws Exception {
ServiceResult<T> result = null;
try {
......
// 业务逻辑
result = this.action(nextState, context);
if (!result.isSuccess()) {return result;}
// 在 action 和 save 之间执行插件逻辑
this.pluginExecutor.parallelExecutor(context);
// 长久化
result = this.save(nextState, context));
if (!result.isSuccess()) {return result;}
......
} catch (Exception e) {throw e;}
}
插件应用的例子:/**
* 预估价插件
*/
@ProcessorPlugin(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS")
public class EstimatePricePlugin implements PluginHandler<String, CreateOrderContext> {
@Override
public ServiceResult action(StateContext<CreateOrderContext> context) throws Exception {// String price = priceSerive.getPrice();
String price = "";
context.getContext().setEstimatePriceInfo(price);
return new ServiceResult();}
}
基于代码继承形式的解决方案
当发现新增一个状态不同维度的解决流程,和以后已存在的一个处理器大部分逻辑是雷同的,此时就能够使新写的这个处理器 B 继承已存在的处理器 A,只须要让处理器 B 覆写 A 中不同办法逻辑、实现差别逻辑的替换。这种计划比拟好了解,然而须要处理器 A 曾经布局好一些能够扩大的点、其余处理器能够基于这些扩大点进行覆写替换。当然更好的计划其实是,先实现一个默认的处理器,把所有的规范解决流程和可扩大点进行封装实现、其余处理器进行继承、覆写、替换就好。
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "CHEAP")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
@Override
public ServiceResult action(String nextState, StateContext<CreateOrderContext> context) throws Exception {CreateEvent createEvent = (CreateEvent) context.getOrderStateEvent();
// 促销信息信息
String promtionInfo = this.doPromotion();
......
}
/**
* 促销相干扩大点
*/
protected String doPromotion() {return "1";}
}
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "TAXI")
public class OrderCreatedProcessor4Taxi extends OrderCreatedProcessor<String, CreateOrderContext> {
@Override
protected String doPromotion() {return "taxt1";}
}
3 状态迁徙流程的执行流程
状态机引擎的执行过程
通过下面的介绍,大体明确了怎么实现状态流程编排、业务隔离和扩大等等,然而状态机引擎具体是怎么把这个过程串联起来的呢?简略说、分为两个阶段:初始化阶段和运行时阶段。
(1)状态机引擎初始化阶段
首先在代码编写阶段、依据下面的剖析,业务通过实现 AbstractStateProcessor 模板类、并增加 @OrderProcessor 注解来实现本人的多个须要的特定状态处理器。
那么在零碎初始化阶段,所有增加了 @OrderProcessor 注解的实现类都会被 spring 所治理成为 spring bean,状态机引擎在通过监听 spring bean 的注册(BeanPostProcessor)来将这些状态处理器 processor 装载到本人治理的容器中。直白来说、这个状态处理器容器其实就是一个多层 map 实现的,第一层 map 的 key 是状态(state),第二层 map 的 key 是状态对应的事件(event)、一个状态能够有多个要解决的事件,第三层 map 的 key 是具体的场景 code(也就是 bizCode 和 sceneId 的组合),最初的 value 是 AbstractStateProcessor 汇合。
public class DefaultStateProcessRegistry implements BeanPostProcessor {
/**
* 第一层 key 是订单状态。* 第二层 key 是订单状态对应的事件,一个状态能够有多个事件。* 第三层 key 是具体场景 code,场景下对应的多个处理器,须要后续进行过滤抉择出一个具体的执行。
*/
private static Map<String, Map<String, Map<String, List<AbstractStateProcessor>>>> stateProcessMap = new ConcurrentHashMap<>();
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof AbstractStateProcessor && bean.getClass().isAnnotationPresent(OrderProcessor.class)) {OrderProcessor annotation = bean.getClass().getAnnotation(OrderProcessor.class);
String[] states = annotation.state();
String event = annotation.event();
String[] bizCodes = annotation.bizCode().length == 0 ? new String[]{"#"} : annotation.bizCode();
String[] sceneIds = annotation.sceneId().length == 0 ? new String[]{"#"} : annotation.sceneId();
initProcessMap(states, event, bizCodes, sceneIds, stateProcessMap, (AbstractStateProcessor) bean);
}
return bean;
}
private <E extends StateProcessor> void initProcessMap(String[] states, String event, String[] bizCodes, String[] sceneIds,
Map<String, Map<String, Map<String, List<E>>>> map, E processor) {for (String bizCode : bizCodes) {for (String sceneId : sceneIds) {Arrays.asList(states).parallelStream().forEach(orderStateEnum -> {registerStateHandlers(orderStateEnum, event, bizCode, sceneId, map, processor);
});
}
}
}
/**
* 初始化状态机处理器
*/
public <E extends StateProcessor> void registerStateHandlers(String orderStateEnum, String event, String bizCode, String sceneId,
Map<String, Map<String, Map<String, List<E>>>> map, E processor) {
// state 维度
if (!map.containsKey(orderStateEnum)) {map.put(orderStateEnum, new ConcurrentHashMap<>());
}
Map<String, Map<String, List<E>>> stateTransformEventEnumMap = map.get(orderStateEnum);
// event 维度
if (!stateTransformEventEnumMap.containsKey(event)) {stateTransformEventEnumMap.put(event, new ConcurrentHashMap<>());
}
// bizCode and sceneId
Map<String, List<E>> processorMap = stateTransformEventEnumMap.get(event);
String bizCodeAndSceneId = bizCode + "@" + sceneId;
if (!processorMap.containsKey(bizCodeAndSceneId)) {processorMap.put(bizCodeAndSceneId, new CopyOnWriteArrayList<>());
}
processorMap.get(bizCodeAndSceneId).add(processor);
}
}
(2)状态机引擎运行时阶段
通过初始化之后,所有的状态处理器 processor 都被装载到容器。在运行时,通过一个入口来发动对状态机的调用,办法的主要参数是操作事件(event)和业务入参,如果是新创建订单申请须要携带业务(bizCode)和场景(sceneId)信息、如果是已存在订单的更新状态机引擎会依据 oderId 主动获取业务(bizCode)、场景(sceneId)和以后状态(state)。之后引擎依据 state+event+bizCode+sceneId 从状态处理器容器中获取到对应的具体处理器 processor,从而进行状态迁徙解决。
/**
* 状态机执行引擎
*/
public interface OrderFsmEngine {
/**
* 执行状态迁徙事件,不传 FsmOrder 默认会依据 orderId 从 FsmOrderService 接口获取
*/
<T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent) throws Exception;
/**
* 执行状态迁徙事件,可携带 FsmOrder 参数
*/
<T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception;
}
@Component
public class DefaultOrderFsmEngine implements OrderFsmEngine {
@Override
public <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent) throws Exception {
FsmOrder fsmOrder = null;
if (orderStateEvent.newCreate()) {fsmOrder = this.fsmOrderService.getFsmOrder(orderStateEvent.getOrderId());
if (fsmOrder == null) {throw new FsmException(ErrorCodeEnum.ORDER_NOT_FOUND);
}
}
return sendEvent(orderStateEvent, fsmOrder);
}
@Override
public <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception {
// 结构以后事件上下文
StateContext context = this.getStateContext(orderStateEvent, fsmOrder);
// 获取以后事件处理器
StateProcessor<T> stateProcessor = this.getStateProcessor(context);
// 执行解决逻辑
return stateProcessor.action(context);
}
private <T> StateProcessor<T, ?> getStateProcessor(StateContext<?> context) {OrderStateEvent stateEvent = context.getOrderStateEvent();
FsmOrder fsmOrder = context.getFsmOrder();
// 依据状态 + 事件对象获取所对应的业务处理器汇合
List<AbstractStateProcessor> processorList = stateProcessorRegistry.acquireStateProcess(fsmOrder.getOrderState(),
stateEvent.getEventType(), fsmOrder.bizCode(), fsmOrder.sceneId());
if (processorList == null) {
// 订单状态产生扭转
if (!Objects.isNull(stateEvent.orderState()) && !stateEvent.orderState().equals(fsmOrder.getOrderState())) {throw new FsmException(ErrorCodeEnum.ORDER_STATE_NOT_MATCH);
}
throw new FsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR);
}
if (CollectionUtils.isEmpty(processorResult)) {throw new FsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR);
}
if (processorResult.size() > 1) {throw new FsmException(ErrorCodeEnum.FOUND_MORE_PROCESSOR);
}
return processorResult.get(0);
}
private StateContext<?> getStateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) {StateContext<?> context = new StateContext(orderStateEvent, fsmOrder);
return context;
}
}
检测到多个状态执行器怎么解决
有一点要阐明,有可能依据 state+event+bizCode+sceneId 信息获取到的是多个状态处理器 processor,有可能的确业务须要单纯依赖 bizCode 和 sceneId 两个属性无奈无效辨认和定位惟一 processor,那么咱们这里给业务开一个口、由业务决定从多个处理器当选一个适宜以后上下文的,具体做法是业务 processor 通过 filter 办法依据以后 context 来判断是否合乎调用条件。
private <T> StateProcessor<T, ?> getStateProcessor(StateContext<?> context) {
// 依据状态 + 事件对象获取所对应的业务处理器汇合
List<AbstractStateProcessor> processorList = ...
......
List<AbstractStateProcessor> processorResult = new ArrayList<>(processorList.size());
// 依据上下文获取惟一的业务处理器
for (AbstractStateProcessor processor : processorList) {if (processor.filter(context)) {processorResult.add(processor);
}
}
......
}
filter 在具体状态处理器 processor 中的应用举例:
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
......
@Override
public boolean filter(StateContext<CreateOrderContext> context) {OrderInfo orderInfo = (OrderInfo) context.getFsmOrder();
if (orderInfo.getServiceType() == ServiceType.TAKEOFF_CAR) {return true;}
return false;
}
......
}
当然,如果最终通过业务 filter 之后,还是有多个状态处理器符合条件,那么这里只能抛异样解决了。这个须要在开发时,对状态和多维度处理器有具体布局。
4 状态机引擎执行总结
状态机引擎解决流程
繁难的状态机引擎的执行流程整顿,次要介绍运行时的状态机执行过程。
状态处理器的原理
繁难的状态机处理器的原理和依赖整顿,次要介绍状态处理器的流程和细节。
三 其余
还有其余问题么,想一下。
1 状态流转并发问题怎么解决?
如果一个订单以后是状态 A、此刻从不同的维度或入口别离发动了不同的事件申请,此时怎么解决?
比方以后订单是新创建实现状态,用户发动了勾销同时客服也发动了勾销,在或者订单是待领取状态、零碎发动了免密领取同时客服或者用户发动了改价。这些场景不论是零碎照成的并发还是业务操作造成的并发,并发是实在存在的。对于这种状况、准则是同一时刻一个订单只能有一个状态变更事件可进行,其余的申请要么排队、要么返回由上游进行解决或重试等。
咱们的做法是:
在状态机 OrderFsmEngine 的 sendEvent 入口处,针对同一个订单维度加锁(redis 分布式锁)、同一时间只容许有一个状态变更操作进行,其余申请则进行排队期待。
在数据库层对以后 state 做校验、相似与乐观锁形式。最终是将其余申请抛错、由上游业务进行解决。
2 能不能动静实现状态流程的切换和编排?
最开始咱们有一个版本,状态处理器的定义不是由注解形式实现、而是将 state、event、bizCode、sceneId、processor 这些通过数据库表来保留,初始化时从数据库加载后进行处理器的装载。同时通过一个后盾能够动静的调整 state、event、bizCode、sceneId、processor 对应关系、以此来达到动静灵便配置流程的成果,然而随着业务的上线,根本素来没有进行动静变更过,其实也不敢操作,毕竟状态流转事非常外围的业务、一旦因变更导致故障是不可设想的。
3 通用性的问题
其实不仅仅订单零碎、甚至不仅是状态机逻辑能够用下面讲的这些思路解决,很多日常中其余一些多维度的业务都能够采取这些计划进行解决。
4 与 TMF 的联合
其实这套状态机引擎还是比较简单的、对于业务扩大点处的定义也不是非常敌对,目前咱们也正在联合 TMF 框架来定制扩大点,TMF 是从执行具体扩大点实现的角度登程,达到规范流程和具体业务逻辑拆散的成果。
当然不论那种计划,扩大点的定义是业务须要外围关怀和敌对封装的事件。
原文链接
本文为阿里云原创内容,未经容许不得转载。