共计 2981 个字符,预计需要花费 8 分钟才能阅读完成。
1.RabbitMQ 特点
- 保证可靠性。如持久化、传输确认、发布确认等。
- 路由灵活。通过 Exchange 交换器来路由消息。
- 支持集群。
- 具有高可用性。
- 支持多种协议,除了 AMQP,还支持 STOMP 和 MQTT 等。
- 支持多语言。
- 提供管理界面。
- 提供跟踪机制。
- 提供插件机制,多方面扩展。
2.RabbitMQ 基本概念
- Message(消息):由消息头和消息体组成。消息体是不透明的,但是消息头则由一系列可选属性组成,包括 routing-key(路右键)、priority(优先级)、delivery-mode(消息持久化)等。
- Publisher(消息生产者):向交换器发布消息的客户端应用程序。
- Exchange(交换器):接收生产者发送来的消息,并将消息路由到服务器中的队列上。
- Binding(绑定):消息队列和交换器之间的关联。
- Queue(消息队列):保存消息,直到发送给消费者。
- Connection(网络连接):一个 TCP 连接。
- Channel(信道):一个双向数据流通道,建立在真实的 TCP 连接内的虚拟连接。
- Consumer(消息消费者):从消息队列取走消息的客户端应用程序。
- Virtual Host(虚拟主机):一批交换器、消息队列等相关的对象。默认 vhost 是“/”。
- Broker:消息队列服务器实体。
(1)AMQP 中的消息路由
AMQP 中增加了 Exchange 和 Binding 的角色。生产者需要把消息发布到 Exchange 上,然后通过 Binding,将消息发送到指定的队列。
(2)交换器类型
- Direct 交换器
如果消息的路由键(RoutingKey)和绑定键(BindingKey)完全匹配,就会把消息发送到对应的队列上。 - Fanout 交换器
不处理路右键,只是简单的将队列绑定到交换器,当有消息发送到这个交换器时,就会将消息发送到和它绑定的队列上。 - Topic 交换器
将路由键和某种模式进行匹配,此时的 BindingKey 是一种模式。路由键和绑定键中的单词与单词之间是用“.”分割的。其中模式中的“#”表示匹配 0 到多个单词,“*”表示只匹配一个单词。
3. 用 Java 访问 RabbitMQ
1. 在 Maven 工程中添加依赖。
<!-- RabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
2. 消息生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
// 设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setVirtualHost("/");
// 建立到代理服务器的连接
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "testRoutingKey";
// 发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
// 关闭信道和连接
channel.close();
conn.close();}
}
3. 消息消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
// 设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setVirtualHost("/");
// 建立到代理服务器的连接
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
// 声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "testRoutingKey";
// 绑定队列,通过路由键 testRoutingKey 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
// 消费消息
boolean autoAck = false;
String consumerag = "";
channel.basicConsume(queueName, autoAck, consumerag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消息的路由键:" + routingKey);
System.out.println("消息的内容类型:" + contentType);
long deliveruTag = envelope.getDeliveryTag();
// 确认消息
channel.basicAck(deliveruTag, false);
System.out.println("消息的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
4.Spring 整合 RabbitMQ
正文完