测试
$promise1 = function () { msleep(500); return 'one';};$promise2 = function () { msleep(100); return 'two';};$promise3 = function () { msleep(50); throw new \Exception('Reject');};var_dump(promise_all([$promise1, $promise2]));var_dump(promise_race([$promise1, $promise2]));var_dump(promise_race([$promise1, $promise2, $promise3]));
后果
# php promise.phparray(2) {[1]=>string(3) "two"[0]=>string(3) "one"}string(3) "two"object(Exception)#15 (7) {}
实现
<?phpdeclare(strict_types=1);use Swow\Coroutine;use Swow\Channel;use Swow\Selector;use Swow\Sync\WaitGroup;use function Swow\defer;/** * @param array $callbacks * @param int $parallel 并发数量 * @return array */function promise_all(array $callbacks, int $parallel = -1){ $wg = new WaitGroup(); $channel = new Channel($parallel); $results = []; foreach ($callbacks as $key => $callback) { $wg->add(); $channel->push(true); Coroutine::run(static function () use ($wg, $channel, $callback, $key, &$results) { try { $results[$key] = $callback(); } catch (\Throwable) { } finally { $channel->pop(); $wg->done(); } }); } $wg->wait(); return $results;}/** * @param array $callbacks * @param int $timeout 超时 * @param bool $throw 是否抛出异样 * @return mixed */function promise_race(array $callbacks, int $timeout = -1, bool $throw = true){ $coroutines = []; defer(static function () use (&$coroutines) { Coroutine::run(static function () use (&$coroutines) { foreach ($coroutines as $coroutine) { if ($coroutine && $coroutine->isAlive()) { $coroutine->kill(); } } }); }); $selector = new Selector(); foreach ($callbacks as $callback) { $channel = new Channel(); $coroutines[] = Coroutine::run(static function () use ($channel, $callback) { try { $channel->push($callback()); } catch (\Throwable $e) { $channel->push($e); } }); $selector->pop($channel); } try { $selector->commit($timeout); return $selector->fetch(); } catch (\Throwable $e) { if ($throw) { throw $e; } } return false;}