消费者1:
Consumer_PubSub1.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**

  • @version v1.0
  • @Date: 2021/6/11 22:58
  • @Author: Mr.Throne
  • @Description: 消费者
    */

public class Consumer_PubSub1 {

public static void main(String[] args) throws IOException, TimeoutException {    String queueName1 = "test_fanout_queue1";    String queueName2 = "test_fanout_queue2";    //1.创立连贯工厂    ConnectionFactory factory = new ConnectionFactory();    //2.设定参数 ip地址    factory.setHost("121.196.161.240");    factory.setPort(5672);    factory.setVirtualHost("/admin");    factory.setUsername("admin");    factory.setPassword("admin");    //3.创立连贯 Connection    Connection connection = factory.newConnection();    //4.创立Channel    Channel channel = connection.createChannel();    //5.创立队列Queue    // 参数String queue名称 没有主动创立    // boolean durable 是否长久化    // boolean exclusive 是否独占 连贯敞开后是否删除队列    // boolean autoDelete 没有Consumer时 主动删除    // Map<String, Object> arguments 参数信息    channel.queueDeclare("work_queues",true,false,false,null);    //6.接管音讯    // String queue 队列名称    // boolean autoAck 是否主动确认    // Consumer callback 回调对象    Consumer consumer = new DefaultConsumer(channel){        //收到音讯后的回调办法        //consumerTag 音讯标识        //envelope 获取交换机 路由等信息        //body 数据        @Override        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);

            System.out.println("body:"+new String(body));            System.out.println("将日志信息打印到控制台");        }    };    channel.basicConsume("test_fanout_queue1",true,consumer);}

}
消费者2:
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**

  • @version v1.0
  • @Date: 2021/6/11 22:58
  • @Author: Mr.Throne
  • @Description: 消费者
    */

public class Consumer_PubSub2 {

public static void main(String[] args) throws IOException, TimeoutException {    String queueName1 = "test_fanout_queue1";    String queueName2 = "test_fanout_queue2";    //1.创立连贯工厂    ConnectionFactory factory = new ConnectionFactory();    //2.设定参数 ip地址    factory.setHost("121.196.161.240");    factory.setPort(5672);    factory.setVirtualHost("/admin");    factory.setUsername("admin");    factory.setPassword("admin");    //3.创立连贯 Connection    Connection connection = factory.newConnection();    //4.创立Channel    Channel channel = connection.createChannel();    //5.创立队列Queue    // 参数String queue名称 没有主动创立    // boolean durable 是否长久化    // boolean exclusive 是否独占 连贯敞开后是否删除队列    // boolean autoDelete 没有Consumer时 主动删除    // Map<String, Object> arguments [能源期货](https://www.gendan5.com/cf/ef.html)参数信息    channel.queueDeclare("work_queues",true,false,false,null);    //6.接管音讯    // String queue 队列名称    // boolean autoAck 是否主动确认    // Consumer callback 回调对象    Consumer consumer = new DefaultConsumer(channel){        //收到音讯后的回调办法        //consumerTag 音讯标识        //envelope 获取交换机 路由等信息        //body 数据        @Override        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);

            System.out.println("body:"+new String(body));            System.out.println("将日志信息保留到数据库");        }    };    channel.basicConsume("test_fanout_queue2",true,consumer);}

}