Runtime::enableCoroutine(false);// mgo(function () {// Create the logger// $logger = new Logger('my_logger');// // Now add some handlers// $logger->pushHandler(new StdoutHandler());// //$logger->pushHandler(new \Monolog\Handler\NullHandler());// \Amp\Loop::set(new \Swoole\Driver\Amp());$config = ConsumerConfig::getInstance();$config->setMetadataRefreshIntervalMs(500);$config->setMetadataBrokerList('192.168.3.243:9092');$config->setGroupId('swoole');$config->setBrokerVersion('1.0.0');$config->setTopics(['save_user_travel_data_one']);$config->setOffsetReset('earliest');$consumer = new Consumer();// $consumer->setLogger($logger);$consumer->start(function ($topic, $part, $message): void { mgo(function () use ($message) { $data = json_decode(json_decode($message['message']['value'], true), true); $clearing_start_time = $data['clearing_start_time'];//清算时间 $user_id = $data['user_id'];//清算用户id $total = $data['total'];//这次清算总用户数量 Console::Debug('开始一个任务.' . $user_id); $this->saveUserTravel($user_id, $clearing_start_time); Redis::incr('save_user_travel_log:' . $clearing_start_time); Console::Debug('完成了第.' . Redis::get('save_user_travel_log:' . $clearing_start_time)); if (Redis::get('save_user_travel_log:' . $clearing_start_time) >= $total) { //do Console::Debug('完成了所有任务'); } }, false);});\Swoole\Event::wait();