RabbitMQ
一 简介
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
二 历史
Rabbit 科技有限公司开发了 RabbitMQ,并提供对其的支持。起初,Rabbit 科技是 LSHIFT 和 CohesiveFT 在 2007 年成立的合资企业,2010 年 4 月被 VMware 旗下的 SpringSource 收购。RabbitMQ 在 2013 年 5 月成为 GoPivotal 的一部分
三 基本概念
RabbitMQ 是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ 从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息。其整体模型架构如下图所示:
- 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
- 中间即是 RabbitMQ,其中包括了 交换机 和 队列
- 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
队列
Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。
RabbitMQ 中的消息只能存储在队列中,消费者从队列中获取消息并消费。
多个消费者可以订阅到同一个队列,但是队列中的消息并不会广播,而是轮询给绑定的每一个消费者。
交换器、路由键、绑定
交换器:Exchange,生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。
路由键:Rountingkey,生产者将消息发送到交换器的时候,通常会指定一个 RountingKey,用来指定消息的路由规则。
绑定:binding,RabbitMQ 中通过绑定将交换器和队列关联起来
生产者将消息发送到交换器的时候,需要指定一个 RountingKey。队列和交换器绑定的时候,需要一个 BindingKey。当 RountingKey 和 BindingKey 相匹配时,消息才会传送到指定的队列。但是 fanout 类型的交换器(广播)会无视 BindingKey,而是广播发送到绑定到它自己所有队列中。
四 特性
1. 消息确认
RabbitMQ 有两种应答模式,自动和手动。这也是 AMQP 协议所推荐的。这在 point-to-point 和 broadcast 都是一样的。
自动应答 - 当 RabbitMQ 把消息发送到接收端,接收端把消息出队列的时候就自动帮你发应答消息给服务。
手动应答 - 需要我们开发人员手动去调用 ack 方法去告诉服务已经收到。
文档推荐在大数据传输中,如果对个别消息的丢失不是很敏感的话选用自动应答比较理想,而对于那些一个消息都不能丢的场景,需要选用手动应答,也就是说在正确处理完以后才应答。如果选择了自动应答,那么消息重发这个功能就没有了。
点对点模式:也就是一发一接的模式,不适用发布 / 订阅这种广播模式。
//autoAck 设置 false, 消费端挂掉,信息不会丢失,server 会 re-queue
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
// 向服务器发送应答
channel.basicAck(envelope.getDeliveryTag(), false);
在 RabbitMQ 中,为了不让消息丢失,它提供了消息应答的概念。当消费者获取到了一个消息以后,需要给 RabbitMQ 服务一个应答的消息,告知服务我已经收到或正确处理了该消息。那么 RabbitMQ 可以放心的在队列中删除该消息
2. 队列持久化
//durable 设置 true,queue 持久化,server 重启,此 queue 不丢失
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
方法的第四的参数 autoDelete,一般都会输入 false。文档描述这个参数如果是 true 的话,意思是:如果这个 queue 不再使用(没有被订阅)的话,server 就会删除它。在我的测试过程中,只要是连接改 queue 的所有接收者都断开连接的话,该 queue 就会被删除,即使里面还有没有处理的消息。RabbitMQ 的重启也同样会删除他们。如果输入的是 false,那与之相连的客户端都断开连接的话,服务是不会删除这个队列的,队列中的消息也就会存在。发送端在没有客户端连接的时候也可以把消息放入改队列,客户端起来的时候,就会得到这些消息。但是如果 RabbitMQ 服务重启的话,该队列就没有了,里面的消息自然也就没有了。
第三个参数是 exclusive,文档描述说,如果是 true,那么申明这个 queue 的 connection 断了,那么这个队列就被删除了,包括里面的消息。
第二个参数 durable,文档描述说,如果是 true,则代表是一个持久的队列,那么在服务重启后,也会存在。因为服务会把持久化的 queue 存放在硬盘上,放服务重启的时候,会重新申明这个 queue。当然必须是在 autoDelete 和 exclusive 都为 false 的时候。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,如果重启之前那个 queue 里面还有没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发送者在发送消息时对消息的设置了(信息持久化)。
3. 信息持久化
//BasicProperties 设置 MessageProperties.PERSISTENT_TEXT_PLAIN,信息持久化
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("","TaskQueue", properties, bytes);
DeliveryMode 等于 2 就说明这个消息是 persistent 的。1 是默认是,不是持久的。在接收者接收消息并处理的时候会出现各种各样的问题:抛出异常导致与 RabbitMQ 连接断开,程序挂掉,网络问题等等。往往在出现这些问题的时候我们通常都希望队列能保存这些消息,并在程序再次起来的时候能够重新处理,或如果是负载均衡的模式下,能够把这个消息重新分配给其他的同等的接受者来处理。这同样也是 RabbitMQ 对消息持久化的一种功能。这我们在消息的传输控制中做详细的说明。
4. 消息的拒收
拒收,是接收端在收到消息的时候响应给 RabbitMQ 服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。接收端在发送 reject 命令的时候可以选择是否要重新放回 queue 中。如果没有其他接收者监控这个 queue 的话,要注意一直无限循环发送的危险。
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);
BasicReject 方法第一个参数是消息的 DeliveryTag,对于每个 Channel 来说,每个消息都会有一个 DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回 queue 中,requeue。
BasicReject 一次只能拒绝接收一个消息,而 BasicNack 方法可以支持一次 0 个或多个消息的拒收,并且也可以设置是否 requeue。
channel.BasicNack(3, true, false);
1
在第一个参数 DeliveryTag 中如果输入 3,则消息 DeliveryTag 小于等于 3 的,这个 Channel 的,都会被拒收。
**
5. 消息的 QoS**
QoS = quality-of-service,顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,而应该设计有完善的异常处理机制。在出现错误的时候知道在哪里出现什么样子的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。在接收消息出现故障的时候我们可以通过 RabbitMQ 重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式。
甚至 QoS 是在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加如下代码:
channel.BasicQos(0, 1, false);
1
代码第一个参数是可接收消息的大小的,但是似乎在客户端 2.8.6 版本中它必须为 0,即使:不受限制。如果不输 0,程序会在运行到这一行的时候报错,说还没有实现不为 0 的情况。第二个参数是处理消息最大的数量。举个例子,如果输入 1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。如果输入 3,那么可以最多有 3 个消息不应答,如果到达了 3 个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。第三个参数则设置了是不是针对整个 Connection 的,因为一个 Connection 可以有多个 Channel,如果是 false 则说明只是针对于这个 Channel 的。
这种数量的设置,也为我们在多个客户端监控同一个 queue 的这种负载均衡环境下提供了更多的选择。
// 对服务器确认之前,一次只接受一条信息
channel.basicQos(1);