-
测试
// Three sample callbacks: two return a value after different msleep() delays,
// the third throws after the smallest delay.
$promise1 = static function () {
    msleep(500);

    return 'one';
};
$promise2 = static function () {
    msleep(100);

    return 'two';
};
$promise3 = static function () {
    msleep(50);

    throw new \Exception('Reject');
};

// Wait for both value-returning callbacks and dump every collected result.
var_dump(promise_all([$promise1, $promise2]));

// Race the two value-returning callbacks and dump whichever result arrives first.
var_dump(promise_race([$promise1, $promise2]));

// Race all three callbacks, including the one that throws.
var_dump(promise_race([$promise1, $promise2, $promise3]));
-
结果
# php promise.php array(2) {[1]=> string(3) "two" [0]=> string(3) "one" } string(3) "two" object(Exception)#15 (7) {}
-
实现
<?php

declare(strict_types=1);

use Swow\Channel;
use Swow\Coroutine;
use Swow\Selector;
use Swow\Sync\WaitGroup;

use function Swow\defer;

/**
 * Runs every callback in its own coroutine and waits until all of them have finished.
 *
 * Each return value is stored under the key of the callback that produced it.
 * Entries are added as callbacks complete, so the iteration order of the result
 * follows completion order, not the order of $callbacks. A callback that throws
 * is skipped: its exception is discarded and its key is absent from the result.
 *
 * @param array<array-key, callable> $callbacks Callables invoked without arguments.
 * @param int $parallel Capacity of the channel used to throttle how many callbacks
 *                      are in flight. Must not be 0. The default -1 is passed to
 *                      Swow\Channel unchanged (NOTE(review): presumably "no limit" —
 *                      confirm against the Swow Channel documentation).
 *
 * @return array<array-key, mixed> Return values keyed like $callbacks.
 *
 * @throws \InvalidArgumentException If $parallel is 0 while there are callbacks to run.
 */
function promise_all(array $callbacks, int $parallel = -1): array
{
    // Nothing to run: same result as before, without touching any coroutine primitive.
    if ($callbacks === []) {
        return [];
    }

    // A slot is pushed *before* the coroutine that will pop it is started, so a
    // channel without any buffer would block the caller on the very first push.
    if ($parallel === 0) {
        throw new \InvalidArgumentException(
            'promise_all(): $parallel must not be 0; an unbuffered channel would block forever'
        );
    }

    $wg = new WaitGroup();
    // Used as a counting semaphore: one buffered item per callback in flight.
    $channel = new Channel($parallel);
    $results = [];

    foreach ($callbacks as $key => $callback) {
        $wg->add();
        // Blocks here once the channel is full, i.e. until a running callback finishes.
        $channel->push(true);

        Coroutine::run(static function () use ($wg, $channel, $callback, $key, &$results): void {
            try {
                $results[$key] = $callback();
            } catch (\Throwable) {
                // Failures are dropped: the key is simply left out of $results.
            } finally {
                // Free the slot and signal completion whether the callback succeeded or not.
                $channel->pop();
                $wg->done();
            }
        });
    }

    $wg->wait();

    return $results;
}

/**
 * Runs every callback in its own coroutine and returns the first value that becomes available.
 *
 * A callback that throws also settles the race: the Throwable it raised is pushed
 * to its channel and is therefore *returned* as a value, not re-thrown.
 * A deferred cleanup kills every started coroutine that is still alive
 * (NOTE(review): Swow\defer presumably fires when this function's scope ends — confirm).
 *
 * @param array<array-key, callable> $callbacks Callables invoked without arguments.
 * @param int $timeout Timeout passed unchanged to Selector::commit()
 *                     (NOTE(review): unit and the meaning of -1 are defined by Swow — confirm).
 * @param bool $throw Whether a Throwable raised while waiting for or fetching the
 *                    first result is re-thrown; when false, false is returned instead.
 *
 * @return mixed The first value pushed by any callback (possibly a Throwable thrown
 *               by that callback), or false when waiting failed and $throw is false.
 *
 * @throws \Throwable Whatever Selector::commit() or Selector::fetch() raised, if $throw is true.
 */
function promise_race(array $callbacks, int $timeout = -1, bool $throw = true): mixed
{
    $coroutines = [];

    // Captured by reference so the cleanup sees every coroutine started below.
    defer(static function () use (&$coroutines): void {
        // The kills are issued from a separate coroutine, as in the original implementation.
        Coroutine::run(static function () use (&$coroutines): void {
            foreach ($coroutines as $coroutine) {
                if ($coroutine && $coroutine->isAlive()) {
                    $coroutine->kill();
                }
            }
        });
    });

    $selector = new Selector();

    foreach ($callbacks as $callback) {
        // One channel per callback; the selector waits on all of them at once.
        $channel = new Channel();

        $coroutines[] = Coroutine::run(static function () use ($channel, $callback): void {
            try {
                $channel->push($callback());
            } catch (\Throwable $e) {
                // A failing callback still settles the race, with the Throwable as its value.
                $channel->push($e);
            }
        });

        $selector->pop($channel);
    }

    try {
        $selector->commit($timeout);

        return $selector->fetch();
    } catch (\Throwable $e) {
        if ($throw) {
            throw $e;
        }
    }

    return false;
}