关于rabbitmq:RabbitMQ业务解耦实战

43次阅读

共计 3058 个字符,预计需要花费 8 分钟才能阅读完成。

前言:

上次介绍了 RabbitMQ 在 windows 下的装置,以及 PHP 的 AMQP 扩大的增加。这里将本地测试的代码放到了服务,所以也就是在 Linux 上再装一遍。然而这里就省掉间接进入应用。次要是 RabbitMQ 装置结束后,管控台的应用以及在 PHP 中如何调用其接口实现音讯的生产和生产。

步骤:

1. Linux 装置完 rabbitMQ 后,没有配置文件,须要将 rabbitmq.conf.example 复制下载下来改为 rabbitmq.conf。

2. 能够批改默认 Vhost 和管理员默认账号密码,最重要的是要将管理员凋谢给所有 IP 都能够登录 (默认是只能是本地能够登录),批改形式如下。

3. 进入到管控台后,先创立 vhost,点击 admin,侧栏框点击 Virtual Hosts。

4. 增加 Exchanges,先抉择对应的 Vhost

5. 增加队列,抉择 Queues,抉择对应的 Vhost

6. 队列增加结束后抉择其中一个增加的名称进入。

7. 绑定之前的 Exchanges 中其中一个,输出 exchanges 名字和自定义一个路由 Key。

 8. 胜利后就能够通过 PHP 代码生产音讯到该虚拟机,而后通过交换机路由到该队列中。
 

<?php


namespace rabbitmq;


class Amq
{
    /**
     * @var object 对象实例
     */
    protected static $instance;

    protected $exchange='router_visit';  // 交换机 (须要在队列中绑定)
    protected $queue ='visit_log';       // 队列
    protected $route ='router_visit';    // 路由 key(须要在队列中绑定)protected $consumer_tag='consumer';
    protected $config = [
        'host' => '146.53.206.264',
        'port' => 5672,
        'login' => 'guest',        //guest
        'password' => 'guest',    //Na18gR@9tf
        'vhost' => 'log',
        'amqp_debug' => true
    ];
    protected $exchange_index = 0;
    protected $exchange_type = [
        'direct',
        'fanout',
        'topic',
        'headers'
    ];

    /**
     * @note 实例化
     * @author: beiqiaosu
     * @since: 2019/11/13 16:10
     */
    public static function getInstance()
    {if (!(self::$instance instanceof self)) {self::$instance = new self();
        }
        return self::$instance;
    }

    
    /**
     * @Notes: 音讯生产
     */
    public function publisher($message,$config=[]) {
        // 如果有配置就用新配置
        $this->config ['vhost'] = $config['vhost']?? $this->config ['vhost'];
        $this->exchange = $config['exchange']?? $this->exchange;
        $this->queue = $config['queue']?? $this->queue;

        $this->consumer_tag = $config['consumer_tag']?? $this->consumer_tag;
        $this->route = $config['route']?? $this->route;
        $this->exchange_index = $config['exchange_index']?? $this->exchange_index;

        $cnn = new \AMQPConnection($this->config);
        if (!$cnn->connect()) {
            echo "Cannot connect to the broker";
            exit();}

        $channel = new \AMQPChannel($cnn);
        $ex = new \AMQPExchange($channel);
        $ex->setName($this->exchange);

        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct 类型

        $ex->setFlags(AMQP_DURABLE); // 长久化
        $ex->declareExchange();
        return "Send Message:".$ex->publish($message, $this->route, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
    }


    /**
     * @note 生产
     * @author: tata
     * @since: 2019/11/13 16:10
     */
    public function consumer() {

        $exchange='router_visit';       // 交换机
        $queue ='visit_log';         // 队列
        $route ='router_visit';        // 路由

        // 连贯 broker
        $cnn = new \AMQPConnection($this->config);
        if (!$cnn->connect()) {
            echo "Cannot connect to the broker";
            exit();}
        $channel = new \AMQPChannel($cnn);
        $ex = new \AMQPExchange($channel);
        // 设置交换机名称
        $ex->setName($exchange);
        // 设置交换机类型
        //AMQP_EX_TYPE_DIRECT: 直连交换机
        //AMQP_EX_TYPE_FANOUT: 扇形交换机
        //AMQP_EX_TYPE_HEADERS: 头交换机
        //AMQP_EX_TYPE_TOPIC: 主题交换机
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        // 设置交换机长久
        $ex->setFlags(AMQP_DURABLE);
        // 申明交换机
        $ex->declareExchange();
        // 创立一个音讯队列
        $q = new \AMQPQueue($channel);
        // 设置队列名称
        $q->setName($queue);
        // 设置队列长久
        $q->setFlags(AMQP_DURABLE);
        // 申明音讯队列
        //$q->declareQueue();
        // 交换机和队列通过 $route 进行绑定
        $q->bind($exchange, $route);

        $ret = $q->consume(function($envelope, $queue) {

            // 取出音讯主题转为数组
//            $origin_data = json_decode($envelope->getBody(),true);
//            dump($envelope->getBody());die;

            /** 对音讯主题执行业务 **/
            $res = true;
            /** 对音讯主题执行业务 **/

            // 业务处理完毕发送给 MQ 生产掉该音讯
            if ($res) $queue->ack($envelope->getDeliveryTag()); // 手动发送 ACK 应答
        });

        dump($ret);die;

        $cnn->disconnect();}

}

 9. 每执行一次生产或生产代码,能够在 Queue 中的统计或图表中看到,测试代码是否胜利。
 

 

正文完
 0