Kafka集群部署

安装rdkafka

rdkafka 依赖 librdkafka

    yum install librdkafka librdkafka-devel
    pecl install rdkafka
    php --ri rdkafka

http://pecl.php.net/package/rdkafka 可以参阅支持的kafka客户端版本

生产者

连接集群,创建 topic,生产数据。

<?php
// Producer example: connects to the cluster, obtains a topic handle and
// publishes one timestamped message every two seconds, forever.

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);

// Connect to the Kafka cluster.
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// Create the topic handle.
$topic = $rk->newTopic("topic_1");

while (true) {
    // Build the payload once so the echoed text and the sent text always match.
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo $message . PHP_EOL;

    try {
        // RD_KAFKA_PARTITION_UA lets the partitioner pick the partition.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        // Serve queued delivery reports / events without blocking.
        $rk->poll(0);
    } catch (\Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }

    // Sleep outside the try block so a failing produce() is still throttled.
    sleep(2);
}
?>

消费者-HighLevel

自动分配partition,rebalance,consumer group。

<?php
// High-level consumer example: joins consumer group "group_1", lets the group
// assign partitions of "topic_1", and dumps every message it receives.

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;

        default:
            // Report the error text as the message and keep the numeric code.
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'group_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'topic_1'
$consumer->subscribe(['topic_1']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Wait up to 3000 ms for the next message or event.
    $message = $consumer->consume(3000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // End of partition reached: pause, then report it like a timeout.
            sleep(2);
            // Intentional fall-through.
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>

消费者-LowLevel

指定partition消费。

    php consumer_lowlevel.php [partitionNum]

LowLevel 没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。

<?php
// Low-level consumer example: reads a single, explicitly chosen partition of
// "topic_1". The partition number comes from the first CLI argument, or is
// read from STDIN when the argument is missing.

if (!isset($argv[1])) {
    fwrite(STDOUT, "请指定消费分区:");
    $partition = (int) fread(STDIN, 1024);
} else {
    $partition = (int) $argv[1];
}

$topicName = "topic_1";

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'group_2');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers('192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();
// Configuration values are passed as strings.
$topicConf->set('auto.commit.interval.ms', '2000');

// Set the offset store method to 'file'
// $topicConf->set('offset.store.method', 'file');
// $topicConf->set('offset.store.path', sys_get_temp_dir());

// Alternatively, set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic($topicName, $topicConf);

// Start consuming the selected partition from the stored offset
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    // Wait up to 3 seconds for the next message on the selected partition.
    $message = $topic->consume($partition, 3 * 1000);

    // php-rdkafka documents that consume() may return null when no message
    // arrives before the timeout; report it like a timed-out message.
    if ($message === null) {
        echo rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT) . PHP_EOL;
        continue;
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>