共计 21177 个字符,预计需要花费 53 分钟才能阅读完成。
1、RabbitMQ
-
音讯队列解决了什么问题
- 异步解决
- 利用解耦
- 流量削锋
- 日志解决
运行 rabbitmq 镜像
# docker run --name rabbitmq -tid -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq
批改 rabbitmq 设置
# docker exec -it rabbitmq /bin/bash
新增用户
# rabbitmqctl add_user [user_name] [pwd]
查看用户
# rabbitmqctl list_users
Setting permissions for user “[user_name]” in vhost “/” …
# rabbitmqctl set_permissions -p "/" [user_name] ".*" ".*" ".*"
# rabbitmqctl list_permissions -p /
将 [user_name] 用户设置为 administrator 角色
# rabbitmqctl set_user_tags asdf administrator
删除 guest 用户
# rabbitmqctl delete_user guest
开启 web 界面
# rabbitmq-plugins enable rabbitmq_management
web 拜访
# http://IP:15672
-
Java 操作 RabbitMQ
- simple 简略队列
- work queues 工作队列 偏心散发 轮询散发
- publish/subscribe 公布订阅
- routing 路由抉择 通配符模式
- Topics 主题
- 手动和主动确认音讯
- 队列的长久化和非长久化
- rabbitMQ 的提早队列
依赖
<dependencies> <!-- 引入队列依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies>
2、简略队列
2.2、定义连贯 MQ 的工具
public class connectionUtil {public static Connection getConnection() throws IOException, TimeoutException {
// 定义链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址[运行 rabbitMQ 的地址]
factory.setHost("192.168.168.130");
//AMQ 的端口号
factory.setPort(5672);
//vHost
factory.setVirtualHost("lgz");
factory.setUsername("lgz");
factory.setPassword("pwd123456");
return factory.newConnection();}
}
2.3、生产者发送音讯
// 测试发送信息
public class ProducerSend {
private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {System.out.println("cs");
// 获取链接
Connection connection = connectionUtil.getConnection();
// 从链接中获取通道
Channel channel = connection.createChannel();
// 队列申明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello simple";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("test success");
channel.close();
connection.close();}
}
2.4、消费者获取信息
public class comsumerGain {
private static final String QUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取链接
Connection connection = connectionUtil.getConnection();
// 创立通道
Channel channel = connection.createChannel();
// 定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取达到的音讯
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println(msg);
}
};
// 监听队列
channel.basicConsume(QUE_NAME,true,consumer);
}
}
2.5、简略队列的毛病
- 耦合性高:生产者一 一对应消费者
3、工作队列
3.1、工作队列模型
3.2、生产者演示
public class WorkQueue {
public static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创立连贯
Connection connection = connectionUtil.getConnection();
// 获取 channel 链接
Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 0; i <50 ; i++) {
String msg="No"+i;
System.out.println(msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();}
}
3.3、消费者 1
public class WorkRecv {
private final static String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取链接
Connection connection = connectionUtil.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 音讯触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("receive"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("完结");
}
}
};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
3.4、消费者 2
public class WorkRecv {
private final static String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取链接
Connection connection = connectionUtil.getConnection();
// 获取 channel
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 音讯触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("receive"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("完结");
}
}
};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
3.5、总结
- 采纳了轮询算法轮询机制
4、偏心散发
4.1、生产者
public class WorkQueue {
public static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创立连贯
Connection connection = connectionUtil.getConnection();
// 获取 channel 链接
Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*
* 每个消费者发送确认音讯之前,音讯队列不发送下一个音讯到消费者。【一次只解决一个音讯】* */
int prefetchCount=1;
channel.basicQos(prefetchCount);
for (int i = 0; i <50 ; i++) {
String msg="No"+i;
System.out.println(msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();}
}
4.2、消费者 1
public class WorkRecv {
private final static String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取链接
Connection connection = connectionUtil.getConnection();
// 获取 channel
final Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1); // 一次只散发一个
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 音讯触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("receive"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("完结");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
4.2、消费者 2
public class WorkReceive {
private final static String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取链接
Connection connection = connectionUtil.getConnection();
// 获取 channel
final Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 保障每次只散发一个
channel.basicQos(1);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 音讯触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("receive"+msg);
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("【start】:");
// 手动回执一个音讯
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}
}
4.3、疑难问题
- basicQos(1); 用于限度 rabbitMQ 一次只散发一个音讯
- 应用偏心散发必须敞开自动应答 ack,改成手动的
5、音讯应答与音讯长久化
5.1、音讯应答
- boolean autoack=false(手动确认模式),如果有消费者宕机,则会将信息交付给其余的消费者。【rabbitmq 反对音讯应答,消费者发送一个音讯确认,则 rabbitmq 会进行删除内存数据】
- Boolean autoAck=true(主动确认模式),一旦 rabbitmq 将音讯分发给消费者就会从内存中删除
如果这种状况下,杀死正在执行的消费者,就会造成正在解决的信息失落。
- 默认状况下是 autoAck 是 false
- 如果 rabbitMQ 宕机,则服务器数据依然失落
5.2、音讯长久化
// 申明队列
channel.queueDeclare(QUEUE_NAME,[durable]false,false,false,null);
//durable:长久化,
-
将程序中的 durable 的 false 改称为 true,也是不能够的。因为定义的 QUEUE_NAME 代表这个 queue 是未长久化的,rabbitmq 不准从新定义一个已存在的队列
- 在控制台【拜访 mq 的界面】进行删除这个队列
6、订阅模式【fanout】
- 一个生产者,多个消费者
- 每个消费者都有本人的队列
- 生产者将音讯发送到交换机【转发器】,而不是间接发送到队列中
- 每个队列都绑定在交换机
- 生产者发送的音讯,通过交换机达到队列【实现一个音讯被多个消费者生产】
- 交换机中无奈数据【只有队列具备存储能力】,如果想要进行存储
6.1、生产者
public class Send {
public static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 散发
String msg="hello_exchange";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();}
}
6.2、消费者
public class Receive1 {
public static final String QUEUE_NAME="test_queue_exchange";
public static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicQos(1);// 保障每次只能散发一个
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("Receive1"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("[ok]");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
}
}
6.3、消费者 2
public class Receive2{
public static final String QUEUE_NAME="email_queue_exchange";
public static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicQos(1);// 保障每次只能散发一个
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("Receive2"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("[ok2]");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
}
}
7、Routing【direct】
一方面接管生产者的音讯,另一方面向队列发送音讯
Fanout(不解决路由键)
Direct(解决路由键)
7.1、路由模式
7.2、生产者
public class Send {
public static final String EXCHANGE_NAME="test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg=new String("Hello direct");
String routingKey="warning";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("send"+msg);
channel.close();
connection.close();}
}
7.3、消费者 1
public class Receive1 {
public static final String EXCHANGE_NAME="test_exchange_direct";
public static final String QUEUE_NAME="queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("[done]"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;// 自动应答 false
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
7.3、消费者 2
public class Receive2 {
public static final String EXCHANGE_NAME="test_exchange_direct";
public static final String QUEUE_NAME="queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("[done]"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;// 自动应答 false
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
8、Topics Exchange
相似于 sql 中的含糊查问,
将路由键 routing key 和某个模式盘匹配
-
匹配一个或者多个
- * 匹配一个
8.1、生产者
public class Send {
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String msg="商品";
channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,msg.getBytes());
System.out.println("---send"+msg);
channel.close();
connection.close();}
}
8.2、消费者 1
public class Receive1 {
public static final String QUEUE_NAME="test_queue_topic_1";
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
channel.basicQos(1);// 保障每次只能散发一个
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("Receive1"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("[ok]");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
}
}
8.2、消费者 2
public class Receive2 {
public static final String QUEUE_NAME="test_queue_topic_1=2";
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");
channel.basicQos(1);// 保障每次只能散发一个
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
String msg=new String(body,"utf-8");
System.out.println("Receive1"+msg);
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}finally {System.out.println("[ok]");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);
}
}
9、音讯确认机制
在 MQ 中能够通过长久化数据解决 rabbitmq 服务器异样的数据失落问题
-
生产者将音讯发送进来当前,如何晓得音讯到底有没有达到 rabbitmq 服务器?
- 形式一:AMQP 实现事务机制
- 形式二:Confirm 模式
9.1、事务机制
txSelect:用户将以后 channel 设置成 transaction 模式
txCommit:用于提交事务
txRollback:回滚事务
生产者
public class TxSend {
private static final String QUEUE_NAME="tx_queue";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String msg="hello autoCommit";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
try {channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.txCommit();
int i=1/0;
System.out.println(msg);
} catch (IOException e) {channel.txRollback();
System.out.println("error");
e.printStackTrace();}
channel.close();
connection.close();}
}
消费者
public class TxReceive1 {
private static final String QUEUE_NAME="tx_queue";
public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("receive"+new String(body,"utf-8"));
}
});
}
}
9.2、生产者 Confirm 模式
9.2.1、发送单条
public class Send1 {
private static final String QUEUE_NAME="tx_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 设置为 confirm 模式
channel.confirmSelect();
String msg="confirm_text";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if (!channel.waitForConfirms()){System.out.println("send message failed");
}else {System.out.println("success");
}
channel.close();
connection.close();}
}
9.2.2、批量发送
public class SendMore {
private static final String QUEUE_NAME="confirm_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 设置为 confirm 模式
channel.confirmSelect();
String msg="confirm_text";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
// 次要进行遍历发送,串行的【发送完之后在进行确认】for (int i = 0; i <10 ; i++) {channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
if (!channel.waitForConfirms()){System.out.println("send message failed");
}else {System.out.println("success");
}
channel.close();
connection.close();}
}
9.3、异步模式
10、参数详解
10.1、queueDeclare
// 绑定 channel 与音讯队列
// 参数一: 队列名称【如果不存在该队列,则主动创立】// 参数二:durable【是否要进行长久化】// 参数三:exclusive【是否独占队列】// 参数四: 是否在生产实现之后是否要立刻删除队列【true:主动删除】【false:不主动删除】// 参数五:额定参数
channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args);
channel.queueDeclare("helloWorld",false,false,false,null);
10.2、basicPublish
// 公布音讯
/*
* 参数 1:【exchange】交换机名称
* 参数 2:【routingKey】队列名称
* 参数 3:【props】传递音讯的额定设置
* 参数 4:传递音讯的具体内容【byte 类型】*
* */
channel.basicPublish("","helloWorld",null,"hello rabbitMQ".getBytes());
10.3、basicConsume
// 生产信息
/*
* 参数一: 队列名称
* 参数二:开启音讯的主动确认机制
* 参数三:生产时的回调接口
* */
channel.basicConsume("helloWorld",true,consumer);
11、整合 Boot
11.1、依赖导入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
11.2、yml
spring:
application:
name: rabbit-springboot
rabbitmq:
host: 192.168.168.130
port: 5672
virtual-host: /
username: lgz
password: pwd123456
11.3boottest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {rabbitTemplate.convertAndSend("hello","helloWorld");
System.out.println(1);
}
11.4、
@SpringBootTest
class BootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {rabbitTemplate.convertAndSend("hello","helloWorld");
System.out.println(1);
}
@Test
public void testWork(){for (int i = 0; i <5 ; i++) {rabbitTemplate.convertAndSend("work","work 模型");
}
}
@Test
public void testFanout(){rabbitTemplate.convertAndSend("logs","","Fanout model");
}
@Test
public void testRouting(){rabbitTemplate.convertAndSend("directs","info","info_key_routing_information");
}
@Test
public void testTopic(){rabbitTemplate.convertAndSend("topics","user.save","user.save exchange");
}
}
11.5、测试 topic[消费者]
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topics"),
key = {"product.save","product.*"}
)
})
public void receive(String msg){System.out.println("consumer1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topics"),
key = {"user.save","user.*"}
)
})
public void receive2(String msg){System.out.println("consumer2"+msg);
}
11.5、测试 routing
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs",type = "direct"),
key = {"info","error"}
)
})
public void receive1(String msg){System.out.println(msg);
}
11.6、一般的 work
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String message){System.out.println("message:["+message+"]");
}
11.7、Fanout 模式
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创立长期队列
exchange =@Exchange(value = "logs", type = "fanout") // 绑定的交换机
)
})
public void receive1(String msg){System.out.println("["+msg+"]");
}
11.8、Simple
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Hello {
@RabbitHandler
public void receive1(String message){System.out.println("message"+message);
}
}
正文完