共计 1244 个字符,预计需要花费 4 分钟才能阅读完成。
Runtime::enableCoroutine(false);
// mgo(function () {
// Create the logger
// $logger = new Logger('my_logger');
// // Now add some handlers
// $logger->pushHandler(new StdoutHandler());
// //$logger->pushHandler(new \Monolog\Handler\NullHandler());
// \Amp\Loop::set(new \Swoole\Driver\Amp());
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(500);
$config->setMetadataBrokerList('192.168.3.243:9092');
$config->setGroupId('swoole');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['save_user_travel_data_one']);
$config->setOffsetReset('earliest');
$consumer = new Consumer();
// $consumer->setLogger($logger);
$consumer->start(function ($topic, $part, $message): void {mgo(function () use ($message) {$data = json_decode(json_decode($message['message']['value'], true), true);
$clearing_start_time = $data['clearing_start_time'];// 清算时间
$user_id = $data['user_id'];// 清算用户 id
$total = $data['total'];// 这次清算总用户数量
Console::Debug('开始一个任务.' . $user_id);
$this->saveUserTravel($user_id, $clearing_start_time);
Redis::incr('save_user_travel_log:' . $clearing_start_time);
Console::Debug('完成了第.' . Redis::get('save_user_travel_log:' . $clearing_start_time));
if (Redis::get('save_user_travel_log:' . $clearing_start_time) >= $total) {
//do
Console::Debug('完成了所有任务');
}
}, false);
});
\Swoole\Event::wait();
正文完