共计 2239 个字符,预计需要花费 6 分钟才能阅读完成。
前言:
因为之前在 PHP 中应用 Kafka 是通过 composer 包的形式,因为 nmred/kafka-php 很久没有保护,并且网上相干问题的文章也比拟少。所以我这次换成 PHP 扩大 RdKafka 持续应用,次要介绍扩大装置和这种形式的基本操作。
装置:
1. 下载地址。
找到与本人环境匹配的就能够
2. 目录
因为 php-rdkafka 依赖 librdkafka,linux 就须要先装置 librdkafka 后装置 php-rdkafka,而 windows 版本是如下几个文件,装置办法如下:
(1). 将 librdkafka.dll 和 librdkafka.pdb 放入 PHP 装置的根目录下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 装置目录的 ext 下。
(2). php.ini 配置文件增加 extension=php_rdkafka.dll,最初重启 PHP。
(3). php-m 或这 phpinfo (); 就能够查看到扩大了。
通过 get_declared_classes() 也能够查看到扩大里预设的函数了。
应用:
1. 生产
public function kafkaTest() | |
{$rk = new \RdKafka\Producer(); | |
$rk->addBrokers("127.0.0.1:9092"); | |
$topic = $rk->newTopic("shop"); | |
$ret = []; | |
for ($i = 0; $i < 5; $i++) { | |
$content = "第" . $i . "次发送失败"; | |
$message = ["mobile" => "15623652142", "content" => $content]; | |
$payload = json_encode($message); | |
// 指定向 0 号 partition 生产数据 | |
$ret[]['produce_res'] = $topic->produce(0, 0, $payload, "sms_$i"); | |
// 随机抉择 partition | |
//$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload); | |
if ($rk->getOutQLen() > 0) {$ret[]['produce_poll'] = $rk->poll(500); | |
} else {$ret[]['produce_poll'] = $rk->poll(0); | |
} | |
} | |
dump($ret); | |
} |
2. 生产(从指定的 partition 生产)
protected function execute(Input $input, Output $output) | |
{$output->writeln("!!!hello kafka!!!"); | |
$conf = new \RdKafka\Conf(); | |
$conf->set('group.id', 'sms-consumer-group'); | |
$rk = new \RdKafka\Consumer($conf); | |
$rk->addBrokers("127.0.0.1:9092"); | |
$topicConf = new \RdKafka\TopicConf(); | |
$topicConf->set('auto.commit.interval.ms', 100); | |
$topicConf->set('offset.store.method', 'file'); | |
$topicConf->set('offset.store.path', sys_get_temp_dir()); | |
$topicConf->set('auto.offset.reset', 'smallest'); | |
$topic = $rk->newTopic("shop", $topicConf); | |
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); | |
while(true) { | |
// 设置生产时的工夫距离,单位毫秒,以下示意 5 秒生产一个 | |
$message = $topic->consume(0, 5000); | |
if ($message) { | |
echo "读取到音讯 \n\r"; | |
// 音讯对象,包含音讯主题,音讯创立工夫戳,音讯分区编号,音讯主体,音讯键名,音讯长度等 | |
var_dump($message); | |
switch ($message->err) { | |
case RD_KAFKA_RESP_ERR_NO_ERROR: | |
echo "读取音讯胜利:\n\r"; | |
var_dump($message->payload); | |
break; | |
case RD_KAFKA_RESP_ERR__PARTITION_EOF: | |
echo "读取音讯失败 \n\r"; | |
break; | |
case RD_KAFKA_RESP_ERR__TIMED_OUT: | |
echo "申请超时 \n\r"; | |
break; | |
default: | |
throw new \Exception($message->errstr(), $message->err); | |
break; | |
} | |
} else {echo "未读取到音讯 \n\r";} | |
} | |
$output->writeln("!!!the end!!!"); | |
} |
其余:
在执行生产过程中,发现 kafka 进行服务,抛出的异样:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。
解决办法:
删除 kafka-logs 下的所有日志,再重新启动 Kafaka,kafka-server-start.bat ….\config\server.properties &
正文完