共计 4549 个字符,预计需要花费 12 分钟才能阅读完成。
1、namrsrv 与 broker
1.1、namesrv 与 broker 架构
从架构图来看,namesrv 充当的角色是 注册中心。只不过有点特殊的是,namesrv 之间互不通信。
1.2、namesrv 与 broker 的通信
broker 每隔 30s 会向 namesrv 注册自身的信息。namesrv 每隔 10s 检查 120s 内 无响应的 broker, 并进行剔除。
那么 namesrv 与 broker 的通信模型,会出现一个问题。那就是,namesrv 至少需要 120s 左右才会感知到 broker 死亡。
1.3、rocketmq 的消息模型
在 rocketMQ 消息模型中,有以下几个主要角色:生产者、消费者、队列。之间的沟通方式为如下。
图主要表达几个信息:
1、topic 实际上是逻辑结构,queue 才是 物理结构,也就是 rocketmq 是基于 队列 进行消费的
2、消费者是以组为单位进行消费的。
3、在图中,我特意让 groupA 只订阅 topicA,而没有订阅 topicB。这个原因是,rocketMQ 在被设计时,就不希望一个消费者同时处理多个类型的消息。因此同一个 consumerGroup 下的 consumer 职责应该是一样的,不要干不同的事情(即消费多个 topic)。
2、消息发送者
2.1、消息发送的三种方式
- 同步发送
同步发送时,需要等待 broker 将消息存入 commitlog 文件后,才会返回,生产者线程阻塞。 - 异步发送
异步发送时,是使用线程池提交任务的。核心线程、最大线程数量 =cpu 核数。见 DefaultMQProducerImpl。发送消息,不需要等到服务器返回结果。 - 单向发送
只管发,不管成功与否
2.2、消息发送流程
大体流程:验证消息 => 找到 broker => 选择队列 => 消息发送。
2.3、broker 的选择
会先从本地缓存获取 broker 的信息,如果不存在,则向 namesrv 获取 broker 信息。
在选择 broker 时,会避开上一次发送失败的 broker。
故障延迟机制
DefaultMQProducer#setSendLatencyFaultEnable
在消息发送失败后,如果启用 故障延迟机制,那么会在一定的时间内,将该 broker 设置为不可用。并且在选择队列的时候,跳过该 broker。
2.4、消息队列的选择
代码入口:TopicPublishInfo#selectOneMessageQueue。
轮询选择队列。(每个消费者,内部维护了一个计数器,每次选中完后,该计数器的值就会 + 1。算法则为,该值 % 队列数量。)
2.5、消息发送
消息发送,就是将消息发送给 broker,如果消息发送失败,会根据配置的重试次数,进行重试。
2.6、批量发送
所谓的批量发送,不过是将消息变成 List,然后一次性传输。
3、消息存储
3.1、消息被发送出去了,broker 端接收到消息后,如何做处理呢?如何存储消息?
在这之前,需要先说下 rocketMQ 3 个重要的存储文件
3.1.1、commitlog
commitlog 真正存储消息的文件。
rocketmq 为了加速存储,采用顺序写。所有的消息,不分 topic 均被存储在 commitlog 里面。也因此,如果想删掉某个 topic 的消息,几乎做不到。你只能把 rocketmq 的 commitlog 文件直接删掉,或者启用更多的消费者,把消息消费掉。
关于 commitlog,还有几点需要知道的。
- 在磁盘使用率,超过 0.9(默认), 那么会禁止继续写入
-
为了防止 commitlog 文件过多,占用存储空间过大问题。rocketMQ 会清理文件。
- rocketMQ 每隔 10s(默认)扫描过期文件,并在每日的凌晨 4 点(默认)定期清理过期。
这里有一个概念,什么是过期的 commitlog?
过期的概念是指:当前时间 – commitlog 的最后一次修改时间 >= 3d(默认)则成为过期文件。这样就会出现一个问题,过期的消息会自动被清除。如果你打算用 rocketMQ 实现过久的延迟消息,那么这是不建议的。 - 如果磁盘的使用率,超过 0.85(默认)那么也会执行删除过期文件的操作
- rocketMQ 每隔 10s(默认)扫描过期文件,并在每日的凌晨 4 点(默认)定期清理过期。
我们知道,所有 topic 的消息全都被放在 commit 文件中,这样虽然写入的性能非常高(顺序写入),但是读的时候,效率就很低了(随机读)。那么 rocketMQ 如何是如何解决这样的问题?那么这个时候 consumequeue 就登场了。
3.1.2、consumequeue
所有的消息都存储在 commitlog 上,那么要检索消息时,效率就非常慢。consumequeue 解决了该问题,可以认为该文件是 commitlog 的索引文件。
那么 consumequeue 是如何加速消息检索的?
大致的流程如下,找到 MappedFile,根据 MappedFile 中的 offset 获取 对应的 consumequeue。再根据 consumequeue 中的 commitlog offset 到 commitlog 中查找
3.1.3、index
index 文件是为了提高 commitlog、consumequeue 的检索速度而生。
index 文件的写入过程如下:
先将 index 放入 index 条目中,然后再以 key 的 hashcode 为 k,index 下标为 v,放入 hash 槽中,最后更新 IndexHead 信息。
查找的时候,则 先确定要查找的 index 文件,然后到 hash 槽中,获取 index 的位置,接着到 index 条目中获取对应的信息。
说完了 3 个存储文件,对 broker 如何存储消息的,也有个大概的了解,那么 broker 消息存储的时候,主要的工作,就是将消息放到这 3 个文件中。
先存储 commitlog。存储完毕后,转发给专门存储 consumequeue 和 index 的线程
4、消息消费
4.1、当 broker 接收到消息后,消费者端是立马能够拉取到消息?还是说要等到 broker 端把消息存储后,才能拉取到消息?
从 rocketMQ 的消息模型来看,消费者 是从 队列中拉取消息的。也就是,需要等到 rocketMQ 将消息刷写到 consumequeue 中,才可以拉取到消息。
4.2、这里产生了一个问题,rocketMQ 什么时候会将消息刷入 consumequeue,如果太慢,是不是会造成消费者无法及时拉取到消息?
每隔 10 毫秒刷新一次。刷新 commitlog 后,会将请求转发至 comsumequeue,indexFile。最后将 consumequeue,indexFile 刷入磁盘。消费者端则是近乎实时的从 broker 端拉取消息。
4.3、消息要消费,那么肯定要先拉取消息那么拉取消息的时候,涉及到一个问题。rocketMQ 如何知道要拉取哪个队列的消息?
rocketMQ 会替每个消费者初始化 拉取的队列。默认算法是 平均分配算法。例如,现在有 8 个 c1,c2,c3,c4,c5,c6,c7,c8。假设有 3 个 a1,a2,a3。那么每个消费者分配的队列如下。
a1:c1,c2,c3
a2:c4,c5,c6
a3:c7,c8
另外一个比较推荐的算法则是 轮询算法。
a1:c1,c4,c7
a2:c2,c5,c8
a3:c3,c6
每隔 20s(默认)会执行一次队列的负载均衡,有新的消费者进来时,也会执行队列的负载均衡。
消费者,知道了从哪个队列拉取消息后,接下来就是拉取消息了。
rocketMQ 拉取消息时,会一次性拉取 32 条消息。放到 ProcessQueue 中,然后再提交给 线程池 处理。
4.4、如果消费者端拉取了太多的消息,消费速度又跟不上,消费者端会无尽的拉取消息吗?导致消息堆积得更加厉害?
消费者端不会无尽的拉取消息,rocketMQ 在消息拉取的时候,有流控功能,每次流控,都会延迟 50ms 再继续拉取消息。
那么,什么情况下,会触发流控?
- processQueue 的消息数量 大于 1000,processQueue 的消息大小 大于 100 MB,将延迟 50 毫秒后拉取消息
- processQueue 中偏移量最大的消息与偏移量最小的消息的跨度超过 2000 则延迟 50 毫秒再拉取消息
rocketMQ 的消息堆积,不仅仅在 broker 端会堆积,在消费者端也会发生堆积。而且,消费者端一旦发生大量的堆积,必定造成 gc 频繁,cpu 飙高。
消费者端的堆积是指,消费者消费速度太慢,但是拉取了很多消息放在本地。
4.5、紧接着,如果我们消费失败的时候,rocketMQ 会帮我们做什么处理?难道就一直失败下去?
消费者,只要消费了这条消息,不管失败或成功,rocketMQ 都会认为消息已经被消费了。如果消息消费失败,rocketMQ 会重新 发送一条消息内容跟该消息失败的消息 到 重试主题 %RETRY% + topic。并且延迟等级 + 1。
这里涉及到 2 个问题.
- 如果这条消息一直消费失败呢?rocketMQ 又会如何处理?
rocketMQ 最大重试 16 次(默认),当重试完毕后,还是消费失败,那么 rocketMQ 会将该消息放入 DLQ 队列,也就是我们说的死信队列。 - 延迟等级从哪里来?
rocketMQ 的延迟等级需要配置在 broker.conf 中。默认的延迟等级如下。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
4.6、大体的消息消费流程,已经说清楚。在开发过程中,可能碰到这样的问题。如果中途加进来一个消费者,那么该消费者,会从哪个点开始消费,我们有没有办法指定该消费者从哪个位置开始消费呢?
可以调用 DefaultMQPushConsumer#setConsumeFromWhere 设置
- ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:从队列当前最大偏移量开始消费(默认)
- ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 从最早可用的消息开始消费
- ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 从指定的时间戳开始消费
但是这个是有限制的,即 ConsumeFromWhere 消费进度校验只有在从磁盘中获取的消费进度返回 -1 时才失效。即刚创建的 消费组,如果 broker 中已经有记录该消费组的消费进度,那么该值的设置是无效的。
5、消息过滤
消息过滤支持 表达式过滤类过滤。表达式过滤分为 TAG 和 SQL92。这里主要说常用的 TAG 表达式。
一条消息可以设置多个 TAG,
5.1、这里产生了一个疑问,关于 TAG 表达式是在 broker 端过滤还是 消费者端进行过滤的呢?
之所以产生这个疑问,是因为,如果在 broker 端过滤,那么 broker 的压力就会很大,如果在消费者端过滤,消费者端就会接收到很多无用的消息。rocketMQ 是如何权衡?
rocketMQ 的做法比较有意思。是先在 broker 端 利用 tag 的 hashcode 进行一次快速的过滤,消费者拉取的时候,再根据实际 tag 的值进行过滤。
5.2、同样有个问题,如果 2 个 消费者,在同一个组内,订阅同一个 topic,但是 tag 不一样。这个时候会正常工作吗?
不会正常工作。
理由:这是 2 个一样的消费者,他们组成了集群,虽然 tag 不一样。那么 在集群模式下(默认),就绝对只有一个 消费者 会生效。因此,正确的做法是,定义不同的消费组,订阅同一个 topic。再用 tag 进行区分