From 30c13533794817c1887622f405e840f2fc538d90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sun, 8 Sep 2019 22:38:49 +0200 Subject: [PATCH 1/2] Change JSON stream to always report `data` events instead of `progress` --- README.md | 11 +++------ examples/events.php | 2 +- examples/pull.php | 2 +- src/Client.php | 42 ++++++++++++-------------------- src/Io/StreamingParser.php | 9 +++---- tests/ClientTest.php | 10 ++++---- tests/Io/StreamingParserTest.php | 14 +++++------ 7 files changed, 38 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 88fe3cb..06a53b3 100644 --- a/README.md +++ b/README.md @@ -330,18 +330,15 @@ $stream = $client->eventsStream(); The resulting stream will emit the following events: -* `progress`: for *each* element in the update stream -* `error`: once if an error occurs, will close() stream then +* `data`: for *each* element in the update stream +* `error`: once if an error occurs, will close() stream then * Will emit an [`Io\JsonProgressException`](#jsonprogressexception) if an individual progress message contains an error message * Any other `Exception` in case of an transport error, like invalid request etc. -* `close`: once the stream ends (either finished or after "error") - -Please note that the resulting stream does not emit any "data" events, so -you will not be able to pipe() its events into another `WritableStream`. +* `close`: once the stream ends (either finished or after "error") ```php $stream = $client->imageCreateStream('clue/redis-benchmark'); -$stream->on('progress', function ($data) { +$stream->on('data', function ($data) { // data will be emitted for *each* complete element in the JSON stream echo $data['status'] . PHP_EOL; }); diff --git a/examples/events.php b/examples/events.php index 0f6d518..5f9f4a2 100644 --- a/examples/events.php +++ b/examples/events.php @@ -20,7 +20,7 @@ // stream all events for 10 seconds $stream = $client->eventsStream(null, microtime(true) + 10.0); -$stream->on('progress', function ($event) { +$stream->on('data', function ($event) { echo json_encode($event) . PHP_EOL; }); diff --git a/examples/pull.php b/examples/pull.php index 3fbc8a5..3ed14d6 100644 --- a/examples/pull.php +++ b/examples/pull.php @@ -15,7 +15,7 @@ $stream = $client->imageCreateStream($image); -$stream->on('progress', function ($progress) { +$stream->on('data', function ($progress) { echo 'progress: '. json_encode($progress) . PHP_EOL; }); diff --git a/src/Client.php b/src/Client.php index 6c8ba26..c066593 100644 --- a/src/Client.php +++ b/src/Client.php @@ -127,8 +127,7 @@ public function version() public function events($since = null, $until = null, $filters = array()) { return $this->streamingParser->deferredStream( - $this->eventsStream($since, $until, $filters), - 'progress' + $this->eventsStream($since, $until, $filters) ); } @@ -138,12 +137,9 @@ public function events($since = null, $until = null, $filters = array()) * This is a JSON streaming API endpoint that returns a stream instance. * * The resulting stream will emit the following events: - * - progress: for *each* element in the update stream - * - error: once if an error occurs, will close() stream then - * - close: once the stream ends (either finished or after "error") - * - * Please note that the resulting stream does not emit any "data" events, so - * you will not be able to pipe() its events into another `WritableStream`. + * - data: for *each* element in the update stream + * - error: once if an error occurs, will close() stream then + * - close: once the stream ends (either finished or after "error") * * The optional `$filters` parameter can be used to only get events for * certain event types, images and/or containers etc. like this: @@ -758,9 +754,9 @@ public function imageList($all = false) */ public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $tag = null, $registry = null, $registryAuth = null) { - $stream = $this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth); - - return $this->streamingParser->deferredStream($stream, 'progress'); + return $this->streamingParser->deferredStream( + $this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth) + ); } /** @@ -769,12 +765,9 @@ public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $t * This is a JSON streaming API endpoint that returns a stream instance. * * The resulting stream will emit the following events: - * - progress: for *each* element in the update stream - * - error: once if an error occurs, will close() stream then - * - close: once the stream ends (either finished or after "error") - * - * Please note that the resulting stream does not emit any "data" events, so - * you will not be able to pipe() its events into another `WritableStream`. + * - data: for *each* element in the update stream + * - error: once if an error occurs, will close() stream then + * - close: once the stream ends (either finished or after "error"). * * Pulling a private image from a remote registry will likely require authorization, so make * sure to pass the $registryAuth parameter, see `self::authHeaders()` for @@ -871,9 +864,9 @@ public function imageHistory($image) */ public function imagePush($image, $tag = null, $registry = null, $registryAuth = null) { - $stream = $this->imagePushStream($image, $tag, $registry, $registryAuth); - - return $this->streamingParser->deferredStream($stream, 'progress'); + return $this->streamingParser->deferredStream( + $this->imagePushStream($image, $tag, $registry, $registryAuth) + ); } /** @@ -882,12 +875,9 @@ public function imagePush($image, $tag = null, $registry = null, $registryAuth = * This is a JSON streaming API endpoint that returns a stream instance. * * The resulting stream will emit the following events: - * - progress: for *each* element in the update stream - * - error: once if an error occurs, will close() stream then - * - close: once the stream ends (either finished or after "error") - * - * Please note that the resulting stream does not emit any "data" events, so - * you will not be able to pipe() its events into another `WritableStream`. + * - data: for *each* element in the update stream + * - error: once if an error occurs, will close() stream then + * - close: once the stream ends (either finished or after "error") * * Pushing to a remote registry will likely require authorization, so make * sure to pass the $registryAuth parameter, see `self::authHeaders()` for diff --git a/src/Io/StreamingParser.php b/src/Io/StreamingParser.php index 69c68be..d769958 100644 --- a/src/Io/StreamingParser.php +++ b/src/Io/StreamingParser.php @@ -52,7 +52,7 @@ public function parseJsonStream(PromiseInterface $promise) $out->close(); return; } - $out->emit('progress', array($object, $out)); + $out->emit('data', array($object, $out)); } }); @@ -108,13 +108,12 @@ public function bufferedStream(ReadableStreamInterface $stream) } /** - * Returns a promise which resolves with an array of all "progress" events + * Returns a promise which resolves with an array of all "data" events * * @param ReadableStreamInterface $stream - * @param string $progressEventName the name of the event to collect * @return PromiseInterface Promise */ - public function deferredStream(ReadableStreamInterface $stream, $progressEventName) + public function deferredStream(ReadableStreamInterface $stream) { // cancelling the deferred will (try to) close the stream $deferred = new Deferred(function () use ($stream) { @@ -126,7 +125,7 @@ public function deferredStream(ReadableStreamInterface $stream, $progressEventNa if ($stream->isReadable()) { // buffer all data events for deferred resolving $buffered = array(); - $stream->on($progressEventName, function ($data) use (&$buffered) { + $stream->on('data', function ($data) use (&$buffered) { $buffered []= $data; }); diff --git a/tests/ClientTest.php b/tests/ClientTest.php index 6475d5d..b14f17c 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -106,7 +106,7 @@ public function testEvents() $this->expectRequest('GET', '/events', $this->createResponseJsonStream($json)); $this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream)); - $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json)); + $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json)); $this->expectPromiseResolveWith($json, $this->client->events()); } @@ -118,7 +118,7 @@ public function testEventsArgs() $this->expectRequest('GET', '/events?since=10&until=20&filters=%7B%22image%22%3A%5B%22busybox%22%2C%22ubuntu%22%5D%7D', $this->createResponseJsonStream($json)); $this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream)); - $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json)); + $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json)); $this->expectPromiseResolveWith($json, $this->client->events(10, 20, array('image' => array('busybox', 'ubuntu')))); } @@ -357,7 +357,7 @@ public function testImageCreate() $this->expectRequest('post', '/images/create?fromImage=busybox', $this->createResponseJsonStream($json)); $this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream)); - $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json)); + $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json)); $this->expectPromiseResolveWith($json, $this->client->imageCreate('busybox')); } @@ -395,7 +395,7 @@ public function testImagePush() $this->expectRequest('post', '/images/123/push', $this->createResponseJsonStream($json)); $this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream)); - $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json)); + $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json)); $this->expectPromiseResolveWith($json, $this->client->imagePush('123')); } @@ -419,7 +419,7 @@ public function testImagePushCustomRegistry() $this->expectRequest('post', '/images/demo.acme.com%3A5000/123/push?tag=test', $this->createResponseJsonStream($json)); $this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream)); - $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json)); + $this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json)); $this->expectPromiseResolveWith($json, $this->client->imagePush('123', 'test', 'demo.acme.com:5000', $auth)); } diff --git a/tests/Io/StreamingParserTest.php b/tests/Io/StreamingParserTest.php index 1adaaff..4497a24 100644 --- a/tests/Io/StreamingParserTest.php +++ b/tests/Io/StreamingParserTest.php @@ -73,7 +73,7 @@ public function testDeferredClosedStreamWillReject() $stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); $stream->expects($this->once())->method('isReadable')->will($this->returnValue(false)); - $promise = $this->parser->deferredStream($stream, 'anything'); + $promise = $this->parser->deferredStream($stream); $this->expectPromiseReject($promise); } @@ -81,11 +81,11 @@ public function testDeferredStreamEventsWillBeEmittedAndBuffered() { $stream = new ReadableStream(); - $promise = $this->parser->deferredStream($stream, 'demo'); + $promise = $this->parser->deferredStream($stream); $stream->emit('ignored', array('ignored')); - $stream->emit('demo', array('a')); - $stream->emit('demo', array('b')); + $stream->emit('data', array('a')); + $stream->emit('data', array('b')); $stream->close(); @@ -96,11 +96,11 @@ public function testDeferredStreamErrorEventWillRejectPromise() { $stream = new ReadableStream(); - $promise = $this->parser->deferredStream($stream, 'demo'); + $promise = $this->parser->deferredStream($stream); $stream->emit('ignored', array('ignored')); - $stream->emit('demo', array('a')); + $stream->emit('data', array('a')); $stream->emit('error', array('value', 'ignord')); @@ -115,7 +115,7 @@ public function testDeferredCancelingPromiseWillCloseStream() $stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); $stream->expects($this->once())->method('isReadable')->willReturn(true); - $promise = $this->parser->deferredStream($stream, 'anything'); + $promise = $this->parser->deferredStream($stream); if (!($promise instanceof CancellablePromiseInterface)) { $this->markTestSkipped('Requires Promise v2 API and has no effect on v1 API'); } From 3bcf96449c0b40adc3b800008ab704e629001716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 17 Sep 2019 22:08:52 +0200 Subject: [PATCH 2/2] Strict stream semantics, support backpressure and improve error handling --- README.md | 26 +--- src/Io/JsonProgressException.php | 27 ---- src/Io/ReadableDemultiplexStream.php | 13 +- src/Io/ReadableJsonStream.php | 129 +++++++++++++++++ src/Io/StreamingParser.php | 44 +----- tests/FunctionalClientTest.php | 4 +- tests/Io/ReadableDemultiplexStreamTest.php | 22 +++ tests/Io/ReadableJsonStreamTest.php | 155 +++++++++++++++++++++ 8 files changed, 326 insertions(+), 94 deletions(-) delete mode 100644 src/Io/JsonProgressException.php create mode 100644 src/Io/ReadableJsonStream.php create mode 100644 tests/Io/ReadableJsonStreamTest.php diff --git a/README.md b/README.md index 06a53b3..fc3d829 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,6 @@ its event-driven model to react to changes and events happening. * [Command streaming](#command-streaming) * [TAR streaming](#tar-streaming) * [JSON streaming](#json-streaming) - * [JsonProgressException](#jsonprogressexception) * [Install](#install) * [Tests](#tests) * [License](#license) @@ -295,17 +294,13 @@ progress events once the stream ends: ```php $client->imageCreate('clue/streamripper')->then( - function ($data) { + function (array $data) { // $data is an array of *all* elements in the JSON stream + var_dump($data); }, - function ($error) { + function (Exception $error) { // an error occurred (possibly after receiving *some* elements) - - if ($error instanceof Io\JsonProgressException) { - // a progress message (usually the last) contains an error message - } else { - // any other error, like invalid request etc. - } + echo 'Error: ' . $error->getMessage() . PHP_EOL; } ); ``` @@ -332,13 +327,13 @@ The resulting stream will emit the following events: * `data`: for *each* element in the update stream * `error`: once if an error occurs, will close() stream then - * Will emit an [`Io\JsonProgressException`](#jsonprogressexception) if an individual progress message contains an error message - * Any other `Exception` in case of an transport error, like invalid request etc. + * Will emit a `RuntimeException` if an individual progress message contains an error message + or any other `Exception` in case of an transport error, like invalid request etc. * `close`: once the stream ends (either finished or after "error") ```php $stream = $client->imageCreateStream('clue/redis-benchmark'); -$stream->on('data', function ($data) { +$stream->on('data', function (array $data) { // data will be emitted for *each* complete element in the JSON stream echo $data['status'] . PHP_EOL; }); @@ -350,13 +345,6 @@ $stream->on('close', function () { See also the [pull example](examples/pull.php) and the [push example](examples/push.php). -### JsonProgressException - -The `Io\JsonProgressException` will be thrown by [JSON streaming](#json-streaming) -endpoints if an individual progress message contains an error message. - -The `getData()` method can be used to obtain the progress message. - ## Install The recommended way to install this library is [through Composer](https://getcomposer.org). diff --git a/src/Io/JsonProgressException.php b/src/Io/JsonProgressException.php deleted file mode 100644 index 089e9d9..0000000 --- a/src/Io/JsonProgressException.php +++ /dev/null @@ -1,27 +0,0 @@ -data = $data; - } - - public function getData() - { - return $data; - } -} diff --git a/src/Io/ReadableDemultiplexStream.php b/src/Io/ReadableDemultiplexStream.php index e684262..556edb7 100644 --- a/src/Io/ReadableDemultiplexStream.php +++ b/src/Io/ReadableDemultiplexStream.php @@ -2,10 +2,11 @@ namespace Clue\React\Docker\Io; -use React\Stream\ReadableStreamInterface; use Evenement\EventEmitter; -use React\Stream\WritableStreamInterface; +use React\Stream\ReadableStreamInterface; use React\Stream\Util; +use React\Stream\WritableStreamInterface; + /** * Parser for Docker's own frame format used for bidrectional frames * @@ -48,7 +49,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent = // buffer must be empty on end, otherwise this is an error situation if ($buffer === '') { - $out->emit('end', array()); + $out->emit('end'); } else { $out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk'))); } @@ -62,7 +63,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent = }); // forward close event to output - $multiplexed->on('close', function ($error) use ($out) { + $multiplexed->on('close', function () use ($out) { $out->close(); }); } @@ -130,7 +131,9 @@ public function close() // closing output stream closes input stream $this->multiplexed->close(); + $this->buffer = ''; - $this->emit('close', array()); + $this->emit('close'); + $this->removeAllListeners(); } } diff --git a/src/Io/ReadableJsonStream.php b/src/Io/ReadableJsonStream.php new file mode 100644 index 0000000..5143fbb --- /dev/null +++ b/src/Io/ReadableJsonStream.php @@ -0,0 +1,129 @@ +input = $input; + $this->parser = $parser = new StreamingJsonParser(); + if (!$input->isReadable()) { + $this->close(); + return; + } + + // pass all input data chunks through the parser + $input->on('data', array($this, 'handleData')); + + // forward end event to output + $out = $this; + $closed =& $this->closed; + $input->on('end', function () use ($out, $parser, &$closed) { + // ignore duplicate end events + if ($closed) { + return; + } + + if ($parser->isEmpty()) { + $out->emit('end'); + } else { + $out->emit('error', array(new \RuntimeException('Stream ended within incomplete JSON data'))); + } + $out->close(); + }); + + // forward error event to output + $input->on('error', function ($error) use ($out) { + $out->emit('error', array($error)); + $out->close(); + }); + + // forward close event to output + $input->on('close', function () use ($out) { + $out->close(); + }); + } + + /** + * push the given stream chunk into the parser buffer and try to extract all JSON messages + * + * @internal + * @param string $data + */ + public function handleData($data) + { + // forward each data chunk to the streaming JSON parser + try { + $objects = $this->parser->push($data); + } catch (\Exception $e) { + $this->emit('error', array($e)); + $this->close(); + return; + } + + foreach ($objects as $object) { + // stop emitting data if stream is already closed + if ($this->closed) { + return; + } + + if (isset($object['error'])) { + $this->emit('error', array(new \RuntimeException($object['error']))); + $this->close(); + return; + } + $this->emit('data', array($object)); + } + } + + public function pause() + { + $this->input->pause(); + } + + public function resume() + { + $this->input->resume(); + } + + public function isReadable() + { + return $this->input->isReadable(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return Util::pipe($this, $dest, $options); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // closing output stream closes input stream + $this->input->close(); + + $this->emit('close'); + $this->removeAllListeners(); + } +} diff --git a/src/Io/StreamingParser.php b/src/Io/StreamingParser.php index d769958..c0a0640 100644 --- a/src/Io/StreamingParser.php +++ b/src/Io/StreamingParser.php @@ -2,15 +2,12 @@ namespace Clue\React\Docker\Io; +use Clue\React\Promise\Stream; +use Psr\Http\Message\ResponseInterface; use React\Promise\PromiseInterface; -use Clue\JsonStream\StreamingJsonParser; use React\Promise\Deferred; -use React\Stream\ReadableStream; use React\Stream\ReadableStreamInterface; use RuntimeException; -use React\Promise\CancellablePromiseInterface; -use Clue\React\Promise\Stream; -use Psr\Http\Message\ResponseInterface; /** * StreamingParser is a simple helper to work with the streaming body of HTTP response objects @@ -31,42 +28,7 @@ public function parseJsonStream(PromiseInterface $promise) { // application/json - $in = $this->parsePlainStream($promise); - $out = new ReadableStream(); - - // invalid/closing input stream => return closed output stream - if (!$in->isReadable()) { - $out->close(); - - return $out; - } - - // forward each data chunk to the streaming JSON parser - $parser = new StreamingJsonParser(); - $in->on('data', function ($data) use ($parser, $out) { - $objects = $parser->push($data); - - foreach ($objects as $object) { - if (isset($object['error'])) { - $out->emit('error', array(new JsonProgressException($object), $out)); - $out->close(); - return; - } - $out->emit('data', array($object, $out)); - } - }); - - // forward error and make sure stream closes - $in->on('error', function ($error) use ($out) { - $out->emit('error', array($error, $out)); - $out->close(); - }); - - // closing either stream closes the other one - $in->on('close', array($out, 'close')); - $out->on('close', array($in, 'close')); - - return $out; + return new ReadableJsonStream($this->parsePlainStream($promise)); } /** diff --git a/tests/FunctionalClientTest.php b/tests/FunctionalClientTest.php index 23adb87..a988558 100644 --- a/tests/FunctionalClientTest.php +++ b/tests/FunctionalClientTest.php @@ -386,8 +386,8 @@ public function testImageCreateStreamMissingWillEmitJsonError() $old || $stream->on('progress', $this->expectCallableNever()); $stream->on('data', $this->expectCallableNever()); - // will emit "error" with JsonProgressException and close - $old && $stream->on('error', $this->expectCallableOnceParameter('Clue\React\Docker\Io\JsonProgressException')); + // will emit "error" with RuntimeException and close + $old && $stream->on('error', $this->expectCallableOnceParameter('RuntimeException')); $old || $stream->on('error', $this->expectCallableOnceParameter('Clue\React\Buzz\Message\ResponseException')); $stream->on('close', $this->expectCallableOnce()); diff --git a/tests/Io/ReadableDemultiplexStreamTest.php b/tests/Io/ReadableDemultiplexStreamTest.php index cf8c9bd..5f295f3 100644 --- a/tests/Io/ReadableDemultiplexStreamTest.php +++ b/tests/Io/ReadableDemultiplexStreamTest.php @@ -82,6 +82,28 @@ public function testStreamWillEmitDataOnCompleteFrameChunked() $this->stream->emit('data', array("st")); } + public function testCloseFromDataEventWillStopEmittingFurtherDataEvents() + { + $parser = $this->parser; + $this->parser->on('data', function () use ($parser) { + $parser->close(); + }); + + $this->parser->on('data', $this->expectCallableOnceWith('a')); + + $this->stream->emit('data', array("\x01\x00\x00\x00" . "\x00\x00\x00\x01" . "a" . "\x01\x00\x00\x00" . "\x00\x00\x00\x01" . "b")); + } + + public function testCloseTwiceWillEmitCloseOnceAndRemoveAllListeners() + { + $this->parser->on('close', $this->expectCallableOnce()); + + $this->parser->close(); + $this->parser->close(); + + $this->assertEquals(array(), $this->parser->listeners('close')); + } + public function testPipeWillBeForwardedToTargetStream() { $target = new WritableStream(); diff --git a/tests/Io/ReadableJsonStreamTest.php b/tests/Io/ReadableJsonStreamTest.php new file mode 100644 index 0000000..d273503 --- /dev/null +++ b/tests/Io/ReadableJsonStreamTest.php @@ -0,0 +1,155 @@ +stream = new ReadableStream(); + $this->parser = new ReadableJsonStream($this->stream); + } + + public function testStreamWillForwardEndAndClose() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableOnce()); + + $this->stream->emit('end'); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testStreamWillForwardErrorAndClose() + { + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + + $this->stream->emit('error', array(new \RuntimeException('Test'))); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testStreamWillEmitErrorWhenEndingWithinStream() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + + $this->stream->emit('data', array('[')); + $this->stream->emit('end'); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testStreamWillEmitDataOnCompleteArray() + { + $this->parser->on('data', $this->expectCallableOnceWith(array(1, 2))); + + $this->stream->emit('data', array("[1,2]")); + } + + public function testStreamWillEmitErrorOnCompleteErrorObject() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableOnce()); + + $this->stream->emit('data', array("{\"error\":\"message\"}")); + } + + public function testStreamWillEmitErrorOnInvalidData() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableOnce()); + + $this->stream->emit('data', array("oops")); + } + + public function testStreamWillNotEmitDataOnIncompleteArray() + { + $this->parser->on('data', $this->expectCallableNever()); + + $this->stream->emit('data', array("[1,2")); + } + + public function testStreamWillEmitDataOnCompleteArrayChunked() + { + $this->parser->on('data', $this->expectCallableOnceWith(array(1,2))); + + $this->stream->emit('data', array("[1,")); + $this->stream->emit('data', array("2]")); + } + + public function testStreamWillEmitDataTwiceOnOneChunkWithTwoCompleteArrays() + { + $mock = $this->createCallableMock(); + $mock->expects($this->exactly(2))->method('__invoke'); + + $this->parser->on('data',$mock); + + $this->stream->emit('data', array("[1][2]")); + } + + public function testCloseFromDataEventWillStopEmittingFurtherDataEvents() + { + $parser = $this->parser; + $this->parser->on('data', function () use ($parser) { + $parser->close(); + }); + + $this->parser->on('data', $this->expectCallableOnceWith(array(1))); + + $this->stream->emit('data', array("[1][2]")); + } + + public function testCloseTwiceWillEmitCloseOnceAndRemoveAllListeners() + { + $this->parser->on('close', $this->expectCallableOnce()); + + $this->parser->close(); + $this->parser->close(); + + $this->assertEquals(array(), $this->parser->listeners('close')); + } + + public function testPipeWillBeForwardedToTargetStream() + { + $target = new WritableStream(); + $target->on('pipe', $this->expectCallableOnceWith($this->parser)); + + $this->parser->pipe($target); + } + + public function testPauseWillBeForwarded() + { + $this->stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $this->stream->expects($this->once())->method('pause'); + $this->parser = new ReadableJsonStream($this->stream); + + $this->parser->pause(); + } + + public function testResumeWillBeForwarded() + { + $this->stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $this->stream->expects($this->once())->method('resume'); + $this->parser = new ReadableJsonStream($this->stream); + + $this->parser->resume(); + } +}