消费者 1:
Consumer_PubSub1.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe consumer #1: subscribes to the queue
 * {@code test_fanout_queue1} and prints every received message body to the
 * console.
 *
 * <p>Date: 2021/6/11 22:58
 *
 * @author Mr.Throne
 * @version v1.0
 */
public class Consumer_PubSub1 {

    /**
     * Connects to the broker, declares the queue this consumer reads from and
     * registers an auto-acknowledging consumer on it. The connection is left
     * open on purpose so that deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException      if opening the connection/channel, declaring the queue
     *                          or registering the consumer fails
     * @throws TimeoutException if establishing the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Presumably bound to a fanout exchange by the producer — not visible here.
        final String queueName = "test_fanout_queue1";

        // 1. Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();

        // 2. Configure broker address, virtual host and credentials.
        // NOTE(review): host and credentials are hard-coded; consider externalising them.
        factory.setHost("121.196.161.240");
        factory.setPort(5672);
        factory.setVirtualHost("/admin");
        factory.setUsername("admin");
        factory.setPassword("admin");

        // 3. Open the connection.
        Connection connection = factory.newConnection();

        // 4. Open a channel on that connection.
        Channel channel = connection.createChannel();

        // 5. Declare the queue that is actually consumed below. The previous code
        //    declared "work_queues" here, which is not the queue this class reads.
        //    Arguments:
        //      queue      - queue name; created if it does not exist yet
        //      durable    - whether the queue is persisted
        //      exclusive  - whether the queue is exclusive to this connection
        //      autoDelete - whether the queue is removed once it has no consumers
        //      arguments  - additional queue arguments
        //    NOTE(review): these flags must match the producer's declaration of the
        //    same queue (assumed durable, non-exclusive, non-auto-delete) — confirm.
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. Build the callback that handles each delivery.
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * Invoked for every message delivered to this consumer.
             *
             * @param consumerTag identifier of this consumer
             * @param envelope    delivery metadata such as exchange and routing key
             * @param properties  message properties
             * @param body        raw message payload
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("body:" + text);
                System.out.println("将日志信息打印到控制台");
            }
        };

        // 7. Start consuming: queue name, autoAck = true, callback object.
        channel.basicConsume(queueName, true, consumer);
    }
}
消费者 2:
Consumer_PubSub2.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe consumer #2: subscribes to the queue
 * {@code test_fanout_queue2} and, for every received message, prints its body
 * followed by a fixed log line.
 *
 * <p>Date: 2021/6/11 22:58
 *
 * @author Mr.Throne
 * @version v1.0
 */
public class Consumer_PubSub2 {

    /**
     * Connects to the broker, declares the queue this consumer reads from and
     * registers an auto-acknowledging consumer on it. The connection is left
     * open on purpose so that deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException      if opening the connection/channel, declaring the queue
     *                          or registering the consumer fails
     * @throws TimeoutException if establishing the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Presumably bound to a fanout exchange by the producer — not visible here.
        final String queueName = "test_fanout_queue2";

        // 1. Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();

        // 2. Configure broker address, virtual host and credentials.
        // NOTE(review): host and credentials are hard-coded; consider externalising them.
        factory.setHost("121.196.161.240");
        factory.setPort(5672);
        factory.setVirtualHost("/admin");
        factory.setUsername("admin");
        factory.setPassword("admin");

        // 3. Open the connection.
        Connection connection = factory.newConnection();

        // 4. Open a channel on that connection.
        Channel channel = connection.createChannel();

        // 5. Declare the queue that is actually consumed below. The previous code
        //    declared "work_queues" here, which is not the queue this class reads.
        //    Arguments:
        //      queue      - queue name; created if it does not exist yet
        //      durable    - whether the queue is persisted
        //      exclusive  - whether the queue is exclusive to this connection
        //      autoDelete - whether the queue is removed once it has no consumers
        //      arguments  - additional queue arguments
        //    NOTE(review): these flags must match the producer's declaration of the
        //    same queue (assumed durable, non-exclusive, non-auto-delete) — confirm.
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. Build the callback that handles each delivery.
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * Invoked for every message delivered to this consumer.
             *
             * @param consumerTag identifier of this consumer
             * @param envelope    delivery metadata such as exchange and routing key
             * @param properties  message properties
             * @param body        raw message payload
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("body:" + text);
                System.out.println("将日志信息保留到数据库");
            }
        };

        // 7. Start consuming: queue name, autoAck = true, callback object.
        channel.basicConsume(queueName, true, consumer);
    }
}