[ Index ]

PHP Cross Reference of phpBB-3.2.11-deutsch

title

Body

[close]

/vendor/guzzlehttp/guzzle/src/ -> Pool.php (source)

   1  <?php
   2  namespace GuzzleHttp;
   3  
   4  use GuzzleHttp\Event\BeforeEvent;
   5  use GuzzleHttp\Event\RequestEvents;
   6  use GuzzleHttp\Message\RequestInterface;
   7  use GuzzleHttp\Message\ResponseInterface;
   8  use GuzzleHttp\Ring\Core;
   9  use GuzzleHttp\Ring\Future\FutureInterface;
  10  use GuzzleHttp\Event\ListenerAttacherTrait;
  11  use GuzzleHttp\Event\EndEvent;
  12  use React\Promise\Deferred;
  13  use React\Promise\FulfilledPromise;
  14  use React\Promise\PromiseInterface;
  15  use React\Promise\RejectedPromise;
  16  
  17  /**
  18   * Sends and iterator of requests concurrently using a capped pool size.
  19   *
  20   * The Pool object implements FutureInterface, meaning it can be used later
  21   * when necessary, the requests provided to the pool can be cancelled, and
  22   * you can check the state of the pool to know if it has been dereferenced
  23   * (sent) or has been cancelled.
  24   *
  25   * When sending the pool, keep in mind that no results are returned: callers
  26   * are expected to handle results asynchronously using Guzzle's event system.
  27   * When requests complete, more are added to the pool to ensure that the
  28   * requested pool size is always filled as much as possible.
  29   *
  30   * IMPORTANT: Do not provide a pool size greater that what the utilized
  31   * underlying RingPHP handler can support. This will result is extremely poor
  32   * performance.
  33   */
  34  class Pool implements FutureInterface
  35  {
  36      use ListenerAttacherTrait;
  37  
  38      /** @var \GuzzleHttp\ClientInterface */
  39      private $client;
  40  
  41      /** @var \Iterator Yields requests */
  42      private $iter;
  43  
  44      /** @var Deferred */
  45      private $deferred;
  46  
  47      /** @var PromiseInterface */
  48      private $promise;
  49  
  50      private $waitQueue = [];
  51      private $eventListeners = [];
  52      private $poolSize;
  53      private $isRealized = false;
  54  
  55      /**
  56       * The option values for 'before', 'complete', 'error' and 'end' can be a
  57       * callable, an associative array containing event data, or an array of
  58       * event data arrays. Event data arrays contain the following keys:
  59       *
  60       * - fn: callable to invoke that receives the event
  61       * - priority: Optional event priority (defaults to 0)
  62       * - once: Set to true so that the event is removed after it is triggered
  63       *
  64       * @param ClientInterface $client   Client used to send the requests.
  65       * @param array|\Iterator $requests Requests to send in parallel
  66       * @param array           $options  Associative array of options
  67       *     - pool_size: (callable|int)   Maximum number of requests to send
  68       *                                   concurrently, or a callback that receives
  69       *                                   the current queue size and returns the
  70       *                                   number of new requests to send
  71       *     - before:    (callable|array) Receives a BeforeEvent
  72       *     - complete:  (callable|array) Receives a CompleteEvent
  73       *     - error:     (callable|array) Receives a ErrorEvent
  74       *     - end:       (callable|array) Receives an EndEvent
  75       */
  76      public function __construct(
  77          ClientInterface $client,
  78          $requests,
  79          array $options = []
  80      ) {
  81          $this->client = $client;
  82          $this->iter = $this->coerceIterable($requests);
  83          $this->deferred = new Deferred();
  84          $this->promise = $this->deferred->promise();
  85          $this->poolSize = isset($options['pool_size'])
  86              ? $options['pool_size'] : 25;
  87          $this->eventListeners = $this->prepareListeners(
  88              $options,
  89              ['before', 'complete', 'error', 'end']
  90          );
  91      }
  92  
  93      /**
  94       * Sends multiple requests in parallel and returns an array of responses
  95       * and exceptions that uses the same ordering as the provided requests.
  96       *
  97       * IMPORTANT: This method keeps every request and response in memory, and
  98       * as such, is NOT recommended when sending a large number or an
  99       * indeterminate number of requests concurrently.
 100       *
 101       * @param ClientInterface $client   Client used to send the requests
 102       * @param array|\Iterator $requests Requests to send in parallel
 103       * @param array           $options  Passes through the options available in
 104       *                                  {@see GuzzleHttp\Pool::__construct}
 105       *
 106       * @return BatchResults Returns a container for the results.
 107       * @throws \InvalidArgumentException if the event format is incorrect.
 108       */
 109      public static function batch(
 110          ClientInterface $client,
 111          $requests,
 112          array $options = []
 113      ) {
 114          $hash = new \SplObjectStorage();
 115          foreach ($requests as $request) {
 116              $hash->attach($request);
 117          }
 118  
 119          // In addition to the normally run events when requests complete, add
 120          // and event to continuously track the results of transfers in the hash.
 121          (new self($client, $requests, RequestEvents::convertEventArray(
 122              $options,
 123              ['end'],
 124              [
 125                  'priority' => RequestEvents::LATE,
 126                  'fn'       => function (EndEvent $e) use ($hash) {
 127                      $hash[$e->getRequest()] = $e->getException()
 128                          ? $e->getException()
 129                          : $e->getResponse();
 130                  }
 131              ]
 132          )))->wait();
 133  
 134          return new BatchResults($hash);
 135      }
 136  
 137      /**
 138       * Creates a Pool and immediately sends the requests.
 139       *
 140       * @param ClientInterface $client   Client used to send the requests
 141       * @param array|\Iterator $requests Requests to send in parallel
 142       * @param array           $options  Passes through the options available in
 143       *                                  {@see GuzzleHttp\Pool::__construct}
 144       */
 145      public static function send(
 146          ClientInterface $client,
 147          $requests,
 148          array $options = []
 149      ) {
 150          $pool = new self($client, $requests, $options);
 151          $pool->wait();
 152      }
 153  
 154      private function getPoolSize()
 155      {
 156          return is_callable($this->poolSize)
 157              ? call_user_func($this->poolSize, count($this->waitQueue))
 158              : $this->poolSize;
 159      }
 160  
 161      /**
 162       * Add as many requests as possible up to the current pool limit.
 163       */
 164      private function addNextRequests()
 165      {
 166          $limit = max($this->getPoolSize() - count($this->waitQueue), 0);
 167          while ($limit--) {
 168              if (!$this->addNextRequest()) {
 169                  break;
 170              }
 171          }
 172      }
 173  
 174      public function wait()
 175      {
 176          if ($this->isRealized) {
 177              return false;
 178          }
 179  
 180          // Seed the pool with N number of requests.
 181          $this->addNextRequests();
 182  
 183          // Stop if the pool was cancelled while transferring requests.
 184          if ($this->isRealized) {
 185              return false;
 186          }
 187  
 188          // Wait on any outstanding FutureResponse objects.
 189          while ($response = array_pop($this->waitQueue)) {
 190              try {
 191                  $response->wait();
 192              } catch (\Exception $e) {
 193                  // Eat exceptions because they should be handled asynchronously
 194              }
 195              $this->addNextRequests();
 196          }
 197  
 198          // Clean up no longer needed state.
 199          $this->isRealized = true;
 200          $this->waitQueue = $this->eventListeners = [];
 201          $this->client = $this->iter = null;
 202          $this->deferred->resolve(true);
 203  
 204          return true;
 205      }
 206  
 207      /**
 208       * {@inheritdoc}
 209       *
 210       * Attempt to cancel all outstanding requests (requests that are queued for
 211       * dereferencing). Returns true if all outstanding requests can be
 212       * cancelled.
 213       *
 214       * @return bool
 215       */
 216      public function cancel()
 217      {
 218          if ($this->isRealized) {
 219              return false;
 220          }
 221  
 222          $success = $this->isRealized = true;
 223          foreach ($this->waitQueue as $response) {
 224              if (!$response->cancel()) {
 225                  $success = false;
 226              }
 227          }
 228  
 229          return $success;
 230      }
 231  
 232      /**
 233       * Returns a promise that is invoked when the pool completed. There will be
 234       * no passed value.
 235       *
 236       * {@inheritdoc}
 237       */
 238      public function then(
 239          callable $onFulfilled = null,
 240          callable $onRejected = null,
 241          callable $onProgress = null
 242      ) {
 243          return $this->promise->then($onFulfilled, $onRejected, $onProgress);
 244      }
 245  
 246      public function promise()
 247      {
 248          return $this->promise;
 249      }
 250  
 251      private function coerceIterable($requests)
 252      {
 253          if ($requests instanceof \Iterator) {
 254              return $requests;
 255          } elseif (is_array($requests)) {
 256              return new \ArrayIterator($requests);
 257          }
 258  
 259          throw new \InvalidArgumentException('Expected Iterator or array. '
 260              . 'Found ' . Core::describeType($requests));
 261      }
 262  
 263      /**
 264       * Adds the next request to pool and tracks what requests need to be
 265       * dereferenced when completing the pool.
 266       */
 267      private function addNextRequest()
 268      {
 269          add_next:
 270  
 271          if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
 272              return false;
 273          }
 274  
 275          $request = $this->iter->current();
 276          $this->iter->next();
 277  
 278          if (!($request instanceof RequestInterface)) {
 279              throw new \InvalidArgumentException(sprintf(
 280                  'All requests in the provided iterator must implement '
 281                  . 'RequestInterface. Found %s',
 282                  Core::describeType($request)
 283              ));
 284          }
 285  
 286          // Be sure to use "lazy" futures, meaning they do not send right away.
 287          $request->getConfig()->set('future', 'lazy');
 288          $hash = spl_object_hash($request);
 289          $this->attachListeners($request, $this->eventListeners);
 290          $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
 291          $response = $this->client->send($request);
 292          $this->waitQueue[$hash] = $response;
 293          $promise = $response->promise();
 294  
 295          // Don't recursively call itself for completed or rejected responses.
 296          if ($promise instanceof FulfilledPromise
 297              || $promise instanceof RejectedPromise
 298          ) {
 299              try {
 300                  $this->finishResponse($request, $response->wait(), $hash);
 301              } catch (\Exception $e) {
 302                  $this->finishResponse($request, $e, $hash);
 303              }
 304              goto add_next;
 305          }
 306  
 307          // Use this function for both resolution and rejection.
 308          $thenFn = function ($value) use ($request, $hash) {
 309              $this->finishResponse($request, $value, $hash);
 310              if (!$request->getConfig()->get('_pool_retries')) {
 311                  $this->addNextRequests();
 312              }
 313          };
 314  
 315          $promise->then($thenFn, $thenFn);
 316  
 317          return true;
 318      }
 319  
 320      public function _trackRetries(BeforeEvent $e)
 321      {
 322          $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
 323      }
 324  
 325      private function finishResponse($request, $value, $hash)
 326      {
 327          unset($this->waitQueue[$hash]);
 328          $result = $value instanceof ResponseInterface
 329              ? ['request' => $request, 'response' => $value, 'error' => null]
 330              : ['request' => $request, 'response' => null, 'error' => $value];
 331          $this->deferred->notify($result);
 332      }
 333  }


Generated: Wed Nov 11 20:33:01 2020 Cross-referenced by PHPXref 0.7.1