From d86b691c41a69ad357b00546110b210db66b8b7c Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 20 Dec 2021 23:31:35 +0100 Subject: [PATCH] Improve `async()` by making its promises cancelable Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call. ```php $promise = async(static function (): int { echo 'a'; await(sleep(2)); echo 'b'; return time(); })(); $promise->cancel(); await($promise); ```` This builds on top of #15, #18, #19, #26, #28, #30, and #32. --- README.md | 23 ++++++++++ composer.json | 3 +- src/FiberMap.php | 51 +++++++++++++++++++++ src/functions.php | 65 +++++++++++++++++++++++---- tests/AsyncTest.php | 107 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 240 insertions(+), 9 deletions(-) create mode 100644 src/FiberMap.php diff --git a/README.md b/README.md index 838b63c..040aa07 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,29 @@ $promise->then(function (int $bytes) { }); ``` +Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and +any nested fibers with their awaited promises will also be cancelled. As such the following example will only output +`ab` as the `sleep()`[https://reactphp.org/promise-timer/#sleep] between `a` and `b` is cancelled throwing a timeout +exception that bubbles up through the fibers ultimately to the end user through the `await()`[#await] on the last line +of the example. + +```php +$promise = async(static function (): int { + echo 'a'; + await(async(static function(): void { + echo 'b'; + await(sleep(2)); + echo 'c'; + })()); + echo 'd'; + + return time(); +})(); + +$promise->cancel(); +await($promise); +``` + ### await() The `await(PromiseInterface $promise): mixed` function can be used to diff --git a/composer.json b/composer.json index d749726..5d93ed4 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,8 @@ "react/promise": "^2.8 || ^1.2.1" }, "require-dev": { - "phpunit/phpunit": "^9.3" + "phpunit/phpunit": "^9.3", + "react/promise-timer": "^1.8" }, "autoload": { "psr-4": { diff --git a/src/FiberMap.php b/src/FiberMap.php new file mode 100644 index 0000000..a8689ec --- /dev/null +++ b/src/FiberMap.php @@ -0,0 +1,51 @@ +status[spl_object_hash($fiber)] = false; + $this->map[spl_object_hash($fiber)] = []; + } + + public function cancel(Fiber $fiber): void + { + $this->status[spl_object_hash($fiber)] = true; + } + + public function isCanceled(Fiber $fiber): bool + { + return $this->status[spl_object_hash($fiber)]; + } + + public function attachPromise(Fiber $fiber, PromiseInterface $promise): void + { + $this->map[spl_object_hash($fiber)][spl_object_hash($promise)] = $promise; + } + + public function has(Fiber $fiber): bool + { + return array_key_exists(spl_object_hash($fiber), $this->map); + } + + public function getPromises(Fiber $fiber): array + { + return $this->map[spl_object_hash($fiber)]; + } + + public function unregister(Fiber $fiber): void + { + unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]); + } +} diff --git a/src/functions.php b/src/functions.php index e016853..559dd8b 100644 --- a/src/functions.php +++ b/src/functions.php @@ -2,6 +2,7 @@ namespace React\Async; +use Fiber; use React\EventLoop\Loop; use React\Promise\CancellablePromiseInterface; use React\Promise\Deferred; @@ -155,17 +156,40 @@ */ function async(callable $function): callable { - return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void { - $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void { - try { - $resolve($function(...$args)); - } catch (\Throwable $exception) { - $reject($exception); + return static function (mixed ...$args) use ($function): PromiseInterface { + $fiber = null; + $promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void { + $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void { + try { + $resolve($function(...$args)); + } catch (\Throwable $exception) { + $reject($exception); + } finally { + fiberMap()->unregister($fiber); + } + }); + + fiberMap()->register($fiber); + + $fiber->start(); + }, function () use (&$fiber): void { + if ($fiber instanceof Fiber) { + fiberMap()->cancel($fiber); + foreach (fiberMap()->getPromises($fiber) as $promise) { + if (method_exists($promise, 'cancel')) { + $promise->cancel(); + } + } } }); - $fiber->start(); - }); + $lowLevelFiber = \Fiber::getCurrent(); + if ($lowLevelFiber !== null) { + fiberMap()->attachPromise($lowLevelFiber, $promise); + } + + return $promise; + }; } @@ -230,6 +254,13 @@ function await(PromiseInterface $promise): mixed $rejected = false; $resolvedValue = null; $rejectedThrowable = null; + $lowLevelFiber = \Fiber::getCurrent(); + + if ($lowLevelFiber !== null) { + if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } + } $promise->then( function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void { @@ -285,6 +316,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void throw $rejectedThrowable; } + if ($lowLevelFiber !== null) { + fiberMap()->attachPromise($lowLevelFiber, $promise); + } + $fiber = FiberFactory::create(); return $fiber->suspend(); @@ -601,3 +636,17 @@ function waterfall(array $tasks): PromiseInterface return $deferred->promise(); } + +/** + * @internal + */ +function fiberMap(): FiberMap +{ + static $wm = null; + + if ($wm === null) { + $wm = new FiberMap(); + } + + return $wm; +} diff --git a/tests/AsyncTest.php b/tests/AsyncTest.php index a4287fd..1d66f86 100644 --- a/tests/AsyncTest.php +++ b/tests/AsyncTest.php @@ -11,6 +11,7 @@ use function React\Promise\all; use function React\Promise\reject; use function React\Promise\resolve; +use function React\Promise\Timer\sleep; class AsyncTest extends TestCase { @@ -185,4 +186,110 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA $this->assertGreaterThan(0.1, $time); $this->assertLessThan(0.12, $time); } + + public function testCancel() + { + self::expectOutputString('a'); + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + await(sleep(2)); + echo 'b'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testCancelTryCatch() + { + self::expectOutputString('ab'); +// $this->expectException(\Exception::class); +// $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + try { + await(sleep(2)); + } catch (\Throwable) { + // No-Op + } + echo 'b'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testNestedCancel() + { + self::expectOutputString('abc'); + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + await(async(static function(): void { + echo 'b'; + await(async(static function(): void { + echo 'c'; + await(sleep(2)); + echo 'd'; + })()); + echo 'e'; + })()); + echo 'f'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testCancelFiberThatCatchesExceptions() + { + self::expectOutputString('ab'); + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + try { + await(sleep(2)); + } catch (\Throwable) { + // No-Op + } + echo 'b'; + await(sleep(0.1)); + echo 'c'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testNotAwaitedPromiseWillNotBeCanceled() + { + self::expectOutputString('acb'); + + async(static function (): int { + echo 'a'; + sleep(0.001)->then(static function (): void { + echo 'b'; + }); + echo 'c'; + + return time(); + })()->cancel(); + Loop::run(); + } }