关于php:用扩展的方式在-PHP-中使用-Kafka

前言:

因为之前在 PHP 中应用 Kafka 是通过 composer 包的形式,因为 nmred/kafka-php 很久没有保护,并且网上相干问题的文章也比拟少。所以我这次换成 PHP 扩大 RdKafka 持续应用,次要介绍扩大装置和这种形式的基本操作。 

装置:

1. 下载地址。

找到与本人环境匹配的就能够

2. 目录

因为 php-rdkafka 依赖 librdkafka,linux 就须要先装置 librdkafka 后装置 php-rdkafka,而 windows 版本是如下几个文件,装置办法如下:

(1). 将 librdkafka.dll 和 librdkafka.pdb 放入 PHP 装置的根目录下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 装置目录的 ext 下。

(2). php.ini 配置文件增加 extension=php_rdkafka.dll,最初重启 PHP。

(3). php-m 或这 phpinfo (); 就能够查看到扩大了。

通过 get_declared_classes() 也能够查看到扩大里预设的函数了。

应用:

1. 生产

public function kafkaTest()
{
    $rk = new \RdKafka\Producer();
    $rk->addBrokers("127.0.0.1:9092");
    $topic = $rk->newTopic("shop");

    $ret = [];
    for ($i = 0; $i < 5; $i++) {
        $content = "第" . $i . "次发送失败";
        $message = ["mobile" => "15623652142", "content" => $content];
        $payload = json_encode($message);

        // 指定向0号partition生产数据
        $ret[]['produce_res'] = $topic->produce(0, 0, $payload, "sms_$i");

        // 随机抉择partition
        //$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        if ($rk->getOutQLen() > 0) {
            $ret[]['produce_poll'] = $rk->poll(500);
        } else {
            $ret[]['produce_poll'] = $rk->poll(0);
        }
    }

    dump($ret);
}

2. 生产(从指定的 partition 生产)

protected function execute(Input $input, Output $output)
{
    $output->writeln("!!!hello kafka!!!");

    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 'sms-consumer-group');

    $rk = new \RdKafka\Consumer($conf);
    $rk->addBrokers("127.0.0.1:9092");

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.commit.interval.ms', 100);
    $topicConf->set('offset.store.method', 'file');
    $topicConf->set('offset.store.path', sys_get_temp_dir());
    $topicConf->set('auto.offset.reset', 'smallest');
    $topic = $rk->newTopic("shop", $topicConf);
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

    while(true) {
        // 设置生产时的工夫距离,单位毫秒,以下示意5秒生产一个
        $message = $topic->consume(0, 5000);
        if ($message) {
            echo "读取到音讯\n\r";

            // 音讯对象,包含音讯主题,音讯创立工夫戳,音讯分区编号,音讯主体,音讯键名,音讯长度等
            var_dump($message);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo "读取音讯胜利:\n\r";
                    var_dump($message->payload);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "读取音讯失败\n\r";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "申请超时\n\r";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        } else {
            echo "未读取到音讯\n\r";
        }
    }

    $output->writeln("!!!the end!!!");
}

其余:

在执行生产过程中,发现 kafka 进行服务,抛出的异样:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。 

解决办法:

删除 kafka-logs 下的所有日志,再重新启动 Kafaka, kafka-server-start.bat ….\config\server.properties & 

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据