公布订阅的利用场景通常为一个生产者、一个交换机,多个消费者各自创立队列绑定到交换机上订阅音讯并生产。

公布订阅通常配合以下设置独特应用:

  • 消费者创立的队列随机命名即可
  • 敞开队列长久化
  • 敞开音讯长久化
  • 消费者创立的队列在连贯敞开时主动删除
  • 开启主动音讯确认

装置依赖

# composer.json{    "require": {        "php-amqplib/php-amqplib": ">=3.0"    }}> composer.phar install

模式构造

在 RabbitMQ 消息传递模型中,生产者是不会向队列间接发送音讯的,只能将音讯发送给交换机。

交换机接管来自生产者的音讯,而后将它们推送到队列中。

公布订阅应用的是fanout 交换机,这个交换机非常简单,将它收到的所有音讯播送到它绑定的所有队列。

生产者

生产者连贯到RabbitMQ,发送一条音讯,而后退出。

# send.php<?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;// 创立连贯$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');// 创立通道$channel = $connection->channel();// 定义一个名为 logs 的 fanout 播送交换机$channel->exchange_declare('logs', 'fanout', false, false, false);$data = implode(' ', array_slice($argv, 1));if (empty($data)) {    $data = "info: Hello World!";}// $msg = new AMQPMessage($data);// 将音讯发送到名为 logs 的 fanout 播送交换机 (音讯内容, 交换机, 路由键);$channel->basic_publish($msg, 'logs');echo ' [x] Sent ', $data, "\n";$channel->close();$connection->close();

消费者

消费者监听来自RabbitMQ的音讯,通常须要始终放弃运行状态以监听音讯。

# receive.php<?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;// 创立连贯$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');// 创立通道$channel = $connection->channel();// 定义一个名为 logs 的 fanout 播送交换机$channel->exchange_declare('logs', 'fanout', false, false, false);// 创立一个随机命名的新队列,第三个参数为敞开队列长久化,第四个参数为当申明它的连贯敞开时队列会被主动删除list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);// 将随机命名的队列绑定到 fanout 播送交换机,生产者向交换机发送音讯将被播送到绑定的队列中$channel->queue_bind($queue_name, 'logs');echo " [*] Waiting for messages. To exit press CTRL+C\n";// 定义回调函数$callback = function ($msg) {    echo ' [x] ', $msg->body, "\n";};// 第四个参数设为true开启主动音讯确认,即投递音讯后立即标记为删除$channel->basic_consume($queue_name, '', false, true, false, false, $callback);while ($channel->is_open()) {    $channel->wait();}$channel->close();$connection->close();

运行

关上一个终端,运行消费者,将日志放到文件中:

php receive.php > logs_from_rabbit.log

关上另一个终端,运行消费者,将日志输入到终端:

php receive.php# => [*] Waiting for messages. To exit press CTRL+C# => [x] Received 'Second message..'# => [x] Received 'Fourth message....'

关上另一个终端,运行生产者:

php send.php

查看所有交换机

sudo rabbitmqctl list_exchanges

查看所有的绑定关系

sudo rabbitmqctl list_bindings# => Listing bindings ...# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []# => ...done.