1.RabbitMQ特点
- 保证可靠性。如持久化、传输确认、发布确认等。
- 路由灵活。通过Exchange交换器来路由消息。
- 支持集群。
- 具有高可用性。
- 支持多种协议,除了AMQP,还支持STOMP和MQTT等。
- 支持多语言。
- 提供管理界面。
- 提供跟踪机制。
- 提供插件机制,多方面扩展。
2.RabbitMQ基本概念
- Message(消息):由消息头和消息体组成。消息体是不透明的,但是消息头则由一系列可选属性组成,包括routing-key(路右键)、priority(优先级)、delivery-mode(消息持久化)等。
- Publisher(消息生产者):向交换器发布消息的客户端应用程序。
- Exchange(交换器):接收生产者发送来的消息,并将消息路由到服务器中的队列上。
- Binding(绑定):消息队列和交换器之间的关联。
- Queue(消息队列):保存消息,直到发送给消费者。
- Connection(网络连接):一个TCP连接。
- Channel(信道):一个双向数据流通道,建立在真实的TCP连接内的虚拟连接。
- Consumer(消息消费者):从消息队列取走消息的客户端应用程序。
- Virtual Host(虚拟主机):一批交换器、消息队列等相关的对象。默认vhost是“/”。
- Broker:消息队列服务器实体。
(1)AMQP中的消息路由
AMQP中增加了Exchange和Binding的角色。生产者需要把消息发布到Exchange上,然后通过Binding,将消息发送到指定的队列。
(2)交换器类型
- Direct交换器
如果消息的路由键(RoutingKey)和绑定键(BindingKey)完全匹配,就会把消息发送到对应的队列上。 - Fanout交换器
不处理路右键,只是简单的将队列绑定到交换器,当有消息发送到这个交换器时,就会将消息发送到和它绑定的队列上。 - Topic交换器
将路由键和某种模式进行匹配,此时的BindingKey是一种模式。路由键和绑定键中的单词与单词之间是用“.”分割的。其中模式中的“#”表示匹配0到多个单词,“*”表示只匹配一个单词。
3.用Java访问RabbitMQ
1.在Maven工程中添加依赖。
<!-- RabbitMQ依赖 --><dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version></dependency>
2.消息生产者
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 设置RabbitMQ地址 factory.setHost("localhost"); factory.setVirtualHost("/"); // 建立到代理服务器的连接 Connection conn = factory.newConnection(); // 创建信道 Channel channel = conn.createChannel(); // 声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "testRoutingKey"; // 发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); // 关闭信道和连接 channel.close(); conn.close(); }}
3.消息消费者
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 设置RabbitMQ地址 factory.setHost("localhost"); factory.setVirtualHost("/"); // 建立到代理服务器的连接 Connection conn = factory.newConnection(); // 创建信道 Channel channel = conn.createChannel(); // 声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); // 声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "testRoutingKey"; // 绑定队列,通过路由键testRoutingKey将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while(true) { // 消费消息 boolean autoAck = false; String consumerag = ""; channel.basicConsume(queueName, autoAck, consumerag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消息的路由键:" + routingKey); System.out.println("消息的内容类型:" + contentType); long deliveruTag = envelope.getDeliveryTag(); // 确认消息 channel.basicAck(deliveruTag, false); System.out.println("消息的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } }}
4.Spring整合RabbitMQ