乐趣区

【译】RabbitMQ系列(六)-RPC模式

RPC 模式
在第二章中我们学习了如何使用 Work 模式在多个 worker 之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker 执行任务就好。如果涉及返回值,就要用到本章提到的 RPC(Remote Procedure Call)了。
本章我们使用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务端。我们让 RPC 服务返回一个斐波那契数组。
Client interface
我们创建一个简单的客户端类来演示如何使用 RPC 服务。call 方法发送 RPC 请求,并阻塞知道结果返回。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call(“4”);
System.out.println(“fib(4) is ” + result);

RPC 贴士虽然 RPC 的使用在计算机领域非常普遍,但是却经常受到批评。主要问题是编码人如果不注意使用的方法是本地还是远程时,往往会造成问题。往往让系统变得不可预知,增加不必要的复杂性和调试的难度。对此我们有如下几点建议:

是本地方法还是远程方法要一目了然
把系统的依赖写进文档
系统要处理好超时的问题

如果可以尽量使用异步的 pipeline 来替代像 RPC 这种阻塞的操作。

Callback queue
在 RabbitMQ 上实现 RPC 是非常简单的。客户端发送一个 request message,服务端回应一个 response message。为了接受 response message 我们需要在发送 request message 的时候附带上 ’callback’ queue 的地址。我们可以使用默认的 queue。
callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();

channel.basicPublish(“”, “rpc_queue”, props, message.getBytes());

// … then code to read a response message from the callback_queue …

Message 的属性 AMQP 0-9- 1 协议预定义了 14 个消息属性,其中大部分很少使用,下面的属性较为常用

deliverMode: 标记 message 为持久 (设置为 2) 或其他值。
contentType:message 的编码类型,我们经常使用 JSON 编码,则设置为 application/json
replyTo: 命名回调 queue
correlationId: 将 RPC 的请求和回应关联起来

需要引入新的类
import com.rabbitmq.client.AMQP.BasicProperties;
Correlaton Id
在上面的代码中,每次 RPC 请求都会创建一个用于回调的临时 queue,我们有更好的方法,我们为每一个 client 创建一个回调 queue。
但是这样有新的问题,从回调 queue 中收到 response 无法和相应的 request 关联起来。这时候就是 correlationId 属性发挥作用的时候了。为每个 request 中设置唯一的值,在稍后的回调 queue 中收到的 response 里也有这个属性,基于此,我们就可以关联之前的 request 了。如果我们遇到一个匹配不到的 correlationId, 那么丢弃的行为是安全的。
你可能会问,为什么我们忽略这些无法匹配的 message,而不是当做一个错误处理呢?主要是考虑服务端的竞态条件,如果 RPC 服务器在发送 response 之后就宕机了,但是却没有发送 ack 消息。那么当 RPC Server 重启之后,会继续执行这个 request。这就是为什么 client 需要幂等处理 response。
Summary
我们的 RPC 向下面这样进行工作:

对于一个 RPC request,客户端发送 message 时设置两个属性:replyTo 设置成一个没有名字的 request 独有的 queue;为每个 request 设置一个唯一的 correlationId。
request 发送到 rpc_queue
RPC worker 监听 rpc_queue。当有消息时,进行计算并通过 replyTo 指定的 queue 发送 message 给客户端。
客户端监听回调 queue。当接收到 message,则检查 correlationId。如果和之前的 request 匹配,则将消息返回给应用进行处理。

开始执行
斐波那契处理函数
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
这是一个简易的实现,如果传入一个较大的值,将会是个灾难。RPC 服务器的代码为 RPCServer.java, 代码是很简单明确的

先是建立 connection,channel 和声明 queue.
设置 prefetchCount,我们基于请求频繁程度,会启动多个 RPC Server
使用 basicConsume 来接收,该方法提供回调参数设置(DeliverCallback).

RPC 客户端的代码为 RPCClient.java, 代码略微有点复杂

建立 connection 和 channel。
call 方法来发送 RPC 请求
生成 correlationId
生成默认名字的 queue 用于 reply,并订阅它
发送 request message,设置参数 replyTo 和 correlationId.
然后返回并开始等待 response 到达
因为消费者发送 response 是在另一个线程中,我们需要让 main 线程阻塞,在这里我们使用 BlockingQueue。
消费者进行简单的处理,为每一个 response message 检查其 correlationId, 如果是,则将 response 添加进阻塞队列
main 函数阻塞在 BlockingQueue 返回
将 response 返回给用户

RPCClient.java 完整代码
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

private Connection connection;
private Channel channel;
private String requestQueueName = “rpc_queue”;

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);

connection = factory.newConnection();
channel = connection.createChannel();
}

public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(” [x] Requesting fib(” + i_str + “)”);
String response = fibonacciRpc.call(i_str);
System.out.println(” [.] Got ‘” + response + “‘”);
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}

public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();

String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish(“”, requestQueueName, props, message.getBytes(“UTF-8”));

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), “UTF-8”));
}
}, consumerTag -> {
});

String result = response.take();
channel.basicCancel(ctag);
return result;
}

public void close() throws IOException {
connection.close();
}
}
RPCServer.java 完整代码
import com.rabbitmq.client.*;

public class RPCServer {

private static final String RPC_QUEUE_NAME = “rpc_queue”;

private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n – 1) + fib(n – 2);
}

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(“localhost”);

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);

channel.basicQos(1);

System.out.println(” [x] Awaiting RPC requests”);

Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();

String response = “”;

try {
String message = new String(delivery.getBody(), “UTF-8″);
int n = Integer.parseInt(message);

System.out.println(” [.] fib(” + message + “)”);
response += fib(n);
} catch (RuntimeException e) {
System.out.println(” [.] ” + e.toString());
} finally {
channel.basicPublish(“”, delivery.getProperties().getReplyTo(), replyProps, response.getBytes(“UTF-8”));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}

退出移动版