关于rocketmq:RocketMq商用RocketMq和开源RocketMq的兼容问题解决方案

14次阅读

共计 22062 个字符,预计需要花费 56 分钟才能阅读完成。

引言

在阿里云的官方网站提供了 RocketMq 的商用版本,然而集体在我的项目利用上发现和 SpirngBoot 以及 Spring Cloud(Alibaba)等开源的 RocketMQ 依赖尽管能够失常兼容,然而仍然呈现了注解生效、启动报错,商用和开源版本的不兼容导致局部代码要反复编写的蛋疼问题。

这样的兼容问题不是简略加个 SDK 依赖,切换到商用配置就能够间接应用的(因为集体起初真就是这么想),为了防止前面再遇到这种 奇葩 的开发测试用开源 RocketMq,生产环境须要应用商用集群的 RocketMq 的混合配置的业务场景,集体花了小半天工夫熟读阿里云的接入文档,加上各种尝试和测试,总结出一套能够疾速应用的兼容模板计划。

如果不理解阿里云商用 RocketMq,能够看最初一个小节的【阿里云商用 RocketMq 介绍】介绍。集体的兼容计划灵感来自于官网提供的这个 DEMO 我的项目:springboot/java-springboot-demo。

留神本计划是基于 SpringBoot2.XSpring cloud Alibaba 的两个我的项目环境构建我的项目根底,在 SpringBoot 上只做了生产者的配置,而在 Spring Cloud Alibaba 的 Nacos 上进行了生产者和消费者的残缺兼容计划。

最初留神兼容集成的版本为商用 RocketMq 应用 4.x 版本,最近新出的 5.X 的版本并 未进行测试,不保障失常应用。

兼容关键点

  1. 在沿用 SpringBoot 的 YML 根底配置根底上实现商用和开源模式的兼容。
  2. 商用 RocketMq 须要应用官网提供的依赖包,依赖包能够失常兼容 SpringBoot 等依赖。
  3. 集成之后便于开发和扩大,并且易于其余开发人员了解。
  4. 两种模式之间相互不会产生烦扰。

SpringBoot2.X 兼容

上面先介绍 SpringBoot 我的项目 的兼容。

SpringBoot 我的项目兼容

开源版本

首先咱们察看 YAML 文件,对于 开源版本的 RocketMq设置,单机版本能够间接配置一个 ip 和端口即可,如果是集群则用分号隔开多个 NameServer 的连贯 IP 地址(NameServ 独立部署,外部进行主动同步)。

# 音讯队列
rocketmq:
  # 自定义属性,作用下文将会进行解释
  use-aliyun-rocketSever: false
  name-server: 192.168.58.128:9876 # 192.168.244.128:9876;192.168.244.129:9876;
  producer:
    group: testGroup
    # 本地开发不应用商业版,能够不配置
    secret-key: NONE
    # 本地开发不应用商业版,能够不配置
    access-key: NONE
    # 商用版本申请超时工夫,开源版本不应用此参数
    timeoutMillis: NONE

SpringBoot 中集成开源的 RocketMq 非常简单,只须要一个依赖就能够主动实现相干筹备:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot</artifactId>
    <version>2.2.2</version>
    </dependency>

具体的应用通常为封装或者间接应用RocketMqTemplate


@Autowired
private RocketMQTemplate mqTemplate;

public void sendRocketMqUniqueTextMessage(String topic, String tag, String queueMsg) {if (StringUtils.isNotEmpty(topic) && StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(queueMsg)) {
            String queueName = topic + ":" + tag;
            // 封装音讯,调配惟一 id
            MessageData messageData = new MessageData();
            messageData.setMsgId(IdUtil.randomUUID());
            messageData.setMsgContent(queueMsg);
            queueMsg = JSON.toJSONString(messageData);
            log.info("线程:{},向队列:{},发送音讯:{}", Thread.currentThread()
                    .getName(), queueName, queueMsg);
            try {mqTemplate.syncSend(queueName, queueMsg);
            } catch (Exception e) {log.info("向队列:{},发送音讯出现异常:{}", queueName, queueMsg);
                // 出现异常,保留异样信息到数据库
                SaMqMessageFail saMqMessageFail = new SaMqMessageFail();
                // 封装失败音讯,调用生效解决 Service 将失败发送申请入库,或者通过其余办法重试
                saMqMessageFailService.insert(saMqMessageFail);
            }
        }
    }

开源版本的 SpringBoot 集成 RocketMq 就是如此简略。

商用版本

商用版本 RocketMq 咱们应用商业版 TCP 协定 SDK(举荐),留神这里用的是 4.X 版本。商用版本的 YAML 配置和开源版本 显式配置 是一样的,然而须要留神参数 use-aliyun-rocketSever=true,并且secretKeyaccessKey以及 name-server 都须要配置为阿里云提供的配置,最初设置音讯发送超时工夫 timeoutMillis 设置正当工夫(单位为毫秒),便于排查问题和避免线程长期占用:

# 音讯队列
rocketmq:
  use-aliyun-rocketSever: true
  # 应用阿里云提供的 endpoint
  name-server: http://xxxxx.aliyuncs.com:8080
  producer:
    group: testGroup
    secret-key: xxx
    access-key: xxxx
    # 商用版本申请超时工夫
    timeoutMillis: 15000
  # 如果须要设置消费者,能够依照同样的形式集成
  consumer:
      group: testGroup
    secret-key: xxx
    access-key: xxxx
    # 商用版本申请超时工夫 15 秒
    timeoutMillis: 15000

留神这里仅仅配置了生产者,读者能够按需设置为消费者,设置形式和生产者同理。

设置 YAML 之后,咱们须要在 Maven 中引入上面的配置:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
    <version>1.8.8.1.Final</version>
</dependency>                            

最初应该如何应用呢?这里就是商用 RocketMq 比拟蛋疼的点了,应用 RocketMQTemplate 是这种状况下是无奈应用商用 RocketMq 的,咱们须要 手动注入商用的 SDK 依赖 ProducerBean,具体的操作如下:

  1. 构建 配置类,这里仿照了官网提供的 demo 增减配置:
/**
阿里云服务配置封装,留神和本地部署的 rocketmq 配置辨别
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class AliyunRocketMqConfig {

    /**
     * 鉴权须要的 AccessKey ID
     */
    @Value("${rocketmq.use-aliyun-rocketSever:null}")
    private String useAliyunRocketMqServerEnable;
    /**
     * 鉴权须要的 AccessKey ID
     */
    @Value("${rocketmq.producer.access-key:null}")
    private String accessKey;

    /**
     * 鉴权须要的 AccessKey Secret
     */
    @Value("${rocketmq.producer.secret-key:null}")
    private String secretKey;

    /**
     * 实例 TCP 协定公网接入地址(理论我的项目,填写本人阿里云 MQ 的公网地址)*/
    @Value("${rocketmq.name-server:null}")
    private String nameSrvAddr;

    /**
     * 延时队列 group
     */
    @Value("${rocketmq.producer.group:null}")
    private String groupId;

    /**
     * 音讯发送超时工夫,如果服务端在配置的对应工夫内未 ACK,则发送客户端认为该音讯发送失败。*/
    @Value("${rocketmq.producer.timeoutMillis:null}")
    private String timeoutMillis;


    // 获取 Properties
    public Properties getRocketMqProperty() {Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);
        return properties;
    }

}
  1. 构建商用 RocketMq 初始化类。这里会遇到 比拟蛋疼的事件 ,因为咱们的依赖是商用 RocketMq 与开源的 SpringBoot 依赖共存的,尽管咱们能够商用的 RocketMq,然而启动的时候会执行到此类进行初始化, 返回 NULL 会导致 SpringBoot 我的项目无奈失常启动,这里无奈只能应用一个 warn 日志进行提醒开源版本有可能呈现 Bean 被笼罩问题,实际上应用下来没有特地大的影响。

写的比拟俊俏,读者有更优雅的解决形式欢送领导。笔者目前只想到了应用这种“不论”的形式保障我的项目不改任何代码的状况失常运行。

/**
 * 阿里云 rocketMq 初始化
 **/
@Configuration
@Slf4j
public class AliyunProducerInit {

    @Autowired
    private AliyunRocketMqConfig aliyunRocketMqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {if(!Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable())){log.warn("非商用版本为了兼容仍然须要注入此 Bean,然而只读取无关 nameServ 和 group 信息");
        }
        ProducerBean producer = new ProducerBean();
        // ProducerBean 中的 properties 只有被笼罩的配置会应用自定义配置,其余配置会应用 SDK 的默认配置。producer.setProperties(aliyunRocketMqConfig.getRocketMqProperty());
        return producer;
    }
}

此外这里必须要吐槽一下商用 RocketMq 的正文竟然是 全中文 的!比方 com.aliyun.openservices.ons.api.bean.ProducerBean 的正文:

这样是坏事还是好事,大家自行领会。。。。。集体第一次看到的时候着实被震惊了。

  1. 做完下面两步之后,咱们就能够实现 RocketMqTemplate 调用申请,至此实现兼容。

SpringBoot 我的项目兼容小结

这里简略小结一下 SpringBoot 的兼容过程,能够看到整个步骤仅仅是 在商用 RocketMq 多做了一步 bean 注入的操作而已,整体应用上非常简略。然而这里只介绍了生产者的集成,那么消费者如何兼容?稍安勿躁,咱们接着看 Spring Cloud 版本的集成案例。

Spring Cloud Alibaba 我的项目兼容

目前国内应用的比拟多的是Spring Cloud Alibaba,留神这些配置都写入到 Nacos 当中。

开源版本

开源版本的接入形式和 SpringBoot 是一样的,这里简略回顾:

  1. 开源版本须要设置参数,这里设置了生产者和消费者:
# 音讯队列 - 开源版本
rocketmq:
  use-aliyun-rocketSever: false
  name-server: 192.168.0.92:9876
  producer:
    group: testGroup
    secret-key: NONE
    access-key: NONE
    timeoutMillis: 15000
    consumeThreadNums: 20
  consumer:
    group: testGroup
    secret-key: NONE
    access-key: NONE
    timeoutMillis: 15000
    consumeThreadNums: 20
  1. 增加依赖:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot</artifactId>
    <version>2.2.2</version>
</dependency>
  1. 应用 RocketMqTemplate 能够进行音讯发送,而消费者则须要应用监听器 + 注解的形式,疾速注入一个消费者。大体模板如下:
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "testGroup", selectorExpression = "test_tag")
public class QueueRemoteRecoListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {// dosomething}

}

商用版本

这节是本文略微简单一点的局部,咱们依照步骤介绍接入过程:

  1. 在 Nacos 的配置中退出 RocketMq 商用所需的配置内容,和开源版本的设置相似:
# 音讯队列 - 商用版本
rocketmq:
 # 开源 RocketMq 和商业版 RocketMq 切换开关
use-aliyun-rocketSever: true
name-server: http://xxxx.mq.aliyuncs.com:80
producer:
# 目前 uat 借助开发测试应用
  group: testGroup
  secret-key: xxx
  access-key: xxx
  timeoutMillis: 15000
  consumeThreadNums: 20
consumer:
  group: testGroup
  secret-key: xxx
  access-key: xxx
  timeoutMillis: 15000
  consumeThreadNums: 20
  1. 增加商用 RocketMq 的 TCP 接入形式须要的依赖包。
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
    <version>1.8.8.1.Final</version>
</dependency>                            
  1. 新增 配置类,和 SpringBoot 的商用版本形式也是相似的:
/**
 * 
 * rocketmq 阿里云服务配置封装,留神和本地部署的 rocketmq 配置辨别
 **/
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class AliyunCommercialRocketMqConfig {

    /**
     * 鉴权须要的 AccessKey ID
     */
    @Value("${rocketmq.use-aliyun-rocketSever:null}")
    private String useAliyunRocketMqServerEnable;
    /**
     * 鉴权须要的 AccessKey ID
     */
    @Value("${rocketmq.consumer.access-key:null}")
    private String accessKey;

    /**
     *
     */
    @Value("${rocketmq.use-aliyun-rocketsever:null}")
    private String useAliyunRocketServer;

    /**
     * 鉴权须要的 AccessKey Secret
     */
    @Value("${rocketmq.consumer.secret-key:null}")
    private String secretKey;

    /**
     * 实例 TCP 协定公网接入地址(理论我的项目,填写本人阿里云 MQ 的公网地址)*/
    @Value("${rocketmq.name-server:null}")
    private String nameSrvAddr;

    /**
     * 延时队列 group
     */
    @Value("${rocketmq.consumer.group:null}")
    private String groupId;

    /**
     * 音讯发送超时工夫,如果服务端在配置的对应工夫内未 ACK,则发送客户端认为该音讯发送失败。*/
    @Value("${rocketmq.consumer.timeoutMillis:null}")
    private String timeoutMillis;

    /**
     * 将消费者线程数固定为 20 个 20 为默认值
     */
    @Value("${rocketmq.consumer.consumeThreadNums:null}")
    private String consumeThreadNums;



    public Properties getRocketMqProperty() {Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId());
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis);
        return properties;
    }

}
  1. 下一步是扩大官网 音讯订阅类 ,为啥要这样做?次要是接入试验中发现官网的音讯订阅类没有 全量参数 的结构器,难以对于音讯订阅类 动态参数化 常量,结构一个订阅音讯应用默认形式还须要 static 代码块设置。所以这里对于官网的音讯订阅类进行扩大,子类代码是对于父类拷贝了一遍,没有做其余改变:
/**
对于指定类进行扩大,更容易的初始化
@see com.aliyun.openservices.ons.api.bean.Subscription 被扩大类
 **/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class AliyunCommercialRocketMqSubscriptionExt extends Subscription {
    /**
     * 主题
     */
    private String topic;
    /**
     * 条件表达式,具体参考 rocketmq
     */
    private String expression;

    /**
     * TAG or SQL92
     * <br>if null, equals to TAG
     *
     * @see com.aliyun.openservices.ons.api.ExpressionType#TAG
     * @see com.aliyun.openservices.ons.api.ExpressionType#SQL92
     */
    private String type;


    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {if (this == obj) {return true;}
        if (obj == null) {return false;}
        if (getClass() != obj.getClass()) {return false;}
        AliyunCommercialRocketMqSubscriptionExt other = (AliyunCommercialRocketMqSubscriptionExt) obj;
        if (topic == null) {if (other.topic != null) {return false;}
        } else if (!topic.equals(other.topic)) {return false;}
        return true;
    }

}
  1. 接着咱们构建动态化的音讯订阅类,这个常量类会封装零碎须要应用到的音讯订阅对象,在后续注册监听器须要应用,这里先提前定义。
/**
 * 队列惯例配置,用于启动时候初始化配置,* 所有配置均须要搁置到此类中对外扩大应用
 **/
public final class AliyunCommercialRocketMqConstants {
    
    public static final String TEST_TOPIC = "test_topic";
    public static final String TEST_TAG = "test_tag";
    
    /**
     * 参考案例,仅仅作为开发验证应用
     */
    public static final AliyunCommercialRocketMqSubscriptionExt QUEUE_TEST = new AliyunCommercialRocketMqSubscriptionExt(RocketMqKey.TEST_TOPIC, RocketMqKey.TEST_TAG, null);
}

为了方便管理,咱们能够把这些要害 Key 在独自放到一个类:

public class RocketMqKey {
    public static final String TEST_TOPIC = "test_topic";
    public static final String TEST_TAG = "test_tag";   
}

/**
 * 队列惯例配置,用于启动时候初始化配置,* 所有配置均须要搁置到此类中对外扩大应用
 **/
public final class AliyunCommercialRocketMqConstants {
    /**
     * 参考案例,仅仅作为开发验证应用
     */
    public static final AliyunCommercialRocketMqSubscriptionExt QUEUE_TEST = new AliyunCommercialRocketMqSubscriptionExt(RocketMqKey.TEST_TOPIC, RocketMqKey.TEST_TAG, null);
}
  1. 做好一系列筹备之后,咱们开始 商用版本生产者兼容 ,商用版本的发送者能够像是上面这样封装官网提供的 demo,也能够应用 注入 ProducerBean的形式集成:
/**
 * 商用 aliyun Rocketmq 工具类封装
 **/
@Component
@Slf4j
public class AliyunCommercialRocketMqSendUtils {

    private static final long KEEP_ALIVE_TIME = 60L;

    private final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    @Autowired
    private Producer producer;

    @Autowired
    private SaMqMessageFailService saMqMessageFailService;

    @Autowired
    private AliyunRocketMqConfig aliyunRocketMqConfig;

    /**
     * 异步推送,须要调用方手动指定 callback
     
     */
    public void singleAsyncSend(String topic, String tag, String msgBody, SendCallback sendCallback) {singleAsyncSend(topic, tag, msgBody, null, sendCallback);
    }

    /**
     * 异步推送,须要调用方手动指定 callback
     * 如果须要应用 callback 告诉,能够应用上面的代码进行解决
     <pre>
         producer.sendAsync(msg, new SendCallback() {
        @Override
        public void onSuccess(final SendResult sendResult) {
            assert sendResult != null;
            System.out.println(sendResult);
        }

        @Override
        public void onException(final OnExceptionContext context) {ONSClientException exception = context.getException();
            //                    // 出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {},msgBody => {},key => {}(如果设置 key,能够应用 key 查找音讯), 商用版须要在 RocketMq 云服务重试,失败起因为: {}"
            , topic, tag, msgBody, key, exception.getMessage());
            buildMqErrorInfoAndInsertDb(msgBody, exception);
        }
    });
     </pre>
     * @param topic 主题
     * @return void
     * @description
     * @param: tag  标签
     * @param: msgBody 推送音讯
     * @param: key 不便音讯查找的 key
     * @param: sendCallback 手动回调
     */
    public void singleAsyncSend(String topic, String tag, String msgBody, String key, SendCallback sendCallback) {if(Boolean.FALSE.equals(Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable()))){throw new UnsupportedOperationException("请设置 UseAliyunRocketServer=true 切换到商用 rocketMq 模式");
        }
        if (StringUtils.isAnyBlank(topic, tag, msgBody)) {throw new IllegalArgumentException("topic, tag and msgBody must be not blank");
        }
        // 对于应用异步接口,倡议设置独自的回调解决线程池,领有更灵便的配置和监控能力。// 如下结构线程的形式申请队列为无界仅用作示例,有 OOM 的危险。// 更正当的结构形式请参考阿里巴巴 Java 开发手册:https://github.com/alibaba/p3c
        producer.setCallbackExecutor(THREAD_POOL_EXECUTOR);

        // 循环发送音讯
        Message msg = new Message( //
                // Message 所属的 Topic
                topic,
                // Message Tag 可了解为 Gmail 中的标签,对音讯进行再归类,不便 Consumer 指定过滤条件在 MQ 服务器过滤
                tag,
                // Message Body 能够是任何二进制模式的数据,MQ 不做任何干涉
                // 须要 Producer 与 Consumer 协商好统一的序列化和反序列化形式
                msgBody.getBytes());
        // 设置代表音讯的业务要害属性,请尽可能全局惟一
        // 以不便您在无奈失常收到音讯状况下,可通过 MQ 控制台查问音讯并补发
        // 留神:不设置也不会影响音讯失常收发
        if (CharSequenceUtil.isNotBlank(key)) {msg.setKey(key);
        }
        // 发送音讯,只有不抛异样就是胜利
        try {producer.sendAsync(msg, sendCallback);
        } catch (ONSClientException exception) {
            // 出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {},msgBody => {},key => {}(如果设置 key,能够应用 key 查找音讯), 商用版须要在 RocketMq 云服务重试,失败起因为: {}"
                    , topic, tag, msgBody, key, exception.getMessage());
            buildMqErrorInfoAndInsertDb(msgBody, exception);
        }


    }

    /**
     * 阻塞独自推送队列,应用零碎默认的 Key 生成规定
     *
     * @param topic 主题
     * @return void
     * @description 阻塞独自推送队列
     * @param: tag 标签
     * @param: msgBody msgBody 内容
     */
    public void singleSyncSend(String topic, String tag, String msgBody) {singleSyncSend(topic, tag, msgBody, null);
    }

    /**
     * 阻塞独自推送队列,应用零碎默认的 Key 生成规定
     *
     * @param topic 主题
     * @return void
     * @description 阻塞独自推送队列
     * @param: tag 标签
     * @param: msgBody msgBody 内容
     */
    public void singleSyncSend(String topic, String tag, String msgBody, String key) {if(!Boolean.valueOf(aliyunRocketMqConfig.getUseAliyunRocketMqServerEnable())){throw new UnsupportedOperationException("请设置 UseAliyunRocketServer=true 切换到商用 rocketMq 模式");
        }
        if (StringUtils.isAnyBlank(topic, tag, msgBody)) {throw new IllegalArgumentException("topic, tag and msgBody must be not blank");
        }
        Message msg = new Message( //
                // Message 所属的 Topic
                topic,
                // Message Tag 可了解为 Gmail 中的标签,对音讯进行再归类,不便 Consumer 指定过滤条件在 MQ 服务器过滤
                tag,
                // Message Body 能够是任何二进制模式的数据,MQ 不做任何干涉
                // 须要 Producer 与 Consumer 协商好统一的序列化和反序列化形式
                msgBody.getBytes());
        // 设置代表音讯的业务要害属性,请尽可能全局惟一
        // 以不便您在无奈失常收到音讯状况下,可通过 MQ 控制台查问音讯并补发
        // 留神:不设置也不会影响音讯失常收发
        if (CharSequenceUtil.isNotBlank(key)) {msg.setKey(key);
        }
        // 发送音讯,只有不抛异样就是胜利
        try {SendResult sendResult = producer.send(msg);
            log.info("【RocketMq-Commercial】推送胜利,singleSyncSend => {}", JSON.toJSONString(sendResult));
        } catch (ONSClientException e) {log.error("【RocketMq-Commercial】发送失败,音讯存储到失败记录表,发送 topic => {}, 发送 tag => {},msgBody => {},key => {}(如果设置 key,能够应用 key 查找音讯), 商用版须要在 RocketMq 云服务重试,失败起因为: {}"
                    , topic, tag, msgBody, key, e.getMessage());
            buildMqErrorInfoAndInsertDb(msgBody, e);
        }

    }

    /**
     * 构建 mq 错误信息,并且插入到异样推送表
     *
     * @param msgBody
     * @return void
     * @description 构建 mq 错误信息,并且插入到异样推送表
     * @param: e
     */
    private void buildMqErrorInfoAndInsertDb(String msgBody, ONSClientException e) {
        // 出现异常意味着发送失败,为了防止音讯失落,倡议缓存该音讯而后进行重试。// 出现异常,保留异样信息到数据库
        SaMqMessageFail saMqMessageFail = new SaMqMessageFail();
        saMqMessageFail.setMqId(IdUtil.randomUUID());
        saMqMessageFail.setQueueName("RocketMq-Commercial");
        saMqMessageFail.setQueueMessage(msgBody);
        // 2 代表失败
        saMqMessageFail.setStatus(2);
        // 1 代表重试次数
        saMqMessageFail.setProcCount(1);
        saMqMessageFail.setFailReason(e.toString());
        saMqMessageFail.setCreateDate(new Date());
        saMqMessageFailService.insert(saMqMessageFail);
    }

}

注入 ProductBean 的形式能够从下面提到的 SpringBoot 集成形式解决。

  1. 当初咱们解决之前遗留的问题,如果是消费者,应该如何更优雅的接管音讯?首先咱们须要明确,商用 RocketMq 注入是须要依附手动构建 监听器 ,然而咱们下面提到 SpringBoot 提供了 注解 + @Component的形式实现队列监听的消费者。此外为了防止和开源版本抵触,咱们应用之前参数配置自定义的开关,在遇到开源版本的时候咱们 返回 null(这里返回 null Spring 会扫描获取 SpringBoot 的 RocketMq 依赖,不会呈现报错和无奈启动的问题,和后面的状况略有不同)避免主动注入商用 RocketMq 的监听器。

上面是商用版本注入 ConsumerBean 的代码,订阅商用 RocketMq 的生产监听器:

/**
 *  阿里云商用 Rocketmq 队列接管
 *  商用 rocketmqConsumer 的所有申请会进入一个入口,须要散发到不同的具体业务解决。**/
@Component
@Slf4j
public class AliyunCommercialRocketMqQueueConsumer {

    @Autowired
    private AliyunCommercialRocketMqConfig aliyunCommercialRocketMqConfig;

    // 自定义的监听器
    @Autowired
    private AliyunCommerciaRocketMqTestListener aliyunCommerciaRocketMqTestListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        // 如果开关为 false, 则不能注入此对象,否则商用的 API 会顶替掉 框架诸如的 Bean 出现异常
        if(!Boolean.valueOf(aliyunCommercialRocketMqConfig.getUseAliyunRocketServer())){log.warn("非商用版本不注入商用版本监听器");
            return null;
        }
        ConsumerBean consumerBean = new ConsumerBean();
        // 配置文件
        Properties properties = aliyunCommercialRocketMqConfig.getRocketMqProperty();
        properties.setProperty(PropertyKeyConst.GROUP_ID, aliyunCommercialRocketMqConfig.getGroupId());
        // 将消费者线程数固定为 20 个 20 为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, aliyunCommercialRocketMqConfig.getConsumeThreadNums());
        consumerBean.setProperties(properties);
        // 订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(2);
        // 应用了之前定义的测试用的监听器
        subscriptionTable.put(AliyunCommercialRocketMqConstants.QUEUE_TEST, aliyunCommerciaRocketMqTestListener);
        // 订阅多个 topic 如下面设置

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }


}

AliyunCommerciaRocketMqTestListener 自定义监听器为了兼容商用和开源版本做了上面的扭转,为了不便了解这里退出了相干注解:

/**
 * 监听器开发模板
 **/
@Slf4j
@Component
// 开源版本能够兼容此注解
@RocketMQMessageListener(topic = RocketMqKey.TEST_TOPIC, consumerGroup = RocketMqKey.TEST_GROUP, selectorExpression = RocketMqKey.TEST_TAG)
// RocketMQListener<MessageExt> 属于 SpringBoot RocketMq 的依赖
// MessageListener 属于商用 RocketMq 的 SDK
public class QueueRemoteReconListener implements RocketMQListener<MessageExt>, MessageListener {


    // 开源版本的应用
    @Override
    public void onMessage(MessageExt message) {// 开源版本的 SpringBoot 形式,和上文介绍雷同}

    /** 
     * @description  
     * @param message 音讯类. 一条音讯由主题, 音讯体以及可选的音讯标签, 自定义从属键值对形成.
     * 留神: 咱们对每条音讯的自定义键值对的长度没有限度, 但所有的自定义键值对, 零碎键值对序列化后, 所占空间不能超过 32767 字节.
     * @param: context 每次生产音讯的上下文,供未来扩大应用
     * @return com.aliyun.openservices.ons.api.Action 
     */ 
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // 商用版本要求实现这个办法,Action 如下,默认失败重试 16 次,返回 ReconsumeLater 会触发重试机制,所以会存在反复生产的问题
        //public enum Action {
            /**
             * 生产胜利,持续生产下一条音讯
             */
            //CommitMessage,
            /**
             * 生产失败,告知服务器稍后再投递这条音讯,持续生产其余音讯
             */
            //ReconsumeLater,
        //}

    }
}

以上就是消费者的兼容解决,既能够满足开源版本的注解开发要求,也能够不收缩类的状况下沿用扩大。

Spring Cloud Alibaba 我的项目兼容小结

从集体的角度来看,在生产者的兼容上比拟容易实现,依照官网的 demo 构建带商用 RocketMq 的 ProducerBean 即可,而消费者的集成则要简单一些,这里为了思考不反复的设置或者写反复代码,把要害的配置动态化,同时为了官网写的毛糙的音讯订阅类做了继承笼罩的操作兼容。此外为了避免开源版本的消费者 Bean 被商用的监听器笼罩导致生效,应用了简略的开关来进行 Bean 的注入管制,写的比拟毛糙,读者有更好的写法欢送探讨。

总体来看来说商用版本的 RocketMq 集成起来稍微麻烦,然而还是能够承受。此外也能够看到 SDK 的兼容性实际上是比拟简陋的,很多中央的很像然而又齐全是本人从新设计的,感觉就是一个新的团队给老团队做进去的货色做兼容的感觉,不过不就是桥接嘛,我也会,所以最初集体凑合消费者兼容就用来多接口实现的兼容写法了。

最终的集成成果是开源版本敞开注入 bean 的开关,配置为了关照 Properties 对于不须要的配置进行占位解决,而商用版本则关上开关,通过自定义注入监听器的形式顶替掉 SpringBoot 的依赖。

最初的成果是只须要批改 RocketMq 的配置,启动之后主动连贯到相干的 RocketMq。

开源版本:

# 音讯队列 - 开源版本
rocketmq:
  use-aliyun-rocketSever: false
  name-server: 192.168.0.92:9876
  producer:
    group: testGroup
    secret-key: NONE
    access-key: NONE
    timeoutMillis: 15000
    consumeThreadNums: 20
  consumer:
    group: testGroup
    secret-key: NONE
    access-key: NONE
    timeoutMillis: 15000
    consumeThreadNums: 20

商用版本

# 音讯队列 - 商用版本
rocketmq:
 # 开源 RocketMq 和商业版 RocketMq 切换开关
  use-aliyun-rocketSever: true
  name-server: http://xxxx.mq.aliyuncs.com:80
  producer:
# 目前 uat 借助开发测试应用
    group: testGroup
      secret-key: xxx
      access-key: xxx
      timeoutMillis: 15000
      consumeThreadNums: 20
    consumer:
      group: testGroup
      secret-key: xxx
      access-key: xxx
      timeoutMillis: 15000
      consumeThreadNums: 20

阿里云商用 RocketMq 简略介绍(4.X 版本)

官网介绍:https://help.aliyun.com/product/29530.html

无关 RocketMq 自身的介绍局部能够浏览:[[【RocketMq】RocketMq 扫盲]],这里挑了商用版本集体认为须要关注的几个点进行介绍。留神这里应用的商用 RocketMq 版本为 4.X 的版本。

计费模式

商用 RocketMq 次要分为上面几个局部:

  • 主系列:标准版、专业版、铂金版
  • 子系列:单节点版、集群版

集体最初是白嫖了公司的商用集群版 RocketMq 进行试验,计费的形式分为 包年包月 按量付费,前者实用于流量比拟大并且固定的状况,应用套餐比拟划算,而后者按需付费则实用于 RocketMq 应用较少(或者不稳固)或者调用量较少的状况,集体学习也比拟举荐按量付费模式,比包年包月划算很多。

如果呈现欠费,在实例停服 7 天后两种付费模式均会革除所有的 RocketMq 数据,而日常欠费则会被主动停用,嘛,和电话卡欠费差不多的情理,这里就不过多拓展了。如果要退订商用 RocketMq 的服务。按量付费能够随时退订,包年包月也会依据天数进行退订。

PS:看完这一整套计费模式下来,发现还是挺良心的。让我没想到的是包年包月竟然能够计算未应用的天数返还,这一点挺不错的,用起来不必放心被套进去。

接入形式

商用 RocketMq 的接入步骤如下:

官网在疾速入门中提供了创立资源的解释视频:https://help.aliyun.com/document_detail/441914.html。有两个点须要留神,第一个点是 RAM 用户必须要进行账户受权能力失常应用,第二个点是对外拜访形式设置,音讯队列 RocketMQ 版反对 VPC 拜访和公网拜访。资源创立实现之后就能够应用官网提供的 SDK 收发音讯,在应用之前须要留神上面这些前提:

  • 步骤二:创立资源
  • 装置 IDEA

    您能够应用 IntelliJ IDEA 或者 Eclipse,本文以 IntelliJ IDEA Ultimate 为例。

  • 装置 1.8 或以上版本 JDK
  • 装置 2.5 或以上版本 Maven

之后便是在我的项目中引入 JAVA 依赖和复制粘贴模板代码验证。

音讯队列模型

商用 RocketMq 的根本应用逻辑如下:

依照音讯类型,商用 RocketMq 提供了上面的音讯类型:

  • 一般音讯
  • 程序音讯
  • 定时 / 延时音讯
  • 事务音讯

SDK 接入参考

SDK 接入的主页链接如下:

https://help.aliyun.com/document_detail/69111.html

次要介绍了上面几个点,理论参考第二个举荐的接入形式即可。

  • SDK 参考概述
  • 商业版 TCP 协定 SDK(举荐)
  • 商业版 HTTP 协定 SDK(多语言举荐)
  • 社区版 TCP 协定 SDK(仅供开源用户上云应用)
  • 订阅音讯常见问题

对于 JAVA 应用程序倡议应用 TCP 协定 SDK,集成的形式也比较简单,为了不便了解,也能够通过以下链接获取相干 Demo,外面都有具体的正文解释:

  • spring/java-spring-demo
  • springboot/java-springboot-demo

如果是 HTTP 协定的 SDK,则应用上面的依赖包:

<dependency>
    <groupId>com.aliyun.mq</groupId>
    <artifactId>mq-http-sdk</artifactId>
    <!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
    <version>1.0.3.2</version> 
    <classifier>jar-with-dependencies</classifier>
</dependency>

如果是 TCP 协定的接入形式,则应用上面的依赖包:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <!-- 以下版本号请替换为 Java SDK 的最新版本号 -->
    <version>1.8.8.1.Final</version>
</dependency>                            

接入细节阐明

集体在试验的时候发现 HTTP 协定的版本集成 SDK 更像是给了一套 API 工具包,益处是能够不须要依赖 Spring 框架等独自应用,然而最大的毛病也是这里,很难用于框架当中,如果要实现主动接管音讯并且解决,须要依附 手动实现定时工作 拉取音讯实现主动生产,这一点也比拟蛋疼。

这里列举 HTTP 的 SDK的发送和承受代码,首先是生产者的模板代码,这些代码和开源的 RocketMq 应用相似:

// 省略大量代码。。。// 循环发送 4 条音讯。for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;        // 一般音讯。pubMsg = new TopicMessage(
                // 音讯内容。"hello mq!".getBytes(),
                // 音讯标签。"A"
                );
                // 设置音讯的自定义属性。pubMsg.getProperties().put("a", String.valueOf(i));
                // 设置音讯的 Key。pubMsg.setMessageKey("MessageKey");
               
            // 同步发送音讯,只有不抛异样就是胜利。TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

            // 同步发送音讯,只有不抛异样就是胜利。System.out.println(new Date() + "Send mq message success. Topic is:" + topic + ", msgId is:" + pubResultMsg.getMessageId()
                        + ", bodyMD5 is:" + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 音讯发送失败,须要进行重试解决,可从新发送这条音讯或长久化这条数据进行弥补解决。System.out.println(new Date() + "Send mq message failed. Topic is:" + topic);
            e.printStackTrace();}
// 省略大量代码。。。

而在接管方略微麻烦一些,官网的案例是应用 死循环 一直查看 Broker 是否有积压音讯,如果有则通过 被动拉取音讯 的模式去拉取音讯。

实现多线程期待的成果能够写入到 Thread 继承类的 Run 办法中。

 // 在以后线程循环生产音讯,倡议多开个几个线程并发生产音讯。do {
            
            // 省略大量代码

// 解决业务逻辑。for (Message message : messages) {System.out.println("Receive message:" + message);
            }

            // 音讯重试工夫达到前若不确认音讯生产胜利,则音讯会被反复生产。// 音讯句柄有工夫戳,同一条音讯每次生产的工夫戳都不一样。{List<String> handles = new ArrayList<String>();
                for (Message message : messages) {handles.add(message.getReceiptHandle());
                }

                try {consumer.ackMessage(handles);
                } catch (Throwable e) {
                    // 某些音讯的句柄可能超时,会导致音讯生产状态确认不胜利。if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                        if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();}
            }
       } while (true);
     // 省略大量代码
            

写在最初

算是一次集体兼容的笔记。还是要吐槽商用的 RocketMq 集成真的挺无语的,不过阿里的货色懂的都懂。

正文完
 0