rocketMQ461-系列教程-提炼篇

36次阅读

共计 4549 个字符,预计需要花费 12 分钟才能阅读完成。

1、namrsrv 与 broker

1.1、namesrv 与 broker 架构

从架构图来看,namesrv 充当的角色是 注册中心。只不过有点特殊的是,namesrv 之间互不通信。

1.2、namesrv 与 broker 的通信

broker 每隔 30s 会向 namesrv 注册自身的信息。namesrv 每隔 10s 检查 120s 内 无响应的 broker, 并进行剔除。

那么 namesrv 与 broker 的通信模型,会出现一个问题。那就是,namesrv 至少需要 120s 左右才会感知到 broker 死亡。

1.3、rocketmq 的消息模型

在 rocketMQ 消息模型中,有以下几个主要角色:生产者、消费者、队列。之间的沟通方式为如下。

图主要表达几个信息:
1、topic 实际上是逻辑结构,queue 才是 物理结构,也就是 rocketmq 是基于 队列 进行消费的
2、消费者是以组为单位进行消费的。
3、在图中,我特意让 groupA 只订阅 topicA,而没有订阅 topicB。这个原因是,rocketMQ 在被设计时,就不希望一个消费者同时处理多个类型的消息。因此同一个 consumerGroup 下的 consumer 职责应该是一样的,不要干不同的事情(即消费多个 topic)。

2、消息发送者

2.1、消息发送的三种方式

  • 同步发送
    同步发送时,需要等待 broker 将消息存入 commitlog 文件后,才会返回,生产者线程阻塞。
  • 异步发送
    异步发送时,是使用线程池提交任务的。核心线程、最大线程数量 =cpu 核数。见 DefaultMQProducerImpl。发送消息,不需要等到服务器返回结果。
  • 单向发送
    只管发,不管成功与否

2.2、消息发送流程

大体流程:验证消息 => 找到 broker => 选择队列 => 消息发送。

2.3、broker 的选择

会先从本地缓存获取 broker 的信息,如果不存在,则向 namesrv 获取 broker 信息。
在选择 broker 时,会避开上一次发送失败的 broker。

故障延迟机制

DefaultMQProducer#setSendLatencyFaultEnable

在消息发送失败后,如果启用 故障延迟机制,那么会在一定的时间内,将该 broker 设置为不可用。并且在选择队列的时候,跳过该 broker。

2.4、消息队列的选择

代码入口:TopicPublishInfo#selectOneMessageQueue。

轮询选择队列。(每个消费者,内部维护了一个计数器,每次选中完后,该计数器的值就会 + 1。算法则为,该值 % 队列数量。)

2.5、消息发送

消息发送,就是将消息发送给 broker,如果消息发送失败,会根据配置的重试次数,进行重试。

2.6、批量发送

所谓的批量发送,不过是将消息变成 List,然后一次性传输。

3、消息存储

3.1、消息被发送出去了,broker 端接收到消息后,如何做处理呢?如何存储消息?

在这之前,需要先说下 rocketMQ 3 个重要的存储文件

3.1.1、commitlog

commitlog 真正存储消息的文件。
rocketmq 为了加速存储,采用顺序写。所有的消息,不分 topic 均被存储在 commitlog 里面。也因此,如果想删掉某个 topic 的消息,几乎做不到。你只能把 rocketmq 的 commitlog 文件直接删掉,或者启用更多的消费者,把消息消费掉。

关于 commitlog,还有几点需要知道的。

  1. 在磁盘使用率,超过 0.9(默认), 那么会禁止继续写入
  2. 为了防止 commitlog 文件过多,占用存储空间过大问题。rocketMQ 会清理文件。

    1. rocketMQ 每隔 10s(默认)扫描过期文件,并在每日的凌晨 4 点(默认)定期清理过期。
      这里有一个概念,什么是过期的 commitlog?
      过期的概念是指:当前时间 – commitlog 的最后一次修改时间 >= 3d(默认)则成为过期文件。这样就会出现一个问题,过期的消息会自动被清除。如果你打算用 rocketMQ 实现过久的延迟消息,那么这是不建议的。
    2. 如果磁盘的使用率,超过 0.85(默认)那么也会执行删除过期文件的操作

我们知道,所有 topic 的消息全都被放在 commit 文件中,这样虽然写入的性能非常高(顺序写入),但是读的时候,效率就很低了(随机读)。那么 rocketMQ 如何是如何解决这样的问题?那么这个时候 consumequeue 就登场了。

3.1.2、consumequeue

所有的消息都存储在 commitlog 上,那么要检索消息时,效率就非常慢。consumequeue 解决了该问题,可以认为该文件是 commitlog 的索引文件。

那么 consumequeue 是如何加速消息检索的?

大致的流程如下,找到 MappedFile,根据 MappedFile 中的 offset 获取 对应的 consumequeue。再根据 consumequeue 中的 commitlog offset 到 commitlog 中查找

3.1.3、index

index 文件是为了提高 commitlog、consumequeue 的检索速度而生。

index 文件的写入过程如下:
先将 index 放入 index 条目中,然后再以 key 的 hashcode 为 k,index 下标为 v,放入 hash 槽中,最后更新 IndexHead 信息。

查找的时候,则 先确定要查找的 index 文件,然后到 hash 槽中,获取 index 的位置,接着到 index 条目中获取对应的信息。

说完了 3 个存储文件,对 broker 如何存储消息的,也有个大概的了解,那么 broker 消息存储的时候,主要的工作,就是将消息放到这 3 个文件中。

先存储 commitlog。存储完毕后,转发给专门存储 consumequeue 和 index 的线程

4、消息消费

4.1、当 broker 接收到消息后,消费者端是立马能够拉取到消息?还是说要等到 broker 端把消息存储后,才能拉取到消息?

从 rocketMQ 的消息模型来看,消费者 是从 队列中拉取消息的。也就是,需要等到 rocketMQ 将消息刷写到 consumequeue 中,才可以拉取到消息。

4.2、这里产生了一个问题,rocketMQ 什么时候会将消息刷入 consumequeue,如果太慢,是不是会造成消费者无法及时拉取到消息?

每隔 10 毫秒刷新一次。刷新 commitlog 后,会将请求转发至 comsumequeue,indexFile。最后将 consumequeue,indexFile 刷入磁盘。消费者端则是近乎实时的从 broker 端拉取消息。

4.3、消息要消费,那么肯定要先拉取消息那么拉取消息的时候,涉及到一个问题。rocketMQ 如何知道要拉取哪个队列的消息?

rocketMQ 会替每个消费者初始化 拉取的队列。默认算法是 平均分配算法。例如,现在有 8 个 c1,c2,c3,c4,c5,c6,c7,c8。假设有 3 个 a1,a2,a3。那么每个消费者分配的队列如下。

a1:c1,c2,c3
a2:c4,c5,c6
a3:c7,c8

另外一个比较推荐的算法则是 轮询算法。
a1:c1,c4,c7
a2:c2,c5,c8
a3:c3,c6

每隔 20s(默认)会执行一次队列的负载均衡,有新的消费者进来时,也会执行队列的负载均衡。

消费者,知道了从哪个队列拉取消息后,接下来就是拉取消息了。

rocketMQ 拉取消息时,会一次性拉取 32 条消息。放到 ProcessQueue 中,然后再提交给 线程池 处理。

4.4、如果消费者端拉取了太多的消息,消费速度又跟不上,消费者端会无尽的拉取消息吗?导致消息堆积得更加厉害?

消费者端不会无尽的拉取消息,rocketMQ 在消息拉取的时候,有流控功能,每次流控,都会延迟 50ms 再继续拉取消息。

那么,什么情况下,会触发流控?
  1. processQueue 的消息数量 大于 1000,processQueue 的消息大小 大于 100 MB,将延迟 50 毫秒后拉取消息
  2. processQueue 中偏移量最大的消息与偏移量最小的消息的跨度超过 2000 则延迟 50 毫秒再拉取消息

rocketMQ 的消息堆积,不仅仅在 broker 端会堆积,在消费者端也会发生堆积。而且,消费者端一旦发生大量的堆积,必定造成 gc 频繁,cpu 飙高。
消费者端的堆积是指,消费者消费速度太慢,但是拉取了很多消息放在本地。

4.5、紧接着,如果我们消费失败的时候,rocketMQ 会帮我们做什么处理?难道就一直失败下去?

消费者,只要消费了这条消息,不管失败或成功,rocketMQ 都会认为消息已经被消费了。如果消息消费失败,rocketMQ 会重新 发送一条消息内容跟该消息失败的消息 到 重试主题 %RETRY% + topic。并且延迟等级 + 1。

这里涉及到 2 个问题.

  1. 如果这条消息一直消费失败呢?rocketMQ 又会如何处理?
    rocketMQ 最大重试 16 次(默认),当重试完毕后,还是消费失败,那么 rocketMQ 会将该消息放入 DLQ 队列,也就是我们说的死信队列。
  2. 延迟等级从哪里来?
    rocketMQ 的延迟等级需要配置在 broker.conf 中。默认的延迟等级如下。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

4.6、大体的消息消费流程,已经说清楚。在开发过程中,可能碰到这样的问题。如果中途加进来一个消费者,那么该消费者,会从哪个点开始消费,我们有没有办法指定该消费者从哪个位置开始消费呢?

可以调用 DefaultMQPushConsumer#setConsumeFromWhere 设置

  • ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:从队列当前最大偏移量开始消费(默认)
  • ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 从最早可用的消息开始消费
  • ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 从指定的时间戳开始消费

但是这个是有限制的,即 ConsumeFromWhere 消费进度校验只有在从磁盘中获取的消费进度返回 -1 时才失效。即刚创建的 消费组,如果 broker 中已经有记录该消费组的消费进度,那么该值的设置是无效的。

5、消息过滤

消息过滤支持 表达式过滤类过滤。表达式过滤分为 TAG 和 SQL92。这里主要说常用的 TAG 表达式。
一条消息可以设置多个 TAG,

5.1、这里产生了一个疑问,关于 TAG 表达式是在 broker 端过滤还是 消费者端进行过滤的呢?

之所以产生这个疑问,是因为,如果在 broker 端过滤,那么 broker 的压力就会很大,如果在消费者端过滤,消费者端就会接收到很多无用的消息。rocketMQ 是如何权衡?

rocketMQ 的做法比较有意思。是先在 broker 端 利用 tag 的 hashcode 进行一次快速的过滤,消费者拉取的时候,再根据实际 tag 的值进行过滤。

5.2、同样有个问题,如果 2 个 消费者,在同一个组内,订阅同一个 topic,但是 tag 不一样。这个时候会正常工作吗?

不会正常工作。

理由:这是 2 个一样的消费者,他们组成了集群,虽然 tag 不一样。那么 在集群模式下(默认),就绝对只有一个 消费者 会生效。因此,正确的做法是,定义不同的消费组,订阅同一个 topic。再用 tag 进行区分

正文完
 0