简略封装了一个rabbitmq类(业务代码轻易写的)

首先是账号密码配置

config.php

<?php    return $arr = [        'RabbitMq' => [            // Rabbitmq 服务地址            'host' => '127.0.0.1',            // Rabbitmq 服务端口            'port' => '5672',            // Rabbitmq 帐号            'login' => 'guest',            // Rabbitmq 明码            'password' => 'guest',            'vhost'=>'/'        ]    ];

基类 base.php

<?phpinclude dirname(__FILE__).'/object.php';include dirname(__FILE__).'/config.php';class RabbitMq implements object{    //保留类实例的动态成员变量    static private $_instance;    static private $_conn;    static private $amp ;    static private $route = 'key_1';    static private $q ;    static private $ex ;    static private $queue;    public static function getInstance(){        global $arr;        if (!(self::$_instance instanceof self)) {            self::$_instance = new self($arr['RabbitMq']);            return self::$_instance;        }        return self::$_instance;    }    private function __construct($conn)    {        //创立连贯和channel        $conn = new AMQPConnection($conn);        if(!$conn->connect()) {            die("Cannot connect to the broker!n");        }        self::$_conn = new AMQPChannel($conn);        self::$amp = $conn;    }    /* *     *     * parm 交换机名     * parm 队列名     *     * */    public function listen($exchangeName,$queuename){        self::$queue = $queuename;        return $this->setExchange($exchangeName,$queuename);    }    //连贯交换机    public function setExchange($exchangeName,$queueName){        //创立交换机        $ex = new AMQPExchange(self::$_conn);        self::$ex = $ex;        $ex->setName($exchangeName);        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型        $ex->setFlags(AMQP_DURABLE); //长久化        $ex->declare();        return self::setQueue($queueName,$exchangeName);    }    //创立队列    private static function setQueue($queueName,$exchangeName){    //  创立队列        $q = new AMQPQueue(self::$_conn);        $q->setName($queueName);        $q->setFlags(AMQP_DURABLE);        $q->declareQueue();    // 用于绑定队列和交换机        $routingKey = self::$route;        $q->bind($exchangeName,  $routingKey);        self::$q = $q;        return(self::$_instance);    }    /* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processMessage($envelope, $queue) {        $msg = $envelope->getBody();        echo $msg."n"; //解决音讯        $queue->ack($envelope->getDeliveryTag());//手动应答    } */    public function run($func, $autoack = True){        if (!$func || !self::$q) return False;        while(True){            if ($autoack) {                if(!self::$q->consume($func, AMQP_AUTOACK)){//                    self::$q->ack($envelope->getDeliveryTag());                   //失败之后会默认进入 noack 队列。下次从新开启会再次调用,目前还不分明 回调配置应该这里做一个失败反馈             //todu                }            }             self::$q->consume($func);        }    }    private static function closeConn(){        self::$amp->disconnect();    }    public function pushlish($msg){        while (1) {            sleep(1);            if (self::$ex->publish(date('H:i:s') . "用户" . "注册", self::$route)) {                //写入文件等操作                echo $msg;            }        }    }    //__clone办法避免对象被复制克隆    public function __clone()    {        trigger_error('Clone is not allow!', E_USER_ERROR);    }}

consume 监听类(一个操作对应一个class)

<?phpinclude dirname(__FILE__).'/base.php';class Add{    public static function run(){        $dbms='mysql';     //数据库类型        $host='127.0.01'; //数据库主机名        $dbName='test';    //应用的数据库        $user='root';      //数据库连贯用户名        $pass='admin';          //对应的明码        $dsn="$dbms:host=$host;dbname=$dbName";        sleep(1);        try {            $dbh = new PDO($dsn, $user, $pass); //初始化一个PDO对象            /*你还能够进行一次搜寻操作            foreach ($dbh->query('SELECT * from FOO') as $row) {                print_r($row); //你能够用 echo($GLOBAL); 来看到这些值            }            */            $dbh = null;        } catch (PDOException $e) {            die ("Error!: " . $e->getMessage() . "<br/>");        }//默认这个不是长连贯,如果须要数据库长连贯,须要最初加一个参数:array(PDO::ATTR_PERSISTENT => true) 变成这样:        $db = new PDO($dsn, $user, $pass, array(PDO::ATTR_PERSISTENT => true));        $sql = 'INSERT INTO `test`.`t_reg`(`names`) VALUES (9)';        $row = $db->query($sql);        if(!$row){           return false;        }            echo 'OK';    }}$consume = new Add();//tudo//$s = RabbitMq::getInstance()->listen('jiaohuanji','queue1')->run(array($consume,'run')); 将run函数带入到consume外面作为回调 在consume外面减少$funname ,减少代码粘性$s = RabbitMq::getInstance()->listen('jiaohuanji','queue1')->run(array($consume,'run'));

push 类(发送者)

<?phpinclude "base.php";RabbitMq::getInstance()->listen('jiaohuanji','queue1')->pushlish('申请已发送');

接口interface

<?php    interface object    {        public static function getInstance();    }

监听 add.php

执行 send.php 即可实现简略的rabit操作

以上内容心愿帮忙到大家,很多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不晓得该从那里动手去晋升,对此我整顿了一些PHP高级、架构视频材料和大厂PHP面试PDF| 收费获取能够关注公众号:PHP开源社区