RabbitMQ 应用场景

服务解耦

假如有这样一个场景, 服务A产生数据, 而服务B,C,D须要这些数据, 那么咱们能够在A服务中间接调用B,C,D服务,把数据传递到上游服务即可

然而,随着咱们的利用规模不断扩大,会有更多的服务须要A的数据,如果有几十甚至几百个上游服务,而且会一直变更,再加上还要思考上游服务出错的状况,那么A服务中调用代码的保护会极为艰难

这是因为服务之间耦合度过于严密

再来思考用RabbitMQ解耦的状况

A服务只须要向音讯服务器发送音讯,而不必思考谁须要这些数据;上游服务如果须要数据,自行从音讯服务器订阅音讯,不再须要数据时则勾销订阅即可

流量削峰

假如咱们有一个利用,平时访问量是每秒300申请,咱们用一台服务器即可轻松应答

而在高峰期,访问量霎时翻了十倍,达到每秒3000次申请,那么单台服务器必定无奈应答,这时咱们能够思考减少到10台服务器,来扩散拜访压力

但如果这种刹时顶峰的状况每天只呈现一次,每次只有半小时,那么咱们10台服务器在少数工夫都只分担每秒几十次申请,这样就有点浪费资源了

这种状况,咱们就能够应用RabbitMQ来进行流量削峰,顶峰状况下,霎时呈现的大量申请数据,先发送到音讯队列服务器,排队期待被解决,而咱们的利用,能够缓缓的从音讯队列接管申请数据进行解决,这样把数据处理工夫拉长,以加重刹时压力

这是音讯队列服务器十分典型的利用场景

异步调用

思考定外卖领取胜利的状况

领取后要发送领取胜利的告诉,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程十分耗时,尤其是高峰期,可能要期待几十秒甚至更长

这样就造成整条调用链路响应十分迟缓

而如果咱们引入RabbitMQ音讯队列,订单数据能够发送到音讯队列服务器,那么调用链路也就能够到此结束,订单零碎则能够立刻失去响应,整条链路的响应工夫只有200毫秒左右

寻找外卖小哥的利用能够以异步的形式从音讯队列接管订单音讯,再执行耗时的寻找操作

rabbitmq 基本概念

RabbitMQ是一种消息中间件,用于解决来自客户端的异步音讯。服务端将要发送的音讯放入到队列池中。接收端能够依据RabbitMQ配置的转发机制接管服务端发来的音讯。RabbitMQ根据指定的转发规定进行音讯的转发、缓冲和长久化操作,次要用在多服务器间或单服务器的子系统间进行通信,是分布式系统规范的配置。

Exchange

承受生产者发送的音讯,并依据Binding规定将音讯路由给服务器中的队列。ExchangeType决定了Exchange路由音讯的行为。在RabbitMQ中,ExchangeType罕用的有direct、Fanout和Topic三种。

Message Queue

音讯队列。咱们发送给RabbitMQ的音讯最初都会达到各种queue,并且存储在其中(如果路由找不到相应的queue则数据会失落),期待消费者来取。

Binding Key

它示意的是Exchange与Message Queue是通过binding key进行分割的,这个关系是固定。

Routing Key

生产者在将音讯发送给Exchange的时候,个别会指定一个routing key,来指定这个音讯的路由规定。这个routing key须要与Exchange Type及binding key联结应用能力生,咱们的生产者只须要通过指定routing key来决定音讯流向哪里。

rabbitmq装置

离线装置

下载离线安装包文件

  • https://download.csdn.net/download/weixin_38305440/12265906

上传离线安装包

  • rabbitmq-install 目录上传到 /root

切换到rabbitmq-install目录

cd rabbitmq-install

装置

rpm -ivh *.rpm

Yum在线装置

以下内容来自 RabbitMQ 官网手册

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc# centos7 用这个cat <<EOF > /etc/yum.repos.d/rabbitmq.repo[bintray-rabbitmq-server]name=bintray-rabbitmq-rpmbaseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/gpgcheck=0repo_gpgcheck=0enabled=1EOF# centos6 用这个cat <<EOF > /etc/yum.repos.d/rabbitmq.repo[bintray-rabbitmq-server]name=bintray-rabbitmq-rpmbaseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/gpgcheck=0repo_gpgcheck=0enabled=1EOFyum makecacheyum install socatwget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpmrpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodepsyum install rabbitmq-server

启动rabbitmq服务器

# 设置服务,开机主动启动systemctl enable rabbitmq-server# 启动服务systemctl start rabbitmq-server

rabbitmq治理界面

启用治理界面
# 开启治理界面插件rabbitmq-plugins enable rabbitmq_management# 防火墙关上 15672 治理端口firewall-cmd --zone=public --add-port=15672/tcp --permanentfirewall-cmd --reload

重启RabbitMQ服务

systemctl restart rabbitmq-server

拜访

拜访服务器的15672端口,例如:
http://192.168.64.140:15672

增加用户

# 增加用户rabbitmqctl add_user admin admin# 新用户设置用户为超级管理员rabbitmqctl set_user_tags admin administrator

设置拜访权限


  • 用户治理参考
    https://www.cnblogs.com/AloneSword/p/4200051.html

凋谢客户端连贯端口

# 关上客户端连贯端口firewall-cmd --zone=public --add-port=5672/tcp --permanentfirewall-cmd --reload

次要端口介绍

  • 4369 – erlang发现口
  • 5672 – client端通信口
  • 15672 – 治理界面ui端口
  • 25672 – server间外部通信口

rabbitmq六种工作模式

简略模式

RabbitMQ是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

  • 发送音讯的程序是生产者
  • 队列就代表一个邮箱。尽管音讯会流经RbbitMQ和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.
  • 消费者期待从队列接管音讯

pom.xml

增加 slf4j 依赖, 和 rabbitmq amqp 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.tedu</groupId>    <artifactId>rabbitmq</artifactId>    <version>0.0.1-SNAPSHOT</version>    <dependencies>        <dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>5.4.3</version>        </dependency>        <dependency>            <groupId>org.slf4j</groupId>            <artifactId>slf4j-api</artifactId>            <version>1.8.0-alpha2</version>        </dependency>        <dependency>            <groupId>org.slf4j</groupId>            <artifactId>slf4j-log4j12</artifactId>            <version>1.8.0-alpha2</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.8.0</version>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build></project>
生产者发送音讯
package m1;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws Exception {        //创立连贯工厂,并设置连贯信息        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        f.setPort(5672);//可选,5672是默认端口        f.setUsername("admin");        f.setPassword("admin");        /*         * 与rabbitmq服务器建设连贯,         * rabbitmq服务器端应用的是nio,会复用tcp连贯,         * 并开拓多个信道与客户端通信         * 以加重服务器端建设连贯的开销         */        Connection c = f.newConnection();        //建设信道        Channel ch = c.createChannel();        /*         * 申明队列,会在rabbitmq中创立一个队列         * 如果曾经创立过该队列,就不能再应用其余参数来创立         *          * 参数含意:         *   -queue: 队列名称         *   -durable: 队列长久化,true示意RabbitMQ重启后队列仍存在         *   -exclusive: 排他,true示意限度仅以后连贯可用         *   -autoDelete: 当最初一个消费者断开后,是否删除队列         *   -arguments: 其余参数         */        ch.queueDeclare("helloworld", false,false,false,null);        /*         * 公布音讯         * 这里把音讯向默认交换机发送.         * 默认交换机隐含与所有队列绑定,routing key即为队列名称         *          * 参数含意:         *     -exchange: 交换机名称,空串示意默认交换机"(AMQP default)",不能用 null          *     -routingKey: 对于默认交换机,路由键就是指标队列名称         *     -props: 其余参数,例如头信息         *     -body: 音讯内容byte[]数组         */        ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());        System.out.println("音讯已发送");        c.close();    }}
消费者接管音讯
package m1;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer {    public static void main(String[] args) throws Exception {        //连贯工厂        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        f.setUsername("admin");        f.setPassword("admin");        //建设连贯        Connection c = f.newConnection();        //建设信道        Channel ch = c.createChannel();        //申明队列,如果该队列曾经创立过,则不会反复创立        ch.queueDeclare("helloworld",false,false,false,null);        System.out.println("期待接收数据");                //收到音讯后用来解决音讯的回调对象        DeliverCallback callback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                String msg = new String(message.getBody(), "UTF-8");                System.out.println("收到: "+msg);            }        };                //消费者勾销时的回调对象        CancelCallback cancel = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };                ch.basicConsume("helloworld", true, callback, cancel);    }}

工作模式

工作队列(即工作队列)背地的次要思维是防止立刻执行资源密集型工作,并且必须期待它实现。相同,咱们将工作安顿在稍后实现。

咱们将工作封装为音讯并将其发送到队列。后盾运行的工作过程将获取工作并最终执行工作。当运行多个消费者时,工作将在它们之间散发。

应用工作队列的一个长处是可能轻松地并行工作。如果咱们正在积压工作工作,咱们能够增加更多工作过程,这样就能够轻松扩大。

生产者发送音讯

这里模仿耗时工作,发送的音讯中,每个点使工作过程暂停一秒钟,例如"Hello…"将破费3秒钟来解决

package m2_work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.util.Scanner;public class Producer {    public static void main(String[] args) throws Exception {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义队列        //队列如果在服务器端曾经存在,属性不可变(第二个参数改为true)        c.queueDeclare("task_queue", false, false, false, null);        while (true){            System.out.print("输出音讯:");            String msg = new Scanner(System.in).nextLine();            if ("exit".equals(msg)) {                break;            }            c.basicPublish("", "helloworld", null, msg.getBytes());        }    }}

消费者接管音讯

package m2_work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {    public static void main(String[] args) throws Exception {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义队列        c.queueDeclare("helloworld", false, false, false, null);        // 解决音讯的回调对象        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //遍历音讯字符串,每个点字符都暂停一秒                String msg = new String(message.getBody());                System.out.println("收到:"+msg);                for (int i = 0 ; i<msg.length();i++){                    if (msg.charAt(i)=='.'){                        try {                            Thread.sleep(1000);                        } catch (InterruptedException e) {                        }                    }                }                System.out.println("音讯解决完结");            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };        //生产数据        //第二个参数:true - 主动确认 false - 手动确认        c.basicConsume("task_queue", true, deliverCallback, cancelCallback);    }}

运行测试

运行:

  • 一个生产者
  • 两个消费者

生产者发送多条音讯,
如: 1,2,3,4,5. 两个消费者别离收到:

  • 消费者一: 1,3,5
  • 消费者二: 2,4

rabbitmq在所有消费者中轮询散发音讯,把音讯平均地发送给所有消费者

音讯确认

一个消费者接管音讯后,在音讯没有齐全解决完时就挂掉了,那么这时会产生什么呢?

就当初的代码来说,rabbitmq把音讯发送给消费者后,会立刻删除音讯,那么消费者挂掉后,它没来得及解决的音讯就会失落

如果生产者发送以下音讯:    1…    2    3    4    5两个消费者别离收到:*   消费者一: 1…, 3, 5*   消费者二: 2, 4当消费者一收到所有音讯后,要话费7秒工夫来解决第一条音讯,这期间如果敞开该消费者,那么1未解决实现,3,5则没有被解决

咱们并不想失落任何音讯, 如果一个消费者挂掉,咱们想把它的工作音讯派发给其余消费者

为了确保音讯不会失落,rabbitmq反对音讯确认(回执)。当一个音讯被消费者接管到并且执行实现后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 通知他我曾经执行实现了,你能够把这条音讯删除了。

如果一个消费者没有返回音讯确认就挂掉了(信道敞开,连贯敞开或者TCP链接失落),rabbitmq就会明确,这个音讯没有被解决实现,rebbitmq就会把这条音讯从新放入队列,如果在这时有其余的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其余的消费者,这样就确保了没有音讯会失落。

这里不存在音讯超时, rabbitmq只在消费者挂掉时从新分派音讯, 即便消费者花十分久的工夫来解决音讯也能够

手动音讯确认默认是开启的,后面的例子咱们通过autoAck=ture把它敞开了。咱们当初要把它设置为false,而后工作过程解决完动向工作时,发送一个音讯确认(回执)。

package m2_work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {    public static void main(String[] args) throws Exception {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义队列        c.queueDeclare("task_queue", false, false, false, null);        // 解决音讯的回调对象        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //遍历音讯字符串,每个点字符都暂停一秒                String msg = new String(message.getBody());                System.out.println("收到:"+msg);                for (int i = 0 ; i<msg.length();i++){                    if (msg.charAt(i)=='.'){                        try {                            Thread.sleep(1000);                        } catch (InterruptedException e) {                        }                    }                }                //发送回执(参数1:回值(long类型的编码)参数2:是否一次确认多条(boolean,个别都是false))                c.basicAck(message.getEnvelope().getDeliveryTag(),false);//封装的回值对象(envelope)的标签(deliveryTag)                System.out.println("音讯解决完结");            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };        //生产数据        //第二个参数:true - 主动确认 false - 手动确认        c.basicConsume("task_queue", false, deliverCallback, cancelCallback);    }}

应用以上代码,就算杀掉一个正在解决音讯的工作过程也不会失落任何音讯,工作过程挂掉之后,没有确认的音讯就会被主动从新传递。

遗记确认(ack)是一个常见的谬误, 这样结果是很重大的, 因为未确认的音讯不会被开释, rabbitmq会吃掉越来越多的内存

能够应用上面命令打印工作队列中未确认音讯的数量

rabbitmqctl list_queues name >messages_readymessages_unacknowledged

当解决音讯时异常中断, 能够抉择让音讯重回队列从新发送.
nack 操作能够是音讯重回队列, 能够应用 basicNack() 办法:

// requeue为true时重回队列, 反之音讯被抛弃或被发送到死信队列c.basicNack(tag, multiple, requeue)

正当地散发

rabbitmq会一次把多个音讯分发给消费者, 这样可能造成有的消费者十分忙碌, 而其它消费者闲暇. 而rabbitmq对此无所不知, 依然会平均的散发音讯

咱们能够应用 basicQos(1) 办法, 这通知rabbitmq一次只向消费者发送一条音讯, 在返回确认回执前, 不要向消费者发送新音讯. 而是把音讯发给下一个闲暇的消费者

音讯长久化

当rabbitmq敞开时, 咱们队列中的音讯依然会失落, 除非明确要求它不要失落数据
要求rabbitmq不失落数据要做如下两点: 把队列和音讯都设置为可长久化(durable)
队列设置为可长久化, 能够在定义队列时指定参数durable为true

//第二个参数是长久化参数durablech.queueDeclare("helloworld", true, false, false, null);

因为之前咱们曾经定义过队列"hello"是不可长久化的, 对已存在的队列, rabbitmq不容许对其定义不同的参数, 否则会出错, 所以这里咱们定义一个不同名字的队列"task_queue"

//定义一个新的队列,名为 task_queue//第二个参数是长久化参数 durablech.queueDeclare("task_queue", true, false, false, null);

生产者和消费者代码都要批改

这样即便rabbitmq重新启动, 队列也不会失落. 当初咱们再设置队列中音讯的长久化, 应用MessageProperties.PERSISTENT_TEXT_PLAIN参数

//第三个参数设置音讯长久化ch.basicPublish("", "task_queue",            MessageProperties.PERSISTENT_TEXT_PLAIN,            msg.getBytes());

上面是"工作模式"最终实现的生产者和消费者代码

生产者代码

package m2_work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.util.Scanner;public class Producer {    public static void main(String[] args) throws Exception {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义队列        //队列如果在服务器端曾经存在,属性不可变(第二个参数改为true)        c.queueDeclare("task_queue", true, false, false, null);        //发送音讯        //循环输出音讯发送        /*        输出音讯:ger.err....er.....er.....e....r......        消费者收到的音讯,遍历字符串,每个点字符都暂停一秒,模仿耗时音讯         */        while (true){            System.out.print("输出音讯:");            String msg = new Scanner(System.in).nextLine();            c.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());//第三个参数代表,长久的纯文本文件        }    }}

消费者代码

package m2_work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {    public static void main(String[] args) throws Exception {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义队列        c.queueDeclare("task_queue", true, false, false, null);        // 解决音讯的回调对象        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //遍历音讯字符串,每个点字符都暂停一秒                String msg = new String(message.getBody());                System.out.println("收到:"+msg);                for (int i = 0 ; i<msg.length();i++){                    if (msg.charAt(i)=='.'){                        try {                            Thread.sleep(1000);                        } catch (InterruptedException e) {                        }                    }                }                //发送回执(参数1:回值(long类型的编码)参数2:是否一次确认多条(boolean,个别都是false))                c.basicAck(message.getEnvelope().getDeliveryTag(),false);//封装的回值对象(envelope)的标签(deliveryTag)                System.out.println("音讯解决完结");            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };        //每次只承受解决一条音讯,解决实现之前不承受下一条        //必须在手动 ACK 模式下才无效        c.basicQos(1);        //生产数据        //第二个参数:true - 主动确认 false - 手动确认        c.basicConsume("task_queue", false, deliverCallback, cancelCallback);    }}

公布订阅模式

在后面的例子中,咱们工作音讯只交付给一个工作过程。在这部分,咱们将做一些齐全不同的事件——咱们将向多个消费者传递同一条音讯。这种模式称为“公布/订阅”。

为了阐明该模式,咱们将构建一个简略的日志零碎。它将由两个程序组成——第一个程序将收回日志音讯,第二个程序接管它们。

在咱们的日志零碎中,接管程序的每个运行正本都将取得音讯。这样,咱们就能够运行一个消费者并将日志保留到磁盘; 同时咱们能够运行另一个消费者在屏幕上打印日志。

最终, 音讯会被播送到所有音讯接受者

Exchanges 交换机

RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。实际上,通常生产者甚至不晓得音讯是否会被传递到任何队列。

相同,生产者只能向交换机(Exchange)发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗?它应该增加到多个队列中吗?或者它应该被抛弃。这些规定由exchange的类型定义。

有几种可用的替换类型:direct、topic、header和fanout。咱们将关注最初一个——fanout。让咱们创立一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

fanout交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。这正是咱们的日志零碎所须要的。

咱们后面应用的队列具备特定的名称(还记得hello和task_queue吗?)可能为队列命名对咱们来说至关重要——咱们须要将工作过程指向同一个队列,在生产者和消费者之间共享队列。

但日志记录案例不是这种状况。咱们想要接管所有的日志音讯,而不仅仅是其中的一部分。咱们还只对以后的最新消息感兴趣,而不是旧音讯。

要解决这个问题,咱们须要两件事。首先,每当咱们连贯到Rabbitmq时,咱们须要一个新的空队列。为此,咱们能够创立一个具备随机名称的队列,或者,更好的办法是让服务器为咱们抉择一个随机队列名称。其次,一旦断开与使用者的连贯,队列就会主动删除。在Java客户端中,当咱们不向queueDeclare()提供任何参数时,会创立一个具备生成名称的、非长久的、独占的、主动删除队列

//主动生成队列名//非长久,独占,主动删除String queueName = ch.queueDeclare().getQueue();

绑定 Bindings

咱们曾经创立了一个fanout交换机和一个队列。当初咱们须要通知exchange向指定队列发送音讯。exchange和队列之间的关系称为绑定。

//指定的队列,与指定的交换机关联起来//成为绑定 -- binding//第三个参数时 routingKey, 因为是fanout交换机, 这里疏忽 routingKeych.queueBind(queueName, "logs", "");

当初, logs交换机将会向咱们指定的队列增加音讯

列出绑定关系:
rabbitmqctl list_bindings

实现的代码

生产者

生产者收回日志音讯,看起来与前一教程没有太大不同。最重要的更改是,咱们当初心愿将音讯公布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,然而对于fanout交换机类型,该值会被疏忽。

package m3_pub_sub;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义交换机,服务器如果曾经存在这个交换机,不会反复创立(名称,类型)        //c.exchangeDeclare("logs", "fanout");//两种写法都能够        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);        //向交换机发送音讯        while (true){            System.out.print("输出音讯:");            String msg = new Scanner(System.in).nextLine();            //参数1:交换机名称  参数2:抉择队列,对fanout交换机有效            c.basicPublish("logs", "", null, msg.getBytes());        }    }}
消费者

如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题;如果还没有消费者在听,咱们能够平安地抛弃这些信息。
ReceiveLogs.java代码:

package m3_pub_sub;import com.rabbitmq.client.*;import java.io.IOException;import java.util.UUID;import java.util.concurrent.TimeoutException;public class Consumer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //1.定义随机队列 2.定义交换机 3.绑定        String queue = UUID.randomUUID().toString();        c.queueDeclare(queue, false, true, true, null);//非长久,独占,主动删除        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);        //交换机与队列进行绑定 对于fanout交换机来说,参数三有效        c.queueBind(queue, "logs", "");        //失常从队列接管音讯        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                String msg= new String(message.getBody());                System.out.println("收到: " + msg);            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };        c.basicConsume(queue, true, deliverCallback, cancelCallback);    }}

路由模式

在上一大节,咱们构建了一个简略的日志零碎。咱们可能向多个接收者播送日志音讯。

在这一节,咱们将向其增加一个个性—咱们将只订阅所有音讯中的一部分。例如,咱们只接管要害谬误音讯并保留到日志文件(以节俭磁盘空间),同时依然可能在管制台上打印所有日志音讯。

绑定 Bindings

在上一节,咱们曾经创立了队列与交换机的绑定。应用上面这样的代码:

ch.queueBind(queueName, "logs", "");

绑定是交换机和队列之间的关系。这能够简略地了解为:队列对来自此替换的音讯感兴趣。

绑定能够应用额定的routingKey参数。为了防止与basic_publish参数混同,咱们将其称为bindingKey。这是咱们如何创立一个键绑定:

ch.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的含意取决于交换机类型。咱们后面应用的fanout交换机齐全疏忽它。

直连交换机 Direct exchange

上一节中的日志零碎向所有消费者播送所有音讯。咱们心愿扩大它,容许依据音讯的严重性过滤音讯。例如,咱们心愿将日志音讯写入磁盘的程序只接管要害error,而不是在warning或info日志音讯上节约磁盘空间。

后面咱们应用的是fanout交换机,这并没有给咱们太多的灵活性——它只能进行简略的播送。

咱们将用直连交换机(Direct exchange)代替。它背地的路由算法很简略——消息传递到bindingKey与routingKey齐全匹配的队列。为了阐明这一点,请思考以下设置

其中咱们能够看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

这样设置,应用路由键orange公布到交换器的音讯将被路由到队列Q1。带有blackgreen路由键的音讯将转到Q2。而所有其余音讯都将被抛弃。

多重绑定 Multiple bindings

应用雷同的bindingKey绑定多个队列是齐全容许的。如图所示,能够应用binding key black将X与Q1和Q2绑定。在这种状况下,直连交换机的行为相似于fanout,并将音讯播送给所有匹配的队列。一条路由键为black的音讯将同时发送到Q1和Q2。

发送日志

咱们将在日志零碎中应用这个模型。咱们把音讯发送到一个Direct交换机,而不是fanout。咱们将提供日志级别作为routingKey。这样,接管程序将可能抉择它心愿接管的级别。让咱们首先来看收回日志。

和后面一样,咱们首先须要创立一个exchange:

//参数1: 交换机名//参数2: 交换机类型ch.exchangeDeclare("direct_logs", "direct");

接着来看发送音讯的代码

//参数1: 交换机名//参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning"//参数3: 其余配置属性//参数4: 公布的音讯数据 ch.basicPublish("direct_logs", "error", null, message.getBytes());

订阅

接管音讯的工作原理与后面章节一样,但有一个例外——咱们将为感兴趣的每个日志级别创立一个新的绑定, 示例代码如下:

ch.queueBind(queueName, "logs", "info");ch.queueBind(queueName, "logs", "warning");

残缺的代码

生产者
package m4_routing;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义交换机        //参数1: 交换机名        //参数2: 交换机类型        c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);        //发送音讯,须要携带路由键        while (true){            System.out.print("输出音讯:");            String msg = new Scanner(System.in).nextLine();            System.out.print("输出路由键:");            String key = new Scanner(System.in).nextLine();            //第二个参数是音讯上携带的路由键                        //参数1: 交换机名            //参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning"            //参数3: 其余配置属性            //参数4: 公布的音讯数据            c.basicPublish("direct_logs",key,null, msg.getBytes());        }    }}
消费者
package m4_routing;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Consumer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //1.定义随机队列 2.定义交换机 3.绑定,指定绑定的关键词        //由 rabbitmq服务器主动命名,默认参数:false,true,true        String queue = c.queueDeclare().getQueue();        c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);        System.out.print("输出绑定键:用空格隔开:");        String s = new Scanner(System.in).nextLine();        String[] a = s.split("\\s+");//正则表达式 \s+ 示意一个或多个空格        for (String key : a){            c.queueBind(queue, "direct_logs", key);        }        //失常的从队列生产数据        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //取出音讯数据,和音讯上携带的路由键                //定义名字为 direct_logs 的交换机, 它的类型是 "direct"                String msg = new String(message.getBody());                String key = message.getEnvelope().getRoutingKey();                System.out.println("收到:"+key+"-"+msg);            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };        c.basicConsume(queue, true, deliverCallback, cancelCallback);    }}

主题模式

在上一大节,咱们改良了日志零碎。咱们没有应用只能进行播送的fanout交换机,而是应用Direct交换机,从而能够选择性接管日志。

尽管应用Direct交换机改良了咱们的零碎,但它依然有局限性——它不能基于多个规范进行路由。

在咱们的日志零碎中,咱们可能不仅心愿依据级别订阅日志,还心愿依据收回日志的源订阅日志。

这将给咱们带来很大的灵活性——咱们可能只想接管来自“cron”的要害谬误,但也要接管来自“kern”的所有日志。

要在日志零碎中实现这一点,咱们须要理解更简单的Topic交换机。

主题交换机 Topic exchange

发送到Topic交换机的音讯,它的的routingKey,必须是由点分隔的多个单词。单词能够是任何货色,但通常是与音讯相干的一些个性。几个无效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey能够有任意多的单词,最多255个字节。
bindingKey也必须采纳雷同的模式。Topic交换机的逻辑与直连交换机相似——应用特定routingKey发送的音讯将被传递到所有应用匹配bindingKey绑定的队列。bindingKey有两个重要的非凡点:

  • * 能够通配单个单词。
  • # 能够通配零个或多个单词。

用一个例子来解释这个问题是最简略的

在本例中,咱们将发送形容动物的音讯。这些音讯将应用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词示意速度,第二个是色彩,第三个是物种:“<速度>.<色彩>.<物种>”。
咱们创立三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

这些绑定可概括为:

  • Q1对所有橙色的动物感兴趣。
  • Q2想接管对于兔子和慢速动物的所有音讯。

将routingKey设置为"quick.orange.rabbit"的音讯将被发送到两个队列。音讯 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即便它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因而将被抛弃。

如果咱们违反约定,发送一个或四个单词的信息,比方"orange“或”quick.orange.male.rabbit",会产生什么?这些音讯将不匹配任何绑定,并将失落。

另外,"lazy.orange.male.rabbit",即便它有四个单词,也将匹配最初一个绑定,并将被传递到第二个队列。

实现的代码

生产者
package m5_topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义交换机        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);        //发送音讯,须要携带路由键        while (true){            System.out.print("输出音讯:");            String msg = new Scanner(System.in).nextLine();            System.out.print("输出路由键:");            String key = new Scanner(System.in).nextLine();            //第二个参数是音讯上携带的路由键            c.basicPublish("topic_logs",key,null, msg.getBytes());        }    }}package m5_topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //定义交换机        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);        //发送音讯,须要携带路由键        while (true){            System.out.print("输出音讯:");            String msg = new Scanner(System.in).nextLine();            System.out.print("输出路由键:");            String key = new Scanner(System.in).nextLine();            //第二个参数是音讯上携带的路由键            c.basicPublish("topic_logs",key,null, msg.getBytes());        }    }}
消费者
package m5_topic;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Consumer {    public static void main(String[] args) throws IOException, TimeoutException {        //连贯        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        //f.setPort(5672);//默认端口能够省略        f.setUsername("admin");        f.setPassword("admin");        Channel c = f.newConnection().createChannel();        //1.定义随机队列 2.定义交换机 3.绑定,指定绑定的关键词        //由 rabbitmq服务器主动命名,默认参数:false,true,true        String queue = c.queueDeclare().getQueue();        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);        System.out.print("输出绑定键:用空格隔开:");        String s = new Scanner(System.in).nextLine();        String[] a = s.split("\\s+");//正则表达式 \s+ 示意一个或多个空格        for (String key : a){            c.queueBind(queue, "topic_logs", key);        }        //失常的从队列生产数据        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //取出音讯数据,和音讯上携带的路由键                String msg = new String(message.getBody());                String key = message.getEnvelope().getRoutingKey();                System.out.println("收到:"+key+"-"+msg);            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };        c.basicConsume(queue, true, deliverCallback, cancelCallback);    }}

RPC模式

如果咱们须要在近程电脑上运行一个办法,并且还要期待一个返回后果该怎么办?这和后面的例子不太一样, 这种模式咱们通常称为近程过程调用,即RPC.

在本节中,咱们将会学习应用RabbitMQ去搭建一个RPC零碎:一个客户端和一个能够降级(扩大)的RPC服务器。为了模仿一个耗时工作,咱们将创立一个返回斐波那契数列的虚构的RPC服务。

客户端

在客户端定义一个RPCClient类,并定义一个call()办法,这个办法发送一个RPC申请,并期待接管响应后果

RPCClient client = new RPCClient();String result = client.call("4");System.out.println( "第四个斐波那契数是: " + result);

回调队列 Callback Queue

应用RabbitMQ去实现RPC很容易。一个客户端发送申请信息,并失去一个服务器端回复的响应信息。为了失去响应信息,咱们须要在申请的时候发送一个“回调”队列地址。咱们能够应用默认队列。上面是示例代码:

//定义回调队列,//主动生成对列名,非长久,独占,主动删除callbackQueueName = ch.queueDeclare().getQueue();//用来设置回调队列的参数对象BasicProperties props = new BasicProperties                            .Builder()                            .replyTo(callbackQueueName)                            .build();//发送调用音讯ch.basicPublish("", "rpc_queue", props, message.getBytes());
音讯属性 Message Properties
AMQP 0-9-1协定定义了音讯的14个属性。大部分属性很少应用,上面是比拟罕用的4个:
deliveryMode:将音讯标记为长久化(值为2)或非长久化(任何其余值)。
contentType:用于形容mime类型。例如,对于常常应用的JSON格局,将此属性设置为:application/json
replyTo:通常用于指定回调队列。
correlationId:将RPC响应与申请关联起来十分有用。

关联id (correlationId):

在下面的代码中,咱们会为每个RPC申请创立一个回调队列。 这是十分低效的,这里还有一个更好的办法:让咱们为每个客户端创立一个回调队列。

这就提出了一个新的问题,在队列中失去一个响应时,咱们不分明这个响应所对应的是哪一条申请。这时候就须要应用关联id(correlationId)。咱们将为每一条申请设置惟一的的id值。稍后,当咱们在回调队列里收到一条音讯的时候,咱们将查看它的id属性,这样咱们就能够匹配对应的申请和响应。如果咱们发现了一个未知的id值,咱们能够平安的抛弃这条音讯,因为它不属于咱们的申请。

小结

RPC的工作形式是这样的:

  • 对于RPC申请,客户端发送一条带有两个属性的音讯:replyTo,设置为仅为申请创立的匿名独占队列,和correlationId,设置为每个申请的惟一id值。
  • 申请被发送到rpc_queue队列。
  • RPC工作过程(即:服务器)在队列上期待申请。当一个申请呈现时,它执行工作,并应用replyTo字段中的队列将后果发回客户机。
  • 客户机在回应音讯队列上期待数据。当音讯呈现时,它查看correlationId属性。如果匹配申请中的值,则向程序返回该响应数据。

实现的代码

服务器端
package rabbitmq.rpc;import java.io.IOException;import java.util.Random;import java.util.Scanner;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.rabbitmq.client.AMQP.BasicProperties;public class RPCServer {    public static void main(String[] args) throws Exception {        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        f.setPort(5672);        f.setUsername("admin");        f.setPassword("admin");                Connection c = f.newConnection();        Channel ch = c.createChannel();        /*         * 定义队列 rpc_queue, 将从它接管申请信息         *          * 参数:         * 1. queue, 对列名         * 2. durable, 长久化         * 3. exclusive, 排他         * 4. autoDelete, 主动删除         * 5. arguments, 其余参数属性         */        ch.queueDeclare("rpc_queue",false,false,false,null);        ch.queuePurge("rpc_queue");//革除队列中的内容                ch.basicQos(1);//一次只接管一条音讯                        //收到申请音讯后的回调对象        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //解决收到的数据(要求第几个斐波那契数)                String msg = new String(message.getBody(), "UTF-8");                int n = Integer.parseInt(msg);                //求出第n个斐波那契数                int r = fbnq(n);                String response = String.valueOf(r);                                //设置发回响应的id, 与申请id统一, 这样客户端能够把该响应与它的申请进行对应                BasicProperties replyProps = new BasicProperties.Builder()                        .correlationId(message.getProperties().getCorrelationId())                        .build();                /*                 * 发送响应音讯                 * 1. 默认交换机                 * 2. 由客户端指定的,用来传递响应音讯的队列名                 * 3. 参数(关联id)                 * 4. 发回的响应音讯                 */                ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));                //发送确认音讯                ch.basicAck(message.getEnvelope().getDeliveryTag(), false);            }        };                //        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };                //消费者开始接管音讯, 期待从 rpc_queue接管申请音讯, 不主动确认        ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);    }    protected static int fbnq(int n) {        if(n == 1 || n == 2) return 1;                return fbnq(n-1)+fbnq(n-2);    }}
客户端
package rabbitmq.rpc;import java.io.IOException;import java.util.Scanner;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.rabbitmq.client.AMQP.BasicProperties;public class RPCClient {    Connection con;    Channel ch;        public RPCClient() throws Exception {        ConnectionFactory f = new ConnectionFactory();        f.setHost("192.168.64.140");        f.setUsername("admin");        f.setPassword("admin");        con = f.newConnection();        ch = con.createChannel();    }        public String call(String msg) throws Exception {        //主动生成对列名,非长久,独占,主动删除        String replyQueueName = ch.queueDeclare().getQueue();        //生成关联id        String corrId = UUID.randomUUID().toString();                //设置两个参数:        //1. 申请和响应的关联id        //2. 传递响应数据的queue        BasicProperties props = new BasicProperties.Builder()                .correlationId(corrId)                .replyTo(replyQueueName)                .build();        //向 rpc_queue 队列发送申请数据, 申请第n个斐波那契数        ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));                //用来保留后果的阻塞汇合,取数据时,没有数据会暂停期待        BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);                //接管响应数据的回调对象        DeliverCallback deliverCallback = new DeliverCallback() {            @Override            public void handle(String consumerTag, Delivery message) throws IOException {                //如果响应音讯的关联id,与申请的关联id雷同,咱们来解决这个响应数据                if (message.getProperties().getCorrelationId().contentEquals(corrId)) {                    //把收到的响应数据,放入阻塞汇合                    response.offer(new String(message.getBody(), "UTF-8"));                }            }        };        CancelCallback cancelCallback = new CancelCallback() {            @Override            public void handle(String consumerTag) throws IOException {            }        };                //开始从队列接管响应数据        ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);        //返回保留在汇合中的响应数据        return response.take();    }        public static void main(String[] args) throws Exception {        RPCClient client = new RPCClient();        while (true) {            System.out.print("求第几个斐波那契数:");            int n = new Scanner(System.in).nextInt();            String r = client.call(""+n);            System.out.println(r);        }    }}

virtual host

在RabbitMQ中叫做虚构音讯服务器VirtualHost,每个VirtualHost相当于一个绝对独立的RabbitMQ服务器,每个VirtualHost之间是互相隔离的。exchange、queue、message不能互通

创立virtual host: /pd
  • 进入虚拟机治理界面

  • 增加新的虚拟机’/pd’,名称必须以"/"结尾

  • 查看增加的后果

设置虚拟机的用户拜访权限

点击 /pd 虚拟主机, 设置用户 admin 对它的拜访权限

拼多商城整合 rabbitmq

当用户下订单时,咱们的业务零碎间接与数据库通信,把订单保留到数据库中

当零碎流量忽然激增,大量的订单压力,会拖慢业务零碎和数据库系统

咱们须要应答流量峰值,让流量曲线变得平缓,如下图

订单存储的解耦

为了进行流量削峰,咱们引入 rabbitmq 音讯队列,当购物零碎产生订单后,能够把订单数据发送到音讯队列;而订单消费者利用从音讯队列接管订单音讯,并把订单保留到数据库

这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者能够从容地从音讯队列缓缓接管订单,向数据库保留

生产者-发送订单

pom.xml 增加依赖

spring提供了更不便的音讯队列拜访接口,它对RabbitMQ的客户端API进行了封装,应用起来更加不便

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>
application.yml

增加RabbitMQ的连贯信息

spring:  rabbitmq:    host: 192.168.64.140    port: 5672    virtualHost: /pd    username: admin    password: admin
批改主程序 RunPdAPP

在主程序中增加上面的办法创立Queue实例

当创立RabbitMQ连贯和信道后,Spring的RabbitMQ工具会主动在服务器中创立队列,代码在RabbitAdmin.declareQueues()办法中

 @Bean    public Queue getQueue() {        Queue q = new Queue("orderQueue", true);        return q;    }
批改 OrderServiceImpl
 //RabbitAutoConfiguration中创立了AmpqTemplate实例    @Autowired    AmqpTemplate amqpTemplate;    //saveOrder原来的数据库拜访代码全副正文,增加rabbitmq音讯发送代码    public String saveOrder(PdOrder pdOrder) throws Exception {        String orderId = generateId();        pdOrder.setOrderId(orderId);        amqpTemplate.convertAndSend("orderQueue", pdOrder);        return orderId;                //        String orderId = generateId();        //        pdOrder.setOrderId(orderId);        //        //         //        PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());        //        pdOrder.setShippingName(pdShipping.getReceiverName());        //        pdOrder.setShippingCode(pdShipping.getReceiverAddress());        //        pdOrder.setStatus(1);//         //        pdOrder.setPaymentType(1);        //        pdOrder.setPostFee(10D);        //        pdOrder.setCreateTime(new Date());        //        //        double payment = 0;        //        List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());        //        for (ItemVO itemVO : itemVOs) {        //            PdOrderItem pdOrderItem = new PdOrderItem();        //            String id = generateId();        //            //String id="2";        //            pdOrderItem.setId(id);        //            pdOrderItem.setOrderId(orderId);        //            pdOrderItem.setItemId("" + itemVO.getPdItem().getId());        //            pdOrderItem.setTitle(itemVO.getPdItem().getTitle());        //            pdOrderItem.setPrice(itemVO.getPdItem().getPrice());        //            pdOrderItem.setNum(itemVO.getPdCartItem().getNum());        //        //            payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();        //            pdOrderItemMapper.insert(pdOrderItem);        //        }        //        pdOrder.setPayment(payment);        //        pdOrderMapper.insert(pdOrder);        //        return orderId;    }

消费者-接管订单,并保留到数据库

pd-web我的项目复制为pd-order-consumer

批改 application.yml

把端口批改成 81

server:  port: 81spring:  datasource:    type: com.alibaba.druid.pool.DruidDataSource    driver-class-name: com.mysql.jdbc.Driver    url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8    username: root    password: root  rabbitmq:    host: 192.168.64.140    port: 5672    virtualHost: /pd    username: admin    password: adminmybatis:  #typeAliasesPackage: cn.tedu.ssm.pojo  mapperLocations: classpath:com.pd.mapper/*.xmllogging:  level:     cn.tedu.ssm.mapper: debug
删除无关代码

pd-order-consumer我的项目只须要从 RabbitMQ 接管订单数据, 再保留到数据库即可, 所以我的项目中只须要保留这部分代码

  • 删除 com.pd.controller 包
  • 删除 com.pd.payment.utils 包
  • 删除无关的 Service,只保留 OrderService 和 OrderServiceImpl
新建 OrderConsumer
package com.pd;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.pd.pojo.PdOrder;import com.pd.service.OrderService;@Componentpublic class OrderConsumer {    //收到订单数据后,会调用订单的业务代码,把订单保留到数据库    @Autowired    private OrderService orderService;    //增加该注解后,会从指定的orderQueue接管音讯,    //并把数据转为 PdOrder 实例传递到此办法    @RabbitListener(queues="orderQueue")    public void save(PdOrder pdOrder)    {        System.out.println("消费者");        System.out.println(pdOrder.toString());        try {            orderService.saveOrder(pdOrder);        } catch (Exception e) {            e.printStackTrace();        }    }}
批改 OrderServiceImpl 的 saveOrder() 办法
 public String saveOrder(PdOrder pdOrder) throws Exception {        //        String orderId = generateId();        //        pdOrder.setOrderId(orderId);        //        //        amqpTemplate.convertAndSend("orderQueue", pdOrder);        //        return orderId;        //        //         //         //        String orderId = generateId();        //        pdOrder.setOrderId(orderId);                //从RabbitMQ接管的订单数据,        //曾经在上游订单业务中生成过id,这里不再从新生成id        //间接获取该订单的id        String orderId = pdOrder.getOrderId();                PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());        pdOrder.setShippingName(pdShipping.getReceiverName());        pdOrder.setShippingCode(pdShipping.getReceiverAddress());        pdOrder.setStatus(1);//         pdOrder.setPaymentType(1);        pdOrder.setPostFee(10D);        pdOrder.setCreateTime(new Date());        double payment = 0;        List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());        for (ItemVO itemVO : itemVOs) {            PdOrderItem pdOrderItem = new PdOrderItem();            String id = generateId();            //String id="2";            pdOrderItem.setId(id);            pdOrderItem.setOrderId(orderId);            pdOrderItem.setItemId("" + itemVO.getPdItem().getId());            pdOrderItem.setTitle(itemVO.getPdItem().getTitle());            pdOrderItem.setPrice(itemVO.getPdItem().getPrice());            pdOrderItem.setNum(itemVO.getPdCartItem().getNum());            payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();            pdOrderItemMapper.insert(pdOrderItem);        }        pdOrder.setPayment(payment);        pdOrderMapper.insert(pdOrder);        return orderId;    } 

手动确认

application.yml
spring:  rabbitmq:    listener:      simple:        acknowledge-mode: manual
OrderConsumer
package com.pd;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.pd.pojo.PdOrder;import com.pd.service.OrderService;import com.rabbitmq.client.Channel;@Componentpublic class OrderConsumer {    //收到订单数据后,会调用订单的业务代码,把订单保留到数据库    @Autowired    private OrderService orderService;    //增加该注解后,会从指定的orderQueue接管音讯,    //并把数据转为 PdOrder 实例传递到此办法    @RabbitListener(queues="orderQueue")    public void save(PdOrder pdOrder, Channel channel, Message message)    {        System.out.println("消费者");        System.out.println(pdOrder.toString());        try {            orderService.saveOrder(pdOrder);            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            e.printStackTrace();        }     }}