消费者1:
Consumer_PubSub1.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @version v1.0
- @Date: 2021/6/11 22:58
- @Author: Mr.Throne
- @Description: 消费者
*/
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException { String queueName1 = "test_fanout_queue1"; String queueName2 = "test_fanout_queue2"; //1.创立连贯工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设定参数 ip地址 factory.setHost("121.196.161.240"); factory.setPort(5672); factory.setVirtualHost("/admin"); factory.setUsername("admin"); factory.setPassword("admin"); //3.创立连贯 Connection Connection connection = factory.newConnection(); //4.创立Channel Channel channel = connection.createChannel(); //5.创立队列Queue // 参数String queue名称 没有主动创立 // boolean durable 是否长久化 // boolean exclusive 是否独占 连贯敞开后是否删除队列 // boolean autoDelete 没有Consumer时 主动删除 // Map<String, Object> arguments 参数信息 channel.queueDeclare("work_queues",true,false,false,null); //6.接管音讯 // String queue 队列名称 // boolean autoAck 是否主动确认 // Consumer callback 回调对象 Consumer consumer = new DefaultConsumer(channel){ //收到音讯后的回调办法 //consumerTag 音讯标识 //envelope 获取交换机 路由等信息 //body 数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台"); } }; channel.basicConsume("test_fanout_queue1",true,consumer);}
}
消费者2:
Consumer_PubSub2.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @version v1.0
- @Date: 2021/6/11 22:58
- @Author: Mr.Throne
- @Description: 消费者
*/
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException { String queueName1 = "test_fanout_queue1"; String queueName2 = "test_fanout_queue2"; //1.创立连贯工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设定参数 ip地址 factory.setHost("121.196.161.240"); factory.setPort(5672); factory.setVirtualHost("/admin"); factory.setUsername("admin"); factory.setPassword("admin"); //3.创立连贯 Connection Connection connection = factory.newConnection(); //4.创立Channel Channel channel = connection.createChannel(); //5.创立队列Queue // 参数String queue名称 没有主动创立 // boolean durable 是否长久化 // boolean exclusive 是否独占 连贯敞开后是否删除队列 // boolean autoDelete 没有Consumer时 主动删除 // Map<String, Object> arguments [能源期货](https://www.gendan5.com/cf/ef.html)参数信息 channel.queueDeclare("work_queues",true,false,false,null); //6.接管音讯 // String queue 队列名称 // boolean autoAck 是否主动确认 // Consumer callback 回调对象 Consumer consumer = new DefaultConsumer(channel){ //收到音讯后的回调办法 //consumerTag 音讯标识 //envelope 获取交换机 路由等信息 //body 数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body)); System.out.println("将日志信息保留到数据库"); } }; channel.basicConsume("test_fanout_queue2",true,consumer);}
}