转发自白狼栈:查看原文
在应用 hyperf 官网自带的 AMQP 队列时你会发现,不须要咱们再额定启动过程对音讯进行生产。这是因为默认状况下,应用 @Consumer 注解时,hyperf 会为咱们主动创立子过程启动消费者,并且会在子过程异样退出后,从新拉起。
来看一个简略的例子。
1、创立 producer
php bin/hyperf.php gen:amqp-producer DemoProducer
2、投递音讯
namespace App\Controller;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Utils\ApplicationContext;
class IndexController extends AbstractController
{public function index()
{$user = $this->request->input('user', 'Hyperf');
$data = ['message' => "Hello {$user}.",
];
$message = new DemoProducer($data);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$producer->produce($message);
return 'ok';
}
}
3、创立消费者
php bin/hyperf.php gen:amqp-consumer DemoConsumer
4、测试
启动我的项目,浏览器拜访 http://127.0.0.1:9501/,咱们能够在控制台看到打印的音讯输入。
Array
([method] => GET
[message] => Hello Hyperf.
)
[DEBUG] 1 acked.
这个是 hyperf 自启动过程对音讯进行生产。
当初咱们想把生产过程从我的项目中剥离进去,用其余机器独自进行生产。
咱们参考 hyperf 自启动的流程 +command 实现。
上面先看看 hyperf 源码是怎么实现自启动消费者过程的。
首先咱们留神到 Hyperf\Amqp\Listener\BeforeMainServerStartListener 这个监听器,在 BeforeMainServerStart 或者 MainCoroutineServerStart 事件被触发时,Hyperf\Amqp\ConsumerManager::run 办法被执行。
public function process(object $event)
{
// Init the consumer process.
$consumerManager = $this->container->get(ConsumerManager::class);
$consumerManager->run();}
Hyperf\Amqp\ConsumerManager::run 办法如下:
大抵步骤都在图上进行了标注,能够看到整个过程都比较简单。
上面咱们参考这个过程,依据 command 来手动创立生产过程。
1、原 consumer 先禁用
【App\Amqp\Consumer\DemoConsumer】enable 设置 false
public function isEnable(): bool
{return false;}
2、手动创立一个新的 consumer 用于测试
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
class DemoConsumer2 extends ConsumerMessage
{
public $exchange = 'hyperf';
public $routingKey = 'hyperf';
public $queue = 'hyperf';
public function consumeMessage($data, AMQPMessage $message): string
{print_r($data);
return Result::ACK;
}
public function isEnable(): bool
{return false;}
}
在这个 consumer 中,咱们手动指定了 exchange、routingKey 和 queue,同时禁止自启动(enable=false)。
3、构建 command
php bin/hyperf.php gen:command DemoConsumerCommand【App\Command\DemoConsumerCommand】代码如下:<?php
declare(strict_types=1);
namespace App\Command;
use App\Amqp\Consumer\DemoConsumer;
use App\Amqp\Consumer\DemoConsumer2;
use Hyperf\Amqp\Consumer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Di\Annotation\AnnotationCollector;
use Psr\Container\ContainerInterface;
use Hyperf\Amqp\Annotation\Consumer as ConsumerAnnotation;
/**
* @Command
*/
#[Command]
class DemoConsumerCommand extends HyperfCommand
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
parent::__construct('DemoConsumer:command');
}
public function configure()
{parent::configure();
$this->setDescription('手动启动生产过程测试');
}
public function handle()
{$consumer = $this->container->get(Consumer::class);
$consumer->consume(make(DemoConsumer2::class));
$this->line('ok.', 'info');
}
}
4、重新启动 hyperf 以及另起一个窗口启动 command
启动 hyperf : php bin/hyperf.php start
启动 command: php bin/hyperf.php DemoConsumer:command
5、测试
浏览器拜访 http://127.0.0.1:9501/,咱们能够在启动 command 的窗口看到音讯被胜利输入。