共计 5996 个字符,预计需要花费 15 分钟才能阅读完成。
作者 | RocketMQ 官微
起源 | 阿里巴巴云原生公众号
2019 年 1 月,孵化 6 个月的 RocketMQ-Spring 作为 Apache RocketMQ 的子项目正式毕业,公布了第一个 Release 版本 2.0.1。该我的项目是把 RocketMQ 的客户端应用 Spring Boot 的形式进行了封装,能够让用户通过简略的 annotation 和规范的 Spring Messaging API 编写代码来进行音讯的发送和生产。过后 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)的故事。
时隔两年,RocketMQ-Spring 正式公布 2.2.0。在这期间,RocketMQ-Spring 迭代了数个版本,以 RocketMQ-Spring 为根底实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 Spring 的官网,Spring 布道师 baeldung 向国外同学介绍如何应用 RocketMQ-Spring,越来越多国内外的同学开始应用 RocketMQ-Spring 收发音讯,RocketMQ-Spring 仓库的 star 数也在短短两年工夫内超过了 Spring-Kafka 和 Spring-AMQP(注:两者均由 Spring 社区保护),成为 Apache RocketMQ 最受欢迎的生态我的项目之一。
RocketMQ-Spring 的受欢迎一方面得益于反对丰盛业务场景的 RocketMQ 与微服务生态 Spring 的完满符合,另一方面也与 RocketMQ-Spring 自身严格遵循 Spring Messaging API 标准,反对丰盛的音讯类型分不开。
遵循 Spring Messaging API 标准
Spring Messaging 提供了一套形象的 API,对音讯发送端和音讯接收端的模式进行规定,不同的消息中间件提供商能够在这个模式下提供本人的 Spring 实现:在音讯发送端须要实现的是一个 XXXTemplate 模式的 Java Bean,联合 Spring Boot 的自动化配置选项提供多个不同的发送音讯办法;在音讯的生产端是一个 XXXMessageListener 接口(实现形式通常会应用一个注解来申明一个音讯驱动的 POJO),提供回调办法来监听和生产音讯,这个接口同样能够应用 Spring Boot 的自动化选项和一些定制化的属性。
1. 发送端
RocketMQ-Spring 在遵循 Spring Messaging API 标准的根底上联合 RocketMQ 本身的性能特点提供了相应的 API。在音讯的发送端,RocketMQ-Spring 通过实现 RocketMQTemplate 实现音讯的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来反对 Spring Messaging API 规范的音讯转换和发送办法,这些办法最终会代理给 doSend 办法,doSend 办法会最终调用 syncSend,由 DefaultMQProducer 实现。
除 Spring Messaging API 标准中的办法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些办法,来反对更加丰盛的音讯类型。值得注意的是,相比于原生客户端须要本人去构建 RocketMQ Message(比方将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 能够间接将对象、字符串或者 byte 数组作为参数发送进来(对象序列化操作由 RocketMQ-Spring 内置实现),在生产端约定好对应的 Schema 即可失常收发。
RocketMQTemplate Send API:SendResult syncSend(String destination, Object payload)
SendResult syncSend(String destination, Message<?> message)void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
……
2. 生产端
在生产端,须要实现一个蕴含 @RocketMQMessageListener 注解的类(须要实现 RocketMQListener 接口,并实现 onMessage 办法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被搁置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会依据生产的形式(并发或程序),将 RocketMQListener 封装到具体的 RocketMQ 外部的并发或者程序接口实现。在容器中创立 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 音讯,实现约定 Schema 对象的转换,回调到 Listener 的 onMessage 办法。
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {System.out.printf("------- StringConsumer received: %s \n", message);
}
}
除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQ-Spring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 办法即可被动 Pull 音讯。
配置文件 resource/application.properties:rocketmq.name-server=localhost:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
Pull Consumer 代码:while(!isStop) {List<String> messages = rocketMQTemplate.receive(String.class);
System.out.println(messages);
}
丰盛的音讯类型
RocketMQ Spring 音讯类型反对方面与 RocketMQ 原生客户端齐全对齐,包含同步 / 异步 /one-way、程序、提早、批量、事务以及 Request-Reply 音讯。在这里,次要介绍较为非凡的事务音讯和 request-reply 音讯。
1. 事务音讯
RocketMQ 的事务音讯不同于 Spring Messaging 中的事务音讯,仍然采纳 RocketMQ 原生事务音讯的计划。如下所示,发送事务音讯时须要实现一个蕴含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 办法,从而来实现执行本地事务以及查看本地事务执行后果。
// Build a SpringMessage for sending in transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter transaction name ("test")
// must be same with the @RocketMQTransactionListener's member field'transName'rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return bollback, commit or unknown
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return bollback, commit or unknown
return RocketMQLocalTransactionState.COMMIT;
}
}
在 2.1.0 版本中,RocketMQ-Spring 重构了事务音讯的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发应用多个事务音讯的问题。当用户须要在单过程应用多个事务音讯时,能够应用 ExtRocketMQTemplate 来实现(个别状况下,举荐一个过程应用一个 RocketMQTemplate,ExtRocketMQTemplate 能够应用在同过程中须要应用多个 Producer / LitePullConsumer 的场景,能够为 ExtRocketMQTemplate 指定与规范模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。
2. Request-Reply 音讯
在 2.1.0 版本中,RocketMQ-Spring 开始反对 Request-Reply 音讯。Request-Reply 音讯指的是上游服务投递音讯后进入期待被告诉的状态,直到生产端返回后果并返回给发送端。在 RocketMQ-Spring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 办法进行发送,如下所示,次要有同步和异步两种形式。异步形式中通过实现 RocketMQLocalRequestCallback 进行回调。
// 同步发送 request 并且期待 String 类型的返回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
// 异步发送 request 并且期待 User 类型的返回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {@Override public void onSuccess(User message) {……}
@Override public void onException(Throwable e) {……}
});
在生产端,依然须要实现一个蕴含 @RocketMQMessageListener 注解的类,但须要实现的接口是 RocketMQReplyListener<T, R> 接口(一般音讯为 RocketMQListener<T> 接口),其中 T 示意接管值的类型,R 示意返回值的类型,接口须要实现带返回值的 onMessage 办法,返回值的内容返回给对应的 Producer。
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
……
return "reply string";
}
}
RocketMQ-Spring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的形式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便能够在 Spring Boot 中集成所有 RocketMQ 客户端的所有性能,通过简略的注解应用即可实现音讯的收发。在 RocketMQ-Spring Github Wiki 中有更加具体的用法和常见问题解答。
据统计,从 RocketMQ-Spring 公布第一个正式版本以来,RocketMQ-Spring 实现 16 个 bug 修复,37 个 imporvement,其中包含事务音讯重构,音讯过滤、音讯序列化、多实例 RocketMQTemplate 优化等重要优化,欢送更多的小伙伴能参加到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在持续 … 钉钉搜寻群号:21982288,即可进群和泛滥开发者交换!