1.MQ的Hellow World程序

2.MQ的音讯应答

3.MQ的长久化

1.MQ的Hellow World程序
依据上一篇文章零碎学习音讯队列——RabbitMQ的根底概念,咱们学习了mq的基础理论,实践诚然重要,实际也尤其重要,咱们先来编写一个RabbitMq的Hello World程序,来感受一下队列。

在这里咱们创立一个音讯生产者,两个音讯消费者,轮询进行散发音讯。

pom:

        <!--rabbitmq 依赖客户端-->        <dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>5.8.0</version>        </dependency>

连贯工具类:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author sulingfeng * @title: RabbitMqUtils * @date 2022/6/14 9:33 */public class RabbitMqUtils {    //失去一个连贯的工具类    public static Channel getChannel() throws Exception {        //创立一个连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setUsername("guest");        factory.setPassword("guest");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        return channel;    }}

生产者:

import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @author sulingfeng * @title: Producer * @date 2022/6/13 19:55 */public class Producer {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个队列             * 1.队列名称             * 2.队列外面的音讯是否长久化 默认音讯存储在内存中             * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产             * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除             * 5.其余参数             */            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            //从控制台当中承受信息            Scanner scanner = new Scanner(System.in);            while (scanner.hasNext()){                String message = scanner.next();                /**                 * 发送一个音讯                 * 1.发送到那个交换机                 * 2.路由的 key 是哪个                 * 3.其余的参数信息                 * 4.发送音讯的音讯体                 */                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());                System.out.println("发送音讯实现:"+message);            }        }    }}

消费者:

import com.rabbitmq.client.*;/** * @author sulingfeng * @title: Consumer * @date 2022/6/13 20:06 */public class Consumer {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        System.out.println("期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery)-> {            String message = new String(delivery.getBody());            System.out.println(message);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("音讯生产被中断");         };        /**         * 消费者生产音讯         * 1.生产哪个队列         * 2.生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答         * 3.生产胜利的回调         * 4.消费者未胜利生产的回调         */        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);    }}

成果:

生产者:

消费者1:

消费者2:

2.MQ的音讯应答

咱们构想这样一个问题。Rabbitmq是如何保障音讯被稳固生产的?假如生产一个音讯要很长一段时间,然而消费者生产到一半就挂掉了,这个时候Rabbitmq就失落了这条音讯。
为了解决这样的问题,rabbitmq引入了音讯应答机制,音讯应答就是:消费者在承受到音讯并且生产结束之后,通知mq它曾经解决了,rabbitmq能够把音讯删除了。

2.1)自动应答
自动应答就是音讯发送之后就能够被认为发送胜利,这种模式须要在高吞吐量和数据传输安全性方面作出衡量,它没有对音讯是否胜利生产,发送音讯数量进行限度,可能会导致音讯失落,音讯大量积压,所以这种模式仅实用在消费者能够高效并以某种速率解决这些音讯的状况下应用。

2.2)手动应答
每一条音讯都须要消费者手动应答,如果消费者因为某种原因失去连贯,导致音讯未发送ack确认,Rabbitmq晓得了生产未被解决会将其从新插入队列,如果此时有其它的消费者能够解决,就将音讯发送给另外一个消费者。保障了音讯被确认生产的稳定性,也能够避免消费者音讯的积压。

手动应答代码演示:

生产者:

public class Consumer {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        System.out.println("期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery)-> {            String message = new String(delivery.getBody());            try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(message);            /**             * 1.音讯tag             * 2.是否批量应答             */            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),true);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("音讯生产被中断");         };        /**         * 消费者生产音讯         * 1.生产哪个队列         * 2.生产胜利之后是否要自动应答 true 代表自动应答 false 手动应答         * 3.生产胜利的回调         * 4.消费者未胜利生产的回调         */        Boolean autoAck = false;        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);    }}

消费者:

public class Producer {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {        //channel 实现了主动 close 接口 主动敞开 不须要本人进行敞开        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个队列             * 1.队列名称             * 2.队列外面的音讯是否长久化 默认音讯存储在内存中             * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产             * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除             * 5.其余参数             */            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            //从控制台当中承受信息            Scanner scanner = new Scanner(System.in);            while (scanner.hasNext()){                String message = scanner.next();                /**                 * 发送一个音讯                 * 1.发送到那个交换机                 * 2.路由的 key 是哪个                 * 3.其余的参数信息                 * 4.发送音讯的音讯体                 */                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());                System.out.println("发送音讯实现:"+message);            }        }    }}

咱们当初开启两个消费者,而后生产者给消费者发送音讯,等发送了足够多的音讯之后,关掉一个消费者,让其中一个正在生产的音讯失落,看看队列会不会再进行重发。

生产者一共发了14条音讯:

消费者2生产到一半,进行服务

消费者1帮消费者2生产了消费者2未生产完的数据:

音讯确认的api:

Channel.basicAck(用于必定确认) RabbitMQ 已晓得该音讯并且胜利的解决音讯,能够将其抛弃了Channel.basicNack(用于否定确认) Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,代表不解决该音讯了间接回绝,能够将其抛弃了

咱们在下面编写代码的时候,还发现了一个是否批量应答的标记位:

Multiple的true和false代表的含意不同:

true:
代表批量应答

false:
代表单个应答

2.3)音讯的不偏心散发
咱们发现发送音讯的时候,消费者解决的速度有快有慢,在这种场景下,轮询发送的效率如同不是最优的,所以咱们能够进行不偏心的散发,哪个消费者解决地快,就先发送给哪个消费者。

int prefetchCount = 1;channel.basicQos(prefetchCount);

有了这个值,rabbitmq就会把该任务分配给没有那么忙的消费者,谁解决地快,谁就生产。

留神,这个配置要在生产者和消费者同时配置。

咱们发送二十条音讯:

消费者1:

消费者2:

此时就是依照消费者的能力生产音讯了。

同时咱们也要留神一下,这个值还有一个概念,那就是预取值

音讯在发送的时候,实质上是异步生产的,因而消费者这里必定会有一个音讯的缓冲区,开发人员心愿限度该缓冲区的大小,以防止缓冲区外面无限度寄存未确认音讯的问题。这个时候,就能够应用basic.qos办法设置"预取值"该值就定义了容许未确认音讯的最大数量。例如取值为4,则rabbitMQ会在未确认信息为4之后,就不发送信息。找到一个适合的值是一个重复试探的过程,大略在100~300之间,取值为1最激进,然而这样吞吐量会变得很低。

3.MQ的长久化

刚刚咱们解决了消费者如何不失落生产的状况,然而咱们无奈防止Rabbitmq自身挂掉的状况,如何保障Rabbitmq服务停掉后,自身存储在Rabbotmq外面的音讯不失落,咱们须要把队列和音讯都标记为长久化。

3.1)队列如何长久化

咱们之前创立的队列都是非长久化的,所以如果要创立同名的长久化队列,咱们须要把以前的队列删除了才能够。

在申明队列的时候,同时要把durable参数设置为true.

/**             * 申明一个队列             * 1.队列名称             * 2.队列外面的音讯是否长久化 默认音讯存储在内存中             * 3.该队列是否只供一个消费者进行生产 是否进行共享 true 能够多个消费者生产             * 4.是否主动删除 最初一个消费者端开连贯当前 该队列是否主动删除 true 主动删除             * 5.其余参数             */            Boolean durable = true;            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

咱们会发现队列曾经长久化了:

3.2)音讯如何长久化
要想让音讯长久化,须要在发送音讯的时候,减少MessageProperties.PERSISTENT_BASIC参数。

                channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,message.getBytes());                System.out.println("发送音讯实现:"+message);

其实将音讯标记为长久化并不能保障齐全不丢音讯,可能Rabbitmq在将音讯保留到磁盘的时候,音讯还没存储完,然而队列就挂了,如果须要更无效的长久化机制,能够参考前面的公布确认章节。