前言:
有些人为了让我的项目疾速上线,服务器往往装置宝塔面板,而后再极速装置LNMP。只管环境搭建的工夫省了,然而宝塔上PHP中扩大包没有提供AMQP。这时候只是为了应用音讯队列而对PHP大动干戈, 不如应用一个PHP AMQP的库,即用即装,不对环境造成影响。
简介:
php-amqplib 客户端库,通过composer装置,不须要在PHP中装置扩大,以下为两种不同的装置形式。
- 我的项目中新建composer.json,增加如下代码,而后composer install
{ "require": { "php-amqplib/php-amqplib": " 2.6.*" }}
- 命令进入到我的项目,而后 composer require php-amqplib/php-amqplib 2.6.*
RabbitMQ设置:
- 进入web管控台,增加新用户,角色管理员,任何IP上都能够登录,受权指定虚拟机。
- 增加交换机
- 增加队列并与交互机绑定。
编码:
1. 封装rabbitMQ类。
<?phpuse PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;/** * Class RabbitMQ. */class RabbitMQ{ const READ_LINE_NUMBER = 0; const READ_LENGTH = 1; const READ_DATA = 2; public $config; public static $prefix = 'autoinc_key:'; protected $exchangeName = 'flow'; protected $queueName = 'flow_queue'; /** * @var \PhpAmqpLib\Connection\AMQPStreamConnection */ protected $connection; /** * @var \PhpAmqpLib\Channel\AMQPChannel */ protected $channel; protected $queue; //配置项 private $host; private $port; private $user; private $pass; private $vhost; public function __construct($config = []) { //$this->config = $config; //设置rabbitmq配置值 $this->host = '192.168.1.101'; $this->port = 5672; $this->user = 'beiqiaosu'; $this->pass = 'beiqiaosu'; $this->vhost = 'report'; $this->connect(); } public function __call($method, $args = []) { $reConnect = false; while (1) { try { $this->initChannel(); $result = call_user_func_array([$this->channel, $method], $args); } catch (\Exception $e) { //已重连过,依然报错 if ($reConnect) { throw $e; } \Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1)); if ($this->connection) { $this->close(); } $this->connect(); $reConnect = true; continue; } return $result; } //不可能到这里 return false; } /** * 连贯rabbitmq音讯队列. * * @return bool */ public function connect() { try { if ($this->connection) { unset($this->connection); } $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost); } catch (\Exception $e) { echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage(); return false; } } /** * 敞开连贯. */ public function close() { $this->channel->close(); $this->connection->close(); } /** * 设置交换机名称. * * @param string $exchangeName */ public function setExchangeName($exchangeName = '') { $exchangeName && $this->exchangeName = $exchangeName; } /** * 设置队列名称. * * @param string $queueName */ public function setQueueName($queueName = '') { $queueName && $this->queueName = $queueName; } /** * 设置频道. */ public function initChannel() { if (!$this->channel) { //通道 $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queueName, false, true, false, false); $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); $this->channel->queue_bind($this->queueName, $this->exchangeName); } } /** * 获取队列数据. * * @return mixed */ public function pop() { while (1) { try { $this->connect(); $this->initChannel(); $message = $this->channel->basic_get($this->queueName); if ($message) { $this->channel->basic_ack($message->delivery_info['delivery_tag']); $result = $message->body; } else { throw new \Exception('Empty Queue Data'); } } catch (\Exception $e) { //\Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")"); sleep(1); continue; } return $result; } //不可能到这里 return false; } /** * 插入队列数据. * * @param $data * * @return bool */ public function push($data) { while (1) { try { $this->connect(); $this->initChannel(); $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->channel->basic_publish($message, $this->exchangeName); } catch (\Exception $e) { echo "$e->getMessage()"; continue; } return true; } //不可能到这里 return false; }}
2. 操作mq,出队,入队。
<?phprequire_once "vendor/autoload.php";require_once "component/RabbitMQ.php";$mq = new RabbitMQ();// 音讯生产测试/*try { $res = $mq->pop(); }catch(\Exception $e) { var_dump($e->getMessage());die;}*/// 音讯生产测试try { $res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155'])); }catch(\Exception $e) { var_dump($e->getMessage());die;}var_dump($res);die;
测试:
- 先通过生产音讯(入队)办法运行一下,而后进入队列中get message查看音讯总数。
- 测试调用生产,再查看总数。