共计 28207 个字符,预计需要花费 71 分钟才能阅读完成。
事务音讯的原理
上面来看 RocketMQ 的 事务音讯 是如何来发送“可靠消息”的,只须要以下三步:
- 发送半音讯(半音讯不会发送给消费者)
- 执行本地事务
- 提交音讯
实现 事务音讯 发送后,消费者就能够以失常的形式来生产数据。
RocketMQ 的主动重发机制在绝大多数状况下,都能够保障音讯被正确生产。
如果音讯最终生产失败了,还能够由人工解决进行托底。
下面剖析的是失常状况下的执行流程。上面再来看两种谬误状况:
- 事务执行失败时回滚音讯
- 服务器无奈得悉音讯状态时,须要被动回查音讯状态
回滚:
音讯回查:
生产者
package demo8;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
public class Producer {public static void main(String[] args) throws MQClientException {TransactionMQProducer p = new TransactionMQProducer("producer-demo8");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.setExecutorService(Executors.newFixedThreadPool(5));
p.setTransactionListener(new TransactionListener() {ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>();
/*
在这里执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println("执行本地事务");
if (Math.random()<0.333) {System.out.println("本地事务执行胜利, 按回车提交事务音讯");
new Scanner(System.in).nextLine();
localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
return LocalTransactionState.COMMIT_MESSAGE;
} else if (Math.random()<0.666) {System.out.println("本地事务执行失败, 按回车回滚事务音讯");
new Scanner(System.in).nextLine();
localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {System.out.println("本地事务执行状况未知, 按回车持续");
new Scanner(System.in).nextLine();
localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
return LocalTransactionState.UNKNOW;
}
}
/*
回查办法
检测频率默认 1 分钟,可通过在 broker.conf 文件中设置 transactionCheckInterval 的值来扭转默认值,单位为毫秒。*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("服务器正在回查音讯状态");
LocalTransactionState s = localTx.get(messageExt.getTransactionId());
if (s == null || s == LocalTransactionState.UNKNOW) {s = LocalTransactionState.ROLLBACK_MESSAGE;}
return s;
}
});
p.start();
String topic = "Topic8";
while (true) {System.out.print("输出音讯, 用逗号分隔多条音讯:");
String[] a = new Scanner(System.in).nextLine().split(",");
for (String s : a) {Message msg = new Message(topic, s.getBytes());
System.out.println("--------- 发送半音讯 -----------");
TransactionSendResult r = p.sendMessageInTransaction(msg, null);
System.out.println("事务音讯发送后果:"+ r.getLocalTransactionState().name());
}
}
}
}
消费者
package demo8;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/*
如果返回 RECONSUME_LATER, 服务器会期待一会再重试发送音讯
音讯属性默认设置 DELAY=6, 等待时间为 2 分钟,
org/apache/rocketmq/store/config/MessageStoreConfig.java
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8");
c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
c.subscribe("Topic8", "*");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {for (MessageExt msg : list) {System.out.println(new String(msg.getBody()) + "-" + msg);
}
if (Math.random()<0.5) {System.out.println("音讯解决实现");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {System.out.println("音讯解决失败, 要求服务器稍后重试发送音讯");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
c.start();
System.out.println("开始生产数据");
}
}
筹备订单我的项目案例
新建 rocketmq-dtx 工程
新建 Empty Project:
工程命名为 rocketmq-dtx
,寄存到任意文件夹下:
导入订单我的项目,无事务版本
下载我的项目代码
- 拜访 git 仓库 https://gitee.com/benwang6/seata-samples
- 拜访我的项目标签
- 下载无事务版
解压到 rocketmq-dtx 目录
压缩文件中的 7 个我的项目目录解压缩到 rocketmq-dtx
目录:
导入我的项目
在 idea 中按两下 shift
键,搜寻 add maven projects
,关上 maven 工具:
而后抉择 rocketmq-dtx
工程目录下的 7 个我的项目的 pom.xml
导入:
order 增加事务状态表
Rocketmq 收到事务音讯后,会期待生产者提交或回滚该音讯。如果无奈失去生产者的提交或回滚指令,则会被动向生产者询问音讯状态,称为回查。
在 order 我的项目中,为了让 Rocketmq 能够回查到事务的状态,须要记录事务的状态,所以咱们增加一个事务的状态表来记录事务状态。
批改 db-init
我的项目中的 order.sql
文件,创立 tx_table
表:
drop database if exists `seata_order`;
CREATE DATABASE `seata_order` charset utf8;
use `seata_order`;
CREATE TABLE `order` (`id` bigint(11) NOT NULL,
`user_id` bigint(11) DEFAULT NULL COMMENT '用户 id',
`product_id` bigint(11) DEFAULT NULL COMMENT '产品 id',
`count` int(11) DEFAULT NULL COMMENT '数量',
`money` decimal(11,0) DEFAULT NULL COMMENT '金额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT '订单状态:0:创立中;1:已完结' AFTER `money` ;
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
CREATE TABLE IF NOT EXISTS segment
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '自增主键',
VERSION BIGINT DEFAULT 0 NOT NULL COMMENT '版本号',
business_type VARCHAR(63) DEFAULT ''NOT NULL COMMENT' 业务类型,惟一 ',
max_id BIGINT DEFAULT 0 NOT NULL COMMENT '以后最大 id',
step INT DEFAULT 0 NULL COMMENT '步长',
increment INT DEFAULT 1 NOT NULL COMMENT '每次 id 增量',
remainder INT DEFAULT 0 NOT NULL COMMENT '余数',
created_at BIGINT UNSIGNED NOT NULL COMMENT '创立工夫',
updated_at BIGINT UNSIGNED NOT NULL COMMENT '更新工夫',
CONSTRAINT uniq_business_type UNIQUE (business_type)
) CHARSET = utf8mb4
ENGINE INNODB COMMENT '号段表';
INSERT INTO segment
(VERSION, business_type, max_id, step, increment, remainder, created_at, updated_at)
VALUES (1, 'order_business', 1000, 1000, 1, 0, NOW(), NOW());
CREATE TABLE tx_table(`xid` char(32) PRIMARY KEY COMMENT '事务 id',
`status` int COMMENT '0- 提交,1- 回滚,2- 未知',
`created_at` BIGINT UNSIGNED NOT NULL COMMENT '创立工夫'
);
运行 db-init 我的项目,会创立这个表:
order 发送事务音讯,并执行本地事务
Rocketmq 中增加 Topic
应用 order-topic
来收发音讯,在 Rocketmq 服务器上创立这个 Topic:
order-parent 中增加 rocketmq 起步依赖
批改 pom.xml
增加以下内容:
在 properties
中设置 rocketmq 起步依赖的版本
<rocketmq-spring-boot-starter.version>2.1.0</rocketmq-spring-boot-starter.version>
增加 rocketmq 起步依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>
order 我的项目中增加 rocketmq 连贯信息配置:
批改 order 我的项目的 application.yml,增加 NameServer 地址,指定生产者组名:
rocketmq:
name-server: 192.168.64.151:9876;192.168.64.152:9876
producer:
group: order-group
增加 TxMapper
拜访事务状态表
事务状态保留到 tx_table
表,在 TxMapper
接口和 TxMapper.xml
中增加事务状态数据的读写办法。
本地事务执行后要保留事务信息(事务 id、事务状态)到数据库,以便之后进行事务回查,首先创立封装事务信息的类 TxInfo
:
package cn.tedu.order.tx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxInfo {
private String xid;
private Long created;
private Integer status;
}
TxMapper
接口:
package cn.tedu.order.mapper;
import cn.tedu.order.tx.TxInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface TxMapper extends BaseMapper<TxInfo> {Boolean exists(String xid);
}
TxMapper.xml
:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.TxMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.order.tx.TxInfo" >
<id column="xid" property="xid" jdbcType="CHAR" />
<result column="created_at" property="created" jdbcType="BIGINT" />
<result column="status" property="status" jdbcType="INTEGER"/>
</resultMap>
<insert id="insert">
INSERT INTO `tx_table`(`xid`,`created_at`,`status`) VALUES(#{xid},#{created},#{status});
</insert>
<select id="exists" resultType="boolean">
SELECT COUNT(1) FROM tx_table WHERE xid=#{xid};
</select>
<select id="selectById" resultMap="BaseResultMap">
SELECT `xid`,`created_at`,`status` FROM tx_table WHERE xid=#{xid};
</select>
</mapper>
Json 解决工具
发送事务音讯时,咱们把事务对象序列化成 Json 字符串再发送。这里先增加一个工具 JsonUtil
用来解决 Json:
package cn.tedu.order.util;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Writer;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class JsonUtil {
private static ObjectMapper mapper;
private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;
private static boolean IS_ENABLE_INDENT_OUTPUT = false;
private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";
static {
try {initMapper();
configPropertyInclusion();
configIndentOutput();
configCommon();} catch (Exception e) {log.error("jackson config error", e);
}
}
private static void initMapper() {mapper = new ObjectMapper();
}
private static void configCommon() {config(mapper);
}
private static void configPropertyInclusion() {mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);
}
private static void configIndentOutput() {mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);
}
private static void config(ObjectMapper objectMapper) {objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);
objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
objectMapper.registerModule(new ParameterNamesModule());
objectMapper.registerModule(new Jdk8Module());
objectMapper.registerModule(new JavaTimeModule());
}
public static void setSerializationInclusion(JsonInclude.Include inclusion) {
DEFAULT_PROPERTY_INCLUSION = inclusion;
configPropertyInclusion();}
public static void setIndentOutput(boolean isEnable) {
IS_ENABLE_INDENT_OUTPUT = isEnable;
configIndentOutput();}
public static <V> V from(URL url, Class<V> c) {
try {return mapper.readValue(url, c);
} catch (IOException e) {log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);
return null;
}
}
public static <V> V from(InputStream inputStream, Class<V> c) {
try {return mapper.readValue(inputStream, c);
} catch (IOException e) {log.error("jackson from error, type: {}", c, e);
return null;
}
}
public static <V> V from(File file, Class<V> c) {
try {return mapper.readValue(file, c);
} catch (IOException e) {log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);
return null;
}
}
public static <V> V from(Object jsonObj, Class<V> c) {
try {return mapper.readValue(jsonObj.toString(), c);
} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);
return null;
}
}
public static <V> V from(String json, Class<V> c) {
try {return mapper.readValue(json, c);
} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", json, c, e);
return null;
}
}
public static <V> V from(URL url, TypeReference<V> type) {
try {return mapper.readValue(url, type);
} catch (IOException e) {log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);
return null;
}
}
public static <V> V from(InputStream inputStream, TypeReference<V> type) {
try {return mapper.readValue(inputStream, type);
} catch (IOException e) {log.error("jackson from error, type: {}", type, e);
return null;
}
}
public static <V> V from(File file, TypeReference<V> type) {
try {return mapper.readValue(file, type);
} catch (IOException e) {log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);
return null;
}
}
public static <V> V from(Object jsonObj, TypeReference<V> type) {
try {return mapper.readValue(jsonObj.toString(), type);
} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);
return null;
}
}
public static <V> V from(String json, TypeReference<V> type) {
try {return mapper.readValue(json, type);
} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", json, type, e);
return null;
}
}
public static <V> String to(List<V> list) {
try {return mapper.writeValueAsString(list);
} catch (JsonProcessingException e) {log.error("jackson to error, obj: {}", list, e);
return null;
}
}
public static <V> String to(V v) {
try {return mapper.writeValueAsString(v);
} catch (JsonProcessingException e) {log.error("jackson to error, obj: {}", v, e);
return null;
}
}
public static <V> void toFile(String path, List<V> list) {try (Writer writer = new FileWriter(new File(path), true)) {mapper.writer().writeValues(writer).writeAll(list);
writer.flush();} catch (Exception e) {log.error("jackson to file error, path: {}, list: {}", path, list, e);
}
}
public static <V> void toFile(String path, V v) {try (Writer writer = new FileWriter(new File(path), true)) {mapper.writer().writeValues(writer).write(v);
writer.flush();} catch (Exception e) {log.error("jackson to file error, path: {}, obj: {}", path, v, e);
}
}
public static String getString(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).asText();} else {return null;}
} catch (IOException e) {log.error("jackson get string error, json: {}, key: {}", json, key, e);
return null;
}
}
public static Integer getInt(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).intValue();} else {return null;}
} catch (IOException e) {log.error("jackson get int error, json: {}, key: {}", json, key, e);
return null;
}
}
public static Long getLong(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).longValue();} else {return null;}
} catch (IOException e) {log.error("jackson get long error, json: {}, key: {}", json, key, e);
return null;
}
}
public static Double getDouble(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).doubleValue();} else {return null;}
} catch (IOException e) {log.error("jackson get double error, json: {}, key: {}", json, key, e);
return null;
}
}
public static BigInteger getBigInteger(String json, String key) {if (StringUtils.isEmpty(json)) {return new BigInteger(String.valueOf(0.00));
}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).bigIntegerValue();} else {return null;}
} catch (IOException e) {log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);
return null;
}
}
public static BigDecimal getBigDecimal(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).decimalValue();} else {return null;}
} catch (IOException e) {log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);
return null;
}
}
public static boolean getBoolean(String json, String key) {if (StringUtils.isEmpty(json)) {return false;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).booleanValue();} else {return false;}
} catch (IOException e) {log.error("jackson get boolean error, json: {}, key: {}", json, key, e);
return false;
}
}
public static byte[] getByte(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
try {JsonNode node = mapper.readTree(json);
if (null != node) {return node.get(key).binaryValue();} else {return null;}
} catch (IOException e) {log.error("jackson get byte error, json: {}, key: {}", json, key, e);
return null;
}
}
public static <T> ArrayList<T> getList(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}
String string = getString(json, key);
return from(string, new TypeReference<ArrayList<T>>() {});
}
public static <T> String add(String json, String key, T value) {
try {JsonNode node = mapper.readTree(json);
add(node, key, value);
return node.toString();} catch (IOException e) {log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);
return json;
}
}
private static <T> void add(JsonNode jsonNode, String key, T value) {if (value instanceof String) {((ObjectNode) jsonNode).put(key, (String) value);
} else if (value instanceof Short) {((ObjectNode) jsonNode).put(key, (Short) value);
} else if (value instanceof Integer) {((ObjectNode) jsonNode).put(key, (Integer) value);
} else if (value instanceof Long) {((ObjectNode) jsonNode).put(key, (Long) value);
} else if (value instanceof Float) {((ObjectNode) jsonNode).put(key, (Float) value);
} else if (value instanceof Double) {((ObjectNode) jsonNode).put(key, (Double) value);
} else if (value instanceof BigDecimal) {((ObjectNode) jsonNode).put(key, (BigDecimal) value);
} else if (value instanceof BigInteger) {((ObjectNode) jsonNode).put(key, (BigInteger) value);
} else if (value instanceof Boolean) {((ObjectNode) jsonNode).put(key, (Boolean) value);
} else if (value instanceof byte[]) {((ObjectNode) jsonNode).put(key, (byte[]) value);
} else {((ObjectNode) jsonNode).put(key, to(value));
}
}
public static String remove(String json, String key) {
try {JsonNode node = mapper.readTree(json);
((ObjectNode) node).remove(key);
return node.toString();} catch (IOException e) {log.error("jackson remove error, json: {}, key: {}", json, key, e);
return json;
}
}
public static <T> String update(String json, String key, T value) {
try {JsonNode node = mapper.readTree(json);
((ObjectNode) node).remove(key);
add(node, key, value);
return node.toString();} catch (IOException e) {log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);
return json;
}
}
public static String format(String json) {
try {JsonNode node = mapper.readTree(json);
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);
} catch (IOException e) {log.error("jackson format json error, json: {}", json, e);
return json;
}
}
public static boolean isJson(String json) {
try {mapper.readTree(json);
return true;
} catch (Exception e) {log.error("jackson check json error, json: {}", json, e);
return false;
}
}
private static InputStream getResourceStream(String name) {return JsonUtil.class.getClassLoader().getResourceAsStream(name);
}
private static InputStreamReader getResourceReader(InputStream inputStream) {if (null == inputStream) {return null;}
return new InputStreamReader(inputStream, StandardCharsets.UTF_8);
}
}
废除 OrderServiceImpl
类
之前的业务实现类 OrderServiceImpl
废除,前面用一个新的类 TxOrderService
来代替。
OrderServiceImpl.java
间接删除即可。
TxOrderService
发送事务音讯
TxAccountMessage
封装发送给账户服务的数据:用户 id 和扣减金额。另外还封装了事务 id。
package cn.tedu.order.tx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxAccountMessage {
Long userId;
BigDecimal money;
String xid;
}
在业务办法 create()
中不间接保留订单,而是发送事务音讯。
音讯收回后,会触发 TxListener
执行本地事务,它执行时会回调这里的 doCreate()
办法实现订单的保留。
package cn.tedu.order.tx;
import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.service.OrderService;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Slf4j
@Primary
@Service
public class TxOrderService implements OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private TxMapper txMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;
/*
创立订单的业务办法
这里批改为:只向 Rocketmq 发送事务音讯。*/
@Override
public void create(Order order) {
// 产生事务 ID
String xid = UUID.randomUUID().toString().replace("-", "");
// 对事务相干数据进行封装,并转成 json 字符串
TxAccountMessage sMsg = new TxAccountMessage(order.getUserId(), order.getMoney(), xid);
String json = JsonUtil.to(sMsg);
//json 字符串封装到 Spring Message 对象
Message<String> msg = MessageBuilder.withPayload(json).build();
// 发送事务音讯
rocketMQTemplate.sendMessageInTransaction("order-topic:account", msg, order);
log.info("事务音讯已发送");
}
// 本地事务,执行订单保留
// 这个办法在事务监听器中调用
@Transactional
public void doCreate(Order order, String xid) {log.info("执行本地事务,保留订单");
// 从全局惟一 id 发号器取得 id
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);
orderMapper.create(order);
log.info("订单已保留!事务日志已保留");
}
}
TxListener
事务监听器
发送事务音讯后会触发事务监听器执行。
事务监听器有两个办法:
executeLocalTransaction()
:执行本地事务checkLocalTransaction()
:负责响应 Rocketmq 服务器的事务回查操作
TxListener
监听器类:
package cn.tedu.order.tx;
import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQTransactionListener
public class TxListener implements RocketMQLocalTransactionListener {
@Autowired
private TxOrderService orderService;
@Autowired
private TxMapper txMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {log.info("事务监听 - 开始执行本地事务");
// 监听器中失去的 message payload 是 byte[]
String json = new String((byte[]) message.getPayload());
String xid = JsonUtil.getString(json, "xid");
log.info("事务监听 -"+json);
log.info("事务监听 - xid:"+xid);
RocketMQLocalTransactionState state;
int status = 0;
Order order = (Order) o;
try {orderService.doCreate(order, xid);
log.info("本地事务执行胜利,提交音讯");
state = RocketMQLocalTransactionState.COMMIT;
status = 0;
} catch (Exception e) {e.printStackTrace();
log.info("本地事务执行失败,回滚音讯");
state = RocketMQLocalTransactionState.ROLLBACK;
status = 1;
}
TxInfo txInfo = new TxInfo(xid, System.currentTimeMillis(), status);
txMapper.insert(txInfo);
return state;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {log.info("事务监听 - 回查事务状态");
// 监听器中失去的 message payload 是 byte[]
String json = new String((byte[]) message.getPayload());
String xid = JsonUtil.getString(json, "xid");
TxInfo txInfo = txMapper.selectById(xid);
if (txInfo == null) {log.info("事务监听 - 回查事务状态 - 事务不存在:"+xid);
return RocketMQLocalTransactionState.UNKNOWN;
}
log.info("事务监听 - 回查事务状态 -"+ txInfo.getStatus());
switch (txInfo.getStatus()) {
case 0: return RocketMQLocalTransactionState.COMMIT;
case 1: return RocketMQLocalTransactionState.ROLLBACK;
default: return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
启动订单我的项目进行测试
按程序启动我的项目:
- Eureka
- Easy Id Generator
- Order
调用保留订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
察看控制台日志:
订单表:
事务表:
拜访 Rocketmq,查看事务音讯:
account 接管事务音讯,并执行本地事务
application.yml
增加 Rocketmq 连贯配置
rocketmq:
name-server: 192.168.64.151:9876;192.168.64.152:9876
JsonUtil
工具类
同下面的代码。
TxConsumer
接管事务音讯,调用账户业务办法
接管的音讯转换成 TxAccountMessage
对象,这里先创立这个类:
package cn.tedu.account.tx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxAccountMessage {
Long userId;
BigDecimal money;
String xid;
}
TxConsumer
实现音讯监听,收到音讯后实现扣减金额业务:
package cn.tedu.account.tx;
import cn.tedu.account.service.AccountService;
import cn.tedu.account.util.JsonUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "account-consumer-group", topic = "order-topic", selectorExpression = "account")
public class TxConsumer implements RocketMQListener<String> {
@Autowired
private AccountService accountService;
@Override
public void onMessage(String msg) {TxAccountMessage txAccountMessage = JsonUtil.from(msg, new TypeReference<TxAccountMessage>() {});
log.info("收到音讯:"+txAccountMessage);
accountService.decrease(txAccountMessage.getUserId(), txAccountMessage.getMoney());
}
}
AccountServiceImpl
增加事务注解
package cn.tedu.account.service;
import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@Transactional
@Override
public void decrease(Long userId, BigDecimal money) {accountMapper.decrease(userId,money);
}
}
启动 account 我的项目进行测试
按程序启动我的项目:
- Eureka
- Easy Id Generator
- Account
- Order
account 我的项目启动时,会立刻从 Rocketmq 收到音讯,执行账户扣减业务:
order 本地事务失败测试
批改 TxOrderService
增加模仿异样:
package cn.tedu.order.tx;
import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.mapper.TxMapper;
import cn.tedu.order.service.OrderService;
import cn.tedu.order.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Slf4j
@Primary
@Service
public class TxOrderService implements OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private TxMapper txMapper;
@Autowired
EasyIdGeneratorClient easyIdGeneratorClient;
/*
创立订单的业务办法
这里批改为:只向 Rocketmq 发送事务音讯。*/
@Override
public void create(Order order) {
// 产生事务 ID
String xid = UUID.randomUUID().toString().replace("-", "");
// 对事务相干数据进行封装,并转成 json 字符串
TxAccountMessage sMsg = new TxAccountMessage(order.getUserId(), order.getMoney(), xid);
String json = JsonUtil.to(sMsg);
//json 字符串封装到 Spring Message 对象
Message<String> msg = MessageBuilder.withPayload(json).build();
// 发送事务音讯
log.info("开始发送事务音讯");
rocketMQTemplate.sendMessageInTransaction("order-topic:account", msg, order);
log.info("事务音讯已发送");
}
// 本地事务,执行订单保留
// 这个办法在事务监听器中调用
@Transactional
public void doCreate(Order order, String xid) {log.info("执行本地事务,保留订单");
// 从全局惟一 id 发号器取得 id
Long orderId = easyIdGeneratorClient.nextId("order_business");
order.setId(orderId);
orderMapper.create(order);
if (Math.random() < 0.5) {throw new RuntimeException("模仿异样");
}
log.info("订单已保留!事务日志已保留");
}
}
调用保留订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
本地事务失败后,会告诉 Rocketmq 回滚事务:
测试完后,将模仿异样代码正文掉
account 本地事务失败测试
批改 AccountServiceImpl
,增加随机模仿异样:
package cn.tedu.account.service;
import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@Transactional
@Override
public void decrease(Long userId, BigDecimal money) {accountMapper.decrease(userId,money);
if (Math.random() < 0.5) {throw new RuntimeException("模仿异样");
}
}
}
调用保留订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
account 账户服务接管音讯后,如果解决失败,Rocketmq 会进行重试,直到解决胜利为止。
我的项目源码:https://gitee.com/benwang6/rocketmq-dtx