RabbitMQ 应用场景
服务解耦
假如有这样一个场景, 服务A产生数据, 而服务B,C,D须要这些数据, 那么咱们能够在A服务中间接调用B,C,D服务,把数据传递到上游服务即可
然而,随着咱们的利用规模不断扩大,会有更多的服务须要A的数据,如果有几十甚至几百个上游服务,而且会一直变更,再加上还要思考上游服务出错的状况,那么A服务中调用代码的保护会极为艰难
这是因为服务之间耦合度过于严密
再来思考用RabbitMQ解耦的状况
A服务只须要向音讯服务器发送音讯,而不必思考谁须要这些数据;上游服务如果须要数据,自行从音讯服务器订阅音讯,不再须要数据时则勾销订阅即可
流量削峰
假如咱们有一个利用,平时访问量是每秒300申请,咱们用一台服务器即可轻松应答
而在高峰期,访问量霎时翻了十倍,达到每秒3000次申请,那么单台服务器必定无奈应答,这时咱们能够思考减少到10台服务器,来扩散拜访压力
但如果这种刹时顶峰的状况每天只呈现一次,每次只有半小时,那么咱们10台服务器在少数工夫都只分担每秒几十次申请,这样就有点浪费资源了
这种状况,咱们就能够应用RabbitMQ来进行流量削峰,顶峰状况下,霎时呈现的大量申请数据,先发送到音讯队列服务器,排队期待被解决,而咱们的利用,能够缓缓的从音讯队列接管申请数据进行解决,这样把数据处理工夫拉长,以加重刹时压力
这是音讯队列服务器十分典型的利用场景
异步调用
思考定外卖领取胜利的状况
领取后要发送领取胜利的告诉,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程十分耗时,尤其是高峰期,可能要期待几十秒甚至更长
这样就造成整条调用链路响应十分迟缓
而如果咱们引入RabbitMQ音讯队列,订单数据能够发送到音讯队列服务器,那么调用链路也就能够到此结束,订单零碎则能够立刻失去响应,整条链路的响应工夫只有200毫秒左右
寻找外卖小哥的利用能够以异步的形式从音讯队列接管订单音讯,再执行耗时的寻找操作
rabbitmq 基本概念
RabbitMQ是一种消息中间件,用于解决来自客户端的异步音讯。服务端将要发送的音讯放入到队列池中。接收端能够依据RabbitMQ配置的转发机制接管服务端发来的音讯。RabbitMQ根据指定的转发规定进行音讯的转发、缓冲和长久化操作,次要用在多服务器间或单服务器的子系统间进行通信,是分布式系统规范的配置。
Exchange
承受生产者发送的音讯,并依据Binding规定将音讯路由给服务器中的队列。ExchangeType决定了Exchange路由音讯的行为。在RabbitMQ中,ExchangeType罕用的有direct、Fanout和Topic三种。
Message Queue
音讯队列。咱们发送给RabbitMQ的音讯最初都会达到各种queue,并且存储在其中(如果路由找不到相应的queue则数据会失落),期待消费者来取。
Binding Key
它示意的是Exchange与Message Queue是通过binding key进行分割的,这个关系是固定。
Routing Key
生产者在将音讯发送给Exchange的时候,个别会指定一个routing key,来指定这个音讯的路由规定。这个routing key须要与Exchange Type及binding key联结应用能力生,咱们的生产者只须要通过指定routing key来决定音讯流向哪里。
rabbitmq装置
离线装置
下载离线安装包文件
- https://download.csdn.net/download/weixin_38305440/12265906
上传离线安装包
rabbitmq-install
目录上传到/root
切换到rabbitmq-install
目录
cd rabbitmq-install
装置
rpm -ivh *.rpm
Yum在线装置
以下内容来自 RabbitMQ 官网手册
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# centos7 用这个
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
# centos6 用这个
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
yum makecache
yum install socat
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
yum install rabbitmq-server
启动rabbitmq服务器
# 设置服务,开机主动启动
systemctl enable rabbitmq-server
# 启动服务
systemctl start rabbitmq-server
rabbitmq治理界面
启用治理界面
# 开启治理界面插件
rabbitmq-plugins enable rabbitmq_management
# 防火墙关上 15672 治理端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
重启RabbitMQ服务
systemctl restart rabbitmq-server
拜访
拜访服务器的15672
端口,例如:
http://192.168.64.140:15672
增加用户
# 增加用户
rabbitmqctl add_user admin admin
# 新用户设置用户为超级管理员
rabbitmqctl set_user_tags admin administrator
设置拜访权限
- 用户治理参考
https://www.cnblogs.com/AloneSword/p/4200051.html
凋谢客户端连贯端口
# 关上客户端连贯端口
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
次要端口介绍
- 4369 – erlang发现口
- 5672 – client端通信口
- 15672 – 治理界面ui端口
- 25672 – server间外部通信口
rabbitmq六种工作模式
简略模式
RabbitMQ是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。
- 发送音讯的程序是生产者
- 队列就代表一个邮箱。尽管音讯会流经RbbitMQ和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.
- 消费者期待从队列接管音讯
pom.xml
增加 slf4j 依赖, 和 rabbitmq amqp 依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tedu</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
生产者发送音讯
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws Exception {
//创立连贯工厂,并设置连贯信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//可选,5672是默认端口
f.setUsername("admin");
f.setPassword("admin");
/*
* 与rabbitmq服务器建设连贯,
* rabbitmq服务器端应用的是nio,会复用tcp连贯,
* 并开拓多个信道与客户端通信
* 以加重服务器端建设连贯的开销
*/
Connection c = f.newConnection();
//建设信道
Channel ch = c.createChannel();
/*
* 申明队列,会在rabbitmq中创立一个队列
* 如果曾经创立过该队列,就不能再应用其余参数来创立
*
* 参数含意:
* -queue: 队列名称
* -durable: 队列长久化,true示意RabbitMQ重启后队列仍存在
* -exclusive: 排他,true示意限度仅以后连贯可用
* -autoDelete: 当最初一个消费者断开后,是否删除队列
* -arguments: 其余参数
*/
ch.queueDeclare("helloworld", false,false,false,null);
/*
* 公布音讯
* 这里把音讯向默认交换机发送.
* 默认交换机隐含与所有队列绑定,routing key即为队列名称
*
* 参数含意:
* -exchange: 交换机名称,空串示意默认交换机"(AMQP default)",不能用 null
* -routingKey: 对于默认交换机,路由键就是指标队列名称
* -props: 其余参数,例如头信息
* -body: 音讯内容byte[]数组
*/
ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
System.out.println("音讯已发送");
c.close();
}
}
消费者接管音讯
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws Exception {
//连贯工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
//建设连贯
Connection c = f.newConnection();
//建设信道
Channel ch = c.createChannel();
//申明队列,如果该队列曾经创立过,则不会反复创立
ch.queueDeclare("helloworld",false,false,false,null);
System.out.println("期待接收数据");
//收到音讯后用来解决音讯的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消费者勾销时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume("helloworld", true, callback, cancel);
}
}
工作模式
工作队列(即工作队列)背地的次要思维是防止立刻执行资源密集型工作,并且必须期待它实现。相同,咱们将工作安顿在稍后实现。
咱们将工作封装为音讯并将其发送到队列。后盾运行的工作过程将获取工作并最终执行工作。当运行多个消费者时,工作将在它们之间散发。
应用工作队列的一个长处是可能轻松地并行工作。如果咱们正在积压工作工作,咱们能够增加更多工作过程,这样就能够轻松扩大。
生产者发送音讯
这里模仿耗时工作,发送的音讯中,每个点使工作过程暂停一秒钟,例如”Hello…”将破费3秒钟来解决
package m2_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义队列
//队列如果在服务器端曾经存在,属性不可变(第二个参数改为true)
c.queueDeclare("task_queue", false, false, false, null);
while (true){
System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg)) {
break;
}
c.basicPublish("", "helloworld", null, msg.getBytes());
}
}
}
消费者接管音讯
package m2_work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义队列
c.queueDeclare("helloworld", false, false, false, null);
// 解决音讯的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//遍历音讯字符串,每个点字符都暂停一秒
String msg = new String(message.getBody());
System.out.println("收到:"+msg);
for (int i = 0 ; i<msg.length();i++){
if (msg.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
System.out.println("音讯解决完结");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//生产数据
//第二个参数:true - 主动确认 false - 手动确认
c.basicConsume("task_queue", true, deliverCallback, cancelCallback);
}
}
运行测试
运行:
- 一个生产者
- 两个消费者
生产者发送多条音讯,
如: 1,2,3,4,5. 两个消费者别离收到:
- 消费者一: 1,3,5
- 消费者二: 2,4
rabbitmq在所有消费者中轮询散发音讯,把音讯平均地发送给所有消费者
音讯确认
一个消费者接管音讯后,在音讯没有齐全解决完时就挂掉了,那么这时会产生什么呢?
就当初的代码来说,rabbitmq把音讯发送给消费者后,会立刻删除音讯,那么消费者挂掉后,它没来得及解决的音讯就会失落
如果生产者发送以下音讯:
1…
2
3
4
5
两个消费者别离收到:
* 消费者一: 1…, 3, 5
* 消费者二: 2, 4
当消费者一收到所有音讯后,要话费7秒工夫来解决第一条音讯,这期间如果敞开该消费者,那么1未解决实现,3,5则没有被解决
咱们并不想失落任何音讯, 如果一个消费者挂掉,咱们想把它的工作音讯派发给其余消费者
为了确保音讯不会失落,rabbitmq反对音讯确认(回执)。当一个音讯被消费者接管到并且执行实现后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 通知他我曾经执行实现了,你能够把这条音讯删除了。
如果一个消费者没有返回音讯确认就挂掉了(信道敞开,连贯敞开或者TCP链接失落),rabbitmq就会明确,这个音讯没有被解决实现,rebbitmq就会把这条音讯从新放入队列,如果在这时有其余的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其余的消费者,这样就确保了没有音讯会失落。
这里不存在音讯超时, rabbitmq只在消费者挂掉时从新分派音讯, 即便消费者花十分久的工夫来解决音讯也能够
手动音讯确认默认是开启的,后面的例子咱们通过autoAck=ture把它敞开了。咱们当初要把它设置为false,而后工作过程解决完动向工作时,发送一个音讯确认(回执)。
package m2_work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义队列
c.queueDeclare("task_queue", false, false, false, null);
// 解决音讯的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//遍历音讯字符串,每个点字符都暂停一秒
String msg = new String(message.getBody());
System.out.println("收到:"+msg);
for (int i = 0 ; i<msg.length();i++){
if (msg.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
//发送回执(参数1:回值(long类型的编码)参数2:是否一次确认多条(boolean,个别都是false))
c.basicAck(message.getEnvelope().getDeliveryTag(),false);//封装的回值对象(envelope)的标签(deliveryTag)
System.out.println("音讯解决完结");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//生产数据
//第二个参数:true - 主动确认 false - 手动确认
c.basicConsume("task_queue", false, deliverCallback, cancelCallback);
}
}
应用以上代码,就算杀掉一个正在解决音讯的工作过程也不会失落任何音讯,工作过程挂掉之后,没有确认的音讯就会被主动从新传递。
遗记确认(ack)是一个常见的谬误, 这样结果是很重大的, 因为未确认的音讯不会被开释, rabbitmq会吃掉越来越多的内存
能够应用上面命令打印工作队列中未确认音讯的数量
rabbitmqctl list_queues name >messages_readymessages_unacknowledged
当解决音讯时异常中断, 能够抉择让音讯重回队列从新发送.
nack
操作能够是音讯重回队列, 能够应用 basicNack() 办法:// requeue为true时重回队列, 反之音讯被抛弃或被发送到死信队列 c.basicNack(tag, multiple, requeue)
正当地散发
rabbitmq会一次把多个音讯分发给消费者, 这样可能造成有的消费者十分忙碌, 而其它消费者闲暇. 而rabbitmq对此无所不知, 依然会平均的散发音讯
咱们能够应用 basicQos(1)
办法, 这通知rabbitmq一次只向消费者发送一条音讯, 在返回确认回执前, 不要向消费者发送新音讯. 而是把音讯发给下一个闲暇的消费者
音讯长久化
当rabbitmq敞开时, 咱们队列中的音讯依然会失落, 除非明确要求它不要失落数据
要求rabbitmq不失落数据要做如下两点: 把队列和音讯都设置为可长久化(durable)
队列设置为可长久化, 能够在定义队列时指定参数durable为true
//第二个参数是长久化参数durable
ch.queueDeclare("helloworld", true, false, false, null);
因为之前咱们曾经定义过队列”hello”是不可长久化的, 对已存在的队列, rabbitmq不容许对其定义不同的参数, 否则会出错, 所以这里咱们定义一个不同名字的队列”task_queue”
//定义一个新的队列,名为 task_queue
//第二个参数是长久化参数 durable
ch.queueDeclare("task_queue", true, false, false, null);
生产者和消费者代码都要批改
这样即便rabbitmq重新启动, 队列也不会失落. 当初咱们再设置队列中音讯的长久化, 应用MessageProperties.PERSISTENT_TEXT_PLAIN
参数
//第三个参数设置音讯长久化
ch.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBytes());
上面是”工作模式”最终实现的生产者和消费者代码
生产者代码
package m2_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义队列
//队列如果在服务器端曾经存在,属性不可变(第二个参数改为true)
c.queueDeclare("task_queue", true, false, false, null);
//发送音讯
//循环输出音讯发送
/*
输出音讯:ger.err....er.....er.....e....r......
消费者收到的音讯,遍历字符串,每个点字符都暂停一秒,模仿耗时音讯
*/
while (true){
System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
c.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());//第三个参数代表,长久的纯文本文件
}
}
}
消费者代码
package m2_work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义队列
c.queueDeclare("task_queue", true, false, false, null);
// 解决音讯的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//遍历音讯字符串,每个点字符都暂停一秒
String msg = new String(message.getBody());
System.out.println("收到:"+msg);
for (int i = 0 ; i<msg.length();i++){
if (msg.charAt(i)=='.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
//发送回执(参数1:回值(long类型的编码)参数2:是否一次确认多条(boolean,个别都是false))
c.basicAck(message.getEnvelope().getDeliveryTag(),false);//封装的回值对象(envelope)的标签(deliveryTag)
System.out.println("音讯解决完结");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//每次只承受解决一条音讯,解决实现之前不承受下一条
//必须在手动 ACK 模式下才无效
c.basicQos(1);
//生产数据
//第二个参数:true - 主动确认 false - 手动确认
c.basicConsume("task_queue", false, deliverCallback, cancelCallback);
}
}
公布订阅模式
在后面的例子中,咱们工作音讯只交付给一个工作过程。在这部分,咱们将做一些齐全不同的事件——咱们将向多个消费者传递同一条音讯。这种模式称为“公布/订阅”。
为了阐明该模式,咱们将构建一个简略的日志零碎。它将由两个程序组成——第一个程序将收回日志音讯,第二个程序接管它们。
在咱们的日志零碎中,接管程序的每个运行正本都将取得音讯。这样,咱们就能够运行一个消费者并将日志保留到磁盘; 同时咱们能够运行另一个消费者在屏幕上打印日志。
最终, 音讯会被播送到所有音讯接受者
Exchanges 交换机
RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何音讯间接发送到队列。实际上,通常生产者甚至不晓得音讯是否会被传递到任何队列。
相同,生产者只能向交换机(Exchange)发送音讯。交换机是一个非常简单的货色。一边接管来自生产者的音讯,另一边将音讯推送到队列。交换器必须确切地晓得如何解决它接管到的音讯。它应该被增加到一个特定的队列中吗?它应该增加到多个队列中吗?或者它应该被抛弃。这些规定由exchange的类型定义。
有几种可用的替换类型:direct、topic、header和fanout。咱们将关注最初一个——fanout。让咱们创立一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");
fanout交换机非常简单。它只是将接管到的所有音讯播送给它所晓得的所有队列。这正是咱们的日志零碎所须要的。
咱们后面应用的队列具备特定的名称(还记得hello和task_queue吗?)可能为队列命名对咱们来说至关重要——咱们须要将工作过程指向同一个队列,在生产者和消费者之间共享队列。
但日志记录案例不是这种状况。咱们想要接管所有的日志音讯,而不仅仅是其中的一部分。咱们还只对以后的最新消息感兴趣,而不是旧音讯。
要解决这个问题,咱们须要两件事。首先,每当咱们连贯到Rabbitmq时,咱们须要一个新的空队列。为此,咱们能够创立一个具备随机名称的队列,或者,更好的办法是让服务器为咱们抉择一个随机队列名称。其次,一旦断开与使用者的连贯,队列就会主动删除。在Java客户端中,当咱们不向queueDeclare()提供任何参数时,会创立一个具备生成名称的、非长久的、独占的、主动删除队列
//主动生成队列名
//非长久,独占,主动删除
String queueName = ch.queueDeclare().getQueue();
绑定 Bindings
咱们曾经创立了一个fanout交换机和一个队列。当初咱们须要通知exchange向指定队列发送音讯。exchange和队列之间的关系称为绑定。
//指定的队列,与指定的交换机关联起来
//成为绑定 -- binding
//第三个参数时 routingKey, 因为是fanout交换机, 这里疏忽 routingKey
ch.queueBind(queueName, "logs", "");
当初, logs交换机将会向咱们指定的队列增加音讯
列出绑定关系:
rabbitmqctl list_bindings
实现的代码
生产者
生产者收回日志音讯,看起来与前一教程没有太大不同。最重要的更改是,咱们当初心愿将音讯公布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,然而对于fanout交换机类型,该值会被疏忽。
package m3_pub_sub;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义交换机,服务器如果曾经存在这个交换机,不会反复创立(名称,类型)
//c.exchangeDeclare("logs", "fanout");//两种写法都能够
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
//向交换机发送音讯
while (true){
System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
//参数1:交换机名称 参数2:抉择队列,对fanout交换机有效
c.basicPublish("logs", "", null, msg.getBytes());
}
}
}
消费者
如果还没有队列绑定到交换器,音讯就会失落,但这对咱们来说没有问题;如果还没有消费者在听,咱们能够平安地抛弃这些信息。
ReceiveLogs.java代码:
package m3_pub_sub;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//1.定义随机队列 2.定义交换机 3.绑定
String queue = UUID.randomUUID().toString();
c.queueDeclare(queue, false, true, true, null);//非长久,独占,主动删除
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
//交换机与队列进行绑定 对于fanout交换机来说,参数三有效
c.queueBind(queue, "logs", "");
//失常从队列接管音讯
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg= new String(message.getBody());
System.out.println("收到: " + msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
路由模式
在上一大节,咱们构建了一个简略的日志零碎。咱们可能向多个接收者播送日志音讯。
在这一节,咱们将向其增加一个个性—咱们将只订阅所有音讯中的一部分。例如,咱们只接管要害谬误音讯并保留到日志文件(以节俭磁盘空间),同时依然可能在管制台上打印所有日志音讯。
绑定 Bindings
在上一节,咱们曾经创立了队列与交换机的绑定。应用上面这样的代码:
ch.queueBind(queueName, "logs", "");
绑定是交换机和队列之间的关系。这能够简略地了解为:队列对来自此替换的音讯感兴趣。
绑定能够应用额定的routingKey参数。为了防止与basic_publish参数混同,咱们将其称为bindingKey。这是咱们如何创立一个键绑定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含意取决于交换机类型。咱们后面应用的fanout交换机齐全疏忽它。
直连交换机 Direct exchange
上一节中的日志零碎向所有消费者播送所有音讯。咱们心愿扩大它,容许依据音讯的严重性过滤音讯。例如,咱们心愿将日志音讯写入磁盘的程序只接管要害error,而不是在warning或info日志音讯上节约磁盘空间。
后面咱们应用的是fanout交换机,这并没有给咱们太多的灵活性——它只能进行简略的播送。
咱们将用直连交换机(Direct exchange)代替。它背地的路由算法很简略——消息传递到bindingKey与routingKey齐全匹配的队列。为了阐明这一点,请思考以下设置
其中咱们能够看到直连交换机X
,它绑定了两个队列。第一个队列用绑定键orange
绑定,第二个队列有两个绑定,一个绑定black
,另一个绑定键green
。
这样设置,应用路由键orange
公布到交换器的音讯将被路由到队列Q1。带有black
或green
路由键的音讯将转到Q2
。而所有其余音讯都将被抛弃。
多重绑定 Multiple bindings
应用雷同的bindingKey绑定多个队列是齐全容许的。如图所示,能够应用binding key black将X与Q1和Q2绑定。在这种状况下,直连交换机的行为相似于fanout,并将音讯播送给所有匹配的队列。一条路由键为black的音讯将同时发送到Q1和Q2。
发送日志
咱们将在日志零碎中应用这个模型。咱们把音讯发送到一个Direct交换机,而不是fanout。咱们将提供日志级别作为routingKey。这样,接管程序将可能抉择它心愿接管的级别。让咱们首先来看收回日志。
和后面一样,咱们首先须要创立一个exchange:
//参数1: 交换机名
//参数2: 交换机类型
ch.exchangeDeclare("direct_logs", "direct");
接着来看发送音讯的代码
//参数1: 交换机名
//参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning"
//参数3: 其余配置属性
//参数4: 公布的音讯数据
ch.basicPublish("direct_logs", "error", null, message.getBytes());
订阅
接管音讯的工作原理与后面章节一样,但有一个例外——咱们将为感兴趣的每个日志级别创立一个新的绑定, 示例代码如下:
ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");
残缺的代码
生产者
package m4_routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义交换机
//参数1: 交换机名
//参数2: 交换机类型
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
//发送音讯,须要携带路由键
while (true){
System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输出路由键:");
String key = new Scanner(System.in).nextLine();
//第二个参数是音讯上携带的路由键
//参数1: 交换机名
//参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning"
//参数3: 其余配置属性
//参数4: 公布的音讯数据
c.basicPublish("direct_logs",key,null, msg.getBytes());
}
}
}
消费者
package m4_routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//1.定义随机队列 2.定义交换机 3.绑定,指定绑定的关键词
//由 rabbitmq服务器主动命名,默认参数:false,true,true
String queue = c.queueDeclare().getQueue();
c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
System.out.print("输出绑定键:用空格隔开:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");//正则表达式 \s+ 示意一个或多个空格
for (String key : a){
c.queueBind(queue, "direct_logs", key);
}
//失常的从队列生产数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//取出音讯数据,和音讯上携带的路由键
//定义名字为 direct_logs 的交换机, 它的类型是 "direct"
String msg = new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();
System.out.println("收到:"+key+"-"+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
主题模式
在上一大节,咱们改良了日志零碎。咱们没有应用只能进行播送的fanout交换机,而是应用Direct交换机,从而能够选择性接管日志。
尽管应用Direct交换机改良了咱们的零碎,但它依然有局限性——它不能基于多个规范进行路由。
在咱们的日志零碎中,咱们可能不仅心愿依据级别订阅日志,还心愿依据收回日志的源订阅日志。
这将给咱们带来很大的灵活性——咱们可能只想接管来自“cron”的要害谬误,但也要接管来自“kern”的所有日志。
要在日志零碎中实现这一点,咱们须要理解更简单的Topic交换机。
主题交换机 Topic exchange
发送到Topic交换机的音讯,它的的routingKey,必须是由点分隔的多个单词。单词能够是任何货色,但通常是与音讯相干的一些个性。几个无效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey能够有任意多的单词,最多255个字节。
bindingKey也必须采纳雷同的模式。Topic交换机的逻辑与直连交换机相似——应用特定routingKey发送的音讯将被传递到所有应用匹配bindingKey绑定的队列。bindingKey有两个重要的非凡点:
*
能够通配单个单词。#
能够通配零个或多个单词。
用一个例子来解释这个问题是最简略的
在本例中,咱们将发送形容动物的音讯。这些音讯将应用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词示意速度,第二个是色彩,第三个是物种:“<速度>.<色彩>.<物种>
”。
咱们创立三个绑定:Q1与bindingKey “*.orange.*
” 绑定。和Q2是 “*.*.rabbit
” 和 “lazy.#
” 。
这些绑定可概括为:
- Q1对所有橙色的动物感兴趣。
- Q2想接管对于兔子和慢速动物的所有音讯。
将routingKey设置为”quick.orange.rabbit
“的音讯将被发送到两个队列。音讯 “lazy.orange.elephant
“也发送到它们两个。另外”quick.orange.fox
“只会发到第一个队列,”lazy.brown.fox
“只发给第二个。”lazy.pink.rabbit
“将只被传递到第二个队列一次,即便它匹配两个绑定。”quick.brown.fox
“不匹配任何绑定,因而将被抛弃。
如果咱们违反约定,发送一个或四个单词的信息,比方”orange
“或”quick.orange.male.rabbit
“,会产生什么?这些音讯将不匹配任何绑定,并将失落。
另外,”lazy.orange.male.rabbit
“,即便它有四个单词,也将匹配最初一个绑定,并将被传递到第二个队列。
实现的代码
生产者
package m5_topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义交换机
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//发送音讯,须要携带路由键
while (true){
System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输出路由键:");
String key = new Scanner(System.in).nextLine();
//第二个参数是音讯上携带的路由键
c.basicPublish("topic_logs",key,null, msg.getBytes());
}
}
}
package m5_topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//定义交换机
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//发送音讯,须要携带路由键
while (true){
System.out.print("输出音讯:");
String msg = new Scanner(System.in).nextLine();
System.out.print("输出路由键:");
String key = new Scanner(System.in).nextLine();
//第二个参数是音讯上携带的路由键
c.basicPublish("topic_logs",key,null, msg.getBytes());
}
}
}
消费者
package m5_topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连贯
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
//f.setPort(5672);//默认端口能够省略
f.setUsername("admin");
f.setPassword("admin");
Channel c = f.newConnection().createChannel();
//1.定义随机队列 2.定义交换机 3.绑定,指定绑定的关键词
//由 rabbitmq服务器主动命名,默认参数:false,true,true
String queue = c.queueDeclare().getQueue();
c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
System.out.print("输出绑定键:用空格隔开:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");//正则表达式 \s+ 示意一个或多个空格
for (String key : a){
c.queueBind(queue, "topic_logs", key);
}
//失常的从队列生产数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//取出音讯数据,和音讯上携带的路由键
String msg = new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();
System.out.println("收到:"+key+"-"+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
c.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
RPC模式
如果咱们须要在近程电脑上运行一个办法,并且还要期待一个返回后果该怎么办?这和后面的例子不太一样, 这种模式咱们通常称为近程过程调用,即RPC.
在本节中,咱们将会学习应用RabbitMQ去搭建一个RPC零碎:一个客户端和一个能够降级(扩大)的RPC服务器。为了模仿一个耗时工作,咱们将创立一个返回斐波那契数列的虚构的RPC服务。
客户端
在客户端定义一个RPCClient类,并定义一个call()办法,这个办法发送一个RPC申请,并期待接管响应后果
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四个斐波那契数是: " + result);
回调队列 Callback Queue
应用RabbitMQ去实现RPC很容易。一个客户端发送申请信息,并失去一个服务器端回复的响应信息。为了失去响应信息,咱们须要在申请的时候发送一个“回调”队列地址。咱们能够应用默认队列。上面是示例代码:
//定义回调队列,
//主动生成对列名,非长久,独占,主动删除
callbackQueueName = ch.queueDeclare().getQueue();
//用来设置回调队列的参数对象
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
//发送调用音讯
ch.basicPublish("", "rpc_queue", props, message.getBytes());
音讯属性 Message Properties
AMQP 0-9-1协定定义了音讯的14个属性。大部分属性很少应用,上面是比拟罕用的4个:deliveryMode
:将音讯标记为长久化(值为2)或非长久化(任何其余值)。contentType
:用于形容mime类型。例如,对于常常应用的JSON格局,将此属性设置为:application/json
。replyTo
:通常用于指定回调队列。correlationId
:将RPC响应与申请关联起来十分有用。
关联id (correlationId):
在下面的代码中,咱们会为每个RPC申请创立一个回调队列。 这是十分低效的,这里还有一个更好的办法:让咱们为每个客户端创立一个回调队列。
这就提出了一个新的问题,在队列中失去一个响应时,咱们不分明这个响应所对应的是哪一条申请。这时候就须要应用关联id(correlationId)。咱们将为每一条申请设置惟一的的id值。稍后,当咱们在回调队列里收到一条音讯的时候,咱们将查看它的id属性,这样咱们就能够匹配对应的申请和响应。如果咱们发现了一个未知的id值,咱们能够平安的抛弃这条音讯,因为它不属于咱们的申请。
小结
RPC的工作形式是这样的:
- 对于RPC申请,客户端发送一条带有两个属性的音讯:replyTo,设置为仅为申请创立的匿名独占队列,和correlationId,设置为每个申请的惟一id值。
- 申请被发送到rpc_queue队列。
- RPC工作过程(即:服务器)在队列上期待申请。当一个申请呈现时,它执行工作,并应用replyTo字段中的队列将后果发回客户机。
- 客户机在回应音讯队列上期待数据。当音讯呈现时,它查看correlationId属性。如果匹配申请中的值,则向程序返回该响应数据。
实现的代码
服务器端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
/*
* 定义队列 rpc_queue, 将从它接管申请信息
*
* 参数:
* 1. queue, 对列名
* 2. durable, 长久化
* 3. exclusive, 排他
* 4. autoDelete, 主动删除
* 5. arguments, 其余参数属性
*/
ch.queueDeclare("rpc_queue",false,false,false,null);
ch.queuePurge("rpc_queue");//革除队列中的内容
ch.basicQos(1);//一次只接管一条音讯
//收到申请音讯后的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//解决收到的数据(要求第几个斐波那契数)
String msg = new String(message.getBody(), "UTF-8");
int n = Integer.parseInt(msg);
//求出第n个斐波那契数
int r = fbnq(n);
String response = String.valueOf(r);
//设置发回响应的id, 与申请id统一, 这样客户端能够把该响应与它的申请进行对应
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();
/*
* 发送响应音讯
* 1. 默认交换机
* 2. 由客户端指定的,用来传递响应音讯的队列名
* 3. 参数(关联id)
* 4. 发回的响应音讯
*/
ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
//发送确认音讯
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//消费者开始接管音讯, 期待从 rpc_queue接管申请音讯, 不主动确认
ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
}
protected static int fbnq(int n) {
if(n == 1 || n == 2) return 1;
return fbnq(n-1)+fbnq(n-2);
}
}
客户端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient {
Connection con;
Channel ch;
public RPCClient() throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
con = f.newConnection();
ch = con.createChannel();
}
public String call(String msg) throws Exception {
//主动生成对列名,非长久,独占,主动删除
String replyQueueName = ch.queueDeclare().getQueue();
//生成关联id
String corrId = UUID.randomUUID().toString();
//设置两个参数:
//1. 申请和响应的关联id
//2. 传递响应数据的queue
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
//向 rpc_queue 队列发送申请数据, 申请第n个斐波那契数
ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
//用来保留后果的阻塞汇合,取数据时,没有数据会暂停期待
BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
//接管响应数据的回调对象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//如果响应音讯的关联id,与申请的关联id雷同,咱们来解决这个响应数据
if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
//把收到的响应数据,放入阻塞汇合
response.offer(new String(message.getBody(), "UTF-8"));
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//开始从队列接管响应数据
ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
//返回保留在汇合中的响应数据
return response.take();
}
public static void main(String[] args) throws Exception {
RPCClient client = new RPCClient();
while (true) {
System.out.print("求第几个斐波那契数:");
int n = new Scanner(System.in).nextInt();
String r = client.call(""+n);
System.out.println(r);
}
}
}
virtual host
在RabbitMQ中叫做虚构音讯服务器VirtualHost,每个VirtualHost相当于一个绝对独立的RabbitMQ服务器,每个VirtualHost之间是互相隔离的。exchange、queue、message不能互通
创立virtual host: /pd
- 进入虚拟机治理界面
- 增加新的虚拟机’/pd’,名称必须以”/”结尾
- 查看增加的后果
设置虚拟机的用户拜访权限
点击 /pd 虚拟主机, 设置用户 admin 对它的拜访权限
拼多商城整合 rabbitmq
当用户下订单时,咱们的业务零碎间接与数据库通信,把订单保留到数据库中
当零碎流量忽然激增,大量的订单压力,会拖慢业务零碎和数据库系统
咱们须要应答流量峰值,让流量曲线变得平缓,如下图
订单存储的解耦
为了进行流量削峰,咱们引入 rabbitmq 音讯队列,当购物零碎产生订单后,能够把订单数据发送到音讯队列;而订单消费者利用从音讯队列接管订单音讯,并把订单保留到数据库
这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者能够从容地从音讯队列缓缓接管订单,向数据库保留
生产者-发送订单
pom.xml 增加依赖
spring提供了更不便的音讯队列拜访接口,它对RabbitMQ的客户端API进行了封装,应用起来更加不便
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
增加RabbitMQ的连贯信息
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
virtualHost: /pd
username: admin
password: admin
批改主程序 RunPdAPP
在主程序中增加上面的办法创立Queue实例
当创立RabbitMQ连贯和信道后,Spring的RabbitMQ工具会主动在服务器中创立队列,代码在RabbitAdmin.declareQueues()
办法中
@Bean
public Queue getQueue() {
Queue q = new Queue("orderQueue", true);
return q;
}
批改 OrderServiceImpl
//RabbitAutoConfiguration中创立了AmpqTemplate实例
@Autowired
AmqpTemplate amqpTemplate;
//saveOrder原来的数据库拜访代码全副正文,增加rabbitmq音讯发送代码
public String saveOrder(PdOrder pdOrder) throws Exception {
String orderId = generateId();
pdOrder.setOrderId(orderId);
amqpTemplate.convertAndSend("orderQueue", pdOrder);
return orderId;
// String orderId = generateId();
// pdOrder.setOrderId(orderId);
//
//
// PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
// pdOrder.setShippingName(pdShipping.getReceiverName());
// pdOrder.setShippingCode(pdShipping.getReceiverAddress());
// pdOrder.setStatus(1);//
// pdOrder.setPaymentType(1);
// pdOrder.setPostFee(10D);
// pdOrder.setCreateTime(new Date());
//
// double payment = 0;
// List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
// for (ItemVO itemVO : itemVOs) {
// PdOrderItem pdOrderItem = new PdOrderItem();
// String id = generateId();
// //String id="2";
// pdOrderItem.setId(id);
// pdOrderItem.setOrderId(orderId);
// pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
// pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
// pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
// pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
//
// payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
// pdOrderItemMapper.insert(pdOrderItem);
// }
// pdOrder.setPayment(payment);
// pdOrderMapper.insert(pdOrder);
// return orderId;
}
消费者-接管订单,并保留到数据库
pd-web我的项目复制为pd-order-consumer
批改 application.yml
把端口批改成 81
server:
port: 81
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8
username: root
password: root
rabbitmq:
host: 192.168.64.140
port: 5672
virtualHost: /pd
username: admin
password: admin
mybatis:
#typeAliasesPackage: cn.tedu.ssm.pojo
mapperLocations: classpath:com.pd.mapper/*.xml
logging:
level:
cn.tedu.ssm.mapper: debug
删除无关代码
pd-order-consumer我的项目只须要从 RabbitMQ 接管订单数据, 再保留到数据库即可, 所以我的项目中只须要保留这部分代码
- 删除 com.pd.controller 包
- 删除 com.pd.payment.utils 包
- 删除无关的 Service,只保留 OrderService 和 OrderServiceImpl
新建 OrderConsumer
package com.pd;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
@Component
public class OrderConsumer {
//收到订单数据后,会调用订单的业务代码,把订单保留到数据库
@Autowired
private OrderService orderService;
//增加该注解后,会从指定的orderQueue接管音讯,
//并把数据转为 PdOrder 实例传递到此办法
@RabbitListener(queues="orderQueue")
public void save(PdOrder pdOrder)
{
System.out.println("消费者");
System.out.println(pdOrder.toString());
try {
orderService.saveOrder(pdOrder);
} catch (Exception e) {
e.printStackTrace();
}
}
}
批改 OrderServiceImpl 的 saveOrder() 办法
public String saveOrder(PdOrder pdOrder) throws Exception {
// String orderId = generateId();
// pdOrder.setOrderId(orderId);
//
// amqpTemplate.convertAndSend("orderQueue", pdOrder);
// return orderId;
//
//
//
// String orderId = generateId();
// pdOrder.setOrderId(orderId);
//从RabbitMQ接管的订单数据,
//曾经在上游订单业务中生成过id,这里不再从新生成id
//间接获取该订单的id
String orderId = pdOrder.getOrderId();
PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
pdOrder.setShippingName(pdShipping.getReceiverName());
pdOrder.setShippingCode(pdShipping.getReceiverAddress());
pdOrder.setStatus(1);//
pdOrder.setPaymentType(1);
pdOrder.setPostFee(10D);
pdOrder.setCreateTime(new Date());
double payment = 0;
List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
for (ItemVO itemVO : itemVOs) {
PdOrderItem pdOrderItem = new PdOrderItem();
String id = generateId();
//String id="2";
pdOrderItem.setId(id);
pdOrderItem.setOrderId(orderId);
pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
pdOrderItemMapper.insert(pdOrderItem);
}
pdOrder.setPayment(payment);
pdOrderMapper.insert(pdOrder);
return orderId;
}
手动确认
application.yml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
OrderConsumer
package com.pd;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import com.rabbitmq.client.Channel;
@Component
public class OrderConsumer {
//收到订单数据后,会调用订单的业务代码,把订单保留到数据库
@Autowired
private OrderService orderService;
//增加该注解后,会从指定的orderQueue接管音讯,
//并把数据转为 PdOrder 实例传递到此办法
@RabbitListener(queues="orderQueue")
public void save(PdOrder pdOrder, Channel channel, Message message)
{
System.out.println("消费者");
System.out.println(pdOrder.toString());
try {
orderService.saveOrder(pdOrder);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
}
发表回复