RocketMQ零术语消费模式应用场景

45次阅读

共计 3972 个字符,预计需要花费 10 分钟才能阅读完成。

1. 什么是 RocketMQ

RocketMQ是一个低延迟、高并发、高可用、高可靠的分布式消息中间件。

RocketMQ 的架构图可以看到,它是由 NameServer, Broker, Producer, Consumer 四种角色组成的,每一种角色都可以进行水平扩展而不会出现单点故障,所以它天然的支持分布式。

2. RocketMQ 名词介绍

在刚开始使用 RocketMQ 时相信大家都会跟着官网的 Quick Start 等案例来实现一个 demo,我也一样。但是完事之后,感觉好像跟之前没有什么区别,根本不知道demo 里面每个步骤中启动的是个什么玩意儿。
所以在这里先记录一下 RocketMQ 各个角色及名词的介绍,在知道这个之后,或许会对 RocketMQ 的架构和执行流程有了更好的理解。

2.1 角色名词

先来看一个购物的例子:假设我在京东商城买了一个东西,仓库会把相应的商品打包好,交给京东物流,次日京东物流会指派快递点的某个快递员把商品交到我手上。

在这个过程中有这样几个角色:京东物流、快递点、寄件人仓库、收件人我,这些角色就分别代表了NameServer, Broker, Producer, Consumer

  • NameServer: 在上述例子中就是京东物流,京东物流负责的工作是管理所有的快递点,可以把它看成是一个指挥部,它知道所有快递点的信息,它会把商品分发给适合的快递点,让快递点负责交付货物。
  • Broker: 作为快递点的Broker,负责的就是商品的暂存和传输,即消息暂存和消息传输,同时它还提供消息查询功能。
  • Producer: 消息生产者,即寄件人仓库。
  • Consumer: 消息消费者,即收件人我。

2.2 其他术语

RocketMQ 官网的 Simple Example 案例中发送消息时会指定消息的Topic, Tag,像下面的代码那样。

//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
    "TagA" /* Tag */,
    ("Hello RocketMQ" +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

而在发送消息之前,创建生产者时需要指定group name

 // Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
         

2.2.1 Topic, Tag

刚开始用 RocketMQ 时我根本不知道所谓的 Topic, Tag 代表了什么意思,后来才渐渐明白,所以我想如果我在最开始就先去了解这些术语的意思,或许会对 RocketMQ 的入门有很大的帮助。

首先来想象一个电商网站的页面,它的主页一定有各种类别的商品,比如衣服、裤子、鞋子等等。而每种类别的商品下面还有子类别,假设是品牌的分类。在这个画面中,各种类别的商品就代表Topic,而商品的子类别品牌就代表Tag

  • Topic: 主题,代表消息的类别,可以理解为消息类型的第一级划分。
  • Tag: 子主题,也属于消息的类别,但是它是 Topic 下的类别,可以理解为消息类型的第二级划分。

2.2.2 GroupName

所谓的 GroupName 是用来将相同角色(相同组)的生产者和消费者组合在一起的。

在同一个 GroupName 中的消费者会消费该组中生产者发送的消息。

3. RocketMQ 消费消息的方式

Demo 程序中创建消费者时,官方给出的是使用 DefaultMQPushConsumer 这个类来获取消息,其实在 RocketMQ 中还有一个用于获取消息的类——DefaultLitePullConsumer

RocketMQ 4.6.0 版本开始 DefaultMQPullConsumer 类已被标注为弃用,使用 DefaultLitePullConsumer 类替代其用于主动拉取消息的场景。

可以很容易看出,这两者一个用于 Push 场景,一个用于 Pull 场景。前者是服务端主动推送消息给客户端,后者则是客户端需要到服务端拉取数据。

3.1 Pull 模式

先来说说 Pull 模式,客户端循环地从服务端拉取消息。客户端可以设定适合的“拉取消息等待时间”,等到自己处理完消息之后再拉取新的消息,能够有效防止消息堆积的情况出现。

但是这也是 Pull 模式的缺陷,即拉取消息的时间间隔较难以界定。如果时间间隔过长,这一批消息都处理完了,时间间隔还没到,必须要等到时间到了之后再拉取新消息,会造成资源的浪费;而如果时间间隔过短,消息未处理完就拉取新消息,容易造成消息堆积。

3.2 Push 模式

Push模式是由服务端在接收到消息后,主动推送到客户端,所以这种方式的实时性较高。但是如果服务端源源不断地推送消息而客户端消费能力不足,就会产生消息堆积的问题。

事实上,Push模式的实现也是一种客户端主动拉取,即“长轮询”。

// PullRequestHoldService#run
while (!this.isStopped()) {
    try {if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);
        } else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
        }

        long beginLockTimestamp = this.systemClock.now();
        this.checkHoldRequest();
        long costTime = this.systemClock.now() - beginLockTimestamp;
        if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
        }
    } catch (Throwable e) {log.warn(this.getServiceName() + "service has exception.", e);
    }
}

在服务端接收到消息后,队列并不会直接返回,而是通过上面这个循环不断查看其状态,每次等待一段时间(默认 5 秒),然后调用 PullRequestHoldService#checkHoldRequest 方法来检查当前 Broker 的状态。当服务端一直没有新消息,且进行到第三次检查的时候,且超过了用户最大挂起时间 brokerSuspendMaxTimeMillis 时,才会返回空队列。

而如果在检查的过程中 Broker 接收到了新消息,就会通过调用 PullRequestHoldService#notifyMessageArriving 方法发送一个消息已到达的通知。

private void checkHoldRequest() {for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

4. 消息队列的应用场景

下面再来说说消息队列 MQ(Message Queue) 的主要应用场景。

4.1 异步处理

当一个流程十分臃肿,而其中决定性的节点比较少的时候,就可以考虑将非决定性的节点改为使用消息队列发送异步消息实现,这样可以提高整个流程的效率。除此之外,还能将优先的服务器资源用来处理更多的决定性节点业务。

4.2 流量削峰

不知道大家有没有经历过这样的场景:大学时选课,经常会出现进不去选课页面的情况。

这就是流量超过了程序上限,最终导致整个系统都不可用。而消息队列就可以很大程度地防止出现这样的情况,它可以降低并发的请求,把流量控制在服务器能够接受的范围。

虽然整个系统瘫痪的情况基本是不会出现了,但是选课响应慢,还是会发生 …

4.3 服务解耦

在一个微服务化的程序中,主业务可能会有很多个下游业务,而每个下游业务需要的参数可能是不同的,并且下游业务可能会经常更新,这样就存在了服务间耦合性过于紧密的情况。

如果在此时引入消息队列,主业务只在一个队列中发送一个完整的数据,而其他下游业务通过订阅这个主题来完成自己的业务,这样无论下游业务做怎样的修改,参数如何变化,只要队列中数据时完整的,就不需要修改主业务。就实现了服务间的解耦。

5. 小结

本文记录了我在学习 RocketMQ 前不明白、不清楚的一些点,比如各个角色和术语的概念,消费消息的两种消费模式以及消息队列的应用场景。

不过在多写几个 demo 之后,对于这些概念的理解也就越来越深刻,这里只是做个记录,权当课前预习了。

正文完
 0