简介
RabbitMQ 是一个高可用的信息中间件,学习和使用 RabbitMQ 非常有必要。
- 异步消息传递
- 支持各种开发语言 java、python、php 等
- 可插拔的身份验证、授权
- RabbitMQ-Manager 可用于管理和监视。
安装
这里直接使用 docker,很方便的进行安装
拉取镜像docker pull rabbitmq:3.8.3-management-alpine
运行docker run --name run-rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq
15672 端口是 RabbitMQ Web 管理页面,直接访问:http://localhost:15672/,初始用户密码:guest
使用
RabbitMQ 作为生产者和消费者来使用时,基本上有 2 中场景
- 一个 / 多个生产者,多个共享消费者
- 一个 / 多个生产者,多个独立消费者
共享的消费者可以同时消费一个队列的数据,增加吞吐量
独立的消费者不共享队列,每个消费者都有自己的队列,可以定义规则从 exchange 中 pull 数据到自己的 queue 中
下面将通过代码来实现各种场景
基础概念
queue
数据队列,数据可以推送到 queue,也可以从 queue 中消费
exchange 交换机
将数据推送到交换机中,队列可以绑定交换机,交换机的类型不同所支持的绑定规则也不同
- fanout 没有规则,所有 exchange 中的数据
- direct 精确匹配,只绑定 routingkey 指定值的数据
- topic 更加灵活的规则,路由键 routingkey 必须是一个由
.
分隔开的词语,*
(星号) 用来表示一个单词,#
(井号) 用来表示任意数量(零个或多个)单词
先封装了 RabbitMQ 一些常用操作
<?php
namespace RabbitMQ;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitMQ
{
private $host = '127.0.0.1';
private $port = 5672;
private $user = 'guest';
private $password = 'guest';
protected $connection;
protected $channel;
/**
* RabbitMQ constructor.
*/
public function __construct()
{$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
$this->channel = $this->connection->channel();}
/**
* @param $exchangeName
* @param $type
* @param $pasive
* @param $durable
* @param $autoDelete
*/
public function createExchange($exchangeName, $type, $pasive = false, $durable = false, $autoDelete = false)
{$this->channel->exchange_declare($exchangeName, $type, $pasive, $durable, $autoDelete);
}
/**
* @param $queueName
* @param $pasive
* @param $durable
* @param $exlusive
* @param $autoDelete
*/
public function createQueue($queueName, $pasive = false, $durable = false, $exlusive = false, $autoDelete = false, $nowait = false, $arguments = [])
{$this->channel->queue_declare($queueName, $pasive, $durable, $exlusive, $autoDelete, $nowait, $arguments);
}
/**
* 生成信息
* @param $message
*/
public function sendMessage($message, $routeKey, $exchange = '', $properties = [])
{
$data = new AMQPMessage($message, $properties);
$this->channel->basic_publish($data, $exchange, $routeKey);
}
/**
* 消费消息
* @param $queueName
* @param $callback
* @throws \ErrorException
*/
public function consumeMessage($queueName, $callback, $tag = '', $noLocal = false, $noAck = false, $exclusive = false, $noWait = false)
{$this->channel->basic_consume($queueName, $tag, $noLocal, $noAck, $exclusive, $noWait, $callback);
while ($this->channel->is_consuming()) {$this->channel->wait();
}
}
/**
* @throws \Exception
*/
public function __destruct()
{$this->channel->close();
$this->connection->close();}
}
多个共享消费者
多个消费者可以增加消费速度,提供系统吞吐量
小二,直接上代码吧
生产者代码
<?php
require_once '../../vendor/autoload.php';
use RabbitMQ\RabbitMQ;
use PhpAmqpLib\Message\AMQPMessage;
$rabbit = new RabbitMQ();
$queueName = 'test-single-queue';
$rabbit->createQueue($queueName,false,true,false,false);
for ($i = 0; $i < 10000; $i++) {$rabbit->sendMessage($i . "this is a test message.", $queueName,'',['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 消息持久化,重启 rabbitmq,消息不会丢失]);
}
unset($rabbit);// 关闭连接
运行生产者php Producer
,在 manager web 页面可以可看到这个 queue 信息
消费者代码
<?php
require_once '../../vendor/autoload.php';
use RabbitMQ\RabbitMQ;
$rabbit = new RabbitMQ();
$queueName = 'test-single-queue';
$callback = function ($message){var_dump("Received Message :" . $message->body);//print message
sleep(2);// 处理耗时任务
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack
};
$rabbit->consumeMessage($queueName,$callback);
unset($rabbit);// 关闭连接
运行消费者二次 php Consumer.php
可以看到二个消费者不会重复 message
在 manager web 看到此 queue 的 message 正在被消费
多个独立消费者
RabbitMQ 生产者将 message 推送到 exchange,通过将多个 queue 与 exchange 进行绑定,来实现多个独立消费者
定义一个 topic 类型的交换机,消费规则是:test.ex. 加一个单词
<?php
require_once '../../vendor/autoload.php';
use RabbitMQ\RabbitMQ;
$rabbit = new RabbitMQ();
$exchangeName = 'test-ex-topic';
$queueName = 'test-consumer-ex-topic';
$routingKey = 'test.ex.*';// 消费规则定义
// 创建队列
$rabbit->createQueue($queueName, false, true);
// 绑定到交换机
$rabbit->bindQueue($queueName, $exchangeName, $routingKey);
// 消费
$callback = function ($message) {var_dump("Received Message :" . $message->body);//print message
sleep(2);// 处理耗时任务
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);//ack
};
$rabbit->consumeMessage($queueName, $callback);
unset($rabbit);// 关闭连接
启动消费者php Consumer.php
定义生产者,会向 2 个不同的 routingkey 中推送 message
<?php
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;
$rabbit = new RabbitMQ();
$routingKey1 = 'test.ex.queue1';
$routingKey2 = 'test.ex.queue2';
$exchangeName = 'test-ex-topic';
$rabbit->createExchange($exchangeName, AMQPExchangeType::TOPIC, false, true, false);
// 向交换机和 routingkey = test-ex-queue1 中推送 10000 条数据
for ($i = 0; $i < 10000; $i++) {
$rabbit->sendMessage($i . "this is a queue1 message.", $routingKey1, $exchangeName, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 消息持久化,重启 rabbitmq,消息不会丢失]);
}
// 向交换机和 routingkey = test-ex-queue2 中推送 10000 条数据
for ($i = 0; $i < 10000; $i++) {
$rabbit->sendMessage($i . "this is a queue2 message.", $routingKey2, $exchangeName, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 消息持久化,重启 rabbitmq,消息不会丢失]);
}
unset($rabbit);// 关闭连接
运行生产者php Producer.php
, 可以看到消费者有 2 万条 message 可以消费