[ Index ] |
PHP Cross Reference of phpBB-3.2.11-deutsch |
[Summary view] [Print] [Text view]
<?php
namespace GuzzleHttp;

use GuzzleHttp\Event\BeforeEvent;
use GuzzleHttp\Event\RequestEvents;
use GuzzleHttp\Message\RequestInterface;
use GuzzleHttp\Message\ResponseInterface;
use GuzzleHttp\Ring\Core;
use GuzzleHttp\Ring\Future\FutureInterface;
use GuzzleHttp\Event\ListenerAttacherTrait;
use GuzzleHttp\Event\EndEvent;
use React\Promise\Deferred;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use React\Promise\RejectedPromise;

/**
 * Sends an iterator of requests concurrently using a capped pool size.
 *
 * The Pool object implements FutureInterface, meaning it can be used later
 * when necessary, the requests provided to the pool can be cancelled, and
 * you can check the state of the pool to know if it has been dereferenced
 * (sent) or has been cancelled.
 *
 * When sending the pool, keep in mind that no results are returned: callers
 * are expected to handle results asynchronously using Guzzle's event system.
 * When requests complete, more are added to the pool to ensure that the
 * requested pool size is always filled as much as possible.
 *
 * IMPORTANT: Do not provide a pool size greater than what the utilized
 * underlying RingPHP handler can support. This will result in extremely poor
 * performance.
 */
class Pool implements FutureInterface
{
    use ListenerAttacherTrait;

    /** @var \GuzzleHttp\ClientInterface */
    private $client;

    /** @var \Iterator Yields requests */
    private $iter;

    /** @var Deferred */
    private $deferred;

    /** @var PromiseInterface */
    private $promise;

    /** @var array In-flight responses keyed by spl_object_hash() of their request */
    private $waitQueue = [];

    /** @var array Listeners prepared from the constructor options */
    private $eventListeners = [];

    /** @var callable|int Pool size, or a callback that computes it */
    private $poolSize;

    /** @var bool True once the pool has finished waiting or was cancelled */
    private $isRealized = false;

    /**
     * The option values for 'before', 'complete', 'error' and 'end' can be a
     * callable, an associative array containing event data, or an array of
     * event data arrays. Event data arrays contain the following keys:
     *
     * - fn: callable to invoke that receives the event
     * - priority: Optional event priority (defaults to 0)
     * - once: Set to true so that the event is removed after it is triggered
     *
     * @param ClientInterface $client   Client used to send the requests.
     * @param array|\Iterator $requests Requests to send in parallel
     * @param array           $options  Associative array of options
     *     - pool_size: (callable|int)   Maximum number of requests to send
     *                                   concurrently, or a callback that receives
     *                                   the current queue size and returns the
     *                                   number of new requests to send
     *     - before:    (callable|array) Receives a BeforeEvent
     *     - complete:  (callable|array) Receives a CompleteEvent
     *     - error:     (callable|array) Receives a ErrorEvent
     *     - end:       (callable|array) Receives an EndEvent
     *
     * @throws \InvalidArgumentException if $requests is neither an array nor
     *                                   an Iterator.
     */
    public function __construct(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $this->client = $client;
        $this->iter = $this->coerceIterable($requests);
        $this->deferred = new Deferred();
        $this->promise = $this->deferred->promise();
        $this->poolSize = isset($options['pool_size'])
            ? $options['pool_size'] : 25;
        $this->eventListeners = $this->prepareListeners(
            $options,
            ['before', 'complete', 'error', 'end']
        );
    }

    /**
     * Sends multiple requests in parallel and returns an array of responses
     * and exceptions that uses the same ordering as the provided requests.
     *
     * IMPORTANT: This method keeps every request and response in memory, and
     * as such, is NOT recommended when sending a large number or an
     * indeterminate number of requests concurrently.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send in parallel
     * @param array           $options  Passes through the options available in
     *                                  {@see GuzzleHttp\Pool::__construct}
     *
     * @return BatchResults Returns a container for the results.
     * @throws \InvalidArgumentException if the event format is incorrect.
     */
    public static function batch(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        // The pool never rewinds the iterator it is given. Walking an
        // Iterator here to build the hash would therefore leave it exhausted
        // by the time the pool starts, and no request would ever be sent.
        // Materialize it once so the hash and the pool see the same requests.
        if ($requests instanceof \Iterator) {
            $requests = iterator_to_array($requests, false);
        }

        $hash = new \SplObjectStorage();
        foreach ($requests as $request) {
            $hash->attach($request);
        }

        // In addition to the normally run events when requests complete, add
        // an event to continuously track the results of transfers in the hash.
        (new self($client, $requests, RequestEvents::convertEventArray(
            $options,
            ['end'],
            [
                'priority' => RequestEvents::LATE,
                'fn'       => function (EndEvent $e) use ($hash) {
                    $hash[$e->getRequest()] = $e->getException()
                        ?: $e->getResponse();
                }
            ]
        )))->wait();

        return new BatchResults($hash);
    }

    /**
     * Creates a Pool and immediately sends the requests.
     *
     * This call blocks until the pool has been waited on and returns nothing.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send in parallel
     * @param array           $options  Passes through the options available in
     *                                  {@see GuzzleHttp\Pool::__construct}
     */
    public static function send(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $pool = new self($client, $requests, $options);
        $pool->wait();
    }

    /**
     * Resolves the configured pool size, invoking the callback form with the
     * number of responses currently in the wait queue.
     *
     * @return mixed Whatever was configured, or whatever the callback returns.
     */
    private function getPoolSize()
    {
        return is_callable($this->poolSize)
            ? call_user_func($this->poolSize, count($this->waitQueue))
            : $this->poolSize;
    }

    /**
     * Add as many requests as possible up to the current pool limit.
     */
    private function addNextRequests()
    {
        // Cast to int and compare with "> 0": a pool_size callback returning
        // a fractional value would otherwise step past zero and never stop
        // the loop on its own.
        $limit = (int) max($this->getPoolSize() - count($this->waitQueue), 0);
        while ($limit-- > 0) {
            if (!$this->addNextRequest()) {
                break;
            }
        }
    }

    /**
     * Sends the queued requests and blocks until every outstanding response
     * has been waited on, refilling the pool as responses complete.
     *
     * Exceptions thrown while waiting on a response are swallowed here.
     * On completion the internal state is released and the pool's promise is
     * resolved with true.
     *
     * @return bool False if the pool was already realized or was cancelled
     *              while being seeded, true once all requests were processed.
     */
    public function wait()
    {
        if ($this->isRealized) {
            return false;
        }

        // Seed the pool with N number of requests.
        $this->addNextRequests();

        // Stop if the pool was cancelled while transferring requests.
        if ($this->isRealized) {
            return false;
        }

        // Wait on any outstanding FutureResponse objects.
        while ($response = array_pop($this->waitQueue)) {
            try {
                $response->wait();
            } catch (\Exception $e) {
                // Eat exceptions because they should be handled asynchronously
            }
            $this->addNextRequests();
        }

        // Clean up no longer needed state.
        $this->isRealized = true;
        $this->waitQueue = $this->eventListeners = [];
        $this->client = $this->iter = null;
        $this->deferred->resolve(true);

        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Attempt to cancel all outstanding requests (requests that are queued for
     * dereferencing). Returns true if all outstanding requests can be
     * cancelled.
     *
     * @return bool
     */
    public function cancel()
    {
        if ($this->isRealized) {
            return false;
        }

        // Marking the pool realized first stops any further requests from
        // being added while the outstanding ones are being cancelled.
        $success = $this->isRealized = true;
        foreach ($this->waitQueue as $response) {
            if (!$response->cancel()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * Returns a promise that is invoked when the pool completed. The promise
     * is resolved with true; each finished request is reported through the
     * progress callback as an array with 'request', 'response' and 'error'
     * keys.
     *
     * {@inheritdoc}
     */
    public function then(
        callable $onFulfilled = null,
        callable $onRejected = null,
        callable $onProgress = null
    ) {
        return $this->promise->then($onFulfilled, $onRejected, $onProgress);
    }

    /**
     * Returns the promise that is settled when the pool completes.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        return $this->promise;
    }

    /**
     * Normalizes the provided requests into an Iterator.
     *
     * @param mixed $requests Array or Iterator of requests
     *
     * @return \Iterator
     * @throws \InvalidArgumentException if $requests is neither an array nor
     *                                   an Iterator.
     */
    private function coerceIterable($requests)
    {
        if ($requests instanceof \Iterator) {
            return $requests;
        } elseif (is_array($requests)) {
            return new \ArrayIterator($requests);
        }

        throw new \InvalidArgumentException('Expected Iterator or array. '
            . 'Found ' . Core::describeType($requests));
    }

    /**
     * Adds the next request to pool and tracks what requests need to be
     * dereferenced when completing the pool.
     *
     * @return bool True if a pending request was queued, false if the pool is
     *              realized or the iterator has no more requests.
     * @throws \InvalidArgumentException if the iterator yields a value that
     *                                   does not implement RequestInterface.
     */
    private function addNextRequest()
    {
        // Loop rather than recurse: requests whose promise is already settled
        // are finished immediately and the next request is tried instead.
        while (true) {
            if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
                return false;
            }

            $request = $this->iter->current();
            $this->iter->next();

            if (!($request instanceof RequestInterface)) {
                throw new \InvalidArgumentException(sprintf(
                    'All requests in the provided iterator must implement '
                    . 'RequestInterface. Found %s',
                    Core::describeType($request)
                ));
            }

            // Be sure to use "lazy" futures, meaning they do not send right away.
            $request->getConfig()->set('future', 'lazy');
            $hash = spl_object_hash($request);
            $this->attachListeners($request, $this->eventListeners);
            $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
            $response = $this->client->send($request);
            $this->waitQueue[$hash] = $response;
            $promise = $response->promise();

            // Don't recursively call itself for completed or rejected responses.
            if ($promise instanceof FulfilledPromise
                || $promise instanceof RejectedPromise
            ) {
                try {
                    $this->finishResponse($request, $response->wait(), $hash);
                } catch (\Exception $e) {
                    $this->finishResponse($request, $e, $hash);
                }
                continue;
            }

            // Use this function for both resolution and rejection.
            $thenFn = function ($value) use ($request, $hash) {
                $this->finishResponse($request, $value, $hash);
                // Only refill the pool when no retry count was recorded for
                // this request by _trackRetries().
                if (!$request->getConfig()->get('_pool_retries')) {
                    $this->addNextRequests();
                }
            };

            $promise->then($thenFn, $thenFn);

            return true;
        }
    }

    /**
     * Records the retry count of a request in its config under the
     * '_pool_retries' key. Registered as a 'before' listener on every request
     * added to the pool; public only so the emitter can invoke it.
     *
     * @param BeforeEvent $e Event emitted before the request is sent
     */
    public function _trackRetries(BeforeEvent $e)
    {
        $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
    }

    /**
     * Removes a finished request from the wait queue and reports its outcome
     * as a progress notification on the pool's deferred.
     *
     * @param RequestInterface $request Request that finished
     * @param mixed            $value   A ResponseInterface on success; any
     *                                  other value is reported as the error
     * @param string           $hash    Wait queue key of the request
     */
    private function finishResponse($request, $value, $hash)
    {
        unset($this->waitQueue[$hash]);
        $result = $value instanceof ResponseInterface
            ? ['request' => $request, 'response' => $value, 'error' => null]
            : ['request' => $request, 'response' => null, 'error' => $value];
        $this->deferred->notify($result);
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Wed Nov 11 20:33:01 2020 | Cross-referenced by PHPXref 0.7.1 |