关于rabbitmq:RabbitMQ-phpamqplib-基本操作与日常场景示例

9次阅读

共计 5935 个字符,预计需要花费 15 分钟才能阅读完成。

发现网上并没有 RabbitMQ PHP 客户端的较为详实的应用文档 (当然也可能是我搜索引擎应用不纯熟?) 总之,简略的总结了此文,附录各参数和罕用的工作队列,死信队列,以及不同类型交换器的示例,自己程度无限,不免有谬误之处,欢送大佬斧正~

服务器环境 Ubuntu 18.04.5 LTS
PHP 7.2.24
RabbitMQ 3.6.10
php-amqplib 2.7

1. 各办法参数

//1.1 建设连贯
$conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
参数:
$host:      RabbitMQ 服务器主机 IP 地址
$port:      RabbitMQ 服务器端口
$user:      连贯 RabbitMQ 服务器的用户名
$password:  连贯 RabbitMQ 服务器的用户明码
$vhost:     连贯 RabbitMQ 服务器的 vhost(服务器能够有多个 vhost,虚拟主机,相似 nginx 的 vhost)
//1.2 建设信道
$channel = $conn->channel($channel_id); 
参数:
$channel_id 信道 id,不传则获取 $channel[“”]信道,再无则循环 $this->channle 数组,下标从 1 到最大信道数找第一个不是 AMQPChannel 对象的下标,实例化并返回 AMQPChannel 对象,无则抛出异样 No free channel ids
//1.3 申明交换器
$channel->exchange_declare($exhcange_name, $type, $passive, $durable, $auto_delete);
参数:$exhcange_name 交换器名字
$type          交换器类型
$passive       是否检测同名队列
$durable       交换机是否开启长久化
$auto_detlete  通道敞开后是否删除队列
(1)交换器类型
枚举 [direct: (默认)间接交换器,工作形式相似于单播,Exchange 会将音讯发送齐全匹配 ROUTING_KEY 的 Queue,
    fanout: 播送是式交换器,不论音讯的 ROUTING_KEY 设置为什么,Exchange 都会将音讯转发给所有绑定的 Queue,
    topic:  主题交换器,工作形式相似于组播,Exchange 会将音讯转发和 ROUTING_KEY 匹配模式雷同的所有队列,比方,ROUTING_KEY 为 user.stock 的 Message 会转发给绑定匹配模式为 * .stock,user.stock,* . * 和 #.user.stock.# 的队列。(* 表是匹配一个任意词组,# 示意匹配 0 个或多个词组),
    headers: 依据音讯体的 header 匹配
    
]
//1.4 申明队列
$channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete);
参数:
$queue_name  队列名称
$passive     是否检测同名队列
$durable     是否开启队列长久化
$exclusive   队列是否能够被其余队列拜访
$auto_delete 通道敞开后是否删除队列
//1.5 创立要发送的信息,能够创立多个音讯
$msg = new AMQPMessage($data, $properties)
$data             要发送的音讯
$properties Array 设置的属性,比方设置该音讯长久化['delivery_mode'=>2]
// 单个发送
$channel->basic_publish($msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null);
参数:$msg         音讯内容
$exchange    交换器
$routing_key routing_key
$mandatory   匹配不到队列时,是否立刻抛弃音讯
$immediate   队列无消费者时,是否立刻抛弃音讯
$ticket      这个俺也不晓得 坐等大佬

// 多个发送
1. 屡次调用 $channel->batch_basic_publish($msg, $exchange = '', $routing_key ='', $mandatory = false, $immediate = false, $ticket = null)
外部实现: 往 $this->batch_messages[]塞
2. 再调用一次 $channel->publish_batch(),实现发送
1.6 路由绑定
$channel->queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
参数:
$queue       队列名
$exchange    交换器名
$routing_key routing_key
$nowait      同上 俺也不知
$arguments
$ticket 
1.7 生产音讯
$channel->basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array())
参数:
$queue        队列名
$consumer_tag 
$no_local 
$no_ack       是否不须要手动 ack:true 就是不须要 ack|false 须要手动 ack
$exclusive
$nowait
$callback     音讯回调函数
$ticket
$arguments
1.8 手动 ack 示例
$callback = function($msg) {sleep($msg->body);
            echo "[x] Received sleep", $msg->body, "\n";
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            echo "[x] Ack"."\n";
        };
1.9 限度散发 示例
限度 RabbitMQ 只发不超过 1 条的音讯给同一个消费者。当音讯处理完毕后,有了反馈,才会进行第二次发送。$channel->basic_qos(null,1,null);

2. 罕用场景

2.1 工作队列 按生产能力散发
生产者和消费者均减少
$channel->basic_qos(null,1,null);
即可。
2.2 fanout 播送示例 注册行为
例如注册后须要发送欢送短信和邮件,将注册行为播送至短信和邮件

生产者
// 定义交换器
$channel->exchange_declare('register','fanout',false,false,false);
$msg = new AMQPMessage('register event');
$channel->basic_publish($msg, 'register');

注册短信消费者
$channel->exchange_declare('register','fanout',false,false,false);
$channel->queue_declare('register.sms', false, false, false, false);
$channel->queue_bind('register.sms', 'register');

注册邮件消费者
$channel->exchange_declare('register','fanout',false,false,false);
$channel->queue_declare('register.mail', false, false, false, false);
$channel->queue_bind('register.mail', 'register');
2.3 topic 类型 含糊匹配示例 日志分级
例如我想一个消费者承受所有日志,一个消费者只接管 Error 级别日志

生产者
// 定义交换器
$channel->exchange_declare('log','topic',false,false,false);
$num = rand(0,10);
if ($num%3 == 0) {$level = 'error';}elseif($num%3 == 1){$level = 'warning';}else{$level = 'common';}
$msg = new AMQPMessage('log event'.$level);
$channel->basic_publish($msg, 'log', 'log.'.$level);

全量日志消费者
$channel->exchange_declare('log','topic',false,false,false);
$channel->queue_declare('log.all', false, false, false, false);
$channel->queue_bind('log.all', 'log', 'log.*');

Error 日志消费者
$channel->exchange_declare('log','topic',false,false,false);
$channel->queue_declare('log.error', false, false, false, false);
$channel->queue_bind('log.error', 'log', 'log.error');
2.4 headers 类型 匹配示例 日志分级
例如我想一个消费者承受所有日志,一个消费者只接管 Error 级别日志

生产者
// 定义交换器
$channel->exchange_declare('log2','headers',false,false,false);
$num = rand(0,10);
if ($num%3 == 0) {$level = 'error';}elseif($num%3 == 1){$level = 'warning';}else{$level = 'common';}
$msg = new AMQPMessage('log2 event'.$level);

$bindArguments = [
    'level' => $level,
    'type'  => 'log'
 ]; 
$headers = new AMQPTable($bindArguments);
$msg->set('application_headers', $bindArguments);    

$channel->basic_publish($msg, 'log2');

全量日志消费者
$channel->exchange_declare('log2','headers',false,false,false);
$channel->queue_declare('log2.all', false, false, false, false);

$bindArguments = [
    'type'  => 'log',
    //'x-match' => 'any' // 默认 any
]; 
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.all', 'log2', '', false, $headers);

Error 日志消费者
$channel->exchange_declare('log2','headers',false,false,false);
$channel->queue_declare('log2.error', false, false, false, false);

$bindArguments = [
    'type'    => 'log',
    'level'   => 'error',
    'x-match' => 'all' // 默认 any
]; 
$headers = new AMQPTable($bindArguments);
$channel->queue_bind('log2.error', 'log2', '', false, $headers);
2.5 死信队列
//2.5.1 定义一个没有消费者,5s 后音讯过期的队列
// 生产者
$arguments = new AMQPTable([
    'x-dead-letter-exchange'    => 'dead',
    'x-message-ttl'             => 5000, // 音讯存活工夫毫秒
    'x-dead-letter-routing-key' => 'dead'
]);

// 定义队列 不要交换器
$channel->queue_declare('no_consume', false, false, false, false, false, $arguments);
$now = time();
$msg = new AMQPMessage(time());        
$channel->basic_qos(null,1,null);
$channel->basic_publish($msg, '','no_consume');
echo "[x] Sent no_consume :".date('Y-m-d H:i:s',$now)."\n";
$channel->close();
$connection->close();

// 消费者
$channel->exchange_declare('dead','topic',false,false,false);
$channel->queue_declare('dead.all', false, false, false, false);
$channel->queue_bind('dead.all', 'dead', 'dead');

$channel->basic_qos(null,1,null);
echo '[*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {var_dump('msg:'.date('Y-m-d H:i:s',$msg->body));
    var_dump('now:'.date('Y-m-d H:i:s'));
    echo "[x] Received log error", $msg->body, "\n";
};
$channel->basic_consume('dead.all', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {$channel->wait();
}

正文完
 0