前言:
上次介绍了RabbitMQ在windows下的装置,以及PHP的AMQP扩大的增加。这里将本地测试的代码放到了服务,所以也就是在Linux上再装一遍。然而这里就省掉间接进入应用。次要是RabbitMQ装置结束后,管控台的应用以及在PHP中如何调用其接口实现音讯的生产和生产。
步骤:
1. Linux装置完rabbitMQ后,没有配置文件,须要将rabbitmq.conf.example复制下载下来改为rabbitmq.conf。
2. 能够批改默认Vhost和管理员默认账号密码,最重要的是要将管理员凋谢给所有IP都能够登录(默认是只能是本地能够登录),批改形式如下。
3. 进入到管控台后,先创立vhost,点击admin,侧栏框点击Virtual Hosts。
4. 增加Exchanges,先抉择对应的Vhost
5. 增加队列,抉择Queues,抉择对应的Vhost
6. 队列增加结束后抉择其中一个增加的名称进入。
7. 绑定之前的Exchanges中其中一个,输出exchanges名字和自定义一个路由Key。
8. 胜利后就能够通过PHP代码生产音讯到该虚拟机,而后通过交换机路由到该队列中。
<?php
namespace rabbitmq;
class Amq
{
/**
* @var object 对象实例
*/
protected static $instance;
protected $exchange='router_visit'; // 交换机(须要在队列中绑定)
protected $queue ='visit_log'; // 队列
protected $route ='router_visit'; // 路由key(须要在队列中绑定)
protected $consumer_tag='consumer';
protected $config = [
'host' => '146.53.206.264',
'port' => 5672,
'login' => 'guest', //guest
'password' => 'guest', //Na18gR@9tf
'vhost' => 'log',
'amqp_debug' => true
];
protected $exchange_index = 0;
protected $exchange_type = [
'direct',
'fanout',
'topic',
'headers'
];
/**
* @note 实例化
* @author: beiqiaosu
* @since: 2019/11/13 16:10
*/
public static function getInstance()
{
if (!(self::$instance instanceof self)) {
self::$instance = new self();
}
return self::$instance;
}
/**
* @Notes: 音讯生产
*/
public function publisher($message,$config=[]) {
//如果有配置就用新配置
$this->config ['vhost'] = $config['vhost']?? $this->config ['vhost'];
$this->exchange = $config['exchange']?? $this->exchange;
$this->queue = $config['queue']?? $this->queue;
$this->consumer_tag = $config['consumer_tag']?? $this->consumer_tag;
$this->route = $config['route']?? $this->route;
$this->exchange_index = $config['exchange_index']?? $this->exchange_index;
$cnn = new \AMQPConnection($this->config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
$channel = new \AMQPChannel($cnn);
$ex = new \AMQPExchange($channel);
$ex->setName($this->exchange);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //长久化
$ex->declareExchange();
return "Send Message:".$ex->publish($message, $this->route, AMQP_NOPARAM, array('delivery_mode' => 2))."\n";
}
/**
* @note 生产
* @author: tata
* @since: 2019/11/13 16:10
*/
public function consumer() {
$exchange='router_visit'; //交换机
$queue ='visit_log'; //队列
$route ='router_visit'; //路由
//连贯broker
$cnn = new \AMQPConnection($this->config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
$channel = new \AMQPChannel($cnn);
$ex = new \AMQPExchange($channel);
//设置交换机名称
$ex->setName($exchange);
//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
$ex->setType(AMQP_EX_TYPE_DIRECT);
//设置交换机长久
$ex->setFlags(AMQP_DURABLE);
//申明交换机
$ex->declareExchange();
//创立一个音讯队列
$q = new \AMQPQueue($channel);
//设置队列名称
$q->setName($queue);
//设置队列长久
$q->setFlags(AMQP_DURABLE);
//申明音讯队列
//$q->declareQueue();
//交换机和队列通过$route进行绑定
$q->bind($exchange, $route);
$ret = $q->consume(function($envelope, $queue) {
// 取出音讯主题转为数组
// $origin_data = json_decode($envelope->getBody(),true);
// dump($envelope->getBody());die;
/**对音讯主题执行业务**/
$res = true;
/**对音讯主题执行业务**/
// 业务处理完毕发送给MQ生产掉该音讯
if ($res) $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
});
dump($ret);die;
$cnn->disconnect();
}
}
9. 每执行一次生产或生产代码,能够在Queue中的统计或图表中看到,测试代码是否胜利。
发表回复