共计 6874 个字符,预计需要花费 18 分钟才能阅读完成。
交换器
RabbitMQ 消息传递模型的核心思想是生产者从不间接向队列发送任何音讯。生产者只将音讯发送到 Exchange 交换器中,并不知道音讯是否会被传送到队列。交换器负责接管生产者生产的音讯,并通过肯定路由规定将音讯发送到指定的队列,起到一个传递的作用
类型介绍
RabbitMQ 罕用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 标准里还提到两种 Exchange Type,别离为 system 与自定义,这里不予以形容)。
fanout
fanout 类型的 Exchange 路由规定十分。它会把所有发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。这种模式在 RabboitMQ 官网介绍中称之为:公布 / 订阅
图 1 fanout Exchange
来看下官网给出的代码示例:
emit_log.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {$data = "info: Hello World!";}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo '[x] Sent', $data, "\n";
$channel->close();
$connection->close();
receive_logs.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);
// 获取零碎返回的队列名称
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 将 Queue 和 Exchange 绑定
$channel->queue_bind($queue_name, $exchange_name);
echo "[*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {echo '[x]', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {$channel->wait();
}
$channel->close();
$connection->close();
执行 shell 命令
php receive_logs.php
php emit_log.php
direct
direct 类型的 Exchange 路由规定也很简略,它会把音讯路由到那些 binding key 与
routing key 齐全匹配的 Queue 中。官网阐明:direct
图 2 direct Exchange
以上图的配置为例,咱们以 routingKey=”error”发送音讯到 Exchange,则音讯会路由到 Queue1(amqp.gen-S9b…,这是由 RabbitMQ 主动生成的 Queue 名称)和 Queue2(amqp.gen-Agl…);如果咱们以 routingKey=”info”或 routingKey=”warning”来发送音讯,则音讯只会路由到 Queue2。如果咱们以其余 routingKey 发送音讯,则音讯不会路由到这两个 Queue 中。
来看下官网给出的代码示例:
emit_log.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {$data = "Hello World!";}
$msg = new AMQPMessage($data);
// basic_publish 函数中第三个参数指定 routingKey,将音讯、交换器和 routingKey 绑定到一起
$channel->basic_publish($msg, $exchange_name, $severity);
echo '[x] Sent', $severity, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities)) {file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
// 循环调用 queue_bind 函数,第三个参数 routingKey,将队列、交换器和 routingKey 绑定到一起,交换器会依据 routingKey 将音讯路由到绑定的队列中
foreach ($severities as $severity) {$channel->queue_bind($queue_name, $exchange_name, $severity);
}
echo "[*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {echo '[x]', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {$channel->wait();
}
$channel->close();
$connection->close();
执行 shell 命令:
# 生产者:php emit_log.php error "routingKey error"
php emit_log.php warning "routingKey warning"
php emit_log.php info "routingKey info"
php emit_log.php info warning error "routingKey info warning error"
# 消费者:# 消费者能够执行多个终端
php receive_logs.php info warning error
php receive_logs.php info
php receive_logs.php warning
php receive_logs.php warning error
topic
direct 类型的 Exchange 路由规定是齐全匹配 binding key 与 routing key,但这种严格的匹配形式在很多状况下不能满足理论业务需要。topic 类型的 Exchange 在匹配规定上进行了扩大,它与 direct 类型的 Exchage 类似,也是将音讯路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规定有些不同。官网解释:topic。它约定:
- routing key 为一个英文句点号“.”分隔的字符串(咱们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key 与 routing key 一样也是句点号“.”分隔的字符串
- binding key 中能够存在两种特殊字符“”与“#”,用于做含糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(能够是零个
图 3 topic Exchange
以上图中的配置为例,routingKey=”quick.orange.rabbit”的音讯会同时路由到 Q1 与 Q2,routingKey=”lazy.orange.fox”的音讯会路由到 Q1,routingKey=”lazy.brown.fox”的音讯会路由到 Q2,routingKey=”lazy.pink.rabbit”的音讯会路由到 Q2(只会投递给 Q2 一次,尽管这个 routingKey 与 Q2 的两个 bindingKey 都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的音讯将会被抛弃,因为它们没有匹配任何 bindingKey。
来看下官网给出的代码示例:
emit_log.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {$data = "Hello World!";}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange_name, $routing_key);
echo '[x] Sent', $routing_key, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($binding_keys as $binding_key) {$channel->queue_bind($queue_name, $exchange_name, $binding_key);
}
echo "[*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {echo '[x]', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {$channel->wait();
}
$channel->close();
$connection->close();
执行 shell 命令,可自行测试:
# 所有
php receive_logs.php "#"
# 路由到 Q1 与 Q2
php receive_logs.php "quick.orange.rabbit"
# 路由到 Q1
php receive_logs.php "lazy.orange.fox"
# 路由到 Q2
php receive_logs.php "lazy.brown.fox*"
# 路由到 Q1 与 Q2
php emit_log.php "quick.orange.rabbit" "Route to Q1 and Q2 at the same time"
headers
headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规定来路由消
息,而是依据发送的音讯内容中的 headers 属性进行匹配。
在绑定 Queue 与 Exchange 时指定一组键值对;当音讯发送到 Exchange 时,RabbitMQ 会取
到该音讯的 headers(也是一个键值对的模式),比照其中的键值对是否齐全匹配 Queue 与
Exchange 绑定时指定的键值对;如果齐全匹配则音讯会路由到该 Queue,否则不会路由到该
Queue。
headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。