关于rocketmq:RocketMQ-应用篇

46次阅读

共计 12301 个字符,预计需要花费 31 分钟才能阅读完成。

本文偏重解说 RocketMQ 的理论利用,对于实践局部,在另外一篇文章中再做探讨。在此不多说,间接进入实战吧。

1. 配置

通常开发间接依赖 rocketmq-spring-boot-starter 即可,starter 中蕴含了所有所需的依赖,如:

  • rocketmq-client:封装了客户端的应用程序,还蕴含了 netty 的通信服务。
  • rocketmq-acl:拜访权限管制服务。

starter 还提供了很多现成封装类,如:RocketMQTemplate.javaRocketMQListener.javaRocketMQUtil.java 等,在利用开发时会常常用到。

倡议间接用上述的 rocketmq-spring-boot-starter,见过有公司为了外部兼容,本人封装了一个服务代替官网的 starter。但这个服务除了减少局部自定义程序外,其余的类和办法都是照拷贝 starter 的。
当后续 rocketmq-spring-boot-starter 降级了,或修复 bug、或拓展性能,公司外部的服务就很难降级了,除非再从头拷贝一遍。当公司外部没有相应的体量,倡议不要学大厂本人封装根底服务,否则容易欲罢不能。

pom 依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

写文章时,starter 最新的版本是 2.2.0,对应的 rocketmq-clientrocketmq-acl 版本是 4.8.0

认准版本好很重要,因为 rocketmq-spring-boot-starter 始终在疾速迭代中,很多类和办法,在新版本中都会扭转,例如下文会提到的 tag、音讯事务等,这是也是为什么不倡议公司外部封装 starter。

application

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: koala-dev-event-centre-group # 生产者分组
    send-message-timeout: 3000 # 发送音讯超时工夫,单位:毫秒。默认为 3000。compress-message-body-threshold: 4096 # 消息压缩阀值,当音讯体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 音讯体的最大容许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送音讯时,失败重试次数。默认为 2 次。retry-times-when-send-async-failed: 2 # 异步发送音讯时,失败重试次数。默认为 2 次。retry-next-server: false # 发送音讯给 Broker 时,如果发送失败,是否重试另外一台 Broker。默认为 false
    access-key: # Access Key,可浏览 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启音讯轨迹性能。默认为 true 开启。可浏览 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义音讯轨迹的 Topic。默认为 RMQ_SYS_TRACE_TOPIC。# Consumer 配置项
  consumer:
    listeners: # 配置某个生产分组,是否监听指定 Topic。构造为 Map< 消费者分组, <Topic, Boolean>>。默认状况下,不配置示意监听。erbadagang-consumer-group:
        topic1: false # 敞开 test-consumer-group 对 topic1 的监听生产 

rocketmq 配置很多,除了根底无关 server 的配置以外,还有 acl、producer、consumer 等。但通常一个服务内会有多个 consumer,倡议在代码中实现。而 producer 如果只有一个,能够配置。

2. 一般音讯发送

无关 rocketmq 发送音讯的源码,倡议查看 org.apache.rocketmq.client.producer.DefaultMQProducer,类中的属性大多对应于配置文件中的参数。

2.1. 三种音讯发送

这里只探讨一般的音讯发送形式,区别于程序、事务、提早 / 定时等音讯发送形式,以后分为三种:

  • 同步(sync): 同步发送就是指 producer 发送音讯后,会同步期待,在接管到 broker 响应后果后才持续发下一条音讯。
  • 异步(async): 异步发送是指 producer 收回一条音讯后,不须要期待 broker 响应,就接着发送下一条音讯的通信形式。异步发送同样能够对音讯的响应后果进行解决,须要在发送音讯时实现异步发送回调接口。
  • 单方向(oneWay): 是一种单方向通信形式,也就是说 producer 只负责发送音讯,不期待 broker 发回响应后果,而且也没有回调函数触发,这也就意味着 producer 只发送申请不期待响应后果。

三种发送形式比照

发送形式 发送 TPS 发送后果响应 可靠性 应用场景
同步 个别 重要的告诉场景
异步 比拟重视 RT(响应工夫)的场景
单方向 最快 可靠性要求并不高的场景

async 异步执行的线程池配置

this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60000L, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);

            public Thread newThread(Runnable r) {return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

三种发送音讯代码

    private String convertDestination(String topic, String tag) {return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag);
    }

    /**
     * 同步发送
     */
    public SendResult syncSend(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
        return rocketMQTemplate.syncSend(destination, content);
    }

    /**
     * 异步发送
     */
    public void asyncSend(String topic, String tag, String content, SendCallback sendCallback) {String destination = this.convertDestination(topic, tag);
        rocketMQTemplate.asyncSend(destination, content, sendCallback);
    }

    /**
     * 单向发送
     */
    public void sendOneWay(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
        rocketMQTemplate.sendOneWay(destination, content);
    }

2.2. 批量发送

批量音讯发送是将同一主题的多条音讯一起打包发送到音讯服务端,缩小网络调用次数,进步网络传输效率。

当然,并不是在同一批次中发送的音讯数量越多,性能就越好,判断根据是单条音讯的长度,如果单条音讯内容比拟长,则打包发送多条音讯会影响其余线程发送音讯的响应工夫,并且单批次音讯发送总长度不能超过 DefaultMQProducer#maxMessageSize,即配置文件中的 rocketmq.producer.max-message-size

代码

    /**
     * 同步 - 批量发送
     */
    public SendResult syncBatchSend(String topic, String tag, List<String> contentList) {String destination = this.convertDestination(topic, tag);
        List<Message<String>> messageList = contentList.stream()
                .map(content -> MessageBuilder.withPayload(content).build())
                .collect(Collectors.toList());
        return rocketMQTemplate.syncSend(destination, messageList);
    }

4. 标签 tag

rocketmq 中,topic 与 tag 都是业务上用来归类的标识,辨别在于 topic 是一级分类,而 tag 能够了解为是二级分类。定义上:

  • topic: 音讯主题,通过 Topic 对不同的业务音讯进行分类。
  • tag: 音讯标签,用来进一步辨别某个 Topic 下的音讯分类,音讯从生产者收回即带上的属性。

理论业务中,什么时候该用 topic 或 tag 呢?有以下几种倡议:

  • 音讯类型是否统一: 如一般音讯、事务音讯、定时(延时)音讯、程序音讯,不同的音讯类型应用不同的 Topic,无奈通过 Tag 进行辨别。
  • 业务是否相关联: 没有间接关联的音讯,如淘宝交易音讯,京东物流音讯应用不同的 Topic 进行辨别;而同样是天猫交易音讯,电器类订单、女装类订单、化妆品类订单的音讯能够用 Tag 进行辨别。
  • 音讯优先级是否统一: 如同样是物流音讯,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则绝对会慢一些,不同优先级的音讯用不同的 Topic 进行辨别。
  • 音讯量级是否相当: 有些业务音讯尽管量小然而实时性要求高,如果跟某些万亿量级的音讯应用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时须要将不同量级的音讯进行拆分,应用不同的 Topic。

举个理论我的项目的例子吧:我刚刚做的一个我的项目叫共享核心,所有须要共享的资源都来自于各个业务服务方。创立共享资源、撤回共享资源等这类指令,我将其定义为不同的 topic。而发送给 topic 的音讯体中有“资源类型”的字段,每个业务接入方其实只关怀对应本人资源类型的音讯,那么就将“资源类型”定义为 tag,各个业务方的生产端只监听本人所需的 tag 即可。

如果没有 tag 的机制,生产端就得接管所有音讯,反序列化后只解决本人对应“资源类型”的音讯。有了 tag 机制,音讯在进入生产端途中就主动进行过滤散发。

与 RabbitMQ AMQP 协定 比拟

在应用 tag 机制后,第一工夫让我想到了 RabbitMQ 的 AMQP 协定,很像交换机和队列的机制。当应用 tag 之后,像扇形交换机;当没应用 tag 之后,就像直连交换机。

  • Exchange:音讯交换机,它指定音讯按什么规定,路由到哪个队列。
  • Binding:绑定,它的作用就是把 Exchange 和 Queue 依照路由规定绑定起来。
  • Queue:音讯队列载体,每个音讯都会被投入到一个或多个队列。

我想,二者设计的目标都是一样的,就是让生产者和消费者之间解耦,让一个音讯,能够自在地流转到不同的音讯端。RocketMQ 在这点,相较于 RabbitMQ 而言,提供的性能不够丰盛,但更实用、简洁。

示例代码:生产者

通过后面示例代码中,最简略的发送同步音讯代码来看,最新 starter 中封装的 RocketMQTemplate 类中,音讯发送的指标是 String destination。而 它蕴含了 topictag。即公有转换方法中的:destination = topic:tag

    private final RocketMQTemplate rocketMQTemplate;

    private String convertDestination(String topic, String tag) {return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag);
    }

    /**
     * 同步发送
     */
    public SendResult syncSend(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
        return rocketMQTemplate.syncSend(destination, content);
    }

示例代码:消费者

@RocketMQMessageListener(consumerGroup = ShareRocketMqConstants.GROUP_PREFIX + ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK,
        topic = ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK,
        selectorExpression = "2||3||4", 
        consumeThreadMax = 3)
public class ShareRsrcMqConsumer implements RocketMQListener<RsrcToBiz4Mq> {... ...}

上述生产端代码中,申明只生产 tag 值为:2、3、4 的音讯,注解中外围有两个属性:

  • selectorType: 默认值就是 SelectorType.TAG,所以示例代码中没有设置。
  • selectorExpression: 对应的表达式。针对 SelectorType.TAG 类型的,就须要设置 tag 的表达式。默认值是 *,即所有 tag 都生产。如果想要指定生产多个 tag,则用 || 或合乎来连贯。

留神: 生产者端,发消息只能指定一个 tag。但消费者端,接管音讯能够指定多个 tag。

5. 提早 / 定时音讯

定时音讯是指音讯发到 broker 后,不能立即被 consumer 生产,要到特定的工夫点或者期待特定的工夫后能力被生产。

原理

其实定时音讯实现原理比较简单,如果一个 topic 对应的音讯在发送端被设置为定时音讯,那么会将该音讯先寄存在 topic 为 SCHEDULE_TOPIC_XXXX 的音讯队列中,并将原始音讯的信息寄存在 commitLog 文件中,因为 topic 为 SCHEDULE_TOPIC_XXXX,所以该音讯不会被立刻音讯,而后通过定时扫描的形式,将达到延迟时间的音讯,转换为正确的音讯,发送到相应的队列进行生产。

提早级别

只管 rocketmq 反对定时音讯,然而以后开源版本的 rocketmq 所反对的定时工夫是无限的、不同级别的精度的工夫,并不是任意无限度的定时工夫。默认 Broker 服务器端有 18 个定时级别,每一个级别别离对应不同的延迟时间:

提早级别 延迟时间
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

代码

发送提早音讯并没有非凡的办法,而是基于一般发消息的办法(如:rocketMQTemplate.syncSend)做了重载,减少了一个传入参数 int delayLevel,默认值为 0,即立刻发送。

    /**
     * 同步提早发送
     *
     * @param delayLevel 延时等级:当初 RocketMq 并不反对任意工夫的延时,须要设置几个固定的延时等级,从 1s 到 2h 别离对应着等级 1 到 18
     *                   1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public SendResult syncSendDelay(String topic, String tag, String content, long timeout, int delayLevel) {String destination = this.convertDestination(topic, tag);
        Message message = MessageBuilder.withPayload(content).build();
        return rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
    }

6. 程序音讯

程序音讯是一种对音讯发送和生产程序有严格要求的音讯,对于一个指定的 Topic,音讯严格依照先进先出(FIFO)的准则进行音讯公布和生产,即先公布的音讯先生产,后公布的音讯后生产。
RocketMQ 目前只能保障同一个分区队列内的程序音讯,因而实现下列场景的形式有:

  • 分区有序: RocketMQ 反对同一个队列分区内的程序音讯。另外某个 Topic 下,所有音讯依据 ShardingKey 进行分区,雷同 ShardingKey 的音讯必须被发送到同一个分区队列。因而只有保障音讯依照同一 ShardingKey 发送即可,而后保障 Consumer 同一个队列单线程生产即可。
  • 全局有序: 当设置 Topic 下只有一个分区时,能够实现全局有序。

全局有序的性能太差,举荐应用分区有序。假如咱们要通过 mq 解决订单内的音讯。同一个 topic,通常咱们只须要保障同一个订单下的音讯程序公布和生产即可,不同订单下的音讯应该互不烦扰。因而能够采纳分区有序,将订单号转换为 ShardingKey,只有保障同一个订单下的音讯都流转到同一个队列下,而后程序生产。

最常见将订单号转换为 ShardingKey 的形式就是 hashKey。

生产者

    /**
     * 同步程序发送
     *
     * @param hashKey 依据 hashKey 和 队列 size() 取模,保障同一 hashKey 的音讯发往同一个队列,以实现 同一 hashKey 下的音讯 程序发送
     *                因而 hashKey 倡议取 业务上惟一标识符,如:订单号,只需保障同一订单号下的音讯程序发送
     */
    public SendResult syncSendOrderly(String topic, String tag, String content, String hashKey) {String destination = this.convertDestination(topic, tag);
        Message message = MessageBuilder.withPayload(content).build();
        return rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
    }

消费者

针对程序音讯的生产,代码也很容易,次要是 @RocketMQMessageListener 注解,通过设置了 consumeMode = ConsumeMode.ORDERLY,示意应用程序生产。

ConsumeMode 有两种值:

  • CONCURRENTLY:默认值,并发同时接管异步传递的音讯。
  • ORDERLY:程序生产时开启,只开启一个线程,同一时间只有序接管一个队列的音讯。
@RocketMQMessageListener(topic = "xxx-topic",
        consumerGroup = "xxxGroup",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {... ...}
}

发问:如果针对程序音讯的消费者,同时启动了多个 spring 实例,会影响吗?

这个问题过后想了良久,为了保障音讯依照程序生产,消费者是单线程生产的。可理论线上程序都不会是单节点,如果有多个 spring 实例,不是也能够了解成“多线程”解决了吗?

首先,回顾几个知识点吧:

  • 程序生产只能针对“集群模式”生产,即 messageModel = MessageModel.CLUSTERING
  • 集群模式下,多个消费者如何对音讯队列进行负载呢?音讯队列负载机制遵循一个通用的思维: 一个音讯队列同一时间只容许被一个消费者生产,一个消费者能够生产多个音讯队列。
  • 尽管消费者代码中,RocketMQ 监听器像是 mq 中的“推模式”。但实际上,RocketMQ 音讯推模式基于拉模式实现,在拉模式上包装一层,一个拉取工作实现后开始下一个拉取工作。

程序音讯生产时,在每次拉取工作时,都会像 broker 申请锁住该队列。因而,就算有多个消费者实例同时在运行,针对单个队列中的程序音讯,仍然是程序生产的。

7. 事务音讯

这里着重阐明一下,RocketMQ 的事务机制,和咱们通常说的通过 MQ 来实现最终一致性的分布式事务机制,不是一个事件。

RocketMQ 的事务机制,只体现在生产者,保障的是生产者本地的事务执行、发消息,这两个事务达成一致性。至于消费者收到音讯后的事务处理,并不在以后机制内。

失常事务的流程

失常事务的流程,遵循的是 2PC 的计划。

  1. 调用发送事务音讯办法,失常发送音讯。发送事务的办法名为 syncSendInTransaction
  2. mq 服务器端胜利接管到音讯后,音讯处于一个半接管的状态,并响应给生产者客户端。
  3. 生产者收到服务器端胜利接管的响应后,执行本地事务。本地事务写在 executeLocalTransaction 办法外面,返回后果为枚举 RocketMQLocalTransactionState,有:COMMIT、ROLLBACK、UNKNOWN 三种值。
  4. 服务器端收到 COMMIT 状态后,会把音讯下发给消费者
  5. 服务器端收到 ROLLBACK 状态后,会删除掉以后半接管状态的音讯,不再解决。
  6. 服务器端收到 UNKNOWN 状态,或者服务器端超时未收到音讯,或者生产者未响应状态,则将进行音讯弥补机制。

音讯弥补机制

这部分比较简单,针对上述事务流程第 6 点的几种状况,会触发音讯回查。

  1. 当事务音讯呈现 UNKNOWN、超时、未响应时,服务器会被动调用生产者本地的回查办法 checkLocalTransaction,查问本地事务执行状况,返回后果还是枚举值 RocketMQLocalTransactionState
  2. 服务器接管到返回后果的解决流程和后面的失常流程一样。
  3. 如果仍然是 UNKNOWN、超时、未响应,将持续重试。如果超过最大重试次数后,仍然无果,则视为 ROLLBACK,删除以后音讯。

事务音讯相干的参数,根本在 org.apache.rocketmq.common.BrokerConfig 类中定义,例如以下几个罕用属性的默认值:

  • transactionTimeOut = 6000L:服务器未收到事务本地音讯的超时工夫为 1 分钟。
  • transactionCheckMax = 15:音讯弥补机制中的最大回查次数为 15 次。
  • transactionCheckInterval = 6000L:音讯弥补机制中每次回查的工夫距离为 1 分钟。

因为是 BrokerConfig 类中的属性,因而如果不想用默认值,能够在 broker.conf 文件中自定义批改。

代码:生产者发送事务音讯

    /**
     * 事务发送
     */
    public TransactionSendResult syncSendInTransaction(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
        String transactionId = UUID.randomUUID().toString();
        Message message = MessageBuilder.withPayload(content)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .build();
        return rocketMQTemplate.sendMessageInTransaction(destination, message, content);
    }

代码:生产者定义事务本地办法

在生产者客户端,通过事务监听器,实现 RocketMQLocalTransactionListener 接口的两个上述办法。

@RocketMQTransactionListener
public class LocalTransactionListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println("executeLocalTransaction:"+ LocalDateTime.now());
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {System.out.println("checkLocalTransaction:"+ LocalDateTime.now());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

发问:如果 spring 我的项目中有多个事务音讯生产者,怎么辨别不同的 RocketMQLocalTransactionListener?

@RocketMQLocalTransactionListener 这个注解提供了属性,能够辨别不同的事务音讯生产者。在 stater 2.0.4 版本中,是提供 txProducerGroup 这个属性指向一个音讯发送者组,映射不同的事务音讯发送逻辑。但如同有 bug,在后续新的版本迭代中,去掉了这个属性。

到了 2.1.1 版本,只能通过指定 rocketMQTemplateBeanName 来实现,即不同的事务音讯发送时,就得定义不同的 RocketMQTemplate。挺麻烦的,期待这个性能在后续的迭代中欠缺好。

8. 重试队列、死信队列

RocketMQ 很多应多异样的顾全机制,例如音讯重发的机制,这里能够分两类:

  • 生产者重发: 在后面介绍三种发送音讯形式时,针对同步、异步发送失败时,都会再重发,相应重发次数别离对应 DefaultMQProducer 类中属性值 retryTimesWhenSendFailedretryTimesWhenSendAsyncFailed,也能够在 properties 配置文件中自定义设置。
  • 消费者重发: 当音讯曾经进入 broker 后,消费者接管失败,broker 也会给消费者重发,以下衍生出本次的重试队列、死信队列。

重试队列

如果消费者端因为各种类型异样导致本次生产失败,为避免该音讯失落而须要将其从新回发给 broker 端保留,保留这种因为异样无奈失常生产而回发给 mq 的音讯队列称之为重试队列。

RocketMQ 会为每个生产组都设置一个 topic 名称为 “%RETRY%+consumerGroup” 的重试队列(这里须要留神的是,这个 Topic 的重试队列是针对生产组,而不是针对每个 Topic 设置的)。

用于临时保留因为各种异样而导致消费者端无奈生产的音讯。思考到异样复原起来须要一些工夫,会为重试队列设置多个重试级别,每个重试级别都有与之对应的从新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试音讯的解决是先保留至 topic 名称为 “SCHEDULE_TOPIC_XXXX” 的提早队列中,后盾定时工作依照对应的工夫进行 Delay 后从新保留至“%RETRY%+consumerGroup”的重试队列中。

死信队列

因为有些起因导致消费者端长时间的无奈失常生产从 broker 端 pull 过去的业务音讯,为了确保音讯不会被无端的抛弃,那么超过配置的“最大重试生产次数”后就会移入到这个死信队列中。

在 RocketMQ 中,SubscriptionGroupConfig 配置常量默认地设置了两个参数,一个是 retryQueueNums 为 1(重试队列数量为 1 个),另外一个是 retryMaxTimes 为 16(最大重试生产的次数为 16 次)。Broker 端通过校验判断,如果超过了最大重试生产次数则会将音讯移至这里所说的死信队列。这里,RocketMQ 会为每个生产组都设置一个 topic 命名为 “%DLQ%+consumerGroup" 的死信队列。但如果一个消费者组未产生死信音讯,音讯队列 RocketMQ 不会为其创立相应的死信队列的。

因为死信队列中的音讯是无奈被生产的,它也证实了一部分音讯呈现了意料之外的状况。因而个别在理论利用中,移入至死信队列的音讯,须要人工干预解决。例如通过 console 查看是否有私信队列,当解决问题后,可在 console 上手动重发消息。

正文完
 0