共计 6928 个字符,预计需要花费 18 分钟才能阅读完成。
一、前言
RabbitMQ 其实是我最早接触的一个 MQ 框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于 RabbitMQ 官网的英文还不算太难,因此也是参考官网学习的,一共有 6 章,当时是用 Node 来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的 MQ 基础。
二、RabbitMQ
首先我们需要知道的是 RabbitMQ 它是基于高级队列协议(AMQP)的,它是 Elang 编写的,下面将围绕 RabbitMQ 队列、交换机、RPC 三个重点进行展开。
2.1、队列
存储消息的地方,多个生产者可以将消息发送到一个队列,多个消费者也可以消费同一个队列的消息。
注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,并且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。
2.2、消费者
消费消息的一方
public class Send {
private final static String QUEUE_NAME = "hello";
private final static String IP = "172.16.12.162";
public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP);
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
}catch (Exception e){e.printStackTrace();
}
}
}
public class Recv {
private final static String QUEUE_NAME = "hello";
private final static String IP = "172.16.12.162";
public static void main(String[] args) {
try {ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received'" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}catch (Exception e){e.printStackTrace();
}
}
}
2.3、小结
1、Rabbit 是如何保证消息被消费的?
答:通过 ack 机制。每当一个消息被消费端消费的时候,消费端可以发送一个 ack 给 RabbitMQ,这样 RabbitMQ 就知道了该条消息已经被完整消费并且可以被 delete 了。;如果一条消息被消费但是没有发送 ack,那么此时 RabbitMQ 将会认为需要重新消费该消息,如果此时还有其它的消费者,那么此时 RabbitMQ 将会把这条消息交给它处理。
注意:开启 ack 机制的是 autoAck=
false
;
2、消息如何进行持久化?
- 将 queue 持久化,即设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 第二个参数 durable 为 true
- 设置消息持久化,即设置 MessageProperties.PERSISTENT_TEXT_PLAIN
注意:消息持久化并不一定保证消息不会被丢失
3、RabbitMQ 如何避免两个消费者一个非常忙一个非常闲的情况?
通过如下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成并且返回 ack 给 RabbitMQ 之后才给它派发新的消息。
int prefetchCount = 1 ;
channel.basicQos(prefetchCount)
4、RabbitMQ 异常情况下如何保证消息不会被重复消费?
需要业务自身实现密等性,RabbitMQ 没有提供比较好的方式去保证。
2.2、交换机
在 RabbitMQ 中,生产者其实从来不会发送消息到队列,甚至,它不知道消息被发送到了哪个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在 RabbitMQ 中的介绍图。(X 就是交换机)生产者发送消息给交换机,然后由交换机将消息转发给队列。
从上图就产生一个问题:X 怎么将消息发给 queue 呢?它是把消息发给所有 queue 还是发给一个指定的 queue 或者丢弃消息呢?这就是看交换机的类型了。下面一起谈谈这几种类型
2.2.1、fanout
fanout:广播模式,这个比较好理解,就是所有的队列都能收到交换机的消息。
如上面,两个队列都能收到交换机的消息。
2.2.2、direct
这个模式相当于发布 / 订阅模式的一种,当交换机类型为 direct 的时候,此时我们需要设置两个参数:
- channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes(“UTF-8”)); 第二个参数,我们可以把它称呼为 routeKey
- channel.queueBind(queueName, EXCHANGE_NAME, “”); 第三个参数,我们把它称呼为 bindKey
有了这两个参数,我们就可以指定我们订阅哪些消息了。
如图,Q1 订阅了 orange 的消息,Q2 订阅了 black、green 的消息。
2.2.3、topic
其实 topic 和 direct 有一点类似,它相当于对 direct 作了增强。在 direct 中,我们上面所说的 bind routeKey 为 black、green 的它是有限制的,它只能绝对的等于 routeKey,但是有时候我们的需求不是这样,我们可能想要的是正则匹配即可,那么 Topic 就派上用场了。
当类型为 topic 时,它的 bindKey 对应字符串需要是以“.”分割,同时 RabbitMQ 还提供了两个符号:
- 星号(*): 表示 1 个单词
- 井号(#): 表示 0、多个单词
上图的意思是: 所有第二个单词为 orange 的消息发送个 Q1,所有最后一个单词为 rabbit 或者第一个单词为 lazy 的消息发送给 Q2。
2.2.4、header
这一种类型官方 demo 没有过多解释,这里也不研究了。
2.3、RPC
RabbitMQ 还可以实现 RPC(远程过程调用)。什么是 RPC, 简单来说就是 local 调用 remote 方法。对应于 RabbitMQ 中则是 Client 发送一个 request message,Server 处理完成之后将其返回给 Client。这里就有了一个疑问?Server 是如何将 response 返回给 Client 的,这里 RabbitMQ 定义了一个概念:Callback Queue。
Callback Queue
注意这个队列是独一无二的 String replyQueueName = channel.queueDeclare().getQueue();
。
首先我们需要明白一点的是为什么需要这个 queue?我们知道在 RabbitMQ 作消息队列的时候,Client 只需要将消息投放到 queue 中,然后 Server 从 queue 去取就可以了。但是在 RabbitMQ 作为 RPC 的时候多了一点就是,Client 还需要返回结果,这时 Server 端怎么知道把消息发送给 Client,这就是 Callback Queue 的用处了。
Correlation Id
在上面我们知道 Server 返回数据给 Client 是通过 Callback Queue 的,那么是为每一个 request 都创建一个 queue 吗?这未免太过浪费资源,RabbitMQ 有更好的方案。在我们发送 request,绑定一个唯一 ID(correlationId), 然后在消息被处理返回的时候取出这个 ID 和发出去的 ID 进行匹配。这样来说一个 Callback Queue 是 Client 级别而不是 request 级别的了。
实现
上面介绍了 RabbitMQ 实现 RPC 最重要的两个概念,具体代码比较简单还是贴下把。
client 端
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();}
public static void main(String[] argv) throws Exception{RPCClient fibonacciRpc = new RPCClient();
try {for (int i = 0; i < 32; i++) {String i_str = Integer.toString(i);
System.out.println("[x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println("[.] Got'" + response + "'");
}
} catch (Exception e) {e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException {final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {connection.close();
}
}
服务端
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println("[x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println("[.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {System.out.println("[.]" + e.toString());
} finally {channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
// Wait and be prepared to consume the message from RPC client.
while (true) {synchronized (monitor) {
try {monitor.wait();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
}
}
三、总结
这次回头再看 RabbitMQ,再次重新理解了以下 RabbitMQ,有些东西还是要慢慢嚼的。当然这些也都是官网的入门例子,后续有机会的话再深入研究。