关于消息中间件:使用-rocketmqspringbootstarter-来配置发送和消费-RocketMQ-消息

8次阅读

共计 6532 个字符,预计需要花费 17 分钟才能阅读完成。

简介: 本文将 rocktmq-spring-boot 的设计实现做一个简略的介绍,读者能够通过本文理解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,而后通过一个简略的示例来一步一步的解说如何应用这个 spring-boot-starter 工具包来配置,发送和生产 RocketMQ 音讯。

作者 | 辽天
起源 | 阿里巴巴云原生公众号

导读 :本文将 rocktmq-spring-boot 的设计实现做一个简略的介绍,读者能够通过本文理解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,而后通过一个简略的示例来一步一步的解说如何应用这个 spring-boot-starter 工具包来配置,发送和生产 RocketMQ 音讯。

在 Spring 生态中玩转 RocketMQ 系列文章:

  • 《如何在 Spring 生态中玩转 RocketMQ?》
  • 《罗美琪和春波特的故事 …》
  • 《RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?》

本文配套可交互教程已登录阿里云知口头手实验室,PC 端登录 start.aliyun.com 在浏览器中立刻体验。

通过本文,您将理解到:

  • Spring 的音讯框架介绍
  • rocketmq-spring-boot 具体实现
  • 应用示例

前言

上世纪 90 年代末,随着 Java EE(Enterprise Edition) 的呈现,特地是 Enterprise Java Beans 的应用须要简单的描述符配置和死板简单的代码实现,减少了宽广开发者的学习曲线和开发成本,由此基于简略的 XML 配置和一般 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection), 管制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的有余。

随着 Spring 的继续演进,基于注解(Annotation)的配置逐步取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式公布,它基于“约定大于配置”(Convention over configuration)这一理念来疾速地开发、测试、运行和部署 Spring 利用,并能通过简略地与各种启动器(如 spring-boot-web-starter)联合,让利用间接以命令行的形式运行,不需再部署到独立容器中。这种简便间接疾速构建和开发利用的过程,能够应用约定的配置并且简化部署,受到越来越多的开发者的欢送。

Apache RocketMQ 是业界出名的分布式音讯和流解决中间件,简略地了解,它由 Broker 服务器和客户端两局部组成:

其中客户端一个是音讯发布者客户端(Producer),它负责向 Broker 服务器发送音讯;另外一个是音讯的消费者客户端(Consumer),多个消费者能够组成一个生产组,来订阅和拉取生产 Broker 服务器上存储的音讯。

为了利用 Spring Boot 的疾速开发和让用户可能更灵便地应用 RocketMQ 音讯客户端,Apache RocketMQ 社区推出了 spring-boot-starter 实现。随着分布式事务音讯性能在 RocketMQ 4.3.0 版本的公布,近期降级了相干的 spring-boot 代码,通过注解形式反对分布式事务的回查和事务音讯的发送。

本文将对以后的设计实现做一个简略的介绍,读者能够通过本文理解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,而后通过一个简略的示例来一步一步的解说如何应用这个 spring-boot-starter 工具包来配置,发送和生产 RocketMQ 音讯。

Spring 中的音讯框架

顺便在这里讨论一下在 Spring 中对于音讯的两个次要的框架,即 Spring Messaging 和 Spring Cloud Stream。它们都可能与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,音讯框架的目标是实现轻量级的音讯驱动的微服务,能够无效地简化开发人员对消息中间件的应用复杂度,让零碎开发人员能够有更多的精力关注于外围业务逻辑的解决。

1. Spring Messaging

Spring Messaging 是 Spring Framework 4 中增加的模块,是 Spring 与音讯系统集成的一个扩展性的反对。它实现了从基于 JmsTemplate 的简略的应用 JMS 接口到异步接管音讯的一整套残缺的基础架构,Spring AMQP 提供了该协定所要求的相似的功能集。在与 Spring Boot 的集成后,它领有了主动配置能力,可能在测试和运行时与相应的消息传递零碎进行集成。

单纯对于客户端而言,Spring Messaging 提供了一套形象的 API 或者说是约定的规范,对音讯发送端和音讯接收端的模式进行规定,不同的消息中间件提供商能够在这个模式下提供本人的 Spring 实现:在音讯发送端须要实现的是一个 XXXTemplate 模式的 Java Bean,联合 Spring Boot 的自动化配置选项提供多个不同的发送音讯办法;在音讯的生产端是一个 XXXMessageListener 接口(实现形式通常会应用一个注解来申明一个音讯驱动的 POJO),提供回调办法来监听和生产音讯,这个接口同样能够应用 Spring Boot 的自动化选项和一些定制化的属性。

如果有趣味深刻的理解 Spring Messaging 及针对不同的音讯产品的应用,举荐浏览这个文件。参考 Spring Messaging 的既有实现,RocketMQ 的 spring-boot-starter 中遵循了相干的设计模式并联合 RocketMQ 本身的性能特点提供了相应的 API(如程序、异步和事务半音讯等 )。

2. Spring Cloud Stream

Spring Cloud Stream 联合了 Spring Integration 的注解和性能,它的利用模型如下:


该图片引自 spring cloud stream

Spring Cloud Stream 框架中提供一个独立的利用内核,它通过输出(@Input)和输入(@Output)通道与内部世界进行通信,音讯源端(Source)通过输出通道发送音讯,生产指标端(Sink)通过监听输入通道来获取生产的音讯。这些通道通过专用的 Binder 实现与内部代理连贯。开发人员的代码只须要针对利用内核提供的固定的接口和注解形式进行编程,而不须要关怀运行时具体的 Binder 绑定的消息中间件。在运行时,Spring Cloud Stream 可能主动探测并应用在 classpath 下找到的 Binder。

这样开发人员能够轻松地在雷同的代码中应用不同类型的中间件:仅仅须要在构建时蕴含进不同的 Binder。在更加简单的应用场景中,也能够在利用中打包多个 Binder 并让它本人抉择 Binder,甚至在运行时为不同的通道应用不同的 Binder。

Binder 形象使得 Spring Cloud Stream 利用能够灵便的连贯到中间件,加之 Spring Cloud Stream 应用利用了 Spring Boot 的灵便配置配置能力,这样的配置能够通过内部配置的属性和 Spring Boot 反对的任何模式来提供(包含利用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员能够在运行时动静抉择通道连贯 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。

Binder SPI 的形式来让消息中间件产品应用可扩大的 API 来编写相应的 Binder,并集成到 Spring Cloud Steam 环境,目前 RocketMQ 还没有提供相干的 Binder,咱们打算在下一步将欠缺这一性能,也心愿社区里有这方面教训的同学踊跃尝试,奉献 PR 或倡议。

spring-boot-starter 的实现

在开始的时候咱们曾经晓得,spring boot starter 结构的启动器对于使用者是十分不便的,使用者只有在 pom.xml 引入 starter 的依赖定义,相应的编译,运行和部署性能就全副主动引入。因而罕用的开源组件都会为 Spring 的用户提供一个 spring-boot-starter 封装给开发者,让开发者十分不便集成和应用,这里咱们具体的介绍一下 RocketMQ(客户端)的 starter 实现过程。

1. spring-boot-starter 的实现步骤

对于一个 spring-boot-starter 实现须要蕴含如下几个局部:

1)在 pom.xml 的定义

  • 定义最终要生成的 starter 组件信息
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
  • 定义依赖包

它分为两个局部:Spring 本身的依赖包和 RocketMQ 的依赖包。

2)配置文件类

定义利用属性配置文件类 RocketMQProperties,这个 Bean 定义一组默认的属性值。用户在应用最终的 starter 时,能够依据这个类定义的属性来批改取值,当然不是间接批改这个类的配置,而是 spring-boot 利用中对应的配置文件:src/main/resources/application.properties。

3)定义主动加载类

定义 src/resources/META-INF/spring.factories 文件中的主动加载类,其目标是让 spring boot 更具文中中所指定的自动化配置类来主动初始化相干的 Bean、Component 或 Service,它的内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration

在 RocketMQAutoConfiguration 类的具体实现中,定义凋谢给用户间接应用的 Bean 对象包含:

  • RocketMQProperties 加载利用属性配置文件的解决类;
  • RocketMQTemplate 发送端用户发送音讯的发送模板类;
  • ListenerContainerConfiguration 容器 Bean 负责发现和注册生产端生产实现接口类,这个类要求:由 @RocketMQMessageListener 注解标注;实现 RocketMQListener 泛化接口。

4)最初具体地进行 RpcketMQ 相干的封装

在发送端(producer)和生产端(consumer)客户端别离进行封装,在以后的实现版本提供了对 Spring Messaging 接口的兼容形式。

2. 音讯发送端实现

1)一般发送端

发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相干代码的调用关系图:

为了与 Spring Messaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来反对相干的音讯转换和发送办法,这些办法最终会代理给 doSend() 办法、doSend() 以及 RocoketMQ 所特有的一些办法如异步,单向和程序等办法间接增加到 RoketMQTempalte 中,这些办法间接代理调用到 RocketMQ 的 Producer API 来进行音讯的发送。

2)事务音讯发送端

对于事务音讯的解决,在音讯发送端进行了局部的扩大,参考下面的调用关系类图。

RocketMQTemplate 里退出了一个发送事务音讯的办法 sendMessageInTransaction(),并且最终这个办法会代理到 RocketMQ 的 TransactionProducer 进行调用,在这个 Producer 上会注册其关联的 TransactionListener 实现类,以便在发送音讯后可能对 TransactionListener 里的办法实现进行调用。

3. 音讯生产端实现

在生产端 Spring-Boot 利用启动后,会扫描所有蕴含 @RocketMQMessageListener 注解的类(这些类须要集成 RocketMQListener 接口,并实现 onMessage() 办法),这个 Listener 会一对一的被搁置到。

DefaultRocketMQListenerContainer 容器对象中,容器对象会依据生产的形式(并发或程序),将 RocketMQListener 封装到具体的 RocketMQ 外部的并发或者程序接口实现。在容器中创立 RocketMQ Consumer 对象,启动并监听定制的 Topic 音讯,如果有生产音讯,则回调到 Listener 的 onMessage() 办法。

应用示例

下面的一章介绍了 RocketMQ 在 spring-boot-starter 形式的实现,这里通过一个最简略的音讯发送和生产的例子来介绍如何使这个 rocketmq-spring-boot-starter。

1. RocketMQ 服务端的筹备

1)启动 NameServer 和 Broker

要验证 RocketMQ 的 Spring-Boot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。能够参考 RocketMQ 主站的疾速开始来进行操作。确保启动 NameServer 和 Broker 曾经正确启动。

2)创立实例中所须要的 Topics

在执行启动命令的目录下执行上面的命令行操作:

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

2. 编译 rocketmq-spring-boot-starter

目前的 spring-boot-starter 依赖还没有提交的 Maven 的核心库,用户应用前须要自行下载 git 源码,而后执行 mvn clean install 装置到本地仓库。

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-spring-boot-starter
mvn clean install

3. 编写客户端代码

用户如果应用它,须要在音讯的公布和生产客户端的 maven 配置文件 pom.xml 中增加如下的依赖:

属性 spring-boot-starter-rocketmq-version 的取值为:1.0.0-SNAPSHOT,这与上一步骤中执行装置到本地仓库的版本统一。

1)音讯发送端的代码

发送端的配置文件 application.properties:

发送端的 Java 代码:

2)音讯生产端代码

生产端的配置文件 application.properties:

生产端的 Java 代码:

这里只是简略的介绍了应用 spring-boot 来编写最根本的音讯发送和接管的代码,如果须要理解更多的调用形式,如: 异步发送,对象音讯体,指定 tag 标签以及指定事务音讯,请参看 github 的阐明文档和具体的代码。咱们后续还会对这些高级性能进行陆续的介绍。

作者简介

辽天 ,阿里巴巴技术专家,Apache RocketMQ 内核控,领有多年分布式系统研发教训,对 Microservice、Messaging 和 Storage 等畛域有深刻理解,目前专一 RocketMQ 内核优化以及 Messaging 生态建设。

在 PC 端登录 start.aliyun.com 知口头手实验室,沉迷式体验在线交互教程。

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0