共计 35159 个字符,预计需要花费 88 分钟才能阅读完成。
RabbitMQ 应用场景
服务解耦
假如有这样一个场景, 服务 A 产生数据, 而服务 B,C,D 须要这些数据, 那么咱们能够在 A 服务中间接调用 B,C,D 服务, 把数据传递到上游服务即可
然而, 随着咱们的利用规模不断扩大, 会有更多的服务须要 A 的数据, 如果有几十甚至几百个上游服务, 而且会一直变更, 再加上还要思考上游服务出错的状况, 那么 A 服务中调用代码的保护会极为艰难
这是因为服务之间耦合度过于严密
再来思考用 RabbitMQ 解耦的状况
A 服务只须要向音讯服务器发送音讯, 而不必思考谁须要这些数据; 上游服务如果须要数据, 自行从音讯服务器订阅音讯, 不再须要数据时则勾销订阅即可
流量削峰
假如咱们有一个利用, 平时访问量是每秒 300 申请, 咱们用一台服务器即可轻松应答
而在高峰期, 访问量霎时翻了十倍, 达到每秒 3000 次申请, 那么单台服务器必定无奈应答, 这时咱们能够思考减少到 10 台服务器, 来扩散拜访压力
但如果这种刹时顶峰的状况每天只呈现一次, 每次只有半小时, 那么咱们 10 台服务器在少数工夫都只分担每秒几十次申请, 这样就有点浪费资源了
这种状况, 咱们就能够应用 RabbitMQ 来进行流量削峰, 顶峰状况下, 霎时呈现的大量申请数据, 先发送到音讯队列服务器, 排队期待被解决, 而咱们的利用, 能够缓缓的从音讯队列接管申请数据进行解决, 这样把数据处理工夫拉长, 以加重刹时压力
这是音讯队列服务器十分典型的利用场景
异步调用
思考定外卖领取胜利的状况
领取后要发送领取胜利的告诉, 再寻找外卖小哥来进行配送, 而寻找外卖小哥的过程十分耗时, 尤其是高峰期, 可能要期待几十秒甚至更长
这样就造成整条调用链路响应十分迟缓
而如果咱们引入 RabbitMQ 音讯队列, 订单数据能够发送到音讯队列服务器, 那么调用链路也就能够到此结束, 订单零碎则能够立刻失去响应, 整条链路的响应工夫只有 200 毫秒左右
寻找外卖小哥的利用能够以异步的形式从音讯队列接管订单音讯, 再执行耗时的寻找操作
rabbitmq 基本概念
RabbitMQ 是一种消息中间件,用于解决来自客户端的异步音讯。服务端将要发送的音讯放入到队列池中。接收端能够依据 RabbitMQ 配置的转发机制接管服务端发来的音讯。RabbitMQ 根据指定的转发规定进行音讯的转发、缓冲和长久化操作,次要用在多服务器间或单服务器的子系统间进行通信,是分布式系统规范的配置。
Exchange
承受生产者发送的音讯,并依据 Binding 规定将音讯路由给服务器中的队列。ExchangeType 决定了 Exchange 路由音讯的行为。在 RabbitMQ 中,ExchangeType 罕用的有 direct、Fanout 和 Topic 三种。
Message Queue
音讯队列。咱们发送给 RabbitMQ 的音讯最初都会达到各种 queue,并且存储在其中(如果路由找不到相应的 queue 则数据会失落),期待消费者来取。
Binding Key
它示意的是 Exchange 与 Message Queue 是通过 binding key 进行分割的,这个关系是固定。
Routing Key
生产者在将音讯发送给 Exchange 的时候,个别会指定一个 routing key,来指定这个音讯的路由规定。这个 routing key 须要与 Exchange Type 及 binding key 联结应用能力生,咱们的生产者只须要通过指定 routing key 来决定音讯流向哪里。
rabbitmq 装置
离线装置
下载离线安装包文件
- https://download.csdn.net/download/weixin_38305440/12265906
上传离线安装包
rabbitmq-install
目录上传到/root
切换到 rabbitmq-install
目录
cd rabbitmq-install
装置
rpm -ivh *.rpm
Yum 在线装置
以下内容来自 RabbitMQ 官网手册
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# centos7 用这个
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
# centos6 用这个
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
yum makecache
yum install socat
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
yum install rabbitmq-server
启动 rabbitmq 服务器
# 设置服务, 开机主动启动
systemctl enable rabbitmq-server
# 启动服务
systemctl start rabbitmq-server
rabbitmq 治理界面
启用治理界面
# 开启治理界面插件
rabbitmq-plugins enable rabbitmq_management
# 防火墙关上 15672 治理端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
重启 RabbitMQ 服务
systemctl restart rabbitmq-server
拜访
拜访服务器的 15672
端口, 例如:
http://192.168.64.140:15672
增加用户
# 增加用户
rabbitmqctl add_user admin admin
# 新用户设置用户为超级管理员
rabbitmqctl set_user_tags admin administrator
设置拜访权限
- 用户治理参考
https://www.cnblogs.com/AloneSword/p/4200051.html
凋谢客户端连贯端口
# 关上客户端连贯端口
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
次要端口介绍
- 4369 – erlang 发现口
- 5672 – client 端通信口
- 15672 – 治理界面 ui 端口
- 25672 – server 间外部通信口
rabbitmq 六种工作模式
简略模式
RabbitMQ 是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq 就是一个邮箱、一个邮局和一个邮递员。
- 发送音讯的程序是生产者
- 队列就代表一个邮箱。尽管音讯会流经 RbbitMQ 和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.
- 消费者期待从队列接管音讯
pom.xml
增加 slf4j 依赖, 和 rabbitmq amqp 依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tedu</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
生产者发送音讯
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {public static void main(String[] args) throws Exception {
// 创立连贯工厂, 并设置连贯信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);// 可选,5672 是默认端口
f.setUsername("admin");
f.setPassword("admin");
/*
* 与 rabbitmq 服务器建设连贯,
* rabbitmq 服务器端应用的是 nio, 会复用 tcp 连贯,
* 并开拓多个信道与客户端通信
* 以加重服务器端建设连贯的开销
*/
Connection c = f.newConnection();
// 建设信道
Channel ch = c.createChannel();
/*
* 申明队列, 会在 rabbitmq 中创立一个队列
* 如果曾经创立过该队列,就不能再应用其余参数来创立
*
* 参数含意:
* -queue: 队列名称
* -durable: 队列长久化,true 示意 RabbitMQ 重启后队列仍存在
* -exclusive: 排他,true 示意限度仅以后连贯可用
* -autoDelete: 当最初一个消费者断开后, 是否删除队列
* -arguments: 其余参数
*/
ch.queueDeclare("helloworld", false,false,false,null);
/*
* 公布音讯
* 这里把音讯向默认交换机发送.
* 默认交换机隐含与所有队列绑定,routing key 即为队列名称
*
* 参数含意:
* -exchange: 交换机名称, 空串示意默认交换机 "(AMQP default)", 不能用 null
* -routingKey: 对于默认交换机, 路由键就是指标队列名称
* -props: 其余参数, 例如头信息
* -body: 音讯内容 byte[]数组
*/
ch.basicPublish("","helloworld", null,"Hello world!".getBytes());
System.out.println("音讯已发送");
c.close();}
}
消费者接管音讯
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {public static void main(String[] args) throws Exception {
// 连贯工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
// 建设连贯
Connection c = f.newConnection();
// 建设信道
Channel ch = c.createChannel();
// 申明队列, 如果该队列曾经创立过, 则不会反复创立
ch.queueDeclare("helloworld",false,false,false,null);
System.out.println("期待接收数据");
// 收到音讯后用来解决音讯的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到:"+msg);
}
};
// 消费者勾销时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
ch.basicConsume("helloworld", true, callback, cancel);
}
}
工作模式
工作队列 (即工作队列) 背地的次要思维是防止立刻执行资源密集型工作,并且必须期待它实现。相同,咱们将工作安顿在稍后实现。
咱们将工作封装为音讯并将其发送到队列。后盾运行的工作过程将获取工作并最终执行工作。当运行多个消费者时,工作将在它们之间散发。
应用工作队列的一个长处是可能轻松地并行工作。如果咱们正在积压工作工作,咱们能够增加更多工作过程,这样就能够轻松扩大。
生产者发送音讯
这里模仿耗时工作, 发送的音讯中, 每个点使工作过程暂停一秒钟, 例如 ”Hello…” 将破费 3 秒钟来解决
package m2_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class Producer {public static void main(String[] args) throws Exception {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
// 队列如果在服务器端曾经存在,属性不可变(第二个参数改为 true)
c.queueDeclare("task_queue", false, false, false, null);
while (true){System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg)) {break;}
c.basicPublish("","helloworld", null, msg.getBytes());
}
}
}
消费者接管音讯
package m2_work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {public static void main(String[] args) throws Exception {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("helloworld", false, false, false, null);
// 解决音讯的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 遍历音讯字符串,每个点字符都暂停一秒
String msg = new String(message.getBody());
System.out.println("收到:"+msg);
for (int i = 0 ; i<msg.length();i++){if (msg.charAt(i)=='.'){
try {Thread.sleep(1000);
} catch (InterruptedException e) {}}
}
System.out.println("音讯解决完结");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
// 生产数据
// 第二个参数:true - 主动确认 false - 手动确认
c.basicConsume("task_queue", true, deliverCallback, cancelCallback);
}
}
运行测试
运行:
- 一个生产者
- 两个消费者
生产者发送多条音讯,
如: 1,2,3,4,5. 两个消费者别离收到:
- 消费者一: 1,3,5
- 消费者二: 2,4
rabbitmq 在所有消费者中轮询散发音讯, 把音讯平均地发送给所有消费者
音讯确认
一个消费者接管音讯后, 在音讯没有齐全解决完时就挂掉了, 那么这时会产生什么呢?
就当初的代码来说,rabbitmq 把音讯发送给消费者后, 会立刻删除音讯, 那么消费者挂掉后, 它没来得及解决的音讯就会失落
如果生产者发送以下音讯:
1…
2
3
4
5
两个消费者别离收到:
* 消费者一: 1…, 3, 5
* 消费者二: 2, 4
当消费者一收到所有音讯后, 要话费 7 秒工夫来解决第一条音讯, 这期间如果敞开该消费者, 那么 1 未解决实现,3,5 则没有被解决
咱们并不想失落任何音讯, 如果一个消费者挂掉, 咱们想把它的工作音讯派发给其余消费者
为了确保音讯不会失落,rabbitmq 反对音讯确认(回执)。当一个音讯被消费者接管到并且执行实现后,消费者会发送一个 ack (acknowledgment) 给 rabbitmq 服务器, 通知他我曾经执行实现了,你能够把这条音讯删除了。
如果一个消费者没有返回音讯确认就挂掉了(信道敞开,连贯敞开或者 TCP 链接失落),rabbitmq 就会明确,这个音讯没有被解决实现,rebbitmq 就会把这条音讯从新放入队列,如果在这时有其余的消费者在线,那么 rabbitmq 就会迅速的把这条消息传递给其余的消费者,这样就确保了没有音讯会失落。
这里不存在音讯超时, rabbitmq 只在消费者挂掉时从新分派音讯, 即便消费者花十分久的工夫来解决音讯也能够
手动音讯确认默认是开启的,后面的例子咱们通过 autoAck=ture 把它敞开了。咱们当初要把它设置为 false,而后工作过程解决完动向工作时, 发送一个音讯确认(回执)。
package m2_work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {public static void main(String[] args) throws Exception {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("task_queue", false, false, false, null);
// 解决音讯的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 遍历音讯字符串,每个点字符都暂停一秒
String msg = new String(message.getBody());
System.out.println("收到:"+msg);
for (int i = 0 ; i<msg.length();i++){if (msg.charAt(i)=='.'){
try {Thread.sleep(1000);
} catch (InterruptedException e) {}}
}
// 发送回执(参数 1:回值(long 类型的编码)参数 2:是否一次确认多条(boolean,个别都是 false))c.basicAck(message.getEnvelope().getDeliveryTag(),false);// 封装的回值对象 (envelope) 的标签(deliveryTag)
System.out.println("音讯解决完结");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
// 生产数据
// 第二个参数:true - 主动确认 false - 手动确认
c.basicConsume("task_queue", false, deliverCallback, cancelCallback);
}
}
应用以上代码,就算杀掉一个正在解决音讯的工作过程也不会失落任何音讯,工作过程挂掉之后,没有确认的音讯就会被主动从新传递。
遗记确认 (ack) 是一个常见的谬误, 这样结果是很重大的, 因为未确认的音讯不会被开释, rabbitmq 会吃掉越来越多的内存
能够应用上面命令打印工作队列中未确认音讯的数量
rabbitmqctl list_queues name >messages_readymessages_unacknowledged
当解决音讯时异常中断, 能够抉择让音讯重回队列从新发送.
nack
操作能够是音讯重回队列, 能够应用 basicNack() 办法:// requeue 为 true 时重回队列, 反之音讯被抛弃或被发送到死信队列 c.basicNack(tag, multiple, requeue)
正当地散发
rabbitmq 会一次把多个音讯分发给消费者, 这样可能造成有的消费者十分忙碌, 而其它消费者闲暇. 而 rabbitmq 对此无所不知, 依然会平均的散发音讯
咱们能够应用 basicQos(1)
办法, 这通知 rabbitmq 一次只向消费者发送一条音讯, 在返回确认回执前, 不要向消费者发送新音讯. 而是把音讯发给下一个闲暇的消费者
音讯长久化
当 rabbitmq 敞开时, 咱们队列中的音讯依然会失落, 除非明确要求它不要失落数据
要求 rabbitmq 不失落数据要做如下两点: 把队列和音讯都设置为可长久化 (durable)
队列设置为可长久化, 能够在定义队列时指定参数 durable 为 true
// 第二个参数是长久化参数 durable
ch.queueDeclare("helloworld", true, false, false, null);
因为之前咱们曾经定义过队列 ”hello” 是不可长久化的, 对已存在的队列, rabbitmq 不容许对其定义不同的参数, 否则会出错, 所以这里咱们定义一个不同名字的队列 ”task_queue”
// 定义一个新的队列, 名为 task_queue
// 第二个参数是长久化参数 durable
ch.queueDeclare("task_queue", true, false, false, null);
生产者和消费者代码都要批改
这样即便 rabbitmq 重新启动, 队列也不会失落. 当初咱们再设置队列中音讯的长久化, 应用 MessageProperties.PERSISTENT_TEXT_PLAIN
参数
// 第三个参数设置音讯长久化
ch.basicPublish("","task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBytes());
上面是 ” 工作模式 ” 最终实现的生产者和消费者代码
生产者代码
package m2_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class Producer {public static void main(String[] args) throws Exception {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
// 队列如果在服务器端曾经存在,属性不可变(第二个参数改为 true)
c.queueDeclare("task_queue", true, false, false, null);
// 发送音讯
// 循环输出音讯发送
/*
输出音讯:ger.err....er.....er.....e....r......
消费者收到的音讯,遍历字符串,每个点字符都暂停一秒,模仿耗时音讯
*/
while (true){System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
c.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());// 第三个参数代表,长久的纯文本文件
}
}
}
消费者代码
package m2_work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {public static void main(String[] args) throws Exception {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义队列
c.queueDeclare("task_queue", true, false, false, null);
// 解决音讯的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 遍历音讯字符串,每个点字符都暂停一秒
String msg = new String(message.getBody());
System.out.println("收到:"+msg);
for (int i = 0 ; i<msg.length();i++){if (msg.charAt(i)=='.'){
try {Thread.sleep(1000);
} catch (InterruptedException e) {}}
}
// 发送回执(参数 1:回值(long 类型的编码)参数 2:是否一次确认多条(boolean,个别都是 false))c.basicAck(message.getEnvelope().getDeliveryTag(),false);// 封装的回值对象 (envelope) 的标签(deliveryTag)
System.out.println("音讯解决完结");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
// 每次只承受解决一条音讯,解决实现之前不承受下一条
// 必须在手动 ACK 模式下才无效
c.basicQos(1);
// 生产数据
// 第二个参数:true - 主动确认 false - 手动确认
c.basicConsume("task_queue", false, deliverCallback, cancelCallback);
}
}
公布订阅模式
在后面的例子中,咱们工作音讯只交付给一个工作过程。在这部分,咱们将做一些齐全不同的事件——咱们将向多个消费者传递同一条音讯。这种模式称为“公布 / 订阅”。
为了阐明该模式,咱们将构建一个简略的日志零碎。它将由两个程序组成——第一个程序将收回日志音讯,第二个程序接管它们。
在咱们的日志零碎中,接管程序的每个运行正本都将取得音讯。这样,咱们就能够运行一个消费者并将日志保留到磁盘; 同时咱们能够运行另一个消费者在屏幕上打印日志。
最终, 音讯会被播送到所有音讯接受者
Exchanges 交换机
RabbitMQ 消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。实际上,通常生产者甚至不晓得音讯是否会被传递到任何队列。
相同,生产者只能向交换机 (Exchange) 发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗? 它应该增加到多个队列中吗? 或者它应该被抛弃。这些规定由 exchange 的类型定义。
有几种可用的替换类型:direct、topic、header 和 fanout。咱们将关注最初一个——fanout。让咱们创立一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");
fanout 交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。这正是咱们的日志零碎所须要的。
咱们后面应用的队列具备特定的名称 (还记得 hello 和 task_queue 吗?) 可能为队列命名对咱们来说至关重要——咱们须要将工作过程指向同一个队列, 在生产者和消费者之间共享队列。
但日志记录案例不是这种状况。咱们想要接管所有的日志音讯,而不仅仅是其中的一部分。咱们还只对以后的最新消息感兴趣,而不是旧音讯。
要解决这个问题,咱们须要两件事。首先,每当咱们连贯到 Rabbitmq 时,咱们须要一个新的空队列。为此,咱们能够创立一个具备随机名称的队列,或者,更好的办法是让服务器为咱们抉择一个随机队列名称。其次,一旦断开与使用者的连贯,队列就会主动删除。在 Java 客户端中,当咱们不向 queueDeclare()提供任何参数时,会创立一个具备生成名称的、非长久的、独占的、主动删除队列
// 主动生成队列名
// 非长久, 独占, 主动删除
String queueName = ch.queueDeclare().getQueue();
绑定 Bindings
咱们曾经创立了一个 fanout 交换机和一个队列。当初咱们须要通知 exchange 向指定队列发送音讯。exchange 和队列之间的关系称为绑定。
// 指定的队列, 与指定的交换机关联起来
// 成为绑定 -- binding
// 第三个参数时 routingKey, 因为是 fanout 交换机, 这里疏忽 routingKey
ch.queueBind(queueName, "logs", "");
当初, logs 交换机将会向咱们指定的队列增加音讯
列出绑定关系:
rabbitmqctl list_bindings
实现的代码
生产者
生产者收回日志音讯,看起来与前一教程没有太大不同。最重要的更改是,咱们当初心愿将音讯公布到 logs 交换机,而不是无名的日志交换机。咱们须要在发送时提供一个 routingKey,然而对于 fanout 交换机类型,该值会被疏忽。
package m3_pub_sub;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义交换机, 服务器如果曾经存在这个交换机, 不会反复创立(名称,类型)//c.exchangeDeclare("logs", "fanout");// 两种写法都能够
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 向交换机发送音讯
while (true){System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
// 参数 1:交换机名称 参数 2:抉择队列, 对 fanout 交换机有效
c.basicPublish("logs", "", null, msg.getBytes());
}
}
}
消费者
如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题; 如果还没有消费者在听,咱们能够平安地抛弃这些信息。
ReceiveLogs.java 代码:
package m3_pub_sub;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//1. 定义随机队列 2. 定义交换机 3. 绑定
String queue = UUID.randomUUID().toString();
c.queueDeclare(queue, false, true, true, null);// 非长久,独占,主动删除
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 交换机与队列进行绑定 对于 fanout 交换机来说,参数三有效
c.queueBind(queue, "logs", "");
// 失常从队列接管音讯
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {String msg= new String(message.getBody());
System.out.println("收到:" + msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
路由模式
在上一大节,咱们构建了一个简略的日志零碎。咱们可能向多个接收者播送日志音讯。
在这一节,咱们将向其增加一个个性—咱们将只订阅所有音讯中的一部分。例如,咱们只接管要害谬误音讯并保留到日志文件(以节俭磁盘空间),同时依然可能在管制台上打印所有日志音讯。
绑定 Bindings
在上一节,咱们曾经创立了队列与交换机的绑定。应用上面这样的代码:
ch.queueBind(queueName, "logs", "");
绑定是交换机和队列之间的关系。这能够简略地了解为: 队列对来自此替换的音讯感兴趣。
绑定能够应用额定的 routingKey 参数。为了防止与 basic_publish 参数混同,咱们将其称为 bindingKey。这是咱们如何创立一个键绑定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey 的含意取决于交换机类型。咱们后面应用的 fanout 交换机齐全疏忽它。
直连交换机 Direct exchange
上一节中的日志零碎向所有消费者播送所有音讯。咱们心愿扩大它,容许依据音讯的严重性过滤音讯。例如,咱们心愿将日志音讯写入磁盘的程序只接管要害 error,而不是在 warning 或 info 日志音讯上节约磁盘空间。
后面咱们应用的是 fanout 交换机,这并没有给咱们太多的灵活性——它只能进行简略的播送。
咱们将用直连交换机 (Direct exchange) 代替。它背地的路由算法很简略——消息传递到 bindingKey 与 routingKey 齐全匹配的队列。为了阐明这一点,请思考以下设置
其中咱们能够看到直连交换机 X
,它绑定了两个队列。第一个队列用绑定键orange
绑定,第二个队列有两个绑定,一个绑定black
,另一个绑定键green
。
这样设置,应用路由键 orange
公布到交换器的音讯将被路由到队列 Q1。带有 black
或green
路由键的音讯将转到Q2
。而所有其余音讯都将被抛弃。
多重绑定 Multiple bindings
应用雷同的 bindingKey 绑定多个队列是齐全容许的。如图所示,能够应用 binding key black 将 X 与 Q1 和 Q2 绑定。在这种状况下,直连交换机的行为相似于 fanout,并将音讯播送给所有匹配的队列。一条路由键为 black 的音讯将同时发送到 Q1 和 Q2。
发送日志
咱们将在日志零碎中应用这个模型。咱们把音讯发送到一个 Direct 交换机,而不是 fanout。咱们将提供日志级别作为 routingKey。这样,接管程序将可能抉择它心愿接管的级别。让咱们首先来看收回日志。
和后面一样,咱们首先须要创立一个 exchange:
// 参数 1: 交换机名
// 参数 2: 交换机类型
ch.exchangeDeclare("direct_logs", "direct");
接着来看发送音讯的代码
// 参数 1: 交换机名
// 参数 2: routingKey, 路由键, 这里咱们用日志级别, 如 "error","info","warning"
// 参数 3: 其余配置属性
// 参数 4: 公布的音讯数据
ch.basicPublish("direct_logs", "error", null, message.getBytes());
订阅
接管音讯的工作原理与后面章节一样,但有一个例外——咱们将为感兴趣的每个日志级别创立一个新的绑定, 示例代码如下:
ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");
残缺的代码
生产者
package m4_routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义交换机
// 参数 1: 交换机名
// 参数 2: 交换机类型
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
// 发送音讯,须要携带路由键
while (true){System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输出路由键:");
String key = new Scanner(System.in).nextLine();
// 第二个参数是音讯上携带的路由键
// 参数 1: 交换机名
// 参数 2: routingKey, 路由键, 这里咱们用日志级别, 如 "error","info","warning"
// 参数 3: 其余配置属性
// 参数 4: 公布的音讯数据
c.basicPublish("direct_logs",key,null, msg.getBytes());
}
}
}
消费者
package m4_routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//1. 定义随机队列 2. 定义交换机 3. 绑定,指定绑定的关键词
// 由 rabbitmq 服务器主动命名, 默认参数:false,true,true
String queue = c.queueDeclare().getQueue();
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
System.out.print("输出绑定键:用空格隔开:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");// 正则表达式 \s+ 示意一个或多个空格
for (String key : a){c.queueBind(queue, "direct_logs", key);
}
// 失常的从队列生产数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 取出音讯数据,和音讯上携带的路由键
// 定义名字为 direct_logs 的交换机, 它的类型是 "direct"
String msg = new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();
System.out.println("收到:"+key+"-"+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
主题模式
在上一大节,咱们改良了日志零碎。咱们没有应用只能进行播送的 fanout 交换机,而是应用 Direct 交换机,从而能够选择性接管日志。
尽管应用 Direct 交换机改良了咱们的零碎,但它依然有局限性——它不能基于多个规范进行路由。
在咱们的日志零碎中,咱们可能不仅心愿依据级别订阅日志,还心愿依据收回日志的源订阅日志。
这将给咱们带来很大的灵活性——咱们可能只想接管来自“cron”的要害谬误,但也要接管来自“kern”的所有日志。
要在日志零碎中实现这一点,咱们须要理解更简单的 Topic 交换机。
主题交换机 Topic exchange
发送到 Topic 交换机的音讯, 它的的 routingKey, 必须是由点分隔的多个单词。单词能够是任何货色,但通常是与音讯相干的一些个性。几个无效的 routingKey 示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey 能够有任意多的单词,最多 255 个字节。
bindingKey 也必须采纳雷同的模式。Topic 交换机的逻辑与直连交换机相似——应用特定 routingKey 发送的音讯将被传递到所有应用匹配 bindingKey 绑定的队列。bindingKey 有两个重要的非凡点:
*
能够通配单个单词。#
能够通配零个或多个单词。
用一个例子来解释这个问题是最简略的
在本例中,咱们将发送形容动物的音讯。这些音讯将应用由三个单词 (两个点) 组成的 routingKey 发送。routingKey 中的第一个单词示意速度,第二个是色彩, 第三个是物种:“< 速度 >.< 色彩 >.< 物种 >
”。
咱们创立三个绑定:Q1 与 bindingKey“*.orange.*
”绑定。和 Q2 是“*.*.rabbit
”和“lazy.#
”。
这些绑定可概括为:
- Q1 对所有橙色的动物感兴趣。
- Q2 想接管对于兔子和慢速动物的所有音讯。
将 routingKey 设置为 ”quick.orange.rabbit
“ 的音讯将被发送到两个队列。音讯 “lazy.orange.elephant
“也发送到它们两个。另外”quick.orange.fox
“只会发到第一个队列,”lazy.brown.fox
“只发给第二个。”lazy.pink.rabbit
“将只被传递到第二个队列一次,即便它匹配两个绑定。”quick.brown.fox
“ 不匹配任何绑定,因而将被抛弃。
如果咱们违反约定,发送一个或四个单词的信息,比方 ”orange
“或”quick.orange.male.rabbit
“,会产生什么? 这些音讯将不匹配任何绑定,并将失落。
另外,”lazy.orange.male.rabbit
“,即便它有四个单词,也将匹配最初一个绑定,并将被传递到第二个队列。
实现的代码
生产者
package m5_topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义交换机
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
// 发送音讯,须要携带路由键
while (true){System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输出路由键:");
String key = new Scanner(System.in).nextLine();
// 第二个参数是音讯上携带的路由键
c.basicPublish("topic_logs",key,null, msg.getBytes());
}
}
}
package m5_topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
// 定义交换机
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
// 发送音讯,须要携带路由键
while (true){System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输出路由键:");
String key = new Scanner(System.in).nextLine();
// 第二个参数是音讯上携带的路由键
c.basicPublish("topic_logs",key,null, msg.getBytes());
}
}
}
消费者
package m5_topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {
// 连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);// 默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//1. 定义随机队列 2. 定义交换机 3. 绑定,指定绑定的关键词
// 由 rabbitmq 服务器主动命名, 默认参数:false,true,true
String queue = c.queueDeclare().getQueue();
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
System.out.print("输出绑定键:用空格隔开:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");// 正则表达式 \s+ 示意一个或多个空格
for (String key : a){c.queueBind(queue, "topic_logs", key);
}
// 失常的从队列生产数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 取出音讯数据,和音讯上携带的路由键
String msg = new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();
System.out.println("收到:"+key+"-"+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
RPC 模式
如果咱们须要在近程电脑上运行一个办法,并且还要期待一个返回后果该怎么办?这和后面的例子不太一样, 这种模式咱们通常称为近程过程调用, 即 RPC.
在本节中,咱们将会学习应用 RabbitMQ 去搭建一个 RPC 零碎:一个客户端和一个能够降级 (扩大) 的 RPC 服务器。为了模仿一个耗时工作,咱们将创立一个返回斐波那契数列的虚构的 RPC 服务。
客户端
在客户端定义一个 RPCClient 类, 并定义一个 call()办法, 这个办法发送一个 RPC 申请, 并期待接管响应后果
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println("第四个斐波那契数是:" + result);
回调队列 Callback Queue
应用 RabbitMQ 去实现 RPC 很容易。一个客户端发送申请信息,并失去一个服务器端回复的响应信息。为了失去响应信息,咱们须要在申请的时候发送一个“回调”队列地址。咱们能够应用默认队列。上面是示例代码:
// 定义回调队列,
// 主动生成对列名, 非长久, 独占, 主动删除
callbackQueueName = ch.queueDeclare().getQueue();
// 用来设置回调队列的参数对象
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
// 发送调用音讯
ch.basicPublish("","rpc_queue", props, message.getBytes());
音讯属性 Message Properties
AMQP 0-9- 1 协定定义了音讯的 14 个属性。大部分属性很少应用,上面是比拟罕用的 4 个:deliveryMode
: 将音讯标记为长久化 (值为 2) 或非长久化(任何其余值)。contentType
: 用于形容 mime 类型。例如,对于常常应用的 JSON 格局,将此属性设置为:application/json
。replyTo
: 通常用于指定回调队列。correlationId
: 将 RPC 响应与申请关联起来十分有用。
关联 id (correlationId):
在下面的代码中,咱们会为每个 RPC 申请创立一个回调队列。这是十分低效的,这里还有一个更好的办法: 让咱们为每个客户端创立一个回调队列。
这就提出了一个新的问题,在队列中失去一个响应时,咱们不分明这个响应所对应的是哪一条申请。这时候就须要应用关联 id(correlationId)。咱们将为每一条申请设置惟一的的 id 值。稍后,当咱们在回调队列里收到一条音讯的时候,咱们将查看它的 id 属性,这样咱们就能够匹配对应的申请和响应。如果咱们发现了一个未知的 id 值,咱们能够平安的抛弃这条音讯,因为它不属于咱们的申请。
小结
RPC 的工作形式是这样的:
- 对于 RPC 申请,客户端发送一条带有两个属性的音讯:replyTo, 设置为仅为申请创立的匿名独占队列, 和 correlationId, 设置为每个申请的惟一 id 值。
- 申请被发送到 rpc_queue 队列。
- RPC 工作过程 (即: 服务器) 在队列上期待申请。当一个申请呈现时,它执行工作, 并应用 replyTo 字段中的队列将后果发回客户机。
- 客户机在回应音讯队列上期待数据。当音讯呈现时,它查看 correlationId 属性。如果匹配申请中的值,则向程序返回该响应数据。
实现的代码
服务器端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {public static void main(String[] args) throws Exception {ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
/*
* 定义队列 rpc_queue, 将从它接管申请信息
*
* 参数:
* 1. queue, 对列名
* 2. durable, 长久化
* 3. exclusive, 排他
* 4. autoDelete, 主动删除
* 5. arguments, 其余参数属性
*/
ch.queueDeclare("rpc_queue",false,false,false,null);
ch.queuePurge("rpc_queue");// 革除队列中的内容
ch.basicQos(1);// 一次只接管一条音讯
// 收到申请音讯后的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {// 解决收到的数据(要求第几个斐波那契数)
String msg = new String(message.getBody(), "UTF-8");
int n = Integer.parseInt(msg);
// 求出第 n 个斐波那契数
int r = fbnq(n);
String response = String.valueOf(r);
// 设置发回响应的 id, 与申请 id 统一, 这样客户端能够把该响应与它的申请进行对应
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();
/*
* 发送响应音讯
* 1. 默认交换机
* 2. 由客户端指定的, 用来传递响应音讯的队列名
* 3. 参数(关联 id)
* 4. 发回的响应音讯
*/
ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 发送确认音讯
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
// 消费者开始接管音讯, 期待从 rpc_queue 接管申请音讯, 不主动确认
ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
}
protected static int fbnq(int n) {if(n == 1 || n == 2) return 1;
return fbnq(n-1)+fbnq(n-2);
}
}
客户端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient {
Connection con;
Channel ch;
public RPCClient() throws Exception {ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
con = f.newConnection();
ch = con.createChannel();}
public String call(String msg) throws Exception {
// 主动生成对列名, 非长久, 独占, 主动删除
String replyQueueName = ch.queueDeclare().getQueue();
// 生成关联 id
String corrId = UUID.randomUUID().toString();
// 设置两个参数:
//1. 申请和响应的关联 id
//2. 传递响应数据的 queue
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
// 向 rpc_queue 队列发送申请数据, 申请第 n 个斐波那契数
ch.basicPublish("","rpc_queue", props, msg.getBytes("UTF-8"));
// 用来保留后果的阻塞汇合, 取数据时, 没有数据会暂停期待
BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
// 接管响应数据的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 如果响应音讯的关联 id, 与申请的关联 id 雷同, 咱们来解决这个响应数据
if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
// 把收到的响应数据, 放入阻塞汇合
response.offer(new String(message.getBody(), "UTF-8"));
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
// 开始从队列接管响应数据
ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
// 返回保留在汇合中的响应数据
return response.take();}
public static void main(String[] args) throws Exception {RPCClient client = new RPCClient();
while (true) {System.out.print("求第几个斐波那契数:");
int n = new Scanner(System.in).nextInt();
String r = client.call(""+n);
System.out.println(r);
}
}
}
virtual host
在 RabbitMQ 中叫做虚构音讯服务器 VirtualHost,每个 VirtualHost 相当于一个绝对独立的 RabbitMQ 服务器,每个 VirtualHost 之间是互相隔离的。exchange、queue、message 不能互通
创立 virtual host: /pd
- 进入虚拟机治理界面
- 增加新的虚拟机’/pd’, 名称必须以 ”/” 结尾
- 查看增加的后果
设置虚拟机的用户拜访权限
点击 /pd 虚拟主机, 设置用户 admin 对它的拜访权限
拼多商城整合 rabbitmq
当用户下订单时, 咱们的业务零碎间接与数据库通信, 把订单保留到数据库中
当零碎流量忽然激增, 大量的订单压力, 会拖慢业务零碎和数据库系统
咱们须要应答流量峰值, 让流量曲线变得平缓, 如下图
订单存储的解耦
为了进行流量削峰, 咱们引入 rabbitmq 音讯队列, 当购物零碎产生订单后, 能够把订单数据发送到音讯队列; 而订单消费者利用从音讯队列接管订单音讯, 并把订单保留到数据库
这样, 当流量激增时, 大量订单会暂存在 rabbitmq 中, 而订单消费者能够从容地从音讯队列缓缓接管订单, 向数据库保留
生产者 - 发送订单
pom.xml 增加依赖
spring 提供了更不便的音讯队列拜访接口, 它对 RabbitMQ 的客户端 API 进行了封装, 应用起来更加不便
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
增加 RabbitMQ 的连贯信息
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
virtualHost: /pd
username: admin
password: admin
批改主程序 RunPdAPP
在主程序中增加上面的办法创立 Queue 实例
当创立 RabbitMQ 连贯和信道后,Spring 的 RabbitMQ 工具会主动在服务器中创立队列, 代码在 RabbitAdmin.declareQueues()
办法中
@Bean
public Queue getQueue() {Queue q = new Queue("orderQueue", true);
return q;
}
批改 OrderServiceImpl
//RabbitAutoConfiguration 中创立了 AmpqTemplate 实例
@Autowired
AmqpTemplate amqpTemplate;
//saveOrder 原来的数据库拜访代码全副正文, 增加 rabbitmq 音讯发送代码
public String saveOrder(PdOrder pdOrder) throws Exception {String orderId = generateId();
pdOrder.setOrderId(orderId);
amqpTemplate.convertAndSend("orderQueue", pdOrder);
return orderId;
// String orderId = generateId();
// pdOrder.setOrderId(orderId);
//
//
// PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
// pdOrder.setShippingName(pdShipping.getReceiverName());
// pdOrder.setShippingCode(pdShipping.getReceiverAddress());
// pdOrder.setStatus(1);//
// pdOrder.setPaymentType(1);
// pdOrder.setPostFee(10D);
// pdOrder.setCreateTime(new Date());
//
// double payment = 0;
// List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
// for (ItemVO itemVO : itemVOs) {// PdOrderItem pdOrderItem = new PdOrderItem();
// String id = generateId();
// //String id="2";
// pdOrderItem.setId(id);
// pdOrderItem.setOrderId(orderId);
// pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
// pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
// pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
// pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
//
// payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
// pdOrderItemMapper.insert(pdOrderItem);
// }
// pdOrder.setPayment(payment);
// pdOrderMapper.insert(pdOrder);
// return orderId;
}
消费者 - 接管订单, 并保留到数据库
pd-web 我的项目复制为 pd-order-consumer
批改 application.yml
把端口批改成 81
server:
port: 81
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8
username: root
password: root
rabbitmq:
host: 192.168.64.140
port: 5672
virtualHost: /pd
username: admin
password: admin
mybatis:
#typeAliasesPackage: cn.tedu.ssm.pojo
mapperLocations: classpath:com.pd.mapper/*.xml
logging:
level:
cn.tedu.ssm.mapper: debug
删除无关代码
pd-order-consumer 我的项目只须要从 RabbitMQ 接管订单数据, 再保留到数据库即可, 所以我的项目中只须要保留这部分代码
- 删除 com.pd.controller 包
- 删除 com.pd.payment.utils 包
- 删除无关的 Service, 只保留 OrderService 和 OrderServiceImpl
新建 OrderConsumer
package com.pd;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
@Component
public class OrderConsumer {
// 收到订单数据后, 会调用订单的业务代码, 把订单保留到数据库
@Autowired
private OrderService orderService;
// 增加该注解后, 会从指定的 orderQueue 接管音讯,
// 并把数据转为 PdOrder 实例传递到此办法
@RabbitListener(queues="orderQueue")
public void save(PdOrder pdOrder)
{System.out.println("消费者");
System.out.println(pdOrder.toString());
try {orderService.saveOrder(pdOrder);
} catch (Exception e) {e.printStackTrace();
}
}
}
批改 OrderServiceImpl 的 saveOrder() 办法
public String saveOrder(PdOrder pdOrder) throws Exception {// String orderId = generateId();
// pdOrder.setOrderId(orderId);
//
// amqpTemplate.convertAndSend("orderQueue", pdOrder);
// return orderId;
//
//
//
// String orderId = generateId();
// pdOrder.setOrderId(orderId);
// 从 RabbitMQ 接管的订单数据,
// 曾经在上游订单业务中生成过 id, 这里不再从新生成 id
// 间接获取该订单的 id
String orderId = pdOrder.getOrderId();
PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
pdOrder.setShippingName(pdShipping.getReceiverName());
pdOrder.setShippingCode(pdShipping.getReceiverAddress());
pdOrder.setStatus(1);//
pdOrder.setPaymentType(1);
pdOrder.setPostFee(10D);
pdOrder.setCreateTime(new Date());
double payment = 0;
List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
for (ItemVO itemVO : itemVOs) {PdOrderItem pdOrderItem = new PdOrderItem();
String id = generateId();
//String id="2";
pdOrderItem.setId(id);
pdOrderItem.setOrderId(orderId);
pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
pdOrderItemMapper.insert(pdOrderItem);
}
pdOrder.setPayment(payment);
pdOrderMapper.insert(pdOrder);
return orderId;
}
手动确认
application.yml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
OrderConsumer
package com.pd;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import com.rabbitmq.client.Channel;
@Component
public class OrderConsumer {
// 收到订单数据后, 会调用订单的业务代码, 把订单保留到数据库
@Autowired
private OrderService orderService;
// 增加该注解后, 会从指定的 orderQueue 接管音讯,
// 并把数据转为 PdOrder 实例传递到此办法
@RabbitListener(queues="orderQueue")
public void save(PdOrder pdOrder, Channel channel, Message message)
{System.out.println("消费者");
System.out.println(pdOrder.toString());
try {orderService.saveOrder(pdOrder);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {e.printStackTrace();
}
}
}