关于rocketmq:rocketmqspring-实战与源码解析一网打尽

1次阅读

共计 4492 个字符,预计需要花费 12 分钟才能阅读完成。

RocketMQ 是大家耳熟能详的音讯队列,开源我的项目 rocketmq-spring 能够帮忙开发者在 Spring Boot 我的项目中疾速整合 RocketMQ。
这篇文章会介绍 Spring Boot 我的项目应用 rocketmq-spring SDK 实现音讯收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。

1 SDK 简介

我的项目地址:

https://github.com/apache/rocketmq-spring

rocketmq-spring 的实质是一个 Spring Boot starter。
Spring Boot 基于“约定大于配置”(Convention over configuration)这一理念来疾速地开发、测试、运行和部署 Spring 利用,并能通过简略地与各种启动器(如 spring-boot-web-starter)联合,让利用间接以命令行的形式运行,不需再部署到独立容器中。
Spring Boot starter 结构的启动器应用起来十分不便,开发者只须要在 pom.xml 引入 starter 的依赖定义,在配置文件中编写约定的配置即可。
上面咱们看下 rocketmq-spring-boot-starter 的配置:
1、引入依赖
<dependency>  <groupId>org.apache.rocketmq</groupId>  <artifactId>rocketmq-spring-boot-starter</artifactId>  <version>2.2.3</version></dependency>
2、约定配置

接下来,咱们别离依照生产者和消费者的程序,具体的解说音讯收发的操作过程。
2 生产者
首先咱们增加依赖后,进行如下三个步骤:
1、配置文件中配置如下
rocketmq:  name-server: 127.0.0.1:9876  producer:      group: platform-sms-server-group    # access-key: myaccesskey    # secret-key: mysecretkey  topic: sms-common-topic
生产者配置非常简单,次要配置名字服务地址和生产者组。
2、须要发送音讯的类中注入 RcoketMQTemplate
@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value(“${rocketmq.topic}”)private String smsTopic;
3、发送音讯,音讯体能够是自定义对象,也能够是 Message 对象
rocketMQTemplate 类蕴含多钟发送音讯的办法:

同步发送 syncSend 异步发送 asyncSend 程序发送 syncSendOrderlyoneway 发送 sendOneWay
上面的代码展现如何同步发送音讯。
String destination = StringUtils.isBlank(tags) ? topic : topic + “:” + tags;SendResult sendResult =         rocketMQTemplate.syncSend(destination,             MessageBuilder.withPayload(messageContent).            setHeader(MessageConst.PROPERTY_KEYS, uniqueId).            build());if (sendResult != null) {if (sendResult.getSendStatus() == SendStatus.SEND_OK) {// send message success,do something}}
syncSend 办法的第一个参数是发送的指标,格局是:topic + “:” + tags,
第二个参数是:spring-message 标准的 message 对象,而 MessageBuilder 是一个工具类,办法链式调用创立音讯对象。
3 消费者
1、配置文件中配置如下
rocketmq:  name-server: 127.0.0.1:9876  consumer1:    group: platform-sms-worker-common-group    topic: sms-common-topic
2、实现音讯监听器
@Component@RocketMQMessageListener(consumerGroup = “${rocketmq.consumer1.group}”,  // 生产组    topic = “${rocketmq.consumer1.topic}”       // 主题)public class SmsMessageCommonConsumer implements RocketMQListener<String> {public void onMessage(String message) {System.out.println(“ 一般短信:” + message);    }}
消费者实现类也能够实现 RocketMQListener<MessageExt>, 在 onMessage 办法里通过 RocketMQ 原生音讯对象 MessageExt 获取更具体的音讯数据。
public void onMessage(MessageExt message) {try {        String body = new String(message.getBody(), “UTF-8”);        logger.info(“ 一般短信:” + message);    } catch (Exception e) {logger.error(“common onMessage error:”, e);    }}
4 源码概览

最新源码中,咱们能够看到源码中蕴含四个模块:
1、rocketmq-spring-boot-parent
该模块是父模块,定义我的项目所有依赖的 jar 包。
2、rocketmq-spring-boot
外围模块,实现了 starter 的外围逻辑。
3、rocketmq-spring-boot-starter
SDK 模块,简略封装,内部我的项目援用。
4、rocketmq-spring-boot-samples
示例代码模块。这个模块十分重要,当用户应用 SDK 时,能够参考示例疾速开发。
5 starter 实现
咱们重点剖析下 rocketmq-spring-boot 模块的外围源码:

spring-boot-starter 实现须要蕴含如下三个局部:
1、定义 Spring 本身的依赖包和 RocketMQ 的依赖包 ;
2、定义 spring.factories 文件
在 resources 包下创立 META-INF 目录后,新建 spring.factories 文件,并在文件中定义主动加载类,文件内容是:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
spring boot 会依据文件中配置的自动化配置类来主动初始化相干的 Bean、Component 或 Service。
3、实现主动加载类
在 RocketMQAutoConfiguration 类的具体实现中,咱们重点剖析下生产者和消费者是如何别离启动的。
▍生产者发送模板类:RocketMQTemplate
RocketMQAutoConfiguration 类定义了两个默认的 Bean:

首先 SpringBoot 我的项目中配置文件中的配置值会依据属性条件绑定到 RocketMQProperties 对象 中,而后应用 RocketMQ 的原生 API 别离创立生产者 Bean 和拉取消费者 Bean , 别离将两个 bean 设置到 RocketMQTemplate 对象中。
两个重点须要强调:

发送音讯时,将 spring-message 标准下的音讯对象封装成 RocketMQ 音讯对象

默认拉取消费者 litePullConsumer。拉取消费者个别用于大数据批量解决场景。

原生应用形式

​ RocketMQTemplate 类封装了拉取消费者的 receive 办法,以不便开发者应用。

▍自定义消费者类
下图是并发消费者的例子:

消费者示例代码
那么 rocketmq-spring 是如何主动启动消费者呢?

spring 容器首先注册了音讯监听器后置处理器,而后调用 ListenerContainerConfiguration 类的 registerContainer 办法。
比照并发消费者的例子,咱们能够看到:DefaultRocketMQListenerContainer 是对 DefaultMQPushConsumer 生产逻辑的封装。

封装生产音讯的逻辑,同时满足 RocketMQListener 泛化接口反对不同参数,比方 String、MessageExt、自定义对象。
首先 DefaultRocketMQListenerContainer 初始化之后,获取 onMessage 办法的参数类型。

而后消费者调用 consumeMessage 解决音讯时,封装了一个 handleMessage 办法,将原生 RocketMQ 音讯对象 MessageExt 转换成 onMessage 办法定义的参数对象,而后调用 rocketMQListener 的 onMessage 办法。

mnjh9
上图右侧标红的代码也就是该办法的精华:
rocketMQListener.onMessage(doConvertMessage(messageExt));
6 写到最初
开源我的项目 rocketmq-spring 有很多值得学习的中央,咱们能够从如下四个层面逐层进阶:
1、学会如何应用:参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接管音讯,疾速编码;
2、模块设计:学习我的项目的模块分层(父模块、SDK 模块、外围实现模块、示例代码模块);
3、starter 设计思路:定义主动配置文件 spring.factories、设计配置属性类、在 RocketMQ client 的根底上实现优雅的封装、深刻了解 RocketMQ 源码等;
4、触类旁通:当咱们了解了 rocketmq-spring 的源码,咱们能够尝试模拟该我的项目写一个简略的 spring boot starter。

如果我的文章对你有所帮忙,还请帮忙点赞、在看、转发一下,你的反对会激励我输入更高质量的文章,非常感谢!

正文完
 0