共计 22062 个字符,预计需要花费 56 分钟才能阅读完成。
引言
在阿里云的官方网站提供了 RocketMq 的商用版本,然而集体在我的项目利用上发现和 SpirngBoot 以及 Spring Cloud(Alibaba)等开源的 RocketMQ 依赖尽管能够失常兼容,然而仍然呈现了注解生效、启动报错,商用和开源版本的不兼容导致局部代码要反复编写的蛋疼问题。
这样的兼容问题不是简略加个 SDK 依赖,切换到商用配置就能够间接应用的(因为集体起初真就是这么想),为了防止前面再遇到这种 奇葩 的开发测试用开源 RocketMq,生产环境须要应用商用集群的 RocketMq 的混合配置的业务场景,集体花了小半天工夫熟读阿里云的接入文档,加上各种尝试和测试,总结出一套能够疾速应用的兼容模板计划。
如果不理解阿里云商用 RocketMq,能够看最初一个小节的【阿里云商用 RocketMq 介绍】介绍。集体的兼容计划灵感来自于官网提供的这个 DEMO 我的项目:springboot/java-springboot-demo。
留神本计划是基于 SpringBoot2.X 和 Spring cloud Alibaba 的两个我的项目环境构建我的项目根底,在 SpringBoot 上只做了生产者的配置,而在 Spring Cloud Alibaba 的 Nacos 上进行了生产者和消费者的残缺兼容计划。
最初留神兼容集成的版本为商用 RocketMq 应用 4.x 版本,最近新出的 5.X 的版本并 未进行测试,不保障失常应用。
兼容关键点
- 在沿用 SpringBoot 的 YML 根底配置根底上实现商用和开源模式的兼容。
- 商用 RocketMq 须要应用官网提供的依赖包,依赖包能够失常兼容 SpringBoot 等依赖。
- 集成之后便于开发和扩大,并且易于其余开发人员了解。
- 两种模式之间相互不会产生烦扰。
SpringBoot2.X 兼容
上面先介绍 SpringBoot 我的项目 的兼容。
SpringBoot 我的项目兼容
开源版本
首先咱们察看 YAML 文件,对于 开源版本的 RocketMq设置,单机版本能够间接配置一个 ip 和端口即可,如果是集群则用分号隔开多个 NameServer 的连贯 IP 地址(NameServ 独立部署,外部进行主动同步)。
# 音讯队列
rocketmq:
# 自定义属性,作用下文将会进行解释
use-aliyun-rocketSever: false
name-server: 192.168.58.128:9876 # 192.168.244.128:9876;192.168.244.129:9876;
producer:
group: testGroup
# 本地开发不应用商业版,能够不配置
secret-key: NONE
# 本地开发不应用商业版,能够不配置
access-key: NONE
# 商用版本申请超时工夫,开源版本不应用此参数
timeoutMillis: NONE
SpringBoot 中集成开源的 RocketMq 非常简单,只须要一个依赖就能够主动实现相干筹备:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
具体的应用通常为封装或者间接应用RocketMqTemplate
:
@Autowired
private RocketMQTemplate mqTemplate;
public void sendRocketMqUniqueTextMessage(String topic, String tag, String queueMsg) {if (StringUtils.isNotEmpty(topic) && StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(queueMsg)) {
String queueName = topic + ":" + tag;
// 封装音讯,调配惟一 id
MessageData messageData = new MessageData();
messageData.setMsgId(IdUtil.randomUUID());
messageData.setMsgContent(queueMsg);
queueMsg = JSON.toJSONString(messageData);
log.info("线程:{},向队列:{},发送音讯:{}", Thread.currentThread()
.getName(), queueName, queueMsg);
try {mqTemplate.syncSend(queueName, queueMsg);
} catch (Exception e) {log.info("向队列:{},发送音讯出现异常:{}", queueName, queueMsg);
// 出现异常,保留异样信息到数据库
SaMqMessageFail saMqMessageFail = new SaMqMessageFail();
// 封装失败音讯,调用生效解决 Service 将失败发送申请入库,或者通过其余办法重试
saMqMessageFailService.insert(saMqMessageFail);
}
}
}
开源版本的 SpringBoot 集成 RocketMq 就是如此简略。
商用版本
商用版本 RocketMq 咱们应用商业版 TCP 协定 SDK(举荐),留神这里用的是 4.X 版本。商用版本的 YAML 配置和开源版本 显式配置 是一样的,然而须要留神参数 use-aliyun-rocketSever=true
,并且secretKey
和accessKey
以及 name-server
都须要配置为阿里云提供的配置,最初设置音讯发送超时工夫 timeoutMillis
设置正当工夫(单位为毫秒),便于排查问题和避免线程长期占用:
# 音讯队列
rocketmq:
use-aliyun-rocketSever: true
# 应用阿里云提供的 endpoint
name-server: http://xxxxx.aliyuncs.com:8080
producer:
group: testGroup
secret-key: xxx
access-key: xxxx
# 商用版本申请超时工夫
timeoutMillis: 15000
# 如果须要设置消费者,能够依照同样的形式集成
consumer:
group: testGroup
secret-key: xxx
access-key: xxxx
# 商用版本申请超时工夫 15 秒
timeoutMillis: 15000
留神这里仅仅配置了生产者,读者能够按需设置为消费者,设置形式和生产者同理。
设置 YAML 之后,咱们须要在 Maven 中引入上面的配置:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
<version>1.8.8.1.Final</version>
</dependency>
最初应该如何应用呢?这里就是商用 RocketMq 比拟蛋疼的点了,应用 RocketMQTemplate
是这种状况下是无奈应用商用 RocketMq 的,咱们须要 手动注入商用的 SDK 依赖 ProducerBean,具体的操作如下:
- 构建 配置类,这里仿照了官网提供的 demo 增减配置:
/**
阿里云服务配置封装,留神和本地部署的 rocketmq 配置辨别
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class AliyunRocketMqConfig {
/**
* 鉴权须要的 AccessKey ID
*/
@Value("${rocketmq.use-aliyun-rocketSever:null}")
private String useAliyunRocketMqServerEnable;
/**
* 鉴权须要的 AccessKey ID
*/
@Value("${rocketmq.producer.access-key:null}")
private String accessKey;
/**
* 鉴权须要的 AccessKey Secret
*/
@Value("${rocketmq.producer.secret-key:null}")
private String secretKey;
/**
* 实例 TCP 协定公网接入地址(理论我的项目,填写本人阿里云 MQ 的公网地址)*/
@Value("${rocketmq.name-server:null}")
private String nameSrvAddr;
/**
* 延时队列 group
*/
@Value("${rocketmq.producer.group:null}")
private String groupId;
/**
* 音讯发送超时工夫,如果服务端在配置的对应工夫内未 ACK,则发送客户端认为该音讯发送失败。*/
@Value("${rocketmq.producer.timeoutMillis:null}")
private String timeoutMillis;
// 获取 Properties
public Properties getRocketMqProperty() {Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);
return properties;
}
}
- 构建商用 RocketMq 初始化类。这里会遇到 比拟蛋疼的事件 ,因为咱们的依赖是商用 RocketMq 与开源的 SpringBoot 依赖共存的,尽管咱们能够商用的 RocketMq,然而启动的时候会执行到此类进行初始化, 返回 NULL 会导致 SpringBoot 我的项目无奈失常启动,这里无奈只能应用一个 warn 日志进行提醒开源版本有可能呈现 Bean 被笼罩问题,实际上应用下来没有特地大的影响。
写的比拟俊俏,读者有更优雅的解决形式欢送领导。笔者目前只想到了应用这种“不论”的形式保障我的项目不改任何代码的状况失常运行。
/**
* 阿里云 rocketMq 初始化
**/
@Configuration
@Slf4j
public class AliyunProducerInit {
@Autowired
private AliyunRocketMqConfig aliyunRocketMqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {if(!Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable())){log.warn("非商用版本为了兼容仍然须要注入此 Bean,然而只读取无关 nameServ 和 group 信息");
}
ProducerBean producer = new ProducerBean();
// ProducerBean 中的 properties 只有被笼罩的配置会应用自定义配置,其余配置会应用 SDK 的默认配置。producer.setProperties(aliyunRocketMqConfig.getRocketMqProperty());
return producer;
}
}
此外这里必须要吐槽一下商用 RocketMq 的正文竟然是 全中文 的!比方 com.aliyun.openservices.ons.api.bean.ProducerBean
的正文:
这样是坏事还是好事,大家自行领会。。。。。集体第一次看到的时候着实被震惊了。
- 做完下面两步之后,咱们就能够实现
RocketMqTemplate
调用申请,至此实现兼容。
SpringBoot 我的项目兼容小结
这里简略小结一下 SpringBoot 的兼容过程,能够看到整个步骤仅仅是 在商用 RocketMq 多做了一步 bean 注入的操作而已,整体应用上非常简略。然而这里只介绍了生产者的集成,那么消费者如何兼容?稍安勿躁,咱们接着看 Spring Cloud 版本的集成案例。
Spring Cloud Alibaba 我的项目兼容
目前国内应用的比拟多的是Spring Cloud Alibaba,留神这些配置都写入到 Nacos 当中。
开源版本
开源版本的接入形式和 SpringBoot 是一样的,这里简略回顾:
- 开源版本须要设置参数,这里设置了生产者和消费者:
# 音讯队列 - 开源版本
rocketmq:
use-aliyun-rocketSever: false
name-server: 192.168.0.92:9876
producer:
group: testGroup
secret-key: NONE
access-key: NONE
timeoutMillis: 15000
consumeThreadNums: 20
consumer:
group: testGroup
secret-key: NONE
access-key: NONE
timeoutMillis: 15000
consumeThreadNums: 20
- 增加依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
- 应用
RocketMqTemplate
能够进行音讯发送,而消费者则须要应用监听器 + 注解的形式,疾速注入一个消费者。大体模板如下:
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "testGroup", selectorExpression = "test_tag")
public class QueueRemoteRecoListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {// dosomething}
}
商用版本
这节是本文略微简单一点的局部,咱们依照步骤介绍接入过程:
- 在 Nacos 的配置中退出 RocketMq 商用所需的配置内容,和开源版本的设置相似:
# 音讯队列 - 商用版本
rocketmq:
# 开源 RocketMq 和商业版 RocketMq 切换开关
use-aliyun-rocketSever: true
name-server: http://xxxx.mq.aliyuncs.com:80
producer:
# 目前 uat 借助开发测试应用
group: testGroup
secret-key: xxx
access-key: xxx
timeoutMillis: 15000
consumeThreadNums: 20
consumer:
group: testGroup
secret-key: xxx
access-key: xxx
timeoutMillis: 15000
consumeThreadNums: 20
- 增加商用 RocketMq 的 TCP 接入形式须要的依赖包。
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
<version>1.8.8.1.Final</version>
</dependency>
- 新增 配置类,和 SpringBoot 的商用版本形式也是相似的:
/**
*
* rocketmq 阿里云服务配置封装,留神和本地部署的 rocketmq 配置辨别
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class AliyunCommercialRocketMqConfig {
/**
* 鉴权须要的 AccessKey ID
*/
@Value("${rocketmq.use-aliyun-rocketSever:null}")
private String useAliyunRocketMqServerEnable;
/**
* 鉴权须要的 AccessKey ID
*/
@Value("${rocketmq.consumer.access-key:null}")
private String accessKey;
/**
*
*/
@Value("${rocketmq.use-aliyun-rocketsever:null}")
private String useAliyunRocketServer;
/**
* 鉴权须要的 AccessKey Secret
*/
@Value("${rocketmq.consumer.secret-key:null}")
private String secretKey;
/**
* 实例 TCP 协定公网接入地址(理论我的项目,填写本人阿里云 MQ 的公网地址)*/
@Value("${rocketmq.name-server:null}")
private String nameSrvAddr;
/**
* 延时队列 group
*/
@Value("${rocketmq.consumer.group:null}")
private String groupId;
/**
* 音讯发送超时工夫,如果服务端在配置的对应工夫内未 ACK,则发送客户端认为该音讯发送失败。*/
@Value("${rocketmq.consumer.timeoutMillis:null}")
private String timeoutMillis;
/**
* 将消费者线程数固定为 20 个 20 为默认值
*/
@Value("${rocketmq.consumer.consumeThreadNums:null}")
private String consumeThreadNums;
public Properties getRocketMqProperty() {Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);
return properties;
}
}
- 下一步是扩大官网 音讯订阅类 ,为啥要这样做?次要是接入试验中发现官网的音讯订阅类没有 全量参数 的结构器,难以对于音讯订阅类 动态参数化 常量,结构一个订阅音讯应用默认形式还须要 static 代码块设置。所以这里对于官网的音讯订阅类进行扩大,子类代码是对于父类拷贝了一遍,没有做其余改变:
/**
对于指定类进行扩大,更容易的初始化
@see com.aliyun.openservices.ons.api.bean.Subscription 被扩大类
**/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class AliyunCommercialRocketMqSubscriptionExt extends Subscription {
/**
* 主题
*/
private String topic;
/**
* 条件表达式,具体参考 rocketmq
*/
private String expression;
/**
* TAG or SQL92
* <br>if null, equals to TAG
*
* @see com.aliyun.openservices.ons.api.ExpressionType#TAG
* @see com.aliyun.openservices.ons.api.ExpressionType#SQL92
*/
private String type;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {if (this == obj) {return true;}
if (obj == null) {return false;}
if (getClass() != obj.getClass()) {return false;}
AliyunCommercialRocketMqSubscriptionExt other = (AliyunCommercialRocketMqSubscriptionExt) obj;
if (topic == null) {if (other.topic != null) {return false;}
} else if (!topic.equals(other.topic)) {return false;}
return true;
}
}
- 接着咱们构建动态化的音讯订阅类,这个常量类会封装零碎须要应用到的音讯订阅对象,在后续注册监听器须要应用,这里先提前定义。
/**
* 队列惯例配置,用于启动时候初始化配置,* 所有配置均须要搁置到此类中对外扩大应用
**/
public final class AliyunCommercialRocketMqConstants {
public static final String TEST_TOPIC = "test_topic";
public static final String TEST_TAG = "test_tag";
/**
* 参考案例,仅仅作为开发验证应用
*/
public static final AliyunCommercialRocketMqSubscriptionExt QUEUE_TEST = new AliyunCommercialRocketMqSubscriptionExt(RocketMqKey.TEST_TOPIC, RocketMqKey.TEST_TAG, null);
}
为了方便管理,咱们能够把这些要害 Key 在独自放到一个类:
public class RocketMqKey {
public static final String TEST_TOPIC = "test_topic";
public static final String TEST_TAG = "test_tag";
}
/**
* 队列惯例配置,用于启动时候初始化配置,* 所有配置均须要搁置到此类中对外扩大应用
**/
public final class AliyunCommercialRocketMqConstants {
/**
* 参考案例,仅仅作为开发验证应用
*/
public static final AliyunCommercialRocketMqSubscriptionExt QUEUE_TEST = new AliyunCommercialRocketMqSubscriptionExt(RocketMqKey.TEST_TOPIC, RocketMqKey.TEST_TAG, null);
}
- 做好一系列筹备之后,咱们开始 商用版本生产者兼容 ,商用版本的发送者能够像是上面这样封装官网提供的 demo,也能够应用 注入 ProducerBean的形式集成:
/**
* 商用 aliyun Rocketmq 工具类封装
**/
@Component
@Slf4j
public class AliyunCommercialRocketMqSendUtils {
private static final long KEEP_ALIVE_TIME = 60L;
private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new SynchronousQueue<>());
@Autowired
private Producer producer;
@Autowired
private SaMqMessageFailService saMqMessageFailService;
@Autowired
private AliyunRocketMqConfig aliyunRocketMqConfig;
/**
* 异步推送,须要调用方手动指定 callback
*/
public void singleAsyncSend(String topic, String tag, String msgBody, SendCallback sendCallback) {singleAsyncSend(topic, tag, msgBody, null, sendCallback);
}
/**
* 异步推送,须要调用方手动指定 callback
* 如果须要应用 callback 告诉,能够应用上面的代码进行解决
<pre>
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
assert sendResult != null;
System.out.println(sendResult);
}
@Override
public void onException(final OnExceptionContext context) {ONSClientException exception = context.getException();
// // 出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {},msgBody => {},key => {}(如果设置 key,能够应用 key 查找音讯), 商用版须要在 RocketMq 云服务重试,失败起因为: {}"
, topic, tag, msgBody, key, exception.getMessage());
buildMqErrorInfoAndInsertDb(msgBody, exception);
}
});
</pre>
* @param topic 主题
* @return void
* @description
* @param: tag 标签
* @param: msgBody 推送音讯
* @param: key 不便音讯查找的 key
* @param: sendCallback 手动回调
*/
public void singleAsyncSend(String topic, String tag, String msgBody, String key, SendCallback sendCallback) {if(Boolean.FALSE.equals(Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable()))){throw new UnsupportedOperationException("请设置 UseAliyunRocketServer=true 切换到商用 rocketMq 模式");
}
if (StringUtils.isAnyBlank(topic, tag, msgBody)) {throw new IllegalArgumentException("topic, tag and msgBody must be not blank");
}
// 对于应用异步接口,倡议设置独自的回调解决线程池,领有更灵便的配置和监控能力。// 如下结构线程的形式申请队列为无界仅用作示例,有 OOM 的危险。// 更正当的结构形式请参考阿里巴巴 Java 开发手册:https://github.com/alibaba/p3c
producer.setCallbackExecutor(THREAD_POOL_EXECUTOR);
// 循环发送音讯
Message msg = new Message( //
// Message 所属的 Topic
topic,
// Message Tag 可了解为 Gmail 中的标签,对音讯进行再归类,不便 Consumer 指定过滤条件在 MQ 服务器过滤
tag,
// Message Body 能够是任何二进制模式的数据,MQ 不做任何干涉
// 须要 Producer 与 Consumer 协商好统一的序列化和反序列化形式
msgBody.getBytes());
// 设置代表音讯的业务要害属性,请尽可能全局惟一
// 以不便您在无奈失常收到音讯状况下,可通过 MQ 控制台查问音讯并补发
// 留神:不设置也不会影响音讯失常收发
if (CharSequenceUtil.isNotBlank(key)) {msg.setKey(key);
}
// 发送音讯,只有不抛异样就是胜利
try {producer.sendAsync(msg, sendCallback);
} catch (ONSClientException exception) {
// 出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {},msgBody => {},key => {}(如果设置 key,能够应用 key 查找音讯), 商用版须要在 RocketMq 云服务重试,失败起因为: {}"
, topic, tag, msgBody, key, exception.getMessage());
buildMqErrorInfoAndInsertDb(msgBody, exception);
}
}
/**
* 阻塞独自推送队列,应用零碎默认的 Key 生成规定
*
* @param topic 主题
* @return void
* @description 阻塞独自推送队列
* @param: tag 标签
* @param: msgBody msgBody 内容
*/
public void singleSyncSend(String topic, String tag, String msgBody) {singleSyncSend(topic, tag, msgBody, null);
}
/**
* 阻塞独自推送队列,应用零碎默认的 Key 生成规定
*
* @param topic 主题
* @return void
* @description 阻塞独自推送队列
* @param: tag 标签
* @param: msgBody msgBody 内容
*/
public void singleSyncSend(String topic, String tag, String msgBody, String key) {if(!Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable())){throw new UnsupportedOperationException("请设置 UseAliyunRocketServer=true 切换到商用 rocketMq 模式");
}
if (StringUtils.isAnyBlank(topic, tag, msgBody)) {throw new IllegalArgumentException("topic, tag and msgBody must be not blank");
}
Message msg = new Message( //
// Message 所属的 Topic
topic,
// Message Tag 可了解为 Gmail 中的标签,对音讯进行再归类,不便 Consumer 指定过滤条件在 MQ 服务器过滤
tag,
// Message Body 能够是任何二进制模式的数据,MQ 不做任何干涉
// 须要 Producer 与 Consumer 协商好统一的序列化和反序列化形式
msgBody.getBytes());
// 设置代表音讯的业务要害属性,请尽可能全局惟一
// 以不便您在无奈失常收到音讯状况下,可通过 MQ 控制台查问音讯并补发
// 留神:不设置也不会影响音讯失常收发
if (CharSequenceUtil.isNotBlank(key)) {msg.setKey(key);
}
// 发送音讯,只有不抛异样就是胜利
try {SendResult sendResult = producer.send(msg);
log.info("【RocketMq-Commercial】推送胜利,singleSyncSend => {}", JSON.toJSONString(sendResult));
} catch (ONSClientException e) {log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {},msgBody => {},key => {}(如果设置 key,能够应用 key 查找音讯), 商用版须要在 RocketMq 云服务重试,失败起因为: {}"
, topic, tag, msgBody, key, e.getMessage());
buildMqErrorInfoAndInsertDb(msgBody, e);
}
}
/**
* 构建 mq 错误信息,并且插入到异样推送表
*
* @param msgBody
* @return void
* @description 构建 mq 错误信息,并且插入到异样推送表
* @param: e
*/
private void buildMqErrorInfoAndInsertDb(String msgBody, ONSClientException e) {
// 出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。// 出现异常,保留异样信息到数据库
SaMqMessageFail saMqMessageFail = new SaMqMessageFail();
saMqMessageFail.setMqId(IdUtil.randomUUID());
saMqMessageFail.setQueueName("RocketMq-Commercial");
saMqMessageFail.setQueueMessage(msgBody);
// 2 代表失败
saMqMessageFail.setStatus(2);
// 1 代表重试次数
saMqMessageFail.setProcCount(1);
saMqMessageFail.setFailReason(e.toString());
saMqMessageFail.setCreateDate(new Date());
saMqMessageFailService.insert(saMqMessageFail);
}
}
注入 ProductBean 的形式能够从下面提到的 SpringBoot 集成形式解决。
- 当初咱们解决之前遗留的问题,如果是消费者,应该如何更优雅的接管音讯?首先咱们须要明确,商用 RocketMq 注入是须要依附手动构建 监听器 ,然而咱们下面提到 SpringBoot 提供了
注解 + @Component
的形式实现队列监听的消费者。此外为了防止和开源版本抵触,咱们应用之前参数配置自定义的开关,在遇到开源版本的时候咱们 返回 null(这里返回 null Spring 会扫描获取 SpringBoot 的 RocketMq 依赖,不会呈现报错和无奈启动的问题,和后面的状况略有不同)避免主动注入商用 RocketMq 的监听器。
上面是商用版本注入 ConsumerBean 的代码,订阅商用 RocketMq 的生产监听器:
/**
* 阿里云商用 Rocketmq 队列接管
* 商用 rocketmqConsumer 的所有申请会进入一个入口,须要散发到不同的具体业务解决。**/
@Component
@Slf4j
public class AliyunCommercialRocketMqQueueConsumer {
@Autowired
private AliyunCommercialRocketMqConfig aliyunCommercialRocketMqConfig;
// 自定义的监听器
@Autowired
private AliyunCommerciaRocketMqTestListener aliyunCommerciaRocketMqTestListener;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
// 如果开关为 false, 则不能注入此对象,否则商用的 API 会顶替掉 框架诸如的 Bean 出现异常
if(!Boolean.valueOf(aliyunCommercialRocketMqConfig.getUseAliyunRocketServer())){log.warn("非商用版本不注入商用版本监听器");
return null;
}
ConsumerBean consumerBean = new ConsumerBean();
// 配置文件
Properties properties = aliyunCommercialRocketMqConfig.getRocketMqProperty();
properties.setProperty(PropertyKeyConst.GROUP_ID, aliyunCommercialRocketMqConfig.getGroupId());
// 将消费者线程数固定为 20 个 20 为默认值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, aliyunCommercialRocketMqConfig.getConsumeThreadNums());
consumerBean.setProperties(properties);
// 订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(2);
// 应用了之前定义的测试用的监听器
subscriptionTable.put(AliyunCommercialRocketMqConstants.QUEUE_TEST, aliyunCommerciaRocketMqTestListener);
// 订阅多个 topic 如下面设置
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}
AliyunCommerciaRocketMqTestListener
自定义监听器为了兼容商用和开源版本做了上面的扭转,为了不便了解这里退出了相干注解:
/**
* 监听器开发模板
**/
@Slf4j
@Component
// 开源版本能够兼容此注解
@RocketMQMessageListener(topic = RocketMqKey.TEST_TOPIC, consumerGroup = RocketMqKey.TEST_GROUP, selectorExpression = RocketMqKey.TEST_TAG)
// RocketMQListener<MessageExt> 属于 SpringBoot RocketMq 的依赖
// MessageListener 属于商用 RocketMq 的 SDK
public class QueueRemoteReconListener implements RocketMQListener<MessageExt>, MessageListener {
// 开源版本的应用
@Override
public void onMessage(MessageExt message) {// 开源版本的 SpringBoot 形式,和上文介绍雷同}
/**
* @description
* @param message 音讯类. 一条音讯由主题, 音讯体以及可选的音讯标签, 自定义从属键值对形成.
* 留神: 咱们对每条音讯的自定义键值对的长度没有限度, 但所有的自定义键值对, 零碎键值对序列化后, 所占空间不能超过 32767 字节.
* @param: context 每次生产音讯的上下文,供未来扩大应用
* @return com.aliyun.openservices.ons.api.Action
*/
@Override
public Action consume(Message message, ConsumeContext context) {
// 商用版本要求实现这个办法,Action 如下,默认失败重试 16 次,返回 ReconsumeLater 会触发重试机制,所以会存在反复生产的问题
//public enum Action {
/**
* 生产胜利,持续生产下一条音讯
*/
//CommitMessage,
/**
* 生产失败,告知服务器稍后再投递这条音讯,持续生产其余音讯
*/
//ReconsumeLater,
//}
}
}
以上就是消费者的兼容解决,既能够满足开源版本的注解开发要求,也能够不收缩类的状况下沿用扩大。
Spring Cloud Alibaba 我的项目兼容小结
从集体的角度来看,在生产者的兼容上比拟容易实现,依照官网的 demo 构建带商用 RocketMq 的 ProducerBean 即可,而消费者的集成则要简单一些,这里为了思考不反复的设置或者写反复代码,把要害的配置动态化,同时为了官网写的毛糙的音讯订阅类做了继承笼罩的操作兼容。此外为了避免开源版本的消费者 Bean 被商用的监听器笼罩导致生效,应用了简略的开关来进行 Bean 的注入管制,写的比拟毛糙,读者有更好的写法欢送探讨。
总体来看来说商用版本的 RocketMq 集成起来稍微麻烦,然而还是能够承受。此外也能够看到 SDK 的兼容性实际上是比拟简陋的,很多中央的很像然而又齐全是本人从新设计的,感觉就是一个新的团队给老团队做进去的货色做兼容的感觉,不过不就是桥接嘛,我也会,所以最初集体凑合消费者兼容就用来多接口实现的兼容写法了。
最终的集成成果是开源版本敞开注入 bean 的开关,配置为了关照 Properties 对于不须要的配置进行占位解决,而商用版本则关上开关,通过自定义注入监听器的形式顶替掉 SpringBoot 的依赖。
最初的成果是只须要批改 RocketMq 的配置,启动之后主动连贯到相干的 RocketMq。
开源版本:
# 音讯队列 - 开源版本
rocketmq:
use-aliyun-rocketSever: false
name-server: 192.168.0.92:9876
producer:
group: testGroup
secret-key: NONE
access-key: NONE
timeoutMillis: 15000
consumeThreadNums: 20
consumer:
group: testGroup
secret-key: NONE
access-key: NONE
timeoutMillis: 15000
consumeThreadNums: 20
商用版本
# 音讯队列 - 商用版本
rocketmq:
# 开源 RocketMq 和商业版 RocketMq 切换开关
use-aliyun-rocketSever: true
name-server: http://xxxx.mq.aliyuncs.com:80
producer:
# 目前 uat 借助开发测试应用
group: testGroup
secret-key: xxx
access-key: xxx
timeoutMillis: 15000
consumeThreadNums: 20
consumer:
group: testGroup
secret-key: xxx
access-key: xxx
timeoutMillis: 15000
consumeThreadNums: 20
阿里云商用 RocketMq 简略介绍(4.X 版本)
官网介绍:https://help.aliyun.com/product/29530.html
无关 RocketMq 自身的介绍局部能够浏览:[[【RocketMq】RocketMq 扫盲]],这里挑了商用版本集体认为须要关注的几个点进行介绍。留神这里应用的商用 RocketMq 版本为 4.X 的版本。
计费模式
商用 RocketMq 次要分为上面几个局部:
- 主系列:标准版、专业版、铂金版
- 子系列:单节点版、集群版
集体最初是白嫖了公司的商用集群版 RocketMq 进行试验,计费的形式分为 包年包月 和按量付费,前者实用于流量比拟大并且固定的状况,应用套餐比拟划算,而后者按需付费则实用于 RocketMq 应用较少(或者不稳固)或者调用量较少的状况,集体学习也比拟举荐按量付费模式,比包年包月划算很多。
如果呈现欠费,在实例停服 7 天后两种付费模式均会革除所有的 RocketMq 数据,而日常欠费则会被主动停用,嘛,和电话卡欠费差不多的情理,这里就不过多拓展了。如果要退订商用 RocketMq 的服务。按量付费能够随时退订,包年包月也会依据天数进行退订。
PS:看完这一整套计费模式下来,发现还是挺良心的。让我没想到的是包年包月竟然能够计算未应用的天数返还,这一点挺不错的,用起来不必放心被套进去。
接入形式
商用 RocketMq 的接入步骤如下:
官网在疾速入门中提供了创立资源的解释视频:https://help.aliyun.com/document_detail/441914.html。有两个点须要留神,第一个点是 RAM 用户必须要进行账户受权能力失常应用,第二个点是对外拜访形式设置,音讯队列 RocketMQ 版反对 VPC 拜访和公网拜访。资源创立实现之后就能够应用官网提供的 SDK 收发音讯,在应用之前须要留神上面这些前提:
- 步骤二:创立资源
-
装置 IDEA
您能够应用 IntelliJ IDEA 或者 Eclipse,本文以 IntelliJ IDEA Ultimate 为例。
- 装置 1.8 或以上版本 JDK
- 装置 2.5 或以上版本 Maven
之后便是在我的项目中引入 JAVA 依赖和复制粘贴模板代码验证。
音讯队列模型
商用 RocketMq 的根本应用逻辑如下:
依照音讯类型,商用 RocketMq 提供了上面的音讯类型:
- 一般音讯
- 程序音讯
- 定时 / 延时音讯
- 事务音讯
SDK 接入参考
SDK 接入的主页链接如下:
https://help.aliyun.com/document_detail/69111.html
次要介绍了上面几个点,理论参考第二个举荐的接入形式即可。
- SDK 参考概述
- 商业版 TCP 协定 SDK(举荐)
- 商业版 HTTP 协定 SDK(多语言举荐)
- 社区版 TCP 协定 SDK(仅供开源用户上云应用)
- 订阅音讯常见问题
对于 JAVA 应用程序倡议应用 TCP 协定 SDK,集成的形式也比较简单,为了不便了解,也能够通过以下链接获取相干 Demo,外面都有具体的正文解释:
- spring/java-spring-demo
- springboot/java-springboot-demo
如果是 HTTP 协定的 SDK,则应用上面的依赖包:
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
<version>1.0.3.2</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
如果是 TCP 协定的接入形式,则应用上面的依赖包:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
<version>1.8.8.1.Final</version>
</dependency>
接入细节阐明
集体在试验的时候发现 HTTP 协定的版本集成 SDK 更像是给了一套 API 工具包,益处是能够不须要依赖 Spring 框架等独自应用,然而最大的毛病也是这里,很难用于框架当中,如果要实现主动接管音讯并且解决,须要依附 手动实现定时工作 拉取音讯实现主动生产,这一点也比拟蛋疼。
这里列举 HTTP 的 SDK的发送和承受代码,首先是生产者的模板代码,这些代码和开源的 RocketMq 应用相似:
// 省略大量代码。。。// 循环发送 4 条音讯。for (int i = 0; i < 4; i++) {
TopicMessage pubMsg; // 一般音讯。pubMsg = new TopicMessage(
// 音讯内容。"hello mq!".getBytes(),
// 音讯标签。"A"
);
// 设置音讯的自定义属性。pubMsg.getProperties().put("a", String.valueOf(i));
// 设置音讯的 Key。pubMsg.setMessageKey("MessageKey");
// 同步发送音讯,只有不抛异样就是胜利。TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步发送音讯,只有不抛异样就是胜利。System.out.println(new Date() + "Send mq message success. Topic is:" + topic + ", msgId is:" + pubResultMsg.getMessageId()
+ ", bodyMD5 is:" + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 音讯发送失败,须要进行重试解决,可从新发送这条音讯或长久化这条数据进行弥补解决。System.out.println(new Date() + "Send mq message failed. Topic is:" + topic);
e.printStackTrace();}
// 省略大量代码。。。
而在接管方略微麻烦一些,官网的案例是应用 死循环 一直查看 Broker 是否有积压音讯,如果有则通过 被动拉取音讯 的模式去拉取音讯。
实现多线程期待的成果能够写入到 Thread 继承类的 Run 办法中。
// 在以后线程循环生产音讯,倡议多开个几个线程并发生产音讯。do {
// 省略大量代码
// 解决业务逻辑。for (Message message : messages) {System.out.println("Receive message:" + message);
}
// 音讯重试工夫达到前若不确认音讯生产胜利,则音讯会被反复生产。// 音讯句柄有工夫戳,同一条音讯每次生产的工夫戳都不一样。{List<String> handles = new ArrayList<String>();
for (Message message : messages) {handles.add(message.getReceiptHandle());
}
try {consumer.ackMessage(handles);
} catch (Throwable e) {
// 某些音讯的句柄可能超时,会导致音讯生产状态确认不胜利。if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();}
}
} while (true);
// 省略大量代码
写在最初
算是一次集体兼容的笔记。还是要吐槽商用的 RocketMq 集成真的挺无语的,不过阿里的货色懂的都懂。