作者:海向\
起源:https://www.cnblogs.com/haixi...

RabbitMQ 简述

RabbitMQ是一个音讯代理:它承受并转发音讯。 您能够将其视为邮局:当您将要把寄发的邮件投递到邮箱中时,您能够确信Postman 学生最终会将邮件发送给收件人。 在这个比喻中,RabbitMQ是一个邮箱,邮局和邮递员,用来承受,存储和转发二进制数据块的音讯。

队列就像是在RabbitMQ中表演邮箱的角色。 尽管音讯通过RabbitMQ和应用程序,但它们只能存储在队列中。 队列只受主机的内存和磁盘限度的限度,它实质上是一个大的音讯缓冲区。 许多生产者能够发送到一个队列的音讯,许多消费者能够尝试从一个队列接收数据。

producer即为生产者,用来产生音讯发送给队列。consumer是消费者,须要去读队列内的音讯。producer,consumer和broker(rabbitMQ server)不用驻留在同一个主机上;的确在大多数应用程序中它们是这样散布的。

简略队列

简略队列是最简略的一种模式,由生产者、队列、消费者组成。生产者将音讯发送给队列,消费者从队列中读取音讯实现生产。

在下图中,“P”是咱们的生产者,“C”是咱们的消费者。 两头的框是队列 - RabbitMQ代表消费者的音讯缓冲区。

java 形式

生产者

package com.anqi.mq.nat;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class MyProducer {    private static final String QUEUE_NAME = "ITEM_QUEUE";    public static void main(String[] args) throws Exception {        //1. 创立一个 ConnectionFactory 并进行设置        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        //2. 通过连贯工厂来创立连贯        Connection connection = factory.newConnection();        //3. 通过 Connection 来创立 Channel        Channel channel = connection.createChannel();        //理论场景中,音讯多为json格局的对象        String msg = "hello";        //4. 发送三条数据        for (int i = 1; i <= 3 ; i++) {            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());            System.out.println("Send message" + i +" : " + msg);        }        //5. 敞开连贯        channel.close();        connection.close();    }}/** * Declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;/** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;/** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

消费者

package com.anqi.mq.nat;import com.rabbitmq.client.*;import java.io.IOException;public class MyConsumer {    private static final String QUEUE_NAME = "ITEM_QUEUE";    public static void main(String[] args) throws Exception {        //1. 创立一个 ConnectionFactory 并进行设置        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        //2. 通过连贯工厂来创立连贯        Connection connection = factory.newConnection();        //3. 通过 Connection 来创立 Channel        Channel channel = connection.createChannel();        //4. 申明一个队列        channel.queueDeclare(QUEUE_NAME, true, false, false, null);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        /*           true:示意主动确认,只有音讯从队列中获取,无论消费者获取到音讯后是否胜利生产,都会认为音讯曾经胜利生产           false:示意手动确认,消费者获取音讯后,服务器会将该音讯标记为不可用状态,期待消费者的反馈,如果消费者一           直没有反馈,那么该音讯将始终处于不可用状态,并且服务器会认为该消费者曾经挂掉,不会再给其发送音讯,           直到该消费者反馈。        */        //5. 创立消费者并接管音讯        Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [x] Received '" + message + "'");            }        };        //6. 设置 Channel 消费者绑定队列        channel.basicConsume(QUEUE_NAME, true, consumer);    }}Send message1 : helloSend message2 : helloSend message3 : hello [*] Waiting for messages. To exit press CTRL+C [x] Received 'hello' [x] Received 'hello' [x] Received 'hello'

当咱们启动生产者之后查看RabbitMQ治理后盾能够看到有一条音讯正在期待被生产。

当咱们启动消费者之后再次查看,能够看到积压的一条音讯曾经被生产。

总结

队列申明queueDeclare的参数:第一个参数示意队列名称、第二个参数为是否长久化(true示意是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者能够应用的公有队列,断开后主动删除)、第四个参数为当所有消费者客户端连贯断开时是否主动删除队列、第五个参数为队列的其余参数。

basicConsume的第二个参数autoAck: 应答模式,true:自动应答,即消费者获取到音讯,该音讯就会从队列中删除掉,false:手动应答,当从队列中取出音讯后,须要程序员手动调用办法应答,如果没有应答,该音讯还会再放进队列中,就会呈现该音讯始终没有被生产掉的景象。

这种简略队列的模式,零碎会为每个队列隐式地绑定一个默认交换机,交换机名称为" (AMQP default)",类型为直连 direct,当你手动创立一个队列时,零碎会主动将这个队列绑定到一个名称为空的 Direct 类型的交换机上,绑定的路由键 routing key 与队列名称雷同,相当于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");尽管实例没有显式申明交换机,然而当路由键和队列名称一样时,就会将音讯发送到这个默认的交换机中。这种形式比较简单,然而无奈满足简单的业务需要,所以通常在生产环境中很少应用这种形式。

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默认交换机隐式绑定到每个队列,其中路由键等于队列名称。不可能显式绑定到,或从缺省替换中解除绑定。它也不能被删除。

——引自 RabbitMQ 官网文档

spring-amqp形式

引入 Maven 依赖

<dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>5.6.0</version></dependency>        <dependency>    <groupId>org.springframework.amqp</groupId>    <artifactId>spring-rabbit</artifactId>    <version>2.1.5.RELEASE</version></dependency>

spring 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:rabbit="http://www.springframework.org/schema/rabbit"       xsi:schemaLocation="http://www.springframework.org/schema/rabbit           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd           http://www.springframework.org/schema/beans           https://www.springframework.org/schema/beans/spring-beans.xsd">    <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/"    username="guest" password="guest"/>    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>    <rabbit:admin connection-factory="connectionFactory"/>    <rabbit:queue name="MY-QUEUE"/></beans>

应用测试

import org.springframework.amqp.core.AmqpTemplate;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class Main {    public static void main(String[] args) {        ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");        AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);        amqpTemplate.convertAndSend("MY-QUEUE", "Item");        String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");        System.out.println(msg);    }}

参考办法

/** * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange * with a specific routing key. * * @param exchange the name of the exchange * @param routingKey the routing key * @param message a message to send * @throws AmqpException if there is a problem */void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;/**     * Receive a message if there is one from a specific queue and convert it to a Java     * object. Returns immediately, possibly with a null value.     *     * @param queueName the name of the queue to poll     * @return a message or null if there is none waiting     * @throws AmqpException if there is a problem     */@NullableObject receiveAndConvert(String queueName) throws AmqpException;

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2021最新版)

2.终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3.阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!