关于rabbitmq:RabbitMQ-主题模式的使用

15次阅读

共计 2974 个字符,预计需要花费 8 分钟才能阅读完成。

主题模式与路由模式相似,发送到 topic 交换机的路由键必须是一个单词列表,由点号分隔。

例如:stock.usd.nysenyse.vmwquick.orange.rabbit

路由键中能够有任意多个单词,最多 255 个字节。

同时绑定键也必须应用雷同的模式。

装置依赖

# composer.json
{
    "require": {"php-amqplib/php-amqplib": ">=3.0"}
}
> composer.phar install

模式构造

topic 交换机背地的路由算法相似于 direct 替换,应用特定路由键发送的音讯将被传递到应用匹配绑定键绑定的所有队列。

绑定键有两个重要的非凡状况:

  • *(星号)能够代替一个单词。
  • \#(井号)能够代替零个或多个单词。

上图的绑定能够总结为:

  • Q1 对所有 orange 色彩的音讯都感兴趣。
  • Q2 接管所有 rabbit 的音讯,以及所有 lazy 的音讯。

示例

路由键设置为 quick.orange.rabbit 的音讯将发送给两个队列。

lazy.orange.elephant 也会发送给两个队列。

quick.orange.fox 只会进入第一个队列。

lazy.brown.fox 只会进入第二个队列。

lazy.pink.rabbit 只会进入第二个队列一次,即便命中了两次绑定规定。

quick.brown.fox 不会进入任何队列。

orangequick.orange.male.rabbit 不会进入任何队列,会被抛弃。

lazy.orange.male.rabbit 会进入第二个队列,命中了最初一个规定。

主题交换机功能强大,反对其余所有交换机的性能。

当队列与 \#(井号)绑定键绑定时将接管所有音讯,和 fanout 交换机一样。

当绑定中不应用特殊字符 *(星号)和 \#(井号)时,和 direct 交换机一样。

生产者

生产者连贯到 RabbitMQ,发送一条音讯,而后退出。

# send.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创立连贯
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创立通道
$channel = $connection->channel();

// 定义一个名为 topic_logs 的 topic 交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// 从参数中获取交换机的路由键
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {$data = "Hello World!";}

// 
$msg = new AMQPMessage($data);

// 通过名为 topic_logs 的 topic 交换机发送音讯到队列 (音讯内容, 交换机, 路由键);
$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo '[x] Sent', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

消费者

消费者监听来自 RabbitMQ 的音讯,通常须要始终放弃运行状态以监听音讯。

# receive.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 创立连贯
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 创立通道
$channel = $connection->channel();

// 定义一个名为 topic_logs 的 topic 交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// 创立一个随机命名的新队列,第三个参数为敞开队列长久化,第四个参数为当申明它的连贯敞开时队列会被主动删除
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 定义所有的绑定键
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

// 将随机命名的队列绑定到 topic 交换机,生产者向交换机发送音讯将被放到绑定的队列中
foreach ($binding_keys as $binding_key) {$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo "[*] Waiting for logs. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {echo '[x]', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

// 第四个参数设为 true 开启主动音讯确认,即投递音讯后立即标记为删除
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {$channel->wait();
}

$channel->close();
$connection->close();

运行

关上一个终端,运行消费者,接管所有音讯:

php receive.php "#"

关上另一个终端,运行消费者,接管 kern.* 的音讯:

php receive.php "kern.*"

关上另一个终端,运行消费者,接管 *.critical 的音讯:

php receive.php "*.critical"

关上另一个终端,运行消费者,接管 kern.**.critical 的音讯:

php receive.php "kern.*" "*.critical"

关上另一个终端,运行生产者,发送一条 kern.critical 音讯:

php send.php "kern.critical" "A critical kernel error"

查看所有交换机

sudo rabbitmqctl list_exchanges

查看所有的绑定关系

sudo rabbitmqctl list_bindings

正文完
 0