RabbitMQ一

50次阅读

共计 2981 个字符,预计需要花费 8 分钟才能阅读完成。

1.RabbitMQ 特点

  • 保证可靠性。如持久化、传输确认、发布确认等。
  • 路由灵活。通过 Exchange 交换器来路由消息。
  • 支持集群。
  • 具有高可用性。
  • 支持多种协议,除了 AMQP,还支持 STOMP 和 MQTT 等。
  • 支持多语言。
  • 提供管理界面。
  • 提供跟踪机制。
  • 提供插件机制,多方面扩展。

2.RabbitMQ 基本概念

  • Message(消息):由消息头和消息体组成。消息体是不透明的,但是消息头则由一系列可选属性组成,包括 routing-key(路右键)、priority(优先级)、delivery-mode(消息持久化)等。
  • Publisher(消息生产者):向交换器发布消息的客户端应用程序。
  • Exchange(交换器):接收生产者发送来的消息,并将消息路由到服务器中的队列上。
  • Binding(绑定):消息队列和交换器之间的关联。
  • Queue(消息队列):保存消息,直到发送给消费者。
  • Connection(网络连接):一个 TCP 连接。
  • Channel(信道):一个双向数据流通道,建立在真实的 TCP 连接内的虚拟连接。
  • Consumer(消息消费者):从消息队列取走消息的客户端应用程序。
  • Virtual Host(虚拟主机):一批交换器、消息队列等相关的对象。默认 vhost 是“/”。
  • Broker:消息队列服务器实体。

(1)AMQP 中的消息路由
AMQP 中增加了 Exchange 和 Binding 的角色。生产者需要把消息发布到 Exchange 上,然后通过 Binding,将消息发送到指定的队列。

(2)交换器类型

  • Direct 交换器
    如果消息的路由键(RoutingKey)和绑定键(BindingKey)完全匹配,就会把消息发送到对应的队列上。
  • Fanout 交换器
    不处理路右键,只是简单的将队列绑定到交换器,当有消息发送到这个交换器时,就会将消息发送到和它绑定的队列上。
  • Topic 交换器
    将路由键和某种模式进行匹配,此时的 BindingKey 是一种模式。路由键和绑定键中的单词与单词之间是用“.”分割的。其中模式中的“#”表示匹配 0 到多个单词,“*”表示只匹配一个单词。

3. 用 Java 访问 RabbitMQ
1. 在 Maven 工程中添加依赖。

<!-- RabbitMQ 依赖 -->
<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>4.1.0</version>
</dependency>

2. 消息生产者

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 设置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        // 建立到代理服务器的连接
        Connection conn = factory.newConnection();
        // 创建信道
        Channel channel = conn.createChannel();
        // 声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        String routingKey = "testRoutingKey";
        // 发布消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
        // 关闭信道和连接
        channel.close();
        conn.close();}
}

3. 消息消费者

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 设置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        // 建立到代理服务器的连接
        Connection conn = factory.newConnection();
        // 创建信道
        Channel channel = conn.createChannel();
        // 声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        // 声明队列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "testRoutingKey";
        // 绑定队列,通过路由键 testRoutingKey 将队列和交换器绑定起来
        channel.queueBind(queueName, exchangeName, routingKey);
        while(true) {
            // 消费消息
            boolean autoAck = false;
            String consumerag = "";
            channel.basicConsume(queueName, autoAck, consumerag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  throws IOException {String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消息的路由键:" + routingKey);
                    System.out.println("消息的内容类型:" + contentType);
                    long deliveruTag = envelope.getDeliveryTag();
                    // 确认消息
                    channel.basicAck(deliveruTag, false);
                    System.out.println("消息的消息体内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);
                }
            });
        }
    }
}

4.Spring 整合 RabbitMQ

正文完
 0