Skip to content

Commit

Permalink
Improve async() by making its promises cancelable
Browse files Browse the repository at this point in the history
Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call.

```php
$promise = async(static function (): int {
    echo 'a';
    await(sleep(2));
    echo 'b';

    return time();
})();

$promise->cancel();
await($promise);
````

This builds on top of reactphp#15, reactphp#18, reactphp#19, reactphp#26, reactphp#28, reactphp#30, and reactphp#32.
  • Loading branch information
WyriHaximus committed Feb 21, 2022
1 parent 4cadacc commit d86b691
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 9 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ $promise->then(function (int $bytes) {
});
```

Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
`ab` as the `sleep()`[https://reactphp.org/promise-timer/#sleep] between `a` and `b` is cancelled throwing a timeout
exception that bubbles up through the fibers ultimately to the end user through the `await()`[#await] on the last line
of the example.

```php
$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(sleep(2));
echo 'c';
})());
echo 'd';

return time();
})();

$promise->cancel();
await($promise);
```

### await()

The `await(PromiseInterface $promise): mixed` function can be used to
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"react/promise": "^2.8 || ^1.2.1"
},
"require-dev": {
"phpunit/phpunit": "^9.3"
"phpunit/phpunit": "^9.3",
"react/promise-timer": "^1.8"
},
"autoload": {
"psr-4": {
Expand Down
51 changes: 51 additions & 0 deletions src/FiberMap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace React\Async;

use Fiber;
use React\Promise\PromiseInterface;

/**
* @internal
*/
final class FiberMap
{
private array $status = [];
private array $map = [];

public function register(Fiber $fiber): void
{
$this->status[spl_object_hash($fiber)] = false;
$this->map[spl_object_hash($fiber)] = [];
}

public function cancel(Fiber $fiber): void
{
$this->status[spl_object_hash($fiber)] = true;
}

public function isCanceled(Fiber $fiber): bool
{
return $this->status[spl_object_hash($fiber)];
}

public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
{
$this->map[spl_object_hash($fiber)][spl_object_hash($promise)] = $promise;
}

public function has(Fiber $fiber): bool
{
return array_key_exists(spl_object_hash($fiber), $this->map);
}

public function getPromises(Fiber $fiber): array
{
return $this->map[spl_object_hash($fiber)];
}

public function unregister(Fiber $fiber): void
{
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
}
}
65 changes: 57 additions & 8 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace React\Async;

use Fiber;
use React\EventLoop\Loop;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Deferred;
Expand Down Expand Up @@ -155,17 +156,40 @@
*/
function async(callable $function): callable
{
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
return static function (mixed ...$args) use ($function): PromiseInterface {
$fiber = null;
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
} finally {
fiberMap()->unregister($fiber);
}
});

fiberMap()->register($fiber);

$fiber->start();
}, function () use (&$fiber): void {
if ($fiber instanceof Fiber) {
fiberMap()->cancel($fiber);
foreach (fiberMap()->getPromises($fiber) as $promise) {
if (method_exists($promise, 'cancel')) {
$promise->cancel();
}
}
}
});

$fiber->start();
});
$lowLevelFiber = \Fiber::getCurrent();
if ($lowLevelFiber !== null) {
fiberMap()->attachPromise($lowLevelFiber, $promise);
}

return $promise;
};
}


Expand Down Expand Up @@ -230,6 +254,13 @@ function await(PromiseInterface $promise): mixed
$rejected = false;
$resolvedValue = null;
$rejectedThrowable = null;
$lowLevelFiber = \Fiber::getCurrent();

if ($lowLevelFiber !== null) {
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$promise->then(
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
Expand Down Expand Up @@ -285,6 +316,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
throw $rejectedThrowable;
}

if ($lowLevelFiber !== null) {
fiberMap()->attachPromise($lowLevelFiber, $promise);
}

$fiber = FiberFactory::create();

return $fiber->suspend();
Expand Down Expand Up @@ -601,3 +636,17 @@ function waterfall(array $tasks): PromiseInterface

return $deferred->promise();
}

/**
* @internal
*/
function fiberMap(): FiberMap
{
static $wm = null;

if ($wm === null) {
$wm = new FiberMap();
}

return $wm;
}
107 changes: 107 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use function React\Promise\all;
use function React\Promise\reject;
use function React\Promise\resolve;
use function React\Promise\Timer\sleep;

class AsyncTest extends TestCase
{
Expand Down Expand Up @@ -185,4 +186,110 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
$this->assertGreaterThan(0.1, $time);
$this->assertLessThan(0.12, $time);
}

public function testCancel()
{
self::expectOutputString('a');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(sleep(2));
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelTryCatch()
{
self::expectOutputString('ab');
// $this->expectException(\Exception::class);
// $this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNestedCancel()
{
self::expectOutputString('abc');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(async(static function(): void {
echo 'c';
await(sleep(2));
echo 'd';
})());
echo 'e';
})());
echo 'f';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelFiberThatCatchesExceptions()
{
self::expectOutputString('ab');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';
await(sleep(0.1));
echo 'c';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNotAwaitedPromiseWillNotBeCanceled()
{
self::expectOutputString('acb');

async(static function (): int {
echo 'a';
sleep(0.001)->then(static function (): void {
echo 'b';
});
echo 'c';

return time();
})()->cancel();
Loop::run();
}
}

0 comments on commit d86b691

Please sign in to comment.