前言
相干概念
音讯(Message)
是指在利用间传送的数据。音讯能够非常简单,比方只蕴含文本字符串,也能够更简单,可能蕴含嵌入对象。
音讯队列(Message Queue)
是一种利用间的通信形式,音讯发送后能够立刻返回,由音讯零碎来确保音讯的牢靠传递。音讯发布者只管把音讯公布到 MQ 中而不必管谁来取,音讯使用者只管从 MQ 中取音讯而不管是谁公布的。这样发布者和使用者都不必晓得对方的存在。
RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP
Advanced Message Queue,高级音讯队列协定。
Erlang
面向并发的编程语言。
RabbitMQ
特点
1. 可靠性(Reliability)RabbitMQ 应用一些机制来保障可靠性,如长久化、传输确认、公布确认。
2. 灵便的路由(Flexible Routing)在音讯进入队列之前,通过 Exchange 来路由音讯的。对于典型的路由性能,RabbitMQ 曾经提供了一些内置的 Exchange 来实现。针对更简单的路由性能,能够将多个 Exchange 绑定在一起,也通过插件机制实现本人的 Exchange。
3. 音讯集群(Clustering)多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker
4. 高可用(Highly Available Queues)队列能够在集群中的机器上进行镜像,使得在局部节点出问题的状况下队列依然可用。
5. 多种协定(Multi-protocol)RabbitMQ 反对多种音讯队列协定,比方 STOMP、MQTT 等等。
6. 多语言客户端(Many Clients)RabbitMQ 简直反对所有罕用语言,比方 Java、.NET、Ruby 等等。
7. 治理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和治理音讯 Broker 的许多方面。
8. 跟踪机制(Tracing)如果音讯异样,RabbitMQ 提供了音讯跟踪机制,使用者能够找出产生了什么。
9. 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩大,也能够编写本人的插件。
概念模型
- Message
音讯,音讯是不具名的,它由音讯头和音讯体组成。音讯体是不通明的,而音讯头则由一系列的可选属性组成,这些属性包含 routing-key(路由键)、priority(绝对于其余音讯的优先权)、delivery-mode(指出该音讯可能须要持久性存储)等。 - Publisher
音讯的生产者,也是一个向交换器公布音讯的客户端应用程序。 -
Exchange
交换器,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列。- Direct Exchange:直连交换机,依据 Routing Key(路由键)进行投递到不同队列。
- Fanout Exchange:扇形交换机,采纳播送模式,依据绑定的交换机,路由到与之对应的所有队列。
- Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号 #示意一个或多个词,* 示意一个词。
- Header Exchange:头交换机,不解决路由键。而是依据发送的音讯内容中的 headers 属性进行匹配。
-
Binding
绑定,用于音讯队列和交换器之间的关联。一个绑定就是基于路由键将交换器和音讯队列连接起来的路由规定,所以能够将交换器了解成一个由绑定形成的路由表。$queue->bind($exchange->getName(), $routeKey);
- Queue
音讯队列,用来保留音讯直到发送给消费者。它是音讯的容器,也是音讯的起点。一个音讯可投入一个或多个队列。音讯始终在队列外面,期待消费者连贯到这个队列将其取走。 - Connection
网络连接,比方一个 TCP 连贯。 - Channel
信道,多路复用连贯中的一条独立的双向数据流通道。信道是建设在实在的 TCP 连贯边疆虚构连贯,AMQP 命令都是通过信道收回去的,不论是公布音讯、订阅队列还是接管音讯,这些动作都是通过信道实现。因为对于操作系统来说建设和销毁 TCP 都是十分低廉的开销,所以引入了信道的概念,以复用一条 TCP 连贯。 - Consumer
音讯的消费者,示意一个从音讯队列中获得音讯的客户端应用程序。 - Virtual Host
虚拟主机,示意一批交换器、音讯队列和相干对象。虚拟主机是共享雷同的身份认证和加密环境的独立服务器域。每个 vhost 实质上就是一个 mini 版的 RabbitMQ 服务器,领有本人的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的根底,必须在连贯时指定,RabbitMQ 默认的 vhost 是 /。 - Broker
示意音讯队列服务器实体。
装置
Windows 环境
装置 Erlang
下载地址:https://erlang.org/download/otp_win64_25.0.exe
配置环境变量 ERLANG_HOME C:\Program Files (x86)\erl5.9
增加到 PATH %ERLANG_HOME%\bin;
装置 RabbitMq
下载地址:https://www.rabbitmq.com/ 或 百度网盘
配置环境变量 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.9.11
增加到 PATH %RABBITMQ_SERVER%\sbin;
运行:
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.11\sbin>rabbitmq-server.bat
拜访:
关上浏览器。拜访 http://127.0.0.1:15672
注:
- 默认账号:guest 明码:guest,仅用于浏览器拜访,API 须要新建用户
- 默认浏览器拜访端口 15672,API 拜访端口 5672
装置 php 的 amqp 扩大
1. 下载地址:http://pecl.php.net/package/amqp
2. 将 php_amqp.dll 复制到 php/ext,同时在 php.ini 中增加如下代码:
[amqp]
extension=php_amqp.dll
3. 而后将 rabbitmq.4.dll 复制到 php 根目录
4. 查看是否装置胜利:php -m
Docker 环境
装置 RabbitMq
docker pull rabbitmq # 镜像未配有控制台
docker pull rabbitmq:management # 镜像配有控制台
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
启动容器后,能够浏览器中拜访 http://localhost:15672 来查看控制台信息。RabbitMQ
默认的用户名:guest
,明码:guest
装置 php 和 amqp 扩大
docker pull php:7.4-fpm
docker run -d -p 9000:9000 -v /Users/ma/docker/php:/www --name phpfpm php:7.4-fpm
docker exec -it phpfpm /bin/bash
#装置扩大
apt-get update && apt-get install -y libfreetype6-dev librabbitmq-dev libjpeg62-turbo-dev libmcrypt-dev libpng-dev
pecl install amqp
docker-php-ext-enable amqp
应用
一般队列
生产者
$params = [
'host' => '192.168.0.134',
'port' => '5672',
'vhost' => '/',
'login' => 'admin',
'password' => '123456'
];
$connection = new \AMQPConnection($params);
if (!$connection->connect()) {
echo "Cannot connect to the broker!";
exit;
}
$channel = new \AMQPChannel($connection);
$exchangeName = 'direct_exchange';
$queueName = 'direct_queue';
$routeKey = 'direct_queue';
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 申明交换机
$exchange->declareExchange();
// 创立音讯队列
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
// 设置持久性
$queue->setFlags(AMQP_DURABLE);
// 申明音讯队列
$queue->declareQueue();
// 开启事务, 确保数据真正不失落
$channel->startTransaction();
// 将音讯和标识绑定到交换器中
$exchange->publish($message, $routeKey);
$channel->commitTransaction();
$connection->disconnect();
var_dump("[x] Sent $message");
消费者
$params = [
'host' => '192.168.0.134',
'port' => '5672',
'vhost' => '/',
'login' => 'admin',
'password' => '123456'
];
$connection = new \AMQPConnection($params);
if (!$connection->connect()) {
echo "Cannot connect to the broker!";
exit;
}
$channel = new \AMQPChannel($connection);
$exchangeName = 'direct_exchange';
$queueName = 'direct_queue';
$routeKey = 'direct_queue';
$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();
$queue = new \AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchange->getName(), $routeKey);
// 接管音讯并解决回调
$queue->consume(function ($envelop, $queue) {$message = $envelop->getBody();
echo $message . PHP_EOL;
// ACK 告诉生产者工作实现
$queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
});
// 设置每次只能解决一条, 防止音讯沉积, 从而导致队列挂掉
$channel->qos(0, 1);
// 敞开连贯
$connection->disconnect();
提早队列
生产者
<?php
// 起源公众号:【码农编程进阶笔记】//header('Content-Type:text/html;charset=utf-8;');
$params = array(
'exchangeName' => 'test_cache_exchange',
'queueName' => 'test_cache_queue',
'routeKey' => 'test_cache_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判断是否加载 amqp 扩大
//exit();
for($i=5;$i>0;$i--){
try {$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连贯谬误:', json_encode($connectConfig);
exit();}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);// 长久化
$exchange->setName($params['exchangeName']);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct 类型
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ 不容许申明 2 个雷同名称、配置不同的 Queue 队列
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName'].$i);
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange', 死信交换机
'x-dead-letter-routing-key' => 'delay_route', // 死信路由
'x-message-ttl' => (10000*$i), // 当下面的音讯扔到该队列中后,过了 60 秒,如果没有被生产,它就死了
// 在 RMQ 中想要应用优先级个性须要的版本为 3.5+。//'x-max-priority'=>0,// 将队列申明为优先级队列,即在创立队列的时候增加参数 x-max-priority 以指定最大的优先级,值为 0 -255(整数)。));
$queue->declareQueue();
// 绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey'].$i);
//$channel->commitTransaction();} catch(Exception $e) { }
// 当 mandatory 标记位设置为 true 时,如果 exchange 依据本身类型和音讯 routeKey 无奈找到一个符合条件的 queue,那么会调用 basic.return 办法将音讯返还给生产者;当 mandatory 设为 false 时,呈现上述情景 broker 会间接将音讯扔掉。//delivery_mode= 2 指明 message 为长久的
// 生成音讯
echo '发送工夫:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',提早'.($i*10).'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'].$i, AMQP_MANDATORY, array('delivery_mode'=>2));
$conn->disconnect();
sleep(2);
}
消费者
<?php
// 起源公众号:【码农编程进阶笔记】//header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delay_exchange',
'queueName' => 'delay_queue',
'routeKey' => 'delay_route',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
try {$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连贯谬误:', json_encode($connectConfig);
exit();}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);// 申明一个已存在的交换器的,如果不存在将抛出异样,这个个别用在 consume 端
$exchange->setName($params['exchangeName']?:'');
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct 类型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
// 绑定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {echo $e->getMessage();
exit();}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {$body = $message->getBody();
echo '接管工夫:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接管内容:'.$body . PHP_EOL;
// 为了避免接收端在解决音讯时 down 掉,只有在音讯解决实现后才发送 ack 音讯
$queue->ack($message->getDeliveryTag());
} else {echo 'no message' . PHP_EOL;}
}
//$queue->consume('callback'); 第一种生产形式, 然而会阻塞, 程序始终会卡在此处
// 留神:这里须要留神的是这个办法:$queue->consume,queue 对象有两个办法可用于取音讯:consume 和 get。前者是阻塞的,无音讯时会被挂起,适宜循环中应用;后者则是非阻塞的,取音讯时有则取,无则返回 false。// 就是说用了 consume 之后,会同步阻塞,该程序常驻内存,不能用 nginx,apache 调用。$action = '2';
if($action == '1'){$queue->consume('callback'); // 第一种生产形式, 然而会阻塞, 程序始终会卡在此处
}else{
// 第二种生产形式, 非阻塞
$start = time();
while(true)
{$message = $queue->get();
if(!empty($message))
{echo '接管工夫:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接管内容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); // 应答,代表该音讯曾经生产
$end = time();
echo '运行工夫:'.($end - $start).'秒'.PHP_EOL;
//exit();}
else
{//echo 'message not found' . PHP_EOL;}
}
}
守护过程
Windows 端:supervisor-win
下载地址:https://pypi.org/project/supervisor-win/
装置
# 重装 setuptools
sudo pip3 uninstall setuptools
pip3 install setuptools --upgrade
#装置 supervisor-win
pip install supervisor-win
配置
app/public/supervisord/supervisord.conf
[program:cancelUnpayUniOrder]
directory=E:\\dev\\tp51\\app
command=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think cancelUnpayUniOrder
[program:syncWechatPayResult]
directory=E:\\dev\\tp51\\app
command=D:\\phpstudy_pro\\Extensions\\php\\php7.3.4nts\\php.exe think syncWechatPayResult
[supervisord]
nodaemon=true
logfile = E:\dev\tp51\app\runtime\log\supervisord.log
pidfile = E:\dev\tp51\app\runtime\log\supervisord.pid
[supervisorctl]
启动
start.bat
:: 守护过程应设置工作打算,开机时启动
supervisord -c supervisord.conf
title: PHP-RabbitMQ- 2 高级个性
date: 2022-11-06
categories:
- PHP
- 中间件
tags: - RabbitMQ
-
PHP
常见问题
- 生产者音讯送达失败
- 反复生产
- 音讯没有胜利生产
- 音讯 N 年后始终没有被销毁
- 高并发
- 音讯失落后无奈找回
PhpAmqpLib
php-amqplib 是 Advanced Message Queuing Protocol (AMQP)的一个 PHP 开源实现。
高级音讯队列协定(AMQP)是一个异步消息传递所应用的应用层协定标准。作为线路层协定,而不是 API(例如 JMS),AMQP 客户端可能忽视音讯的起源任意发送和承受信息。
装置
composer require php-amqplib/php-amqplib
或间接援用
"autoload": {
"psr-4": {"PhpAmqpLib\\": "PhpAmqpLib/"}
},
应用
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
示例下载:php-amqplib-master
链接: https://pan.baidu.com/s/1tk26bbQyL8frNPZRf8EJug 提取码: d5ad
高级个性
- ACK(confirm 机制)
- 如何保障音讯百分百投递胜利
- 幂等性
- return 机制
- 限流
- 重回队列
- TTL
- 死信队列
1 ACK(confirm 机制)
概念
消费者实现一个工作可能须要一段时间,如果其中一个消费者解决一个长的工作并仅只实现了局部忽然它挂掉了,会导致音讯失落。RabbitMQ 一旦向消费者传递了一条音讯,便立刻将该音讯标记为删除。在这种状况下,忽然有个消费者挂掉了,咱们将失落正在解决的音讯。以及后续发送给该生产这的音讯,因为它无奈接管到。
为了保障音讯在发送过程中不失落,rabbitmq 引入音讯应答机制,音讯应答就是: 消费者在接管到音讯并且解决该音讯之后,通知 rabbitmq 它曾经解决了,rabbitmq 能够把该音讯删除了。
自动应答
音讯发送后立刻被认为曾经传送胜利
弊病:如果音讯在接管到之前,消费者那边呈现连贯或者 信道 敞开,那么音讯就丢 失了, 当然另一方面这种模式消费者那边能够传递过载的音讯,没有对传递的音讯数量进行限度,当然这样有可能使得消费者这边因为接管太多还来不及解决的音讯,导致这些音讯的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死
所以在理论开发中咱们应抉择 手动应答
音讯应答的办法
Channel.basicAck():用于必定确认
RabbitMQ 已晓得该音讯并且胜利的解决音讯,能够将其抛弃了
Channel.basicNack():用于否定确认
Channel.basicReject():用于否定确认(举荐应用)
实现
- 在 channel 上开启确认模式:
$channel->confirm_select();
- 在 channel 上增加监听:
$channel->wait_for_pending_acks();
监听胜利和失败的返回后果,依据具体的后果对音讯进行从新发送、或记录日志等后续解决。
代码
生产者
<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 申明连贯参数
$config = [
'host' => '192.168.31.51',
'vhost' => '/',
'port' => 5672,
'login' => 'test',
'password' => '123456'
];
$exchange = 'ack_exchange';
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['login'], $config['password'], $config['vhost']);
$channel = $connection->channel();
// 推送胜利
$channel->set_ack_handler(function (AMQPMessage $message) {echo "set_ack_handler:" . $message->body . PHP_EOL;}
);
// 推送失败
$channel->set_nack_handler(function (AMQPMessage $message) {echo "set_nack_handler:" . $message->body . PHP_EOL;}
);
$channel->confirm_select();
$channel->exchange_declare($exchange, 'fanout', false, false, true);
$msg = new AMQPMessage('xxx', array('content_type' => 'text/plain'));
$channel->basic_publish($msg, $exchange);
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
消费者
<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$config = [
'host'=>'192.168.31.51',
'vhost'=>'/',
'port'=>5672,
'login'=>'test',
'password'=>'123456'
];
$exchange = 'ack_exchange';
$queue = 'ack_queue';
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['login'], $config['password'], $config['vhost']);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQP_EX_TYPE_FANOUT, false, false, true);
$channel->queue_bind($queue, $exchange);
function process_message($message)
{
echo "胜利收到音讯,音讯内容为:".$message->body ;
// 生产完音讯之后进行应答,通知 rabbit 我曾经生产了,能够发送下一组了
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
$channel->basic_consume($queue, '', false, false, false, false,'process_message');
while ($channel->is_consuming()) {
// After 10 seconds there will be a timeout exception.
$channel->wait(null, false, 30000);
}
2 确保百分百投递胜利
计划一 音讯落库打标(罕用)
将音讯长久化到 DB 并设置状态值,收到 Consumer 的应答就扭转以后记录的状态。
再轮询从新发送没接管到应答的音讯,留神这里要设置重试次数。
实现流程
下单胜利:
-
对订单数据入 BIZ DB 订单库,并对因而生成的业务音讯入 MSG DB 音讯库(状态 0)
注:肯定要保障音讯都存储胜利了,生产端再进行音讯发送。如果失败了就进行疾速失败机制
- 发送音讯到 MQ 服务上
-
生产端有一个 Confirm Listener,异步监听 Broker 回送的响应,从而判断音讯是否投递胜利
- 如果胜利, 去数据库查问该音讯, 并将音讯状态更新为 1
- 如果出现意外状况,消费者未接管到或者 Listener 接管确认时产生网络闪断,导致生产端的 Listener 就永远收不到这条音讯的 confirm 应答了,也就是说这条音讯的状态就始终为 0 了,这时候就须要用到咱们的分布式定时工作来从 MSG 数据库抓取那些超时了还未被生产的音讯,从新发送一遍。
- 此时咱们须要设置一个规定,比如说音讯在入库时候设置一个临界值 timeout,5 分钟之后如果还是 0 的状态那就须要把音讯抽取进去。这里咱们应用的是分布式定时工作,去定时抓取 DB 中距离音讯创立工夫超过 5 分钟的且状态为 0 的音讯。
- 把抓取进去的音讯进行从新投递(Retry Send),也就是从第二步开始持续往下走
- 有些音讯可能就是因为一些理论的问题无奈路由到 Broker,比方 routingKey 设置不对,对应的队列被误删除了,那么这种音讯即便重试屡次也依然无奈投递胜利,所以须要对重试次数做限度,比方限度 3 次,如果投递次数大于三次,那么就将音讯状态更新为 2,示意这个音讯最终投递失败, 而后通过弥补机制,人工去解决。理论生产中,这种状况还是比拟少的,然而你不能没有这个弥补机制,要不然就做不到可靠性了。
毛病: 在第一步须要更新或者插入操作数据库 2 次;在大厂中 都不会加事务,都是进行的弥补操作。
优化: 不须要音讯进行长久化 只须要业务长久化
计划二 音讯二次确认
音讯的提早投递,做二次确认,回调查看(不罕用,大厂在用的高并发计划)
- (上游服务:Upstream service)业务入库 而后 send 音讯到 broker
- 进行音讯提早发送到新的 queue(延迟时间为 5 分钟: 业务决定)
- (上游服务:Downstream service)监听到音讯而后解决音讯
- 上游服务 send confirm 生成新的音讯到 broker (这里是一个新的 queue)
- callback service 去监听这个音讯 并且入库 如果监听到示意这个音讯曾经生产胜利
- callback service 去查看 第二步投递的提早音讯是否 在 msgDB 外面是否生产胜利,不存在或者生产失败就会 Resend command
- 如果在第 1,2,4 步失败,如果胜利 broker 会给咱们一个 confirm,失败当然没有,这是音讯可靠性投递的重要保障
3 幂等性
定义:用户对于同一操作发动的一次申请或者屡次申请的后果是统一的。
实现计划:惟一 ID+ 指纹码,利用数据库主键去重
惟一 ID: 业务表的主键 指纹码: 为了区别每次失常操作的码,每次操作时生成指纹码; 能够用工夫戳 + 业务编号或者标记位(具体视业务场景而定)
毛病:高并发下有数据库写入的性能瓶颈
解决方案:依据 ID 进行分库分表算法路由
4 return 机制
Return Listener 用于解决未找到交换机或未路由到队列的音讯。也是生产段增加的一个监听。
示例:打印错误信息
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// declare exchange but don`t bind any queue
$channel->exchange_declare('hidden_exchange', AMQPExchangeType::TOPIC);
$message = new AMQPMessage("Hello World!");
$wait = true;
$returnListener = function(
$replyCode,
$replyText,
$exchange,
$routingKey,
$message
) use ($wait){$GLOBALS['wait'] = false;
echo "return:",
$replyCode,"\n",
$replyText,"\n",
$exchange,"\n",
$routingKey,"\n",
$message->body,"\n";
};
$channel->set_return_listener($returnListener);
//echo "[x] Sent non-mandatory ...";
$channel->basic_publish(
$message,
'hidden_exchange',
'rkey',
ture
);
//echo "done.\n";
while ($wait) {$channel->wait();
}
$channel->close();
$connection->close();
5 限流机制
RabbitMQ 提供了一种 qos (服务质量保障)性能, 即在非主动确认音讯的前提下, 如果肯定数目的音讯 (通过基于 Con 或者 channel 设置 Qos 的值) 未被确认前, 不生产新的音讯
限流设置
$channel->basic_qos($prefetchSize, 20, $global);
- prefetchSize: 单条音讯的大小限度,Con 通常设置为 0,示意不做限度
- prefetchCount: 一次最多能解决多少条音讯
- global: 是否将下面设置 true 利用于 channel 级别还是取 false 代表 Con 级别
注:prefetchCount 在 autoAck=false 的状况下失效, 即在自动应答的状况下该值有效
手工 ACK
prefetchCount 在 autoAck=false 的状况下失效, 即在自动应答的状况下该值有效
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
调用这个办法就会被动回送给 Broker 一个应答,示意这条音讯我解决完了,你能够给我下一条了。
参数 multiple 示意是否批量签收,因为咱们是一次解决一条音讯,所以设置为 false
如果正文掉这行,会重复生产
实现
<?php
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->queue_declare('qos_queue', false, true, false, false);
// 第二个参数代表:每次只生产 100 条
$channel->basic_qos(null, 109, null);
function process_message($message)
{
/* 业务逻辑 */
// 生产完音讯之后进行应答,通知 rabbit 我曾经生产了,能够发送下一组了
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
$channel->basic_consume('qos_queue', '', false, false, false, false,'process_message');
while ($channel->is_consuming()) {
// After 10 seconds there will be a timeout exception.
$channel->wait(null, false, 30000);
}
注:$message->ack();
就是封装过的$channel->basic_ack();
6 重回队列
当咱们设置 autoACK=false
时,就能够应用手工 ACK 形式了,其实手工形式包含了手工 ACK 与 NACK。
- 手工 ACK 时,会发送给 Broker 一个应答,代表音讯解决胜利,Broker 就可回送响应给 Pro;
- NACK 示意音讯解决失败,如果设置了重回队列,Broker 端就会将没有胜利解决的音讯从新发送。
重回队列
- 重回队列是为了对没有解决胜利的音讯, 将音讯从新投递给 Broker
- 重回队列, 会把生产失败的音讯从新增加到队列的尾端, 供 Con 持续生产
- 个别在理论利用中, 都会敞开重回队列, 即设置为 false
具体实现
<?php
/**
* - Start this consumer in one window by calling: php demo/basic_nack.php
* - Then on a separate window publish a message like this: php demo/amqp_publisher.php good
* that message should be "ack'ed"
* - Then publish a message like this: php demo/amqp_publisher.php bad
* that message should be "nack'ed"
*/
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$exchange = 'router';
$queue = 'msgs';
$consumerTag = 'consumer';
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);
/**
* @param \PhpAmqpLib\Message\AMQPMessage $message
*/
function process_message($message)
{
/*
if(插入胜利){
echo "将音讯删除:";
服务器死掉,相当于 exit;
$message->ack(true);
}else if(插入失败){
echo "将音讯不要删除,等着下次生产";
$message->nack(true);
}*/
if ($message->body == 'good') {$message->ack();
} else {
echo "胜利收到音讯,音讯内容为:".$message->body ;
echo "将音讯打回, 重回队列:";
$message->nack(true);
}
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {$message->getChannel()->basic_cancel($message->getConsumerTag());
}
}
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
/**
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
* @param \PhpAmqpLib\Connection\AbstractConnection $connection
*/
function shutdown($channel, $connection)
{$channel->close();
$connection->close();}
register_shutdown_function('shutdown', $channel, $connection);
// Loop as long as the channel has callbacks registered
while ($channel->is_consuming()) {$channel->wait();
}
7 TTL
- TTL(Time To Live), 即生存工夫
- RabbitMQ 反对音讯的过期工夫,在音讯发送时能够进行指定
- RabbitMQ 反对为每个队列设置音讯的超时工夫,从音讯入队列开始计算,只有超过了队列的超时工夫配置,那么音讯会被主动革除
实现
// 音讯过期形式: 设置 queue.normal 队列中的音讯 10s 之后过期
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-exchange', 'exchange.dlx');
$args->set('x-dead-letter-routing-key', 'routingkey');
8 死信队列
DLX – 死信队列(dead-letter-exchange) 利用 DLX, 当音讯在一个队列中变成死信 (dead message) 之后, 它能被从新 publish 到另一个 Exchange 中, 这个 Exchange 就是 DLX.
产生场景
音讯被回绝(basic.reject / basic.nack), 并且 requeue = false 音讯因 TTL 过期 队列达到最大长度
处理过程
DLX 亦为一个一般的 Exchange, 它能在任何队列上被指定, 实际上就是设置某个队列的属性 当某队列中有死信时,RabbitMQ 会主动地将该音讯从新公布到设置的 Exchange, 进而被路由到另一个队列 能够监听这个队列中的音讯做相应的解决. 该个性能够补救 RabbitMQ 3.0 以前反对的 immediate 参数的性能
配置
设置死信队列的 exchange 和 queue, 而后进行绑定 – Exchange:dlx.exchange – Queue: dlx.queue – RoutingKey:# 失常申明交换机、队列、绑定,只不过咱们须要在队列加上一个参数即可 arguments.put(” x-dead-letter- exchange”,”dlx.exchange”);
这样音讯在过期、requeue、队列在达到最大长度时,音讯就能够间接路由到死信队列!
实现
<?php
include(__DIR__ . '/config.php');
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
* 死信队列测试
* 1、创立两个交换器 exchange.normal 和 exchange.dlx, 别离绑定两个队列 queue.normal 和 queue.dlx
* 2、把 queue.normal 队列外面的音讯配置过期工夫,而后通过 x-dead-letter-exchange 指定死信交换器为 exchange.dlx
* 3、发送音讯到 queue.normal 中,音讯过期之后流入 exchange.dlx,而后路由到 queue.dlx 队列中,进行生产
*/
// todo 更改配置
//$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->exchange_declare('exchange.dlx', AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare('exchange.normal', AMQPExchangeType::FANOUT, false, true);
$args = new AMQPTable();
// 音讯过期形式:设置 queue.normal 队列中的音讯 10s 之后过期
$args->set('x-message-ttl', 10000);
// 设置队列最大长度形式:x-max-length
//$args->set('x-max-length', 1);
$args->set('x-dead-letter-exchange', 'exchange.dlx');
$args->set('x-dead-letter-routing-key', 'routingkey');
$channel->queue_declare('queue.normal', false, true, false, false, false, $args);
$channel->queue_declare('queue.dlx', false, true, false, false);
$channel->queue_bind('queue.normal', 'exchange.normal');
$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey');
$message = new AMQPMessage('Hello DLX Message');
$channel->basic_publish($message, 'exchange.normal', 'rk');
$channel->close();
$connection->close();
常见问题
1.Docker 环境装置扩大时报错 git was not found in your PATH, skipping source download
解决:https://blog.csdn.net/phpstory/article/details/116016980
apt --fix-broken install
apt-get update
apt-get upgrade
apt-get install zip
2.Call to undefined function PhpAmqpLib\Wire\bcmod
起因:PHP 短少 bcmath 扩大
docker-php-ext-install -j$(nproc) bcmath
参考
PHP 进阶 RabbitMQ 深入浅出
RabbitMQ 中文文档
Windows 装置 RabbitMQ 具体教程](https://zhuanlan.zhihu.com/p/534570980)
基于 Docker 装置 RabbitMQ
docker 装置 PHP 扩大
PHP 应用 rabbitmq 入门教程
php 应用 rabbitMQ
基于 RabbitMQ 实现提早队列 –PHP 版
教你如何在 Windows 下让解体的 Python 程序自重启
基于 Docker 装置 RabbitMQ