[ Index ]

PHP Cross Reference of phpBB-3.3.7-deutsch

title

Body

[close]

/vendor/guzzlehttp/promises/src/ -> EachPromise.php (source)

   1  <?php
   2  
   3  namespace GuzzleHttp\Promise;
   4  
   5  /**
   6   * Represents a promise that iterates over many promises and invokes
   7   * side-effect functions in the process.
   8   */
   9  class EachPromise implements PromisorInterface
  10  {
  11      private $pending = [];
  12  
  13      private $nextPendingIndex = 0;
  14  
  15      /** @var \Iterator|null */
  16      private $iterable;
  17  
  18      /** @var callable|int|null */
  19      private $concurrency;
  20  
  21      /** @var callable|null */
  22      private $onFulfilled;
  23  
  24      /** @var callable|null */
  25      private $onRejected;
  26  
  27      /** @var Promise|null */
  28      private $aggregate;
  29  
  30      /** @var bool|null */
  31      private $mutex;
  32  
  33      /**
  34       * Configuration hash can include the following key value pairs:
  35       *
  36       * - fulfilled: (callable) Invoked when a promise fulfills. The function
  37       *   is invoked with three arguments: the fulfillment value, the index
  38       *   position from the iterable list of the promise, and the aggregate
  39       *   promise that manages all of the promises. The aggregate promise may
  40       *   be resolved from within the callback to short-circuit the promise.
  41       * - rejected: (callable) Invoked when a promise is rejected. The
  42       *   function is invoked with three arguments: the rejection reason, the
  43       *   index position from the iterable list of the promise, and the
  44       *   aggregate promise that manages all of the promises. The aggregate
  45       *   promise may be resolved from within the callback to short-circuit
  46       *   the promise.
  47       * - concurrency: (integer) Pass this configuration option to limit the
  48       *   allowed number of outstanding concurrently executing promises,
  49       *   creating a capped pool of promises. There is no limit by default.
  50       *
  51       * @param mixed $iterable Promises or values to iterate.
  52       * @param array $config   Configuration options
  53       */
  54      public function __construct($iterable, array $config = [])
  55      {
  56          $this->iterable = Create::iterFor($iterable);
  57  
  58          if (isset($config['concurrency'])) {
  59              $this->concurrency = $config['concurrency'];
  60          }
  61  
  62          if (isset($config['fulfilled'])) {
  63              $this->onFulfilled = $config['fulfilled'];
  64          }
  65  
  66          if (isset($config['rejected'])) {
  67              $this->onRejected = $config['rejected'];
  68          }
  69      }
  70  
  71      /** @psalm-suppress InvalidNullableReturnType */
  72      public function promise()
  73      {
  74          if ($this->aggregate) {
  75              return $this->aggregate;
  76          }
  77  
  78          try {
  79              $this->createPromise();
  80              /** @psalm-assert Promise $this->aggregate */
  81              $this->iterable->rewind();
  82              $this->refillPending();
  83          } catch (\Throwable $e) {
  84              /**
  85               * @psalm-suppress NullReference
  86               * @phpstan-ignore-next-line
  87               */
  88              $this->aggregate->reject($e);
  89          } catch (\Exception $e) {
  90              /**
  91               * @psalm-suppress NullReference
  92               * @phpstan-ignore-next-line
  93               */
  94              $this->aggregate->reject($e);
  95          }
  96  
  97          /**
  98           * @psalm-suppress NullableReturnStatement
  99           * @phpstan-ignore-next-line
 100           */
 101          return $this->aggregate;
 102      }
 103  
 104      private function createPromise()
 105      {
 106          $this->mutex = false;
 107          $this->aggregate = new Promise(function () {
 108              if ($this->checkIfFinished()) {
 109                  return;
 110              }
 111              reset($this->pending);
 112              // Consume a potentially fluctuating list of promises while
 113              // ensuring that indexes are maintained (precluding array_shift).
 114              while ($promise = current($this->pending)) {
 115                  next($this->pending);
 116                  $promise->wait();
 117                  if (Is::settled($this->aggregate)) {
 118                      return;
 119                  }
 120              }
 121          });
 122  
 123          // Clear the references when the promise is resolved.
 124          $clearFn = function () {
 125              $this->iterable = $this->concurrency = $this->pending = null;
 126              $this->onFulfilled = $this->onRejected = null;
 127              $this->nextPendingIndex = 0;
 128          };
 129  
 130          $this->aggregate->then($clearFn, $clearFn);
 131      }
 132  
 133      private function refillPending()
 134      {
 135          if (!$this->concurrency) {
 136              // Add all pending promises.
 137              while ($this->addPending() && $this->advanceIterator());
 138              return;
 139          }
 140  
 141          // Add only up to N pending promises.
 142          $concurrency = is_callable($this->concurrency)
 143              ? call_user_func($this->concurrency, count($this->pending))
 144              : $this->concurrency;
 145          $concurrency = max($concurrency - count($this->pending), 0);
 146          // Concurrency may be set to 0 to disallow new promises.
 147          if (!$concurrency) {
 148              return;
 149          }
 150          // Add the first pending promise.
 151          $this->addPending();
 152          // Note this is special handling for concurrency=1 so that we do
 153          // not advance the iterator after adding the first promise. This
 154          // helps work around issues with generators that might not have the
 155          // next value to yield until promise callbacks are called.
 156          while (--$concurrency
 157              && $this->advanceIterator()
 158              && $this->addPending());
 159      }
 160  
 161      private function addPending()
 162      {
 163          if (!$this->iterable || !$this->iterable->valid()) {
 164              return false;
 165          }
 166  
 167          $promise = Create::promiseFor($this->iterable->current());
 168          $key = $this->iterable->key();
 169  
 170          // Iterable keys may not be unique, so we use a counter to
 171          // guarantee uniqueness
 172          $idx = $this->nextPendingIndex++;
 173  
 174          $this->pending[$idx] = $promise->then(
 175              function ($value) use ($idx, $key) {
 176                  if ($this->onFulfilled) {
 177                      call_user_func(
 178                          $this->onFulfilled,
 179                          $value,
 180                          $key,
 181                          $this->aggregate
 182                      );
 183                  }
 184                  $this->step($idx);
 185              },
 186              function ($reason) use ($idx, $key) {
 187                  if ($this->onRejected) {
 188                      call_user_func(
 189                          $this->onRejected,
 190                          $reason,
 191                          $key,
 192                          $this->aggregate
 193                      );
 194                  }
 195                  $this->step($idx);
 196              }
 197          );
 198  
 199          return true;
 200      }
 201  
 202      private function advanceIterator()
 203      {
 204          // Place a lock on the iterator so that we ensure to not recurse,
 205          // preventing fatal generator errors.
 206          if ($this->mutex) {
 207              return false;
 208          }
 209  
 210          $this->mutex = true;
 211  
 212          try {
 213              $this->iterable->next();
 214              $this->mutex = false;
 215              return true;
 216          } catch (\Throwable $e) {
 217              $this->aggregate->reject($e);
 218              $this->mutex = false;
 219              return false;
 220          } catch (\Exception $e) {
 221              $this->aggregate->reject($e);
 222              $this->mutex = false;
 223              return false;
 224          }
 225      }
 226  
 227      private function step($idx)
 228      {
 229          // If the promise was already resolved, then ignore this step.
 230          if (Is::settled($this->aggregate)) {
 231              return;
 232          }
 233  
 234          unset($this->pending[$idx]);
 235  
 236          // Only refill pending promises if we are not locked, preventing the
 237          // EachPromise to recursively invoke the provided iterator, which
 238          // cause a fatal error: "Cannot resume an already running generator"
 239          if ($this->advanceIterator() && !$this->checkIfFinished()) {
 240              // Add more pending promises if possible.
 241              $this->refillPending();
 242          }
 243      }
 244  
 245      private function checkIfFinished()
 246      {
 247          if (!$this->pending && !$this->iterable->valid()) {
 248              // Resolve the promise if there's nothing left to do.
 249              $this->aggregate->resolve(null);
 250              return true;
 251          }
 252  
 253          return false;
 254      }
 255  }


Generated: Thu Mar 24 21:31:15 2022 Cross-referenced by PHPXref 0.7.1