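How transactional messages work


Let's look at how RocketMQ transactional messages are used to send a "reliable message". Only three steps are needed:

  1. Send a half message (a half message is not delivered to consumers)
  2. Execute the local transaction
  3. Commit the message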
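
The three steps can be sketched with a tiny in-memory model. This is not RocketMQ code: `HalfMessageFlow`, `sendHalf` and `end` are hypothetical names, used only to illustrate that a half message stays invisible to consumers until the producer commits it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a broker that handles transactional messages.
public class HalfMessageFlow {
    // Half messages waiting for a commit or rollback, keyed by transaction id.
    private final Map<String, String> halfMessages = new HashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> visible = new ArrayList<>();

    // Step 1: store the half message; consumers cannot see it yet.
    public void sendHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    // Step 3: a commit makes the message visible, a rollback discards it.
    public void end(String txId, boolean commit) {
        String body = halfMessages.remove(txId);
        if (body != null && commit) {
            visible.add(body);
        }
    }

    public List<String> visibleMessages() {
        return visible;
    }

    public static void main(String[] args) {
        HalfMessageFlow broker = new HalfMessageFlow();
        broker.sendHalf("tx-1", "order created"); // step 1
        boolean localTxOk = true;                 // step 2: outcome of the local transaction
        broker.end("tx-1", localTxOk);            // step 3
        System.out.println(broker.visibleMessages());
    }
}
```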


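Once the transactional message has been sent, consumers can consume it in the normal way.

RocketMQ's automatic redelivery mechanism ensures that the message is consumed correctly in the vast majority of cases.

If consumption ultimately fails, manual handling can serve as a fallback.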
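
The idea of "retry automatically, then fall back to manual handling" can be sketched as follows. This is a simplified illustration, not RocketMQ's implementation: `RetrySketch`, `deliver` and `manualQueue` are hypothetical names, and the retry limit is passed in as a parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: bounded redelivery with a manual-handling fallback.
public class RetrySketch {
    // Messages that exhausted their retries and need manual handling.
    private final List<String> manualQueue = new ArrayList<>();

    // Calls the handler up to maxAttempts times and returns the number of attempts used.
    public int deliver(String msg, Predicate<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (handler.test(msg)) {
                return attempt; // consumed successfully
            }
        }
        manualQueue.add(msg); // fallback: hand the message over for manual handling
        return maxAttempts;
    }

    public List<String> manualQueue() {
        return manualQueue;
    }
}
```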


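The flow described above is the normal case. Now consider two error cases:

  1. When the local transaction fails, the message is rolled back
  2. When the server cannot determine the message state, it actively checks the state back with the producer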
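
Both error cases can be sketched with a small in-memory model. The class below is an illustration under simplifying assumptions, not the broker's real logic: `CheckBackSketch`, `State` and `checkPending` are hypothetical names, and the producer's answer is modelled as a plain function from transaction id to state.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a broker that checks back pending half messages.
public class CheckBackSketch {
    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, String> pending = new LinkedHashMap<>();
    private final List<String> delivered = new ArrayList<>();

    public void sendHalf(String txId, String body) {
        pending.put(txId, body);
    }

    // One check-back round: ask the producer about every pending half message.
    public void checkPending(Function<String, State> producerCheck) {
        Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            State s = producerCheck.apply(e.getKey());
            if (s == State.COMMIT) {
                delivered.add(e.getValue()); // now visible to consumers
                it.remove();
            } else if (s == State.ROLLBACK) {
                it.remove();                 // discarded, never delivered
            }
            // UNKNOWN: keep the half message and ask again in a later round
        }
    }

    public List<String> delivered() {
        return delivered;
    }

    public int pendingCount() {
        return pending.size();
    }
}
```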

Rollback:


Message check-back:

Producer

package demo8;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer p = new TransactionMQProducer("producer-demo8");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.setExecutorService(Executors.newFixedThreadPool(5));
        p.setTransactionListener(new TransactionListener() {
            ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>();

            /*
            The local transaction is executed here
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("Executing local transaction");
                // Draw one random number so each branch is taken about a third of the time
                double r = Math.random();
                if (r < 0.333) {
                    System.out.println("Local transaction succeeded, press Enter to commit the transactional message");
                    new Scanner(System.in).nextLine();
                    localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (r < 0.666) {
                    System.out.println("Local transaction failed, press Enter to roll back the transactional message");
                    new Scanner(System.in).nextLine();
                    localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    System.out.println("Local transaction state unknown, press Enter to continue");
                    new Scanner(System.in).nextLine();
                    localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
                    return LocalTransactionState.UNKNOW;
                }
            }

            /*
            Check-back method
            The check interval defaults to 1 minute. It can be changed by setting
            transactionCheckInterval in broker.conf; the unit is milliseconds.
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("The server is checking back the message state");
                LocalTransactionState s = localTx.get(messageExt.getTransactionId());
                if (s == null || s == LocalTransactionState.UNKNOW) {
                    s = LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return s;
            }
        });
        p.start();

        String topic = "Topic8";
        while (true) {
            System.out.print("Enter messages, separated by commas: ");
            String[] a = new Scanner(System.in).nextLine().split(",");
            for (String s : a) {
                Message msg = new Message(topic, s.getBytes());
                System.out.println("---------Sending half message-----------");
                TransactionSendResult r = p.sendMessageInTransaction(msg, null);
                System.out.println("Transactional message send result: " + r.getLocalTransactionState().name());
            }
        }
    }
}

Consumer

package demo8;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*
If RECONSUME_LATER is returned, the server waits a while before redelivering the message.
The message property defaults to DELAY=6, which means a wait of 2 minutes:
                org/apache/rocketmq/store/config/MessageStoreConfig.java
                this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8");
        // NameServer addresses are separated by semicolons
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        c.subscribe("Topic8", "*");
        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                if (Math.random() < 0.5) {
                    System.out.println("Message processed");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    System.out.println("Message processing failed, asking the server to redeliver later");
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        c.start();
        System.out.println("Started consuming data");
    }
}
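
To make the delay-level table in the comment above concrete, here is a small sketch that maps a 1-based delay level to its wait time. It only parses the `messageDelayLevel` string quoted above; `DelayLevels` and `delaySeconds` are hypothetical helper names, not part of RocketMQ.

```java
// Hypothetical helper that looks up a delay level in the messageDelayLevel string.
public class DelayLevels {
    static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns the delay in seconds for a 1-based delay level.
    public static long delaySeconds(int level) {
        String[] parts = LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        String p = parts[level - 1];
        long n = Long.parseLong(p.substring(0, p.length() - 1));
        switch (p.charAt(p.length() - 1)) {
            case 's': return n;
            case 'm': return n * 60;
            case 'h': return n * 3600;
            default: throw new IllegalArgumentException("unknown unit in " + p);
        }
    }

    public static void main(String[] args) {
        // Level 6 is the "2m" entry
        System.out.println(delaySeconds(6));
    }
}
```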

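Preparing the order project example

Create the rocketmq-dtx project

Create an Empty Project:

Name the project rocketmq-dtx and store it in any folder:

Import the order projects (version without transactions)

Download the project code

  1. Visit the git repository https://gitee.com/benwang6/seata-samples
  2. Open the project's tags
  3. Download the version without transactions

Extract into the rocketmq-dtx directory

Extract the 7 project directories in the archive into the rocketmq-dtx directory:

Import the projects

In IDEA, press Shift twice, search for "add maven projects", and open the Maven tool:

Then select the pom.xml files of the 7 projects under the rocketmq-dtx directory and import them: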

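Add a transaction status table to order

After RocketMQ receives a transactional message, it waits for the producer to commit or roll back that message. If it receives neither instruction, it actively asks the producer for the message state. This is called a check-back.

In the order project, the transaction state has to be recorded so that RocketMQ can check it back, so we add a transaction status table for this purpose.

Modify the order.sql file in the db-init project to create the tx_table table: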

drop database if exists `seata_order`;

CREATE DATABASE `seata_order` charset utf8;

use `seata_order`;

CREATE TABLE `order` (
  `id` bigint(11) NOT NULL,
  `user_id` bigint(11) DEFAULT NULL COMMENT 'user id',
  `product_id` bigint(11) DEFAULT NULL COMMENT 'product id',
  `count` int(11) DEFAULT NULL COMMENT 'quantity',
  `money` decimal(11,0) DEFAULT NULL COMMENT 'amount',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT 'order status: 0: creating; 1: completed' AFTER `money` ;

-- For AT mode you must run this SQL in your business database; the Seata server does not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

CREATE TABLE IF NOT EXISTS segment
(
    id            BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT 'auto-increment primary key',
    VERSION       BIGINT      DEFAULT 0  NOT NULL COMMENT 'version number',
    business_type VARCHAR(63) DEFAULT '' NOT NULL COMMENT 'business type, unique',
    max_id        BIGINT      DEFAULT 0  NOT NULL COMMENT 'current maximum id',
    step          INT         DEFAULT 0  NULL COMMENT 'step size',
    increment     INT         DEFAULT 1  NOT NULL COMMENT 'id increment per allocation',
    remainder     INT         DEFAULT 0  NOT NULL COMMENT 'remainder',
    created_at    BIGINT UNSIGNED        NOT NULL COMMENT 'creation time',
    updated_at    BIGINT UNSIGNED        NOT NULL COMMENT 'update time',
    CONSTRAINT uniq_business_type UNIQUE (business_type)
) CHARSET = utf8mb4
  ENGINE INNODB COMMENT 'segment table';

INSERT INTO segment
(VERSION, business_type, max_id, step, increment, remainder, created_at, updated_at)
VALUES (1, 'order_business', 1000, 1000, 1, 0, NOW(), NOW());

CREATE TABLE tx_table(
    `xid` char(32) PRIMARY KEY COMMENT 'transaction id',
    `status` int COMMENT '0-commit, 1-rollback, 2-unknown',
    `created_at` BIGINT UNSIGNED NOT NULL COMMENT 'creation time'
);
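
The `status` column of tx_table uses the codes 0 (commit), 1 (rollback) and 2 (unknown). As a sketch of how those codes could be given names in Java, the enum below maps a stored code to a state and treats a missing or unrecognized code as unknown. `TxStatus` and `fromCode` are hypothetical names and are not part of the sample project.

```java
// Hypothetical enum naming the status codes stored in tx_table.
public enum TxStatus {
    COMMIT(0), ROLLBACK(1), UNKNOWN(2);

    private final int code;

    TxStatus(int code) {
        this.code = code;
    }

    public int code() {
        return code;
    }

    // Maps a stored code to a status; null or unrecognized codes become UNKNOWN.
    public static TxStatus fromCode(Integer code) {
        if (code != null) {
            for (TxStatus s : values()) {
                if (s.code == code) {
                    return s;
                }
            }
        }
        return UNKNOWN;
    }
}
```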

Run the db-init project to create this table:

order sends the transactional message and executes the local transaction

Add a Topic in RocketMQ

We use order-topic to send and receive messages. Create this Topic on the RocketMQ server:

Add the rocketmq starter dependency to order-parent

Modify pom.xml and add the following:

Set the version of the rocketmq starter dependency in properties

<rocketmq-spring-boot-starter.version>2.1.0</rocketmq-spring-boot-starter.version>

Add the rocketmq starter dependency:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring-boot-starter.version}</version>
</dependency>

Add the rocketmq connection settings to the order project:

Modify application.yml in the order project, add the NameServer addresses, and specify the producer group name:

rocketmq:
  name-server: 192.168.64.151:9876;192.168.64.152:9876
  producer:
    group: order-group

Add TxMapper to access the transaction status table

The transaction state is stored in the tx_table table. Add methods for reading and writing transaction state data to the TxMapper interface and TxMapper.xml.

After the local transaction runs, the transaction information (transaction id and transaction state) must be saved to the database so that it can be checked back later. First create a class TxInfo that wraps the transaction information:

package cn.tedu.order.tx;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxInfo {
    private String xid;
    private Long created;
    private Integer status;
}

TxMapper interface:

package cn.tedu.order.mapper;

import cn.tedu.order.tx.TxInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface TxMapper extends BaseMapper<TxInfo> {
    Boolean exists(String xid);
}

TxMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.TxMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.order.tx.TxInfo" >
        <id column="xid" property="xid" jdbcType="CHAR" />
        <result column="created_at" property="created" jdbcType="BIGINT" />
        <result column="status" property="status" jdbcType="INTEGER"/>
    </resultMap>
    <insert id="insert">
        INSERT INTO `tx_table`(`xid`,`created_at`,`status`) VALUES(#{xid},#{created},#{status});
    </insert>
    <select id="exists" resultType="boolean">
        SELECT COUNT(1) FROM tx_table WHERE xid=#{xid};
    </select>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT `xid`,`created_at`,`status` FROM tx_table WHERE xid=#{xid};
    </select>
</mapper>

JSON utility

When sending a transactional message, we serialize the transaction object into a JSON string before sending it. First add a utility class JsonUtil for handling JSON:

package cn.tedu.order.util;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Writer;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JsonUtil {
    private static ObjectMapper mapper;
    private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;
    private static boolean IS_ENABLE_INDENT_OUTPUT = false;
    private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";

    static {
        try {
            initMapper();
            configPropertyInclusion();
            configIndentOutput();
            configCommon();
        } catch (Exception e) {
            log.error("jackson config error", e);
        }
    }

    private static void initMapper() {
        mapper = new ObjectMapper();
    }

    private static void configCommon() {
        config(mapper);
    }

    private static void configPropertyInclusion() {
        mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);
    }

    private static void configIndentOutput() {
        mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);
    }

    private static void config(ObjectMapper objectMapper) {
        objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
        objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
        objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
        objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
        objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
        objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
        objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
        objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);
        objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
        objectMapper.registerModule(new ParameterNamesModule());
        objectMapper.registerModule(new Jdk8Module());
        objectMapper.registerModule(new JavaTimeModule());
    }

    public static void setSerializationInclusion(JsonInclude.Include inclusion) {
        DEFAULT_PROPERTY_INCLUSION = inclusion;
        configPropertyInclusion();
    }

    public static void setIndentOutput(boolean isEnable) {
        IS_ENABLE_INDENT_OUTPUT = isEnable;
        configIndentOutput();
    }

    public static <V> V from(URL url, Class<V> c) {
        try {
            return mapper.readValue(url, c);
        } catch (IOException e) {
            log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);
            return null;
        }
    }

    public static <V> V from(InputStream inputStream, Class<V> c) {
        try {
            return mapper.readValue(inputStream, c);
        } catch (IOException e) {
            log.error("jackson from error, type: {}", c, e);
            return null;
        }
    }

    public static <V> V from(File file, Class<V> c) {
        try {
            return mapper.readValue(file, c);
        } catch (IOException e) {
            log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);
            return null;
        }
    }

    public static <V> V from(Object jsonObj, Class<V> c) {
        try {
            return mapper.readValue(jsonObj.toString(), c);
        } catch (IOException e) {
            log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);
            return null;
        }
    }

    public static <V> V from(String json, Class<V> c) {
        try {
            return mapper.readValue(json, c);
        } catch (IOException e) {
            log.error("jackson from error, json: {}, type: {}", json, c, e);
            return null;
        }
    }

    public static <V> V from(URL url, TypeReference<V> type) {
        try {
            return mapper.readValue(url, type);
        } catch (IOException e) {
            log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);
            return null;
        }
    }

    public static <V> V from(InputStream inputStream, TypeReference<V> type) {
        try {
            return mapper.readValue(inputStream, type);
        } catch (IOException e) {
            log.error("jackson from error, type: {}", type, e);
            return null;
        }
    }

    public static <V> V from(File file, TypeReference<V> type) {
        try {
            return mapper.readValue(file, type);
        } catch (IOException e) {
            log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);
            return null;
        }
    }

    public static <V> V from(Object jsonObj, TypeReference<V> type) {
        try {
            return mapper.readValue(jsonObj.toString(), type);
        } catch (IOException e) {
            log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);
            return null;
        }
    }

    public static <V> V from(String json, TypeReference<V> type) {
        try {
            return mapper.readValue(json, type);
        } catch (IOException e) {
            log.error("jackson from error, json: {}, type: {}", json, type, e);
            return null;
        }
    }

    public static <V> String to(List<V> list) {
        try {
            return mapper.writeValueAsString(list);
        } catch (JsonProcessingException e) {
            log.error("jackson to error, obj: {}", list, e);
            return null;
        }
    }

    public static <V> String to(V v) {
        try {
            return mapper.writeValueAsString(v);
        } catch (JsonProcessingException e) {
            log.error("jackson to error, obj: {}", v, e);
            return null;
        }
    }

    public static <V> void toFile(String path, List<V> list) {
        try (Writer writer = new FileWriter(new File(path), true)) {
            mapper.writer().writeValues(writer).writeAll(list);
            writer.flush();
        } catch (Exception e) {
            log.error("jackson to file error, path: {}, list: {}", path, list, e);
        }
    }

    public static <V> void toFile(String path, V v) {
        try (Writer writer = new FileWriter(new File(path), true)) {
            mapper.writer().writeValues(writer).write(v);
            writer.flush();
        } catch (Exception e) {
            log.error("jackson to file error, path: {}, obj: {}", path, v, e);
        }
    }

    public static String getString(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).asText();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get string error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static Integer getInt(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).intValue();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get int error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static Long getLong(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).longValue();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get long error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static Double getDouble(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).doubleValue();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get double error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static BigInteger getBigInteger(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            // new BigInteger("0.0") would throw NumberFormatException, so return zero directly
            return BigInteger.ZERO;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).bigIntegerValue();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static BigDecimal getBigDecimal(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).decimalValue();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static boolean getBoolean(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return false;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).booleanValue();
            } else {
                return false;
            }
        } catch (IOException e) {
            log.error("jackson get boolean error, json: {}, key: {}", json, key, e);
            return false;
        }
    }

    public static byte[] getByte(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        try {
            JsonNode node = mapper.readTree(json);
            if (null != node) {
                return node.get(key).binaryValue();
            } else {
                return null;
            }
        } catch (IOException e) {
            log.error("jackson get byte error, json: {}, key: {}", json, key, e);
            return null;
        }
    }

    public static <T> ArrayList<T> getList(String json, String key) {
        if (StringUtils.isEmpty(json)) {
            return null;
        }
        String string = getString(json, key);
        return from(string, new TypeReference<ArrayList<T>>() {});
    }

    public static <T> String add(String json, String key, T value) {
        try {
            JsonNode node = mapper.readTree(json);
            add(node, key, value);
            return node.toString();
        } catch (IOException e) {
            log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);
            return json;
        }
    }

    private static <T> void add(JsonNode jsonNode, String key, T value) {
        if (value instanceof String) {
            ((ObjectNode) jsonNode).put(key, (String) value);
        } else if (value instanceof Short) {
            ((ObjectNode) jsonNode).put(key, (Short) value);
        } else if (value instanceof Integer) {
            ((ObjectNode) jsonNode).put(key, (Integer) value);
        } else if (value instanceof Long) {
            ((ObjectNode) jsonNode).put(key, (Long) value);
        } else if (value instanceof Float) {
            ((ObjectNode) jsonNode).put(key, (Float) value);
        } else if (value instanceof Double) {
            ((ObjectNode) jsonNode).put(key, (Double) value);
        } else if (value instanceof BigDecimal) {
            ((ObjectNode) jsonNode).put(key, (BigDecimal) value);
        } else if (value instanceof BigInteger) {
            ((ObjectNode) jsonNode).put(key, (BigInteger) value);
        } else if (value instanceof Boolean) {
            ((ObjectNode) jsonNode).put(key, (Boolean) value);
        } else if (value instanceof byte[]) {
            ((ObjectNode) jsonNode).put(key, (byte[]) value);
        } else {
            ((ObjectNode) jsonNode).put(key, to(value));
        }
    }

    public static String remove(String json, String key) {
        try {
            JsonNode node = mapper.readTree(json);
            ((ObjectNode) node).remove(key);
            return node.toString();
        } catch (IOException e) {
            log.error("jackson remove error, json: {}, key: {}", json, key, e);
            return json;
        }
    }

    public static <T> String update(String json, String key, T value) {
        try {
            JsonNode node = mapper.readTree(json);
            ((ObjectNode) node).remove(key);
            add(node, key, value);
            return node.toString();
        } catch (IOException e) {
            log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);
            return json;
        }
    }

    public static String format(String json) {
        try {
            JsonNode node = mapper.readTree(json);
            return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);
        } catch (IOException e) {
            log.error("jackson format json error, json: {}", json, e);
            return json;
        }
    }

    public static boolean isJson(String json) {
        try {
            mapper.readTree(json);
            return true;
        } catch (Exception e) {
            log.error("jackson check json error, json: {}", json, e);
            return false;
        }
    }

    private static InputStream getResourceStream(String name) {
        return JsonUtil.class.getClassLoader().getResourceAsStream(name);
    }

    private static InputStreamReader getResourceReader(InputStream inputStream) {
        if (null == inputStream) {
            return null;
        }
        return new InputStreamReader(inputStream, StandardCharsets.UTF_8);
    }
}

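Retire OrderServiceImpl

The previous business implementation class OrderServiceImpl is retired. A new class, TxOrderService, replaces it below.

Simply delete OrderServiceImpl.java.

TxOrderService sends the transactional message

TxAccountMessage wraps the data sent to the account service: the user id and the amount to deduct. It also carries the transaction id.

package cn.tedu.order.tx;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxAccountMessage {
    Long userId;
    BigDecimal money;
    String xid;
}

The business method create() does not save the order directly. Instead, it sends a transactional message.

After the message is sent, TxListener is triggered to execute the local transaction, and while doing so it calls back the doCreate() method here to save the order.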

package cn.tedu.order.tx;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.service.OrderService;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

@Slf4j
@Primary
@Service
public class TxOrderService implements OrderService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private TxMapper txMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;

    /*
    Business method that creates an order.
    It is changed here to only send a transactional message to RocketMQ.
     */
    @Override
    public void create(Order order) {
        // Generate the transaction id
        String xid = UUID.randomUUID().toString().replace("-", "");

        // Wrap the transaction-related data and convert it to a JSON string
        TxAccountMessage sMsg = new TxAccountMessage(order.getUserId(), order.getMoney(), xid);
        String json = JsonUtil.to(sMsg);

        // Wrap the JSON string in a Spring Message object
        Message<String> msg = MessageBuilder.withPayload(json).build();

        // Send the transactional message
        rocketMQTemplate.sendMessageInTransaction("order-topic:account", msg, order);
        log.info("Transactional message sent");
    }

    // Local transaction: saves the order
    // This method is called from the transaction listener
    @Transactional
    public void doCreate(Order order, String xid) {
        log.info("Executing local transaction, saving order");

        // Get an id from the globally unique id generator
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);

        orderMapper.create(order);

        log.info("Order saved! Transaction log saved");
    }
}
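
The transaction id produced in create() is a UUID with its dashes removed, which yields 32 hexadecimal characters and therefore fits the `char(32)` xid column of tx_table. A minimal standalone sketch of just that step (`XidSketch` and `newXid` are hypothetical names used for illustration):

```java
import java.util.UUID;

// Hypothetical standalone version of the xid generation used in create().
public class XidSketch {
    // A random UUID is 36 characters including 4 dashes; removing them leaves 32.
    public static String newXid() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        String xid = newXid();
        System.out.println(xid + " (length " + xid.length() + ")");
    }
}
```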

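TxListener transaction listener

Sending a transactional message triggers the transaction listener.

The transaction listener has two methods:

  • executeLocalTransaction(): executes the local transaction
  • checkLocalTransaction(): responds to transaction check-back requests from the RocketMQ server

The TxListener listener class: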

package cn.tedu.order.tx;

import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQTransactionListener
public class TxListener implements RocketMQLocalTransactionListener {
    @Autowired
    private TxOrderService orderService;
    @Autowired
    private TxMapper txMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("Transaction listener - executing local transaction");

        // The message payload received in the listener is a byte[]
        String json = new String((byte[]) message.getPayload());
        String xid = JsonUtil.getString(json, "xid");

        log.info("Transaction listener - " + json);
        log.info("Transaction listener - xid: " + xid);

        RocketMQLocalTransactionState state;
        int status = 0;
        Order order = (Order) o;
        try {
            orderService.doCreate(order, xid);
            log.info("Local transaction succeeded, committing message");
            state = RocketMQLocalTransactionState.COMMIT;
            status = 0;
        } catch (Exception e) {
            e.printStackTrace();
            log.info("Local transaction failed, rolling back message");
            state = RocketMQLocalTransactionState.ROLLBACK;
            status = 1;
        }

        // Record the outcome so that a later check-back can find it
        TxInfo txInfo = new TxInfo(xid, System.currentTimeMillis(), status);
        txMapper.insert(txInfo);
        return state;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("Transaction listener - checking back transaction state");

        // The message payload received in the listener is a byte[]
        String json = new String((byte[]) message.getPayload());
        String xid = JsonUtil.getString(json, "xid");

        TxInfo txInfo = txMapper.selectById(xid);
        if (txInfo == null) {
            log.info("Transaction listener - check-back - transaction does not exist: " + xid);
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        log.info("Transaction listener - check-back - " + txInfo.getStatus());
        switch (txInfo.getStatus()) {
            case 0: return RocketMQLocalTransactionState.COMMIT;
            case 1: return RocketMQLocalTransactionState.ROLLBACK;
            default: return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

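Start the order project and test

Start the projects in this order:

  1. Eureka
  2. Easy Id Generator
  3. Order

Call the create-order endpoint at:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

Check the console log:


Order table:

Transaction table:

Open RocketMQ and view the transactional message: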

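account receives the transactional message and executes the local transaction

Add the RocketMQ connection settings to application.yml

rocketmq:
  name-server: 192.168.64.151:9876;192.168.64.152:9876

JsonUtil utility class

Same as the code above.

TxConsumer receives the transactional message and calls the account business method

The received message is converted to a TxAccountMessage object, so create this class first:

package cn.tedu.account.tx;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxAccountMessage {
    Long userId;
    BigDecimal money;
    String xid;
}

TxConsumer implements the message listener. When a message arrives, it performs the amount deduction: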

package cn.tedu.account.tx;

import cn.tedu.account.service.AccountService;
import cn.tedu.account.util.JsonUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "account-consumer-group", topic = "order-topic", selectorExpression = "account")
public class TxConsumer implements RocketMQListener<String> {
    @Autowired
    private AccountService accountService;

    @Override
    public void onMessage(String msg) {
        TxAccountMessage txAccountMessage = JsonUtil.from(msg, new TypeReference<TxAccountMessage>() {});
        log.info("Received message: " + txAccountMessage);

        accountService.decrease(txAccountMessage.getUserId(), txAccountMessage.getMoney());
    }
}
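
The producer sends to the destination "order-topic:account", and this listener subscribes to topic "order-topic" with selectorExpression "account". The sketch below illustrates how a destination of the form topic:tag lines up with a tag selector such as "account", "*" or "a || b". It is a simplified illustration, not the rocketmq-spring implementation; `DestinationSketch` and its methods are hypothetical names, and returning an empty tag when no colon is present is an assumption.

```java
// Hypothetical helper showing how "topic:tag" relates to a tag selector.
public class DestinationSketch {
    // The part before the first colon is the topic.
    public static String topicOf(String destination) {
        int i = destination.indexOf(':');
        return i < 0 ? destination : destination.substring(0, i);
    }

    // The part after the first colon is the tag (empty if there is none; an assumption).
    public static String tagOf(String destination) {
        int i = destination.indexOf(':');
        return i < 0 ? "" : destination.substring(i + 1);
    }

    // "*" matches every tag; otherwise the tag must equal one of the "||"-separated entries.
    public static boolean matches(String selectorExpression, String tag) {
        if ("*".equals(selectorExpression)) {
            return true;
        }
        for (String t : selectorExpression.split("\\|\\|")) {
            if (t.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String destination = "order-topic:account";
        System.out.println(topicOf(destination) + " / " + tagOf(destination));
        System.out.println(matches("account", tagOf(destination)));
    }
}
```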

Add the transaction annotation to AccountServiceImpl

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public void decrease(Long userId, BigDecimal money) {
        accountMapper.decrease(userId, money);
    }
}

Start the account project and test it

Start the projects in this order:

  1. Eureka
  2. Easy Id Generator
  3. Account
  4. Order

When the account project starts, it immediately receives the message from RocketMQ and performs the account deduction.
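
The account service gets the message right after startup because the message was already committed on the broker while account was not yet running. The toy model below sketches that behaviour in memory, under simplified assumptions: half messages are invisible to consumers, a commit makes them deliverable, and a rollback discards them. `MiniBroker` and its methods are hypothetical names for illustration, not RocketMQ classes.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

class MiniBroker {
    // Half messages, keyed by transaction ID; consumers never see these.
    private final Map<String, String> halfMessages = new HashMap<>();
    // Committed messages waiting for a consumer.
    private final Queue<String> deliverable = new ArrayDeque<>();

    void sendHalf(String xid, String body) {
        halfMessages.put(xid, body);
    }

    // Commit: the half message becomes visible to consumers.
    void commit(String xid) {
        String body = halfMessages.remove(xid);
        if (body != null) {
            deliverable.add(body);
        }
    }

    // Rollback: the half message is discarded and never delivered.
    void rollback(String xid) {
        halfMessages.remove(xid);
    }

    // Returns the next committed message, or null if none is waiting.
    String poll() {
        return deliverable.poll();
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.sendHalf("tx-1", "decrease userId=1 money=100");
        broker.commit("tx-1");
        // A consumer that connects later still finds the committed message.
        System.out.println(broker.poll());
    }
}
```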

Testing a local transaction failure in order

Modify TxOrderService to add a simulated exception:

package cn.tedu.order.tx;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.service.OrderService;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

@Slf4j
@Primary
@Service
public class TxOrderService implements OrderService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private TxMapper txMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;

    /*
    Business method for creating an order.
    Changed here to only send a transaction message to RocketMQ.
     */
    @Override
    public void create(Order order) {
        // Generate the transaction ID
        String xid = UUID.randomUUID().toString().replace("-", "");

        // Wrap the transaction data and convert it to a JSON string
        TxAccountMessage sMsg = new TxAccountMessage(order.getUserId(), order.getMoney(), xid);
        String json = JsonUtil.to(sMsg);

        // Wrap the JSON string in a Spring Message object
        Message<String> msg = MessageBuilder.withPayload(json).build();

        // Send the transaction message
        log.info("Start sending the transaction message");
        rocketMQTemplate.sendMessageInTransaction("order-topic:account", msg, order);
        log.info("Transaction message sent");
    }

    // Local transaction: saves the order.
    // This method is called from the transaction listener.
    @Transactional
    public void doCreate(Order order, String xid) {
        log.info("Executing local transaction, saving order");

        // Get an ID from the globally unique ID generator
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);
        orderMapper.create(order);

        // Simulated failure: roughly half of the calls throw
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated exception");
        }

        log.info("Order saved! Transaction log saved");
    }
}

Call the save-order endpoint at:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

After the local transaction fails, RocketMQ is notified to roll back the transaction message.
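
How a failed local transaction turns into a rollback can be sketched without a broker. The code below is a simplified stand-in, not the project's listener: `LocalTxSketch`, `TxState` and the in-memory `txTable` are hypothetical, and the status codes (0 = committed, 1 = rolled back) are assumed to match the check-back code shown earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LocalTxSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Stand-in for the transaction table: xid -> status (0 = committed, 1 = rolled back).
    private final Map<String, Integer> txTable = new ConcurrentHashMap<>();

    // Runs the local transaction and returns the state a listener would report.
    TxState execute(String xid, Runnable localTx) {
        try {
            localTx.run();
            txTable.put(xid, 0);
            return TxState.COMMIT;
        } catch (RuntimeException e) {
            // The local transaction failed, so the half message must be rolled back.
            txTable.put(xid, 1);
            return TxState.ROLLBACK;
        }
    }

    // Check-back: answers a status query from the recorded outcome.
    TxState check(String xid) {
        Integer status = txTable.get(xid);
        if (status == null) {
            return TxState.UNKNOWN;
        }
        switch (status) {
            case 0: return TxState.COMMIT;
            case 1: return TxState.ROLLBACK;
            default: return TxState.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        LocalTxSketch sketch = new LocalTxSketch();
        TxState state = sketch.execute("tx-1", () -> {
            throw new RuntimeException("Simulated exception");
        });
        System.out.println(state + " / check-back: " + sketch.check("tx-1"));
    }
}
```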

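After testing, comment out the simulated exception code.

Testing a local transaction failure in account

Modify AccountServiceImpl to add a random simulated exception: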

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public void decrease(Long userId, BigDecimal money) {
        accountMapper.decrease(userId, money);

        // Simulated failure: roughly half of the calls throw,
        // which rolls back the deduction
        if (Math.random() < 0.5) {
            throw new RuntimeException("Simulated exception");
        }
    }
}

Call the save-order endpoint at:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

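If the account service fails to process a message it has received, RocketMQ redelivers the message and retries until processing succeeds. Should the retries ultimately fail, the message is left for manual handling, as noted earlier.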
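
The retry behaviour can be illustrated with a small broker-free loop. This is only a sketch of the idea: `RetrySketch`, `deliver` and `maxAttempts` are hypothetical names, and real RocketMQ redelivery is driven by the broker rather than by a loop in the consumer.

```java
import java.util.function.Consumer;

class RetrySketch {
    // Hands the message to the handler until it succeeds or attempts run out.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int deliver(String message, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt;
            } catch (RuntimeException e) {
                // Processing failed; the message will be delivered again.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Handler that fails on the first two deliveries and then succeeds.
        int attempts = deliver("decrease userId=1 money=100", msg -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("Simulated exception");
            }
        }, 5);
        System.out.println("attempts used: " + attempts);
    }
}
```

Because the failing `decrease` call runs inside `@Transactional`, each failed attempt is rolled back, so a later successful attempt deducts the amount only once.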

Project source code: https://gitee.com/benwang6/rocketmq-dtx