共计 11853 个字符,预计需要花费 30 分钟才能阅读完成。
前言
在使用 SpringBoot 的 starter 集成包时,要特别注意版本。因为 SpringBoot 集成 RocketMQ 的 starter 依赖是由 Spring 社区提供的,目前正处于快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如,使用 rocketmq-spring-boot-starter:2.0.4 版本开发的代码,升级到目前最新的 rocketmq-spring-boot-starter:2.1.1 后,基本就用不了了。
应用结构
TestController: 测试入口, 有基本消息测试和事务消息测试
TopicListener: 是监听 "topic" 这个主题的普通消息监听器
TopicTransactionListener: 是监听 "topic" 这个主题的事务消息监听器, 和 TopicTransactionRocketMQTemplate 绑定 (一一对应关系)
Customer: 是测试消息体的一个 entity 对象
TopicTransactionRocketMQTemplate: 是扩展自 RocketMQTemplate 的另一个 RocketMQTemplate, 专门用来处理某一个业务流程, 和 TopicTransactionListener 绑定 (一一对应关系)
pom.xml
org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1, 它引用的 springboot 版本是 2.0.5.RELEASE, 因此下面的 pom.xml 中排除了它传递引入的旧版本依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Demo project: Spring Boot 2.4.0 + rocketmq-spring-boot-starter 2.1.1. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mrathena.middle.ware</groupId>
    <artifactId>rocket.mq.springboot</artifactId>
    <version>1.0.0</version>

    <!-- Import the Spring Boot BOM so that Spring artifacts below need no explicit version. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.4.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Lombok is an annotation processor: needed at compile time only, not at runtime. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <!--
                The starter references Spring Boot 2.0.5.RELEASE. Exclude the old
                transitive Spring/Jackson artifacts so the versions managed by the
                BOM above (declared explicitly further down) are used instead.
            -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-aop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Test libraries must not leak onto the runtime classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Re-declared because they were excluded from the RocketMQ starter above. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
application.yml
server:
  servlet:
    # Left empty on purpose: the application is served from the root path.
    context-path:
  port: 80
rocketmq:
  # Address of the RocketMQ name server (host:port); replace with your own.
  name-server: 116.62.162.48:9876
  producer:
    # Producer group used by the default RocketMQTemplate.
    group: producer
Customer
package com.mrathena.rocket.mq.entity; | |
import lombok.AllArgsConstructor; | |
import lombok.Getter; | |
import lombok.NoArgsConstructor; | |
import lombok.Setter; | |
@Getter | |
@Setter | |
@NoArgsConstructor | |
@AllArgsConstructor | |
public class Customer { | |
private String username; | |
private String nickname; | |
} |
生产者 TestController
package com.mrathena.rocket.mq.controller; | |
import com.mrathena.rocket.mq.configuration.TopicTransactionRocketMQTemplate; | |
import com.mrathena.rocket.mq.entity.Customer; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.rocketmq.client.producer.SendCallback; | |
import org.apache.rocketmq.client.producer.SendResult; | |
import org.apache.rocketmq.spring.core.RocketMQTemplate; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageHeaders; | |
import org.springframework.messaging.core.MessagePostProcessor; | |
import org.springframework.messaging.support.MessageBuilder; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
import java.util.HashMap; | |
import java.util.Map; | |
@Slf4j | |
@RestController | |
@RequestMapping("test") | |
public class TestController { | |
private static final String TOPIC = "topic"; | |
@Autowired | |
private RocketMQTemplate rocketMQTemplate; | |
@Autowired | |
private TopicTransactionRocketMQTemplate topicTransactionRocketMQTemplate; | |
@GetMapping("base") | |
public Object base() { | |
// destination: topic/topic:tag, topic 或者是 topic 拼接 tag 的整合体 | |
// payload: 荷载即音讯体 | |
// message: org.springframework.messaging.Message, 是 Spring 本人封装的类, 和 RocketMQ 的 Message 不是一个类, 外面没有 tags/keys 等内容 | |
rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload("你好").setHeader("你是谁", "你猜").build()); | |
// tags null | |
rocketMQTemplate.convertAndSend(TOPIC, "tag null"); | |
// tags empty, 证实 tag 要么有值要么 null, 不存在 empty 的 tag | |
rocketMQTemplate.convertAndSend(TOPIC + ":", "tag empty ?"); | |
// 只有 tag 没有 key | |
rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a"); | |
rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b"); | |
// 有 property, 即 RocketMQ 根底 API 外面, Message(String topic, String tags, String keys, byte[] body) 外面的 key | |
// rocketmq-spring-boot-starter 把 userProperty 和其余的一些属性都糅合在 headers 外面可, 具体能够参考 org.apache.rocketmq.spring.support.RocketMQUtil.addUserProperties | |
// 获取某个自定义的属性的时候, 间接 headers.get("自定义属性 key") 就能够了 | |
Map<String, Object> properties = new HashMap<>(); | |
properties.put("property", 1); | |
properties.put("another-property", "你好"); | |
rocketMQTemplate.convertAndSend(TOPIC, "property 1", properties); | |
rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 1", properties); | |
rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b property 1", properties); | |
properties.put("property", 5); | |
rocketMQTemplate.convertAndSend(TOPIC, "property 5", properties); | |
rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 5", properties); | |
rocketMQTemplate.convertAndSend(TOPIC + ":c", "tag c property 5", properties); | |
// 音讯后置处理器, 能够在发送前对音讯体和 headers 再做一波操作 | |
rocketMQTemplate.convertAndSend(TOPIC, "音讯后置处理器", new MessagePostProcessor() { | |
/** | |
* org.springframework.messaging.Message | |
*/ | |
@Override | |
public Message<?> postProcessMessage(Message<?> message) {Object payload = message.getPayload(); | |
MessageHeaders messageHeaders = message.getHeaders(); | |
return message; | |
} | |
}); | |
// convertAndSend 底层其实也是 syncSend | |
// syncSend | |
log.info("{}", rocketMQTemplate.syncSend(TOPIC, "sync send")); | |
// asyncSend | |
rocketMQTemplate.asyncSend(TOPIC, "async send", new SendCallback() { | |
@Override | |
public void onSuccess(SendResult sendResult) {log.info("onSuccess"); | |
} | |
@Override | |
public void onException(Throwable e) {log.info("onException"); | |
} | |
}); | |
// sendOneWay | |
rocketMQTemplate.sendOneWay(TOPIC, "send one way"); | |
// 这个我还是不太分明是干嘛的? 跑的时候会报错!!! | |
// Object receive = rocketMQTemplate.sendAndReceive(TOPIC, "你好", String.class); | |
// log.info("{}", receive); | |
return "success"; | |
} | |
@GetMapping("transaction") | |
public Object transaction() {Message<Customer> message = MessageBuilder.withPayload(new Customer("mrathena", "你是谁")).build(); | |
// 这里应用的是通过 @ExtRocketMQTemplateConfiguration(group = "anotherProducer") 扩大进去的另一个 RocketMQTemplate | |
log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC, message, null)); | |
log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC + ":tag-a", message, null)); | |
return "success"; | |
} | |
} |
配置 TopicTransactionRocketMQTemplate
package com.mrathena.rocket.mq.configuration; | |
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; | |
import org.apache.rocketmq.spring.core.RocketMQTemplate; | |
/** | |
* 一个事务流程和一个 RocketMQTemplate 须要一一对应 | |
* 能够通过 @ExtRocketMQTemplateConfiguration(留神该注解有 @Component 注解) 来扩大多个 RocketMQTemplate | |
* 留神: 不同事务流程的 RocketMQTemplate 的 producerGroup 不能雷同 | |
* 因为 MQBroker 会反向调用同一个 producerGroup 下的某个 checkLocalTransactionState 办法, 不同流程应用雷同的 producerGroup 的话, 办法可能会调用错 | |
*/ | |
@ExtRocketMQTemplateConfiguration(group = "anotherProducer") | |
public class TopicTransactionRocketMQTemplate extends RocketMQTemplate {} |
消费者 TopicListener
package com.mrathena.rocket.mq.listener; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.rocketmq.spring.annotation.ConsumeMode; | |
import org.apache.rocketmq.spring.annotation.MessageModel; | |
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; | |
import org.apache.rocketmq.spring.core.RocketMQListener; | |
import org.springframework.stereotype.Component; | |
/** | |
* 最简略的消费者例子 | |
* topic: 主题 | |
* consumerGroup: 消费者组 | |
* selectorType: 过滤形式, TAG: 标签过滤, 仅反对标签, SQL92:SQL 过滤, 反对标签和属性 | |
* selectorExpression: 过滤表达式, 依据 selectorType 定, TAG 时, 写标签如 "a || b", SQL92 时, 写 SQL 表达式 | |
* consumeMode: CONCURRENTLY: 并发生产, ORDERLY: 程序生产 | |
* messageModel: CLUSTERING: 集群竞争生产, BROADCASTING: 播送生产 | |
*/ | |
@Slf4j | |
@Component | |
@RocketMQMessageListener(topic = "topic", | |
// 只过滤 tag, 不论 headers 中的 key 和 value | |
// selectorType = SelectorType.TAG, | |
// 必须指定 selectorExpression, 能够过滤 tag 和 headers 中的 key 和 value | |
// selectorType = SelectorType.SQL92, | |
// 不限 tag | |
// selectorExpression = "*", | |
// 不限 tag, 和 * 统一 | |
// selectorExpression = "", | |
// 只有 tag 为 a 的音讯 | |
// selectorExpression = "a", | |
// 要 tag 为 a 或 b 的音讯 | |
// selectorExpression = "a || b", | |
// SelectorType.SQL92 时, 能够跳过 tag, 间接用 headers 外面的 key 和 value 来判断 | |
// selectorExpression = "property = 1", | |
// tag 不为 null | |
// selectorExpression = "TAGS is not null", | |
// tag 为 empty, 证实 tag 不会是 empty, 要么有值要么 null | |
// selectorExpression = "TAGS =''", | |
// SelectorType.SQL92 时, 即过滤 tag, 又过滤 headers 外面的 key 和 value | |
// selectorExpression = "(TAGS is not null and TAGS ='a') and (property is not null and property between 4 and 6)", | |
// 并发生产 | |
consumeMode = ConsumeMode.CONCURRENTLY, | |
// 程序生产 | |
// consumeMode = ConsumeMode.ORDERLY, | |
// 集群生产 | |
messageModel = MessageModel.CLUSTERING, | |
// 播送生产 | |
// messageModel = MessageModel.BROADCASTING, | |
consumerGroup = "consumer" | |
) | |
public class TopicListener implements RocketMQListener<String> {public void onMessage(String s) {log.info("{}", s); | |
} | |
} |
消费者 TopicTransactionListener
package com.mrathena.rocket.mq.listener; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; | |
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; | |
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; | |
import org.apache.rocketmq.spring.support.RocketMQHeaders; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageHeaders; | |
import org.springframework.stereotype.Component; | |
@Slf4j | |
@Component | |
@RocketMQTransactionListener(rocketMQTemplateBeanName = "topicTransactionRocketMQTemplate") | |
public class TopicTransactionListener implements RocketMQLocalTransactionListener { | |
@Override | |
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { | |
// message: org.springframework.messaging.Message, 是 Spring 本人封装的类, 和 RocketMQ 的 Message 不是一个类, 外面没有 tags/keys 等内容 | |
// 一般来说, 并不会在这里解决 tags/keys 等内容, 而是依据音讯体中的某些字段做不同的操作, 第二个参数也能够用来传递一些数据到这里 | |
log.info("executeLocalTransaction message:{}, object:{}", message, o); | |
log.info("payload: {}", new String((byte[]) message.getPayload())); | |
MessageHeaders headers = message.getHeaders(); | |
log.info("tags: {}", headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS)); | |
log.info("rocketmq_TOPIC: {}", headers.get("rocketmq_TOPIC")); | |
log.info("rocketmq_QUEUE_ID: {}", headers.get("rocketmq_QUEUE_ID")); | |
log.info("rocketmq_MESSAGE_ID: {}", headers.get("rocketmq_MESSAGE_ID")); | |
log.info("rocketmq_TRANSACTION_ID: {}", headers.get("rocketmq_TRANSACTION_ID")); | |
log.info("TRANSACTION_CHECK_TIMES: {}", headers.get("TRANSACTION_CHECK_TIMES")); | |
log.info("id: {}", headers.get("id")); | |
return RocketMQLocalTransactionState.UNKNOWN; | |
} | |
@Override | |
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {log.info("checkLocalTransaction message:{}", message); | |
// 在调用了 checkLocalTransaction 后, 另一个惯例音讯监听器能力收到音讯 | |
return RocketMQLocalTransactionState.COMMIT; | |
} | |
} |
最后
欢迎关注公众号:前程有光,领取一线大厂 Java 面试题总结 + 各知识点学习思维导图 + 一份 300 页 pdf 文档的 Java 核心知识点总结!