RabbitMQ 应用场景
服务解耦
假如有这样一个场景, 服务A产生数据, 而服务B,C,D须要这些数据, 那么咱们能够在A服务中间接调用B,C,D服务,把数据传递到上游服务即可
然而,随着咱们的利用规模不断扩大,会有更多的服务须要A的数据,如果有几十甚至几百个上游服务,而且会一直变更,再加上还要思考上游服务出错的状况,那么A服务中调用代码的保护会极为艰难
这是因为服务之间耦合度过于严密
再来思考用RabbitMQ解耦的状况
A服务只须要向音讯服务器发送音讯,而不必思考谁须要这些数据;上游服务如果须要数据,自行从音讯服务器订阅音讯,不再须要数据时则勾销订阅即可
流量削峰
假如咱们有一个利用,平时访问量是每秒300申请,咱们用一台服务器即可轻松应答
而在高峰期,访问量霎时翻了十倍,达到每秒3000次申请,那么单台服务器必定无奈应答,这时咱们能够思考减少到10台服务器,来扩散拜访压力
但如果这种刹时顶峰的状况每天只呈现一次,每次只有半小时,那么咱们10台服务器在少数工夫都只分担每秒几十次申请,这样就有点浪费资源了
这种状况,咱们就能够应用RabbitMQ来进行流量削峰,顶峰状况下,霎时呈现的大量申请数据,先发送到音讯队列服务器,排队期待被解决,而咱们的利用,能够缓缓的从音讯队列接管申请数据进行解决,这样把数据处理工夫拉长,以加重刹时压力
这是音讯队列服务器十分典型的利用场景
异步调用
思考定外卖领取胜利的状况
领取后要发送领取胜利的告诉,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程十分耗时,尤其是高峰期,可能要期待几十秒甚至更长
这样就造成整条调用链路响应十分迟缓
而如果咱们引入RabbitMQ音讯队列,订单数据能够发送到音讯队列服务器,那么调用链路也就能够到此结束,订单零碎则能够立刻失去响应,整条链路的响应工夫只有200毫秒左右
寻找外卖小哥的利用能够以异步的形式从音讯队列接管订单音讯,再执行耗时的寻找操作
rabbitmq 基本概念
RabbitMQ是一种消息中间件,用于解决来自客户端的异步音讯。服务端将要发送的音讯放入到队列池中。接收端能够依据RabbitMQ配置的转发机制接管服务端发来的音讯。RabbitMQ根据指定的转发规定进行音讯的转发、缓冲和长久化操作,次要用在多服务器间或单服务器的子系统间进行通信,是分布式系统规范的配置。
Exchange
承受生产者发送的音讯,并依据Binding规定将音讯路由给服务器中的队列。ExchangeType决定了Exchange路由音讯的行为。在RabbitMQ中,ExchangeType罕用的有direct、Fanout和Topic三种。
Message Queue
音讯队列。咱们发送给RabbitMQ的音讯最初都会达到各种queue,并且存储在其中(如果路由找不到相应的queue则数据会失落),期待消费者来取。
Binding Key
它示意的是Exchange与Message Queue是通过binding key进行分割的,这个关系是固定。
Routing Key
生产者在将音讯发送给Exchange的时候,个别会指定一个routing key,来指定这个音讯的路由规定。这个routing key须要与Exchange Type及binding key联结应用能力生,咱们的生产者只须要通过指定routing key来决定音讯流向哪里。
rabbitmq装置
离线装置
下载离线安装包文件
- https://download.csdn.net/download/weixin_38305440/12265906
上传离线安装包
rabbitmq-install
目录上传到/root
切换到rabbitmq-install
目录
cd rabbitmq-install
装置
rpm -ivh *.rpm
Yum在线装置
以下内容来自 RabbitMQ 官网手册
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc# centos7 用这个cat <<EOF > /etc/yum.repos.d/rabbitmq.repo[bintray-rabbitmq-server]name=bintray-rabbitmq-rpmbaseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/gpgcheck=0repo_gpgcheck=0enabled=1EOF# centos6 用这个cat <<EOF > /etc/yum.repos.d/rabbitmq.repo[bintray-rabbitmq-server]name=bintray-rabbitmq-rpmbaseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/gpgcheck=0repo_gpgcheck=0enabled=1EOFyum makecacheyum install socatwget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpmrpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodepsyum install rabbitmq-server
启动rabbitmq服务器
# 设置服务,开机主动启动systemctl enable rabbitmq-server# 启动服务systemctl start rabbitmq-server
rabbitmq治理界面
启用治理界面
# 开启治理界面插件rabbitmq-plugins enable rabbitmq_management# 防火墙关上 15672 治理端口firewall-cmd --zone=public --add-port=15672/tcp --permanentfirewall-cmd --reload
重启RabbitMQ服务
systemctl restart rabbitmq-server
拜访
拜访服务器的15672
端口,例如:
http://192.168.64.140:15672
增加用户
# 增加用户rabbitmqctl add_user admin admin# 新用户设置用户为超级管理员rabbitmqctl set_user_tags admin administrator
设置拜访权限
- 用户治理参考
https://www.cnblogs.com/AloneSword/p/4200051.html
凋谢客户端连贯端口
# 关上客户端连贯端口firewall-cmd --zone=public --add-port=5672/tcp --permanentfirewall-cmd --reload
次要端口介绍
- 4369 – erlang发现口
- 5672 – client端通信口
- 15672 – 治理界面ui端口
- 25672 – server间外部通信口
rabbitmq六种工作模式
简略模式
RabbitMQ是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。
- 发送音讯的程序是生产者
- 队列就代表一个邮箱。尽管音讯会流经RbbitMQ和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.
- 消费者期待从队列接管音讯
pom.xml
增加 slf4j 依赖, 和 rabbitmq amqp 依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tedu</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.8.0-alpha2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.8.0-alpha2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build></project>
生产者发送音讯
package m1;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws Exception { //创立连贯工厂,并设置连贯信息 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672);//可选,5672是默认端口 f.setUsername("admin"); f.setPassword("admin"); /* * 与rabbitmq服务器建设连贯, * rabbitmq服务器端应用的是nio,会复用tcp连贯, * 并开拓多个信道与客户端通信 * 以加重服务器端建设连贯的开销 */ Connection c = f.newConnection(); //建设信道 Channel ch = c.createChannel(); /* * 申明队列,会在rabbitmq中创立一个队列 * 如果曾经创立过该队列,就不能再应用其余参数来创立 * * 参数含意: * -queue: 队列名称 * -durable: 队列长久化,true示意RabbitMQ重启后队列仍存在 * -exclusive: 排他,true示意限度仅以后连贯可用 * -autoDelete: 当最初一个消费者断开后,是否删除队列 * -arguments: 其余参数 */ ch.queueDeclare("helloworld", false,false,false,null); /* * 公布音讯 * 这里把音讯向默认交换机发送. * 默认交换机隐含与所有队列绑定,routing key即为队列名称 * * 参数含意: * -exchange: 交换机名称,空串示意默认交换机"(AMQP default)",不能用 null * -routingKey: 对于默认交换机,路由键就是指标队列名称 * -props: 其余参数,例如头信息 * -body: 音讯内容byte[]数组 */ ch.basicPublish("", "helloworld", null, "Hello world!".getBytes()); System.out.println("音讯已发送"); c.close(); }}
消费者接管音讯
package m1;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) throws Exception { //连贯工厂 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); //建设连贯 Connection c = f.newConnection(); //建设信道 Channel ch = c.createChannel(); //申明队列,如果该队列曾经创立过,则不会反复创立 ch.queueDeclare("helloworld",false,false,false,null); System.out.println("期待接收数据"); //收到音讯后用来解决音讯的回调对象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); } }; //消费者勾销时的回调对象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume("helloworld", true, callback, cancel); }}
工作模式
工作队列(即工作队列)背地的次要思维是防止立刻执行资源密集型工作,并且必须期待它实现。相同,咱们将工作安顿在稍后实现。
咱们将工作封装为音讯并将其发送到队列。后盾运行的工作过程将获取工作并最终执行工作。当运行多个消费者时,工作将在它们之间散发。
应用工作队列的一个长处是可能轻松地并行工作。如果咱们正在积压工作工作,咱们能够增加更多工作过程,这样就能够轻松扩大。
生产者发送音讯
这里模仿耗时工作,发送的音讯中,每个点使工作过程暂停一秒钟,例如"Hello…"将破费3秒钟来解决
package m2_work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.util.Scanner;public class Producer { public static void main(String[] args) throws Exception { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义队列 //队列如果在服务器端曾经存在,属性不可变(第二个参数改为true) c.queueDeclare("task_queue", false, false, false, null); while (true){ System.out.print("输出音讯:"); String msg = new Scanner(System.in).nextLine(); if ("exit".equals(msg)) { break; } c.basicPublish("", "helloworld", null, msg.getBytes()); } }}
消费者接管音讯
package m2_work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer { public static void main(String[] args) throws Exception { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义队列 c.queueDeclare("helloworld", false, false, false, null); // 解决音讯的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //遍历音讯字符串,每个点字符都暂停一秒 String msg = new String(message.getBody()); System.out.println("收到:"+msg); for (int i = 0 ; i<msg.length();i++){ if (msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { } } } System.out.println("音讯解决完结"); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //生产数据 //第二个参数:true - 主动确认 false - 手动确认 c.basicConsume("task_queue", true, deliverCallback, cancelCallback); }}
运行测试
运行:
- 一个生产者
- 两个消费者
生产者发送多条音讯,
如: 1,2,3,4,5. 两个消费者别离收到:
- 消费者一: 1,3,5
- 消费者二: 2,4
rabbitmq在所有消费者中轮询散发音讯,把音讯平均地发送给所有消费者
音讯确认
一个消费者接管音讯后,在音讯没有齐全解决完时就挂掉了,那么这时会产生什么呢?
就当初的代码来说,rabbitmq把音讯发送给消费者后,会立刻删除音讯,那么消费者挂掉后,它没来得及解决的音讯就会失落
如果生产者发送以下音讯: 1… 2 3 4 5两个消费者别离收到:* 消费者一: 1…, 3, 5* 消费者二: 2, 4当消费者一收到所有音讯后,要话费7秒工夫来解决第一条音讯,这期间如果敞开该消费者,那么1未解决实现,3,5则没有被解决
咱们并不想失落任何音讯, 如果一个消费者挂掉,咱们想把它的工作音讯派发给其余消费者
为了确保音讯不会失落,rabbitmq反对音讯确认(回执)。当一个音讯被消费者接管到并且执行实现后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 通知他我曾经执行实现了,你能够把这条音讯删除了。
如果一个消费者没有返回音讯确认就挂掉了(信道敞开,连贯敞开或者TCP链接失落),rabbitmq就会明确,这个音讯没有被解决实现,rebbitmq就会把这条音讯从新放入队列,如果在这时有其余的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其余的消费者,这样就确保了没有音讯会失落。
这里不存在音讯超时, rabbitmq只在消费者挂掉时从新分派音讯, 即便消费者花十分久的工夫来解决音讯也能够
手动音讯确认默认是开启的,后面的例子咱们通过autoAck=ture把它敞开了。咱们当初要把它设置为false,而后工作过程解决完动向工作时,发送一个音讯确认(回执)。
package m2_work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer { public static void main(String[] args) throws Exception { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义队列 c.queueDeclare("task_queue", false, false, false, null); // 解决音讯的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //遍历音讯字符串,每个点字符都暂停一秒 String msg = new String(message.getBody()); System.out.println("收到:"+msg); for (int i = 0 ; i<msg.length();i++){ if (msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { } } } //发送回执(参数1:回值(long类型的编码)参数2:是否一次确认多条(boolean,个别都是false)) c.basicAck(message.getEnvelope().getDeliveryTag(),false);//封装的回值对象(envelope)的标签(deliveryTag) System.out.println("音讯解决完结"); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //生产数据 //第二个参数:true - 主动确认 false - 手动确认 c.basicConsume("task_queue", false, deliverCallback, cancelCallback); }}
应用以上代码,就算杀掉一个正在解决音讯的工作过程也不会失落任何音讯,工作过程挂掉之后,没有确认的音讯就会被主动从新传递。
遗记确认(ack)是一个常见的谬误, 这样结果是很重大的, 因为未确认的音讯不会被开释, rabbitmq会吃掉越来越多的内存
能够应用上面命令打印工作队列中未确认音讯的数量
rabbitmqctl list_queues name >messages_readymessages_unacknowledged
当解决音讯时异常中断, 能够抉择让音讯重回队列从新发送.
nack
操作能够是音讯重回队列, 能够应用 basicNack() 办法:// requeue为true时重回队列, 反之音讯被抛弃或被发送到死信队列c.basicNack(tag, multiple, requeue)
正当地散发
rabbitmq会一次把多个音讯分发给消费者, 这样可能造成有的消费者十分忙碌, 而其它消费者闲暇. 而rabbitmq对此无所不知, 依然会平均的散发音讯
咱们能够应用 basicQos(1)
办法, 这通知rabbitmq一次只向消费者发送一条音讯, 在返回确认回执前, 不要向消费者发送新音讯. 而是把音讯发给下一个闲暇的消费者
音讯长久化
当rabbitmq敞开时, 咱们队列中的音讯依然会失落, 除非明确要求它不要失落数据
要求rabbitmq不失落数据要做如下两点: 把队列和音讯都设置为可长久化(durable)
队列设置为可长久化, 能够在定义队列时指定参数durable为true
//第二个参数是长久化参数durablech.queueDeclare("helloworld", true, false, false, null);
因为之前咱们曾经定义过队列"hello"是不可长久化的, 对已存在的队列, rabbitmq不容许对其定义不同的参数, 否则会出错, 所以这里咱们定义一个不同名字的队列"task_queue"
//定义一个新的队列,名为 task_queue//第二个参数是长久化参数 durablech.queueDeclare("task_queue", true, false, false, null);
生产者和消费者代码都要批改
这样即便rabbitmq重新启动, 队列也不会失落. 当初咱们再设置队列中音讯的长久化, 应用MessageProperties.PERSISTENT_TEXT_PLAIN
参数
//第三个参数设置音讯长久化ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
上面是"工作模式"最终实现的生产者和消费者代码
生产者代码
package m2_work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.util.Scanner;public class Producer { public static void main(String[] args) throws Exception { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义队列 //队列如果在服务器端曾经存在,属性不可变(第二个参数改为true) c.queueDeclare("task_queue", true, false, false, null); //发送音讯 //循环输出音讯发送 /* 输出音讯:ger.err....er.....er.....e....r...... 消费者收到的音讯,遍历字符串,每个点字符都暂停一秒,模仿耗时音讯 */ while (true){ System.out.print("输出音讯:"); String msg = new Scanner(System.in).nextLine(); c.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());//第三个参数代表,长久的纯文本文件 } }}
消费者代码
package m2_work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer { public static void main(String[] args) throws Exception { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义队列 c.queueDeclare("task_queue", true, false, false, null); // 解决音讯的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //遍历音讯字符串,每个点字符都暂停一秒 String msg = new String(message.getBody()); System.out.println("收到:"+msg); for (int i = 0 ; i<msg.length();i++){ if (msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { } } } //发送回执(参数1:回值(long类型的编码)参数2:是否一次确认多条(boolean,个别都是false)) c.basicAck(message.getEnvelope().getDeliveryTag(),false);//封装的回值对象(envelope)的标签(deliveryTag) System.out.println("音讯解决完结"); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //每次只承受解决一条音讯,解决实现之前不承受下一条 //必须在手动 ACK 模式下才无效 c.basicQos(1); //生产数据 //第二个参数:true - 主动确认 false - 手动确认 c.basicConsume("task_queue", false, deliverCallback, cancelCallback); }}
公布订阅模式
在后面的例子中,咱们工作音讯只交付给一个工作过程。在这部分,咱们将做一些齐全不同的事件——咱们将向多个消费者传递同一条音讯。这种模式称为“公布/订阅”。
为了阐明该模式,咱们将构建一个简略的日志零碎。它将由两个程序组成——第一个程序将收回日志音讯,第二个程序接管它们。
在咱们的日志零碎中,接管程序的每个运行正本都将取得音讯。这样,咱们就能够运行一个消费者并将日志保留到磁盘; 同时咱们能够运行另一个消费者在屏幕上打印日志。
最终, 音讯会被播送到所有音讯接受者
Exchanges 交换机
RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。实际上,通常生产者甚至不晓得音讯是否会被传递到任何队列。
相同,生产者只能向交换机(Exchange)发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗?它应该增加到多个队列中吗?或者它应该被抛弃。这些规定由exchange的类型定义。
有几种可用的替换类型:direct、topic、header和fanout。咱们将关注最初一个——fanout。让咱们创立一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");
fanout交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。这正是咱们的日志零碎所须要的。
咱们后面应用的队列具备特定的名称(还记得hello和task_queue吗?)可能为队列命名对咱们来说至关重要——咱们须要将工作过程指向同一个队列,在生产者和消费者之间共享队列。
但日志记录案例不是这种状况。咱们想要接管所有的日志音讯,而不仅仅是其中的一部分。咱们还只对以后的最新消息感兴趣,而不是旧音讯。
要解决这个问题,咱们须要两件事。首先,每当咱们连贯到Rabbitmq时,咱们须要一个新的空队列。为此,咱们能够创立一个具备随机名称的队列,或者,更好的办法是让服务器为咱们抉择一个随机队列名称。其次,一旦断开与使用者的连贯,队列就会主动删除。在Java客户端中,当咱们不向queueDeclare()提供任何参数时,会创立一个具备生成名称的、非长久的、独占的、主动删除队列
//主动生成队列名//非长久,独占,主动删除String queueName = ch.queueDeclare().getQueue();
绑定 Bindings
咱们曾经创立了一个fanout交换机和一个队列。当初咱们须要通知exchange向指定队列发送音讯。exchange和队列之间的关系称为绑定。
//指定的队列,与指定的交换机关联起来//成为绑定 -- binding//第三个参数时 routingKey, 因为是fanout交换机, 这里疏忽 routingKeych.queueBind(queueName, "logs", "");
当初, logs交换机将会向咱们指定的队列增加音讯
列出绑定关系:
rabbitmqctl list_bindings
实现的代码
生产者
生产者收回日志音讯,看起来与前一教程没有太大不同。最重要的更改是,咱们当初心愿将音讯公布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,然而对于fanout交换机类型,该值会被疏忽。
package m3_pub_sub;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义交换机,服务器如果曾经存在这个交换机,不会反复创立(名称,类型) //c.exchangeDeclare("logs", "fanout");//两种写法都能够 c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交换机发送音讯 while (true){ System.out.print("输出音讯:"); String msg = new Scanner(System.in).nextLine(); //参数1:交换机名称 参数2:抉择队列,对fanout交换机有效 c.basicPublish("logs", "", null, msg.getBytes()); } }}
消费者
如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题;如果还没有消费者在听,咱们能够平安地抛弃这些信息。
ReceiveLogs.java代码:
package m3_pub_sub;import com.rabbitmq.client.*;import java.io.IOException;import java.util.UUID;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //1.定义随机队列 2.定义交换机 3.绑定 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null);//非长久,独占,主动删除 c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //交换机与队列进行绑定 对于fanout交换机来说,参数三有效 c.queueBind(queue, "logs", ""); //失常从队列接管音讯 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg= new String(message.getBody()); System.out.println("收到: " + msg); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; c.basicConsume(queue, true, deliverCallback, cancelCallback); }}
路由模式
在上一大节,咱们构建了一个简略的日志零碎。咱们可能向多个接收者播送日志音讯。
在这一节,咱们将向其增加一个个性—咱们将只订阅所有音讯中的一部分。例如,咱们只接管要害谬误音讯并保留到日志文件(以节俭磁盘空间),同时依然可能在管制台上打印所有日志音讯。
绑定 Bindings
在上一节,咱们曾经创立了队列与交换机的绑定。应用上面这样的代码:
ch.queueBind(queueName, "logs", "");
绑定是交换机和队列之间的关系。这能够简略地了解为:队列对来自此替换的音讯感兴趣。
绑定能够应用额定的routingKey参数。为了防止与basic_publish参数混同,咱们将其称为bindingKey。这是咱们如何创立一个键绑定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含意取决于交换机类型。咱们后面应用的fanout交换机齐全疏忽它。
直连交换机 Direct exchange
上一节中的日志零碎向所有消费者播送所有音讯。咱们心愿扩大它,容许依据音讯的严重性过滤音讯。例如,咱们心愿将日志音讯写入磁盘的程序只接管要害error,而不是在warning或info日志音讯上节约磁盘空间。
后面咱们应用的是fanout交换机,这并没有给咱们太多的灵活性——它只能进行简略的播送。
咱们将用直连交换机(Direct exchange)代替。它背地的路由算法很简略——消息传递到bindingKey与routingKey齐全匹配的队列。为了阐明这一点,请思考以下设置
其中咱们能够看到直连交换机X
,它绑定了两个队列。第一个队列用绑定键orange
绑定,第二个队列有两个绑定,一个绑定black
,另一个绑定键green
。
这样设置,应用路由键orange
公布到交换器的音讯将被路由到队列Q1。带有black
或green
路由键的音讯将转到Q2
。而所有其余音讯都将被抛弃。
多重绑定 Multiple bindings
应用雷同的bindingKey绑定多个队列是齐全容许的。如图所示,能够应用binding key black将X与Q1和Q2绑定。在这种状况下,直连交换机的行为相似于fanout,并将音讯播送给所有匹配的队列。一条路由键为black的音讯将同时发送到Q1和Q2。
发送日志
咱们将在日志零碎中应用这个模型。咱们把音讯发送到一个Direct交换机,而不是fanout。咱们将提供日志级别作为routingKey。这样,接管程序将可能抉择它心愿接管的级别。让咱们首先来看收回日志。
和后面一样,咱们首先须要创立一个exchange:
//参数1: 交换机名//参数2: 交换机类型ch.exchangeDeclare("direct_logs", "direct");
接着来看发送音讯的代码
//参数1: 交换机名//参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning"//参数3: 其余配置属性//参数4: 公布的音讯数据 ch.basicPublish("direct_logs", "error", null, message.getBytes());
订阅
接管音讯的工作原理与后面章节一样,但有一个例外——咱们将为感兴趣的每个日志级别创立一个新的绑定, 示例代码如下:
ch.queueBind(queueName, "logs", "info");ch.queueBind(queueName, "logs", "warning");
残缺的代码
生产者
package m4_routing;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义交换机 //参数1: 交换机名 //参数2: 交换机类型 c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //发送音讯,须要携带路由键 while (true){ System.out.print("输出音讯:"); String msg = new Scanner(System.in).nextLine(); System.out.print("输出路由键:"); String key = new Scanner(System.in).nextLine(); //第二个参数是音讯上携带的路由键 //参数1: 交换机名 //参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning" //参数3: 其余配置属性 //参数4: 公布的音讯数据 c.basicPublish("direct_logs",key,null, msg.getBytes()); } }}
消费者
package m4_routing;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //1.定义随机队列 2.定义交换机 3.绑定,指定绑定的关键词 //由 rabbitmq服务器主动命名,默认参数:false,true,true String queue = c.queueDeclare().getQueue(); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); System.out.print("输出绑定键:用空格隔开:"); String s = new Scanner(System.in).nextLine(); String[] a = s.split("\\s+");//正则表达式 \s+ 示意一个或多个空格 for (String key : a){ c.queueBind(queue, "direct_logs", key); } //失常的从队列生产数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //取出音讯数据,和音讯上携带的路由键 //定义名字为 direct_logs 的交换机, 它的类型是 "direct" String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey(); System.out.println("收到:"+key+"-"+msg); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; c.basicConsume(queue, true, deliverCallback, cancelCallback); }}
主题模式
在上一大节,咱们改良了日志零碎。咱们没有应用只能进行播送的fanout交换机,而是应用Direct交换机,从而能够选择性接管日志。
尽管应用Direct交换机改良了咱们的零碎,但它依然有局限性——它不能基于多个规范进行路由。
在咱们的日志零碎中,咱们可能不仅心愿依据级别订阅日志,还心愿依据收回日志的源订阅日志。
这将给咱们带来很大的灵活性——咱们可能只想接管来自“cron”的要害谬误,但也要接管来自“kern”的所有日志。
要在日志零碎中实现这一点,咱们须要理解更简单的Topic交换机。
主题交换机 Topic exchange
发送到Topic交换机的音讯,它的的routingKey,必须是由点分隔的多个单词。单词能够是任何货色,但通常是与音讯相干的一些个性。几个无效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey能够有任意多的单词,最多255个字节。
bindingKey也必须采纳雷同的模式。Topic交换机的逻辑与直连交换机相似——应用特定routingKey发送的音讯将被传递到所有应用匹配bindingKey绑定的队列。bindingKey有两个重要的非凡点:
*
能够通配单个单词。#
能够通配零个或多个单词。
用一个例子来解释这个问题是最简略的
在本例中,咱们将发送形容动物的音讯。这些音讯将应用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词示意速度,第二个是色彩,第三个是物种:“<速度>.<色彩>.<物种>
”。
咱们创立三个绑定:Q1与bindingKey “*.orange.*
” 绑定。和Q2是 “*.*.rabbit
” 和 “lazy.#
” 。
这些绑定可概括为:
- Q1对所有橙色的动物感兴趣。
- Q2想接管对于兔子和慢速动物的所有音讯。
将routingKey设置为"quick.orange.rabbit
"的音讯将被发送到两个队列。音讯 "lazy.orange.elephant
“也发送到它们两个。另外”quick.orange.fox
“只会发到第一个队列,”lazy.brown.fox
“只发给第二个。”lazy.pink.rabbit
“将只被传递到第二个队列一次,即便它匹配两个绑定。”quick.brown.fox
"不匹配任何绑定,因而将被抛弃。
如果咱们违反约定,发送一个或四个单词的信息,比方"orange
“或”quick.orange.male.rabbit
",会产生什么?这些音讯将不匹配任何绑定,并将失落。
另外,"lazy.orange.male.rabbit
",即便它有四个单词,也将匹配最初一个绑定,并将被传递到第二个队列。
实现的代码
生产者
package m5_topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义交换机 c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); //发送音讯,须要携带路由键 while (true){ System.out.print("输出音讯:"); String msg = new Scanner(System.in).nextLine(); System.out.print("输出路由键:"); String key = new Scanner(System.in).nextLine(); //第二个参数是音讯上携带的路由键 c.basicPublish("topic_logs",key,null, msg.getBytes()); } }}package m5_topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //定义交换机 c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); //发送音讯,须要携带路由键 while (true){ System.out.print("输出音讯:"); String msg = new Scanner(System.in).nextLine(); System.out.print("输出路由键:"); String key = new Scanner(System.in).nextLine(); //第二个参数是音讯上携带的路由键 c.basicPublish("topic_logs",key,null, msg.getBytes()); } }}
消费者
package m5_topic;import com.rabbitmq.client.*;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //连贯 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); //f.setPort(5672);//默认端口能够省略 f.setUsername("admin"); f.setPassword("admin"); Channel c = f.newConnection().createChannel(); //1.定义随机队列 2.定义交换机 3.绑定,指定绑定的关键词 //由 rabbitmq服务器主动命名,默认参数:false,true,true String queue = c.queueDeclare().getQueue(); c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); System.out.print("输出绑定键:用空格隔开:"); String s = new Scanner(System.in).nextLine(); String[] a = s.split("\\s+");//正则表达式 \s+ 示意一个或多个空格 for (String key : a){ c.queueBind(queue, "topic_logs", key); } //失常的从队列生产数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //取出音讯数据,和音讯上携带的路由键 String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey(); System.out.println("收到:"+key+"-"+msg); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; c.basicConsume(queue, true, deliverCallback, cancelCallback); }}
RPC模式
如果咱们须要在近程电脑上运行一个办法,并且还要期待一个返回后果该怎么办?这和后面的例子不太一样, 这种模式咱们通常称为近程过程调用,即RPC.
在本节中,咱们将会学习应用RabbitMQ去搭建一个RPC零碎:一个客户端和一个能够降级(扩大)的RPC服务器。为了模仿一个耗时工作,咱们将创立一个返回斐波那契数列的虚构的RPC服务。
客户端
在客户端定义一个RPCClient类,并定义一个call()办法,这个办法发送一个RPC申请,并期待接管响应后果
RPCClient client = new RPCClient();String result = client.call("4");System.out.println( "第四个斐波那契数是: " + result);
回调队列 Callback Queue
应用RabbitMQ去实现RPC很容易。一个客户端发送申请信息,并失去一个服务器端回复的响应信息。为了失去响应信息,咱们须要在申请的时候发送一个“回调”队列地址。咱们能够应用默认队列。上面是示例代码:
//定义回调队列,//主动生成对列名,非长久,独占,主动删除callbackQueueName = ch.queueDeclare().getQueue();//用来设置回调队列的参数对象BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build();//发送调用音讯ch.basicPublish("", "rpc_queue", props, message.getBytes());
音讯属性 Message Properties
AMQP 0-9-1协定定义了音讯的14个属性。大部分属性很少应用,上面是比拟罕用的4个:deliveryMode
:将音讯标记为长久化(值为2)或非长久化(任何其余值)。contentType
:用于形容mime类型。例如,对于常常应用的JSON格局,将此属性设置为:application/json
。replyTo
:通常用于指定回调队列。correlationId
:将RPC响应与申请关联起来十分有用。
关联id (correlationId):
在下面的代码中,咱们会为每个RPC申请创立一个回调队列。 这是十分低效的,这里还有一个更好的办法:让咱们为每个客户端创立一个回调队列。
这就提出了一个新的问题,在队列中失去一个响应时,咱们不分明这个响应所对应的是哪一条申请。这时候就须要应用关联id(correlationId)。咱们将为每一条申请设置惟一的的id值。稍后,当咱们在回调队列里收到一条音讯的时候,咱们将查看它的id属性,这样咱们就能够匹配对应的申请和响应。如果咱们发现了一个未知的id值,咱们能够平安的抛弃这条音讯,因为它不属于咱们的申请。
小结
RPC的工作形式是这样的:
- 对于RPC申请,客户端发送一条带有两个属性的音讯:replyTo,设置为仅为申请创立的匿名独占队列,和correlationId,设置为每个申请的惟一id值。
- 申请被发送到rpc_queue队列。
- RPC工作过程(即:服务器)在队列上期待申请。当一个申请呈现时,它执行工作,并应用replyTo字段中的队列将后果发回客户机。
- 客户机在回应音讯队列上期待数据。当音讯呈现时,它查看correlationId属性。如果匹配申请中的值,则向程序返回该响应数据。
实现的代码
服务器端
package rabbitmq.rpc;import java.io.IOException;import java.util.Random;import java.util.Scanner;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.rabbitmq.client.AMQP.BasicProperties;public class RPCServer { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); /* * 定义队列 rpc_queue, 将从它接管申请信息 * * 参数: * 1. queue, 对列名 * 2. durable, 长久化 * 3. exclusive, 排他 * 4. autoDelete, 主动删除 * 5. arguments, 其余参数属性 */ ch.queueDeclare("rpc_queue",false,false,false,null); ch.queuePurge("rpc_queue");//革除队列中的内容 ch.basicQos(1);//一次只接管一条音讯 //收到申请音讯后的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //解决收到的数据(要求第几个斐波那契数) String msg = new String(message.getBody(), "UTF-8"); int n = Integer.parseInt(msg); //求出第n个斐波那契数 int r = fbnq(n); String response = String.valueOf(r); //设置发回响应的id, 与申请id统一, 这样客户端能够把该响应与它的申请进行对应 BasicProperties replyProps = new BasicProperties.Builder() .correlationId(message.getProperties().getCorrelationId()) .build(); /* * 发送响应音讯 * 1. 默认交换机 * 2. 由客户端指定的,用来传递响应音讯的队列名 * 3. 参数(关联id) * 4. 发回的响应音讯 */ ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); //发送确认音讯 ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; // CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //消费者开始接管音讯, 期待从 rpc_queue接管申请音讯, 不主动确认 ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback); } protected static int fbnq(int n) { if(n == 1 || n == 2) return 1; return fbnq(n-1)+fbnq(n-2); }}
客户端
package rabbitmq.rpc;import java.io.IOException;import java.util.Scanner;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import com.rabbitmq.client.Delivery;import com.rabbitmq.client.AMQP.BasicProperties;public class RPCClient { Connection con; Channel ch; public RPCClient() throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); con = f.newConnection(); ch = con.createChannel(); } public String call(String msg) throws Exception { //主动生成对列名,非长久,独占,主动删除 String replyQueueName = ch.queueDeclare().getQueue(); //生成关联id String corrId = UUID.randomUUID().toString(); //设置两个参数: //1. 申请和响应的关联id //2. 传递响应数据的queue BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); //向 rpc_queue 队列发送申请数据, 申请第n个斐波那契数 ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8")); //用来保留后果的阻塞汇合,取数据时,没有数据会暂停期待 BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); //接管响应数据的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //如果响应音讯的关联id,与申请的关联id雷同,咱们来解决这个响应数据 if (message.getProperties().getCorrelationId().contentEquals(corrId)) { //把收到的响应数据,放入阻塞汇合 response.offer(new String(message.getBody(), "UTF-8")); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //开始从队列接管响应数据 ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback); //返回保留在汇合中的响应数据 return response.take(); } public static void main(String[] args) throws Exception { RPCClient client = new RPCClient(); while (true) { System.out.print("求第几个斐波那契数:"); int n = new Scanner(System.in).nextInt(); String r = client.call(""+n); System.out.println(r); } }}
virtual host
在RabbitMQ中叫做虚构音讯服务器VirtualHost,每个VirtualHost相当于一个绝对独立的RabbitMQ服务器,每个VirtualHost之间是互相隔离的。exchange、queue、message不能互通
创立virtual host: /pd
- 进入虚拟机治理界面
- 增加新的虚拟机’/pd’,名称必须以"/"结尾
- 查看增加的后果
设置虚拟机的用户拜访权限
点击 /pd 虚拟主机, 设置用户 admin 对它的拜访权限
拼多商城整合 rabbitmq
当用户下订单时,咱们的业务零碎间接与数据库通信,把订单保留到数据库中
当零碎流量忽然激增,大量的订单压力,会拖慢业务零碎和数据库系统
咱们须要应答流量峰值,让流量曲线变得平缓,如下图
订单存储的解耦
为了进行流量削峰,咱们引入 rabbitmq 音讯队列,当购物零碎产生订单后,能够把订单数据发送到音讯队列;而订单消费者利用从音讯队列接管订单音讯,并把订单保留到数据库
这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者能够从容地从音讯队列缓缓接管订单,向数据库保留
生产者-发送订单
pom.xml 增加依赖
spring提供了更不便的音讯队列拜访接口,它对RabbitMQ的客户端API进行了封装,应用起来更加不便
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
application.yml
增加RabbitMQ的连贯信息
spring: rabbitmq: host: 192.168.64.140 port: 5672 virtualHost: /pd username: admin password: admin
批改主程序 RunPdAPP
在主程序中增加上面的办法创立Queue实例
当创立RabbitMQ连贯和信道后,Spring的RabbitMQ工具会主动在服务器中创立队列,代码在RabbitAdmin.declareQueues()
办法中
@Bean public Queue getQueue() { Queue q = new Queue("orderQueue", true); return q; }
批改 OrderServiceImpl
//RabbitAutoConfiguration中创立了AmpqTemplate实例 @Autowired AmqpTemplate amqpTemplate; //saveOrder原来的数据库拜访代码全副正文,增加rabbitmq音讯发送代码 public String saveOrder(PdOrder pdOrder) throws Exception { String orderId = generateId(); pdOrder.setOrderId(orderId); amqpTemplate.convertAndSend("orderQueue", pdOrder); return orderId; // String orderId = generateId(); // pdOrder.setOrderId(orderId); // // // PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId()); // pdOrder.setShippingName(pdShipping.getReceiverName()); // pdOrder.setShippingCode(pdShipping.getReceiverAddress()); // pdOrder.setStatus(1);// // pdOrder.setPaymentType(1); // pdOrder.setPostFee(10D); // pdOrder.setCreateTime(new Date()); // // double payment = 0; // List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList()); // for (ItemVO itemVO : itemVOs) { // PdOrderItem pdOrderItem = new PdOrderItem(); // String id = generateId(); // //String id="2"; // pdOrderItem.setId(id); // pdOrderItem.setOrderId(orderId); // pdOrderItem.setItemId("" + itemVO.getPdItem().getId()); // pdOrderItem.setTitle(itemVO.getPdItem().getTitle()); // pdOrderItem.setPrice(itemVO.getPdItem().getPrice()); // pdOrderItem.setNum(itemVO.getPdCartItem().getNum()); // // payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice(); // pdOrderItemMapper.insert(pdOrderItem); // } // pdOrder.setPayment(payment); // pdOrderMapper.insert(pdOrder); // return orderId; }
消费者-接管订单,并保留到数据库
pd-web我的项目复制为pd-order-consumer
批改 application.yml
把端口批改成 81
server: port: 81spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8 username: root password: root rabbitmq: host: 192.168.64.140 port: 5672 virtualHost: /pd username: admin password: adminmybatis: #typeAliasesPackage: cn.tedu.ssm.pojo mapperLocations: classpath:com.pd.mapper/*.xmllogging: level: cn.tedu.ssm.mapper: debug
删除无关代码
pd-order-consumer我的项目只须要从 RabbitMQ 接管订单数据, 再保留到数据库即可, 所以我的项目中只须要保留这部分代码
- 删除 com.pd.controller 包
- 删除 com.pd.payment.utils 包
- 删除无关的 Service,只保留 OrderService 和 OrderServiceImpl
新建 OrderConsumer
package com.pd;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.pd.pojo.PdOrder;import com.pd.service.OrderService;@Componentpublic class OrderConsumer { //收到订单数据后,会调用订单的业务代码,把订单保留到数据库 @Autowired private OrderService orderService; //增加该注解后,会从指定的orderQueue接管音讯, //并把数据转为 PdOrder 实例传递到此办法 @RabbitListener(queues="orderQueue") public void save(PdOrder pdOrder) { System.out.println("消费者"); System.out.println(pdOrder.toString()); try { orderService.saveOrder(pdOrder); } catch (Exception e) { e.printStackTrace(); } }}
批改 OrderServiceImpl 的 saveOrder() 办法
public String saveOrder(PdOrder pdOrder) throws Exception { // String orderId = generateId(); // pdOrder.setOrderId(orderId); // // amqpTemplate.convertAndSend("orderQueue", pdOrder); // return orderId; // // // // String orderId = generateId(); // pdOrder.setOrderId(orderId); //从RabbitMQ接管的订单数据, //曾经在上游订单业务中生成过id,这里不再从新生成id //间接获取该订单的id String orderId = pdOrder.getOrderId(); PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId()); pdOrder.setShippingName(pdShipping.getReceiverName()); pdOrder.setShippingCode(pdShipping.getReceiverAddress()); pdOrder.setStatus(1);// pdOrder.setPaymentType(1); pdOrder.setPostFee(10D); pdOrder.setCreateTime(new Date()); double payment = 0; List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList()); for (ItemVO itemVO : itemVOs) { PdOrderItem pdOrderItem = new PdOrderItem(); String id = generateId(); //String id="2"; pdOrderItem.setId(id); pdOrderItem.setOrderId(orderId); pdOrderItem.setItemId("" + itemVO.getPdItem().getId()); pdOrderItem.setTitle(itemVO.getPdItem().getTitle()); pdOrderItem.setPrice(itemVO.getPdItem().getPrice()); pdOrderItem.setNum(itemVO.getPdCartItem().getNum()); payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice(); pdOrderItemMapper.insert(pdOrderItem); } pdOrder.setPayment(payment); pdOrderMapper.insert(pdOrder); return orderId; }
手动确认
application.yml
spring: rabbitmq: listener: simple: acknowledge-mode: manual
OrderConsumer
package com.pd;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.pd.pojo.PdOrder;import com.pd.service.OrderService;import com.rabbitmq.client.Channel;@Componentpublic class OrderConsumer { //收到订单数据后,会调用订单的业务代码,把订单保留到数据库 @Autowired private OrderService orderService; //增加该注解后,会从指定的orderQueue接管音讯, //并把数据转为 PdOrder 实例传递到此办法 @RabbitListener(queues="orderQueue") public void save(PdOrder pdOrder, Channel channel, Message message) { System.out.println("消费者"); System.out.println(pdOrder.toString()); try { orderService.saveOrder(pdOrder); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } }}