在我的项目中rabbitmq失去了宽泛的时候,这里对rabbitmq的惯例性能做了一个简略的总结,并封装成了composer包,composer包地址、github地址,欢送fork,因为程度无限,不免存在bug,欢送提出宝贵意见
easy-rabbitmq 包简介
对php-amqplib/php-amqplib包的二次封装,为常见性能提供一套开箱即用的生产解决方案
。目前反对的性能列表如下:
- 推送音讯到直连交换机(含提早音讯)
- 推送音讯到扇形交换机(含提早音讯)
- 推送音讯到主题交换机(含提早音讯)
- 订阅模式下的牢靠生产, 消费者生产失败后将会尝试持续生产,最多尝试5次。
- 拉取模式下的牢靠生产, 消费者生产失败后将会尝试持续生产,最多尝试5次。
如果还有其它场景,欢送持续补充,随后进行迭代!!
要求
- 安装包对PHP版本对要求次要取决于php-amqplib/php-amqplib包自身对要求,这里为了兼顾php5.0的使用者,咱们应用了php-amqplib/php-amqplib包V2.9.0的版本。
具体的要求参照这里。
不过笔者举荐应用php7.0及其以上版本, 这个开发包也是在7.0这个版本下面开发实现的!
装置
composer require maweibinguo/easyrabbitmq
应用
在这里咱们举荐php脚本+supervisor联合应用,用以保障生产过程的可靠性、加强worker的生产能力! 如果你还没有据说过supervisor,能够点击这里理解.
1、推送音讯
1-1、推送音讯到直连交换机
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //提早音讯,30 秒中后才会达到指定的交换机 $instance->pushToDirect( $msg = time(), //音讯体内容 $exchange = "easy_direct_exchange", //交换机名称 $routingKey = "direct_test_queue", //音讯的routingkey,consume办法到bingdingkey 要和routingkey保持一致 $delaySec = 30 //提早秒数 ); //无提早,推入到指定到直链交换机 $instance->pushToDirect( $msg = time(), //音讯体内容 $exchange = "easy_direct_exchange", //交换机名称 $routingKey = "direct_test_queue", //音讯的routingkey,consume办法到bingdingkey 要和routingkey保持一致 );
1-2、推送音讯到扇形交换机
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //提早音讯,30 秒中后才会达到指定的交换机 $instance->pushToFanout( $msg = time(), //音讯体内容 $exchange = "easy_fanout_exchange", //交换机名称 $delaySec = 30 //提早秒数 ); //无提早,推入到指定到直链交换机 $instance->pushToFanout( $msg = time(), //音讯体内容 $exchange = "easy_fanout_exchange" //交换机名称 );
1-3、推送音讯到主题交换机
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); //提早音讯,30 秒中后才会达到指定的交换机 $instance->pushToTopic( $msg = time(), //音讯体内容 $exchange = "easy_topic_exchange", //交换机名称 $routingKey = "", $delaySec = 30 //提早秒数 ); //无提早,推入到指定到直链交换机 $instance->pushToTopic( $msg = time(), //音讯体内容 $exchange = "easy_topic_exchange", //交换机名称 $routingKey = "easy.topic.queue" //routingKey 要同consum的bindingKey相匹配 //bindingKey反对两种非凡的字符"*"、“#”,用作含糊匹配, 其中"*"用于匹配一个单词、“#”用于匹配多个单词(也能够是0个) //无论是bindingKey 还是routingKey, 被"."分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 蕴含三个单词easy、topic、queue );
2、生产音讯
生产反对主动重试,最多尝试重试5次,每次生产失败后该音讯将会被从新投入到生产队列中。从新的工夫将会随着失败的次数增多逐步推移,本客户端反对的推移策略如下:
失败1次(1秒钟后会再被投递), 失败2次(2秒钟后会再被投递), 失败3次(4秒钟后会再被投递), 失败4次(8秒钟后会再被投递), 失败5次(16秒钟后会再被投递)
2-1、订阅模式
订阅模式下的牢靠生产
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); $instance->consume( $queueName = "direct_test_queue",//订阅的队列名称 $consumerTag = "c1",//生产标记 $exchange = "easy_direct_exchange",//交换机名称 $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机须要同routingKey保持一致 $callback = function($msg){ $body = $msg->body; file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND); //如果返回后果不相对等于(===)true,那么将触发音讯重试机制 return false; }, //5次生产生产失败后,失败音讯将会投递到的队列名称 $failedQueue = "easymq@failed" );
2-2、拉取模式
拉取模式下的牢靠生产
$config = [ "host" => "127.0.0.1", "port" => "5672", "user" => "guest", "password" => "guest", "vhost" => "/", "channel_max_num" => 10, ]; $instance = RabbitMq::getInstance($config); $instance->get( $queue = "get_queue", $exchange = "easy_fanout_exchange", $bindingKey = "", $callback = function($msg){ $body = $msg->body; file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND); //如果返回后果不相对等于(===)true,那么将触发音讯重试机制 return false; }, //5次生产生产失败后,失败音讯将会投递到的队列名称 $failedQueue = 'easymq@failed' );