Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Retry into Queue Lib #17

Merged
merged 15 commits into from
Jan 17, 2024
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"psr-4": {"Utopia\\Queue\\": "src/Queue"}
},
"autoload-dev": {
"psr-4": {"Utopia\\Tests\\": "tests/Database"}
"psr-4": {"Tests\\E2E\\": "tests/Queue/e2e"}
PineappleIOnic marked this conversation as resolved.
Show resolved Hide resolved
},
"scripts":{
"test": "phpunit",
Expand Down
172 changes: 89 additions & 83 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
>
<testsuites>
<testsuite name="E2E">
<directory>./tests/Queue/e2e/</directory>
<directory>./tests/Queue/e2e/Adapter</directory>
</testsuite>
</testsuites>
</phpunit>
37 changes: 35 additions & 2 deletions src/Queue/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,47 @@ public function enqueue(array $payload): bool
return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload);
}

public function retry(int $limit = null): void
PineappleIOnic marked this conversation as resolved.
Show resolved Hide resolved
{
$start = \time();
$processed = 0;

while (true) {
PineappleIOnic marked this conversation as resolved.
Show resolved Hide resolved
$pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5);
christyjacob4 marked this conversation as resolved.
Show resolved Hide resolved

if ($pid === false) {
christyjacob4 marked this conversation as resolved.
Show resolved Hide resolved
break;
}

$job = $this->getJob($pid);

if ($job === false) {
break;
}

if ($job->getTimestamp() >= $start) {
break;
}

if ($limit !== null && $processed >= $limit) {
break;
}

$this->enqueue($job->getPayload());
$processed++;
}
}

public function getJob(string $pid): Message|false
{
$job = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}");
$value = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}");

if ($job === false) {
if ($value === false) {
return false;
}

$job = json_decode($value, true);

return new Message($job);
}

Expand Down
13 changes: 6 additions & 7 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Utopia\Queue\Connection;

use Utopia\Queue\Connection;
use Utopia\Queue\Message;

class Redis implements Connection
{
Expand Down Expand Up @@ -69,7 +68,7 @@ public function rightPopArray(string $queue, int $timeout): array|false
return false;
}

return json_decode($response, true);
return json_decode($response, true) ?? false;
}

public function rightPop(string $queue, int $timeout): string|false
Expand All @@ -91,7 +90,7 @@ public function leftPopArray(string $queue, int $timeout): array|false
return false;
}

return json_decode($response[1], true);
return json_decode($response[1], true) ?? false;
}

public function leftPop(string $queue, int $timeout): string|false
Expand Down Expand Up @@ -152,11 +151,11 @@ public function decrement(string $key): int

public function listRange(string $key, int $total, int $offset): array
{
$start = $offset - 1;
$end = ($total + $offset) -1;
$results = $this->getRedis()->lrange($key, $start, $end);
$start = $offset;
$end = $start + $total - 1;
$results = $this->getRedis()->lRange($key, $start, $end);

return array_map(fn (array $job) => new Message($job), $results);
christyjacob4 marked this conversation as resolved.
Show resolved Hide resolved
return $results;
christyjacob4 marked this conversation as resolved.
Show resolved Hide resolved
}

public function ping(): bool
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
<?php

namespace Utopia\Tests;
namespace Tests\E2E\Adapter;

use PHPUnit\Framework\TestCase;
use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

class SwooleTest extends TestCase
abstract class Base extends TestCase
{
private array $payloads;
protected array $payloads;

public function setUp(): void
{
Expand Down Expand Up @@ -58,16 +54,16 @@ public function setUp(): void
];
}

/**
* @return Client
*/
abstract protected function getClient(): Client;

public function testEvents(): void
{
$connection = new Redis('redis', 6379);

$this->assertTrue($connection->ping());

$client = new Client('workerman', $connection);
$client = $this->getClient();
$client->resetStats();


foreach ($this->payloads as $payload) {
$this->assertTrue($client->enqueue($payload));
}
Expand All @@ -81,26 +77,49 @@ public function testEvents(): void
$this->assertEquals(7, $client->sumSuccessfulJobs());
}

public function testSwoole(): void
/**
* @depends testEvents
*/
public function testRetry(): void
{
$connection = new Redis('redis', 6379);
$client = $this->getClient();
$client->resetStats();

$client->enqueue([
'type' => 'test_exception',
'id' => 1
]);
$client->enqueue([
'type' => 'test_exception',
'id' => 2
]);
$client->enqueue([
'type' => 'test_exception',
'id' => 3
]);
$client->enqueue([
'type' => 'test_exception',
'id' => 4
]);

run(function () use ($connection) {
$client = new Client('swoole', $connection);
go(function () use ($client) {
$client->resetStats();
sleep(1);

$this->assertEquals(4, $client->sumTotalJobs());
$this->assertEquals(0, $client->sumProcessingJobs());
$this->assertEquals(4, $client->sumFailedJobs());
$this->assertEquals(0, $client->sumSuccessfulJobs());

foreach ($this->payloads as $payload) {
$this->assertTrue($client->enqueue($payload));
}
$client->resetStats();

sleep(1);
$client->retry();

$this->assertEquals(8, $client->sumTotalJobs());
$this->assertEquals(0, $client->sumProcessingJobs());
$this->assertEquals(1, $client->sumFailedJobs());
$this->assertEquals(7, $client->sumSuccessfulJobs());
});
});
sleep(1);

// Retry will retry ALL failed jobs regardless of if they are still tracked in stats
// Meaning this test has 5 failed jobs due to the previous tests.
$this->assertEquals(5, $client->sumTotalJobs());
$this->assertEquals(0, $client->sumProcessingJobs());
$this->assertEquals(5, $client->sumFailedJobs());
$this->assertEquals(0, $client->sumSuccessfulJobs());
}
}
45 changes: 45 additions & 0 deletions tests/Queue/e2e/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace Tests\E2E\Adapter;

use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;

use function Co\run;

class SwooleTest extends Base
{
protected function getClient(): Client
{
$connection = new Redis('redis', 6379);
$client = new Client('swoole', $connection);

return $client;
}

/**
* @depends testRetry
*/
protected function testSwooleConcurrency(): void
PineappleIOnic marked this conversation as resolved.
Show resolved Hide resolved
{
$connection = new Redis('redis', 6379);

run(function () use ($connection) {
$client = new Client('swoole', $connection);
go(function () use ($client) {
$client->resetStats();

foreach ($this->payloads as $payload) {
$this->assertTrue($client->enqueue($payload));
}

sleep(1);

$this->assertEquals(8, $client->sumTotalJobs());
$this->assertEquals(0, $client->sumProcessingJobs());
$this->assertEquals(1, $client->sumFailedJobs());
$this->assertEquals(7, $client->sumSuccessfulJobs());
});
});
}
}
17 changes: 17 additions & 0 deletions tests/Queue/e2e/Adapter/WorkermanTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Tests\E2E\Adapter;

use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;

class WorkermanTest extends Base
{
protected function getClient(): Client
{
$connection = new Redis('redis', 6379);
$client = new Client('workerman', $connection);

return $client;
}
}