共计 9508 个字符,预计需要花费 24 分钟才能阅读完成。
前言
相干概念
音讯(Message)
是指在利用间传送的数据。音讯能够非常简单,比方只蕴含文本字符串,也能够更简单,可能蕴含嵌入对象。
音讯队列(Message Queue)
是一种利用间的通信形式,音讯发送后能够立刻返回,由音讯零碎来确保音讯的牢靠传递。音讯发布者只管把音讯公布到 MQ 中而不必管谁来取,音讯使用者只管从 MQ 中取音讯而不管是谁公布的。这样发布者和使用者都不必晓得对方的存在。
RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP
Advanced Message Queue,高级音讯队列协定。
Erlang
面向并发的编程语言。
RabbitMQ
特点
1. 可靠性(Reliability)RabbitMQ 应用一些机制来保障可靠性,如长久化、传输确认、公布确认。
2. 灵便的路由(Flexible Routing)在音讯进入队列之前,通过 Exchange 来路由音讯的。对于典型的路由性能,RabbitMQ 曾经提供了一些内置的 Exchange 来实现。针对更简单的路由性能,能够将多个 Exchange 绑定在一起,也通过插件机制实现本人的 Exchange。
3. 音讯集群(Clustering)多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker
4. 高可用(Highly Available Queues)队列能够在集群中的机器上进行镜像,使得在局部节点出问题的状况下队列依然可用。
5. 多种协定(Multi-protocol)RabbitMQ 反对多种音讯队列协定,比方 STOMP、MQTT 等等。
6. 多语言客户端(Many Clients)RabbitMQ 简直反对所有罕用语言,比方 Java、.NET、Ruby 等等。
7. 治理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和治理音讯 Broker 的许多方面。
8. 跟踪机制(Tracing)如果音讯异样,RabbitMQ 提供了音讯跟踪机制,使用者能够找出产生了什么。
9. 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩大,也能够编写本人的插件。
概念模型
- Message
音讯,音讯是不具名的,它由音讯头和音讯体组成。音讯体是不通明的,而音讯头则由一系列的可选属性组成,这些属性包含 routing-key(路由键)、priority(绝对于其余音讯的优先权)、delivery-mode(指出该音讯可能须要持久性存储)等。 - Publisher
音讯的生产者,也是一个向交换器公布音讯的客户端应用程序。 - Exchange
交换器,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列。 - Binding
绑定,用于音讯队列和交换器之间的关联。一个绑定就是基于路由键将交换器和音讯队列连接起来的路由规定,所以能够将交换器了解成一个由绑定形成的路由表。 - Queue
音讯队列,用来保留音讯直到发送给消费者。它是音讯的容器,也是音讯的起点。一个音讯可投入一个或多个队列。音讯始终在队列外面,期待消费者连贯到这个队列将其取走。 - Connection
网络连接,比方一个 TCP 连贯。 - Channel
信道,多路复用连贯中的一条独立的双向数据流通道。信道是建设在实在的 TCP 连贯边疆虚构连贯,AMQP 命令都是通过信道收回去的,不论是公布音讯、订阅队列还是接管音讯,这些动作都是通过信道实现。因为对于操作系统来说建设和销毁 TCP 都是十分低廉的开销,所以引入了信道的概念,以复用一条 TCP 连贯。 - Consumer
音讯的消费者,示意一个从音讯队列中获得音讯的客户端应用程序。 - Virtual Host
虚拟主机,示意一批交换器、音讯队列和相干对象。虚拟主机是共享雷同的身份认证和加密环境的独立服务器域。每个 vhost 实质上就是一个 mini 版的 RabbitMQ 服务器,领有本人的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的根底,必须在连贯时指定,RabbitMQ 默认的 vhost 是 /。 - Broker
示意音讯队列服务器实体。
装置
Windows 环境
装置 Erlang
下载地址:https://erlang.org/download/o…
配置环境变量 ERLANG_HOME C:\Program Files (x86)\erl5.9
增加到 PATH %ERLANG_HOME%\bin;
装置 RabbitMq
下载地址:https://www.rabbitmq.com/ 或 百度网盘
配置环境变量 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.9.11
增加到 PATH %RABBITMQ_SERVER%\sbin;
运行:
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.11\sbin>rabbitmq-server.bat
拜访:
关上浏览器。拜访 http://127.0.0.1:15672
注:
- 默认账号:guest 明码:guest,仅用于浏览器拜访,API 须要新建用户
- 默认浏览器拜访端口 15672,API 拜访端口 5672
装置 php 的 amqp 扩大
1. 下载地址:http://pecl.php.net/package/amqp
2. 将 php_amqp.dll 复制到 php/ext,同时在 php.ini 中增加如下代码:
[amqp]
extension=php_amqp.dll
3. 而后将 rabbitmq.4.dll 复制到 php 根目录
4. 查看是否装置胜利:php -m
Docker 环境
装置 RabbitMq
docker pull rabbitmq # 镜像未配有控制台
docker pull rabbitmq:management # 镜像配有控制台
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
启动容器后,能够浏览器中拜访 http://localhost:15672 来查看控制台信息。RabbitMQ
默认的用户名:guest
,明码:guest
装置 php 和 amqp 扩大
docker pull php:7.4-fpm
docker run -d -p 9000:9000 -v /Users/ma/docker/php:/www --name phpfpm php:7.4-fpm
docker exec -it phpfpm /bin/bash
#装置扩大
apt-get update && apt-get install -y libfreetype6-dev librabbitmq-dev libjpeg62-turbo-dev libmcrypt-dev libpng-dev
pecl install amqp
docker-php-ext-enable amqp
应用
一般队列
生产者
$params = [
'host' => '192.168.0.134',
'port' => '5672',
'vhost' => '/',
'login' => 'admin',
'password' => '123456'
];
$connection = new \AMQPConnection($params);
if (!$connection->connect()) {
echo "Cannot connect to the broker!";
exit;
}
$channel = new \AMQPChannel($connection);
$exchangeName = 'direct_exchange';
$queueName = 'direct_queue';
$routeKey = 'direct_queue';
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 申明交换机
$exchange->declareExchange();
// 创立音讯队列
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
// 设置持久性
$queue->setFlags(AMQP_DURABLE);
// 申明音讯队列
$queue->declareQueue();
// 开启事务, 确保数据真正不失落
$channel->startTransaction();
// 将音讯和标识绑定到交换器中
$exchange->publish($message, $routeKey);
$channel->commitTransaction();
$connection->disconnect();
var_dump("[x] Sent $message");
消费者
$params = [
'host' => '192.168.0.134',
'port' => '5672',
'vhost' => '/',
'login' => 'admin',
'password' => '123456'
];
$connection = new \AMQPConnection($params);
if (!$connection->connect()) {
echo "Cannot connect to the broker!";
exit;
}
$channel = new \AMQPChannel($connection);
$exchangeName = 'direct_exchange';
$queueName = 'direct_queue';
$routeKey = 'direct_queue';
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchange->getName(), $routeKey);
// 接管音讯并解决回调
$queue->consume(function ($envelop, $queue) {$message = $envelop->getBody();
echo $message . PHP_EOL;
// ACK 告诉生产者工作实现
$queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
});
// 设置每次只能解决一条, 防止音讯沉积, 从而导致队列挂掉
$channel->qos(0, 1);
// 敞开连贯
$connection->disconnect();
提早队列
生产者
<?php
// 起源公众号:【码农编程进阶笔记】//header('Content-Type:text/html;charset=utf-8;');
$params = array(
'exchangeName' => 'test_cache_exchange',
'queueName' => 'test_cache_queue',
'routeKey' => 'test_cache_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判断是否加载 amqp 扩大
//exit();
for($i=5;$i>0;$i--){
try {$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连贯谬误:', json_encode($connectConfig);
exit();}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);// 长久化
$exchange->setName($params['exchangeName']);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct 类型
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ 不容许申明 2 个雷同名称、配置不同的 Queue 队列
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName'].$i);
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange', 死信交换机
'x-dead-letter-routing-key' => 'delay_route', // 死信路由
'x-message-ttl' => (10000*$i), // 当下面的音讯扔到该队列中后,过了 60 秒,如果没有被生产,它就死了
// 在 RMQ 中想要应用优先级个性须要的版本为 3.5+。//'x-max-priority'=>0,// 将队列申明为优先级队列,即在创立队列的时候增加参数 x-max-priority 以指定最大的优先级,值为 0 -255(整数)。));
$queue->declareQueue();
// 绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey'].$i);
//$channel->commitTransaction();} catch(Exception $e) { }
// 当 mandatory 标记位设置为 true 时,如果 exchange 依据本身类型和音讯 routeKey 无奈找到一个符合条件的 queue,那么会调用 basic.return 办法将音讯返还给生产者;当 mandatory 设为 false 时,呈现上述情景 broker 会间接将音讯扔掉。//delivery_mode= 2 指明 message 为长久的
// 生成音讯
echo '发送工夫:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',提早'.($i*10).'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'].$i, AMQP_MANDATORY, array('delivery_mode'=>2));
$conn->disconnect();
sleep(2);
}
消费者
<?php
// 起源公众号:【码农编程进阶笔记】//header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delay_exchange',
'queueName' => 'delay_queue',
'routeKey' => 'delay_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
try {$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连贯谬误:', json_encode($connectConfig);
exit();}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);// 申明一个已存在的交换器的,如果不存在将抛出异样,这个个别用在 consume 端
$exchange->setName($params['exchangeName']?:'');
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct 类型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
// 绑定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {echo $e->getMessage();
exit();}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {$body = $message->getBody();
echo '接管工夫:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接管内容:'.$body . PHP_EOL;
// 为了避免接收端在解决音讯时 down 掉,只有在音讯解决实现后才发送 ack 音讯
$queue->ack($message->getDeliveryTag());
} else {echo 'no message' . PHP_EOL;}
}
//$queue->consume('callback'); 第一种生产形式, 然而会阻塞, 程序始终会卡在此处
// 留神:这里须要留神的是这个办法:$queue->consume,queue 对象有两个办法可用于取音讯:consume 和 get。前者是阻塞的,无音讯时会被挂起,适宜循环中应用;后者则是非阻塞的,取音讯时有则取,无则返回 false。// 就是说用了 consume 之后,会同步阻塞,该程序常驻内存,不能用 nginx,apache 调用。$action = '2';
if($action == '1'){$queue->consume('callback'); // 第一种生产形式, 然而会阻塞, 程序始终会卡在此处
}else{
// 第二种生产形式, 非阻塞
$start = time();
while(true)
{$message = $queue->get();
if(!empty($message))
{echo '接管工夫:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接管内容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); // 应答,代表该音讯曾经生产
$end = time();
echo '运行工夫:'.($end - $start).'秒'.PHP_EOL;
//exit();}
else
{//echo 'message not found' . PHP_EOL;}
}
}
守护过程
Windows 端:supervisor-win
下载地址:https://pypi.org/project/supe…
装置
# 重装 setuptools
sudo pip3 uninstall setuptools
pip3 install setuptools --upgrade
#装置 supervisor-win
pip install supervisor-win
配置
app/public/supervisord/supervisord.conf
[program:cancelUnpayUniOrder]
directory=E:\\dev\\tp51\\app
command=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think cancelUnpayUniOrder
[program:syncWechatPayResult]
directory=E:\\dev\\tp51\\app
command=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think syncWechatPayResult
[supervisord]
nodaemon=true
logfile = E:\dev\tp51\app\runtime\log\supervisord.log
pidfile = E:\dev\tp51\app\runtime\log\supervisord.pid
[supervisorctl]
启动
start.bat
:: 守护过程应设置工作打算,开机时启动
supervisord -c supervisord.conf
参考
RabbitMQ 中文文档
Windows 装置 RabbitMQ 具体教程 ](https://zhuanlan.zhihu.com/p/…)
基于 Docker 装置 RabbitMQ
docker 装置 PHP 扩大
PHP 应用 rabbitmq 入门教程
php 应用 rabbitMQ
基于 RabbitMQ 实现提早队列 –PHP 版
教你如何在 Windows 下让解体的 Python 程序自重启
基于 Docker 装置 RabbitMQ
docker 装置 PHP 扩大