简略模式队列

 在这部分的使用指南中,咱们要用 Java 写两个程序;一个是生产者,他发送一个音讯,另一个是消费者,它接管音讯,并且把音讯打印进去。咱们将会疏忽一些Java API 的细节,而是将注意力次要放在咱们将要做的这件事上,这件事就是发送一个 "Hello World" 音讯。

 在上面的图中,"P" 代表生产者,而 "C" 代表消费者。两头的就是一个 Queue,一个音讯缓存区。

创立我的项目

增加依赖

<!-- rabbitmq依赖 --><dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>5.6.0</version></dependency>

Sending

咱们把音讯发送者叫 Send,音讯接收者叫 Recv。音讯发送者连贯 RabbitMQ ,发送一个音讯,而后退出。

Send.java

package com.xxxx.simple.send;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 简略模式队列-音讯发送者 */public class Send {    // 队列名称    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 通过工厂创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            /**             * 申明队列             *  第一个参数queue:队列名称             *  第二个参数durable:是否长久化             *  第三个参数Exclusive:排他队列,如果一个队列被申明为排他队列,该队列仅对首次申明它的连贯可见,并在连贯断开时主动删除。             *      这里须要留神三点:             *          1. 排他队列是基于连贯可见的,同一连贯的不同通道是能够同时拜访同一个连贯创立的排他队列的。             *          2. "首次",如果一个连贯曾经申明了一个排他队列,其余连贯是不容许建设同名的排他队列的,这个与一般队列不同。             *          3. 即便该队列是长久化的,一旦连贯敞开或者客户端退出,该排他队列都会被主动删除的。             *          这种队列实用于只限于一个客户端发送读取音讯的利用场景。             *  第四个参数Auto-delete:主动删除,如果该队列没有任何订阅的消费者的话,该队列会被主动删除。             *                          这种队列实用于长期队列。             */            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            // 创立音讯            String message = "Hello World!";            // 将产生的音讯放入队列            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));            System.out.println(" [x] Sent '" + message + "'");        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

音讯发送胜利当前,通过RabbitMQ治理界面能够看到队列的相干信息

Receiving

音讯的发送者只是发送一个音讯,咱们的接收者须要一直的监听音讯,并把它们打印进去。

Recv.java

package com.xxxx.simple.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 简略模式队列-音讯接收者 */public class Recv {    // 队列名称    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            Connection connection = factory.newConnection();            // 获取通道            Channel channel = connection.createChannel();            // 指定队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // ---------------------之前旧版本的写法-------begin-----------            /*            // 获取音讯            Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope,                                           AMQP.BasicProperties properties,                                           byte[] body) throws IOException {                    // 获取音讯并在控制台打印                    String message = new String(body, "utf-8");                    System.out.println(" [x] Received '" + message + "'");                }            };            // 监听队列            channel.basicConsume(QUEUE_NAME, true, consumer);            */            // ---------------------之前旧版本的写法--------end------------            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received '" + message + "'");            };            // 监听队列            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

音讯接管胜利当前,通过RabbitMQ治理界面能够看到队列的相干信息

测试

运行Send.java

运行Recv.java

总结

问题:如果任务量很大,音讯得不到及时的生产会造成队列积压,问题十分重大,比方内存溢出,音讯失落等。

解决:配置多个消费者生产音讯。

总结:简略队列-解决音讯效率不高,吞吐量较低,不适宜生成环境

Work queues-工作模式队列

工作模式队列-音讯轮询散发(Round-robin)

 通过Helloworld工程咱们曾经可能构建一个简略的音讯队列的根本我的项目,我的项目中存在几个角色:生产者、消费者、队列,而对于咱们实在的开发中,对于音讯的消费者通过是有多个的,比方在实现用户注册性能时,用户注册胜利,会给响对应用户发送邮件,同时给用户发送手机短信,通知用户已胜利注册网站或者app 利用,这种性能在大部分我的项目开发中都比拟常见,而对于helloworld 的利用中尽管可能对音讯进行生产,然而有很大问题:音讯消费者只有一个,当音讯量十分大时,单个消费者解决音讯就会变得很慢,同时给节点页带来很大压力,导致音讯沉积越来越多。对于这种状况,RabbitMQ 提供了工作队列模式,通过工作队列提供做个消费者,对MQ产生的音讯进行生产,进步MQ音讯的吞吐率,升高音讯的解决工夫。解决模型图如下

Sending

Send.java

package com.xxxx.work.roundRobin.send;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作模式队列-轮询散发-音讯发送者 */public class Send {    // 队列名称    private final static String QUEUE_NAME = "work_roundRobin";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 通过工厂创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            // 申明队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            // 创立音讯            for (int i = 1; i <= 20; i++) {                String message = "Hello World! ----- " + i;                // 将产生的音讯放入队列                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));                System.out.println(" [x] Sent '" + message + "'");            }        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

Receiving

Recv01.java

package com.xxxx.work.roundRobin.recv;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作模式队列-轮询散发-音讯接收者 */public class Recv01 {    // 队列名称    private final static String QUEUE_NAME = "work_roundRobin";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            Connection connection = factory.newConnection();            // 获取通道            Channel channel = connection.createChannel();            // 指定队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received01 '" + message + "'");                // 模拟程序执行所耗时间                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            };            // 监听队列            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

Recv02.java

package com.xxxx.work.roundRobin.recv;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作模式队列-轮询散发-音讯接收者 */public class Recv02 {    // 队列名称    private final static String QUEUE_NAME = "work_roundRobin";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            Connection connection = factory.newConnection();            // 获取通道            Channel channel = connection.createChannel();            // 指定队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received02 '" + message + "'");                // 模拟程序执行所耗时间                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            };            // 监听队列            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

测试

运行Send

运行Recv

总结

 从后果能够看出音讯被平均分配到两个生产方,来对音讯进行解决,进步了音讯解决效率,创立多个消费者来对音讯进行解决。这里RabitMQ采纳轮询来对音讯进行散发时保障了音讯被平均分配到每个生产方,然而引入新的问题:真正的生产环境下,对于音讯的解决根本不会像咱们当初看到的这样,每个生产方解决的音讯数量是平均分配的,比方因为网络起因,机器cpu,内存等硬件问题,生产方解决音讯时同类音讯不同机器进行解决时耗费工夫也是不一样的,比方1号消费者生产1条音讯时1秒,2号消费者生产1条音讯是5秒,对于1号消费者比2号消费者解决音讯快,那么在调配音讯时就应该让1号消费者多收到音讯进行解决,也即是咱们通常所说的”能者多劳”,同样Rabbitmq对于这种音讯分配模式提供了反对。

 问题:任务量很大,音讯尽管失去了及时的生产,单位工夫内音讯处理速度放慢,进步了吞吐量,可是不同消费者解决音讯的工夫不同,导致局部消费者的资源被节约。

解决:采纳音讯偏心散发。

 总结:工作队列-音讯轮询散发-消费者收到的音讯数量平均分配,单位工夫内音讯处理速度放慢,进步了吞吐量。

工作模式队列-音讯偏心散发(fair dispatch)

 在案例01中对于音讯散发采纳的是默认轮询散发,音讯应答采纳的自动应答模式,这是因为当音讯进入队列,RabbitMQ就会分派音讯。它不看消费者为应答的数目,只是自觉的将第n条音讯发给第n个消费者。

 为了解决这个问题,咱们应用basicQos(prefetchCount = 1)办法,来限度RabbitMQ只发不超过1条的音讯给同一个消费者。当音讯处理完毕后,有了反馈,才会进行第二次发送。执行模型图如下:

Sending

Send.java

package com.xxxx.work.fair.send;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作模式队列-偏心散发-音讯发送者 */public class Send {    // 队列名称    private final static String QUEUE_NAME = "work_fair";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 通过工厂创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            // 申明队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            // 创立音讯            for (int i = 1; i <= 20; i++) {                String message = "Hello World! ----- " + i;                // 将产生的音讯放入队列                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));                System.out.println(" [x] Sent '" + message + "'");            }        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

Receiving

Recv01.java

package com.xxxx.work.fair.recv;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作模式队列-偏心散发-音讯接收者 */public class Recv01 {    // 队列名称    private final static String QUEUE_NAME = "work_fair";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 指定队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received01 '" + message + "'");                // 模拟程序执行所耗时间                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

Recv02.java

package com.xxxx.work.fair.recv;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列-偏心散发-音讯接收者 */public class Recv02 {    // 队列名称    private final static String QUEUE_NAME = "work_fair";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 指定队列            channel.queueDeclare(QUEUE_NAME, false, false, false, null);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received02 '" + message + "'");                // 模拟程序执行所耗时间                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

测试

运行Send

运行Recv

总结

从后果能够看出1号消费者生产音讯数量显著高于2号,即音讯通过fair 机制被偏心散发到每个消费者。

问题:生产者产生的音讯只能够被一个消费者生产,可不可以被多个消费者生产呢?

解决:采纳公布与订阅模式。

总结:工作队列-偏心轮询散发-依据不同消费者机器硬件配置,音讯处理速度不同,收到的音讯数量也不同,通常速度快的解决的音讯数量比拟多,最大化应用计算机资源。实用于生成环境。

Publish/Subscribe-音讯的公布与订阅模式队列

 对于微信公众号,置信每个人都订阅过,当公众号发送新的音讯后,对于订阅过该公众号的所有用户均能够收到音讯,这个场景大家都能明确,同样对于RabbitMQ音讯的解决也反对这种音讯解决,当生产者把音讯投送进来后,不同的消费者均能够对该音讯进行生产,而不是音讯被一个消费者生产后就立刻从队列中删除,对于这种音讯解决,咱们通常称之为音讯的公布与订阅模式,但凡消费者订阅了该音讯,均可能收到对应音讯进行解决,比拟常见的如用户注册操作。模型图如下:

从图中看到:

  • 音讯产生后不是间接投送到队列中,而是将音讯先投送给Exchange交换机,而后音讯通过Exchange 交换机投递到相干队列
  • 多个消费者生产的不再是同一个队列,而是每个消费者生产属于本人的队列。

具体实现外围代码如下:

Sending

Send.java

package com.xxxx.publish.subscribe.fanout.send;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 公布与订阅模式队列-fanout播送模式-音讯发送者 */public class Send {    // 队列名称    // 如果不申明队列,会应用默认值,RabbitMQ会创立一个排他队列,连贯断开后主动删除    //private final static String QUEUE_NAME = "ps_fanout";    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_fanout";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 通过工厂创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            // 申明队列            //channel.queueDeclare(QUEUE_NAME, false, false, false, null);            // 绑定交换机 fanout:播送模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);            // 创立音讯,模仿发送手机号码和邮件地址            String message = "18600002222|12345@qq.com";            // 将产生的音讯发送至交换机            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));            System.out.println(" [x] Sent '" + message + "'");        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

Receiving

这里对于消费者,生产音讯时,音讯通过交换机Exchange被路由到指定队列,绑定队列到指定交换机来实现,一个消费者接到音讯后用于邮件发送模仿,另一消费者收到音讯,用于短信发送模仿。

Recv01.java

package com.xxxx.publish.subscribe.fanout.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 公布与订阅模式队列-fanout播送模式-音讯接收者 */public class Recv01 {    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_fanout";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 绑定交换机 fanout:播送模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);            // 获取队列名称            String queueName = channel.queueDeclare().getQueue();            // 绑定队列            channel.queueBind(queueName, EXCHANGE_NAME, "");            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯,按|宰割当前一个消费者发短信,一个消费者发邮件            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received01 '" + message + "'");                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

Recv02.java

package com.xxxx.publish.subscribe.fanout.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 公布与订阅模式队列-fanout播送模式-音讯接收者 */public class Recv02 {    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_fanout";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 绑定交换机 fanout:播送模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);            // 获取队列名称            String queueName = channel.queueDeclare().getQueue();            // 绑定队列            channel.queueBind(queueName, EXCHANGE_NAME, "");            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯,按|宰割当前一个消费者发短信,一个消费者发邮件            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received01 '" + message + "'");                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

测试

运行Send

运行Recv

总结

从后果能够看出生产者发送了一条音讯,用于邮件发送和短信发送的消费者均能够收到音讯进行后续解决。

问题:生产者产生的音讯所有消费者都能够生产,可不可以指定某些消费者生产呢?

 解决:采纳direct路由模式。

Routing-路由模式队列

 通过案例03,能够看到,生产者将音讯投送给交换机后,音讯经交换机散发到不同的队列即:交换机收到音讯,默认对于绑定到每个交换机的队列均会接管到交换机散发的音讯,对于案例03的交换机的音讯散发Exchange Types为fanout类型,通常在真正我的项目开发时会遇到这种状况:在对我的项目信息输入日志进行收集时,会把日志(error warning,info)分类进行输入,这时通过Exchange Types中的direct类型就能够实现,针对不同的音讯,在对音讯进行生产时,通过Exchange types 以及 Routing key 设置的规定 ,便能够将不同音讯路由到不同的队列中而后交给不同消费者进行生产操作。模型图如下:

从图中能够看出:

  1. 生产者产生的音讯投给交换机
  2. 交换机投送音讯时的Exchange Types为direct类型
  3. 音讯通过定义的Routing Key被路由到指定的队列进行后续生产

具体实现外围代码如下:

Sending

Send.java

package com.xxxx.publish.subscribe.direct.send;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * direct路由模式队列-音讯发送者 */public class Send {    // 队列名称    // 如果不申明队列,会应用默认值,RabbitMQ会创立一个排他队列,连贯断开后主动删除    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_direct";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 通过工厂创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            // 绑定交换机 direct:路由模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            // 创立音讯,模仿收集不同级别日志            String message = "INFO音讯";            //String message = "WARNING音讯";            //String message = "ERROR音讯";            // 设置路由routingKey            String routingKey = "info";            //String routingKey = "error";            // 将产生的音讯发送至交换机            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));            System.out.println(" [x] Sent '" + message + "'");        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

Receiving

 消费者对音讯进行后续生产时,对于接管音讯的队列在对音讯进行接管时,绑定到每一个交换机上的队列均会指定其Routing Key规定,通过路由规定将音讯路由到执行队列中。

 消费者01 routingKey=info和warning,对应级别日志音讯均会路由到该队列中。

 Recv01.java

package com.xxxx.publish.subscribe.direct.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * direct路由模式队列-音讯接收者 */public class Recv01 {    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_direct";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 绑定交换机 direct:路由模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            // 获取队列名称            String queueName = channel.queueDeclare().getQueue();            // 设置路由routingKey            String routingKeyInfo = "info";            String routingKeyWarning = "warning";            // 绑定队列            channel.queueBind(queueName, EXCHANGE_NAME, routingKeyInfo);            channel.queueBind(queueName, EXCHANGE_NAME, routingKeyWarning);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯,按|宰割当前一个消费者发短信,一个消费者发邮件            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received01 '" + message + "'");                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

 消费者02 routingKey=error,对应级别日志音讯均会路由到该队列中。

 Recv02.java

package com.xxxx.publish.subscribe.direct.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * direct路由模式队列-音讯接收者 */public class Recv02 {    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_direct";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 绑定交换机 direct:路由模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            // 获取队列名称            String queueName = channel.queueDeclare().getQueue();            // 设置路由routingKey            String routingKey = "error";            // 绑定队列            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received02 '" + message + "'");                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

测试

运行Send

运行Recv

总结

从后果能够看出生产者发送了多条设置了路由规定的音讯,消费者能够依据具体的路由规定生产对应队列中的音讯,而不是所有消费者都能够生产所有音讯了。

问题:生产者产生的音讯如果场景需要过多须要设置很多路由规定,可不可以缩小?

 解决:采纳topic主题模式。

Topics-主题模式队列

 通过案例04看到音讯通过交换机Exchange Type以及Routing Key规定,能够将音讯路由到指定的队列,也合乎在工作中的场景去应用的一种形式,对于RabbitMq 除了direct 模式外,Mq 同样还提供了topics主题模式来对音讯进行匹配路由,比方在我的项目开发中,拿商品模块来说,对于商品的查问性能在对商品进行查问时咱们将查问音讯路由到查问对应队列,而对于商品的增加、更新、删除等操作咱们对立路由到另外一个队列来进行解决,此时采纳direct 模式能够实现,但对于保护的队列可能就不太容易进行保护,如果波及模块很多,此时对应队列数量就很多,此时咱们就能够通过topic主题模式来对音讯路由时进行匹配,通过指定的匹配模式将音讯路由到匹配到的队列中进行后续解决。对于routing key匹配模式定义规定举例如下:

  • routing key为一个句点号.分隔的字符串(咱们将被句点号.分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • routing key中能够存在两种特殊字符*#,用于做含糊匹配,其中*用于匹配一个单词,#用于匹配多个单词(能够是零个)

例如:

以上图中的配置为例:

  • routingKey=”quick.orange.rabbit”的音讯会同时路由到Q1与Q2,
  • routingKey=”lazy.orange.fox”的音讯会路由到Q1,Q2,
  • routingKey=”lazy.brown.fox”的音讯会路由到Q2,
  • routingKey=”lazy.pink.rabbit”的音讯会路由到Q2;
  • routingKey=”quick.brown.fox”;
  • routingKey=”orange”;
  • routingKey=”quick.orange.male.rabbit”的音讯将会被抛弃,因为它们没有匹配任何bindingKey。

具体实现外围代码:

Sending

 这里以商品模块为例,商品查问路由到商品查问队列,商品更新路由到商品更新(增加,更新,删除操作)队列中。

Send.java

package com.xxxx.publish.subscribe.topic.send;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * topic主题模式队列-音讯发送者 */public class Send {    // 队列名称    // 如果不申明队列,会应用默认值,RabbitMQ会创立一个排他队列,连贯断开后主动删除    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_topic";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        Connection connection = null;        Channel channel = null;        try {            // 通过工厂创立连贯            connection = factory.newConnection();            // 获取通道            channel = connection.createChannel();            // 绑定交换机 topic:主题模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);            // 创立音讯,模仿商品模块            String message = "商品查问操作";            //String message = "商品更新操作";            // 设置路由routingKey            String routingKey = "select.goods.byId";            //String routingKey = "update.goods.byId.andName";            // 将产生的音讯发送至交换机            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));            System.out.println(" [x] Sent '" + message + "'");        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        } finally {            try {                // 敞开通道                if (null != channel && channel.isOpen())                    channel.close();                // 敞开连贯                if (null != connection && connection.isOpen())                    connection.close();            } catch (TimeoutException e) {                e.printStackTrace();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

Receiving

Recv01.java

package com.xxxx.publish.subscribe.topic.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * topic主题模式队列-音讯接收者 */public class Recv01 {    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_topic";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 绑定交换机 topic:主题模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);            // 获取队列名称            String queueName = channel.queueDeclare().getQueue();            // 设置路由routingKey            String routingKey = "select.goods.*";            // 绑定队列            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯,按|宰割当前一个消费者发短信,一个消费者发邮件            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received01 '" + message + "'");                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

Recv02.java

package com.xxxx.publish.subscribe.topic.recv;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * topic主题模式队列-音讯接收者 */public class Recv02 {    // 交换机名称    private static final String EXCHANGE_NAME = "exchange_topic";    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 绑定交换机 topic:主题模式            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);            // 获取队列名称            String queueName = channel.queueDeclare().getQueue();            // 设置路由routingKey            String routingKey = "update.goods.#";            // 绑定队列            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                String message = new String(delivery.getBody(), "UTF-8");                System.out.println(" [x] Received02 '" + message + "'");                // 手动回执音讯                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {            });        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

测试

运行Send

运行Recv

总结

从后果能够看出生产者发送了多条设置了路由匹配规定(主题)的音讯,依据不同的路由匹配规定(主题),能够将音讯依据指定的routing key路由到匹配到的队列中,也是在生产中比拟常见的一种音讯解决形式。

问题:RabbitMQ自身是基于异步的音讯解决,是否能够同步实现?

 解决:采纳RPC模式。

RPC-近程过程调用模式队列

 MQ自身是基于异步的音讯解决,后面的示例中所有的生产者(P)将音讯发送到RabbitMQ后不会晓得消费者(C)解决胜利或者失败(甚至连有没有消费者来解决这条音讯都不晓得)。

 但理论的利用场景中,咱们很可能须要一些同步解决,须要同步期待服务端将我的音讯解决实现后再进行下一步解决。这相当于RPC(Remote Procedure Call,近程过程调用)。在RabbitMQ中也反对RPC。

 RabbitMQ中实现RPC的机制是:

  1. 客户端发送申请(音讯)时,在音讯的属性(MessageProperties,在AMQP协定中定义了14种properties,这些属性会随着音讯一起发送)中设置两个值replyTo(一个Queue名称,用于通知服务器解决实现后将告诉我的音讯发送到这个Queue中)和correlationId(此次申请的标识号,服务器解决实现后须要将此属性返还,客户端将依据这个id理解哪条申请被胜利执行了或执行失败)
  2. 服务器端收到音讯并解决
  3. 服务器端解决完音讯后,将生成一条应答音讯到replyTo指定的Queue,同时携带correlationId属性

 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答音讯后,依据其中的correlationId属性剖析哪条申请被执行了,依据执行后果进行后续业务解决。

具体实现外围代码:

Server

Server.java

package com.xxxx.rpc.server;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * RPC模式队列-服务端 */public class RPCServer {    // 队列名称    private static final String RPC_QUEUE_NAME = "rpc_queue";    /**     * 计算斐波那契数列     *     * @param n     * @return     */    private static int fib(int n) {        if (n == 0) return 0;        if (n == 1) return 1;        return fib(n - 1) + fib(n - 2);    }    public static void main(String[] args) {        // 创立连贯工厂        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        try {            // 通过工厂创立连贯            final Connection connection = factory.newConnection();            // 获取通道            final Channel channel = connection.createChannel();            // 申明队列            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);            channel.queuePurge(RPC_QUEUE_NAME);            /*                限度RabbitMQ只发不超过1条的音讯给同一个消费者。                当音讯处理完毕后,有了反馈,才会进行第二次发送。             */            int prefetchCount = 1;            channel.basicQos(prefetchCount);            System.out.println(" [x] Awaiting RPC requests");            Object monitor = new Object();            // 获取音讯            DeliverCallback deliverCallback = (consumerTag, delivery) -> {                // 获取replyTo队列和correlationId申请标识                AMQP.BasicProperties replyProps = new AMQP.BasicProperties                        .Builder()                        .correlationId(delivery.getProperties().getCorrelationId())                        .build();                String response = "";                try {                    // 接管客户端音讯                    String message = new String(delivery.getBody(), "UTF-8");                    int n = Integer.parseInt(message);                    System.out.println(" [.] fib(" + message + ")");                    // 服务端依据业务需要解决                    response += fib(n);                } catch (RuntimeException e) {                    System.out.println(" [.] " + e.toString());                } finally {                    // 将处理结果发送至replyTo队列同时携带correlationId属性                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,                            response.getBytes("UTF-8"));                    // 手动回执音讯                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);                    // RabbitMq consumer worker thread notifies the RPC server owner thread                    // RabbitMq消费者工作线程告诉RPC服务器其余所有线程运行                    synchronized (monitor) {                        monitor.notify();                    }                }            };            // 监听队列            /*                autoAck = true代表主动确认音讯                autoAck = false代表手动确认音讯             */            boolean autoAck = false;            channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {            });            // Wait and be prepared to consume the message from RPC client.            // 线程期待并筹备接管来自RPC客户端的音讯            while (true) {                synchronized (monitor) {                    try {                        monitor.wait();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        } catch (IOException e) {            e.printStackTrace();        } catch (TimeoutException e) {            e.printStackTrace();        }    }}

Client

Client.java

package com.xxxx.rpc.client;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeoutException;/** * RPC模式队列-客户端 */public class RPCClient implements AutoCloseable {    private Connection connection;    private Channel channel;    // 队列名称    private String requestQueueName = "rpc_queue";    // 初始化连贯    public RPCClient() throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("127.0.0.1");        factory.setPort(5672);        factory.setUsername("shop");        factory.setPassword("shop");        factory.setVirtualHost("/shop");        connection = factory.newConnection();        channel = connection.createChannel();    }    public static void main(String[] args) {        try (RPCClient fibonacciRpc = new RPCClient()) {            for (int i = 0; i < 10; i++) {                String i_str = Integer.toString(i);                System.out.println(" [x] Requesting fib(" + i_str + ")");                // 申请服务端                String response = fibonacciRpc.call(i_str);                System.out.println(" [.] Got '" + response + "'");            }        } catch (IOException | TimeoutException | InterruptedException e) {            e.printStackTrace();        }    }    // 申请服务端    public String call(String message) throws IOException, InterruptedException {        // correlationId申请标识ID        final String corrId = UUID.randomUUID().toString();        // 获取队列名称        String replyQueueName = channel.queueDeclare().getQueue();        // 设置replyTo队列和correlationId申请标识        AMQP.BasicProperties props = new AMQP.BasicProperties                .Builder()                .correlationId(corrId)                .replyTo(replyQueueName)                .build();        // 发送音讯至队列        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));        // 设置线程期待,每次只接管一个响应后果        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);        // 承受服务器返回后果        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {            if (delivery.getProperties().getCorrelationId().equals(corrId)) {                // 将给定的元素在给定的工夫内设置到线程队列中,如果设置胜利返回true, 否则返回false                response.offer(new String(delivery.getBody(), "UTF-8"));            }        }, consumerTag -> {        });        // 从线程队列中获取值,如果线程队列中没有值,线程会始终阻塞,直到线程队列中有值,并且获得该值        String result = response.take();        // 从音讯队列中抛弃该值        channel.basicCancel(ctag);        return result;    }    // 敞开连贯    public void close() throws IOException {        connection.close();    }}

测试

运行Server

运行Client

查看Server