关于mq:RabbitMQ-Exchange-Types-四种类型

46次阅读

共计 6874 个字符,预计需要花费 18 分钟才能阅读完成。

交换器

RabbitMQ 消息传递模型的核心思想是生产者从不间接向队列发送任何音讯。生产者只将音讯发送到 Exchange 交换器中,并不知道音讯是否会被传送到队列。交换器负责接管生产者生产的音讯,并通过肯定路由规定将音讯发送到指定的队列,起到一个传递的作用

类型介绍

RabbitMQ 罕用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 标准里还提到两种 Exchange Type,别离为 system 与自定义,这里不予以形容)。

fanout

fanout 类型的 Exchange 路由规定十分。它会把所有发送到该 Exchange 的音讯路由到所有与它绑定的 Queue 中。这种模式在 RabboitMQ 官网介绍中称之为:公布 / 订阅

图 1 fanout Exchange

来看下官网给出的代码示例:

emit_log.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {$data = "info: Hello World!";}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo '[x] Sent', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);
// 获取零碎返回的队列名称
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 将 Queue 和 Exchange 绑定
$channel->queue_bind($queue_name, $exchange_name);

echo "[*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {echo '[x]', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {$channel->wait();
}

$channel->close();
$connection->close();

执行 shell 命令

php receive_logs.php

php emit_log.php 

direct

direct 类型的 Exchange 路由规定也很简略,它会把音讯路由到那些 binding key 与
routing key 齐全匹配的 Queue 中。官网阐明:direct

图 2 direct Exchange

以上图的配置为例,咱们以 routingKey=”error”发送音讯到 Exchange,则音讯会路由到 Queue1(amqp.gen-S9b…,这是由 RabbitMQ 主动生成的 Queue 名称)和 Queue2(amqp.gen-Agl…);如果咱们以 routingKey=”info”或 routingKey=”warning”来发送音讯,则音讯只会路由到 Queue2。如果咱们以其余 routingKey 发送音讯,则音讯不会路由到这两个 Queue 中。

来看下官网给出的代码示例:

emit_log.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {$data = "Hello World!";}

$msg = new AMQPMessage($data);

// basic_publish 函数中第三个参数指定 routingKey,将音讯、交换器和 routingKey 绑定到一起
$channel->basic_publish($msg, $exchange_name, $severity);

echo '[x] Sent', $severity, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// 循环调用 queue_bind 函数,第三个参数 routingKey,将队列、交换器和 routingKey 绑定到一起,交换器会依据 routingKey 将音讯路由到绑定的队列中
foreach ($severities as $severity) {$channel->queue_bind($queue_name, $exchange_name, $severity);
}

echo "[*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {echo '[x]', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {$channel->wait();
}

$channel->close();
$connection->close();

执行 shell 命令:

 # 生产者:php emit_log.php error "routingKey error"
 php emit_log.php warning "routingKey warning"
 php emit_log.php info "routingKey info"
 php emit_log.php info warning error "routingKey info warning error"
 
 # 消费者:# 消费者能够执行多个终端
 php receive_logs.php info warning error
 php receive_logs.php info
 php receive_logs.php warning
 php receive_logs.php warning error

topic

direct 类型的 Exchange 路由规定是齐全匹配 binding key 与 routing key,但这种严格的匹配形式在很多状况下不能满足理论业务需要。topic 类型的 Exchange 在匹配规定上进行了扩大,它与 direct 类型的 Exchage 类似,也是将音讯路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规定有些不同。官网解释:topic。它约定:

  • routing key 为一个英文句点号“.”分隔的字符串(咱们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key 与 routing key 一样也是句点号“.”分隔的字符串
  • binding key 中能够存在两种特殊字符“”与“#”,用于做含糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(能够是零个

图 3 topic Exchange

以上图中的配置为例,routingKey=”quick.orange.rabbit”的音讯会同时路由到 Q1 与 Q2,routingKey=”lazy.orange.fox”的音讯会路由到 Q1,routingKey=”lazy.brown.fox”的音讯会路由到 Q2,routingKey=”lazy.pink.rabbit”的音讯会路由到 Q2(只会投递给 Q2 一次,尽管这个 routingKey 与 Q2 的两个 bindingKey 都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的音讯将会被抛弃,因为它们没有匹配任何 bindingKey。

来看下官网给出的代码示例:

emit_log.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {$data = "Hello World!";}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, $exchange_name, $routing_key);

echo '[x] Sent', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {$channel->queue_bind($queue_name, $exchange_name, $binding_key);
}

echo "[*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {echo '[x]', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {$channel->wait();
}

$channel->close();
$connection->close();

执行 shell 命令,可自行测试:

# 所有
php receive_logs.php "#" 

# 路由到 Q1 与 Q2
php receive_logs.php "quick.orange.rabbit"

# 路由到 Q1
php receive_logs.php "lazy.orange.fox" 

# 路由到 Q2
php receive_logs.php "lazy.brown.fox*"

# 路由到 Q1 与 Q2
php emit_log.php "quick.orange.rabbit" "Route to Q1 and Q2 at the same time"

headers

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规定来路由消
息,而是依据发送的音讯内容中的 headers 属性进行匹配。

在绑定 Queue 与 Exchange 时指定一组键值对;当音讯发送到 Exchange 时,RabbitMQ 会取
到该音讯的 headers(也是一个键值对的模式),比照其中的键值对是否齐全匹配 Queue 与
Exchange 绑定时指定的键值对;如果齐全匹配则音讯会路由到该 Queue,否则不会路由到该
Queue。

headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

正文完
 0