Skip to content

Commit

Permalink
Merge pull request #50 from clue-labs/stream-semantics
Browse files Browse the repository at this point in the history
Change JSON stream to always report `data` events instead of `progress`, follow strict stream semantics, support backpressure and improve error handling
  • Loading branch information
clue authored Sep 18, 2019
2 parents b9b09be + 3bcf964 commit f4a28bf
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 144 deletions.
35 changes: 10 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ its event-driven model to react to changes and events happening.
* [Command streaming](#command-streaming)
* [TAR streaming](#tar-streaming)
* [JSON streaming](#json-streaming)
* [JsonProgressException](#jsonprogressexception)
* [Install](#install)
* [Tests](#tests)
* [License](#license)
Expand Down Expand Up @@ -295,17 +294,13 @@ progress events once the stream ends:

```php
$client->imageCreate('clue/streamripper')->then(
function ($data) {
function (array $data) {
// $data is an array of *all* elements in the JSON stream
var_dump($data);
},
function ($error) {
function (Exception $error) {
// an error occurred (possibly after receiving *some* elements)

if ($error instanceof Io\JsonProgressException) {
// a progress message (usually the last) contains an error message
} else {
// any other error, like invalid request etc.
}
echo 'Error: ' . $error->getMessage() . PHP_EOL;
}
);
```
Expand All @@ -330,18 +325,15 @@ $stream = $client->eventsStream();

The resulting stream will emit the following events:

* `progress`: for *each* element in the update stream
* `error`: once if an error occurs, will close() stream then
* Will emit an [`Io\JsonProgressException`](#jsonprogressexception) if an individual progress message contains an error message
* Any other `Exception` in case of an transport error, like invalid request etc.
* `close`: once the stream ends (either finished or after "error")

Please note that the resulting stream does not emit any "data" events, so
you will not be able to pipe() its events into another `WritableStream`.
* `data`: for *each* element in the update stream
* `error`: once if an error occurs, will close() stream then
* Will emit a `RuntimeException` if an individual progress message contains an error message
or any other `Exception` in case of an transport error, like invalid request etc.
* `close`: once the stream ends (either finished or after "error")

```php
$stream = $client->imageCreateStream('clue/redis-benchmark');
$stream->on('progress', function ($data) {
$stream->on('data', function (array $data) {
// data will be emitted for *each* complete element in the JSON stream
echo $data['status'] . PHP_EOL;
});
Expand All @@ -353,13 +345,6 @@ $stream->on('close', function () {

See also the [pull example](examples/pull.php) and the [push example](examples/push.php).

### JsonProgressException

The `Io\JsonProgressException` will be thrown by [JSON streaming](#json-streaming)
endpoints if an individual progress message contains an error message.

The `getData()` method can be used to obtain the progress message.

## Install

The recommended way to install this library is [through Composer](https://getcomposer.org).
Expand Down
2 changes: 1 addition & 1 deletion examples/events.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// stream all events for 10 seconds
$stream = $client->eventsStream(null, microtime(true) + 10.0);

$stream->on('progress', function ($event) {
$stream->on('data', function ($event) {
echo json_encode($event) . PHP_EOL;
});

Expand Down
2 changes: 1 addition & 1 deletion examples/pull.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

$stream = $client->imageCreateStream($image);

$stream->on('progress', function ($progress) {
$stream->on('data', function ($progress) {
echo 'progress: '. json_encode($progress) . PHP_EOL;
});

Expand Down
42 changes: 16 additions & 26 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ public function version()
public function events($since = null, $until = null, $filters = array())
{
return $this->streamingParser->deferredStream(
$this->eventsStream($since, $until, $filters),
'progress'
$this->eventsStream($since, $until, $filters)
);
}

Expand All @@ -138,12 +137,9 @@ public function events($since = null, $until = null, $filters = array())
* This is a JSON streaming API endpoint that returns a stream instance.
*
* The resulting stream will emit the following events:
* - progress: for *each* element in the update stream
* - error: once if an error occurs, will close() stream then
* - close: once the stream ends (either finished or after "error")
*
* Please note that the resulting stream does not emit any "data" events, so
* you will not be able to pipe() its events into another `WritableStream`.
* - data: for *each* element in the update stream
* - error: once if an error occurs, will close() stream then
* - close: once the stream ends (either finished or after "error")
*
* The optional `$filters` parameter can be used to only get events for
* certain event types, images and/or containers etc. like this:
Expand Down Expand Up @@ -758,9 +754,9 @@ public function imageList($all = false)
*/
public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $tag = null, $registry = null, $registryAuth = null)
{
$stream = $this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth);

return $this->streamingParser->deferredStream($stream, 'progress');
return $this->streamingParser->deferredStream(
$this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth)
);
}

/**
Expand All @@ -769,12 +765,9 @@ public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $t
* This is a JSON streaming API endpoint that returns a stream instance.
*
* The resulting stream will emit the following events:
* - progress: for *each* element in the update stream
* - error: once if an error occurs, will close() stream then
* - close: once the stream ends (either finished or after "error")
*
* Please note that the resulting stream does not emit any "data" events, so
* you will not be able to pipe() its events into another `WritableStream`.
* - data: for *each* element in the update stream
* - error: once if an error occurs, will close() stream then
* - close: once the stream ends (either finished or after "error").
*
* Pulling a private image from a remote registry will likely require authorization, so make
* sure to pass the $registryAuth parameter, see `self::authHeaders()` for
Expand Down Expand Up @@ -871,9 +864,9 @@ public function imageHistory($image)
*/
public function imagePush($image, $tag = null, $registry = null, $registryAuth = null)
{
$stream = $this->imagePushStream($image, $tag, $registry, $registryAuth);

return $this->streamingParser->deferredStream($stream, 'progress');
return $this->streamingParser->deferredStream(
$this->imagePushStream($image, $tag, $registry, $registryAuth)
);
}

/**
Expand All @@ -882,12 +875,9 @@ public function imagePush($image, $tag = null, $registry = null, $registryAuth =
* This is a JSON streaming API endpoint that returns a stream instance.
*
* The resulting stream will emit the following events:
* - progress: for *each* element in the update stream
* - error: once if an error occurs, will close() stream then
* - close: once the stream ends (either finished or after "error")
*
* Please note that the resulting stream does not emit any "data" events, so
* you will not be able to pipe() its events into another `WritableStream`.
* - data: for *each* element in the update stream
* - error: once if an error occurs, will close() stream then
* - close: once the stream ends (either finished or after "error")
*
* Pushing to a remote registry will likely require authorization, so make
* sure to pass the $registryAuth parameter, see `self::authHeaders()` for
Expand Down
27 changes: 0 additions & 27 deletions src/Io/JsonProgressException.php

This file was deleted.

13 changes: 8 additions & 5 deletions src/Io/ReadableDemultiplexStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

namespace Clue\React\Docker\Io;

use React\Stream\ReadableStreamInterface;
use Evenement\EventEmitter;
use React\Stream\WritableStreamInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* Parser for Docker's own frame format used for bidrectional frames
*
Expand Down Expand Up @@ -48,7 +49,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =

// buffer must be empty on end, otherwise this is an error situation
if ($buffer === '') {
$out->emit('end', array());
$out->emit('end');
} else {
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
}
Expand All @@ -62,7 +63,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =
});

// forward close event to output
$multiplexed->on('close', function ($error) use ($out) {
$multiplexed->on('close', function () use ($out) {
$out->close();
});
}
Expand Down Expand Up @@ -130,7 +131,9 @@ public function close()

// closing output stream closes input stream
$this->multiplexed->close();
$this->buffer = '';

$this->emit('close', array());
$this->emit('close');
$this->removeAllListeners();
}
}
129 changes: 129 additions & 0 deletions src/Io/ReadableJsonStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?php

namespace Clue\React\Docker\Io;

use Clue\JsonStream\StreamingJsonParser;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;
use Evenement\EventEmitter;

/**
* Parser for Docker's JSON stream format used for log messages etc.
*
* @internal
*/
class ReadableJsonStream extends EventEmitter implements ReadableStreamInterface
{
private $closed = false;
private $input;
private $parser;

public function __construct(ReadableStreamInterface $input)
{
$this->input = $input;
$this->parser = $parser = new StreamingJsonParser();
if (!$input->isReadable()) {
$this->close();
return;
}

// pass all input data chunks through the parser
$input->on('data', array($this, 'handleData'));

// forward end event to output
$out = $this;
$closed =& $this->closed;
$input->on('end', function () use ($out, $parser, &$closed) {
// ignore duplicate end events
if ($closed) {
return;
}

if ($parser->isEmpty()) {
$out->emit('end');
} else {
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete JSON data')));
}
$out->close();
});

// forward error event to output
$input->on('error', function ($error) use ($out) {
$out->emit('error', array($error));
$out->close();
});

// forward close event to output
$input->on('close', function () use ($out) {
$out->close();
});
}

/**
* push the given stream chunk into the parser buffer and try to extract all JSON messages
*
* @internal
* @param string $data
*/
public function handleData($data)
{
// forward each data chunk to the streaming JSON parser
try {
$objects = $this->parser->push($data);
} catch (\Exception $e) {
$this->emit('error', array($e));
$this->close();
return;
}

foreach ($objects as $object) {
// stop emitting data if stream is already closed
if ($this->closed) {
return;
}

if (isset($object['error'])) {
$this->emit('error', array(new \RuntimeException($object['error'])));
$this->close();
return;
}
$this->emit('data', array($object));
}
}

public function pause()
{
$this->input->pause();
}

public function resume()
{
$this->input->resume();
}

public function isReadable()
{
return $this->input->isReadable();
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
return Util::pipe($this, $dest, $options);
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;

// closing output stream closes input stream
$this->input->close();

$this->emit('close');
$this->removeAllListeners();
}
}
Loading

0 comments on commit f4a28bf

Please sign in to comment.