乐趣区

RabbitMQ实践经验

虚拟主机

可以将一台 RabbitMQ 服务器服务多个不同的应用,应用间通过不同虚拟主机的划分提供了消息逻辑上的独立。

# 创建虚拟主机
rabbitmqctl add_vhost test
#删除虚拟主机
rabbitmqctl delete_vhost test
#查询当前 RabbitMQ 服务器上所有虚拟机
rabbitmqctl list_vhosts

消息保存

RabbitMQ 对 Queue 中消息的保存方式有 disk 和 RAM 两种。采用 disk 方式消息数据会被保存到以.rdq 后缀命名的文件中。绝大部分情况下,对消息相关数据保存采用 disk 方式,如果有其他高可用手段,也可选用 RAM。
消息持久化涉及 Queue、Message、Exchange 三部分。
1.Queue 持久化
通过设置 durable 为 true 来实现。

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;

/**
* exclusive 表示是否为排他队列,为 true 表示为排他队列
* autoDelete 表示是否自动删除,为 true 表示队列会在没有任何订阅消费者时被自动删除。*/
com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;

void queueDeclareNoWait(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String var1) throws IOException;

2.Message 持久化
如果想要重启后 Queue 里面没有发送的消息也继续存在,需要设置消息持久化。

/**
* BasicProperties 可以使用 PERSISTENT_TEXT_PLAIN 表示发送的是需要持久化的消息,其实也就是将 BasicProperties 中的 deliveryMode 设置为 2
*/
void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;

void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;

void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;

3.Exchange 持久化
在声明 Exchange 时使用支持 durable 入参的方法,将其设置为 true。

消息可靠投递建议

假如为了保证消息可靠投递,可以使生产者在单独的信道上监听消息响应队列,这样消费者在应答消息时,生产者可以验证消费者是否收到消息,如果在一定时间内还没有收到,则重新发送。
即使对以上三种都设置了持久化,也不能保证消息使用过程中不会丢失。例如,如果消费者收到消息时,autoAck 为 true,但消费端还没有处理完就崩溃了,这种情况下,消息还是丢失了。这种情况下,需要先将 autoAck 设置为 false,并在消费逻辑完成之后在手动确认。

通道

尽量在一个连接上创建多个通道,避免线程共享一个通道。

总结

RabbitMQ 最大的优势在于提供了比较灵活的消息路由策略、高可用、可靠性以及丰富的插件,不过由于 AMQP 协议本身实现比较重量,从而导致在吞吐量上处于下风。

退出移动版