前言

延时队列的作用不再赘述。
本文利用 RabbitMQ 的 queue 可以设置 TTL 的特性:message 到期后会成为死信,并被 push 到 delay_queue,消费 delay_queue 即可实现延时队列功能。

code

对RabbitMQ进行简单的封装

<?php

namespace RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Minimal wrapper around one php-amqplib connection and one channel opened on it.
 *
 * Subclasses reach the broker through the protected $connection / $channel members.
 */
class RabbitMQ
{
    private $host = '127.0.0.1';
    private $port = 5672;
    private $user = 'guest';
    private $password = 'guest';

    protected $connection;
    protected $channel;

    /**
     * Opens the connection and a channel.
     *
     * Every argument is optional; passing null (or nothing) keeps the built-in
     * default for that setting, so `new RabbitMQ()` behaves as it always did.
     *
     * @param string|null $host     Broker host, default '127.0.0.1'.
     * @param int|null    $port     Broker port, default 5672.
     * @param string|null $user     Login name, default 'guest'.
     * @param string|null $password Login password, default 'guest'.
     */
    public function __construct($host = null, $port = null, $user = null, $password = null)
    {
        if ($host !== null) {
            $this->host = $host;
        }
        if ($port !== null) {
            $this->port = $port;
        }
        if ($user !== null) {
            $this->user = $user;
        }
        if ($password !== null) {
            $this->password = $password;
        }

        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
        $this->channel    = $this->connection->channel();
    }

    /**
     * Publishes one message.
     *
     * @param string $message    Message body.
     * @param string $routeKey   Routing key handed to basic_publish.
     * @param string $exchange   Exchange name; defaults to the empty string.
     * @param array  $properties Message properties passed to the AMQPMessage constructor.
     */
    public function sendMessage($message, $routeKey, $exchange = '', $properties = [])
    {
        $data = new AMQPMessage($message, $properties);

        $this->channel->basic_publish($data, $exchange, $routeKey);
    }

    /**
     * Registers $callback as a consumer of $queueName and blocks, waiting on the
     * channel for as long as it reports that it is consuming.
     *
     * The consumer is registered with no_ack = false (fourth basic_consume
     * argument), so the callback has to acknowledge each message itself.
     *
     * @param string   $queueName Queue to consume from.
     * @param callable $callback  Invoked with each delivered message.
     * @throws \ErrorException
     */
    public function consumeMessage($queueName, $callback)
    {
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel, then the connection.
     *
     * The connection is closed in a finally block so that an exception raised
     * while closing the channel does not leave the connection open.
     *
     * @throws \Exception
     */
    public function __destruct()
    {
        try {
            // Guard in case the channel was never assigned.
            if ($this->channel !== null) {
                $this->channel->close();
            }
        } finally {
            if ($this->connection !== null) {
                $this->connection->close();
            }
        }
    }
}

创建延时队列

<?php

namespace RabbitMQ;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Delay queue built on RabbitMQ: a queue with a message TTL whose expired
 * messages are dead-lettered into a second queue that consumers read from.
 *
 * Class DelayQueue
 * @package RabbitMQ
 */
class DelayQueue extends RabbitMQ
{
    /**
     * Declares the TTL queue, the dead-letter exchange and the queue bound to it.
     *
     * @param int    $ttl            Message time-to-live, passed as 'x-message-ttl'.
     * @param string $delayExName    Dead-letter exchange name.
     * @param string $delayQueueName Queue bound to the dead-letter exchange.
     * @param string $queueName      TTL queue name; also used as the dead-letter routing key.
     */
    public function createQueue($ttl, $delayExName, $delayQueueName, $queueName)
    {
        $channel = $this->channel;

        // Queue arguments: where dead letters go, how long messages live,
        // and the routing key they are dead-lettered with.
        $deadLetterSettings = [
            'x-dead-letter-exchange'    => $delayExName,
            'x-message-ttl'             => $ttl,
            'x-dead-letter-routing-key' => $queueName,
        ];

        // Durable TTL queue carrying the dead-letter arguments.
        $channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($deadLetterSettings)
        );

        // Durable direct exchange receiving the dead letters.
        $channel->exchange_declare($delayExName, AMQPExchangeType::DIRECT, false, true, false);

        // Durable queue consumers read from, bound with the TTL queue's name
        // as routing key so it matches 'x-dead-letter-routing-key' above.
        $channel->queue_declare($delayQueueName, false, true, false, false);
        $channel->queue_bind($delayQueueName, $delayExName, $queueName, false);
    }
}

生产者,代码很简单,看看运行之后的效果,订单的message越来越多

<?php

// Resolve the autoloader relative to this file rather than the current
// working directory, so the script can be started from anywhere.
require_once __DIR__ . '/../vendor/autoload.php';

// Producer: declares the queues, then publishes one order message per second.
$delay = new \RabbitMQ\DelayQueue();

$ttl            = 1000 * 100;             // orders time out after 100s
$delayExName    = 'delay-order-exchange'; // exchange that receives timed-out orders
$delayQueueName = 'delay-order-queue';    // queue holding timed-out orders
$queueName      = 'ttl-order-queue';      // order queue carrying the TTL

$delay->createQueue($ttl, $delayExName, $delayQueueName, $queueName);

// 100 orders; every order times out after 100s (the queue TTL set above).
for ($i = 0; $i < 100; $i++) {
    $data = [
        'order_id' => $i + 1,
        'remark'   => 'this is a order test'
    ];

    // Published with the default (empty) exchange and the order queue name as routing key.
    $delay->sendMessage(json_encode($data), $queueName);
    sleep(1);
}

消费者:看看消费之后的效果。过一会儿会观察到,已经有到期的 message 被 push 到了 delay-order-queue

消费者也消费到了message

<?php

// Resolve the autoloader relative to this file rather than the current
// working directory, so the script can be started from anywhere.
require_once __DIR__ . '/../vendor/autoload.php';

// Consumer: reads timed-out orders from the dead-letter queue.
$delay = new \RabbitMQ\DelayQueue();

$delayQueueName = 'delay-order-queue';

$callback = function ($msg) {
    echo $msg->body . PHP_EOL;

    // Handle the order timeout here (notify the user, etc.).
    // The sleep stands in for that work.
    sleep(10);

    // Acknowledge only after the handling above has finished, so a message
    // whose handling fails part-way is not already acknowledged.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

/**
 * Consume the orders that have already timed out and process them.
 */
$delay->consumeMessage($delayQueueName, $callback);

代码见:https://github.com/jiaoyang3/...