Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add any() helper to await first successful fulfillment of operations #18

Merged
merged 1 commit into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ much any API that already uses Promises.
* [Cancellation](#cancellation)
* [Timeout](#timeout)
* [all()](#all)
* [any()](#any)
* [Blocking](#blocking)
* [Install](#install)
* [Tests](#tests)
Expand Down Expand Up @@ -332,6 +333,80 @@ $promise = Queue::all(10, $jobs, array($browser, 'get'));
> Keep in mind that returning an array of response messages means that
the whole response body has to be kept in memory.

#### any()

The static `any(int $concurrency, array $jobs, callable $handler): PromiseInterface<mixed>` method can be used to
concurrently process given jobs through the given `$handler` and resolve
with first resolution value.

This is a convenience method which uses the `Queue` internally to
schedule all jobs while limiting concurrency to ensure no more than
`$concurrency` jobs ever run at once. It will return a promise which
resolves with the result of the first job on success and will then try
to `cancel()` all outstanding jobs.

```php
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$promise = Queue::any(3, $urls, function ($url) use ($browser) {
return $browser->get($url);
});

$promise->then(function (ResponseInterface $response) {
echo 'First response: ' . $response->getBody() . PHP_EOL;
});
```

If all of the jobs fail, it will reject the resulting promise. Similarly,
calling `cancel()` on the resulting promise will try to cancel all
outstanding jobs. See [promises](#promises) and
[cancellation](#cancellation) for details.

The `$concurrency` parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen of things at once may easily
overwhelm the receiving side. Using a `1` value will ensure that all jobs
are processed one after another, effectively creating a "waterfall" of
jobs. Using a value less than 1 will reject with an
`InvalidArgumentException` without processing any jobs.

```php
// handle up to 10 jobs concurrently
$promise = Queue::any(10, $jobs, $handler);
```

```php
// handle each job after another without concurrency (waterfall)
$promise = Queue::any(1, $jobs, $handler);
```

The `$jobs` parameter must be an array with all jobs to process. Each
value in this array will be passed to the `$handler` to start one job.
The array keys have no effect, the promise will simply resolve with the
job results of the first successful job as returned by the `$handler`.
If this array is empty, this method will reject without processing any
jobs.

The `$handler` parameter must be a valid callable that accepts your job
parameters, invokes the appropriate operation and returns a Promise as a
placeholder for its future result. If the given argument is not a valid
callable, this method will reject with an `InvalidArgumentExceptionn`
without processing any jobs.

```php
// using a Closure as handler is usually recommended
$promise = Queue::any(10, $jobs, function ($url) use ($browser) {
return $browser->get($url);
});
```

```php
// accepts any callable, so PHP's array notation is also supported
$promise = Queue::any(10, $jobs, array($browser, 'get'));
```

#### Blocking

As stated above, this library provides you a powerful, async API by default.
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
"require-dev": {
"clue/block-react": "^1.0",
"clue/buzz-react": "^2.0",
"clue/buzz-react": "^2.4",
"phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35",
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3"
}
Expand Down
44 changes: 44 additions & 0 deletions examples/03-http-any.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

use Clue\React\Buzz\Browser;
use Clue\React\Mq\Queue;
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\Factory;

require __DIR__ . '/../vendor/autoload.php';

// list of all URLs you want to try
// this list may potentially contain hundreds or thousands of entries
$urls = array(
'http://www.github.com/invalid',
'http://www.yahoo.com/invalid',
'http://www.bing.com/invalid',
'http://www.bing.com/',
'http://www.google.com/',
'http://www.google.com/invalid',
);

$loop = Factory::create();
$browser = new Browser($loop);

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here to avoid using excessive network resources
$promise = Queue::any(2, $urls, function ($url) use ($browser) {
return $browser->get($url)->then(
function (ResponseInterface $response) use ($url) {
// return only the URL for the first successful response
return $url;
}
);
});

$promise->then(
function ($url) {
echo 'First successful URL is ' . $url . PHP_EOL;
},
function ($e) {
echo 'An error occured: ' . $e->getMessage() . PHP_EOL;
}
);

$loop->run();
118 changes: 118 additions & 0 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,124 @@ public static function all($concurrency, array $jobs, $handler)
});
}

/**
* Concurrently process given jobs through the given `$handler` and resolve
* with first resolution value.
*
* This is a convenience method which uses the `Queue` internally to
* schedule all jobs while limiting concurrency to ensure no more than
* `$concurrency` jobs ever run at once. It will return a promise which
* resolves with the result of the first job on success and will then try
* to `cancel()` all outstanding jobs.
*
* ```php
* $loop = React\EventLoop\Factory::create();
* $browser = new Clue\React\Buzz\Browser($loop);
*
* $promise = Queue::any(3, $urls, function ($url) use ($browser) {
* return $browser->get($url);
* });
*
* $promise->then(function (ResponseInterface $response) {
* echo 'First response: ' . $response->getBody() . PHP_EOL;
* });
* ```
*
* If all of the jobs fail, it will reject the resulting promise. Similarly,
* calling `cancel()` on the resulting promise will try to cancel all
* outstanding jobs. See [promises](#promises) and
* [cancellation](#cancellation) for details.
*
* The `$concurrency` parameter sets a new soft limit for the maximum number
* of jobs to handle concurrently. Finding a good concurrency limit depends
* on your particular use case. It's common to limit concurrency to a rather
* small value, as doing more than a dozen of things at once may easily
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
* are processed one after another, effectively creating a "waterfall" of
* jobs. Using a value less than 1 will reject with an
* `InvalidArgumentException` without processing any jobs.
*
* ```php
* // handle up to 10 jobs concurrently
* $promise = Queue::any(10, $jobs, $handler);
* ```
*
* ```php
* // handle each job after another without concurrency (waterfall)
* $promise = Queue::any(1, $jobs, $handler);
* ```
*
* The `$jobs` parameter must be an array with all jobs to process. Each
* value in this array will be passed to the `$handler` to start one job.
* The array keys have no effect, the promise will simply resolve with the
* job results of the first successful job as returned by the `$handler`.
* If this array is empty, this method will reject without processing any
* jobs.
*
* The `$handler` parameter must be a valid callable that accepts your job
* parameters, invokes the appropriate operation and returns a Promise as a
* placeholder for its future result. If the given argument is not a valid
* callable, this method will reject with an `InvalidArgumentExceptionn`
* without processing any jobs.
*
* ```php
* // using a Closure as handler is usually recommended
* $promise = Queue::any(10, $jobs, function ($url) use ($browser) {
* return $browser->get($url);
* });
* ```
*
* ```php
* // accepts any callable, so PHP's array notation is also supported
* $promise = Queue::any(10, $jobs, array($browser, 'get'));
* ```
*
* @param int $concurrency concurrency soft limit
* @param array $jobs
* @param callable $handler
* @return PromiseInterface Returns a Promise<mixed> which resolves with a single resolution value
* or rejects when all of the operations reject.
*/
public static function any($concurrency, array $jobs, $handler)
{
// explicitly reject with empty jobs (https://github.com/reactphp/promise/pull/34)
if (!$jobs) {
return Promise\reject(new \UnderflowException('No jobs given'));
}

try {
// limit number of concurrent operations
$q = new self($concurrency, null, $handler);
} catch (\InvalidArgumentException $e) {
// reject if $concurrency or $handler is invalid
return Promise\reject($e);
}

// try invoking all operations and automatically queue excessive ones
$promises = array_map($q, $jobs);

return new Promise\Promise(function ($resolve, $reject) use ($promises) {
Promise\any($promises)->then(function ($result) use ($promises, $resolve) {
// cancel all pending promises if a single result is ready
foreach (array_reverse($promises) as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

// resolve with original resolution value
$resolve($result);
}, $reject);
}, function () use ($promises) {
// cancel all pending promises on cancellation
foreach (array_reverse($promises) as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}
});
}

/**
* Instantiates a new queue object.
*
Expand Down
3 changes: 2 additions & 1 deletion tests/QueueAllTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ public function testCancelResultingPromiseWillCancelPendingOperation()
$promise->cancel();
}

public function testPendingOperationWillBeCancelledIfOneOperationRejects22222222222()
public function testPendingOperationWillBeStartedAndCancelledIfOneOperationRejects()
{
// second operation will only be started to be cancelled immediately
$first = new Deferred();
$second = new Promise(function () { }, $this->expectCallableOnce());

Expand Down
Loading