转发自白狼栈:查看原文

在应用 hyperf 官网自带的 AMQP 队列时你会发现,不须要咱们再额定启动过程对音讯进行生产。这是因为默认状况下,应用 @Consumer 注解时,hyperf 会为咱们主动创立子过程启动消费者,并且会在子过程异样退出后,从新拉起。

来看一个简略的例子。

1、创立producer

php bin/hyperf.php gen:amqp-producer DemoProducer

2、投递音讯

namespace App\Controller;use App\Amqp\Producer\DemoProducer;use Hyperf\Amqp\Producer;use Hyperf\Utils\ApplicationContext;class IndexController extends AbstractController{    public function index()    {        $user = $this->request->input('user', 'Hyperf');        $data = [            'message' => "Hello {$user}.",        ];        $message = new DemoProducer($data);        $producer = ApplicationContext::getContainer()->get(Producer::class);        $producer->produce($message);        return 'ok';    }}

3、创立消费者

php bin/hyperf.php gen:amqp-consumer DemoConsumer

4、测试

启动我的项目,浏览器拜访 http://127.0.0.1:9501/ ,咱们能够在控制台看到打印的音讯输入。

Array(    [method] => GET    [message] => Hello Hyperf.)[DEBUG] 1 acked.

这个是 hyperf 自启动过程对音讯进行生产。

当初咱们想把生产过程从我的项目中剥离进去,用其余机器独自进行生产。

咱们参考 hyperf 自启动的流程+command 实现。

上面先看看 hyperf 源码是怎么实现自启动消费者过程的。

首先咱们留神到 Hyperf\Amqp\Listener\BeforeMainServerStartListener 这个监听器,在 BeforeMainServerStart 或者 MainCoroutineServerStart 事件被触发时,Hyperf\Amqp\ConsumerManager::run 办法被执行。

public function process(object $event){    // Init the consumer process.    $consumerManager = $this->container->get(ConsumerManager::class);    $consumerManager->run();}

Hyperf\Amqp\ConsumerManager::run 办法如下:


大抵步骤都在图上进行了标注,能够看到整个过程都比较简单。

上面咱们参考这个过程,依据 command 来手动创立生产过程。

1、原 consumer 先禁用

【App\Amqp\Consumer\DemoConsumer】enable 设置 falsepublic function isEnable(): bool{    return false;}

2、手动创立一个新的 consumer 用于测试

<?phpdeclare(strict_types=1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Result;use Hyperf\Amqp\Annotation\Consumer;use Hyperf\Amqp\Message\ConsumerMessage;use PhpAmqpLib\Message\AMQPMessage;class DemoConsumer2 extends ConsumerMessage{    public $exchange = 'hyperf';    public $routingKey = 'hyperf';    public $queue = 'hyperf';    public function consumeMessage($data, AMQPMessage $message): string    {        print_r($data);        return Result::ACK;    }    public function isEnable(): bool    {        return false;    }}

在这个 consumer 中,咱们手动指定了 exchange、routingKey 和 queue,同时禁止自启动(enable=false)。

3、构建 command

php bin/hyperf.php gen:command DemoConsumerCommand【App\Command\DemoConsumerCommand】代码如下:<?phpdeclare(strict_types=1);namespace App\Command;use App\Amqp\Consumer\DemoConsumer;use App\Amqp\Consumer\DemoConsumer2;use Hyperf\Amqp\Consumer;use Hyperf\Command\Command as HyperfCommand;use Hyperf\Command\Annotation\Command;use Hyperf\Di\Annotation\AnnotationCollector;use Psr\Container\ContainerInterface;use Hyperf\Amqp\Annotation\Consumer as ConsumerAnnotation;/** * @Command */#[Command]class DemoConsumerCommand extends HyperfCommand{    /**     * @var ContainerInterface     */    protected $container;    public function __construct(ContainerInterface $container)    {        $this->container = $container;        parent::__construct('DemoConsumer:command');    }    public function configure()    {        parent::configure();        $this->setDescription('手动启动生产过程测试');    }    public function handle()    {        $consumer = $this->container->get(Consumer::class);        $consumer->consume(make(DemoConsumer2::class));        $this->line('ok.', 'info');    }}

4、重新启动 hyperf 以及另起一个窗口启动 command

启动 hyperf : php bin/hyperf.php start 启动 command: php bin/hyperf.php DemoConsumer:command

5、测试

浏览器拜访 http://127.0.0.1:9501/ ,咱们能够在启动 command 的窗口看到音讯被胜利输入。