关于mq:系统学习消息队列RabbitMQ的消息应答和持久化

33次阅读

共计 5753 个字符,预计需要花费 15 分钟才能阅读完成。

1.MQ 的 Hellow World 程序

2.MQ 的音讯应答

3.MQ 的长久化

1.MQ 的 Hellow World 程序
依据上一篇文章零碎学习音讯队列——RabbitMQ 的根底概念,咱们学习了 mq 的基础理论,实践诚然重要,实际也尤其重要,咱们先来编写一个 RabbitMq 的 Hello World 程序,来感受一下队列。

在这里咱们创立一个音讯生产者,两个音讯消费者,轮询 进行散发音讯。

pom:

        <!--rabbitmq 依赖客户端 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

连贯工具类:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author sulingfeng
 * @title: RabbitMqUtils
 * @date 2022/6/14 9:33
 */
public class RabbitMqUtils {

    // 失去一个连贯的工具类
    public static Channel getChannel() throws Exception {
        // 创立一个连贯工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }

}

生产者:


import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @author sulingfeng
 * @title: Producer
 * @date 2022/6/13 19:55
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 从控制台当中承受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){String message = scanner.next();
                /**
                 * 发送一个音讯
                 * 1. 发送到那个交换机
                 * 2. 路由的 key 是哪个
                 * 3. 其余的参数信息
                 * 4. 发送音讯的音讯体
                 */
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送音讯实现:"+message);
            }
        }
    }
}

消费者:

import com.rabbitmq.client.*;

/**
 * @author sulingfeng
 * @title: Consumer
 * @date 2022/6/13 20:06
 */
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        System.out.println("期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery)-> {String message = new String(delivery.getBody());
            System.out.println(message);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
         };
        /**
         * 消费者生产音讯
         * 1. 生产哪个队列
         * 2. 生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3. 生产胜利的回调
         * 4. 消费者未胜利生产的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

成果:

生产者:

消费者 1:

消费者 2:

2.MQ 的音讯应答

咱们构想这样一个问题。Rabbitmq 是如何保障音讯被稳固生产的?假如生产一个音讯要很长一段时间,然而消费者生产到一半就挂掉了,这个时候 Rabbitmq 就失落了这条音讯。
为了解决这样的问题,rabbitmq 引入了音讯应答机制 ,音讯应答就是: 消费者在承受到音讯并且生产结束之后,通知 mq 它曾经解决了,rabbitmq 能够把音讯删除了。

2.1)自动应答
自动应答就是音讯 发送之后就能够被认为发送胜利 ,这种模式须要在高吞吐量和数据传输安全性方面作出衡量,它没有对音讯是否胜利生产,发送音讯数量进行限度,可能会导致音讯失落,音讯大量积压,所以这种模式 仅实用在消费者能够高效并以某种速率解决这些音讯的状况下应用。

2.2)手动应答
每一条音讯都须要消费者手动应答 ,如果消费者 因为某种原因失去连贯 ,导致音讯未发送 ack 确认,Rabbitmq 晓得了生产未被解决 会将其从新插入队列 ,如果此时 有其它的消费者能够解决,就将音讯发送给另外一个消费者。保障了音讯被确认生产的稳定性,也能够避免消费者音讯的积压。

手动应答代码演示:

生产者:

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        System.out.println("期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery)-> {String message = new String(delivery.getBody());
            try {Thread.sleep(5000); } catch (InterruptedException e) {e.printStackTrace(); }
            System.out.println(message);
            /**
             * 1. 音讯 tag
             * 2. 是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),true);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
         };
        /**
         * 消费者生产音讯
         * 1. 生产哪个队列
         * 2. 生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3. 生产胜利的回调
         * 4. 消费者未胜利生产的回调
         */
        Boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);


    }
}

消费者:

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 从控制台当中承受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){String message = scanner.next();
                /**
                 * 发送一个音讯
                 * 1. 发送到那个交换机
                 * 2. 路由的 key 是哪个
                 * 3. 其余的参数信息
                 * 4. 发送音讯的音讯体
                 */
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送音讯实现:"+message);
            }
        }
    }
}

咱们当初开启两个消费者,而后生产者给消费者发送音讯,等发送了足够多的音讯之后,关掉一个消费者,让其中一个正在生产的音讯失落,看看队列会不会再进行重发。

生产者一共发了 14 条音讯:

消费者 2 生产到一半,进行服务

消费者 1 帮消费者 2 生产了消费者 2 未生产完的数据:

音讯确认的 api:

Channel.basicAck(用于必定确认) 
RabbitMQ 已晓得该音讯并且胜利的解决音讯,能够将其抛弃了

Channel.basicNack(用于否定确认) 

Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,代表不解决该音讯了间接回绝,能够将其抛弃了

咱们在下面编写代码的时候,还发现了一个是否批量应答的标记位:

Multiple 的 true 和 false 代表的含意不同:

true:
代表批量应答

false:
代表单个应答

2.3)音讯的不偏心散发
咱们发现发送音讯的时候,消费者解决的速度 有快有慢 ,在这种场景下, 轮询发送的效率如同不是最优的 ,所以咱们能够进行 不偏心的散发,哪个消费者解决地快,就先发送给哪个消费者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

有了这个值,rabbitmq 就会把该任务分配给没有那么忙的消费者,谁解决地快,谁就生产。

留神,这个配置要在生产者和消费者同时配置。

咱们发送二十条音讯:

消费者 1:

消费者 2:

此时就是依照消费者的能力生产音讯了。

同时咱们也要留神一下,这个值还有一个概念,那就是 预取值

音讯在发送的时候,实质上是异步生产的,因而消费者这里必定会有一个音讯的缓冲区,开发人员心愿限度该缓冲区的大小,以防止缓冲区外面无限度寄存未确认音讯的问题。这个时候,就能够应用 basic.qos 办法设置 ” 预取值 ” 该值就定义了容许未确认音讯的最大数量 。例如取值为 4,则 rabbitMQ 会在未确认信息为 4 之后,就不发送信息。 找到一个适合的值是一个重复试探的过程,大略在 100~300 之间,取值为 1 最激进,然而这样吞吐量会变得很低。

3.MQ 的长久化

刚刚咱们解决了消费者如何不失落生产的状况,然而咱们无奈防止 Rabbitmq 自身挂掉的状况,如何保障 Rabbitmq 服务停掉后,自身存储在 Rabbotmq 外面的音讯不失落,咱们须要把队列和音讯都标记为长久化。

3.1)队列如何长久化

咱们之前创立的队列都是非长久化的,所以如果要创立同名的长久化队列,咱们须要把以前的队列删除了才能够。

在申明队列的时候,同时要把 durable 参数设置为 true.

/**
             * 申明一个队列
             * 1. 队列名称
             * 2. 队列外面的音讯是否长久化 默认音讯存储在内存中
             * 3. 该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产
             * 4. 是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除
             * 5. 其余参数
             */
            Boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

咱们会发现队列曾经长久化了:

3.2)音讯如何长久化
要想让音讯长久化,须要在发送音讯的时候,减少 MessageProperties.PERSISTENT_BASIC 参数。

                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,message.getBytes());
                System.out.println("发送音讯实现:"+message);

其实将音讯标记为长久化并不能保障齐全不丢音讯,可能 Rabbitmq 在将音讯保留到磁盘的时候,音讯还没存储完,然而队列就挂了,如果须要更无效的长久化机制,能够参考前面的公布确认章节。

正文完
 0