关于java:RabbitMQ发布订阅模式

42次阅读

共计 3203 个字符,预计需要花费 9 分钟才能阅读完成。

消费者 1:
Consumer_PubSub1.java
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**

  • @version v1.0
  • @Date: 2021/6/11 22:58
  • @Author: Mr.Throne
  • @Description: 消费者
    */

public class Consumer_PubSub1 {

public static void main(String[] args) throws IOException, TimeoutException {
    String queueName1 = "test_fanout_queue1";
    String queueName2 = "test_fanout_queue2";
    //1. 创立连贯工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2. 设定参数 ip 地址
    factory.setHost("121.196.161.240");
    factory.setPort(5672);
    factory.setVirtualHost("/admin");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3. 创立连贯 Connection
    Connection connection = factory.newConnection();
    //4. 创立 Channel
    Channel channel = connection.createChannel();
    //5. 创立队列 Queue
    // 参数 String queue 名称 没有主动创立
    // boolean durable 是否长久化
    // boolean exclusive 是否独占 连贯敞开后是否删除队列
    // boolean autoDelete 没有 Consumer 时 主动删除
    // Map<String, Object> arguments 参数信息
    channel.queueDeclare("work_queues",true,false,false,null);
    //6. 接管音讯
    // String queue 队列名称
    // boolean autoAck 是否主动确认
    // Consumer callback 回调对象
    Consumer consumer = new DefaultConsumer(channel){
        // 收到音讯后的回调办法
        //consumerTag 音讯标识
        //envelope 获取交换机 路由等信息
        //body 数据
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println(“consumerTag:”+consumerTag);
// System.out.println(“Exchange:”+envelope.getExchange());
// System.out.println(“RoutingKey:”+envelope.getRoutingKey());
// System.out.println(“properties:”+properties);

            System.out.println("body:"+new String(body));
            System.out.println("将日志信息打印到控制台");
        }
    };
    channel.basicConsume("test_fanout_queue1",true,consumer);
}

}
消费者 2:
package com.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**

  • @version v1.0
  • @Date: 2021/6/11 22:58
  • @Author: Mr.Throne
  • @Description: 消费者
    */

public class Consumer_PubSub2 {

public static void main(String[] args) throws IOException, TimeoutException {
    String queueName1 = "test_fanout_queue1";
    String queueName2 = "test_fanout_queue2";
    //1. 创立连贯工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2. 设定参数 ip 地址
    factory.setHost("121.196.161.240");
    factory.setPort(5672);
    factory.setVirtualHost("/admin");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3. 创立连贯 Connection
    Connection connection = factory.newConnection();
    //4. 创立 Channel
    Channel channel = connection.createChannel();
    //5. 创立队列 Queue
    // 参数 String queue 名称 没有主动创立
    // boolean durable 是否长久化
    // boolean exclusive 是否独占 连贯敞开后是否删除队列
    // boolean autoDelete 没有 Consumer 时 主动删除
    // Map<String, Object> arguments [能源期货](https://www.gendan5.com/cf/ef.html) 参数信息
    channel.queueDeclare("work_queues",true,false,false,null);
    //6. 接管音讯
    // String queue 队列名称
    // boolean autoAck 是否主动确认
    // Consumer callback 回调对象
    Consumer consumer = new DefaultConsumer(channel){
        // 收到音讯后的回调办法
        //consumerTag 音讯标识
        //envelope 获取交换机 路由等信息
        //body 数据
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// System.out.println(“consumerTag:”+consumerTag);
// System.out.println(“Exchange:”+envelope.getExchange());
// System.out.println(“RoutingKey:”+envelope.getRoutingKey());
// System.out.println(“properties:”+properties);

            System.out.println("body:"+new String(body));
            System.out.println("将日志信息保留到数据库");
        }
    };
    channel.basicConsume("test_fanout_queue2",true,consumer);

}

}

正文完
 0