关于php:rabbitmq常见功能封装-php版本

8次阅读

共计 3491 个字符,预计需要花费 9 分钟才能阅读完成。

在我的项目中 rabbitmq 失去了宽泛的时候,这里对 rabbitmq 的惯例性能做了一个简略的总结,并封装成了 composer 包,composer 包地址、github 地址,欢送 fork,因为程度无限,不免存在 bug,欢送提出宝贵意见

easy-rabbitmq 包简介

对 php-amqplib/php-amqplib 包的二次封装,为常见性能提供一套开箱即用的生产解决方案
。目前反对的性能列表如下:

  • 推送音讯到直连交换机 (含提早音讯)
  • 推送音讯到扇形交换机 (含提早音讯)
  • 推送音讯到主题交换机 (含提早音讯)
  • 订阅模式下的牢靠生产, 消费者生产失败后将会尝试持续生产,最多尝试 5 次。
  • 拉取模式下的牢靠生产, 消费者生产失败后将会尝试持续生产,最多尝试 5 次。

如果还有其它场景,欢送持续补充,随后进行迭代!!

要求

  • 安装包对 PHP 版本对要求次要取决于 php-amqplib/php-amqplib 包自身对要求, 这里为了兼顾 php5.0 的使用者, 咱们应用了 php-amqplib/php-amqplib 包 V2.9.0 的版本。
    具体的要求参照这里。
    不过笔者举荐应用 php7.0 及其以上版本, 这个开发包也是在 7.0 这个版本下面开发实现的!

装置

    composer require maweibinguo/easyrabbitmq

应用

在这里咱们举荐 php 脚本 +supervisor 联合应用,用以保障生产过程的可靠性、加强 worker 的生产能力!如果你还没有据说过 supervisor,能够点击这里理解.

1、推送音讯

1-1、推送音讯到直连交换机

    $config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    
    // 提早音讯,30 秒中后才会达到指定的交换机
    $instance->pushToDirect($msg = time(), // 音讯体内容
                $exchange = "easy_direct_exchange", // 交换机名称
                $routingKey = "direct_test_queue", // 音讯的 routingkey,consume 办法到 bingdingkey 要和 routingkey 保持一致
                $delaySec = 30 // 提早秒数
    );

    // 无提早,推入到指定到直链交换机
    $instance->pushToDirect($msg = time(), // 音讯体内容
                $exchange = "easy_direct_exchange", // 交换机名称
                $routingKey = "direct_test_queue", // 音讯的 routingkey,consume 办法到 bingdingkey 要和 routingkey 保持一致
    );

1-2、推送音讯到扇形交换机

    $config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    
    // 提早音讯,30 秒中后才会达到指定的交换机
    $instance->pushToFanout($msg = time(), // 音讯体内容
                $exchange = "easy_fanout_exchange", // 交换机名称
                $delaySec = 30 // 提早秒数
    );

    // 无提早,推入到指定到直链交换机
    $instance->pushToFanout($msg = time(), // 音讯体内容
                $exchange = "easy_fanout_exchange" // 交换机名称
    );

1-3、推送音讯到主题交换机

    $config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    
    // 提早音讯,30 秒中后才会达到指定的交换机
    $instance->pushToTopic($msg = time(), // 音讯体内容
                $exchange = "easy_topic_exchange", // 交换机名称
                $routingKey = "",
                $delaySec = 30 // 提早秒数
    );

    // 无提早,推入到指定到直链交换机
    $instance->pushToTopic($msg = time(), // 音讯体内容
                $exchange = "easy_topic_exchange", // 交换机名称
                $routingKey = "easy.topic.queue" //routingKey 要同 consum 的 bindingKey 相匹配
                                 //bindingKey 反对两种非凡的字符 "*"、“#”,用作含糊匹配, 其中 "*" 用于匹配一个单词、“#”用于匹配多个单词 (也能够是 0 个)
                                 // 无论是 bindingKey 还是 routingKey, 被 "." 分隔开的每一段独立的字符串就是一个单词, easy.topic.queue, 蕴含三个单词 easy、topic、queue
    );

2、生产音讯

生产反对主动重试,最多尝试重试 5 次,每次生产失败后该音讯将会被从新投入到生产队列中。从新的工夫将会随着失败的次数增多逐步推移, 本客户端反对的推移策略如下:
失败 1 次(1 秒钟后会再被投递), 失败 2 次(2 秒钟后会再被投递), 失败 3 次(4 秒钟后会再被投递), 失败 4 次(8 秒钟后会再被投递), 失败 5 次(16 秒钟后会再被投递)

2-1、订阅模式

订阅模式下的牢靠生产
    $config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    $instance->consume(
        $queueName = "direct_test_queue",// 订阅的队列名称
        $consumerTag = "c1",// 生产标记
        $exchange = "easy_direct_exchange",// 交换机名称
        $bindingKey = "direct_test_queue",//bindingkey,如果是直链交换机须要同 routingKey 保持一致
        $callback = function($msg){
            $body = $msg->body;
            file_put_contents("./test.log", "time =>" . time() . "\t" . "body =>" . $body . PHP_EOL , FILE_APPEND);
            // 如果返回后果不相对等于 (===)true, 那么将触发音讯重试机制
            return false;
        },
        // 5 次生产生产失败后,失败音讯将会投递到的队列名称
        $failedQueue = "easymq@failed"
    );

2-2、拉取模式

拉取模式下的牢靠生产
    $config = [
        "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
    ];    
    $instance = RabbitMq::getInstance($config);
    $instance->get(
        $queue = "get_queue",
        $exchange = "easy_fanout_exchange",
        $bindingKey = "",
        $callback = function($msg){
            $body = $msg->body;
            file_put_contents("./test.log", "time =>" . time() . "\t" . "body =>" . $body . PHP_EOL , FILE_APPEND);
            // 如果返回后果不相对等于 (===)true, 那么将触发音讯重试机制
            return false;
        },
        // 5 次生产生产失败后,失败音讯将会投递到的队列名称
        $failedQueue = 'easymq@failed'
        );
正文完
 0