Skip to content

Commit

Permalink
Simplify usage by supporting new default loop
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Jul 5, 2021
1 parent 2859a8c commit b7a85ad
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 61 deletions.
73 changes: 48 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ Re-attach the data source after a previous `pause()`.
```php
$stream->pause();

$loop->addTimer(1.0, function () use ($stream) {
Loop::addTimer(1.0, function () use ($stream) {
$stream->resume();
});
```
Expand Down Expand Up @@ -737,7 +737,7 @@ stream in order to stop waiting for the stream to flush its final data.

```php
$stream->end();
$loop->addTimer(1.0, function () use ($stream) {
Loop::addTimer(1.0, function () use ($stream) {
$stream->close();
});
```
Expand Down Expand Up @@ -821,7 +821,7 @@ This can be used to represent a read-only resource like a file stream opened in
readable mode or a stream such as `STDIN`:

```php
$stream = new ReadableResourceStream(STDIN, $loop);
$stream = new ReadableResourceStream(STDIN);
$stream->on('data', function ($chunk) {
echo $chunk;
});
Expand All @@ -838,7 +838,7 @@ Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new ReadableResourceStream(false, $loop);
$stream = new ReadableResourceStream(false);
```

See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
Expand All @@ -851,14 +851,20 @@ If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new ReadableResourceStream(STDIN, $loop);
$stream = new ReadableResourceStream(STDIN);
```

Once the constructor is called with a valid stream resource, this class will
take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.

This class takes an optional `LoopInterface|null $loop` parameter that can be used to
pass the event loop instance to use for this object. You can use a `null` value
here in order to use the [default loop](https://github.com/reactphp/event-loop#loop).
This value SHOULD NOT be given unless you're sure you want to explicitly use a
given event loop instance.

This class takes an optional `int|null $readChunkSize` parameter that controls
the maximum buffer size in bytes to read at once from the stream.
You can use a `null` value here in order to apply its default value.
Expand All @@ -874,7 +880,7 @@ This should read until the stream resource is not readable anymore
mean it reached EOF.

```php
$stream = new ReadableResourceStream(STDIN, $loop, 8192);
$stream = new ReadableResourceStream(STDIN, null, 8192);
```

> PHP bug warning: If the PHP process has explicitly been started without a
Expand All @@ -883,6 +889,9 @@ $stream = new ReadableResourceStream(STDIN, $loop, 8192);
stream like `php test.php < /dev/null` instead of `php test.php <&-`.
See [#81](https://github.com/reactphp/stream/issues/81) for more details.

> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a
`null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop).

### WritableResourceStream

The `WritableResourceStream` is a concrete implementation of the
Expand All @@ -892,7 +901,7 @@ This can be used to represent a write-only resource like a file stream opened in
writable mode or a stream such as `STDOUT` or `STDERR`:

```php
$stream = new WritableResourceStream(STDOUT, $loop);
$stream = new WritableResourceStream(STDOUT);
$stream->write('hello!');
$stream->end();
```
Expand All @@ -905,7 +914,7 @@ Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new WritableResourceStream(false, $loop);
$stream = new WritableResourceStream(false);
```

See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
Expand All @@ -918,7 +927,7 @@ If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new WritableResourceStream(STDOUT, $loop);
$stream = new WritableResourceStream(STDOUT);
```

Once the constructor is called with a valid stream resource, this class will
Expand All @@ -933,13 +942,19 @@ For this, it uses an in-memory buffer string to collect all outstanding writes.
This buffer has a soft-limit applied which defines how much data it is willing
to accept before the caller SHOULD stop sending further data.

This class takes an optional `LoopInterface|null $loop` parameter that can be used to
pass the event loop instance to use for this object. You can use a `null` value
here in order to use the [default loop](https://github.com/reactphp/event-loop#loop).
This value SHOULD NOT be given unless you're sure you want to explicitly use a
given event loop instance.

This class takes an optional `int|null $writeBufferSoftLimit` parameter that controls
this maximum buffer size in bytes.
You can use a `null` value here in order to apply its default value.
This value SHOULD NOT be changed unless you know what you're doing.

```php
$stream = new WritableResourceStream(STDOUT, $loop, 8192);
$stream = new WritableResourceStream(STDOUT, null, 8192);
```

This class takes an optional `int|null $writeChunkSize` parameter that controls
Expand All @@ -954,11 +969,14 @@ This can be `-1` which means "write everything available" to the
underlying stream resource.

```php
$stream = new WritableResourceStream(STDOUT, $loop, null, 8192);
$stream = new WritableResourceStream(STDOUT, null, null, 8192);
```

See also [`write()`](#write) for more details.

> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a
`null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop).

### DuplexResourceStream

The `DuplexResourceStream` is a concrete implementation of the
Expand All @@ -969,7 +987,7 @@ in read and write mode mode or a stream such as a TCP/IP connection:

```php
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop);
$stream = new DuplexResourceStream($conn);
$stream->write('hello!');
$stream->end();
```
Expand All @@ -982,7 +1000,7 @@ Otherwise, it will throw an `InvalidArgumentException`:

```php
// throws InvalidArgumentException
$stream = new DuplexResourceStream(false, $loop);
$stream = new DuplexResourceStream(false);
```

See also the [`ReadableResourceStream`](#readableresourcestream) for read-only
Expand All @@ -996,14 +1014,20 @@ If this fails, it will throw a `RuntimeException`:

```php
// throws RuntimeException on Windows
$stream = new DuplexResourceStream(STDOUT, $loop);
$stream = new DuplexResourceStream(STDOUT);
```

Once the constructor is called with a valid stream resource, this class will
take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.

This class takes an optional `LoopInterface|null $loop` parameter that can be used to
pass the event loop instance to use for this object. You can use a `null` value
here in order to use the [default loop](https://github.com/reactphp/event-loop#loop).
This value SHOULD NOT be given unless you're sure you want to explicitly use a
given event loop instance.

This class takes an optional `int|null $readChunkSize` parameter that controls
the maximum buffer size in bytes to read at once from the stream.
You can use a `null` value here in order to apply its default value.
Expand All @@ -1020,7 +1044,7 @@ mean it reached EOF.

```php
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, null, 8192);
```

Any `write()` calls to this class will not be performed instantly, but will
Expand All @@ -1040,12 +1064,15 @@ If you want to change the write buffer soft limit, you can pass an instance of

```php
$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, $loop, null, $buffer);
$buffer = new WritableResourceStream($conn, null, 8192);
$stream = new DuplexResourceStream($conn, null, null, $buffer);
```

See also [`WritableResourceStream`](#writableresourcestream) for more details.

> Changelog: As of v1.2.0 the `$loop` parameter can be omitted (or skipped with a
`null` value) to use the [default loop](https://github.com/reactphp/event-loop#loop).

### ThroughStream

The `ThroughStream` implements the
Expand Down Expand Up @@ -1123,8 +1150,8 @@ This is useful for some APIs which may require a single
more convenient to work with a single stream instance like this:

```php
$stdin = new ReadableResourceStream(STDIN, $loop);
$stdout = new WritableResourceStream(STDOUT, $loop);
$stdin = new ReadableResourceStream(STDIN);
$stdout = new WritableResourceStream(STDOUT);

$stdio = new CompositeStream($stdin, $stdout);

Expand Down Expand Up @@ -1154,14 +1181,10 @@ The following example can be used to pipe the contents of a source file into
a destination file without having to ever read the whole file into memory:

```php
$loop = new React\EventLoop\StreamSelectLoop;

$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'), $loop);
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'), $loop);
$source = new React\Stream\ReadableResourceStream(fopen('source.txt', 'r'));
$dest = new React\Stream\WritableResourceStream(fopen('destination.txt', 'w'));

$source->pipe($dest);

$loop->run();
```

> Note that this example uses `fopen()` for illustration purposes only.
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
],
"require": {
"php": ">=5.3.8",
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5",
"react/event-loop": "dev-master#78f7f43 as 1.2.0",
"evenement/evenement": "^3.0 || ^2.0 || ^1.0"
},
"require-dev": {
Expand Down
6 changes: 1 addition & 5 deletions examples/01-http.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// $ php examples/01-http.php
// $ php examples/01-http.php reactphp.org

use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;

require __DIR__ . '/../vendor/autoload.php';
Expand All @@ -25,8 +24,7 @@
exit(1);
}

$loop = Factory::create();
$stream = new DuplexResourceStream($resource, $loop);
$stream = new DuplexResourceStream($resource);

$stream->on('data', function ($chunk) {
echo $chunk;
Expand All @@ -36,5 +34,3 @@
});

$stream->write("GET / HTTP/1.0\r\nHost: $host\r\n\r\n");

$loop->run();
6 changes: 1 addition & 5 deletions examples/02-https.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// $ php examples/02-https.php
// $ php examples/02-https.php reactphp.org

use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;

require __DIR__ . '/../vendor/autoload.php';
Expand All @@ -25,8 +24,7 @@
exit(1);
}

$loop = Factory::create();
$stream = new DuplexResourceStream($resource, $loop);
$stream = new DuplexResourceStream($resource);

$stream->on('data', function ($chunk) {
echo $chunk;
Expand All @@ -36,5 +34,3 @@
});

$stream->write("GET / HTTP/1.0\r\nHost: $host\r\n\r\n");

$loop->run();
9 changes: 2 additions & 7 deletions examples/11-cat.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// $ php examples/11-cat.php < README.md
// $ echo hello | php examples/11-cat.php

use React\EventLoop\Factory;
use React\Stream\ReadableResourceStream;
use React\Stream\WritableResourceStream;

Expand All @@ -19,10 +18,6 @@
exit(1);
}

$loop = Factory::create();

$stdout = new WritableResourceStream(STDOUT, $loop);
$stdin = new ReadableResourceStream(STDIN, $loop);
$stdout = new WritableResourceStream(STDOUT);
$stdin = new ReadableResourceStream(STDIN);
$stdin->pipe($stdout);

$loop->run();
18 changes: 8 additions & 10 deletions examples/91-benchmark-throughput.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// $ php examples/91-benchmark-throughput.php -t 10 -o zero.bin
// $ php examples/91-benchmark-throughput.php -t 60 -i zero.bin

use React\EventLoop\Loop;

require __DIR__ . '/../vendor/autoload.php';

if (DIRECTORY_SEPARATOR === '\\') {
Expand All @@ -27,36 +29,32 @@
$if = str_replace('/dev/fd/', 'php://fd/', $if);
$of = str_replace('/dev/fd/', 'php://fd/', $of);

$loop = new React\EventLoop\StreamSelectLoop();

// setup information stream
$info = new React\Stream\WritableResourceStream(STDERR, $loop);
$info = new React\Stream\WritableResourceStream(STDERR);
if (extension_loaded('xdebug')) {
$info->write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL);
}
$info->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL);

// setup input and output streams and pipe inbetween
$fh = fopen($if, 'r');
$in = new React\Stream\ReadableResourceStream($fh, $loop);
$out = new React\Stream\WritableResourceStream(fopen($of, 'w'), $loop);
$in = new React\Stream\ReadableResourceStream($fh);
$out = new React\Stream\WritableResourceStream(fopen($of, 'w'));
$in->pipe($out);

// stop input stream in $t seconds
$start = microtime(true);
$timeout = $loop->addTimer($t, function () use ($in, &$bytes) {
$timeout = Loop::addTimer($t, function () use ($in) {
$in->close();
});

// print stream position once stream closes
$in->on('close', function () use ($fh, $start, $loop, $timeout, $info) {
$in->on('close', function () use ($fh, $start, $timeout, $info) {
$t = microtime(true) - $start;
$loop->cancelTimer($timeout);
Loop::cancelTimer($timeout);

$bytes = ftell($fh);

$info->write('read ' . $bytes . ' byte(s) in ' . round($t, 3) . ' second(s) => ' . round($bytes / 1024 / 1024 / $t, 1) . ' MiB/s' . PHP_EOL);
$info->write('peak memory usage of ' . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . ' MiB' . PHP_EOL);
});

$loop->run();
7 changes: 5 additions & 2 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
namespace React\Stream;

use Evenement\EventEmitter;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use InvalidArgumentException;

final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
{
private $stream;

/** @var LoopInterface */
private $loop;

/**
Expand All @@ -35,7 +38,7 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt
private $closing = false;
private $listening = false;

public function __construct($stream, LoopInterface $loop, $readChunkSize = null, WritableStreamInterface $buffer = null)
public function __construct($stream, LoopInterface $loop = null, $readChunkSize = null, WritableStreamInterface $buffer = null)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
Expand Down Expand Up @@ -70,7 +73,7 @@ public function __construct($stream, LoopInterface $loop, $readChunkSize = null,
}

$this->stream = $stream;
$this->loop = $loop;
$this->loop = $loop ?: Loop::get();
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
$this->buffer = $buffer;

Expand Down
Loading

0 comments on commit b7a85ad

Please sign in to comment.