关于阿里云:RocketMQ-的消费者类型详解与最佳实践

1次阅读

共计 6848 个字符,预计需要花费 18 分钟才能阅读完成。

作者:凌楚

在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,别离是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。

消费者类型概览

本篇文章也会依据不同的消费者类型来进行讲述。在介绍不同的音讯类型之前,先明确一下不同 RocketMQ 消费者中的一个通用工作流程:在消费者中,达到客户端的音讯都是由客户端被动向服务端申请并挂起长轮询取得的。为了保障音讯达到的及时性,客户端须要一直地向服务端发动申请(申请是否须要由客户端被动发动则与具体的客户端类型无关),而新的符合条件的音讯一旦达到服务端,就会客户端申请走。最终依据客户端解决的后果不同,服务端对音讯的处理结果进行记录。

另外 PushConsumerSimpleConsumer 中还会有一个 ConsumerGroup 的概念,ConsumerGroup 相当于是一组雷同订阅关系的消费者的独特身份标识。而服务端也会依据 ConsumerGroup 来记录对应的生产进度。同一个 ConsumerGroup 下的音讯消费者将独特生产合乎以后订阅组要求的所有音讯,而不是独立进行生产。相比拟于 PullConsumerPushConsumerSimpleConsumer 更加实用于业务集成的场景,由服务端来托管生产状态和进度,相对来说更加的轻量与简略。

简略来说:

  • PushConsumer: 全托管的消费者类型,用户只须要注册音讯监听器即可,合乎对应订阅关系的音讯就会调用对应的生产办法,是与业务集成最为广泛的消费者类型。
  • SimpleConsumer: 解耦音讯生产与进度同步的消费者类型,用户自主承受来自服务端的音讯,并对单条音讯进行音讯确认。和 PushConsumer 一样也由服务端托管生产进度,实用于用户须要自主管制生产速率的业务场景。
  • PullConsumer: 应用流解决框架进行治理的消费者类型,用户依照队列(Topic 的最小逻辑组成单位)来进行音讯的接管并能够抉择主动或者手动提交生产位点。

PushConsumer

PushConsumer 是 RocketMQ 目前应用最为宽泛的消费者。用户只须要确认好订阅关系之后,注册绝对应的 Listener 即可。合乎对应订阅关系的音讯在由 Producer 收回后,消费者的 Listener 接口也会被即时调用,那么此时用户须要在 Listener 中去实现对应的业务逻辑。

应用简介

以下是 Push 消费者的应用示例:

PushConsumer pushConsumer = provider.newPushConsumerBuilder()
 .setClientConfiguration(clientConfiguration)
    // set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
        // handle the received message and return consume result.
        LOGGER.info("consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();
// block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// close the push consumer when you don't need it anymore.
pushConsumer.close();

用户须要依据本人业务处理结果的不同来返回 ConsumeResult.SUCCESS 或者 ConsumeResult.FAILURE。当用户返回 ConsumeResult.SUCCESS 时,音讯则被视为生产胜利;当用户返回 ConsumeResult.FAILURE 时,则服务端视为生产失败,会进行该条音讯的退却重试,音讯的退却重试是指,在音讯被生产胜利之前,以后音讯会被屡次投递到用户注册的 MessageListener 中直到生产胜利,而两次生产之间的工夫距离则是合乎退却法则的。

特地的,每个 ConsumerGroup 都会有一个最大生产次数的设置,如果以后音讯的生产次数超过了这个设置,则音讯不会再被投递,转而被投递进入死信队列。这个生产次数在音讯每次被投递到 MessageListener 时都会进行自增。譬如:如果音讯的最大生产次数为 1,那么无论对于这条音讯,以后是被返回生产胜利还是生产失败,都只会被生产这一次。

利用场景与最佳实际

PushConsumer 是一种近乎全托管的消费者,这里的托管的含意在于用户自身并不需要关怀音讯的接管,而只须要关注音讯的生产过程,除此之外的所有逻辑都在 Push 消费者的实现中封装掉了,用户只须要依据每条收到的音讯返回不同的生产后果即可,因而也是最为普适的消费者类型。

MessageListener 是针对单条音讯设计的监听器接口:

/**
* MessageListener is used only for the push consumer to process message consumption synchronously.
 *
 * <p> Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the
 * backend thread pool to consumer message concurrently.
 */
public interface MessageListener {
    /**
     * The callback interface to consume the message.
     *
     * <p>You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}.
     * The consumption is successful only when {@link ConsumeResult#SUCCESS} is returned, null pointer is returned
     * or exception is thrown would cause message consumption failure too.
     */
    ConsumeResult consume(MessageView messageView);
}

绝大多数场景下,应用方应该疾速解决生产逻辑并返回生产胜利,不宜长时间阻塞生产逻辑。对于生产逻辑比拟重的情景,倡议能够后行提交生产状态,而后对音讯进行异步解决。

理论在 Push 消费者的实现中,为了保障音讯生产的及时性,音讯是会被事后拉取客户端再进行后续的生产的,因而在客户端中存在对已拉取音讯大小的缓存。为了避免缓存的音讯过多导致客户端内存透露,也提前预留了客户端参数供使用者自行进行设置。

// 设置本地最大缓存音讯数目为 16 条
pushConsumer.setMaxCachedMessageCount(16);
// 设置本地最大缓存音讯占用内存大小为 128 MB
pushConsumer.setMaxCachedMessageSizeInBytes(128 * 1024 * 1024);

SimpleConsumer

相比拟 PushConsumerSimpleConsumer 则裸露了更多的细节给使用者。在 SimpleConsumer 中,用户将自行管制音讯的接管与解决。

应用简介

以下是 SimpleConsumer 的应用示例:

SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // set await duration for long-polling.
    .setAwaitDuration(awaitDuration)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
LOGGER.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {final MessageId messageId = message.getMessageId();
    try {consumer.ack(message);
        LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
    } catch (Throwable t) {LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
    }
}
// Close the simple consumer when you don't need it anymore.
consumer.close();

SimpleConsumer 中用户须要自行进行音讯的拉取,这一动作通过 SimpleConsumer#receive 这个接口进行,而后再依据本人业务逻辑处理结果的不同再对拉取到的音讯进行不同的解决。SimpleConsumer#receive 也是通过长轮询来承受来自服务端的音讯,具体的长轮询工夫能够应用 SimpleConsumerBuilder#setAwaitDuration 来进行设置。

SimpleConsumer 中,用户须要通过 SimpleConsumer#receive 设置一个音讯不反复的工夫窗口(或者说对于通过这个接口收到的音讯的一个不可见工夫窗口),这个工夫窗口从用户承受到这条音讯开始计时,在这段时间之内音讯是不会反复投递到消费者的,而超出这个工夫窗口之后,则会对这条音讯进行再一次的投递。在这个过程中,音讯的生产次数也会进行递增。与 PushConsumer 相似的是,一旦生产次数超出 ConsumerGroup 的最大次数,也就不会进行重投了。

相比拟于 PushConsumer 而言,SimpleConsumer 用户能够自主管制承受音讯的节奏。SimpleConsumer#receive 会针对于以后的订阅关系去服务端拉取符合条件的音讯。SimpleConsumer 实际上的每次音讯接管申请是依照具体 Topic 的分区来 one by one 发动申请的,理论的 Topic 分区可能会比拟多,因而为了保障音讯接管的及时性,倡议综合本人的业务解决能力肯定水平上进步 SimpleConsumer#receive 的并发度。

用户在承受到音讯之后,能够抉择对音讯应用 ack 或者 changeInvisibleDuration,前者即对服务端示意对这条音讯的确认,与 PushConsumer 中的生产胜利相似,而 changeInvisibleDuration 则示意提早以后音讯的可见工夫,即须要服务端在以后一段时间之后再向客户端进行投递。值得注意的是,这里音讯的再次投递也是须要遵循 ConsumerGroup 的最大生产次数的限度,即一旦音讯的最大生产次数超出了最大生产次数(每次音讯达到可见工夫都会进行生产次数的自增),则不再进行投递,转而进入死信队列。举例来说:

  • 进行 ack,即示意音讯生产胜利被确认,生产进度被服务端同步。
  • 进行 changeInvisibleDuration,

1)如果音讯曾经超过以后 ConsumerGroup 的最大生产次数,那么音讯后续会被投递进入死信队列

2)如果音讯未超过以后 ConsumerGroup 的最大生产次数,若申请在上一次音讯可见工夫到来之前发动,则批改胜利,否则则批改失败。

利用场景与最佳实际

PushConsumer 中,音讯是单条地被投递进入 MessageListener 来解决的,而在 SimpleConsumer 中用户能够同时拿到一批音讯,每批音讯的最大条数也由 SimpleConsumer#receive 来决定。在一些 IO 密集型的利用中,会是一个更加不便的抉择。此时用户能够每次拿到一批音讯并集中进行解决从而进步生产速度。

PullConsumer

PullConsumer 也是 RocketMQ 始终以来都反对的消费者类型,RocketMQ 5.0 中全新的 PullConsumer API 还在演进中,敬请期待。下文中的 PullConsumer 会应用 4.0 中现存的 LitePullConsumer 进行阐述,也是以后举荐的形式。

应用简介

现存的 LitePullConsumer 中的次要接口

// PullConsumer 中的次要接口
public interface LitePullConsumer {
 // 注册路由变动监听器
void registerTopicMessageQueueChangeListener(String topic,
        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
    // 将队列 assign 给以后消费者
    void assign(Collection<MessageQueue> messageQueues);
    // 针对以后 assigned 的队列获取音讯
    List<MessageExt> poll(long timeout);
    // 查找以后队列在服务端提交的位点
    Long committed(MessageQueue messageQueue) throws MQClientException;
    // 设置是否主动提交队列位点
    void setAutoCommit(boolean autoCommit);
    // 同步提交队列位点
    void commitSync();}

在 RocketMQ 中,无论是音讯的发送还是接管,都是通过队列来进行的,一个 Topic 由若干个队列组成,音讯自身也是依照队列的模式来一个个进行存储的,同一个队列中的音讯领有不同的位点,且位点的大小是随随音讯达到服务端的工夫逐次递增的,实质上不同 ConsumerGroup 在服务端的生产进度就是一个个队列中的位点信息,客户端将本人的生产进度同步给服务端实质上其实就是在同步一个个音讯的位点。

在 PullConsumer 中将队列这个概念残缺地裸露给了用户。用户能够针对本人关怀的 topic 设置路由监听器从而感知队列的变动,并将队列 assign 给以后消费者,当用户应用 LitePullConsumer#poll 时会尝试获取曾经 assign 好了的队列中的音讯。如果设置了 LitePullConsumer#setAutoCommit 的话,一旦音讯达到了客户端就会主动进行位点的提交,否则则须要应用 LitePullConsumer#commitSync 接口来进行手动提交。

利用场景与最佳实际

PullConsumer 中用户领有对音讯位点治理的相对自主权,能够自行治理生产进度,这是与 PushConsumer 和 SimpleConsumer 最为实质的不同,这也使得 PullConsumer 在流计算这种须要同时自主管制生产速率和生产进度的场景能失去十分宽泛的利用。更多状况下,PullConsumer 是与具体的流计算框架进行集成的。

如果您对 RocketMQ 感兴趣,欢送扫描下方二维码退出钉钉群一起沟通交流~

点击 此处 ,进入官网理解更多详情~

正文完
 0