发现网上并没有RabbitMQ PHP客户端的较为详实的应用文档(当然也可能是我搜索引擎应用不纯熟?)总之,简略的总结了此文,附录各参数和罕用的工作队列,死信队列,以及不同类型交换器的示例,自己程度无限,不免有谬误之处,欢送大佬斧正~

服务器环境 Ubuntu 18.04.5 LTS
PHP 7.2.24
RabbitMQ 3.6.10
php-amqplib 2.7

1.各办法参数

//1.1 建设连贯$conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);参数:$host:      RabbitMQ服务器主机IP地址$port:      RabbitMQ服务器端口$user:      连贯RabbitMQ服务器的用户名$password:  连贯RabbitMQ服务器的用户明码$vhost:     连贯RabbitMQ服务器的vhost(服务器能够有多个vhost,虚拟主机,相似nginx的vhost)
//1.2 建设信道$channel = $conn->channel($channel_id); 参数:$channel_id 信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异样No free channel ids
//1.3 申明交换器$channel->exchange_declare($exhcange_name, $type, $passive, $durable, $auto_delete);参数:$exhcange_name 交换器名字$type          交换器类型$passive       是否检测同名队列$durable       交换机是否开启长久化$auto_detlete  通道敞开后是否删除队列(1)交换器类型枚举 [    direct: (默认)间接交换器,工作形式相似于单播,Exchange会将音讯发送齐全匹配ROUTING_KEY的Queue,    fanout: 播送是式交换器,不论音讯的ROUTING_KEY设置为什么,Exchange都会将音讯转发给所有绑定的Queue,    topic:  主题交换器,工作形式相似于组播,Exchange会将音讯转发和ROUTING_KEY匹配模式雷同的所有队列,比方,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。(* 表是匹配一个任意词组,#示意匹配0个或多个词组),    headers:依据音讯体的header匹配    ]
//1.4 申明队列$channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete);参数:$queue_name  队列名称$passive     是否检测同名队列$durable     是否开启队列长久化$exclusive   队列是否能够被其余队列拜访$auto_delete 通道敞开后是否删除队列
//1.5 创立要发送的信息 ,能够创立多个音讯$msg = new AMQPMessage($data, $properties)$data             要发送的音讯$properties Array 设置的属性,比方设置该音讯长久化['delivery_mode'=>2]//单个发送$channel->basic_publish($msg,        $exchange = '',        $routing_key = '',        $mandatory = false,        $immediate = false,        $ticket = null);参数:$msg         音讯内容$exchange    交换器$routing_key routing_key$mandatory   匹配不到队列时,是否立刻抛弃音讯$immediate   队列无消费者时,是否立刻抛弃音讯$ticket      这个俺也不晓得 坐等大佬//多个发送1.屡次调用 $channel->batch_basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null)外部实现:往$this->batch_messages[]塞2.再调用一次$channel->publish_batch(),实现发送
1.6 路由绑定$channel->queue_bind(        $queue,        $exchange,        $routing_key = '',        $nowait = false,        $arguments = array(),        $ticket = null    )参数:$queue       队列名$exchange    交换器名$routing_key routing_key$nowait      同上 俺也不知$arguments$ticket 
1.7 生产音讯$channel->basic_consume(        $queue = '',        $consumer_tag = '',        $no_local = false,        $no_ack = false,        $exclusive = false,        $nowait = false,        $callback = null,        $ticket = null,        $arguments = array()    )参数:$queue        队列名$consumer_tag $no_local $no_ack       是否不须要手动ack:true就是不须要ack|false须要手动ack$exclusive$nowait$callback     音讯回调函数$ticket$arguments
1.8 手动ack 示例$callback = function($msg) {            sleep($msg->body);            echo " [x] Received sleep ", $msg->body, "\n";            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);            echo " [x] Ack "."\n";        };
1.9 限度散发 示例限度RabbitMQ只发不超过1条的音讯给同一个消费者。当音讯处理完毕后,有了反馈,才会进行第二次发送。$channel->basic_qos(null,1,null);

2.罕用场景

2.1 工作队列 按生产能力散发
生产者和消费者均减少$channel->basic_qos(null,1,null);即可。
2.2 fanout播送示例 注册行为
例如注册后须要发送欢送短信和邮件,将注册行为播送至短信和邮件生产者//定义交换器$channel->exchange_declare('register','fanout',false,false,false);$msg = new AMQPMessage('register event');$channel->basic_publish($msg, 'register');注册短信消费者$channel->exchange_declare('register','fanout',false,false,false);$channel->queue_declare('register.sms', false, false, false, false);$channel->queue_bind('register.sms', 'register');注册邮件消费者$channel->exchange_declare('register','fanout',false,false,false);$channel->queue_declare('register.mail', false, false, false, false);$channel->queue_bind('register.mail', 'register');
2.3 topic类型 含糊匹配示例 日志分级
例如我想一个消费者承受所有日志,一个消费者只接管Error级别日志生产者//定义交换器$channel->exchange_declare('log','topic',false,false,false);$num = rand(0,10);if ($num%3 == 0) {    $level = 'error';}elseif($num%3 == 1){    $level = 'warning';}else{    $level = 'common';}$msg = new AMQPMessage('log event '.$level);$channel->basic_publish($msg, 'log', 'log.'.$level);全量日志消费者$channel->exchange_declare('log','topic',false,false,false);$channel->queue_declare('log.all', false, false, false, false);$channel->queue_bind('log.all', 'log', 'log.*');Error日志消费者$channel->exchange_declare('log','topic',false,false,false);$channel->queue_declare('log.error', false, false, false, false);$channel->queue_bind('log.error', 'log', 'log.error');
2.4 headers类型 匹配示例 日志分级
例如我想一个消费者承受所有日志,一个消费者只接管Error级别日志生产者//定义交换器$channel->exchange_declare('log2','headers',false,false,false);$num = rand(0,10);if ($num%3 == 0) {    $level = 'error';}elseif($num%3 == 1){    $level = 'warning';}else{    $level = 'common';}$msg = new AMQPMessage('log2 event '.$level);$bindArguments = [    'level' => $level,    'type'  => 'log' ]; $headers = new AMQPTable($bindArguments);$msg->set('application_headers', $bindArguments);    $channel->basic_publish($msg, 'log2');全量日志消费者$channel->exchange_declare('log2','headers',false,false,false);$channel->queue_declare('log2.all', false, false, false, false);$bindArguments = [    'type'  => 'log',    //'x-match' => 'any' //默认any]; $headers = new AMQPTable($bindArguments);$channel->queue_bind('log2.all', 'log2', '', false, $headers);Error日志消费者$channel->exchange_declare('log2','headers',false,false,false);$channel->queue_declare('log2.error', false, false, false, false);$bindArguments = [    'type'    => 'log',    'level'   => 'error',    'x-match' => 'all' //默认any]; $headers = new AMQPTable($bindArguments);$channel->queue_bind('log2.error', 'log2', '', false, $headers);
2.5死信队列
//2.5.1 定义一个没有消费者,5s后音讯过期的队列//生产者$arguments = new AMQPTable([    'x-dead-letter-exchange'    => 'dead',    'x-message-ttl'             => 5000, //音讯存活工夫毫秒    'x-dead-letter-routing-key' => 'dead']);//定义队列 不要交换器$channel->queue_declare('no_consume', false, false, false, false, false, $arguments);$now = time();$msg = new AMQPMessage(time());        $channel->basic_qos(null,1,null);$channel->basic_publish($msg, '', 'no_consume');echo " [x] Sent no_consume :".date('Y-m-d H:i:s',$now)."\n";$channel->close();$connection->close();//消费者$channel->exchange_declare('dead','topic',false,false,false);$channel->queue_declare('dead.all', false, false, false, false);$channel->queue_bind('dead.all', 'dead', 'dead');$channel->basic_qos(null,1,null);echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";$callback = function($msg) {    var_dump('msg:'.date('Y-m-d H:i:s',$msg->body));    var_dump('now:'.date('Y-m-d H:i:s'));    echo " [x] Received log error ", $msg->body, "\n";};$channel->basic_consume('dead.all', '', false, true, false, false, $callback);while(count($channel->callbacks)) {    $channel->wait();}