乐趣区

关于java:面试官小伙子你给我简单说一下RocketMQ-整合-Spring-Boot吧

前言

在使用 SpringBoot 的 starter 集成包时,要特别注意版本。因为 SpringBoot 集成 RocketMQ 的 starter 依赖是由 Spring 社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改变。例如如果使用 rocketmq-spring-boot-starter:2.0.4 版本开发的代码,升级到目前最新的 rocketmq-spring-boot-starter:2.1.1 后,基本就用不了了

应用结构

TestController: 测试入口, 有基本消息测试和事务消息测试
TopicListener: 是监听 "topic" 这个主题的普通消息监听器
TopicTransactionListener: 是监听 "topic" 这个主题的事务消息监听器, 和 TopicTransactionRocketMQTemplate 绑定 (一一对应关系)
Customer: 是测试消息体的一个 entity 对象
TopicTransactionRocketMQTemplate: 是扩展自 RocketMQTemplate 的另一个 RocketMQTemplate, 专门用来处理某一个业务流程, 和 TopicTransactionListener 绑定 (一一对应关系)

pom.xml

org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1, 它自身引用的 springboot 版本是 2.0.5.RELEASE, 下面通过 exclusions 屏蔽这些旧版本依赖, 改用 spring-boot-dependencies 2.4.0 管理的版本

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the RocketMQ + Spring Boot demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mrathena.middle.ware</groupId>
    <artifactId>rocket.mq.springboot</artifactId>
    <version>1.0.0</version>
    <!-- Import the Spring Boot 2.4.0 BOM so that the Spring/Jackson dependencies below can omit <version>. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.4.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <!-- NOTE(review): slf4j/logback versions are pinned explicitly here rather than taken from the imported BOM; confirm this is intended. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <!-- Exclude the older Spring Boot / Spring artifacts pulled in transitively; the starter itself references Spring Boot 2.0.5.RELEASE -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-aop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- NOTE(review): no <scope>test</scope> is declared, so the test starter ends up on the compile classpath; confirm this is intended. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!-- spring-messaging and jackson-databind were excluded from the starter above; they are re-declared here without a version so the imported BOM supplies it. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compile for Java 8 with UTF-8 source encoding. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

# Embedded web server settings.
server:
  servlet:
    # Left empty: no explicit servlet context path is configured.
    context-path:
  # HTTP listen port.
  port: 80
# Settings read by rocketmq-spring-boot-starter.
rocketmq:
  # host:port of the RocketMQ name server.
  # NOTE(review): hard-coded public IP, presumably the author's demo server; replace it for your own environment.
  name-server: 116.62.162.48:9876
  producer:
    # Producer group name; presumably used by the default RocketMQTemplate (verify against the starter docs).
    group: producer

Customer

package com.mrathena.rocket.mq.entity;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Simple entity used as the message body in the transactional send test.
 *
 * <p>Lombok generates a no-args constructor, an all-args constructor taking
 * {@code (username, nickname)} in field order, and a public getter and setter
 * for each field.
 */
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    /** Login name of the customer. */
    @Getter
    @Setter
    private String username;

    /** Display name of the customer. */
    @Getter
    @Setter
    private String nickname;
}

生产者 TestController

package com.mrathena.rocket.mq.controller;

import com.mrathena.rocket.mq.configuration.TopicTransactionRocketMQTemplate;
import com.mrathena.rocket.mq.entity.Customer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@RestController
@RequestMapping("test")
public class TestController {

    private static final String TOPIC = "topic";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private TopicTransactionRocketMQTemplate topicTransactionRocketMQTemplate;

    @GetMapping("base")
    public Object base() {
        // destination: topic/topic:tag, topic 或者是 topic 拼接 tag 的整合体
        // payload: 荷载即音讯体
        // message: org.springframework.messaging.Message, 是 Spring 本人封装的类, 和 RocketMQ 的 Message 不是一个类, 外面没有 tags/keys 等内容
        rocketMQTemplate.send(TOPIC, MessageBuilder.withPayload("你好").setHeader("你是谁", "你猜").build());
        // tags null
        rocketMQTemplate.convertAndSend(TOPIC, "tag null");
        // tags empty, 证实 tag 要么有值要么 null, 不存在 empty 的 tag
        rocketMQTemplate.convertAndSend(TOPIC + ":", "tag empty ?");
        // 只有 tag 没有 key
        rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a");
        rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b");
        // 有 property, 即 RocketMQ 根底 API 外面, Message(String topic, String tags, String keys, byte[] body) 外面的 key
        // rocketmq-spring-boot-starter 把 userProperty 和其余的一些属性都糅合在 headers 外面可, 具体能够参考 org.apache.rocketmq.spring.support.RocketMQUtil.addUserProperties
        // 获取某个自定义的属性的时候, 间接 headers.get("自定义属性 key") 就能够了
        Map<String, Object> properties = new HashMap<>();
        properties.put("property", 1);
        properties.put("another-property", "你好");
        rocketMQTemplate.convertAndSend(TOPIC, "property 1", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 1", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":b", "tag b property 1", properties);
        properties.put("property", 5);
        rocketMQTemplate.convertAndSend(TOPIC, "property 5", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":a", "tag a property 5", properties);
        rocketMQTemplate.convertAndSend(TOPIC + ":c", "tag c property 5", properties);

        // 音讯后置处理器, 能够在发送前对音讯体和 headers 再做一波操作
        rocketMQTemplate.convertAndSend(TOPIC, "音讯后置处理器", new MessagePostProcessor() {
            /**
             * org.springframework.messaging.Message
             */
            @Override
            public Message<?> postProcessMessage(Message<?> message) {Object payload = message.getPayload();
                MessageHeaders messageHeaders = message.getHeaders();
                return message;
            }
        });

        // convertAndSend 底层其实也是 syncSend
        // syncSend
        log.info("{}", rocketMQTemplate.syncSend(TOPIC, "sync send"));
        // asyncSend
        rocketMQTemplate.asyncSend(TOPIC, "async send", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {log.info("onSuccess");
            }

            @Override
            public void onException(Throwable e) {log.info("onException");
            }
        });
        // sendOneWay
        rocketMQTemplate.sendOneWay(TOPIC, "send one way");

        // 这个我还是不太分明是干嘛的? 跑的时候会报错!!!
//        Object receive = rocketMQTemplate.sendAndReceive(TOPIC, "你好", String.class);
//        log.info("{}", receive);

        return "success";
    }

    @GetMapping("transaction")
    public Object transaction() {Message<Customer> message = MessageBuilder.withPayload(new Customer("mrathena", "你是谁")).build();
        // 这里应用的是通过 @ExtRocketMQTemplateConfiguration(group = "anotherProducer") 扩大进去的另一个 RocketMQTemplate
        log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC, message, null));
        log.info("{}", topicTransactionRocketMQTemplate.sendMessageInTransaction(TOPIC + ":tag-a", message, null));
        return "success";
    }

}

配置 TopicTransactionRocketMQTemplate

package com.mrathena.rocket.mq.configuration;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * Additional {@link RocketMQTemplate} dedicated to a single transactional flow.
 *
 * <ul>
 *   <li>Each transactional flow needs its own RocketMQTemplate (one-to-one).</li>
 *   <li>{@code @ExtRocketMQTemplateConfiguration} (which itself carries {@code @Component})
 *       can be used to declare as many extra templates as needed.</li>
 *   <li>Templates of different transactional flows must not share a producer group:
 *       the broker calls back a {@code checkLocalTransactionState} method of some producer
 *       in the same group, so with a shared group the wrong flow's method may be invoked.</li>
 * </ul>
 */
@ExtRocketMQTemplateConfiguration(group = "anotherProducer")
public class TopicTransactionRocketMQTemplate extends RocketMQTemplate {
    // Intentionally empty: the subclass only exists to carry the annotation above.
}

消费者 TopicListener

package com.mrathena.rocket.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 最简略的消费者例子
 * topic: 主题
 * consumerGroup: 消费者组
 * selectorType: 过滤形式, TAG: 标签过滤, 仅反对标签, SQL92:SQL 过滤, 反对标签和属性
 * selectorExpression: 过滤表达式, 依据 selectorType 定, TAG 时, 写标签如 "a || b", SQL92 时, 写 SQL 表达式
 * consumeMode: CONCURRENTLY: 并发生产, ORDERLY: 程序生产
 * messageModel: CLUSTERING: 集群竞争生产, BROADCASTING: 播送生产
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic",
        // 只过滤 tag, 不论 headers 中的 key 和 value
//        selectorType = SelectorType.TAG,
        // 必须指定 selectorExpression, 能够过滤 tag 和 headers 中的 key 和 value
//        selectorType = SelectorType.SQL92,
        // 不限 tag
//        selectorExpression = "*",
        // 不限 tag, 和 * 统一
//        selectorExpression = "",
        // 只有 tag 为 a 的音讯
//        selectorExpression = "a",
        // 要 tag 为 a 或 b 的音讯
//        selectorExpression = "a || b",
        // SelectorType.SQL92 时, 能够跳过 tag, 间接用 headers 外面的 key 和 value 来判断
//        selectorExpression = "property = 1",
        // tag 不为 null
//        selectorExpression = "TAGS is not null",
        // tag 为 empty, 证实 tag 不会是 empty, 要么有值要么 null
//        selectorExpression = "TAGS =''",
        // SelectorType.SQL92 时, 即过滤 tag, 又过滤 headers 外面的 key 和 value
//        selectorExpression = "(TAGS is not null and TAGS ='a') and (property is not null and property between 4 and 6)",
        // 并发生产
        consumeMode = ConsumeMode.CONCURRENTLY,
        // 程序生产
//        consumeMode = ConsumeMode.ORDERLY,
        // 集群生产
        messageModel = MessageModel.CLUSTERING,
        // 播送生产
//        messageModel = MessageModel.BROADCASTING,
        consumerGroup = "consumer"
)
public class TopicListener implements RocketMQListener<String> {public void onMessage(String s) {log.info("{}", s);
    }
}

事务监听器 TopicTransactionListener(生产者端的本地事务监听器)

package com.mrathena.rocket.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Local-transaction listener bound to the {@code topicTransactionRocketMQTemplate} bean.
 *
 * <p>{@link #executeLocalTransaction} logs the message and returns {@code UNKNOWN};
 * {@link #checkLocalTransaction} then returns {@code COMMIT}.
 */
@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "topicTransactionRocketMQTemplate")
public class TopicTransactionListener implements RocketMQLocalTransactionListener {

    /**
     * Logs the payload and a selection of headers of the transactional message.
     *
     * @param message the Spring message; its payload is cast to {@code byte[]}
     * @param o       the extra argument passed to {@code sendMessageInTransaction}
     * @return always {@link RocketMQLocalTransactionState#UNKNOWN}
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        // message: org.springframework.messaging.Message is Spring's own abstraction, not RocketMQ's
        // Message class; it carries no tags/keys fields.
        // Typically tags/keys are not handled here; instead the logic branches on fields of the
        // message body, and the second argument can also be used to pass data in.
        log.info("executeLocalTransaction message:{}, object:{}", message, o);
        // Decode explicitly as UTF-8 instead of the platform default charset, so non-ASCII
        // payload text is logged correctly on every host.
        // NOTE(review): assumes the producer side encodes the body as UTF-8 - confirm.
        log.info("payload: {}", new String((byte[]) message.getPayload(), StandardCharsets.UTF_8));
        MessageHeaders headers = message.getHeaders();
        log.info("tags: {}", headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS));
        log.info("rocketmq_TOPIC: {}", headers.get("rocketmq_TOPIC"));
        log.info("rocketmq_QUEUE_ID: {}", headers.get("rocketmq_QUEUE_ID"));
        log.info("rocketmq_MESSAGE_ID: {}", headers.get("rocketmq_MESSAGE_ID"));
        log.info("rocketmq_TRANSACTION_ID: {}", headers.get("rocketmq_TRANSACTION_ID"));
        log.info("TRANSACTION_CHECK_TIMES: {}", headers.get("TRANSACTION_CHECK_TIMES"));
        log.info("id: {}", headers.get("id"));
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    /**
     * Logs the message being checked and commits it.
     *
     * @param message the message whose local transaction state is being checked
     * @return always {@link RocketMQLocalTransactionState#COMMIT}
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("checkLocalTransaction message:{}", message);
        // Only after checkLocalTransaction has been called can the regular message listener
        // receive the message
        return RocketMQLocalTransactionState.COMMIT;
    }
}

最后

欢迎关注公众号:前程有光,领取一线大厂 Java 面试题总结 + 各知识点学习思维导图 + 一份 300 页 pdf 文档的 Java 核心知识点总结!

退出移动版