Skip to content

Commit

Permalink
Improve async() by making its promises cancelable
Browse files Browse the repository at this point in the history
Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call.

```php
$promise = async(static function (): int {
    echo 'a';
    await(sleep(2));
    echo 'b';

    return time();
})();

$promise->cancel();
await($promise);
````

This builds on top of reactphp#15, reactphp#18, reactphp#19, reactphp#26, reactphp#28, reactphp#30, and reactphp#32.
  • Loading branch information
WyriHaximus committed Mar 4, 2022
1 parent 4cadacc commit 262ef59
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 11 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ $promise->then(function (int $bytes) {
});
```

Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout
exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line
of the example.

```php
$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(sleep(2));
echo 'c';
})());
echo 'd';

return time();
})();

$promise->cancel();
await($promise);
```

### await()

The `await(PromiseInterface $promise): mixed` function can be used to
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"react/promise": "^2.8 || ^1.2.1"
},
"require-dev": {
"phpunit/phpunit": "^9.3"
"phpunit/phpunit": "^9.3",
"react/promise-timer": "^1.8"
},
"autoload": {
"psr-4": {
Expand Down
55 changes: 55 additions & 0 deletions src/FiberMap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

namespace React\Async;

use React\Promise\PromiseInterface;

/**
* @internal
*/
final class FiberMap
{
private static array $status = [];
private static array $map = [];

public static function register(\Fiber $fiber): void
{
self::$status[\spl_object_id($fiber)] = false;
self::$map[\spl_object_id($fiber)] = [];
}

public static function cancel(\Fiber $fiber): void
{
self::$status[\spl_object_id($fiber)] = true;
}

public static function isCancelled(\Fiber $fiber): bool
{
return self::$status[\spl_object_id($fiber)];
}

public static function setPromise(\Fiber $fiber, PromiseInterface $promise): void
{
self::$map[\spl_object_id($fiber)] = $promise;
}

public static function unsetPromise(\Fiber $fiber, PromiseInterface $promise): void
{
unset(self::$map[\spl_object_id($fiber)]);
}

public static function has(\Fiber $fiber): bool
{
return array_key_exists(\spl_object_id($fiber), self::$map);
}

public static function getPromise(\Fiber $fiber): ?PromiseInterface
{
return self::$map[\spl_object_id($fiber)] ?? null;
}

public static function unregister(\Fiber $fiber): void
{
unset(self::$status[\spl_object_id($fiber)], self::$map[\spl_object_id($fiber)]);
}
}
82 changes: 72 additions & 10 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,69 @@
* });
* ```
*
* Promises returned by `async()` can be cancelled, and when done any currently
* and future awaited promise inside that and any nested fibers with their
* awaited promises will also be cancelled. As such the following example will
* only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep)
* between `a` and `b` is cancelled throwing a timeout exception that bubbles up
* through the fibers ultimately to the end user through the [`await()`](#await)
* on the last line of the example.
*
* ```php
* $promise = async(static function (): int {
* echo 'a';
* await(async(static function(): void {
* echo 'b';
* await(sleep(2));
* echo 'c';
* })());
* echo 'd';
*
* return time();
* })();
*
* $promise->cancel();
* await($promise);
* ```
*
* @param callable(mixed ...$args):mixed $function
* @return callable(): PromiseInterface<mixed>
* @since 4.0.0
* @see coroutine()
*/
function async(callable $function): callable
{
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
return static function (mixed ...$args) use ($function): PromiseInterface {
$fiber = null;
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
} finally {
FiberMap::unregister($fiber);
}
});

FiberMap::register($fiber);

$fiber->start();
}, function () use (&$fiber): void {
FiberMap::cancel($fiber);
$promise = FiberMap::getPromise($fiber);
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
});

$fiber->start();
});
$lowLevelFiber = \Fiber::getCurrent();
if ($lowLevelFiber !== null) {
FiberMap::setPromise($lowLevelFiber, $promise);
}

return $promise;
};
}


Expand Down Expand Up @@ -230,9 +275,18 @@ function await(PromiseInterface $promise): mixed
$rejected = false;
$resolvedValue = null;
$rejectedThrowable = null;
$lowLevelFiber = \Fiber::getCurrent();

if ($lowLevelFiber !== null && FiberMap::isCancelled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}

$promise->then(
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber, $lowLevelFiber, $promise): void {
if ($lowLevelFiber !== null) {
FiberMap::unsetPromise($lowLevelFiber, $promise);
}

if ($fiber === null) {
$resolved = true;
$resolvedValue = $value;
Expand All @@ -241,7 +295,11 @@ function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {

$fiber->resume($value);
},
function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void {
function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber, $lowLevelFiber, $promise): void {
if ($lowLevelFiber !== null) {
FiberMap::unsetPromise($lowLevelFiber, $promise);
}

if (!$throwable instanceof \Throwable) {
$throwable = new \UnexpectedValueException(
'Promise rejected with unexpected value of type ' . (is_object($throwable) ? get_class($throwable) : gettype($throwable))
Expand Down Expand Up @@ -285,6 +343,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
throw $rejectedThrowable;
}

if ($lowLevelFiber !== null) {
FiberMap::setPromise($lowLevelFiber, $promise);
}

$fiber = FiberFactory::create();

return $fiber->suspend();
Expand Down
105 changes: 105 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use function React\Promise\all;
use function React\Promise\reject;
use function React\Promise\resolve;
use function React\Promise\Timer\sleep;

class AsyncTest extends TestCase
{
Expand Down Expand Up @@ -185,4 +186,108 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
$this->assertGreaterThan(0.1, $time);
$this->assertLessThan(0.12, $time);
}

public function testCancel()
{
self::expectOutputString('a');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(sleep(2));
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelTryCatch()
{
self::expectOutputString('ab');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNestedCancel()
{
self::expectOutputString('abc');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(async(static function(): void {
echo 'c';
await(sleep(2));
echo 'd';
})());
echo 'e';
})());
echo 'f';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelFiberThatCatchesExceptions()
{
self::expectOutputString('ab');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';
await(sleep(0.1));
echo 'c';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNotAwaitedPromiseWillNotBeCanceled()
{
self::expectOutputString('acb');

async(static function (): int {
echo 'a';
sleep(0.001)->then(static function (): void {
echo 'b';
});
echo 'c';

return time();
})()->cancel();
Loop::run();
}
}
36 changes: 36 additions & 0 deletions tests/AwaitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,42 @@ public function testNestedAwaits(callable $await)
})));
}

/**
* @dataProvider provideAwaiters
*/
public function testResolvedPromisesShouldBeDetached(callable $await)
{
$await(async(function () use ($await): int {
$fiber = \Fiber::getCurrent();
$await(React\Promise\Timer\sleep(0.01));
$this->assertNull(React\Async\FiberMap::getPromise($fiber));

return time();
})());
}

/**
* @dataProvider provideAwaiters
*/
public function testRejectedPromisesShouldBeDetached(callable $await)
{
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Boom!');

$await(async(function () use ($await): int {
$fiber = \Fiber::getCurrent();
try {
$await(React\Promise\reject(new \Exception('Boom!')));
} catch (\Throwable $throwable) {
throw $throwable;
} finally {
$this->assertNull(React\Async\FiberMap::getPromise($fiber));
}

return time();
})());
}

public function provideAwaiters(): iterable
{
yield 'await' => [static fn (React\Promise\PromiseInterface $promise): mixed => React\Async\await($promise)];
Expand Down

0 comments on commit 262ef59

Please sign in to comment.