前言
相干概念
音讯(Message)
是指在利用间传送的数据。音讯能够非常简单,比方只蕴含文本字符串,也能够更简单,可能蕴含嵌入对象。
音讯队列(Message Queue)
是一种利用间的通信形式,音讯发送后能够立刻返回,由音讯零碎来确保音讯的牢靠传递。音讯发布者只管把音讯公布到 MQ 中而不必管谁来取,音讯使用者只管从 MQ 中取音讯而不管是谁公布的。这样发布者和使用者都不必晓得对方的存在。
RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP
Advanced Message Queue,高级音讯队列协定。
Erlang
面向并发的编程语言。
RabbitMQ
特点
1.可靠性(Reliability) RabbitMQ 应用一些机制来保障可靠性,如长久化、传输确认、公布确认。
2.灵便的路由(Flexible Routing) 在音讯进入队列之前,通过 Exchange 来路由音讯的。对于典型的路由性能,RabbitMQ曾经提供了一些内置的 Exchange 来实现。针对更简单的路由性能,能够将多个Exchange 绑定在一起,也通过插件机制实现本人的 Exchange 。
3.音讯集群(Clustering) 多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker
4.高可用(Highly Available Queues) 队列能够在集群中的机器上进行镜像,使得在局部节点出问题的状况下队列依然可用。
5.多种协定(Multi-protocol) RabbitMQ 反对多种音讯队列协定,比方 STOMP、MQTT 等等。
6.多语言客户端(Many Clients) RabbitMQ 简直反对所有罕用语言,比方 Java、.NET、Ruby 等等。
7.治理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和治理音讯 Broker 的许多方面。
8.跟踪机制(Tracing) 如果音讯异样,RabbitMQ 提供了音讯跟踪机制,使用者能够找出产生了什么。
9.插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩大,也能够编写本人的插件。
概念模型
- Message
音讯,音讯是不具名的,它由音讯头和音讯体组成。音讯体是不通明的,而音讯头则由一系列的可选属性组成,这些属性包含routing-key(路由键)、priority(绝对于其余音讯的优先权)、delivery-mode(指出该音讯可能须要持久性存储)等。 - Publisher
音讯的生产者,也是一个向交换器公布音讯的客户端应用程序。 - Exchange
交换器,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列。 - Binding
绑定,用于音讯队列和交换器之间的关联。一个绑定就是基于路由键将交换器和音讯队列连接起来的路由规定,所以能够将交换器了解成一个由绑定形成的路由表。 - Queue
音讯队列,用来保留音讯直到发送给消费者。它是音讯的容器,也是音讯的起点。一个音讯可投入一个或多个队列。音讯始终在队列外面,期待消费者连贯到这个队列将其取走。 - Connection
网络连接,比方一个TCP连贯。 - Channel
信道,多路复用连贯中的一条独立的双向数据流通道。信道是建设在实在的TCP连贯边疆虚构连贯,AMQP 命令都是通过信道收回去的,不论是公布音讯、订阅队列还是接管音讯,这些动作都是通过信道实现。因为对于操作系统来说建设和销毁 TCP 都是十分低廉的开销,所以引入了信道的概念,以复用一条 TCP 连贯。 - Consumer
音讯的消费者,示意一个从音讯队列中获得音讯的客户端应用程序。 - Virtual Host
虚拟主机,示意一批交换器、音讯队列和相干对象。虚拟主机是共享雷同的身份认证和加密环境的独立服务器域。每个 vhost 实质上就是一个 mini 版的 RabbitMQ 服务器,领有本人的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的根底,必须在连贯时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker
示意音讯队列服务器实体。
装置
Windows环境
装置Erlang
下载地址: https://erlang.org/download/o...
配置环境变量 ERLANG_HOME C:\Program Files (x86)\erl5.9
增加到PATH %ERLANG_HOME%\bin;
装置RabbitMq
下载地址:https://www.rabbitmq.com/ 或 百度网盘
配置环境变量 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.9.11
增加到PATH %RABBITMQ_SERVER%\sbin;
运行:
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.11\sbin>rabbitmq-server.bat
拜访:
关上浏览器。拜访 http://127.0.0.1:15672
注:
- 默认账号:guest 明码:guest,仅用于浏览器拜访,API须要新建用户
- 默认浏览器拜访端口15672,API拜访端口5672
装置php的amqp扩大
1.下载地址:http://pecl.php.net/package/amqp
2.将php_amqp.dll复制到php/ext,同时在php.ini中增加如下代码:
[amqp] extension=php_amqp.dll
3.而后将rabbitmq.4.dll复制到php根目录
4.查看是否装置胜利:php -m
Docker环境
装置RabbitMq
docker pull rabbitmq # 镜像未配有控制台docker pull rabbitmq:management # 镜像配有控制台docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
启动容器后,能够浏览器中拜访http://localhost:15672来查看控制台信息。RabbitMQ
默认的用户名:guest
,明码:guest
装置php和amqp扩大
docker pull php:7.4-fpmdocker run -d -p 9000:9000 -v /Users/ma/docker/php:/www --name phpfpm php:7.4-fpmdocker exec -it phpfpm /bin/bash#装置扩大apt-get update && apt-get install -y libfreetype6-dev librabbitmq-dev libjpeg62-turbo-dev libmcrypt-dev libpng-devpecl install amqpdocker-php-ext-enable amqp
应用
一般队列
生产者
$params = [ 'host' => '192.168.0.134', 'port' => '5672', 'vhost' => '/', 'login' => 'admin', 'password' => '123456'];$connection = new \AMQPConnection($params);if (!$connection->connect()) { echo "Cannot connect to the broker!"; exit;}$channel = new \AMQPChannel($connection);$exchangeName = 'direct_exchange';$queueName = 'direct_queue';$routeKey = 'direct_queue';$exchange = new \AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setFlags(AMQP_DURABLE);$exchange->setType(AMQP_EX_TYPE_DIRECT);// 申明交换机$exchange->declareExchange();// 创立音讯队列$queue = new \AMQPQueue($channel);$queue->setName($queueName);// 设置持久性$queue->setFlags(AMQP_DURABLE);// 申明音讯队列$queue->declareQueue();// 开启事务,确保数据真正不失落$channel->startTransaction();// 将音讯和标识绑定到交换器中$exchange->publish($message, $routeKey);$channel->commitTransaction();$connection->disconnect();var_dump("[x] Sent $message");
消费者
$params = [ 'host' => '192.168.0.134', 'port' => '5672', 'vhost' => '/', 'login' => 'admin', 'password' => '123456'];$connection = new \AMQPConnection($params);if (!$connection->connect()) { echo "Cannot connect to the broker!"; exit;}$channel = new \AMQPChannel($connection);$exchangeName = 'direct_exchange';$queueName = 'direct_queue';$routeKey = 'direct_queue';$exchange = new \AMQPExchange($channel);$exchange->setName($exchangeName);$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->declareExchange();$queue = new \AMQPQueue($channel);$queue->setName($queueName);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();$queue->bind($exchange->getName(), $routeKey);// 接管音讯并解决回调$queue->consume(function ($envelop, $queue) { $message = $envelop->getBody(); echo $message . PHP_EOL; // ACK 告诉生产者工作实现 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);});//设置每次只能解决一条,防止音讯沉积,从而导致队列挂掉$channel->qos(0, 1);//敞开连贯$connection->disconnect();
提早队列
生产者
<?php//起源公众号:【码农编程进阶笔记】//header('Content-Type:text/html;charset=utf-8;'); $params = array( 'exchangeName' => 'test_cache_exchange', 'queueName' => 'test_cache_queue', 'routeKey' => 'test_cache_route',); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/'); //var_dump(extension_loaded('amqp')); 判断是否加载amqp扩大//exit();for($i=5;$i>0;$i--){ try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 记录日志 echo 'rabbit-mq 连贯谬误:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 记录日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//长久化 $exchange->setName($params['exchangeName']); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exchange->declareExchange(); //$channel->startTransaction(); //RabbitMQ不容许申明2个雷同名称、配置不同的Queue队列 $queue = new AMQPQueue($channel); $queue->setName($params['queueName'].$i); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 死信交换机 'x-dead-letter-routing-key' => 'delay_route', // 死信路由 'x-message-ttl' => (10000*$i), // 当下面的音讯扔到该队列中后,过了60秒,如果没有被生产,它就死了 // 在RMQ中想要应用优先级个性须要的版本为3.5+。 //'x-max-priority'=>0,//将队列申明为优先级队列,即在创立队列的时候增加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。 )); $queue->declareQueue(); //绑定队列和交换机 $queue->bind($params['exchangeName'], $params['routeKey'].$i); //$channel->commitTransaction(); } catch(Exception $e) { } // 当mandatory标记位设置为true时,如果exchange依据本身类型和音讯routeKey无奈找到一个符合条件的queue,那么会调用basic.return办法将音讯返还给生产者;当mandatory设为false时,呈现上述情景broker会间接将音讯扔掉。 //delivery_mode=2指明message为长久的 //生成音讯 echo '发送工夫:'.date("Y-m-d H:i:s", time()).PHP_EOL; echo 'i='.$i.',提早'.($i*10).'秒'.PHP_EOL; $message = json_encode(['order_id'=>time(),'i'=>$i]); $exchange->publish($message, $params['routeKey'].$i, AMQP_MANDATORY, array('delivery_mode'=>2)); $conn->disconnect(); sleep(2);}
消费者
<?php//起源公众号:【码农编程进阶笔记】//header('Content-Type:text/html;charset=utf8;'); $params = array( 'exchangeName' => 'delay_exchange', 'queueName' => 'delay_queue', 'routeKey' => 'delay_route',); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/'); //var_dump(extension_loaded('amqp'));try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 记录日志 echo 'rabbit-mq 连贯谬误:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 记录日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//申明一个已存在的交换器的,如果不存在将抛出异样,这个个别用在consume端 $exchange->setName($params['exchangeName']?:''); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //绑定 $queue->bind($params['exchangeName'], $params['routeKey']);} catch(Exception $e) { echo $e->getMessage(); exit();} function callback(AMQPEnvelope $message) { global $queue; if ($message) { $body = $message->getBody(); echo '接管工夫:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接管内容:'.$body . PHP_EOL; //为了避免接收端在解决音讯时down掉,只有在音讯解决实现后才发送ack音讯 $queue->ack($message->getDeliveryTag()); } else { echo 'no message' . PHP_EOL; }} //$queue->consume('callback'); 第一种生产形式,然而会阻塞,程序始终会卡在此处 //留神:这里须要留神的是这个办法:$queue->consume,queue对象有两个办法可用于取音讯:consume和get。前者是阻塞的,无音讯时会被挂起,适宜循环中应用;后者则是非阻塞的,取音讯时有则取,无则返回false。//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。 $action = '2';if($action == '1'){ $queue->consume('callback'); //第一种生产形式,然而会阻塞,程序始终会卡在此处}else{ //第二种生产形式,非阻塞 $start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo '接管工夫:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接管内容:'.$message->getBody().PHP_EOL; $queue->ack($message->getDeliveryTag()); //应答,代表该音讯曾经生产 $end = time(); echo '运行工夫:'.($end - $start).'秒'.PHP_EOL; //exit(); } else { //echo 'message not found' . PHP_EOL; } }}
守护过程
Windows端:supervisor-win
下载地址:https://pypi.org/project/supe...
装置
#重装setuptoolssudo pip3 uninstall setuptoolspip3 install setuptools --upgrade#装置supervisor-winpip install supervisor-win
配置
app/public/supervisord/supervisord.conf
[program:cancelUnpayUniOrder]directory=E:\\dev\\tp51\\appcommand=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think cancelUnpayUniOrder[program:syncWechatPayResult]directory=E:\\dev\\tp51\\appcommand=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think syncWechatPayResult[supervisord]nodaemon=truelogfile = E:\dev\tp51\app\runtime\log\supervisord.logpidfile = E:\dev\tp51\app\runtime\log\supervisord.pid[supervisorctl]
启动
start.bat
::守护过程应设置工作打算,开机时启动supervisord -c supervisord.conf
参考
RabbitMQ 中文文档
Windows装置RabbitMQ具体教程](https://zhuanlan.zhihu.com/p/...)
基于 Docker 装置 RabbitMQ
docker装置PHP扩大
PHP 应用rabbitmq 入门教程
php应用rabbitMQ
基于RabbitMQ实现提早队列--PHP版
教你如何在 Windows 下让解体的 Python 程序自重启
基于 Docker 装置 RabbitMQ
docker装置PHP扩大