[ Index ]

PHP Cross Reference of phpBB-3.2.11-deutsch

title

Body

[close]

/vendor/guzzlehttp/streams/src/ -> AsyncReadStream.php (source)

   1  <?php
   2  namespace GuzzleHttp\Stream;
   3  
   4  /**
   5   * Represents an asynchronous read-only stream that supports a drain event and
   6   * pumping data from a source stream.
   7   *
   8   * The AsyncReadStream can be used as a completely asynchronous stream, meaning
   9   * the data you can read from the stream will immediately return only
  10   * the data that is currently buffered.
  11   *
  12   * AsyncReadStream can also be used in a "blocking" manner if a "pump" function
  13   * is provided. When a caller requests more bytes than are available in the
  14   * buffer, then the pump function is used to block until the requested number
  15   * of bytes are available or the remote source stream has errored, closed, or
  16   * timed-out. This behavior isn't strictly "blocking" because the pump function
  17   * can send other transfers while waiting on the desired buffer size to be
  18   * ready for reading (e.g., continue to tick an event loop).
  19   *
  20   * @unstable This class is subject to change.
  21   */
  22  class AsyncReadStream implements StreamInterface
  23  {
  24      use StreamDecoratorTrait;
  25  
  26      /** @var callable|null Fn used to notify writers the buffer has drained */
  27      private $drain;
  28  
  29      /** @var callable|null Fn used to block for more data */
  30      private $pump;
  31  
  32      /** @var int|null Highwater mark of the underlying buffer */
  33      private $hwm;
  34  
  35      /** @var bool Whether or not drain needs to be called at some point */
  36      private $needsDrain;
  37  
  38      /** @var int The expected size of the remote source */
  39      private $size;
  40  
  41      /**
  42       * In order to utilize high water marks to tell writers to slow down, the
  43       * provided stream must answer to the "hwm" stream metadata variable,
  44       * providing the high water mark. If no "hwm" metadata value is available,
  45       * then the "drain" functionality is not utilized.
  46       *
  47       * This class accepts an associative array of configuration options.
  48       *
  49       * - drain: (callable) Function to invoke when the stream has drained,
  50       *   meaning the buffer is now writable again because the size of the
  51       *   buffer is at an acceptable level (e.g., below the high water mark).
  52       *   The function accepts a single argument, the buffer stream object that
  53       *   has drained.
  54       * - pump: (callable) A function that accepts the number of bytes to read
  55       *   from the source stream. This function will block until all of the data
  56       *   that was requested has been read, EOF of the source stream, or the
  57       *   source stream is closed.
  58       * - size: (int) The expected size in bytes of the data that will be read
  59       *   (if known up-front).
  60       *
  61       * @param StreamInterface $buffer   Buffer that contains the data that has
  62       *                                  been read by the event loop.
  63       * @param array           $config   Associative array of options.
  64       *
  65       * @throws \InvalidArgumentException if the buffer is not readable and
  66       *                                   writable.
  67       */
  68      public function __construct(
  69          StreamInterface $buffer,
  70          array $config = []
  71      ) {
  72          if (!$buffer->isReadable() || !$buffer->isWritable()) {
  73              throw new \InvalidArgumentException(
  74                  'Buffer must be readable and writable'
  75              );
  76          }
  77  
  78          if (isset($config['size'])) {
  79              $this->size = $config['size'];
  80          }
  81  
  82          static $callables = ['pump', 'drain'];
  83          foreach ($callables as $check) {
  84              if (isset($config[$check])) {
  85                  if (!is_callable($config[$check])) {
  86                      throw new \InvalidArgumentException(
  87                          $check . ' must be callable'
  88                      );
  89                  }
  90                  $this->{$check} = $config[$check];
  91              }
  92          }
  93  
  94          $this->hwm = $buffer->getMetadata('hwm');
  95  
  96          // Cannot drain when there's no high water mark.
  97          if ($this->hwm === null) {
  98              $this->drain = null;
  99          }
 100  
 101          $this->stream = $buffer;
 102      }
 103  
 104      /**
 105       * Factory method used to create new async stream and an underlying buffer
 106       * if no buffer is provided.
 107       *
 108       * This function accepts the same options as AsyncReadStream::__construct,
 109       * but added the following key value pairs:
 110       *
 111       * - buffer: (StreamInterface) Buffer used to buffer data. If none is
 112       *   provided, a default buffer is created.
 113       * - hwm: (int) High water mark to use if a buffer is created on your
 114       *   behalf.
 115       * - max_buffer: (int) If provided, wraps the utilized buffer in a
 116       *   DroppingStream decorator to ensure that buffer does not exceed a given
 117       *   length. When exceeded, the stream will begin dropping data. Set the
 118       *   max_buffer to 0, to use a NullStream which does not store data.
 119       * - write: (callable) A function that is invoked when data is written
 120       *   to the underlying buffer. The function accepts the buffer as the first
 121       *   argument, and the data being written as the second. The function MUST
 122       *   return the number of bytes that were written or false to let writers
 123       *   know to slow down.
 124       * - drain: (callable) See constructor documentation.
 125       * - pump: (callable) See constructor documentation.
 126       *
 127       * @param array $options Associative array of options.
 128       *
 129       * @return array Returns an array containing the buffer used to buffer
 130       *               data, followed by the ready to use AsyncReadStream object.
 131       */
 132      public static function create(array $options = [])
 133      {
 134          $maxBuffer = isset($options['max_buffer'])
 135              ? $options['max_buffer']
 136              : null;
 137  
 138          if ($maxBuffer === 0) {
 139              $buffer = new NullStream();
 140          } elseif (isset($options['buffer'])) {
 141              $buffer = $options['buffer'];
 142          } else {
 143              $hwm = isset($options['hwm']) ? $options['hwm'] : 16384;
 144              $buffer = new BufferStream($hwm);
 145          }
 146  
 147          if ($maxBuffer > 0) {
 148              $buffer = new DroppingStream($buffer, $options['max_buffer']);
 149          }
 150  
 151          // Call the on_write callback if an on_write function was provided.
 152          if (isset($options['write'])) {
 153              $onWrite = $options['write'];
 154              $buffer = FnStream::decorate($buffer, [
 155                  'write' => function ($string) use ($buffer, $onWrite) {
 156                      $result = $buffer->write($string);
 157                      $onWrite($buffer, $string);
 158                      return $result;
 159                  }
 160              ]);
 161          }
 162  
 163          return [$buffer, new self($buffer, $options)];
 164      }
 165  
 166      public function getSize()
 167      {
 168          return $this->size;
 169      }
 170  
 171      public function isWritable()
 172      {
 173          return false;
 174      }
 175  
 176      public function write($string)
 177      {
 178          return false;
 179      }
 180  
 181      public function read($length)
 182      {
 183          if (!$this->needsDrain && $this->drain) {
 184              $this->needsDrain = $this->stream->getSize() >= $this->hwm;
 185          }
 186  
 187          $result = $this->stream->read($length);
 188  
 189          // If we need to drain, then drain when the buffer is empty.
 190          if ($this->needsDrain && $this->stream->getSize() === 0) {
 191              $this->needsDrain = false;
 192              $drainFn = $this->drain;
 193              $drainFn($this->stream);
 194          }
 195  
 196          $resultLen = strlen($result);
 197  
 198          // If a pump was provided, the buffer is still open, and not enough
 199          // data was given, then block until the data is provided.
 200          if ($this->pump && $resultLen < $length) {
 201              $pumpFn = $this->pump;
 202              $result .= $pumpFn($length - $resultLen);
 203          }
 204  
 205          return $result;
 206      }
 207  }


Generated: Wed Nov 11 20:33:01 2020 Cross-referenced by PHPXref 0.7.1