乐趣区

关于后端:程序员必须掌握的消息中间件RabbitMQ

一、Rabbit 概述

RabbitMQ 是一个开源的音讯代理和队列服务器,用来通过一般协定在齐全不同的利用两头共享数据,RabbitMQ 是应用 Erlang 语言 来编写的,并且 RabbitMQ 是基于 AMQP 协定的。

特点:

  • 开源、性能优良

    Erlang 语言 最后用在交换机的架构模式,这样使得 RabbitMQ 在 Broker 之间进行数据交互的性能时十分优良的。Erlang 的长处:Erlang 有着和原生 Socket 一样的提早。

  • 可靠性

    提供可靠性音讯投递模式(confirm)、返回模式(return)。

  • 扩展性

    多个 RabbitMQ 节点能够组成一个集群,也能够依据理论业务状况动静地扩大集群中节点。

  • 与 SpringAOP 完满的整合、API 丰盛
  • 保证数据不失落的前提做到高可靠性、可用性

二、AMQP 协定

AMQP (Advanced Message Queuing Protocol) 即高级音讯队列协定,是一个过程间传递 异步音讯 网络协议

AMQP 模型

工作过程如下:首先发布者(Publisher)公布音讯(Message),经由交换机 Exchange。交换机依据 路由规定 将收到的音讯分发给与该交换机绑定的 Queue。最初 AMQP 代理会将音讯投递给订阅了此队列的消费者,或者消费者依照需要自行获取。

对于 AMQP 模型的几点阐明:

  • 发布者、交换机、队列、消费者都能够有多个。AMQP 是一个网络协议,所以这个过程中的发布者,消费者,音讯代理能够别离存在于不同的设施上。
  • 布者公布音讯时能够给音讯指定各种音讯属性(Message Meta-data)。有些属性有可能会被音讯代理(Brokers)应用,然而其余的属性则是齐全不通明的,它们只能被接管音讯的利用所应用。
  • 从平安角度思考,网络是不牢靠的,又或是消费者在解决音讯的过程中意外挂掉,这样没有解决胜利的音讯就会失落。基于此起因,AMQP 模块蕴含了一个音讯确认机制:当一个音讯从队列中投递给消费者后,不会立刻从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才齐全从队列中删除。
  • 在某些状况下,例如当一个音讯无奈被胜利路由时(无奈从交换机散发到队列),音讯或者会被返回给发布者并被抛弃。或者,如果音讯代理执行了 延期操作 ,音讯会被放入一个 死信队列 中。此时,音讯发布者能够抉择某些参数来解决这些非凡状况。

Producer & Consumer

音讯生产者(Producer),向 Broker 发送音讯的客户端。

音讯消费者(Consumer),从 Broker 生产音讯的客户端。

Broker

一个 RabbitMQ Broker 能够简略地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数状况下能够将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

Exchange

Exchange 即交换器,是用来发送音讯的 AMQP 实体。Exchange 拿到一个音讯之后将它 路由 给一个或零个队列。Exchange 应用哪种路由算法是由 交换机类型 绑定(Bindings)规定 所决定的。

Binding

Producer 将音讯发给 Exchange 时,个别会指定一个 RoutingKey (路由键),用来指定这个音讯的路由规定,而这个 RoutingKey 须要与交换器类型和 BindingKey (绑定键) 联结应用能力最终失效

RabbitMQ 中通过 Binding (绑定) 将 Exchange 与 Queue(音讯队列) 关联起来,在绑定时个别会指定一个 BindingKey,这样 RabbitMQ 就晓得如何正确将音讯路由到 Queue 中。一个绑定就是基于路由键将交换器和音讯队列连接起来的路由规定。

生产者将音讯发送给交换器,当 BindingKey 和 RoutingKey 相匹配时,音讯会被路由到对应的队列中。留神 BindingKey 并不是在所有的状况下都失效,它依赖于交换器类型,比方 fanout 类型的交换器就会忽视,而是将音讯路由到所有绑定到该交换器的队列中。

Exchange 类型

Exchange 有以下 4 种类型,不同的类型对应着不同的路由策略:

direct

Exchange 默认类型。路由规定是把音讯路由到 Bindingkey 与 RoutingKey 齐全匹配的 Queue 中 。direct 类型罕用在 解决有优先级的工作,依据工作的优先级把音讯发送到对应的队列,这样能够指派更多的资源去解决高优先级的队列。

以上图为例,如果发送音讯的时候 RoutingKey=”booking”,那么音讯会路由到 Queue1 和 Queue2。如果在发送音讯的时候设置 RoutingKey=”create” 或 “confirm”,音讯只会路由到 Queue2。如果以其余的 RoutingKey 发送音讯,则音讯不会路由到这两个队列中。

fanout

路由规定是把所有发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中,不须要做任何判断操作,所以 fanout 类型是所有的交换机类型外面速度最快的。fanout 类型罕用来播送音讯

topic

direct 类型的 Exchange 路由规定是齐全匹配 BindingKey 和 RoutingKey,然而这种严格的匹配形式在很多状况下不能满足理论业务的需要。

topic 类型的 Exchange 在匹配规定上进行了扩大,它与 direct 类型的 Exchange 类似,也是将音讯路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规定有些不同,它约定:

  • RoutingKey 为一个点号 . 分隔的字符串,其中 .分隔开的每一段独立的字符串称为一个单词
  • BindingKey 和 RoutingKey 一样也是点号 . 分隔的字符串;
  • BindingKey 中能够存在两种非凡字符串 *#,用于做含糊匹配,其中 * 用于匹配一个单词,# 用于匹配零个或多个单词。

以上图为例,如果发送音讯的时候 RoutingKey 为

  • “com.rabbitmq.client”,那么音讯会路由到 Queue1 和 Queue2
  • “com.hidden.client”,那么音讯只会路由到 Queue2 中
  • “com.hidden.demo”,那么音讯只会路由到 Queue2 中
  • “java.rabbitmq.demo”,那么音讯只会路由到 Queue1 中
  • “java.util.concurrent”,那么音讯将会被抛弃或者返回给生产者,因为它没有匹配任何路由键。
headers

headers 类型的 Exchange 不依赖于路由键的匹配规定来路由音讯,而是依据发送的音讯内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送音讯到交换器时,RabbitMQ 会获取到该音讯的 headers(也是一个键值对的模式),比照其中的键值对是否齐全匹配队列和交换器绑定时指定的键值对,如果齐全匹配则音讯会路由到该队列,否则不会路由到该队列。headers 类型的 Exchange 性能会很差,不举荐应用。

Queue

Queue 其实是 Message Queue 即音讯队列,保留音讯并将它们转发给消费者。Queue 是音讯的容器,也是音讯的起点。一个音讯可投入一个或多个队列。音讯始终在队列外面,期待消费者连贯到这个队列将其生产。

RabbitMQ 中音讯只能存储在队列中,而 Kafka 将音讯存储在 Topic 中,即该 Topic 对应的 Partition 中。RabbitMQ 的生产者生产音讯并最终投递到队列中,消费者能够从队列中获取音讯并生产。

当多个消费者订阅同一个队列时,队列中的音讯会被均匀摊派(Round-Robin,即轮询)给多个消费者进行解决,而不是每个消费者都收到所有的音讯并解决,这样防止音讯被反复生产。

队列属性

Queue 跟 Exchange 共享某些属性,然而队列也有一些另外的属性:

  • Name
  • Durable:音讯代理重启后,队列仍旧存在
  • Exclusive:只被一个连贯应用,而且当连贯敞开后队列即被删除
  • Auto-delete:当最初一个消费者退订后即被删除
  • Arguments:一些音讯代理用他来实现相似与 TTL 的某些额定性能

队列创立

队列在申明(declare)后能力被应用。

如果一个队列尚不存在,申明一个队列会创立它。如果申明的队列曾经存在,并且属性完全相同,那么此次申明不会对原有队列产生任何影响。如果申明中的属性与已存在队列的属性有差别,那么一个错误代码为 406 的通道级异样就会被抛出。

队列长久化

长久化队列(Durable Queues)会被存储在磁盘上,当音讯代理(Broker)重启的时候,它仍旧存在。没有被长久化的队列称作暂存队列(Transient Queues)。并不是所有的场景和案例都须要将队列长久化。

长久化的队列并不会使得路由到它的音讯也具备持久性。假使音讯代理挂掉了,重新启动,那么在重启的过程中长久化队列会被从新申明,无论怎样,只有通过长久化的音讯能力被从新复原

音讯机制

音讯确认

AMQP 代理在什么时候删除音讯才是正确的?AMQP 0-9-1 标准给咱们两种倡议:

  • 主动确认模式:当音讯代理(Broker)将音讯发送给利用后立刻删除。(应用 AMQP 办法:basic.deliver 或 basic.get-ok)
  • 显示确认模式:待 Consumer 发送一个确认回执(acknowledgement)后再删除音讯。(应用 AMQP 办法:basic.ack)

    如果一个消费者在尚未发送确认回执的状况下挂掉了,那 AMQP 代理会将音讯从新投递给另一个消费者。如果过后没有可用的消费者了,音讯代理会死等下一个注册到此队列的消费者,而后再次尝试投递。

回绝音讯

当回绝某条音讯时,利用能够通知音讯代理销毁该条音讯或者从新将该条音讯放入队列。

当此队列只有一个消费者时,有可能存在回绝音讯并将音讯从新放入队列的行为而引起音讯在同一个消费者身上有限循环的状况。

预取音讯

在多个消费者共享一个队列时,明确指定在收到下一个确认回执前每个消费者一次能够承受多少条音讯是十分有用的。这能够在试图批量公布音讯的时候起到 简略的负载平衡和进步音讯吞吐量 的作用。

音讯属性

AMQP 模型中的音讯(Message)对象是带有属性(Attributes)的:

属性 阐明
Content type 内容类型
Content encoding 内容编码
Routing key 路由键
Delivery mode (persistent or not) 投递模式(长久化 或 非长久化)
Message priority 音讯优先权
Message publishing timestamp 音讯公布的工夫戳
Expiration period 音讯有效期
Publisher application id 公布利用的 ID

有些属性是被 AMQP 代理所应用的,然而大多数是凋谢给接管它们的利用解释器用的。有些属性是可选的也被称作音讯头(headers)。和 HTTP 协定的 X-Headers 很类似,音讯属性须要在音讯被公布的时候定义。

音讯主体

AMQP 的音讯除属性外,也含有一个有效载荷 Payload(音讯理论携带的数据),它被 AMQP 代理当作不通明的字节数组来看待。

音讯代理不会查看或者批改 Payload,音讯能够只蕴含属性而不携带有效载荷,它通常会应用相似 JSON 这种序列化的格局数据。

音讯长久化

音讯可能以长久化的形式公布,AMQP 代理会将此音讯存储在磁盘上。如果服务器重启,零碎会确认收到的长久化音讯未失落。

简略地将音讯发送给一个长久化的交换机或者路由给一个长久化的队列,并不会使得此音讯具备长久化性质:它齐全取决与音讯自身的长久模式(persistence mode)。将音讯以长久化形式公布时,会对性能造成肯定的影响(就像数据库操作一样,健壮性的存在必然造成一些性能损失)。

三、RabbitMQ 命令行操作

启动 & 进行服务器

  • 启动服务器

    rabbitmq-server start &
  • 进行服务器

    rabbitmqctl stop_app

查看管控台

http://localhost:15672/

# 用户名 guest
# 明码 guest

命令行根底操作

1. 利用

  • 敞开利用

    rabbitmqctl stop_app
  • 启动利用

    rabbitmqctl start_app
  • 查看节点状态

    rabbitmqctl status

2. 用户

  • 增加用户

    rabbitmqctl add_user username password
  • 删除用户

    rabbitmqctl delete_user username
  • 列出所有用户

    rabbitmqctl list_users
  • 革除用户权限

    rabbitmqctl clear_permissions -p vhostpath username
  • 列出用户权限

    rabbitmqctl list_user_permissions username
  • 批改明码

    rabbitmqctl change_password username newpassword
  • 设置用户权限

    rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"

3. 虚拟主机

  • 创立虚拟主机

    rabbitmqctl add_vhost vhostpath
  • 删除虚拟主机

    rabbitmqctl delete_vhost vhostpath
  • 列出所有虚拟主机

    rabbitmqctl list_vhosts
  • 列出虚拟主机上所有权限

    rabbitmqctl list_permissions -p vhostpath

4. 队列

  • 查看所有队列信息

    rabbitmqctl list_queues
  • 革除队列里的音讯

    rabbitmqctl -p vhostpath purge_queue blue

命令行高级操作

  • 移除所有数据

    rabbitmqctl reset
    # 要在 rabbitmqctl stop_app 之后应用
  • 组成集群命令

    rabbitmqctl join_cluster <clusternode> [--ram]
  • 查看集群状态

    rabbitmq cluster_status
  • 批改集群节点的存储模式

    rabbitmqctl change_cluser_node_type disc | ram
  • 摘除节点(遗记节点)

    rabbitmqctl forget_cluster_node [--offline]
  • 批改节点名称

    rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]

四、RabbitMQ 高级个性

音讯 100% 可靠性投递的解决方案

生产端可靠性投递

  • 保障音讯胜利收回
  • 保障 MQ 节点的胜利接管
  • 发送端收到 MQ 节点(Broker)确认应答
  • 欠缺的音讯弥补机制

解决方案 1:音讯落库

音讯落库,对音讯状态进行打标。

解决方案 2:二次确认,回调查看

音讯的提早投递,做二次确认,回调查看。

生产端幂等性操作

  • 惟一 ID + 指纹码 机制,利用数据库主键去重

    长处:实现简略

    毛病:高并罚下有数据库写入的性能瓶颈

    解决方案:依据 ID 进行分库分表进行算法路由

  • 利用 Redis 原子个性实现

Confirm 音讯机制

音讯的确认是指生产者投递音讯后,如果 Broker 收到音讯,则会给生产者一个应答,生产者进行接管应答,用来确定这条音讯是否失常地发送到 Broker。

实现机制:

  • 第一步:在 channel 上开启确认模式

    channel.confirmSelect()
  • 第二步:在 channel 上增加监听

    channel.addConfirmListener()

    监听胜利和失败的返回后果,依据具体的后果对音讯进行从新发送或记录日志等后续解决。

Return 音讯机制

音讯生产者通过制动一个 Exchange 和 routing key,把音讯送达到某一个队列中去,而后消费者监听队列,进行生产解决操作。

在某些状况下,如果咱们在发送音讯的时候,以后的 Exchange 不存在 或者指定的 routing key 路由不到,此时咱们须要监听这种不可达的音讯,就要应用 Return Listener。

根底 API 有一个配置项 mandatory

  • 如果为 true,那么监听器会接管到路由不可达的音讯,而后进行后续解决
  • 如果为 false, 那么 Broker 端主动删除该音讯

生产端限流

RabbitMQ 提供了一种 QoS(服务质量保障)性能,在非主动确认音讯的前提下,如果肯定数目的音讯(通过基于 Consume 或者 Channel 设置 QoS 值)未被确认前,不进行生产新的音讯。

波及到的办法:

void BasicQoS(unit prefetchSize,ushort prefetchCount,bool global)
  • prefetchSize:0
  • prefetchCount:告知 RabbitMQ 不要同时给一个消费者推送多个 N 个音讯,即一旦有 N 个音讯还没有 ACK,则该 Consumer 将 block 掉,始终到有音讯 ack
  • golbal:true 示意将下面设置利用于 Channel;true 示意将下面设置利用于 Consumer。

留神:

  • prefetchSize 和 global 这两项,RabbitMQ 没有实现,暂且不钻研
  • prefetchCount 在 no_ask-false 的状况下失效,即在自动应答的状况下是不失效的

生产端 ACK 与重回队列

  • 生产端的手工 ACK 和 NACK

    生产端进行生产时:

    如果因为业务异样,咱们能够进行日志的记录,而后进行弥补;

    如果因为服务器宕机等重大问题,那么须要手工进行 ACK 保障生产端生产胜利

  • 生产端的重回队列

    生产端重回队列是为了对没有胜利的音讯,音讯会被从新投递给 Broker。个别在应用利用中,都会敞开重回队列,即设置为 false。

TTL

TTL(Time To Live)即生存工夫。

  • RabbitMQ 反对 音讯 的过期工夫,在音讯发送时能够进行指定
  • RabbitMQ 反对 队列 的过期工夫,从音讯如队列开始计算,只有超过了队列的超时工夫配置,那么会主动革除音讯

死信队列(DLX,Dead-Letter-Exchange)

利用 DLX,当音讯在一个队列中变成死信(dead message)之后,其能被从新 publish 到另一个 Exchange,这个 Exchange 就是 DLX。

音讯变成死信的几种状况:

  • 音讯被回绝(basic.reject / basic.nack),并且 requeue=false
  • 音讯 TTL 过期
  • 队列达到最大长度

留神:

  • DLX 也是一个失常的 Exchange,和个别的 Exchange 没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ 就会主动的将这个音讯从新公布到设置的 Exchange 下来,进而被路由到另一个队列。
  • 死信队列设置须要设置 Exchange 和 队列,而后绑定

    channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
    channel.queueDeclare("dlx.queue", true, false, false, null);
    channel.queueBind("dlx.queue", "dlx.exchange", "#");

    而后咱们进行失常申明 Exchange、队列和绑定,此时须要在队列上加上参数 arguments

    Map<String, Object> agruments = new HashMap<String, Object>();
    agruments.put("x-dead-letter-exchange", "dlx.exchange");
    // 这个 agruments 属性,要设置到申明队列上
    channel.queueDeclare(queueName, true, false, false, agruments);

本文由 mdnice 多平台公布

退出移动版