关于rabbitmq:RabbitMQ-延迟消息实战

2次阅读

共计 4452 个字符,预计需要花费 12 分钟才能阅读完成。

RabbitMQ 提早音讯实战

RabbitMQ Assistant 是一款 RabbitMQ 可视化治理与监控——深刻理解您的队列、订阅与生产音讯,展现残缺的音讯流图以及压力测试。


现实生活中有一些场景须要提早或在特定工夫发送音讯,例如智能热水器须要 30 分钟后关上,未领取的订单或发送短信、电子邮件和推送告诉下午 2:00 开始的促销流动。

RabbitMQ 自身没有间接反对提早队列的性能,如果您搜寻“如何在 RabbitMQ 中应用提早音讯”,您很可能会遇到两种可能的解决方案。第一种解决方案是应用音讯 TTL 性能和死信性能的组合。第二种抉择是应用官网的 RabbitMQ 提早音讯插件。

本文具体介绍了 RabbitMQ 提早音讯。

[TOC]

什么是 RabbitMQ?

RabbitMQ 是一个开源音讯代理(也称为面向音讯的中间件),创立它是为了反对高级音讯队列协定 (Advanced Message Queuing Protocol, AMQP)。尔后,它通过插件架构进行了扩大,以反对简略(或流式)面向文本的音讯协定 (Text Oriented Message Protocol, STOMP)、音讯查问遥测传输 (Message Query Telemetry Transport, MQTT) 等协定。

对于集群和故障转移,RabbitMQ 服务器是用 Erlang 编写的,并采纳了凋谢电信平台框架。用于与代理交互的客户端库可用于所有次要编程语言,源代码可在 Mozilla 公共许可证下取得。

简略来说,RabbitMQ 是一个消息传递零碎,能够在本地或云端应用。并且反对多种消息传递协定。

RabbitMQ 的次要个性

以下是 RabbitMQ 的一些个性:

  • 集群:RabbitMQ 中的集群在设计时思考了两个指标。如果一个节点产生故障,事件的消费者和生产者能够持续运行,同时增加其余节点以横向扩大消息传递吞吐量。
  • 轻松路由:音讯通过交换器而后达到队列,这提供了灵便的路由形式。对于更简单的路由,用户能够将交换器连贯在一起或将他们的交换器类型开发为插件。
  • 可靠性:持久性、交付反馈、公布确认和高可用性是 RabbitMQ 对性能有间接影响的要害个性。
  • 安全性:客户端证书检查和仅 SSL 通信能够帮忙爱护客户端连贯。虚拟主机能够调节用户拜访,确保高级音讯隔离。

在 RabbitMQ 中启用提早音讯

很长一段时间以来,人们始终在寻找应用 RabbitMQ 实现提早消息传递的办法。迄今为止,公认的解决方案是应用音讯的组合——TTL 和死信交换器。

RabbitMQ 提早音讯插件向 RabbitMQ 增加了一种新的替换类型,如果用户违心,容许提早通过该替换路由的音讯。让咱们看看如何应用这两种办法。

  • 应用 TTL 和 DLX 提早消息传递
  • RabbitMQ 提早音讯插件

应用 TTL 和 DLX 提早消息传递

通过组合这些性能,咱们能够将音讯公布到队列,该音讯将在 TTL 后过期,而后它被从新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为
死信队列。

上面创立一个队列,为其设置 TTL 和 DLX 等:

// 创立两个交换器,一个为失常的交换器 exchange.normal,另一个为死信交换器 exchange.dlx
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);


// 创立一个队列 queue.normal,并绑定到 exchange.normal
Map<String, Object> args = new HashMap <>();
// 设置队列中音讯的过期工夫
args.put("x-message-ttl", 10000);
// 当 queue.normal 中的音讯过期时,将发送到 exchange.dlx
args.put("x-dead-letter-exchange", "exchange.dlx");
// 也能够为这个 DLX 指定路由键,如果没有非凡指定,则应用原队列的路由键
args.put("x-dead-letter-routing-key", "routingkey");

channel.queueDeclare("queue.normal", true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");


// 创立死信队列 queue.dlx,并当到死信交换器 exchange.dlx
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");

// 向 exchange.normal 公布一条音讯
channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN , "dlx".getBytes());

参考下图,生产者首先发送一条携带路由键为“rk”的音讯,而后通过交换器 exchange.normal 顺利地存储到队列 queue.normal 中。因为队列 queue.normal 设置了过期工夫为 10s,在这 10s 内没有消费者生产这条音讯,那么断定这条音讯为过期。因为设置了 DLX,过期之时,音讯被丢给交换器 exchange.dlx 中,这时找到与 exchange.dlx 匹配的队列 queue.dlx,最初音讯被存储在 queue.dlx 这个死信队列中。对于 RabbitMQ 来说,DLX 是一个十分有用的个性。它能够解决异常情况下,音讯不可能被消费者正确生产(消费者调用了 Basic.Nack 或者 Basic.Reject)而被置入死信队列中的状况,后续分析程序能够通过生产这个死信队列中的内容来剖析过后所遇到的异常情况,进而能够改善和优化零碎。

在上图中,不仅展现的是死信队列的用法,也是提早队列的用法,对于 queue.dlx 这个死信队列来说,同样能够看
作提早队列。假如一个利用中须要将每条音讯都设置为 10 秒的提早,生产者通过 exchange.normal 这个交换器将发送的音讯存储在 queue.normal 这个队列中。消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列。当音讯从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就凑巧生产到了提早 10 秒的这条音讯。

在实在利用中,对于提早队列能够依据延迟时间的长短分为多个等级,个别分为 5 秒、10 秒、30 秒、1 分钟、5 分
钟、10 分钟、30 分钟、1 小时这几个维度,当然也能够再细化一下。

参考下图,为了简化阐明,这里只设置了 5 秒、10 秒、30 秒、1 分钟这四个等级。依据利用需要的不同,生产者在发送音讯的时候通过设置不同的路由键,以此将音讯发送到与交换器绑定的不同的队列中。这里队列别离设置了过期工夫为 5 秒、10 秒、30 秒、1 分钟,同时也别离配置了 DLX 和相应的死信队列。当相应的音讯过期时,就会转存到相应的死信队列(即提早队列)中,这样消费者依据业务本身的状况,别离抉择不同提早等级的提早队列进行生产。

RabbitMQ 提早音讯插件

从装置插件开始,但首先,让咱们看一下以下先决条件:

  • RabbitMQ 版本 3.5.8 及更高版本。
  • Erlang/OTP 18.0 及更高版本

插件装置

在 Github 下载插件。将插件复制到 RabbitMQ 的插件文件夹,而后运行以下命令启用它:

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
# 将插件挪动到 plugins 目录下
mv rabbitmq_delayed_message_exchange-3.11.1.ez ./rabbitmq_server-3.11.1/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

提早音讯交换器

要应用提早音讯交换器,只需申明一个类型为 x-delayed-message 的交换器,如下所示:

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

稍后,咱们将解释替换申明中的非凡参数 x-delayed-type 的含意。

提早音讯

要提早音讯,用户必须应用 x-delay 标头公布它,该标头承受一个整数,示意音讯应由 RabbitMQ 提早的毫秒数。值得注意的是,在此上下文中的提早示意着音讯路由到队列或其余交换器的提早。交换器没有消费者的概念。
因而,一旦提早过来,插件将尝试将音讯路由到与交换器的路由规定匹配的队列。如果音讯无奈路由到任何队列,它将被抛弃。

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

在下面的示例中,音讯在被插件路由之前将提早五秒钟。

路由灵活性

当咱们在下面申明替换时,咱们应用了一个设置为 directx-delayed-type 参数。这通知交换器咱们心愿它在路由音讯、创立绑定等时具备什么样的行为。

查看提早音讯

一旦咱们在消费者端收到音讯,咱们如何判断音讯是否被提早?x-delay 音讯头由插件保留。如果您以 5000 毫秒的提早发送音讯,消费者会发现 x-delay 标头设置为 5000。

参考资料:

  • RabbitMQ Delayed Messages 101: How to Delay & Schedule Messages Made Easy
  • RabbitMQ 实战:高效部署分布式音讯队列
正文完
 0