关于java:项目终于用上了-Spring-状态机非常优雅

3次阅读

共计 22556 个字符,预计需要花费 57 分钟才能阅读完成。

起源:https://www.duidaima.com/Group/Topic/JAVA/11942

1、什么是状态机

1.1 什么是状态

先来解释什么是“状态”(State)。事实事物是有不同状态的,例如一个自动门,就有 open 和 closed 两种状态。咱们通常所说的状态机是无限状态机,也就是被形容的事物的状态的数量是无限个,例如自动门的状态就是两个 open 和 closed。

状态机,也就是 State Machine,不是指一台理论机器,而是指一个数学模型。说白了,个别就是指一张状态转换图。例如,依据自动门的运行规定,咱们能够形象出上面这么一个图。

自动门有两个状态,open 和 closed,closed 状态下,如果读取开门信号,那么状态就会切换为 open。open 状态下如果读取关门信号,状态就会切换为 closed。

状态机的全称是无限状态自动机,主动两个字也是蕴含重要含意的。给定一个状态机,同时给定它的以后状态以及输出,那么输入状态时能够明确的运算进去的。例如对于自动门,给定初始状态 closed,给定输出“开门”,那么下一个状态时能够运算进去的。

这样状态机的根本定义咱们就介绍结束了。反复一下:状态机是无限状态自动机的简称,是事实事物运行规定形象而成的一个数学模型。

1.2 四大概念

上面来给出状态机的四大概念。

  • 第一个是 State,状态。一个状态机至多要蕴含两个状态。例如下面自动门的例子,有 open 和 closed 两个状态。
  • 第二个是 Event,事件。事件就是执行某个操作的触发条件或者口令。对于自动门,“按下开门按钮”就是一个事件。
  • 第三个是 Action,动作。事件产生当前要执行动作。例如事件是“按开门按钮”,动作是“开门”。编程的时候,一个 Action 个别就对应一个函数。
  • 第四个是 Transition,变换。也就是从一个状态变动为另一个状态。例如“开门过程”就是一个变换。

1.3 状态机

无限状态机(Finite-state machine,FSM),又称无限状态自动机,简称状态机,是示意无限个状态以及在这些状态之间的转移和动作等行为的数学模型。

FSM 是一种算法思维,简略而言,无限状态机由一组状态、一个初始状态、输出和依据输出及现有状态转换为下一个状态的转换函数组成。

其作用次要是形容对象在它的生命周期内所经验的状态序列,以及如何响应来自外界的各种事件。

举荐一个开源收费的 Spring Boot 实战我的项目:

https://github.com/javastacks/spring-boot-best-practice

2、状态机图

做需要时,须要理解以下六种元素:起始、终止、现态、次态(指标状态)、动作、条件,咱们就能够实现一个状态机图了:

以订单为例:以从待领取状态转换为待发货状态为例

  • ①现态:是指以后所处的状态。待领取
  • ②条件:又称为“事件”,当一个条件被满足,将会触发一个动作,或者执行一次状态的迁徙。领取事件
  • ③动作:条件满足后执行的动作。动作执行结束后,能够迁徙到新的状态,也能够仍旧放弃原状态。动作不是必须的,当条件满足后,也能够不执行任何动作,间接迁徙到新状态。状态转换为待发货
  • ④次态:条件满足后要迁往的新状态。“次态”是绝对于“现态”而言的,“次态”一旦被激活,就转变成新的“现态”了。待发货 注意事项

1、防止把某个“程序动作”当作是一种“状态”来解决。那么如何辨别“动作”和“状态”?“动作”是不稳固的,即便没有条件的触发,“动作”一旦执行结束就完结了;而“状态”是绝对稳固的,如果没有内部条件的触发,一个状态会始终继续上来。

2、状态划分时漏掉一些状态,导致跳转逻辑不残缺。所以在设计状态机时,咱们须要重复的查看设计的状态图或者状态表,最终达到一种颠扑不破的设计方案。

3、spring statemachine

3.1 状态机 spring statemachine 概述

Spring Statemachine 是应用程序开发人员在 Spring 应用程序中应用状态机概念的框架

Spring Statemachine 旨在提供以下性能:

  1. 易于应用的扁平单级状态机,用于简略的应用案例。
  2. 分层状态机构造,以简化简单的状态配置。
  3. 状态机区域提供更简单的状态配置。
  4. 应用触发器,转换,警卫和操作。
  5. 键入平安配置适配器。
  6. 生成器模式,用于在 Spring Application 上下文之外应用的简略实例化通常用例的食谱
  7. 基于 Zookeeper 的分布式状态机
  8. 状态机事件监听器。
  9. UML Eclipse Papyrus 建模。
  10. 将计算机配置存储在永恒存储中。
  11. Spring IOC 集成将 bean 与状态机关联起来。

状态机功能强大,因为行为始终保障统一,使调试绝对容易。这是因为操作规定是在机器启动时写成的。这个想法是你的应用程序可能存在于无限数量的状态中,某些预约义的触发器能够将你的应用程序从一个状态转移到另一个状态。此类触发器能够基于事件或计时器。

在应用程序之外定义高级逻辑而后依附状态机来治理状态要容易得多。您能够通过发送事件,侦听更改或仅申请以后状态来与状态机进行交互。

官网:spring.io/projects/sp…

3.2 疾速开始

Spring Boot 根底就不介绍了,举荐看这个实战我的项目:

https://github.com/javastacks/spring-boot-best-practice

以订单状态扭转的例子为例:

表结构设计如下:

CREATE TABLE `tb_order` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键 ID',
      `order_code` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '订单编码',
      `status` smallint(3) DEFAULT NULL COMMENT '订单状态',
      `name` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '订单名称',
      `price` decimal(12,2) DEFAULT NULL COMMENT '价格',
      `delete_flag` tinyint(2) NOT NULL DEFAULT '0' COMMENT '删除标记,0 未删除  1 已删除',
      `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创立工夫',
      `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '更新工夫',
      `create_user_code` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '创建人',
      `update_user_code` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '更新人',
      `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
      `remark` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '备注',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='订单表';

    /*Data for the table `tb_order` */

    insert  into `tb_order`(`id`,`order_code`,`status`,`name`,`price`,`delete_flag`,`create_time`,`update_time`,`create_user_code`,`update_user_code`,`version`,`remark`) values
    (2,'A111',1,'A','22.00',0,'2022-10-15 16:14:11','2022-10-02 21:29:14','zhangsan','zhangsan',0,NULL),
    (3,'A111',1,'订单 A','22.00',0,'2022-10-02 21:53:13','2022-10-02 21:29:14','zhangsan','zhangsan',0,NULL),
    (4,'A111',1,'订单 A','22.00',0,'2022-10-02 21:53:13','2022-10-02 21:29:14','zhangsan','zhangsan',0,NULL),
    (5,'A111',1,'订单 A','22.00',0,'2022-10-03 09:08:30','2022-10-02 21:29:14','zhangsan','zhangsan',0,NULL);
1)引入依赖
 <!-- redis 长久化状态机 -->
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-redis</artifactId>
        <version>1.2.9.RELEASE</version>
    </dependency>
    <!-- 状态机 -->
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-starter</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
2)定义状态机状态和事件

状态枚举:

/**
* 
*/
public enum OrderStatus {
        // 待领取,待发货,待收货,已实现
        WAIT_PAYMENT(1, "待领取"),
        WAIT_DELIVER(2, "待发货"),
        WAIT_RECEIVE(3, "待收货"),
        FINISH(4, "已实现");
        private Integer key;
        private String desc;
        OrderStatus(Integer key, String desc) {
            this.key = key;
            this.desc = desc;
        }
        public Integer getKey() {return key;}
        public String getDesc() {return desc;}
        public static OrderStatus getByKey(Integer key) {for (OrderStatus e : values()) {if (e.getKey().equals(key)) {return e;}
            }
            throw new RuntimeException("enum not exists.");
        }
    }

事件:

/**
* 
*/
public enum OrderStatusChangeEvent {
        // 领取,发货,确认收货
        PAYED, DELIVERY, RECEIVED;
}
3)定义状态机规定和配置状态机
 @Configuration
    @EnableStateMachine(name = "orderStateMachine")
    public class OrderStateMachineConfig extends StateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {
        /**
         * 配置状态
         *
         * @param states
         * @throws Exception
         */
        public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
            states
                    .withStates()
                    .initial(OrderStatus.WAIT_PAYMENT)
                    .states(EnumSet.allOf(OrderStatus.class));
        }
        /**
         * 配置状态转换事件关系
         *
         * @param transitions
         * @throws Exception
         */
        public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
            transitions
                    // 领取事件: 待领取 -》待发货
                    .withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderStatusChangeEvent.PAYED)
                    .and()
                    // 发货事件: 待发货 -》待收货
                    .withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderStatusChangeEvent.DELIVERY)
                    .and()
                    // 收货事件: 待收货 -》已实现
                    .withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderStatusChangeEvent.RECEIVED);
        }
    }

配置长久化:

 /**
 * 
 */
    @Configuration
    @Slf4j
    public class Persist<E, S> {
        /**
         * 长久化到内存 map 中
         *
         * @return
         */
        @Bean(name = "stateMachineMemPersister")
        public static StateMachinePersister getPersister() {return new DefaultStateMachinePersister(new StateMachinePersist() {
                @Override
                public void write(StateMachineContext context, Object contextObj) throws Exception {log.info("长久化状态机,context:{},contextObj:{}", JSON.toJSONString(context), JSON.toJSONString(contextObj));
                    map.put(contextObj, context);
                }
                @Override
                public StateMachineContext read(Object contextObj) throws Exception {log.info("获取状态机,contextObj:{}", JSON.toJSONString(contextObj));
                    StateMachineContext stateMachineContext = (StateMachineContext) map.get(contextObj);
                    log.info("获取状态机后果,stateMachineContext:{}", JSON.toJSONString(stateMachineContext));
                    return stateMachineContext;
                }
                private Map map = new HashMap();});
        }

        @Resource
        private RedisConnectionFactory redisConnectionFactory;
        /**
         * 长久化到 redis 中,在分布式系统中应用
         *
         * @return
         */
        @Bean(name = "stateMachineRedisPersister")
        public RedisStateMachinePersister<E, S> getRedisPersister() {RedisStateMachineContextRepository<E, S> repository = new RedisStateMachineContextRepository<>(redisConnectionFactory);
            RepositoryStateMachinePersist p = new RepositoryStateMachinePersist<>(repository);
            return new RedisStateMachinePersister<>(p);
        }
    }
4)业务零碎

controller:

 /**
 * 
 */
    @RestController
    @RequestMapping("/order")
    public class OrderController {
        @Resource
        private OrderService orderService;
        /**
         * 依据 id 查问订单
         *
         * @return
         */
        @RequestMapping("/getById")
        public Order getById(@RequestParam("id") Long id) {
            // 依据 id 查问订单
            Order order = orderService.getById(id);
            return order;
        }
        /**
         * 创立订单
         *
         * @return
         */
        @RequestMapping("/create")
        public String create(@RequestBody Order order) {
            // 创立订单
            orderService.create(order);
            return "sucess";
        }
        /**
         * 对订单进行领取
         *
         * @param id
         * @return
         */
        @RequestMapping("/pay")
        public String pay(@RequestParam("id") Long id) {
            // 对订单进行领取
            orderService.pay(id);
            return "success";
        }

        /**
         * 对订单进行发货
         *
         * @param id
         * @return
         */
        @RequestMapping("/deliver")
        public String deliver(@RequestParam("id") Long id) {
            // 对订单进行确认收货
            orderService.deliver(id);
            return "success";
        }
        /**
         * 对订单进行确认收货
         *
         * @param id
         * @return
         */
        @RequestMapping("/receive")
        public String receive(@RequestParam("id") Long id) {
            // 对订单进行确认收货
            orderService.receive(id);
            return "success";
        }
    }

servie:

 /**
 * 
 */
    @Service("orderService")
    @Slf4j
    public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
        @Resource
        private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
        @Resource
        private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;
        @Resource
        private OrderMapper orderMapper;
        /**
         * 创立订单
         *
         * @param order
         * @return
         */
        public Order create(Order order) {order.setStatus(OrderStatus.WAIT_PAYMENT.getKey());
            orderMapper.insert(order);
            return order;
        }
        /**
         * 对订单进行领取
         *
         * @param id
         * @return
         */
        public Order pay(Long id) {Order order = orderMapper.selectById(id);
            log.info("线程名称:{}, 尝试领取,订单号:{}" ,Thread.currentThread().getName() , id);
            if (!sendEvent(OrderStatusChangeEvent.PAYED, order)) {log.error("线程名称:{}, 领取失败, 状态异样,订单信息:{}", Thread.currentThread().getName(), order);
                throw new RuntimeException("领取失败, 订单状态异样");
            }
            return order;
        }
        /**
         * 对订单进行发货
         *
         * @param id
         * @return
         */
        public Order deliver(Long id) {Order order = orderMapper.selectById(id);
            log.info("线程名称:{}, 尝试发货,订单号:{}" ,Thread.currentThread().getName() , id);
            if (!sendEvent(OrderStatusChangeEvent.DELIVERY, order)) {log.error("线程名称:{}, 发货失败, 状态异样,订单信息:{}", Thread.currentThread().getName(), order);
                throw new RuntimeException("发货失败, 订单状态异样");
            }
            return order;
        }
        /**
         * 对订单进行确认收货
         *
         * @param id
         * @return
         */
        public Order receive(Long id) {Order order = orderMapper.selectById(id);
            log.info("线程名称:{}, 尝试收货,订单号:{}" ,Thread.currentThread().getName() , id);
            if (!sendEvent(OrderStatusChangeEvent.RECEIVED, order)) {log.error("线程名称:{}, 收货失败, 状态异样,订单信息:{}", Thread.currentThread().getName(), order);
                throw new RuntimeException("收货失败, 订单状态异样");
            }
            return order;
        }
        /**
         * 发送订单状态转换事件
         * synchronized 润饰保障这个办法是线程平安的
         *
         * @param changeEvent
         * @param order
         * @return
         */
        private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
            boolean result = false;
            try {
                // 启动状态机
                orderStateMachine.start();
                // 尝试复原状态机状态
                stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
                Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
                result = orderStateMachine.sendEvent(message);
                // 长久化状态机状态
                stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
            } catch (Exception e) {log.error("订单操作失败:{}", e);
            } finally {orderStateMachine.stop();
            }
            return result;
        }
    }

监听状态的变动:

 /**
 * 
 */
    @Component("orderStateListener")
    @WithStateMachine(name = "orderStateMachine")
    @Slf4j
    public class OrderStateListenerImpl {
        @Resource
        private OrderMapper orderMapper;

        @OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
        public void payTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
            log.info("领取,状态机反馈信息:{}",  message.getHeaders().toString());
            // 更新订单
            order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
        }
        @OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
        public void deliverTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
            log.info("发货,状态机反馈信息:{}",  message.getHeaders().toString());
            // 更新订单
            order.setStatus(OrderStatus.WAIT_RECEIVE.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
        }
        @OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
        public void receiveTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
            log.info("确认收货,状态机反馈信息:{}",  message.getHeaders().toString());
            // 更新订单
            order.setStatus(OrderStatus.FINISH.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
        }
    }

3.3 测试验证

1)验证业务
  • 新增一个订单

    http://localhost:8084/order/create

  • 对订单进行领取

    http://localhost:8084/order/pay?id=2

  • 对订单进行发货

    http://localhost:8084/order/deliver?id=2

  • 对订单进行确认收货

    http://localhost:8084/order/receive?id=2

失常流程完结。如果对一个订单进行领取了,再次进行领取,则会报错:http://localhost:8084/order/pay?id=2

报错如下:

2)验证长久化

内存

应用内存长久化类长久化:

 /**
 * 
 */
 @Resource
    private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;

    /**
     * 发送订单状态转换事件
     * synchronized 润饰保障这个办法是线程平安的
     *
     * @param changeEvent
     * @param order
     * @return
     */
    private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
        boolean result = false;
        try {
            // 启动状态机
            orderStateMachine.start();
            // 尝试复原状态机状态
            stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
            Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
            result = orderStateMachine.sendEvent(message);
            // 长久化状态机状态
            stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
        } catch (Exception e) {log.error("订单操作失败:{}", e);
        } finally {orderStateMachine.stop();
        }
        return result;
    }

redis 长久化

引入依赖:

<!-- redis 长久化状态机 -->
<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-redis</artifactId>
    <version>1.2.9.RELEASE</version>
</dependency>

配置 yaml:

spring:
  redis:
    database: 0
    host: localhost
    jedis:
      pool:
        max-active: 8
        max-idle: 8
        max-wait: ''
        min-idle: 0
    password: ''
    port: 6379
    timeout: 0

应用 redis 长久化类长久化:

 /**
 * 
 */
 @Resource
    private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineRedisPersister;

    /**
     * 发送订单状态转换事件
     * synchronized 润饰保障这个办法是线程平安的
     *
     * @param changeEvent
     * @param order
     * @return
     */
    private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
        boolean result = false;
        try {
            // 启动状态机
            orderStateMachine.start();
            // 尝试复原状态机状态
            stateMachineRedisPersister.restore(orderStateMachine, String.valueOf(order.getId()));
            Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
            result = orderStateMachine.sendEvent(message);
            // 长久化状态机状态
            stateMachineRedisPersister.persist(orderStateMachine, String.valueOf(order.getId()));
        } catch (Exception e) {log.error("订单操作失败:{}", e);
        } finally {orderStateMachine.stop();
        }
        return result;
    }

3.4 状态机存在的问题

1)stateMachine 无奈抛出异样,异样会被状态机给消化掉

问题景象

从 orderStateMachine.sendEvent(message); 获取的后果无奈感知到。无论执行失常还是抛出异样,都返回 true。

 @Resource
    private OrderMapper orderMapper;

    @Resource
    private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;

    @OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
    @Transactional(rollbackFor = Exception.class)
    public void payTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
        log.info("领取,状态机反馈信息:{}",  message.getHeaders().toString());
        try {
            // 更新订单
            order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
            // 模仿异样
            if(Objects.equals(order.getName(),"A")){throw new RuntimeException("执行业务异样");
            }
        } catch (Exception e) {
            // 如果出现异常,记录异样信息,抛出异样信息进行回滚
            log.error("payTransition 出现异常:{}",e);
            throw e;
        }
    }

监听事件抛出异样,在发送事件中无奈感知:

 private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order) {
        boolean result = false;
        try {
            // 启动状态机
            orderStateMachine.start();
            // 尝试复原状态机状态
            stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
            Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
             // 事件执行异样了,仍然返回 true,无奈感知异样
            result = orderStateMachine.sendEvent(message);
            if(result){
                // 长久化状态机状态,如果依据 true 长久化,则会呈现问题
                stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
            }
        } catch (Exception e) {log.error("订单操作失败:{}", e);
        } finally {orderStateMachine.stop();
        }
        return result;
    }

调试发现:发送事件和监听事件是一个线程,发送事件的后果是在监听操作执行完之后才返回

监听线程:

解决方案:本人保留异样到数据库或者内存中,进行判断

也能够通过接口:org.springframework.statemachine.StateMachine##getExtendedState

办法把执行状态放入这个变量中

public interface ExtendedState {Map<Object, Object> getVariables();
        <T> T get(Object var1, Class<T> var2);
        void setExtendedStateChangeListener(ExtendedState.ExtendedStateChangeListener var1);
        public interface ExtendedStateChangeListener {void changed(Object var1, Object var2);
        }
    }

org.springframework.statemachine.support.DefaultExtendedState##getVariables

private final Map<Object, Object> variables;

    public DefaultExtendedState() {this.variables = new ObservableMap(new ConcurrentHashMap(), new DefaultExtendedState.LocalMapChangeListener());
    }

    public Map<Object, Object> getVariables() {return this.variables;}

革新监听状态:把业务的执行后果进行保留,1 胜利,0 失败

    @Resource
    private OrderMapper orderMapper;
    @Resource
    private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;

    @OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
    @Transactional(rollbackFor = Exception.class)
    public void payTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
        log.info("领取,状态机反馈信息:{}",  message.getHeaders().toString());
        try {
            // 更新订单
            order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
            // 模仿异样
            if(Objects.equals(order.getName(),"A")){throw new RuntimeException("执行业务异样");
            }
            // 胜利 则为 1
            orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(),1);
        } catch (Exception e) {
            // 如果出现异常,则进行回滚
            log.error("payTransition 出现异常:{}",e);
            // 将异样信息变量信息中,失败则为 0
            orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(), 0);
            throw e;
        }
    }

发送事件革新:如果获取到业务执行异样,则返回失败,不进行状态机长久化 com.zengqingfa.springboot.state.demo.service.impl.OrderServiceImpl##sendEvent

 @Resource
    private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
    @Resource
    private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;

    /**
     * 发送订单状态转换事件
     * synchronized 润饰保障这个办法是线程平安的
     *
     * @param changeEvent
     * @param order
     * @return
     */
    private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order){
        boolean result = false;
        try {
            // 启动状态机
            orderStateMachine.start();
            // 尝试复原状态机状态
            stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
            Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
            result = orderStateMachine.sendEvent(message);
            if(!result){return false;}
            // 获取到监听的后果信息
            Integer o = (Integer) orderStateMachine.getExtendedState().getVariables().get(CommonConstants.payTransition + order.getId());
            // 操作实现之后, 删除本次对应的 key 信息
            orderStateMachine.getExtendedState().getVariables().remove(CommonConstants.payTransition+order.getId());
            // 如果事务执行胜利,则长久化状态机
            if(Objects.equals(1,Integer.valueOf(o))){
                // 长久化状态机状态
                stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
            }else {
                // 订单执行业务异样
                return false;
            }
        } catch (Exception e) {log.error("订单操作失败:{}", e);
        } finally {orderStateMachine.stop();
        }
        return result;
    }

代码优化

  • 发送事件只针对了领取,如果是非领取事件呢?
// 获取到监听的后果信息
Integer o = (Integer) orderStateMachine.getExtendedState().getVariables().get(CommonConstants.payTransition + order.getId());
  • 监听设置状态的代码有反复代码,须要进行优化,可应用 aop
try {
        //TODO 其余业务
        // 胜利 则为 1
        orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(),1);
    } catch (Exception e) {
        // 如果出现异常,则进行回滚
        log.error("payTransition 出现异常:{}",e);
        // 将异样信息变量信息中,失败则为 0
        orderStateMachine.getExtendedState().getVariables().put(CommonConstants.payTransition+order.getId(), 0);
        throw e;
    }

常量类:

public interface CommonConstants {
        String orderHeader="order";
        String payTransition="payTransition";
        String deliverTransition="deliverTransition";
        String receiveTransition="receiveTransition";
    }

领取发送事件:com.zengqingfa.springboot.state.demo.service.impl.OrderServiceImpl##pay

 @Resource
    private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
    @Resource
    private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, String> stateMachineMemPersister;
    @Resource
    private OrderMapper orderMapper;

    /**
     * 对订单进行领取
     *
     * @param id
     * @return
     */
    public Order pay(Long id) {Order order = orderMapper.selectById(id);
        log.info("线程名称:{}, 尝试领取,订单号:{}" ,Thread.currentThread().getName() , id);
        if (!sendEvent(OrderStatusChangeEvent.PAYED, order,CommonConstants.payTransition)) {log.error("线程名称:{}, 领取失败, 状态异样,订单信息:{}", Thread.currentThread().getName(), order);
            throw new RuntimeException("领取失败, 订单状态异样");
        }
        return order;
    }

    /**
     * 发送订单状态转换事件
     * synchronized 润饰保障这个办法是线程平安的
     *
     * @param changeEvent
     * @param order
     * @return
     */
    private synchronized boolean sendEvent(OrderStatusChangeEvent changeEvent, Order order,String key){
        boolean result = false;
        try {
            // 启动状态机
            orderStateMachine.start();
            // 尝试复原状态机状态
            stateMachineMemPersister.restore(orderStateMachine, String.valueOf(order.getId()));
            Message message = MessageBuilder.withPayload(changeEvent).setHeader("order", order).build();
            result = orderStateMachine.sendEvent(message);
            if(!result){return false;}
            // 获取到监听的后果信息
            Integer o = (Integer) orderStateMachine.getExtendedState().getVariables().get(key + order.getId());
            // 操作实现之后, 删除本次对应的 key 信息
            orderStateMachine.getExtendedState().getVariables().remove(key+order.getId());
            // 如果事务执行胜利,则长久化状态机
            if(Objects.equals(1,Integer.valueOf(o))){
                // 长久化状态机状态
                stateMachineMemPersister.persist(orderStateMachine, String.valueOf(order.getId()));
            }else {
                // 订单执行业务异样
                return false;
            }
        } catch (Exception e) {log.error("订单操作失败:{}", e);
        } finally {orderStateMachine.stop();
        }
        return result;
    }

应用 aop 对监听事件切面,把业务执行后果封装到状态机的变量中,注解:

 @Retention(RetentionPolicy.RUNTIME)
    public @interface LogResult {
        /**
         * 执行的业务 key
         *
         * @return String
         */
        String key();}

切面:

 @Component
    @Aspect
    @Slf4j
    public class LogResultAspect {

        // 拦挡 LogHistory 注解
        @Pointcut("@annotation(com.zengqingfa.springboot.state.demo.aop.annotation.LogResult)")
        private void logResultPointCut() {//logResultPointCut 日志注解切点}
        @Resource
        private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;

        @Around("logResultPointCut()")
        public Object logResultAround(ProceedingJoinPoint pjp) throws Throwable {
            // 获取参数
            Object[] args = pjp.getArgs();
            log.info("参数 args:{}", args);
            Message message = (Message) args[0];
            Order order = (Order) message.getHeaders().get("order");
            // 获取办法
            Method method = ((MethodSignature) pjp.getSignature()).getMethod();
            // 获取 LogHistory 注解
            LogResult logResult = method.getAnnotation(LogResult.class);
            String key = logResult.key();
            Object returnVal = null;
            try {
                // 执行办法
                returnVal = pjp.proceed();
                // 如果业务执行失常,则保存信息
                // 胜利 则为 1
                orderStateMachine.getExtendedState().getVariables().put(key + order.getId(), 1);
            } catch (Throwable e) {log.error("e:{}", e.getMessage());
                // 如果业务执行异样,则保存信息
                // 将异样信息变量信息中,失败则为 0
                orderStateMachine.getExtendedState().getVariables().put(key + order.getId(), 0);
                throw e;
            }
            return returnVal;
        }
    }

监听类应用注解:

 @Component("orderStateListener")
    @WithStateMachine(name = "orderStateMachine")
    @Slf4j
    public class OrderStateListenerImpl {
        @Resource
        private OrderMapper orderMapper;

        @OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
        @Transactional(rollbackFor = Exception.class)
        @LogResult(key = CommonConstants.payTransition)
        public void payTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
            log.info("领取,状态机反馈信息:{}", message.getHeaders().toString());
            // 更新订单
            order.setStatus(OrderStatus.WAIT_DELIVER.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
            // 模仿异样
            if (Objects.equals(order.getName(), "A")) {throw new RuntimeException("执行业务异样");
            }
        }
        @OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
        @LogResult(key = CommonConstants.deliverTransition)
        public void deliverTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
            log.info("发货,状态机反馈信息:{}", message.getHeaders().toString());
            // 更新订单
            order.setStatus(OrderStatus.WAIT_RECEIVE.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
        }
        @OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
        @LogResult(key = CommonConstants.receiveTransition)
        public void receiveTransition(Message<OrderStatusChangeEvent> message) {Order order = (Order) message.getHeaders().get("order");
            log.info("确认收货,状态机反馈信息:{}", message.getHeaders().toString());
            // 更新订单
            order.setStatus(OrderStatus.FINISH.getKey());
            orderMapper.updateById(order);
            //TODO 其余业务
        }
    }

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿 (2022 最新版)

2. 劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0