引言

在阿里云的官方网站提供了RocketMq的商用版本,然而集体在我的项目利用上发现和SpirngBoot以及Spring Cloud(Alibaba)等开源的RocketMQ依赖尽管能够失常兼容,然而仍然呈现了注解生效、启动报错,商用和开源版本的不兼容导致局部代码要反复编写的蛋疼问题。

这样的兼容问题不是简略加个SDK依赖,切换到商用配置就能够间接应用的(因为集体起初真就是这么想),为了防止前面再遇到这种奇葩的开发测试用开源RocketMq,生产环境须要应用商用集群的RocketMq的混合配置的业务场景,集体花了小半天工夫熟读阿里云的接入文档,加上各种尝试和测试,总结出一套能够疾速应用的兼容模板计划。

如果不理解阿里云商用RocketMq,能够看最初一个小节的【阿里云商用RocketMq介绍】介绍。集体的兼容计划灵感来自于官网提供的这个DEMO我的项目:springboot/java-springboot-demo。

留神本计划是基于SpringBoot2.XSpring cloud Alibaba 的两个我的项目环境构建我的项目根底,在SpringBoot上只做了生产者的配置,而在Spring Cloud Alibaba的Nacos上进行了生产者和消费者的残缺兼容计划。

最初留神兼容集成的版本为商用RocketMq应用4.x版本,最近新出的5.X 的版本并未进行测试,不保障失常应用。

兼容关键点

  1. 在沿用SpringBoot的YML根底配置根底上实现商用和开源模式的兼容。
  2. 商用RocketMq须要应用官网提供的依赖包,依赖包能够失常兼容SpringBoot等依赖。
  3. 集成之后便于开发和扩大,并且易于其余开发人员了解。
  4. 两种模式之间相互不会产生烦扰。

SpringBoot2.X 兼容

上面先介绍SpringBoot我的项目的兼容。

SpringBoot我的项目兼容

开源版本

首先咱们察看YAML文件,对于开源版本的RocketMq设置,单机版本能够间接配置一个ip和端口即可,如果是集群则用分号隔开多个NameServer的连贯IP地址(NameServ独立部署,外部进行主动同步 )。

#音讯队列rocketmq:  # 自定义属性,作用下文将会进行解释  use-aliyun-rocketSever: false  name-server: 192.168.58.128:9876 # 192.168.244.128:9876;192.168.244.129:9876;  producer:    group: testGroup    # 本地开发不应用商业版,能够不配置    secret-key: NONE    # 本地开发不应用商业版,能够不配置    access-key: NONE    # 商用版本申请超时工夫,开源版本不应用此参数    timeoutMillis: NONE

SpringBoot中集成开源的RocketMq非常简单,只须要一个依赖就能够主动实现相干筹备:

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot</artifactId>    <version>2.2.2</version>    </dependency>

具体的应用通常为封装或者间接应用RocketMqTemplate

@Autowiredprivate RocketMQTemplate mqTemplate;public void sendRocketMqUniqueTextMessage(String topic, String tag, String queueMsg) {        if (StringUtils.isNotEmpty(topic) && StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(queueMsg)) {            String queueName = topic + ":" + tag;            //封装音讯,调配惟一id            MessageData messageData = new MessageData();            messageData.setMsgId(IdUtil.randomUUID());            messageData.setMsgContent(queueMsg);            queueMsg = JSON.toJSONString(messageData);            log.info("线程:{},向队列:{},发送音讯:{}", Thread.currentThread()                    .getName(), queueName, queueMsg);            try {                mqTemplate.syncSend(queueName, queueMsg);            } catch (Exception e) {                log.info("向队列:{},发送音讯出现异常:{}", queueName, queueMsg);                //出现异常,保留异样信息到数据库                SaMqMessageFail saMqMessageFail = new SaMqMessageFail();                // 封装失败音讯,调用生效解决Service将失败发送申请入库,或者通过其余办法重试                saMqMessageFailService.insert(saMqMessageFail);            }        }    }

开源版本的SpringBoot集成RocketMq就是如此简略。

商用版本

商用版本RocketMq咱们应用商业版TCP协定SDK(举荐),留神这里用的是4.X版本。商用版本的YAML配置和开源版本显式配置是一样的,然而须要留神参数use-aliyun-rocketSever=true,并且secretKeyaccessKey以及name-server都须要配置为阿里云提供的配置,最初设置音讯发送超时工夫timeoutMillis设置正当工夫(单位为毫秒),便于排查问题和避免线程长期占用:

#音讯队列rocketmq:  use-aliyun-rocketSever: true  # 应用阿里云提供的endpoint  name-server: http://xxxxx.aliyuncs.com:8080  producer:    group: testGroup    secret-key: xxx    access-key: xxxx    # 商用版本申请超时工夫    timeoutMillis: 15000  # 如果须要设置消费者,能够依照同样的形式集成  consumer:      group: testGroup    secret-key: xxx    access-key: xxxx    # 商用版本申请超时工夫 15秒    timeoutMillis: 15000
留神这里仅仅配置了生产者,读者能够按需设置为消费者,设置形式和生产者同理。

设置YAML之后,咱们须要在Maven中引入上面的配置:

<dependency>    <groupId>com.aliyun.openservices</groupId>    <artifactId>ons-client</artifactId>    <!--以下版本号请替换为Java SDK的最新版本号-->    <version>1.8.8.1.Final</version></dependency>                            

最初应该如何应用呢?这里就是商用RocketMq比拟蛋疼的点了,应用RocketMQTemplate是这种状况下是无奈应用商用RocketMq的,咱们须要 手动注入商用的SDK依赖ProducerBean,具体的操作如下:

  1. 构建配置类,这里仿照了官网提供的demo增减配置:
/**阿里云服务配置封装,留神和本地部署的rocketmq配置辨别*/@Data@Configuration@ConfigurationProperties(prefix = "rocketmq")public class AliyunRocketMqConfig {    /**     *鉴权须要的AccessKey ID     */    @Value("${rocketmq.use-aliyun-rocketSever:null}")    private String useAliyunRocketMqServerEnable;    /**     *鉴权须要的AccessKey ID     */    @Value("${rocketmq.producer.access-key:null}")    private String accessKey;    /**     *鉴权须要的AccessKey Secret     */    @Value("${rocketmq.producer.secret-key:null}")    private String secretKey;    /**     * 实例TCP 协定公网接入地址(理论我的项目,填写本人阿里云MQ的公网地址)     */    @Value("${rocketmq.name-server:null}")    private String nameSrvAddr;    /**     * 延时队列group     */    @Value("${rocketmq.producer.group:null}")    private String groupId;    /**     * 音讯发送超时工夫,如果服务端在配置的对应工夫内未ACK,则发送客户端认为该音讯发送失败。     */    @Value("${rocketmq.producer.timeoutMillis:null}")    private String timeoutMillis;    //获取Properties    public Properties getRocketMqProperty() {        Properties properties = new Properties();        properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);        return properties;    }}
  1. 构建商用RocketMq初始化类。这里会遇到比拟蛋疼的事件,因为咱们的依赖是商用RocketMq与开源的SpringBoot依赖共存的,尽管咱们能够商用的RocketMq,然而启动的时候会执行到此类进行初始化,返回NULL会导致SpringBoot我的项目无奈失常启动,这里无奈只能应用一个warn日志进行提醒开源版本有可能呈现Bean被笼罩问题,实际上应用下来没有特地大的影响。
写的比拟俊俏,读者有更优雅的解决形式欢送领导。笔者目前只想到了应用这种“不论”的形式保障我的项目不改任何代码的状况失常运行。
/** * 阿里云rocketMq初始化 **/@Configuration@Slf4jpublic class AliyunProducerInit {    @Autowired    private AliyunRocketMqConfig aliyunRocketMqConfig;    @Bean(initMethod = "start", destroyMethod = "shutdown")    public ProducerBean buildProducer() {        if(!Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable())){            log.warn("非商用版本为了兼容仍然须要注入此Bean,然而只读取无关nameServ和group信息");        }        ProducerBean producer = new ProducerBean();        // ProducerBean中的properties只有被笼罩的配置会应用自定义配置,其余配置会应用SDK的默认配置。        producer.setProperties(aliyunRocketMqConfig.getRocketMqProperty());        return producer;    }}

此外这里必须要吐槽一下商用RocketMq的正文竟然是全中文的!比方com.aliyun.openservices.ons.api.bean.ProducerBean的正文:

这样是坏事还是好事,大家自行领会。。。。。集体第一次看到的时候着实被震惊了。
  1. 做完下面两步之后,咱们就能够实现RocketMqTemplate调用申请,至此实现兼容。

SpringBoot我的项目兼容小结

这里简略小结一下SpringBoot的兼容过程,能够看到整个步骤仅仅是 在商用RocketMq多做了一步bean注入的操作而已,整体应用上非常简略。然而这里只介绍了生产者的集成,那么消费者如何兼容?稍安勿躁,咱们接着看Spring Cloud版本的集成案例。

Spring Cloud Alibaba我的项目兼容

目前国内应用的比拟多的是Spring Cloud Alibaba,留神这些配置都写入到Nacos当中。

开源版本

开源版本的接入形式和SpringBoot是一样的,这里简略回顾:

  1. 开源版本须要设置参数,这里设置了生产者和消费者:
#音讯队列 - 开源版本rocketmq:  use-aliyun-rocketSever: false  name-server: 192.168.0.92:9876  producer:    group: testGroup    secret-key: NONE    access-key: NONE    timeoutMillis: 15000    consumeThreadNums: 20  consumer:    group: testGroup    secret-key: NONE    access-key: NONE    timeoutMillis: 15000    consumeThreadNums: 20
  1. 增加依赖:
<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot</artifactId>    <version>2.2.2</version></dependency>
  1. 应用RocketMqTemplate能够进行音讯发送,而消费者则须要应用监听器+注解的形式,疾速注入一个消费者。大体模板如下:
@Slf4j@Component@RocketMQMessageListener(topic = "test_topic", consumerGroup = "testGroup", selectorExpression = "test_tag")public class QueueRemoteRecoListener implements RocketMQListener<MessageExt> {    @Override    public void onMessage(MessageExt message) {        // dosomething    }}

商用版本

这节是本文略微简单一点的局部,咱们依照步骤介绍接入过程:

  1. 在Nacos的配置中退出RocketMq商用所需的配置内容,和开源版本的设置相似:
#音讯队列 - 商用版本rocketmq: # 开源RocketMq和商业版RocketMq切换开关use-aliyun-rocketSever: truename-server: http://xxxx.mq.aliyuncs.com:80producer:# 目前uat借助开发测试应用  group: testGroup  secret-key: xxx  access-key: xxx  timeoutMillis: 15000  consumeThreadNums: 20consumer:  group: testGroup  secret-key: xxx  access-key: xxx  timeoutMillis: 15000  consumeThreadNums: 20
  1. 增加商用RocketMq的TCP接入形式须要的依赖包。
<dependency>    <groupId>com.aliyun.openservices</groupId>    <artifactId>ons-client</artifactId>    <!--以下版本号请替换为Java SDK的最新版本号-->    <version>1.8.8.1.Final</version></dependency>                            
  1. 新增配置类,和SpringBoot的商用版本形式也是相似的:
/** *  * rocketmq 阿里云服务配置封装,留神和本地部署的rocketmq配置辨别 **/@Data@Configuration@ConfigurationProperties(prefix = "rocketmq")public class AliyunCommercialRocketMqConfig {    /**     *鉴权须要的AccessKey ID     */    @Value("${rocketmq.use-aliyun-rocketSever:null}")    private String useAliyunRocketMqServerEnable;    /**     *鉴权须要的AccessKey ID     */    @Value("${rocketmq.consumer.access-key:null}")    private String accessKey;    /**     *     */    @Value("${rocketmq.use-aliyun-rocketsever:null}")    private String useAliyunRocketServer;    /**     *鉴权须要的AccessKey Secret     */    @Value("${rocketmq.consumer.secret-key:null}")    private String secretKey;    /**     * 实例TCP 协定公网接入地址(理论我的项目,填写本人阿里云MQ的公网地址)     */    @Value("${rocketmq.name-server:null}")    private String nameSrvAddr;    /**     * 延时队列group     */    @Value("${rocketmq.consumer.group:null}")    private String groupId;    /**     * 音讯发送超时工夫,如果服务端在配置的对应工夫内未ACK,则发送客户端认为该音讯发送失败。     */    @Value("${rocketmq.consumer.timeoutMillis:null}")    private String timeoutMillis;    /**     * 将消费者线程数固定为20个 20为默认值     */    @Value("${rocketmq.consumer.consumeThreadNums:null}")    private String consumeThreadNums;    public Properties getRocketMqProperty() {        Properties properties = new Properties();        properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);        return properties;    }}
  1. 下一步是扩大官网音讯订阅类,为啥要这样做?次要是接入试验中发现官网的音讯订阅类没有 全量参数的结构器,难以对于音讯订阅类动态参数化常量,结构一个订阅音讯应用默认形式还须要static代码块设置。所以这里对于官网的音讯订阅类进行扩大,子类代码是对于父类拷贝了一遍,没有做其余改变:
/**对于指定类进行扩大,更容易的初始化@see com.aliyun.openservices.ons.api.bean.Subscription 被扩大类 **/@Data@ToString@AllArgsConstructor@NoArgsConstructorpublic class AliyunCommercialRocketMqSubscriptionExt extends Subscription {    /**     * 主题     */    private String topic;    /**     * 条件表达式,具体参考rocketmq     */    private String expression;    /**     * TAG or SQL92     * <br>if null, equals to TAG     *     * @see com.aliyun.openservices.ons.api.ExpressionType#TAG     * @see com.aliyun.openservices.ons.api.ExpressionType#SQL92     */    private String type;    @Override    public int hashCode() {        final int prime = 31;        int result = 1;        result = prime * result + ((topic == null) ? 0 : topic.hashCode());        return result;    }    @Override    public boolean equals(Object obj) {        if (this == obj) {            return true;        }        if (obj == null) {            return false;        }        if (getClass() != obj.getClass()) {            return false;        }        AliyunCommercialRocketMqSubscriptionExt other = (AliyunCommercialRocketMqSubscriptionExt) obj;        if (topic == null) {            if (other.topic != null) {                return false;            }        } else if (!topic.equals(other.topic)) {            return false;        }        return true;    }}
  1. 接着咱们构建动态化的音讯订阅类,这个常量类会封装零碎须要应用到的音讯订阅对象,在后续注册监听器须要应用,这里先提前定义。
/** * 队列惯例配置,用于启动时候初始化配置, * 所有配置均须要搁置到此类中对外扩大应用 **/public final class AliyunCommercialRocketMqConstants {        public static final String TEST_TOPIC = "test_topic";    public static final String TEST_TAG = "test_tag";        /**     * 参考案例,仅仅作为开发验证应用     */    public static final AliyunCommercialRocketMqSubscriptionExt QUEUE_TEST = new AliyunCommercialRocketMqSubscriptionExt(RocketMqKey.TEST_TOPIC, RocketMqKey.TEST_TAG, null);}

为了方便管理,咱们能够把这些要害Key在独自放到一个类:

public class RocketMqKey {    public static final String TEST_TOPIC = "test_topic";    public static final String TEST_TAG = "test_tag";   }/** * 队列惯例配置,用于启动时候初始化配置, * 所有配置均须要搁置到此类中对外扩大应用 **/public final class AliyunCommercialRocketMqConstants {    /**     * 参考案例,仅仅作为开发验证应用     */    public static final AliyunCommercialRocketMqSubscriptionExt QUEUE_TEST = new AliyunCommercialRocketMqSubscriptionExt(RocketMqKey.TEST_TOPIC, RocketMqKey.TEST_TAG, null);}
  1. 做好一系列筹备之后,咱们开始商用版本生产者兼容,商用版本的发送者能够像是上面这样封装官网提供的demo,也能够应用注入ProducerBean的形式集成:
/** * 商用aliyun Rocketmq 工具类封装 **/@Component@Slf4jpublic class AliyunCommercialRocketMqSendUtils {    private static final long KEEP_ALIVE_TIME = 60L;    private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE,            KEEP_ALIVE_TIME, TimeUnit.SECONDS,            new SynchronousQueue<>());    @Autowired    private Producer producer;    @Autowired    private SaMqMessageFailService saMqMessageFailService;    @Autowired    private AliyunRocketMqConfig aliyunRocketMqConfig;    /**     * 异步推送,须要调用方手动指定callback          */    public void singleAsyncSend(String topic, String tag, String msgBody, SendCallback sendCallback) {        singleAsyncSend(topic, tag, msgBody, null, sendCallback);    }    /**     * 异步推送,须要调用方手动指定callback     * 如果须要应用callback告诉,能够应用上面的代码进行解决     <pre>         producer.sendAsync(msg, new SendCallback() {        @Override        public void onSuccess(final SendResult sendResult) {            assert sendResult != null;            System.out.println(sendResult);        }        @Override        public void onException(final OnExceptionContext context) {            ONSClientException exception = context.getException();            //                    //出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。            log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {}, msgBody => {}, key => {}(如果设置key,能够应用key查找音讯),商用版须要在RocketMq云服务重试,失败起因为: {}"            , topic, tag, msgBody, key, exception.getMessage());            buildMqErrorInfoAndInsertDb(msgBody, exception);        }    });     </pre>     * @param topic 主题     * @return void     * @description     * @param: tag  标签     * @param: msgBody 推送音讯     * @param: key 不便音讯查找的key     * @param: sendCallback 手动回调     */    public void singleAsyncSend(String topic, String tag, String msgBody, String key, SendCallback sendCallback) {        if(Boolean.FALSE.equals(Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable()))){            throw new UnsupportedOperationException("请设置 UseAliyunRocketServer=true 切换到商用rocketMq模式");        }        if (StringUtils.isAnyBlank(topic, tag, msgBody)) {            throw new IllegalArgumentException("topic, tag and msgBody must be not blank");        }        //对于应用异步接口,倡议设置独自的回调解决线程池,领有更灵便的配置和监控能力。        //如下结构线程的形式申请队列为无界仅用作示例,有OOM的危险。        //更正当的结构形式请参考阿里巴巴Java开发手册: https://github.com/alibaba/p3c        producer.setCallbackExecutor(THREAD_POOL_EXECUTOR);        //循环发送音讯        Message msg = new Message( //                // Message所属的Topic                topic,                // Message Tag 可了解为Gmail中的标签,对音讯进行再归类,不便Consumer指定过滤条件在MQ服务器过滤                tag,                // Message Body 能够是任何二进制模式的数据, MQ不做任何干涉                // 须要Producer与Consumer协商好统一的序列化和反序列化形式                msgBody.getBytes());        // 设置代表音讯的业务要害属性,请尽可能全局惟一        // 以不便您在无奈失常收到音讯状况下,可通过MQ 控制台查问音讯并补发        // 留神:不设置也不会影响音讯失常收发        if (CharSequenceUtil.isNotBlank(key)) {            msg.setKey(key);        }        // 发送音讯,只有不抛异样就是胜利        try {            producer.sendAsync(msg, sendCallback);        } catch (ONSClientException exception) {            //出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。            log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {}, msgBody => {}, key => {}(如果设置key,能够应用key查找音讯),商用版须要在RocketMq云服务重试,失败起因为: {}"                    , topic, tag, msgBody, key, exception.getMessage());            buildMqErrorInfoAndInsertDb(msgBody, exception);        }    }    /**     * 阻塞独自推送队列,应用零碎默认的Key生成规定     *     * @param topic 主题     * @return void     * @description 阻塞独自推送队列     * @param: tag 标签     * @param: msgBody msgBody内容     */    public void singleSyncSend(String topic, String tag, String msgBody) {        singleSyncSend(topic, tag, msgBody, null);    }    /**     * 阻塞独自推送队列,应用零碎默认的Key生成规定     *     * @param topic 主题     * @return void     * @description 阻塞独自推送队列     * @param: tag 标签     * @param: msgBody msgBody内容     */    public void singleSyncSend(String topic, String tag, String msgBody, String key) {        if(!Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable())){            throw new UnsupportedOperationException("请设置 UseAliyunRocketServer=true 切换到商用rocketMq模式");        }        if (StringUtils.isAnyBlank(topic, tag, msgBody)) {            throw new IllegalArgumentException("topic, tag and msgBody must be not blank");        }        Message msg = new Message( //                // Message所属的Topic                topic,                // Message Tag 可了解为Gmail中的标签,对音讯进行再归类,不便Consumer指定过滤条件在MQ服务器过滤                tag,                // Message Body 能够是任何二进制模式的数据, MQ不做任何干涉                // 须要Producer与Consumer协商好统一的序列化和反序列化形式                msgBody.getBytes());        // 设置代表音讯的业务要害属性,请尽可能全局惟一        // 以不便您在无奈失常收到音讯状况下,可通过MQ 控制台查问音讯并补发        // 留神:不设置也不会影响音讯失常收发        if (CharSequenceUtil.isNotBlank(key)) {            msg.setKey(key);        }        // 发送音讯,只有不抛异样就是胜利        try {            SendResult sendResult = producer.send(msg);            log.info("【RocketMq-Commercial】推送胜利,singleSyncSend => {}", JSON.toJSONString(sendResult));        } catch (ONSClientException e) {            log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {}, msgBody => {}, key => {}(如果设置key,能够应用key查找音讯),商用版须要在RocketMq云服务重试,失败起因为: {}"                    , topic, tag, msgBody, key, e.getMessage());            buildMqErrorInfoAndInsertDb(msgBody, e);        }    }    /**     * 构建mq错误信息,并且插入到异样推送表     *     * @param msgBody     * @return void     * @description 构建mq错误信息,并且插入到异样推送表     * @param: e     */    private void buildMqErrorInfoAndInsertDb(String msgBody, ONSClientException e) {        //出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。        //出现异常,保留异样信息到数据库        SaMqMessageFail saMqMessageFail = new SaMqMessageFail();        saMqMessageFail.setMqId(IdUtil.randomUUID());        saMqMessageFail.setQueueName("RocketMq-Commercial");        saMqMessageFail.setQueueMessage(msgBody);        // 2 代表失败        saMqMessageFail.setStatus(2);        // 1 代表重试次数        saMqMessageFail.setProcCount(1);        saMqMessageFail.setFailReason(e.toString());        saMqMessageFail.setCreateDate(new Date());        saMqMessageFailService.insert(saMqMessageFail);    }}
注入ProductBean的形式能够从下面提到的SpringBoot集成形式解决。
  1. 当初咱们解决之前遗留的问题,如果是消费者,应该如何更优雅的接管音讯?首先咱们须要明确,商用RocketMq注入是须要依附手动构建监听器,然而咱们下面提到SpringBoot提供了注解+ @Component的形式实现队列监听的消费者。此外为了防止和开源版本抵触,咱们应用之前参数配置自定义的开关,在遇到开源版本的时候咱们返回null(这里返回null Spring会扫描获取SpringBoot的RocketMq依赖,不会呈现报错和无奈启动的问题,和后面的状况略有不同)避免主动注入商用RocketMq的监听器。

上面是商用版本注入ConsumerBean的代码,订阅商用RocketMq的生产监听器:

/** *  阿里云商用Rocketmq 队列接管 *  商用rocketmqConsumer的所有申请会进入一个入口,须要散发到不同的具体业务解决。 **/@Component@Slf4jpublic class AliyunCommercialRocketMqQueueConsumer {    @Autowired    private AliyunCommercialRocketMqConfig aliyunCommercialRocketMqConfig;    // 自定义的监听器    @Autowired    private AliyunCommerciaRocketMqTestListener aliyunCommerciaRocketMqTestListener;    @Bean(initMethod = "start", destroyMethod = "shutdown")    public ConsumerBean buildConsumer() {        // 如果开关为false, 则不能注入此对象,否则商用的API会顶替掉 框架诸如的Bean出现异常        if(!Boolean.valueOf(aliyunCommercialRocketMqConfig.getUseAliyunRocketServer())){            log.warn("非商用版本不注入商用版本监听器");            return null;        }        ConsumerBean consumerBean = new ConsumerBean();        //配置文件        Properties properties = aliyunCommercialRocketMqConfig.getRocketMqProperty();        properties.setProperty(PropertyKeyConst.GROUP_ID, aliyunCommercialRocketMqConfig.getGroupId());        //将消费者线程数固定为20个 20为默认值        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, aliyunCommercialRocketMqConfig.getConsumeThreadNums());        consumerBean.setProperties(properties);        //订阅关系        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(2);        // 应用了之前定义的测试用的监听器        subscriptionTable.put(AliyunCommercialRocketMqConstants.QUEUE_TEST, aliyunCommerciaRocketMqTestListener);        //订阅多个topic如下面设置        consumerBean.setSubscriptionTable(subscriptionTable);        return consumerBean;    }}

AliyunCommerciaRocketMqTestListener 自定义监听器为了兼容商用和开源版本做了上面的扭转,为了不便了解这里退出了相干注解:

/** * 监听器开发模板 **/@Slf4j@Component// 开源版本能够兼容此注解@RocketMQMessageListener(topic = RocketMqKey.TEST_TOPIC, consumerGroup = RocketMqKey.TEST_GROUP, selectorExpression = RocketMqKey.TEST_TAG)// RocketMQListener<MessageExt> 属于SpringBoot RocketMq的依赖// MessageListener 属于商用RocketMq的SDKpublic class QueueRemoteReconListener implements RocketMQListener<MessageExt>, MessageListener {    // 开源版本的应用    @Override    public void onMessage(MessageExt message) {        // 开源版本的SpringBoot形式,和上文介绍雷同    }    /**      * @description       * @param message 音讯类. 一条音讯由主题, 音讯体以及可选的音讯标签, 自定义从属键值对形成.     * 留神: 咱们对每条音讯的自定义键值对的长度没有限度, 但所有的自定义键值对, 零碎键值对序列化后, 所占空间不能超过32767字节.     * @param: context 每次生产音讯的上下文,供未来扩大应用     * @return com.aliyun.openservices.ons.api.Action      */     @Override    public Action consume(Message message, ConsumeContext context) {        // 商用版本要求实现这个办法,Action如下,默认失败重试16次,返回ReconsumeLater会触发重试机制,所以会存在反复生产的问题        //public enum Action {            /**             * 生产胜利,持续生产下一条音讯             */            //CommitMessage,            /**             * 生产失败,告知服务器稍后再投递这条音讯,持续生产其余音讯             */            //ReconsumeLater,        //}    }}

以上就是消费者的兼容解决,既能够满足开源版本的注解开发要求,也能够不收缩类的状况下沿用扩大。

Spring Cloud Alibaba我的项目兼容小结

从集体的角度来看,在生产者的兼容上比拟容易实现,依照官网的demo构建带商用RocketMq的ProducerBean即可,而消费者的集成则要简单一些,这里为了思考不反复的设置或者写反复代码,把要害的配置动态化,同时为了官网写的毛糙的音讯订阅类做了继承笼罩的操作兼容。此外为了避免开源版本的消费者Bean被商用的监听器笼罩导致生效,应用了简略的开关来进行Bean的注入管制,写的比拟毛糙,读者有更好的写法欢送探讨。

总体来看来说商用版本的RocketMq集成起来稍微麻烦,然而还是能够承受。此外也能够看到SDK的兼容性实际上是比拟简陋的,很多中央的很像然而又齐全是本人从新设计的,感觉就是一个新的团队给老团队做进去的货色做兼容的感觉,不过不就是桥接嘛,我也会,所以最初集体凑合消费者兼容就用来多接口实现的兼容写法了。

最终的集成成果是开源版本敞开注入bean的开关,配置为了关照Properties对于不须要的配置进行占位解决,而商用版本则关上开关,通过自定义注入监听器的形式顶替掉SpringBoot的依赖。

最初的成果是只须要批改RocketMq的配置,启动之后主动连贯到相干的RocketMq。

开源版本:

#音讯队列 - 开源版本rocketmq:  use-aliyun-rocketSever: false  name-server: 192.168.0.92:9876  producer:    group: testGroup    secret-key: NONE    access-key: NONE    timeoutMillis: 15000    consumeThreadNums: 20  consumer:    group: testGroup    secret-key: NONE    access-key: NONE    timeoutMillis: 15000    consumeThreadNums: 20

商用版本

#音讯队列 - 商用版本rocketmq: # 开源RocketMq和商业版RocketMq切换开关  use-aliyun-rocketSever: true  name-server: http://xxxx.mq.aliyuncs.com:80  producer:# 目前uat借助开发测试应用    group: testGroup      secret-key: xxx      access-key: xxx      timeoutMillis: 15000      consumeThreadNums: 20    consumer:      group: testGroup      secret-key: xxx      access-key: xxx      timeoutMillis: 15000      consumeThreadNums: 20

阿里云商用RocketMq简略介绍(4.X版本)

官网介绍:https://help.aliyun.com/product/29530.html

无关RocketMq自身的介绍局部能够浏览:[[【RocketMq】RocketMq 扫盲]],这里挑了商用版本集体认为须要关注的几个点进行介绍。留神这里应用的商用RocketMq版本为4.X的版本。

计费模式

商用RocketMq次要分为上面几个局部:

  • 主系列:标准版、专业版、铂金版
  • 子系列:单节点版、集群版

集体最初是白嫖了公司的商用集群版RocketMq进行试验,计费的形式分为包年包月按量付费,前者实用于流量比拟大并且固定的状况,应用套餐比拟划算,而后者按需付费则实用于RocketMq应用较少(或者不稳固)或者调用量较少的状况,集体学习也比拟举荐按量付费模式,比包年包月划算很多。

如果呈现欠费,在实例停服7天后两种付费模式均会革除所有的RocketMq数据,而日常欠费则会被主动停用,嘛,和电话卡欠费差不多的情理,这里就不过多拓展了。如果要退订商用RocketMq的服务。按量付费能够随时退订,包年包月也会依据天数进行退订。

PS:看完这一整套计费模式下来,发现还是挺良心的。让我没想到的是包年包月竟然能够计算未应用的天数返还,这一点挺不错的,用起来不必放心被套进去。

接入形式

商用RocketMq的接入步骤如下:

官网在疾速入门中提供了创立资源的解释视频:https://help.aliyun.com/document_detail/441914.html。有两个点须要留神,第一个点是RAM用户必须要进行账户受权能力失常应用,第二个点是对外拜访形式设置,音讯队列RocketMQ版反对VPC拜访和公网拜访。资源创立实现之后就能够应用官网提供的SDK收发音讯,在应用之前须要留神上面这些前提:

  • 步骤二:创立资源
  • 装置IDEA

    您能够应用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate为例。

  • 装置1.8或以上版本JDK
  • 装置2.5或以上版本Maven

之后便是在我的项目中引入JAVA依赖和复制粘贴模板代码验证。

音讯队列模型

商用RocketMq的根本应用逻辑如下:

依照音讯类型,商用RocketMq提供了上面的音讯类型:

  • 一般音讯
  • 程序音讯
  • 定时/延时音讯
  • 事务音讯

SDK接入参考

SDK接入的主页链接如下:

https://help.aliyun.com/document_detail/69111.html

次要介绍了上面几个点,理论参考第二个举荐的接入形式即可。

  • SDK参考概述
  • 商业版TCP协定SDK(举荐)
  • 商业版HTTP协定SDK(多语言举荐)
  • 社区版TCP协定SDK(仅供开源用户上云应用)
  • 订阅音讯常见问题

对于JAVA应用程序倡议应用TCP协定SDK,集成的形式也比较简单,为了不便了解,也能够通过以下链接获取相干Demo,外面都有具体的正文解释:

  • spring/java-spring-demo
  • springboot/java-springboot-demo

如果是HTTP协定的SDK,则应用上面的依赖包:

<dependency>    <groupId>com.aliyun.mq</groupId>    <artifactId>mq-http-sdk</artifactId>    <!--以下版本号请替换为Java SDK的最新版本号-->    <version>1.0.3.2</version>     <classifier>jar-with-dependencies</classifier></dependency>

如果是TCP协定的接入形式,则应用上面的依赖包:

<dependency>    <groupId>com.aliyun.openservices</groupId>    <artifactId>ons-client</artifactId>    <!--以下版本号请替换为Java SDK的最新版本号-->    <version>1.8.8.1.Final</version></dependency>                            

接入细节阐明

集体在试验的时候发现HTTP协定的版本集成SDK更像是给了一套API工具包,益处是能够不须要依赖Spring框架等独自应用,然而最大的毛病也是这里,很难用于框架当中,如果要实现主动接管音讯并且解决,须要依附手动实现定时工作拉取音讯实现主动生产,这一点也比拟蛋疼。

这里列举 HTTP的SDK的发送和承受代码,首先是生产者的模板代码,这些代码和开源的RocketMq应用相似:

// 省略大量代码。。。// 循环发送4条音讯。            for (int i = 0; i < 4; i++) {                TopicMessage pubMsg;        // 一般音讯。                pubMsg = new TopicMessage(                // 音讯内容。                "hello mq!".getBytes(),                // 音讯标签。                "A"                );                // 设置音讯的自定义属性。                pubMsg.getProperties().put("a", String.valueOf(i));                // 设置音讯的Key。                pubMsg.setMessageKey("MessageKey");                           // 同步发送音讯,只有不抛异样就是胜利。            TopicMessage pubResultMsg = producer.publishMessage(pubMsg);            // 同步发送音讯,只有不抛异样就是胜利。            System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());            }        } catch (Throwable e) {            // 音讯发送失败,须要进行重试解决,可从新发送这条音讯或长久化这条数据进行弥补解决。            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);            e.printStackTrace();        }// 省略大量代码。。。

而在接管方略微麻烦一些,官网的案例是应用 死循环一直查看Broker是否有积压音讯,如果有则通过被动拉取音讯的模式去拉取音讯。

实现多线程期待的成果能够写入到Thread继承类的Run办法中。
 // 在以后线程循环生产音讯,倡议多开个几个线程并发生产音讯。        do {                        // 省略大量代码// 解决业务逻辑。            for (Message message : messages) {                System.out.println("Receive message: " + message);            }            // 音讯重试工夫达到前若不确认音讯生产胜利,则音讯会被反复生产。            // 音讯句柄有工夫戳,同一条音讯每次生产的工夫戳都不一样。            {                List<String> handles = new ArrayList<String>();                for (Message message : messages) {                    handles.add(message.getReceiptHandle());                }                try {                    consumer.ackMessage(handles);                } catch (Throwable e) {                    // 某些音讯的句柄可能超时,会导致音讯生产状态确认不胜利。                    if (e instanceof AckMessageException) {                        AckMessageException errors = (AckMessageException) e;                        System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");                        if (errors.getErrorMessages() != null) {                            for (String errorHandle :errors.getErrorMessages().keySet()) {                                System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());                            }                        }                        continue;                    }                    e.printStackTrace();                }            }       } while (true);     // 省略大量代码            

写在最初

算是一次集体兼容的笔记。还是要吐槽商用的RocketMq集成真的挺无语的,不过阿里的货色懂的都懂。