本文偏重解说 RocketMQ 的理论利用,对于实践局部,在另外一篇文章中再做探讨。在此不多说,间接进入实战吧。
1. 配置
通常开发间接依赖 rocketmq-spring-boot-starter
即可,starter 中蕴含了所有所需的依赖,如:
rocketmq-client
:封装了客户端的应用程序,还蕴含了 netty 的通信服务。rocketmq-acl
:拜访权限管制服务。
starter
还提供了很多现成封装类,如:RocketMQTemplate.java
、RocketMQListener.java
、RocketMQUtil.java
等,在利用开发时会常常用到。
倡议间接用上述的 rocketmq-spring-boot-starter
,见过有公司为了外部兼容,本人封装了一个服务代替官网的 starter
。但这个服务除了减少局部自定义程序外,其余的类和办法都是照拷贝 starter
的。
当后续 rocketmq-spring-boot-starter
降级了,或修复 bug、或拓展性能,公司外部的服务就很难降级了,除非再从头拷贝一遍。当公司外部没有相应的体量,倡议不要学大厂本人封装根底服务,否则容易欲罢不能。
pom 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
写文章时,starter 最新的版本是 2.2.0
,对应的 rocketmq-client
、rocketmq-acl
版本是 4.8.0
。
认准版本好很重要,因为 rocketmq-spring-boot-starter
始终在疾速迭代中,很多类和办法,在新版本中都会扭转,例如下文会提到的 tag、音讯事务等,这是也是为什么不倡议公司外部封装 starter。
application
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: koala-dev-event-centre-group # 生产者分组
send-message-timeout: 3000 # 发送音讯超时工夫,单位:毫秒。默认为 3000。compress-message-body-threshold: 4096 # 消息压缩阀值,当音讯体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 音讯体的最大容许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送音讯时,失败重试次数。默认为 2 次。retry-times-when-send-async-failed: 2 # 异步发送音讯时,失败重试次数。默认为 2 次。retry-next-server: false # 发送音讯给 Broker 时,如果发送失败,是否重试另外一台 Broker。默认为 false
access-key: # Access Key,可浏览 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启音讯轨迹性能。默认为 true 开启。可浏览 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义音讯轨迹的 Topic。默认为 RMQ_SYS_TRACE_TOPIC。# Consumer 配置项
consumer:
listeners: # 配置某个生产分组,是否监听指定 Topic。构造为 Map< 消费者分组, <Topic, Boolean>>。默认状况下,不配置示意监听。erbadagang-consumer-group:
topic1: false # 敞开 test-consumer-group 对 topic1 的监听生产
rocketmq 配置很多,除了根底无关 server 的配置以外,还有 acl、producer、consumer 等。但通常一个服务内会有多个 consumer,倡议在代码中实现。而 producer 如果只有一个,能够配置。
2. 一般音讯发送
无关 rocketmq 发送音讯的源码,倡议查看 org.apache.rocketmq.client.producer.DefaultMQProducer
,类中的属性大多对应于配置文件中的参数。
2.1. 三种音讯发送
这里只探讨一般的音讯发送形式,区别于程序、事务、提早 / 定时等音讯发送形式,以后分为三种:
- 同步(sync): 同步发送就是指 producer 发送音讯后,会同步期待,在接管到 broker 响应后果后才持续发下一条音讯。
- 异步(async): 异步发送是指 producer 收回一条音讯后,不须要期待 broker 响应,就接着发送下一条音讯的通信形式。异步发送同样能够对音讯的响应后果进行解决,须要在发送音讯时实现异步发送回调接口。
- 单方向(oneWay): 是一种单方向通信形式,也就是说 producer 只负责发送音讯,不期待 broker 发回响应后果,而且也没有回调函数触发,这也就意味着 producer 只发送申请不期待响应后果。
三种发送形式比照
发送形式 | 发送 TPS | 发送后果响应 | 可靠性 | 应用场景 |
---|---|---|---|---|
同步 | 个别 | 有 | 高 | 重要的告诉场景 |
异步 | 快 | 有 | 高 | 比拟重视 RT(响应工夫)的场景 |
单方向 | 最快 | 无 | 低 | 可靠性要求并不高的场景 |
async 异步执行的线程池配置
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60000L, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
public Thread newThread(Runnable r) {return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
三种发送音讯代码
private String convertDestination(String topic, String tag) {return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag);
}
/**
* 同步发送
*/
public SendResult syncSend(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
return rocketMQTemplate.syncSend(destination, content);
}
/**
* 异步发送
*/
public void asyncSend(String topic, String tag, String content, SendCallback sendCallback) {String destination = this.convertDestination(topic, tag);
rocketMQTemplate.asyncSend(destination, content, sendCallback);
}
/**
* 单向发送
*/
public void sendOneWay(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
rocketMQTemplate.sendOneWay(destination, content);
}
2.2. 批量发送
批量音讯发送是将同一主题的多条音讯一起打包发送到音讯服务端,缩小网络调用次数,进步网络传输效率。
当然,并不是在同一批次中发送的音讯数量越多,性能就越好,判断根据是单条音讯的长度,如果单条音讯内容比拟长,则打包发送多条音讯会影响其余线程发送音讯的响应工夫,并且单批次音讯发送总长度不能超过 DefaultMQProducer#maxMessageSize,即配置文件中的 rocketmq.producer.max-message-size
。
代码
/**
* 同步 - 批量发送
*/
public SendResult syncBatchSend(String topic, String tag, List<String> contentList) {String destination = this.convertDestination(topic, tag);
List<Message<String>> messageList = contentList.stream()
.map(content -> MessageBuilder.withPayload(content).build())
.collect(Collectors.toList());
return rocketMQTemplate.syncSend(destination, messageList);
}
4. 标签 tag
rocketmq 中,topic 与 tag 都是业务上用来归类的标识,辨别在于 topic 是一级分类,而 tag 能够了解为是二级分类。定义上:
- topic: 音讯主题,通过 Topic 对不同的业务音讯进行分类。
- tag: 音讯标签,用来进一步辨别某个 Topic 下的音讯分类,音讯从生产者收回即带上的属性。
理论业务中,什么时候该用 topic 或 tag 呢?有以下几种倡议:
- 音讯类型是否统一: 如一般音讯、事务音讯、定时(延时)音讯、程序音讯,不同的音讯类型应用不同的 Topic,无奈通过 Tag 进行辨别。
- 业务是否相关联: 没有间接关联的音讯,如淘宝交易音讯,京东物流音讯应用不同的 Topic 进行辨别;而同样是天猫交易音讯,电器类订单、女装类订单、化妆品类订单的音讯能够用 Tag 进行辨别。
- 音讯优先级是否统一: 如同样是物流音讯,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则绝对会慢一些,不同优先级的音讯用不同的 Topic 进行辨别。
- 音讯量级是否相当: 有些业务音讯尽管量小然而实时性要求高,如果跟某些万亿量级的音讯应用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时须要将不同量级的音讯进行拆分,应用不同的 Topic。
举个理论我的项目的例子吧:我刚刚做的一个我的项目叫共享核心,所有须要共享的资源都来自于各个业务服务方。创立共享资源、撤回共享资源等这类指令,我将其定义为不同的 topic。而发送给 topic 的音讯体中有“资源类型”的字段,每个业务接入方其实只关怀对应本人资源类型的音讯,那么就将“资源类型”定义为 tag,各个业务方的生产端只监听本人所需的 tag 即可。
如果没有 tag 的机制,生产端就得接管所有音讯,反序列化后只解决本人对应“资源类型”的音讯。有了 tag 机制,音讯在进入生产端途中就主动进行过滤散发。
与 RabbitMQ AMQP 协定 比拟
在应用 tag 机制后,第一工夫让我想到了 RabbitMQ 的 AMQP 协定,很像交换机和队列的机制。当应用 tag 之后,像扇形交换机;当没应用 tag 之后,就像直连交换机。
- Exchange:音讯交换机,它指定音讯按什么规定,路由到哪个队列。
- Binding:绑定,它的作用就是把 Exchange 和 Queue 依照路由规定绑定起来。
- Queue:音讯队列载体,每个音讯都会被投入到一个或多个队列。
我想,二者设计的目标都是一样的,就是让生产者和消费者之间解耦,让一个音讯,能够自在地流转到不同的音讯端。RocketMQ 在这点,相较于 RabbitMQ 而言,提供的性能不够丰盛,但更实用、简洁。
示例代码:生产者
通过后面示例代码中,最简略的发送同步音讯代码来看,最新 starter
中封装的 RocketMQTemplate
类中,音讯发送的指标是 String destination
。而 它蕴含了 topic
和 tag
。即公有转换方法中的:destination = topic:tag
。
private final RocketMQTemplate rocketMQTemplate;
private String convertDestination(String topic, String tag) {return StringUtils.isBlank(tag) ? topic : StringUtils.join(topic, ":", tag);
}
/**
* 同步发送
*/
public SendResult syncSend(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
return rocketMQTemplate.syncSend(destination, content);
}
示例代码:消费者
@RocketMQMessageListener(consumerGroup = ShareRocketMqConstants.GROUP_PREFIX + ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK,
topic = ShareRocketMqConstants.TOPIC_SHARE_RSRC_TO_BIZ_CALLBACK,
selectorExpression = "2||3||4",
consumeThreadMax = 3)
public class ShareRsrcMqConsumer implements RocketMQListener<RsrcToBiz4Mq> {... ...}
上述生产端代码中,申明只生产 tag 值为:2、3、4 的音讯,注解中外围有两个属性:
selectorType:
默认值就是 SelectorType.TAG,所以示例代码中没有设置。selectorExpression:
对应的表达式。针对 SelectorType.TAG 类型的,就须要设置 tag 的表达式。默认值是*
,即所有 tag 都生产。如果想要指定生产多个 tag,则用||
或合乎来连贯。
留神: 生产者端,发消息只能指定一个 tag。但消费者端,接管音讯能够指定多个 tag。
5. 提早 / 定时音讯
定时音讯是指音讯发到 broker 后,不能立即被 consumer 生产,要到特定的工夫点或者期待特定的工夫后能力被生产。
原理
其实定时音讯实现原理比较简单,如果一个 topic 对应的音讯在发送端被设置为定时音讯,那么会将该音讯先寄存在 topic 为 SCHEDULE_TOPIC_XXXX
的音讯队列中,并将原始音讯的信息寄存在 commitLog 文件中,因为 topic 为 SCHEDULE_TOPIC_XXXX,所以该音讯不会被立刻音讯,而后通过定时扫描的形式,将达到延迟时间的音讯,转换为正确的音讯,发送到相应的队列进行生产。
提早级别
只管 rocketmq 反对定时音讯,然而以后开源版本的 rocketmq 所反对的定时工夫是无限的、不同级别的精度的工夫,并不是任意无限度的定时工夫。默认 Broker 服务器端有 18 个定时级别,每一个级别别离对应不同的延迟时间:
提早级别 | 延迟时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
代码
发送提早音讯并没有非凡的办法,而是基于一般发消息的办法(如:rocketMQTemplate.syncSend)做了重载,减少了一个传入参数 int delayLevel
,默认值为 0,即立刻发送。
/**
* 同步提早发送
*
* @param delayLevel 延时等级:当初 RocketMq 并不反对任意工夫的延时,须要设置几个固定的延时等级,从 1s 到 2h 别离对应着等级 1 到 18
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public SendResult syncSendDelay(String topic, String tag, String content, long timeout, int delayLevel) {String destination = this.convertDestination(topic, tag);
Message message = MessageBuilder.withPayload(content).build();
return rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
}
6. 程序音讯
程序音讯是一种对音讯发送和生产程序有严格要求的音讯,对于一个指定的 Topic,音讯严格依照先进先出(FIFO)的准则进行音讯公布和生产,即先公布的音讯先生产,后公布的音讯后生产。
RocketMQ 目前只能保障同一个分区队列内的程序音讯,因而实现下列场景的形式有:
- 分区有序: RocketMQ 反对同一个队列分区内的程序音讯。另外某个 Topic 下,所有音讯依据 ShardingKey 进行分区,雷同 ShardingKey 的音讯必须被发送到同一个分区队列。因而只有保障音讯依照同一 ShardingKey 发送即可,而后保障 Consumer 同一个队列单线程生产即可。
- 全局有序: 当设置 Topic 下只有一个分区时,能够实现全局有序。
全局有序的性能太差,举荐应用分区有序。假如咱们要通过 mq 解决订单内的音讯。同一个 topic,通常咱们只须要保障同一个订单下的音讯程序公布和生产即可,不同订单下的音讯应该互不烦扰。因而能够采纳分区有序,将订单号转换为 ShardingKey,只有保障同一个订单下的音讯都流转到同一个队列下,而后程序生产。
最常见将订单号转换为 ShardingKey 的形式就是 hashKey。
生产者
/**
* 同步程序发送
*
* @param hashKey 依据 hashKey 和 队列 size() 取模,保障同一 hashKey 的音讯发往同一个队列,以实现 同一 hashKey 下的音讯 程序发送
* 因而 hashKey 倡议取 业务上惟一标识符,如:订单号,只需保障同一订单号下的音讯程序发送
*/
public SendResult syncSendOrderly(String topic, String tag, String content, String hashKey) {String destination = this.convertDestination(topic, tag);
Message message = MessageBuilder.withPayload(content).build();
return rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
}
消费者
针对程序音讯的生产,代码也很容易,次要是 @RocketMQMessageListener
注解,通过设置了 consumeMode = ConsumeMode.ORDERLY
,示意应用程序生产。
ConsumeMode 有两种值:
- CONCURRENTLY:默认值,并发同时接管异步传递的音讯。
- ORDERLY:程序生产时开启,只开启一个线程,同一时间只有序接管一个队列的音讯。
@RocketMQMessageListener(topic = "xxx-topic",
consumerGroup = "xxxGroup",
consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {... ...}
}
发问:如果针对程序音讯的消费者,同时启动了多个 spring 实例,会影响吗?
这个问题过后想了良久,为了保障音讯依照程序生产,消费者是单线程生产的。可理论线上程序都不会是单节点,如果有多个 spring 实例,不是也能够了解成“多线程”解决了吗?
首先,回顾几个知识点吧:
- 程序生产只能针对“集群模式”生产,即
messageModel = MessageModel.CLUSTERING
。 - 集群模式下,多个消费者如何对音讯队列进行负载呢?音讯队列负载机制遵循一个通用的思维: 一个音讯队列同一时间只容许被一个消费者生产,一个消费者能够生产多个音讯队列。
- 尽管消费者代码中,RocketMQ 监听器像是 mq 中的“推模式”。但实际上,RocketMQ 音讯推模式基于拉模式实现,在拉模式上包装一层,一个拉取工作实现后开始下一个拉取工作。
程序音讯生产时,在每次拉取工作时,都会像 broker 申请锁住该队列。因而,就算有多个消费者实例同时在运行,针对单个队列中的程序音讯,仍然是程序生产的。
7. 事务音讯
这里着重阐明一下,RocketMQ 的事务机制,和咱们通常说的通过 MQ 来实现最终一致性的分布式事务机制,不是一个事件。
RocketMQ 的事务机制,只体现在生产者,保障的是生产者本地的事务执行、发消息,这两个事务达成一致性。至于消费者收到音讯后的事务处理,并不在以后机制内。
失常事务的流程
失常事务的流程,遵循的是 2PC
的计划。
- 调用发送事务音讯办法,失常发送音讯。发送事务的办法名为
syncSendInTransaction
。 - mq 服务器端胜利接管到音讯后,音讯处于一个半接管的状态,并响应给生产者客户端。
- 生产者收到服务器端胜利接管的响应后,执行本地事务。本地事务写在
executeLocalTransaction
办法外面,返回后果为枚举RocketMQLocalTransactionState
,有:COMMIT、ROLLBACK、UNKNOWN
三种值。 - 服务器端收到
COMMIT
状态后,会把音讯下发给消费者 - 服务器端收到
ROLLBACK
状态后,会删除掉以后半接管状态的音讯,不再解决。 - 服务器端收到
UNKNOWN
状态,或者服务器端超时未收到音讯,或者生产者未响应状态,则将进行音讯弥补机制。
音讯弥补机制
这部分比较简单,针对上述事务流程第 6 点的几种状况,会触发音讯回查。
- 当事务音讯呈现
UNKNOWN
、超时、未响应时,服务器会被动调用生产者本地的回查办法checkLocalTransaction
,查问本地事务执行状况,返回后果还是枚举值RocketMQLocalTransactionState
。 - 服务器接管到返回后果的解决流程和后面的失常流程一样。
- 如果仍然是
UNKNOWN
、超时、未响应,将持续重试。如果超过最大重试次数后,仍然无果,则视为ROLLBACK
,删除以后音讯。
事务音讯相干的参数,根本在 org.apache.rocketmq.common.BrokerConfig
类中定义,例如以下几个罕用属性的默认值:
transactionTimeOut = 6000L
:服务器未收到事务本地音讯的超时工夫为 1 分钟。transactionCheckMax = 15
:音讯弥补机制中的最大回查次数为 15 次。transactionCheckInterval = 6000L
:音讯弥补机制中每次回查的工夫距离为 1 分钟。
因为是 BrokerConfig
类中的属性,因而如果不想用默认值,能够在 broker.conf
文件中自定义批改。
代码:生产者发送事务音讯
/**
* 事务发送
*/
public TransactionSendResult syncSendInTransaction(String topic, String tag, String content) {String destination = this.convertDestination(topic, tag);
String transactionId = UUID.randomUUID().toString();
Message message = MessageBuilder.withPayload(content)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
return rocketMQTemplate.sendMessageInTransaction(destination, message, content);
}
代码:生产者定义事务本地办法
在生产者客户端,通过事务监听器,实现 RocketMQLocalTransactionListener
接口的两个上述办法。
@RocketMQTransactionListener
public class LocalTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println("executeLocalTransaction:"+ LocalDateTime.now());
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {System.out.println("checkLocalTransaction:"+ LocalDateTime.now());
return RocketMQLocalTransactionState.COMMIT;
}
}
发问:如果 spring 我的项目中有多个事务音讯生产者,怎么辨别不同的
RocketMQLocalTransactionListener
?
@RocketMQLocalTransactionListener
这个注解提供了属性,能够辨别不同的事务音讯生产者。在 stater 2.0.4 版本中,是提供 txProducerGroup 这个属性指向一个音讯发送者组,映射不同的事务音讯发送逻辑。但如同有 bug,在后续新的版本迭代中,去掉了这个属性。
到了 2.1.1 版本,只能通过指定 rocketMQTemplateBeanName
来实现,即不同的事务音讯发送时,就得定义不同的 RocketMQTemplate。挺麻烦的,期待这个性能在后续的迭代中欠缺好。
8. 重试队列、死信队列
RocketMQ 很多应多异样的顾全机制,例如音讯重发的机制,这里能够分两类:
- 生产者重发: 在后面介绍三种发送音讯形式时,针对同步、异步发送失败时,都会再重发,相应重发次数别离对应
DefaultMQProducer
类中属性值retryTimesWhenSendFailed
、retryTimesWhenSendAsyncFailed
,也能够在 properties 配置文件中自定义设置。 - 消费者重发: 当音讯曾经进入 broker 后,消费者接管失败,broker 也会给消费者重发,以下衍生出本次的重试队列、死信队列。
重试队列
如果消费者端因为各种类型异样导致本次生产失败,为避免该音讯失落而须要将其从新回发给 broker 端保留,保留这种因为异样无奈失常生产而回发给 mq 的音讯队列称之为重试队列。
RocketMQ 会为每个生产组都设置一个 topic 名称为 “%RETRY%+consumerGroup”
的重试队列(这里须要留神的是,这个 Topic 的重试队列是针对生产组,而不是针对每个 Topic 设置的)。
用于临时保留因为各种异样而导致消费者端无奈生产的音讯。思考到异样复原起来须要一些工夫,会为重试队列设置多个重试级别,每个重试级别都有与之对应的从新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试音讯的解决是先保留至 topic 名称为 “SCHEDULE_TOPIC_XXXX”
的提早队列中,后盾定时工作依照对应的工夫进行 Delay 后从新保留至“%RETRY%+consumerGroup”的重试队列中。
死信队列
因为有些起因导致消费者端长时间的无奈失常生产从 broker 端 pull 过去的业务音讯,为了确保音讯不会被无端的抛弃,那么超过配置的“最大重试生产次数”后就会移入到这个死信队列中。
在 RocketMQ 中,SubscriptionGroupConfig 配置常量默认地设置了两个参数,一个是 retryQueueNums 为 1(重试队列数量为 1 个),另外一个是 retryMaxTimes 为 16(最大重试生产的次数为 16 次)。Broker 端通过校验判断,如果超过了最大重试生产次数则会将音讯移至这里所说的死信队列。这里,RocketMQ 会为每个生产组都设置一个 topic 命名为 “%DLQ%+consumerGroup"
的死信队列。但如果一个消费者组未产生死信音讯,音讯队列 RocketMQ 不会为其创立相应的死信队列的。
因为死信队列中的音讯是无奈被生产的,它也证实了一部分音讯呈现了意料之外的状况。因而个别在理论利用中,移入至死信队列的音讯,须要人工干预解决。例如通过 console 查看是否有私信队列,当解决问题后,可在 console 上手动重发消息。