乐趣区

PHP实战RabbitMQ之生产者Confirm篇

简介

为提高系统高可用,生产者在发送 message 需要通过 MQ 回复 ACK 来确保 message 被成功存储。

注:图片来源微信公众号:架构师之路

RabbitMQ官网对 Publisher Confirms 有相关介绍,可惜 example 代码没有 php 版本的,且 php-amqplib 也是一帮志愿者在维护,精力有限,没有完整的使用 example

php-amqblib也有开发者提了 Issues 希望完善example
https://github.com/php-amqpli…

本文将会使用 php 来实现Publisher Confirms,希望对大家有所帮助,代码已提交到github:https://github.com/jiaoyang3/…

几种策落

事务

事务可以保证原子性操作 RabbitMQ,使用起来也很简单。

$channel->tx_select();//begin trx
$channel->tx_commit();//commit trx
$channel->tx_rollback();//rollback

测试 example,运行之后发现没有message 提交到queue

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;

try {$rabbit  = new RabbitMQ();
    $channel = $rabbit->getChannel();
    $channel->tx_select();//begin trx

    $queueName = 'test-single-queue2';
    $rabbit->createQueue($queueName, false, true, false, false);

    for ($i = 0; $i < 10000; $i++) {$rabbit->sendMessage($i . "this is a test message.", $queueName, '', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 消息持久化,重启 rabbitmq,消息不会丢失]);
        if ($i == 10) {throw new Exception('rollbock');
        }
    }
    $channel->tx_commit();//commit trx
    unset($rabbit);//close
} catch (Exception $e) {$channel->tx_rollback();//rollback
    echo $e->getMessage();}

单条 ack

生产者每发送一条 messageMQ 服务会对出数据进行持久化存储成功之后再回复 ack/nack 消息给生产者。

开启 confirm

$channel->confirm_select();

注册 ack/nack 回掉方法

//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){echo 'ack' . $message->getBody() .PHP_EOL;
});

设置 ack/nack 超时时间

    $channel->wait_for_pending_acks_returns(5);//set wait time

测试代码如下,完整代码可访问 github

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;


$rabbit  = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->confirm_select();//open confirm
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){echo 'ack' . $message->getBody() .PHP_EOL;
});

$queueName = 'test-single-queue1';
$rabbit->createQueue($queueName, false, true, false, false);

for ($i = 0; $i < 10000; $i++) {
    $message = $i . "this is a test message.";
    $rabbit->sendMessage($message, $queueName, '', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    echo $message . 'has been sent' . PHP_EOL;
    $channel->wait_for_pending_acks_returns(5);//set wait time
    sleep(1);
}

unset($rabbit);// 关闭连接

多条 ack

每条 messageack性能很低
这里进行批量 ack,设置每 50 个message 集中ack,代码也很简单

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;


$rabbit  = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->confirm_select();//open confirm
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){echo 'ack' . $message->getBody() .PHP_EOL;
});


$queueName = 'test-single-queue';
$rabbit->createQueue($queueName, false, true, false, false);

// 每次都 ack 性能很低,批量 ack,设置每 50 个 message 集中 ack
$batchSize               = 50;
$outstandingMessageCount = 0;
for ($i = 0; $i < 10000; $i++) {
    $message = $i . "this is a test message.";
    $rabbit->sendMessage($message, $queueName, '', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 消息持久化,重启 rabbitmq,消息不会丢失]);
    echo $message . '已发送' . PHP_EOL;
    if (++$outstandingMessageCount == $batchSize) {
        echo '------';
        $channel->wait_for_pending_acks_returns(5);
        $outstandingMessageCount = 0;
    }
    sleep(1);
}
if ($outstandingMessageCount > 0) {$channel->wait_for_pending_acks_returns(5);
}

unset($rabbit);// 关闭连接

异步 ack

毫无疑问,异步的效率是最高的,可惜没有发现 php-amqplib 并没有实现,一种语言用的人越多生态越好,php已不复当年。

退出移动版