共计 2905 个字符,预计需要花费 8 分钟才能阅读完成。
rabbitmq 六种工作模式
1. 简略模式
RabbitMQ 是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq 就是一个邮箱、一个邮局和一个邮递员。
- 发送音讯的程序是生产者
- 队列就代表一个邮箱。尽管音讯会流经 RbbitMQ 和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.
- 消费者期待从队列接管音讯
1.pom.xml
增加 slf4j 依赖, 和 rabbitmq 依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tedu</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2. 生产者发送音讯
public class Test1 {public static void main(String[] args) throws Exception {
// 创立连贯工厂, 并设置连贯信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);// 可选,5672 是默认端口
f.setUsername("admin");
f.setPassword("admin");
/*
* 与 rabbitmq 服务器建设连贯,
* rabbitmq 服务器端应用的是 nio, 会复用 tcp 连贯,
* 并开拓多个信道与客户端通信
* 以加重服务器端建设连贯的开销
*/
Connection c = f.newConnection();
// 建设信道
Channel ch = c.createChannel();
/*
* 申明队列, 会在 rabbitmq 中创立一个队列
* 如果曾经创立过该队列,就不能再应用其余参数来创立
*
* 参数含意:
* -queue: 队列名称
* -durable: 队列长久化,true 示意 RabbitMQ 重启后队列仍存在
* -exclusive: 排他,true 示意限度仅以后连贯可用
* -autoDelete: 当最初一个消费者断开后, 是否删除队列
* -arguments: 其余参数
*/
ch.queueDeclare("helloworld", false,false,false,null);
/*
* 公布音讯
* 这里把音讯向默认交换机发送.
* 默认交换机隐含与所有队列绑定,routing key 即为队列名称
*
* 参数含意:
* -exchange: 交换机名称, 空串示意默认交换机 "(AMQP default)", 不能用 null
* -routingKey: 对于默认交换机, 路由键就是指标队列名称
* -props: 其余参数, 例如头信息
* -body: 音讯内容 byte[] 数组
*/
ch.basicPublish("","helloworld", null,"Hello world!".getBytes());
System.out.println("音讯已发送");
c.close();}
}
3. 消费者接管队列
public class Test2 {public static void main(String[] args) throws Exception {
// 连贯工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
// 建设连贯
Connection c = f.newConnection();
// 建设信道
Channel ch = c.createChannel();
// 申明队列, 如果该队列曾经创立过, 则不会反复创立
ch.queueDeclare("helloworld",false,false,false,null);
System.out.println("期待接收数据");
// 收到音讯后用来解决音讯的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到:"+msg);
}
};
// 消费者勾销时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {}};
ch.basicConsume("helloworld", true, callback, cancel);
}
}
正文完