共计 1823 个字符,预计需要花费 5 分钟才能阅读完成。
你或许在思考数据分发、无阻塞作业或者消息推送。或者你想要进行发布 / 订阅,异步任务,工作队列。所有的这些模式,都是消息队列的一部分。
1. 安装 RabbmitMQ
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el6.noarch.rpm
获取 rpm , 安装的时候会发现缺少依赖
安装依赖 erlang
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum install erlang
注意不要用 epel 的源直接 yum 安装,epel 源的 erlang 版本太低安装依赖 socat
yum install socat
最后
rpm -Uvh rabbitmq-server-3.6.8-1.el6.noarch.rpm
2.php 的 RabbmitMQ 库
{
“require”: {
“php-amqplib/php-amqplib”: “2.6.*”
}
}
comoser install
3.php+RabbmitMQ helloworld
//send.php
require_once __DIR__ . ‘/vendor/autoload.php’;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection(‘localhost’, 5672, ‘guest’, ‘guest’);
$channel = $connection->channel();
$channel->queue_declare(‘hello’, false, false, false, false);
$msg = new AMQPMessage(‘Hello World!’);
$channel->basic_publish($msg, ”, ‘hello’); // 发送一个消息到 hello 频道
echo ” [x] Sent ‘Hello World!’\n”;
$channel->close();
$connection->close();
//receive.php
require_once __DIR__ . ‘/vendor/autoload.php’;
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection(‘localhost’, 5672, ‘guest’, ‘guest’);
$channel = $connection->channel();
$channel->queue_declare(‘hello’, false, false, false, false);
echo ‘ [*] Waiting for messages. To exit press CTRL+C’, “\n”;
$callback = function($msg) {
echo ” [x] Received “, $msg->body, “\n”;
};
// 接收 hello 频道的消息
$channel->basic_consume(‘hello’, ”, false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
4. 运行
首先运行起来 rabbmitMQ server
service rabbmitmq-server start
挂起 receive.php 接收消息处理
php receive.php
发送消息
## 另起一个窗口执行
php send.php
以上代码就构建了一个简单的消息队列
消息从 send.php 生产 (p) 进入队列 交由 消费者 (c)
注意: 当关闭了命令行窗口 receive.php 进程将会结束,这个时候就需要借助 supservisor 来将 receive.php 脚本后台运行。这部分代码可以改写应用作简单的异步队列任务的场景,但到了高并发高可用需求下就要进行额外的一些处理。在之后的文章中我会讲到 rabbmitMQ 的竞争消费模式和 superverisor 的用法。