Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fiber-based async and await functions #15

Merged
merged 3 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,8 @@ $result = React\Async\await($promise);
```

This function will only return after the given `$promise` has settled, i.e.
either fulfilled or rejected.

While the promise is pending, this function will assume control over the event
loop. Internally, it will `run()` the [default loop](https://github.com/reactphp/event-loop#loop)
until the promise settles and then calls `stop()` to terminate execution of the
loop. This means this function is more suited for short-lived promise executions
when using promise-based APIs is not feasible. For long-running applications,
using promise-based APIs by leveraging chained `then()` calls is usually preferable.
either fulfilled or rejected. While the promise is pending, this function will
suspend the fiber it's called from until the promise is settled.

Once the promise is fulfilled, this function will return whatever the promise
resolved to.
Expand Down
3 changes: 3 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
"phpunit/phpunit": "^9.3"
},
"autoload": {
"psr-4": {
"React\\Async\\": "src/"
},
"files": [
"src/functions_include.php"
]
Expand Down
33 changes: 33 additions & 0 deletions src/FiberFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace React\Async;

/**
* This factory its only purpose is interoperability. Where with
* event loops one could simply wrap another event loop. But with fibers
* that has become impossible and as such we provide this factory and the
* FiberInterface.
*
* Usage is not documented and as such not supported and might chang without
* notice. Use at your own risk.
*
* @internal
*/
final class FiberFactory
{
private static ?\Closure $factory = null;

public static function create(): FiberInterface
{
return (self::factory())();
}

public static function factory(\Closure $factory = null): \Closure
{
if ($factory !== null) {
self::$factory = $factory;
}

return self::$factory ?? static fn (): FiberInterface => new SimpleFiber();
}
}
23 changes: 23 additions & 0 deletions src/FiberInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace React\Async;

/**
* This interface its only purpose is interoperability. Where with
* event loops one could simply wrap another event loop. But with fibers
* that has become impossible and as such we provide this interface and the
* FiberFactory.
*
* Usage is not documented and as such not supported and might chang without
* notice. Use at your own risk.
*
* @internal
*/
interface FiberInterface
{
public function resume(mixed $value): void;

public function throw(mixed $throwable): void;

public function suspend(): mixed;
}
64 changes: 64 additions & 0 deletions src/SimpleFiber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

namespace React\Async;

use React\EventLoop\Loop;

/**
* @internal
*/
final class SimpleFiber implements FiberInterface
{
private static ?\Fiber $scheduler = null;
private ?\Fiber $fiber = null;

public function __construct()
{
$this->fiber = \Fiber::getCurrent();
}

public function resume(mixed $value): void
{
if ($this->fiber === null) {
Loop::futureTick(static fn() => \Fiber::suspend(static fn() => $value));
return;
}

Loop::futureTick(fn() => $this->fiber->resume($value));
}

public function throw(mixed $throwable): void
{
if (!$throwable instanceof \Throwable) {
$throwable = new \UnexpectedValueException(
'Promise rejected with unexpected value of type ' . (is_object($throwable) ? get_class($throwable) : gettype($throwable))
);
}

if ($this->fiber === null) {
Loop::futureTick(static fn() => \Fiber::suspend(static fn() => throw $throwable));
WyriHaximus marked this conversation as resolved.
Show resolved Hide resolved
return;
}

Loop::futureTick(fn() => $this->fiber->throw($throwable));
}

public function suspend(): mixed
{
if ($this->fiber === null) {
if (self::$scheduler === null || self::$scheduler->isTerminated()) {
self::$scheduler = new \Fiber(static fn() => Loop::run());
// Run event loop to completion on shutdown.
\register_shutdown_function(static function (): void {
if (self::$scheduler->isSuspended()) {
self::$scheduler->resume();
}
});
}

return (self::$scheduler->isStarted() ? self::$scheduler->resume() : self::$scheduler->start())();
}

return \Fiber::suspend();
}
}
66 changes: 32 additions & 34 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,36 @@
use React\EventLoop\Loop;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use function React\Promise\reject;
use function React\Promise\resolve;

/**
* Execute an async Fiber-based function to "await" promises.
*
* @param callable(mixed ...$args):mixed $function
* @param mixed ...$args Optional list of additional arguments that will be passed to the given `$function` as is
* @return PromiseInterface<mixed>
* @since 4.0.0
* @see coroutine()
*/
function async(callable $function, mixed ...$args): PromiseInterface
{
return new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
}
});

Loop::futureTick(static fn() => $fiber->start());
});
}


/**
* Block waiting for the given `$promise` to be fulfilled.
*
Expand Down Expand Up @@ -52,48 +78,20 @@
*/
function await(PromiseInterface $promise): mixed
{
$wait = true;
$resolved = null;
$exception = null;
$rejected = false;
$fiber = FiberFactory::create();

$promise->then(
function ($c) use (&$resolved, &$wait) {
$resolved = $c;
$wait = false;
Loop::stop();
function (mixed $value) use (&$resolved, $fiber): void {
$fiber->resume($value);
},
function ($error) use (&$exception, &$rejected, &$wait) {
$exception = $error;
$rejected = true;
$wait = false;
Loop::stop();
function (mixed $throwable) use (&$resolved, $fiber): void {
$fiber->throw($throwable);
}
);

// Explicitly overwrite argument with null value. This ensure that this
// argument does not show up in the stack trace in PHP 7+ only.
$promise = null;

while ($wait) {
Loop::run();
}

if ($rejected) {
// promise is rejected with an unexpected value (Promise API v1 or v2 only)
if (!$exception instanceof \Throwable) {
$exception = new \UnexpectedValueException(
'Promise rejected with unexpected value of type ' . (is_object($exception) ? get_class($exception) : gettype($exception))
);
}

throw $exception;
}

return $resolved;
return $fiber->suspend();
}


/**
* Execute a Generator-based coroutine to "await" promises.
*
Expand Down
87 changes: 87 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

namespace React\Tests\Async;

use React;
use React\EventLoop\Loop;
use React\Promise\Promise;
use function React\Async\async;
use function React\Async\await;
use function React\Promise\all;

class AsyncTest extends TestCase
{
public function testAsyncReturnsPendingPromise()
{
$promise = async(function () {
return 42;
});

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturns()
{
$promise = async(function () {
return 42;
});

$value = await($promise);

$this->assertEquals(42, $value);
}

public function testAsyncReturnsPromiseThatRejectsWithExceptionWhenCallbackThrows()
{
$promise = async(function () {
throw new \RuntimeException('Foo', 42);
});

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('Foo');
$this->expectExceptionCode(42);
await($promise);
}

public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsAfterAwaitingPromise()
{
$promise = async(function () {
$promise = new Promise(function ($resolve) {
Loop::addTimer(0.001, fn () => $resolve(42));
});

return await($promise);
});

$value = await($promise);

$this->assertEquals(42, $value);
}

public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsAfterAwaitingTwoConcurrentPromises()
{
$promise1 = async(function () {
$promise = new Promise(function ($resolve) {
Loop::addTimer(0.11, fn () => $resolve(21));
});

return await($promise);
});

$promise2 = async(function () {
$promise = new Promise(function ($resolve) {
Loop::addTimer(0.11, fn () => $resolve(42));
});

return await($promise);
});

$time = microtime(true);
$values = await(all([$promise1, $promise2]));
$time = microtime(true) - $time;

$this->assertEquals([21, 42], $values);
$this->assertGreaterThan(0.1, $time);
$this->assertLessThan(0.12, $time);
}
}
Loading