在我的项目中 rabbitmq 失去了宽泛的时候,这里对 rabbitmq 的惯例性能做了一个简略的总结,并封装成了 composer 包,composer 包地址、github 地址,欢送 fork,因为程度无限,不免存在 bug,欢送提出宝贵意见
easy-rabbitmq 包简介
对 php-amqplib/php-amqplib 包的二次封装,为常见性能提供一套开箱即用的生产解决方案
。目前反对的性能列表如下:
- 推送音讯到直连交换机 (含提早音讯)
- 推送音讯到扇形交换机 (含提早音讯)
- 推送音讯到主题交换机 (含提早音讯)
- 订阅模式下的牢靠生产, 消费者生产失败后将会尝试持续生产,最多尝试 5 次。
- 拉取模式下的牢靠生产, 消费者生产失败后将会尝试持续生产,最多尝试 5 次。
如果还有其它场景,欢送持续补充,随后进行迭代!!
要求
- 安装包对 PHP 版本对要求次要取决于 php-amqplib/php-amqplib 包自身对要求, 这里为了兼顾 php5.0 的使用者, 咱们应用了 php-amqplib/php-amqplib 包 V2.9.0 的版本。
具体的要求参照这里。
不过笔者举荐应用 php7.0 及其以上版本, 这个开发包也是在 7.0 这个版本下面开发实现的!
装置
composer require maweibinguo/easyrabbitmq
应用
在这里咱们举荐 php 脚本 +supervisor 联合应用,用以保障生产过程的可靠性、加强 worker 的生产能力!如果你还没有据说过 supervisor,能够点击这里理解.
1、推送音讯
1-1、推送音讯到直连交换机
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"user" => "guest",
"password" => "guest",
"vhost" => "/",
"channel_max_num" => 10,
];
$instance = RabbitMq::getInstance($config);
// 提早音讯,30 秒中后才会达到指定的交换机
$instance->pushToDirect($msg = time(), // 音讯体内容
$exchange = "easy_direct_exchange", // 交换机名称
$routingKey = "direct_test_queue", // 音讯的 routingkey,consume 办法到 bingdingkey 要和 routingkey 保持一致
$delaySec = 30 // 提早秒数
);
// 无提早,推入到指定到直链交换机
$instance->pushToDirect($msg = time(), // 音讯体内容
$exchange = "easy_direct_exchange", // 交换机名称
$routingKey = "direct_test_queue", // 音讯的 routingkey,consume 办法到 bingdingkey 要和 routingkey 保持一致
);
1-2、推送音讯到扇形交换机
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"user" => "guest",
"password" => "guest",
"vhost" => "/",
"channel_max_num" => 10,
];
$instance = RabbitMq::getInstance($config);
// 提早音讯,30 秒中后才会达到指定的交换机
$instance->pushToFanout($msg = time(), // 音讯体内容
$exchange = "easy_fanout_exchange", // 交换机名称
$delaySec = 30 // 提早秒数
);
// 无提早,推入到指定到直链交换机
$instance->pushToFanout($msg = time(), // 音讯体内容
$exchange = "easy_fanout_exchange" // 交换机名称
);
1-3、推送音讯到主题交换机
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"user" => "guest",
"password" => "guest",
"vhost" => "/",
"channel_max_num" => 10,
];
$instance = RabbitMq::getInstance($config);
// 提早音讯,30 秒中后才会达到指定的交换机
$instance->pushToTopic($msg = time(), // 音讯体内容
$exchange = "easy_topic_exchange", // 交换机名称
$routingKey = "",
$delaySec = 30 // 提早秒数
);
// 无提早,推入到指定到直链交换机
$instance->pushToTopic($msg = time(), // 音讯体内容
$exchange = "easy_topic_exchange", // 交换机名称
$routingKey = "easy.topic.queue" //routingKey 要同 consum 的 bindingKey 相匹配
//bindingKey 反对两种非凡的字符 "*"、“#”,用作含糊匹配, 其中 "*" 用于匹配一个单词、“#”用于匹配多个单词 (也能够是 0 个)
// 无论是 bindingKey 还是 routingKey, 被 "." 分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 蕴含三个单词 easy、topic、queue
);
2、生产音讯
生产反对主动重试,最多尝试重试 5 次,每次生产失败后该音讯将会被从新投入到生产队列中。从新的工夫将会随着失败的次数增多逐步推移, 本客户端反对的推移策略如下:
失败 1 次(1 秒钟后会再被投递), 失败 2 次(2 秒钟后会再被投递), 失败 3 次(4 秒钟后会再被投递), 失败 4 次(8 秒钟后会再被投递), 失败 5 次(16 秒钟后会再被投递)
2-1、订阅模式
订阅模式下的牢靠生产
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"user" => "guest",
"password" => "guest",
"vhost" => "/",
"channel_max_num" => 10,
];
$instance = RabbitMq::getInstance($config);
$instance->consume(
$queueName = "direct_test_queue",// 订阅的队列名称
$consumerTag = "c1",// 生产标记
$exchange = "easy_direct_exchange",// 交换机名称
$bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机须要同 routingKey 保持一致
$callback = function($msg){
$body = $msg->body;
file_put_contents("./test.log", "time =>" . time() . "\t" . "body =>" . $body . PHP_EOL , FILE_APPEND);
// 如果返回后果不相对等于 (===)true, 那么将触发音讯重试机制
return false;
},
// 5 次生产生产失败后,失败音讯将会投递到的队列名称
$failedQueue = "easymq@failed"
);
2-2、拉取模式
拉取模式下的牢靠生产
$config = [
"host" => "127.0.0.1",
"port" => "5672",
"user" => "guest",
"password" => "guest",
"vhost" => "/",
"channel_max_num" => 10,
];
$instance = RabbitMq::getInstance($config);
$instance->get(
$queue = "get_queue",
$exchange = "easy_fanout_exchange",
$bindingKey = "",
$callback = function($msg){
$body = $msg->body;
file_put_contents("./test.log", "time =>" . time() . "\t" . "body =>" . $body . PHP_EOL , FILE_APPEND);
// 如果返回后果不相对等于 (===)true, 那么将触发音讯重试机制
return false;
},
// 5 次生产生产失败后,失败音讯将会投递到的队列名称
$failedQueue = 'easymq@failed'
);