关于 PHP：用 PHP 实现 Promise.all 和 Promise.race

52次阅读

共计 1692 个字符,预计需要花费 5 分钟才能阅读完成。

  • 测试

    // Three sample tasks: two that return a value after sleeping, and one that
    // throws. The sleep arguments are presumably milliseconds (msleep) — confirm.
    $promise1 = function () {
        msleep(500);

        return 'one';
    };

    $promise2 = function () {
        msleep(100);

        return 'two';
    };

    $promise3 = function () {
        msleep(50);

        throw new \Exception('Reject');
    };

    // The same two successful tasks feed both promise_all() and the first race.
    $succeeding = [$promise1, $promise2];
    // Adding the throwing task shows how promise_race() reports a failure.
    $withFailure = [$promise1, $promise2, $promise3];

    var_dump(promise_all($succeeding));
    var_dump(promise_race($succeeding));
    var_dump(promise_race($withFailure));
  • 结果

    # php promise.php
    array(2) {[1]=>
    string(3) "two"
    [0]=>
    string(3) "one"
    }
    string(3) "two"
    object(Exception)#15 (7) {}
  • 实现

    <?php
    
    declare(strict_types=1);
    
    use Swow\Coroutine;
    use Swow\Channel;
    use Swow\Selector;
    use Swow\Sync\WaitGroup;
    use function Swow\defer;
    
    /**
     * Run every callback in its own coroutine and collect their return values.
     *
     * Each result is stored under the key its callback has in $callbacks, and the
     * returned array follows the order of $callbacks no matter which coroutine
     * finished first. A callback that throws contributes no entry: its exception
     * is discarded, so a missing key means "this callback failed" (best-effort
     * collection, unlike JavaScript's Promise.all, which rejects on first failure).
     *
     * @param array<array-key, callable> $callbacks Callables invoked with no arguments.
     * @param int $parallel Capacity given to the throttling channel, which bounds how
     *                      many callbacks are in flight at once. The default -1 is
     *                      presumably "unlimited" — confirm against Swow\Channel.
     * @return array<array-key, mixed> Return values of the callbacks that did not throw.
     */
    function promise_all(array $callbacks, int $parallel = -1): array
    {
        $wg = new WaitGroup();
        $channel = new Channel($parallel);
        $results = [];

        foreach ($callbacks as $key => $callback) {
            $wg->add();
            // Take a slot before starting the coroutine; the slot is handed back
            // in the coroutine's finally block, which throttles concurrency.
            $channel->push(true);
            Coroutine::run(static function () use ($wg, $channel, $callback, $key, &$results) {
                try {
                    $results[$key] = $callback();
                } catch (\Throwable) {
                    // Deliberately ignored: a failing callback simply yields no entry.
                } finally {
                    $channel->pop();
                    $wg->done();
                }
            });
        }

        $wg->wait();

        // $results was filled in completion order; re-emit it in input order so the
        // caller gets a deterministic array regardless of coroutine timing.
        $ordered = [];
        foreach ($callbacks as $key => $_) {
            if (array_key_exists($key, $results)) {
                $ordered[$key] = $results[$key];
            }
        }

        return $ordered;
    }
    
    /**
     * Start every callback in its own coroutine and return whichever value the
     * selector delivers first.
     *
     * A callback that throws has its Throwable pushed as an ordinary value, so the
     * "winner" may be an exception object that is returned rather than thrown.
     * Coroutines that are still alive when this function exits are killed by the
     * handler registered with defer().
     *
     * @param array $callbacks Callables invoked with no arguments.
     * @param int $timeout Passed unchanged to Selector::commit(); default -1.
     * @param bool $throw Whether a Throwable raised by commit()/fetch() is rethrown;
     *                    when false, the function returns false instead.
     * @return mixed The first delivered value, or false on a suppressed failure.
     */
    function promise_race(array $callbacks, int $timeout = -1, bool $throw = true)
    {
        $workers = [];

        // Cleanup is registered before any worker starts; $workers is captured by
        // reference so the handler sees every coroutine added below.
        defer(static function () use (&$workers) {
            Coroutine::run(static function () use (&$workers) {
                foreach ($workers as $worker) {
                    if (!$worker || !$worker->isAlive()) {
                        continue;
                    }
                    $worker->kill();
                }
            });
        });

        $selector = new Selector();

        foreach ($callbacks as $callback) {
            // One channel per callback; the selector waits on all of them.
            $resultChannel = new Channel();
            $workers[] = Coroutine::run(static function () use ($resultChannel, $callback) {
                try {
                    $resultChannel->push($callback());
                } catch (\Throwable $failure) {
                    // Failures travel through the channel as values.
                    $resultChannel->push($failure);
                }
            });
            $selector->pop($resultChannel);
        }

        try {
            $selector->commit($timeout);

            return $selector->fetch();
        } catch (\Throwable $failure) {
            if (!$throw) {
                return false;
            }

            throw $failure;
        }
    }

正文完
 0