RabbitMQ-requestresponse

58次阅读

共计 2581 个字符,预计需要花费 7 分钟才能阅读完成。

类似于 ActiveMQ – request/response,通过设置消息的属性来的。

消息的属性:

属性值 描述
Delivery mode 是否持久化,1 为不持久化,2 为持久化
Type 应用程序特定的消息类型
Headers 用户自定义的其他属性
Content type 内容类型。比如 application/json
Content encoding 内容编码, 比如“gzip”
Message ID 消息 ID
Correlation ID 用于 request/response
Reply To 携带响应队列名称
Expiration 消息过期时间
Timestamp 消息的产生时间
User ID 用于验证发布消息的用户身份
App ID 应用程序的名称

request/response 示例


大体步骤如下:

  1. 客户端发携带两个参数,replyTo 和 correlationId。replyTo 是具有 exclusive 属性的队列,用于处理消费者返回的数据。correlationId 是为每个请求设置一个唯一的值。
  2. 把请求发送给 rpc 队列
  3. 服务端处理完数据,通过 replyTo 的队列把消息发给客户端。
  4. 客户端通过 correlationId 消息,请求中的值相匹配,进行处理。

客户端

客户端发送从 0 到 9 的数字给服务端,服务端返回数字的平方,客户端获取到输出:

public final static String requestQueueName = "rpc_queue";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {for (int i = 0; i < 10; i++) {
        // 声明一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 创建一个与 rabbitmq 服务器的连接
        // 创建一个 Channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 定义 correlationId 的值
        final String corrId = UUID.randomUUID().toString();
        // 定义临时队列,临时队列默认是 exclusive 的
        String replyQueueName = channel.queueDeclare().getQueue();
        // 消息携带的 correlationId 和 replyTo
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        String message = i + "";
        // 发送消息
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        // 根据临时队列的名称等待消息
        channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {System.out.println("send" + message + "and receive message:" + new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {});
    }
    TimeUnit.SECONDS.sleep(10);
}

服务端

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    channel.queueDeclare(RPCClient.requestQueueName, false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        // 设置 correlationId
        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                .Builder()
                .correlationId(delivery.getProperties().getCorrelationId())
                .build();
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("RPCServer Received'" + message);
        String response = "" + Integer.valueOf(message) * Integer.valueOf(message);
        // 通过 replyTo 把消息发回客户端队列
        channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
        // 确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    // 接收消息
    channel.basicConsume(RPCClient.requestQueueName, false, deliverCallback, consumerTag -> {});
}

客户端运行结果如下:

服务端运行结果如下:

正文完
 0