作者:小傅哥
博客:https://bugstack.cn
积淀、分享、成长,让本人和别人都能有所播种!😄
本文的主旨在于通过简略洁净实际的形式教会读者,应用 Docker 配置 RocketMQ 并在基于 DDD 分层构造的 SpringBoot 工程中应用 RocketMQ 技术。因为大部分 MQ 的发送都是基于特定业务场景的,所以本章节也是基于《MyBatis 应用教程和插件开发》章节的扩大。
本章也会包含对于 MQ 音讯的发送和接管应该处于 DDD 的哪一层的实际解说和应用。
本文波及的工程:
- xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-roc…
- RocketMQ Docker 装置:rocketmq-docker-compose-mac-amd-arm.yml
- 导入测试库表 road-map.sql
一、案例背景
首先咱们要晓得,MQ 音讯的作用是用于;解耦过长的业务流程
和应答流量冲击的消峰
。如;用户下单领取实现后,拿到领取音讯推动后续的发货流程。也能够是咱们基于《MyBatis 应用教程和插件开发》中的案例场景,给雇员晋升级别和薪资的时候,也发送一条 MQ 音讯,用于发送邮件告诉给用户。
- 从薪资调整到邮件发送,这里是 2 个业务流程,通过 MQ 音讯的形式进行连贯。
- 其实 MQ 音讯的应用场景特地多,原来你可能应用多线程的一些操作,当初就扩大为多实例的操作了。发送 MQ 音讯进去,让利用的各个实例接管并进行生产。
二、畛域事件
因为咱们本章所解说的内容是把 RocketMQ 放入 DDD 架构中进行应用,那么也就引申出畛域事件定义。所以咱们先来理解下,什么是畛域事件。
畛域事件,能够说是解耦微服务设计的要害。畛域事件也是畛域模型中十分重要的一部分内容,用于标示以后畛域模型中产生的事件行为。一个畛域事件会推动业务流程的进一步操作,在实现业务解耦的同时,也推动了整个业务的闭环。
- 首先,咱们须要在畛域模型层,增加一块 event 区域。它的存在是为了定义出于以后畛域下所需的事件音讯信息。信息的类型能够是 model 下的实体对象、聚合对象。
- 之后,音讯的发送是放在根底设置层。自身根底设置层就是依赖倒置于模型层,所以在模型层所定义的 event 对象,能够很不便的在根底设置层应用。而且大部分开发的时候,MQ 音讯的发送与数据库操作都是关联的,采纳的形式是,做完数据落库后,推送 MQ 音讯。所以定义在仓储中实现,会更加得心应手、瓜熟蒂落。
- 最初,就是 MQ 的音讯,MQ 的生产能够是本身服务所收回的音讯,也能够是内部其余微服务的音讯。就在小傅哥所整体讲述的这套扼要教程中 DDD 局部的触发器层。
三、环境装置
本案例波及了数据库和 RocketMQ 的应用,都曾经在工程中提供了装置脚本,能够按需执行。
这里次要介绍 RocketMQ 的装置;
1. 执行 compose yml
文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml – 对于装置小傅哥提供了不同的镜像,包含 Mac、Mac M1、Windows 能够按需抉择应用。
version: '3'
services:
# https://hub.docker.com/r/xuchengen/rocketmq
# 留神批改项;# 01:data/rocketmq/conf/broker.conf 增加 brokerIP1=127.0.0.1
# 02:data/console/config/application.properties server.port=9009 - 如果 8080 端口被占用,能够批改或者增加映射端口
rocketmq:
image: livinphp/rocketmq:5.1.0
container_name: rocketmq
ports:
- 9009:9009
- 9876:9876
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./data:/home/app/data
environment:
TZ: "Asia/Shanghai"
NAMESRV_ADDR: "rocketmq:9876"
- 在 IDEA 中关上 rocketmq-docker-compose-mac-amd-arm.yml 你会看到一个绿色的按钮在左侧侧边栏,点击即可装置。或者你也能够应用命令装置:
# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d
– 比拟适宜在云服务器上执行。 - 首次装置可能应用不了,一个起因是 brokerIP1 未配置 IP,另外一个是默认的 8080 端口占用。能够依照如下小傅哥说的形式批改。
2. 批改默认配合
- 关上
data/rocketmq/conf/broker.conf
增加一条brokerIP1=127.0.0.1
在结尾
# 集群名称
brokerClusterName = DefaultCluster
# BROKER 名称
brokerName = broker-a
# 0 示意 Master, > 0 示意 Slave
brokerId = 0
# 删除文件工夫点,默认凌晨 4 点
deleteWhen = 04
# 文件保留工夫,默认 48 小时
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER 为异步主节点,SYNC_MASTER 为同步主节点,SLAVE 为从节点
brokerRole = ASYNC_MASTER
# 刷新数据到磁盘的形式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存储门路
storePathRootDir = /home/app/data/rocketmq/store
# IP 地址
brokerIP1 = 127.0.0.1
- 关上
`data/console/config/application.properties
批改server.port=9009
端口。
server.address=0.0.0.0
server.port=9009
- 批改配置后,重启服务。
3. RockMQ 登录与配置
3.1 登录
RocketMQ 此镜像,会在装置后在控制台打印登录账号信息,你能够查看应用。
登录:http://localhost:9009/
3.2 创立 Topic
- 也能够应用命令创立:
docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq
3.3 创立消费者组
- 也能够应用命令创立:
docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group
四、工程实现
1. 工程构造
- MQ 的应用无论是 RocketMQ 还是 Kafka 等,都很简略。但在应用之前,要思考好怎么在架构中正当的应用。如果最后没有定义好这些,那么胡乱的任何中央都能发送和接管 MQ,最初的工程将十分难以保护。
- 所以这里整个 MQ 的生产和生产,是依照整个 DDD 畛域事件构造进行设计。分为在 domain 应用根底层生产音讯,再有 trigger 层接管音讯。
2. 配置文件
引入 POM
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
增加配置
# RocketMQ 配置
rocketmq:
name-server: 127.0.0.1:9876
consumer:
group: xfg-group
# 一次拉取音讯最大值,留神是拉取音讯的最大值而非生产最大值
pull-batch-size: 10
producer:
# 发送同一类音讯的设置为同一个 group,保障惟一
group: xfg-group
# 发送音讯超时工夫,默认 3000
sendMessageTimeout: 10000
# 发送音讯失败重试次数,默认 2
retryTimesWhenSendFailed: 2
# 异步音讯重试此处,默认 2
retryTimesWhenSendAsyncFailed: 2
# 音讯最大长度,默认 1024 * 1024 * 4(默认 4M)
maxMessageSize: 4096
# 压缩音讯阈值,默认 4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在外部发送失败时重试另一个 broker,默认 false
retryNextServer: false
3. 定义畛域事件
源码:cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent
@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {
public static String TOPIC = "xfg-mq";
public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {SalaryAdjustEvent event = new SalaryAdjustEvent();
event.setId(RandomStringUtils.randomNumeric(11));
event.setTimestamp(new Date());
event.setData(adjustSalaryApplyOrderAggregate);
return event;
}
}
- 每个畛域的音讯,都有畛域本人定义。发送的时候再交给基础设施层来发送。
4. 音讯发送
源码:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher
@Component
@Slf4j
public class EventPublisher {@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
/**
* 一般音讯
*
* @param topic 主题
* @param message 音讯
*/
public void publish(String topic, BaseEvent<?> message) {
try {String mqMessage = JSON.toJSONString(message);
log.info("发送 MQ 音讯 topic:{} message:{}", topic, mqMessage);
rocketmqTemplate.convertAndSend(topic, mqMessage);
} catch (Exception e) {log.error("发送 MQ 音讯失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
// 大部分 MQ 发送失败后,会须要工作弥补
}
}
/**
* 提早音讯
*
* @param topic 主题
* @param message 音讯
* @param delayTimeLevel 提早时长
*/
public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {
try {String mqMessage = JSON.toJSONString(message);
log.info("发送 MQ 提早音讯 topic:{} message:{}", topic, mqMessage);
rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);
} catch (Exception e) {log.error("发送 MQ 提早音讯失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);
// 大部分 MQ 发送失败后,会须要工作弥补
}
}
}
- 在基础设施层提供 event 事件的解决,也就是 MQ 音讯的发送。
源码:cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository
@Resource
private EventPublisher eventPublisher;
@Override
@Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT)
public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {
// ... 省略局部代码
eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
return orderId;
}
在 SalaryAdjustRepository 仓储的实现中,做完业务流程开始发送 MQ 音讯。这里有 2 点要留神;
- 音讯发送,不要写在数据库事务中。因为事务始终占用数据库连贯,须要疾速开释。
- 对于一些强 MQ 要求的场景,须要在发送 MQ 前,写入一条数据库 Task 记录,发送音讯后更新 Task 状态为胜利。如果长时间未更新数据库状态或者为失败的,则须要由工作弥补进行解决。
5. 生产音讯
源码:cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener
@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class SalaryAdjustMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {log.info("接管到 MQ 音讯 {}", s);
}
}
- 生产音讯,配置消费者组合生产的主题,之后就能够接管到音讯了。接管当前你能够做本人的业务,如果抛出异样,音讯会进行从新接管解决。
六、测试验证
1. 独自发送音讯测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
@Test
public void test() throws InterruptedException {while (true) {rocketmqTemplate.convertAndSend("xfg-mq", "我是测试音讯");
Thread.sleep(3000);
}
}
}
- 这里不便你来发送音讯,验证流程。
2. 业务流程音讯验证
@Test
public void test_execSalaryAdjust() throws InterruptedException {AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder()
.employeeNumber("10000001")
.orderId("100908977676003")
.employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
.employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
.adjustTotalAmount(new BigDecimal(100))
.adjustBaseAmount(new BigDecimal(80))
.adjustMeritAmount(new BigDecimal(20)).build())
.build();
String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);
Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307 [main] INFO HikariDataSource - HikariPool-1 - Start completed.
23-07-29.15:40:52.445 [main] INFO EventPublisher - 发送 MQ 音讯 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
23-07-29.15:40:52.517 [main] INFO ISalaryAdjustApplyServiceTest - 调薪测试 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004
23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO SalaryAdjustMQListener - 接管到 MQ 音讯 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
- 当执行一次加薪调整后,就会接管到 MQ 音讯了。