前言:

有些人为了让我的项目疾速上线,服务器往往装置宝塔面板,而后再极速装置LNMP。只管环境搭建的工夫省了,然而宝塔上PHP中扩大包没有提供AMQP。这时候只是为了应用音讯队列而对PHP大动干戈, 不如应用一个PHP AMQP的库,即用即装,不对环境造成影响。 

简介:

php-amqplib 客户端库,通过composer装置,不须要在PHP中装置扩大,以下为两种不同的装置形式。

  1. 我的项目中新建composer.json,增加如下代码,而后composer install
{    "require": {        "php-amqplib/php-amqplib": " 2.6.*"    }}
  1. 命令进入到我的项目,而后 composer require php-amqplib/php-amqplib 2.6.*

RabbitMQ设置:

  1. 进入web管控台,增加新用户,角色管理员,任何IP上都能够登录,受权指定虚拟机。

  1. 增加交换机

  1. 增加队列并与交互机绑定。

编码:

1. 封装rabbitMQ类。

<?phpuse PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;/** * Class RabbitMQ. */class RabbitMQ{    const READ_LINE_NUMBER = 0;    const READ_LENGTH      = 1;    const READ_DATA        = 2;    public $config;    public static $prefix   = 'autoinc_key:';    protected $exchangeName = 'flow';    protected $queueName    = 'flow_queue';    /**     * @var \PhpAmqpLib\Connection\AMQPStreamConnection     */    protected $connection;    /**     * @var \PhpAmqpLib\Channel\AMQPChannel     */    protected $channel;    protected $queue;        //配置项    private $host;    private $port;    private $user;    private $pass;    private $vhost;    public function __construct($config = [])    {        //$this->config = $config;        //设置rabbitmq配置值        $this->host  = '192.168.1.101';        $this->port  = 5672;        $this->user  = 'beiqiaosu';        $this->pass  = 'beiqiaosu';        $this->vhost = 'report';        $this->connect();    }    public function __call($method, $args = [])    {        $reConnect = false;        while (1) {            try {                $this->initChannel();                $result = call_user_func_array([$this->channel, $method], $args);            } catch (\Exception $e) {                //已重连过,依然报错                if ($reConnect) {                    throw $e;                }                \Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1));                if ($this->connection) {                    $this->close();                }                $this->connect();                $reConnect = true;                continue;            }            return $result;        }        //不可能到这里        return false;    }    /**     * 连贯rabbitmq音讯队列.     *     * @return bool     */    public function connect()    {        try {            if ($this->connection) {                unset($this->connection);            }            $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);        } catch (\Exception $e) {            echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage();            return false;        }    }    /**     * 敞开连贯.     */    public function close()    {        $this->channel->close();        $this->connection->close();    }    /**     * 设置交换机名称.     *     * @param string $exchangeName     */    public function setExchangeName($exchangeName = '')    {        $exchangeName && $this->exchangeName = $exchangeName;    }    /**     * 设置队列名称.     *     * @param string $queueName     */    public function setQueueName($queueName = '')    {        $queueName && $this->queueName = $queueName;    }    /**     * 设置频道.     */    public function initChannel()    {        if (!$this->channel) {            //通道            $this->channel = $this->connection->channel();            $this->channel->queue_declare($this->queueName, false, true, false, false);            $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);            $this->channel->queue_bind($this->queueName, $this->exchangeName);        }    }    /**     * 获取队列数据.     *     * @return mixed     */    public function pop()    {        while (1) {            try {                $this->connect();                $this->initChannel();                $message = $this->channel->basic_get($this->queueName);                                if ($message) {                    $this->channel->basic_ack($message->delivery_info['delivery_tag']);                    $result = $message->body;                } else {                    throw new \Exception('Empty Queue Data');                }            } catch (\Exception $e) {                //\Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")");                sleep(1);                continue;            }            return $result;        }                //不可能到这里        return false;    }    /**     * 插入队列数据.     *     * @param $data     *     * @return bool     */    public function push($data)    {            while (1) {                    try {                $this->connect();                $this->initChannel();                $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);                $this->channel->basic_publish($message, $this->exchangeName);            } catch (\Exception $e) {                echo "$e->getMessage()";                continue;            }            return true;        }                //不可能到这里        return false;    }}

2. 操作mq,出队,入队。

<?phprequire_once "vendor/autoload.php";require_once "component/RabbitMQ.php";$mq = new RabbitMQ();// 音讯生产测试/*try {    $res = $mq->pop();    }catch(\Exception $e) {        var_dump($e->getMessage());die;}*/// 音讯生产测试try {    $res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155']));    }catch(\Exception $e) {        var_dump($e->getMessage());die;}var_dump($res);die;

测试:

  1. 先通过生产音讯(入队)办法运行一下,而后进入队列中get message查看音讯总数。

  1. 测试调用生产,再查看总数。