Skip to content

Commit

Permalink
Merge pull request #9 from clue-labs/cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
WyriHaximus authored Nov 15, 2021
2 parents 98ae760 + 36eb448 commit a112d86
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 33 deletions.
76 changes: 49 additions & 27 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace React\Async;

use React\EventLoop\Loop;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;

Expand Down Expand Up @@ -96,46 +97,53 @@ function ($error) use (&$exception, &$rejected, &$wait) {
*/
function parallel(array $tasks)
{
$deferred = new Deferred();
$results = array();
$errors = array();

$done = function () use (&$results, &$errors, $deferred) {
if (count($errors)) {
$deferred->reject(array_shift($errors));
return;
$pending = array();
$deferred = new Deferred(function () use (&$pending) {
foreach ($pending as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$deferred->resolve($results);
};
$pending = array();
});
$results = array();
$errored = false;

$numTasks = count($tasks);

if (0 === $numTasks) {
$done();
$deferred->resolve($results);
}

$checkDone = function () use (&$results, &$errors, $numTasks, $done) {
if ($numTasks === count($results) + count($errors)) {
$done();
}
};
$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
$errored = true;
$deferred->reject($error);

$taskErrback = function ($error) use (&$errors, $checkDone) {
$errors[] = $error;
$checkDone();
foreach ($pending as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}
$pending = array();
};

foreach ($tasks as $i => $task) {
$taskCallback = function ($result) use (&$results, $i, $checkDone) {
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
$results[$i] = $result;
$checkDone();

if (count($results) === $numTasks) {
$deferred->resolve($results);
}
};

$promise = call_user_func($task);
assert($promise instanceof PromiseInterface);
$pending[$i] = $promise;

$promise->then($taskCallback, $taskErrback);

if ($errored) {
break;
}
}

return $deferred->promise();
Expand All @@ -147,7 +155,13 @@ function parallel(array $tasks)
*/
function series(array $tasks)
{
$deferred = new Deferred();
$pending = null;
$deferred = new Deferred(function () use (&$pending) {
if ($pending instanceof CancellablePromiseInterface) {
$pending->cancel();
}
$pending = null;
});
$results = array();

/** @var callable():void $next */
Expand All @@ -156,7 +170,7 @@ function series(array $tasks)
$next();
};

$next = function () use (&$tasks, $taskCallback, $deferred, &$results) {
$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
if (0 === count($tasks)) {
$deferred->resolve($results);
return;
Expand All @@ -165,6 +179,7 @@ function series(array $tasks)
$task = array_shift($tasks);
$promise = call_user_func($task);
assert($promise instanceof PromiseInterface);
$pending = $promise;

$promise->then($taskCallback, array($deferred, 'reject'));
};
Expand All @@ -180,10 +195,16 @@ function series(array $tasks)
*/
function waterfall(array $tasks)
{
$deferred = new Deferred();
$pending = null;
$deferred = new Deferred(function () use (&$pending) {
if ($pending instanceof CancellablePromiseInterface) {
$pending->cancel();
}
$pending = null;
});

/** @var callable $next */
$next = function ($value = null) use (&$tasks, &$next, $deferred) {
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
if (0 === count($tasks)) {
$deferred->resolve($value);
return;
Expand All @@ -192,6 +213,7 @@ function waterfall(array $tasks)
$task = array_shift($tasks);
$promise = call_user_func_array($task, func_get_args());
assert($promise instanceof PromiseInterface);
$pending = $promise;

$promise->then($next, array($deferred, 'reject'));
};
Expand Down
66 changes: 60 additions & 6 deletions tests/ParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function () {
},
function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.1, function () use ($resolve) {
Loop::addTimer(0.11, function () use ($resolve) {
$resolve('bar');
});
});
Expand All @@ -49,7 +49,7 @@ function () {
$timer->assertInRange(0.1, 0.2);
}

public function testParallelWithError()
public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;

Expand All @@ -60,7 +60,8 @@ function () use (&$called) {
$resolve('foo');
});
},
function () {
function () use (&$called) {
$called++;
return new Promise(function () {
throw new \RuntimeException('whoops');
});
Expand All @@ -80,7 +81,59 @@ function () use (&$called) {
$this->assertSame(2, $called);
}

public function testParallelWithDelayedError()
public function testParallelWithErrorWillCancelPendingPromises()
{
$cancelled = 0;

$tasks = array(
function () use (&$cancelled) {
return new Promise(function () { }, function () use (&$cancelled) {
$cancelled++;
});
},
function () {
return new Promise(function () {
throw new \RuntimeException('whoops');
});
},
function () use (&$cancelled) {
return new Promise(function () { }, function () use (&$cancelled) {
$cancelled++;
});
}
);

$promise = React\Async\parallel($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('whoops')));

$this->assertSame(1, $cancelled);
}

public function testParallelWillCancelPendingPromisesWhenCallingCancelOnResultingPromise()
{
$cancelled = 0;

$tasks = array(
function () use (&$cancelled) {
return new Promise(function () { }, function () use (&$cancelled) {
$cancelled++;
});
},
function () use (&$cancelled) {
return new Promise(function () { }, function () use (&$cancelled) {
$cancelled++;
});
}
);

$promise = React\Async\parallel($tasks);
$promise->cancel();

$this->assertSame(2, $cancelled);
}

public function testParallelWithDelayedErrorReturnsPromiseRejectedWithExceptionFromTask()
{
$called = 0;

Expand All @@ -91,7 +144,8 @@ function () use (&$called) {
$resolve('foo');
});
},
function () {
function () use (&$called) {
$called++;
return new Promise(function ($_, $reject) {
Loop::addTimer(0.001, function () use ($reject) {
$reject(new \RuntimeException('whoops'));
Expand All @@ -112,6 +166,6 @@ function () use (&$called) {

Loop::run();

$this->assertSame(2, $called);
$this->assertSame(3, $called);
}
}
23 changes: 23 additions & 0 deletions tests/SeriesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,27 @@ function () use (&$called) {

$this->assertSame(1, $called);
}

public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
{
$cancelled = 0;

$tasks = array(
function () {
return new Promise(function ($resolve) {
$resolve();
});
},
function () use (&$cancelled) {
return new Promise(function () { }, function () use (&$cancelled) {
$cancelled++;
});
}
);

$promise = React\Async\series($tasks);
$promise->cancel();

$this->assertSame(1, $cancelled);
}
}
23 changes: 23 additions & 0 deletions tests/WaterfallTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,27 @@ function () use (&$called) {

$this->assertSame(1, $called);
}

public function testWaterfallWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
{
$cancelled = 0;

$tasks = array(
function () {
return new Promise(function ($resolve) {
$resolve();
});
},
function () use (&$cancelled) {
return new Promise(function () { }, function () use (&$cancelled) {
$cancelled++;
});
}
);

$promise = React\Async\waterfall($tasks);
$promise->cancel();

$this->assertSame(1, $cancelled);
}
}

0 comments on commit a112d86

Please sign in to comment.