From 77f2272748e9e3228afe9f70940b588fba85a429 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 11:20:54 +0000 Subject: [PATCH 01/15] Finish Initial Retry Implementation --- composer.lock | 172 +++++++++++++++++--------------- src/Queue/Client.php | 34 ++++++- src/Queue/Connection/Redis.php | 14 +-- tests/Queue/e2e/AdapterTest.php | 95 ++++++++++++++++++ 4 files changed, 223 insertions(+), 92 deletions(-) diff --git a/composer.lock b/composer.lock index df6c65f..50cd1b6 100644 --- a/composer.lock +++ b/composer.lock @@ -57,25 +57,26 @@ }, { "name": "utopia-php/framework", - "version": "0.28.1", + "version": "0.32.0", "source": { "type": "git", "url": "https://github.com/utopia-php/framework.git", - "reference": "7f22c556fc5991e54e5811a68fb39809b21bda55" + "reference": "ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/framework/zipball/7f22c556fc5991e54e5811a68fb39809b21bda55", - "reference": "7f22c556fc5991e54e5811a68fb39809b21bda55", + "url": "https://api.github.com/repos/utopia-php/framework/zipball/ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225", + "reference": "ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225", "shasum": "" }, "require": { - "php": ">=8.0.0" + "php": ">=8.0" }, "require-dev": { "laravel/pint": "^1.2", - "phpunit/phpunit": "^9.5.25", - "vimeo/psalm": "4.27.0" + "phpbench/phpbench": "^1.2", + "phpstan/phpstan": "^1.10", + "phpunit/phpunit": "^9.5.25" }, "type": "library", "autoload": { @@ -95,9 +96,9 @@ ], "support": { "issues": "https://github.com/utopia-php/framework/issues", - "source": "https://github.com/utopia-php/framework/tree/0.28.1" + "source": "https://github.com/utopia-php/framework/tree/0.32.0" }, - "time": "2023-03-02T08:16:01+00:00" + "time": "2023-12-26T14:18:36+00:00" } ], "packages-dev": [ @@ -239,16 +240,16 @@ }, { "name": "myclabs/deep-copy", - "version": "1.11.0", + "version": "1.11.1", "source": { "type": "git", "url": "https://github.com/myclabs/DeepCopy.git", - "reference": "14daed4296fae74d9e3201d2c4925d1acb7aa614" + "reference": "7284c22080590fb39f2ffa3e9057f10a4ddd0e0c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/14daed4296fae74d9e3201d2c4925d1acb7aa614", - "reference": "14daed4296fae74d9e3201d2c4925d1acb7aa614", + "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/7284c22080590fb39f2ffa3e9057f10a4ddd0e0c", + "reference": "7284c22080590fb39f2ffa3e9057f10a4ddd0e0c", "shasum": "" }, "require": { @@ -286,7 +287,7 @@ ], "support": { "issues": "https://github.com/myclabs/DeepCopy/issues", - "source": "https://github.com/myclabs/DeepCopy/tree/1.11.0" + "source": "https://github.com/myclabs/DeepCopy/tree/1.11.1" }, "funding": [ { @@ -294,20 +295,20 @@ "type": "tidelift" } ], - "time": "2022-03-03T13:19:32+00:00" + "time": "2023-03-08T13:26:56+00:00" }, { "name": "nikic/php-parser", - "version": "v4.15.3", + "version": "v4.18.0", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "570e980a201d8ed0236b0a62ddf2c9cbb2034039" + "reference": "1bcbb2179f97633e98bbbc87044ee2611c7d7999" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/570e980a201d8ed0236b0a62ddf2c9cbb2034039", - "reference": "570e980a201d8ed0236b0a62ddf2c9cbb2034039", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/1bcbb2179f97633e98bbbc87044ee2611c7d7999", + "reference": "1bcbb2179f97633e98bbbc87044ee2611c7d7999", "shasum": "" }, "require": { @@ -348,9 +349,9 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v4.15.3" + "source": "https://github.com/nikic/PHP-Parser/tree/v4.18.0" }, - "time": "2023-01-16T22:05:37+00:00" + "time": "2023-12-10T21:03:43+00:00" }, { "name": "phar-io/manifest", @@ -465,16 +466,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.10.3", + "version": "1.10.50", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "5419375b5891add97dc74be71e6c1c34baaddf64" + "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/5419375b5891add97dc74be71e6c1c34baaddf64", - "reference": "5419375b5891add97dc74be71e6c1c34baaddf64", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/06a98513ac72c03e8366b5a0cb00750b487032e4", + "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4", "shasum": "" }, "require": { @@ -503,8 +504,11 @@ "static analysis" ], "support": { + "docs": "https://phpstan.org/user-guide/getting-started", + "forum": "https://github.com/phpstan/phpstan/discussions", "issues": "https://github.com/phpstan/phpstan/issues", - "source": "https://github.com/phpstan/phpstan/tree/1.10.3" + "security": "https://github.com/phpstan/phpstan/security/policy", + "source": "https://github.com/phpstan/phpstan-src" }, "funding": [ { @@ -520,27 +524,27 @@ "type": "tidelift" } ], - "time": "2023-02-25T14:47:13+00:00" + "time": "2023-12-13T10:59:42+00:00" }, { "name": "phpunit/php-code-coverage", - "version": "9.2.25", + "version": "9.2.30", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-code-coverage.git", - "reference": "0e2b40518197a8c0d4b08bc34dfff1c99c508954" + "reference": "ca2bd87d2f9215904682a9cb9bb37dda98e76089" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/0e2b40518197a8c0d4b08bc34dfff1c99c508954", - "reference": "0e2b40518197a8c0d4b08bc34dfff1c99c508954", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/ca2bd87d2f9215904682a9cb9bb37dda98e76089", + "reference": "ca2bd87d2f9215904682a9cb9bb37dda98e76089", "shasum": "" }, "require": { "ext-dom": "*", "ext-libxml": "*", "ext-xmlwriter": "*", - "nikic/php-parser": "^4.15", + "nikic/php-parser": "^4.18 || ^5.0", "php": ">=7.3", "phpunit/php-file-iterator": "^3.0.3", "phpunit/php-text-template": "^2.0.2", @@ -555,8 +559,8 @@ "phpunit/phpunit": "^9.3" }, "suggest": { - "ext-pcov": "*", - "ext-xdebug": "*" + "ext-pcov": "PHP extension that provides line coverage", + "ext-xdebug": "PHP extension that provides line coverage as well as branch and path coverage" }, "type": "library", "extra": { @@ -589,7 +593,8 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/php-code-coverage/issues", - "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.25" + "security": "https://github.com/sebastianbergmann/php-code-coverage/security/policy", + "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.30" }, "funding": [ { @@ -597,7 +602,7 @@ "type": "github" } ], - "time": "2023-02-25T05:32:00+00:00" + "time": "2023-12-22T06:47:57+00:00" }, { "name": "phpunit/php-file-iterator", @@ -842,16 +847,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.4", + "version": "9.6.15", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "9125ee085b6d95e78277dc07aa1f46f9e0607b8d" + "reference": "05017b80304e0eb3f31d90194a563fd53a6021f1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/9125ee085b6d95e78277dc07aa1f46f9e0607b8d", - "reference": "9125ee085b6d95e78277dc07aa1f46f9e0607b8d", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/05017b80304e0eb3f31d90194a563fd53a6021f1", + "reference": "05017b80304e0eb3f31d90194a563fd53a6021f1", "shasum": "" }, "require": { @@ -866,7 +871,7 @@ "phar-io/manifest": "^2.0.3", "phar-io/version": "^3.0.2", "php": ">=7.3", - "phpunit/php-code-coverage": "^9.2.13", + "phpunit/php-code-coverage": "^9.2.28", "phpunit/php-file-iterator": "^3.0.5", "phpunit/php-invoker": "^3.1.1", "phpunit/php-text-template": "^2.0.3", @@ -884,8 +889,8 @@ "sebastian/version": "^3.0.2" }, "suggest": { - "ext-soap": "*", - "ext-xdebug": "*" + "ext-soap": "To be able to generate mocks based on WSDL files", + "ext-xdebug": "PHP extension that provides line coverage as well as branch and path coverage" }, "bin": [ "phpunit" @@ -924,7 +929,8 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.4" + "security": "https://github.com/sebastianbergmann/phpunit/security/policy", + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.15" }, "funding": [ { @@ -940,7 +946,7 @@ "type": "tidelift" } ], - "time": "2023-02-27T13:06:37+00:00" + "time": "2023-12-01T16:55:19+00:00" }, { "name": "sebastian/cli-parser", @@ -1185,20 +1191,20 @@ }, { "name": "sebastian/complexity", - "version": "2.0.2", + "version": "2.0.3", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/complexity.git", - "reference": "739b35e53379900cc9ac327b2147867b8b6efd88" + "reference": "25f207c40d62b8b7aa32f5ab026c53561964053a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/complexity/zipball/739b35e53379900cc9ac327b2147867b8b6efd88", - "reference": "739b35e53379900cc9ac327b2147867b8b6efd88", + "url": "https://api.github.com/repos/sebastianbergmann/complexity/zipball/25f207c40d62b8b7aa32f5ab026c53561964053a", + "reference": "25f207c40d62b8b7aa32f5ab026c53561964053a", "shasum": "" }, "require": { - "nikic/php-parser": "^4.7", + "nikic/php-parser": "^4.18 || ^5.0", "php": ">=7.3" }, "require-dev": { @@ -1230,7 +1236,7 @@ "homepage": "https://github.com/sebastianbergmann/complexity", "support": { "issues": "https://github.com/sebastianbergmann/complexity/issues", - "source": "https://github.com/sebastianbergmann/complexity/tree/2.0.2" + "source": "https://github.com/sebastianbergmann/complexity/tree/2.0.3" }, "funding": [ { @@ -1238,20 +1244,20 @@ "type": "github" } ], - "time": "2020-10-26T15:52:27+00:00" + "time": "2023-12-22T06:19:30+00:00" }, { "name": "sebastian/diff", - "version": "4.0.4", + "version": "4.0.5", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/diff.git", - "reference": "3461e3fccc7cfdfc2720be910d3bd73c69be590d" + "reference": "74be17022044ebaaecfdf0c5cd504fc9cd5a7131" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/3461e3fccc7cfdfc2720be910d3bd73c69be590d", - "reference": "3461e3fccc7cfdfc2720be910d3bd73c69be590d", + "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/74be17022044ebaaecfdf0c5cd504fc9cd5a7131", + "reference": "74be17022044ebaaecfdf0c5cd504fc9cd5a7131", "shasum": "" }, "require": { @@ -1296,7 +1302,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/diff/issues", - "source": "https://github.com/sebastianbergmann/diff/tree/4.0.4" + "source": "https://github.com/sebastianbergmann/diff/tree/4.0.5" }, "funding": [ { @@ -1304,7 +1310,7 @@ "type": "github" } ], - "time": "2020-10-26T13:10:38+00:00" + "time": "2023-05-07T05:35:17+00:00" }, { "name": "sebastian/environment", @@ -1448,16 +1454,16 @@ }, { "name": "sebastian/global-state", - "version": "5.0.5", + "version": "5.0.6", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/global-state.git", - "reference": "0ca8db5a5fc9c8646244e629625ac486fa286bf2" + "reference": "bde739e7565280bda77be70044ac1047bc007e34" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/0ca8db5a5fc9c8646244e629625ac486fa286bf2", - "reference": "0ca8db5a5fc9c8646244e629625ac486fa286bf2", + "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/bde739e7565280bda77be70044ac1047bc007e34", + "reference": "bde739e7565280bda77be70044ac1047bc007e34", "shasum": "" }, "require": { @@ -1500,7 +1506,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/global-state/issues", - "source": "https://github.com/sebastianbergmann/global-state/tree/5.0.5" + "source": "https://github.com/sebastianbergmann/global-state/tree/5.0.6" }, "funding": [ { @@ -1508,24 +1514,24 @@ "type": "github" } ], - "time": "2022-02-14T08:28:10+00:00" + "time": "2023-08-02T09:26:13+00:00" }, { "name": "sebastian/lines-of-code", - "version": "1.0.3", + "version": "1.0.4", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/lines-of-code.git", - "reference": "c1c2e997aa3146983ed888ad08b15470a2e22ecc" + "reference": "e1e4a170560925c26d424b6a03aed157e7dcc5c5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/lines-of-code/zipball/c1c2e997aa3146983ed888ad08b15470a2e22ecc", - "reference": "c1c2e997aa3146983ed888ad08b15470a2e22ecc", + "url": "https://api.github.com/repos/sebastianbergmann/lines-of-code/zipball/e1e4a170560925c26d424b6a03aed157e7dcc5c5", + "reference": "e1e4a170560925c26d424b6a03aed157e7dcc5c5", "shasum": "" }, "require": { - "nikic/php-parser": "^4.6", + "nikic/php-parser": "^4.18 || ^5.0", "php": ">=7.3" }, "require-dev": { @@ -1557,7 +1563,7 @@ "homepage": "https://github.com/sebastianbergmann/lines-of-code", "support": { "issues": "https://github.com/sebastianbergmann/lines-of-code/issues", - "source": "https://github.com/sebastianbergmann/lines-of-code/tree/1.0.3" + "source": "https://github.com/sebastianbergmann/lines-of-code/tree/1.0.4" }, "funding": [ { @@ -1565,7 +1571,7 @@ "type": "github" } ], - "time": "2020-11-28T06:42:11+00:00" + "time": "2023-12-22T06:20:34+00:00" }, { "name": "sebastian/object-enumerator", @@ -1950,16 +1956,16 @@ }, { "name": "theseer/tokenizer", - "version": "1.2.1", + "version": "1.2.2", "source": { "type": "git", "url": "https://github.com/theseer/tokenizer.git", - "reference": "34a41e998c2183e22995f158c581e7b5e755ab9e" + "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/theseer/tokenizer/zipball/34a41e998c2183e22995f158c581e7b5e755ab9e", - "reference": "34a41e998c2183e22995f158c581e7b5e755ab9e", + "url": "https://api.github.com/repos/theseer/tokenizer/zipball/b2ad5003ca10d4ee50a12da31de12a5774ba6b96", + "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96", "shasum": "" }, "require": { @@ -1988,7 +1994,7 @@ "description": "A small library for converting tokenized PHP source code into XML and potentially other formats", "support": { "issues": "https://github.com/theseer/tokenizer/issues", - "source": "https://github.com/theseer/tokenizer/tree/1.2.1" + "source": "https://github.com/theseer/tokenizer/tree/1.2.2" }, "funding": [ { @@ -1996,20 +2002,20 @@ "type": "github" } ], - "time": "2021-07-28T10:34:58+00:00" + "time": "2023-11-20T00:12:19+00:00" }, { "name": "workerman/workerman", - "version": "v4.1.8", + "version": "v4.1.14", "source": { "type": "git", "url": "https://github.com/walkor/workerman.git", - "reference": "0df2093b09296f07f81e4cdfbe0582f3dfa183f5" + "reference": "f7c9667c7b5387c01fa9e50ee79ed931e93ee76e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/walkor/workerman/zipball/0df2093b09296f07f81e4cdfbe0582f3dfa183f5", - "reference": "0df2093b09296f07f81e4cdfbe0582f3dfa183f5", + "url": "https://api.github.com/repos/walkor/workerman/zipball/f7c9667c7b5387c01fa9e50ee79ed931e93ee76e", + "reference": "f7c9667c7b5387c01fa9e50ee79ed931e93ee76e", "shasum": "" }, "require": { @@ -2059,7 +2065,7 @@ "type": "patreon" } ], - "time": "2023-02-19T03:27:11+00:00" + "time": "2023-08-09T03:37:45+00:00" } ], "aliases": [], @@ -2071,5 +2077,5 @@ "php": ">=8.0" }, "platform-dev": [], - "plugin-api-version": "2.2.0" + "plugin-api-version": "2.6.0" } diff --git a/src/Queue/Client.php b/src/Queue/Client.php index 967b6be..b64919c 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -26,14 +26,44 @@ public function enqueue(array $payload): bool return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload); } + public function retryFailedJobs(): void + { + $pids = []; + + while (true) { + $jobIds = $this->connection->listRange("{$this->namespace}.failed.{$this->queue}", 100, count($pids)); + + foreach ($jobIds as $jobId) { + $pids[] = $jobId; + } + + if (count($jobIds) < 100) { + break; + } + } + + foreach ($pids as $pid) { + $job = $this->getJob($pid); + + if ($job === false) { + continue; + } + + $this->connection->listRemove("{$this->namespace}.failed.{$this->queue}", $pid); + $this->enqueue($job->getPayload()); + } + } + public function getJob(string $pid): Message|false { - $job = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}"); + $value = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}"); - if ($job === false) { + if ($value === false) { return false; } + $job = json_decode($value, true); + return new Message($job); } diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 68bc0ae..2348870 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -69,7 +69,7 @@ public function rightPopArray(string $queue, int $timeout): array|false return false; } - return json_decode($response, true); + return json_decode($response, true) ?? false; } public function rightPop(string $queue, int $timeout): string|false @@ -91,7 +91,7 @@ public function leftPopArray(string $queue, int $timeout): array|false return false; } - return json_decode($response[1], true); + return json_decode($response[1], true) ?? false; } public function leftPop(string $queue, int $timeout): string|false @@ -152,11 +152,11 @@ public function decrement(string $key): int public function listRange(string $key, int $total, int $offset): array { - $start = $offset - 1; - $end = ($total + $offset) -1; - $results = $this->getRedis()->lrange($key, $start, $end); - - return array_map(fn (array $job) => new Message($job), $results); + $start = $offset; + $end = $start + $total - 1; + $results = $this->getRedis()->lRange($key, $start, $end); + + return $results; } public function ping(): bool diff --git a/tests/Queue/e2e/AdapterTest.php b/tests/Queue/e2e/AdapterTest.php index ee09894..dc7848a 100644 --- a/tests/Queue/e2e/AdapterTest.php +++ b/tests/Queue/e2e/AdapterTest.php @@ -103,4 +103,99 @@ public function testSwoole(): void }); }); } + + /** + * @depends testSwoole + */ + public function testRetrySwoole(): void + { + $connection = new Redis('redis', 6379); + $client = new Client('swoole', $connection); + $client->resetStats(); + + $client->enqueue([ + 'type' => 'test_exception' + ]); + $client->enqueue([ + 'type' => 'test_exception' + ]); + $client->enqueue([ + 'type' => 'test_exception' + ]); + $client->enqueue([ + 'type' => 'test_exception' + ]); + + sleep(1); + + $this->assertEquals(4, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(4, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + + $client->resetStats(); + + $this->assertEquals(0, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(0, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + + $client->retryFailedJobs(); + + sleep(1); + + // Retry will retry ALL failed jobs regardless of if they are still tracked in stats + // Meaning this test has 5 failed jobs due to the previous tests. + $this->assertEquals(5, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(5, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + } + + /** + * @depends testEvents + */ + public function testRetryEvents(): void + { + $connection = new Redis('redis', 6379); + + $client = new Client('workerman', $connection); + $client->resetStats(); + + $client->enqueue([ + 'type' => 'test_exception' + ]); + $client->enqueue([ + 'type' => 'test_exception' + ]); + $client->enqueue([ + 'type' => 'test_exception' + ]); + $client->enqueue([ + 'type' => 'test_exception' + ]); + + sleep(1); + + $this->assertEquals(4, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(4, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + + $client->resetStats(); + + $this->assertEquals(0, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(0, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + + $client->retryFailedJobs(); + + sleep(1); + + $this->assertEquals(5, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(5, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + } } From 99ea60d23791c5e02d4fa9d10452491f8557df3c Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 11:22:30 +0000 Subject: [PATCH 02/15] Run Linter --- src/Queue/Connection/Redis.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 2348870..4536bb3 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -3,7 +3,6 @@ namespace Utopia\Queue\Connection; use Utopia\Queue\Connection; -use Utopia\Queue\Message; class Redis implements Connection { @@ -155,7 +154,7 @@ public function listRange(string $key, int $total, int $offset): array $start = $offset; $end = $start + $total - 1; $results = $this->getRedis()->lRange($key, $start, $end); - + return $results; } From 19d58a7a1f84cdcc947b5dbe2176cb1ab2c0ab7f Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 12:58:32 +0000 Subject: [PATCH 03/15] Use rightPop instead of loading stuff into memory --- src/Queue/Client.php | 27 +++++++++++++++------------ tests/Queue/e2e/AdapterTest.php | 4 ++-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/Queue/Client.php b/src/Queue/Client.php index b64919c..2f237c9 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -26,31 +26,34 @@ public function enqueue(array $payload): bool return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload); } - public function retryFailedJobs(): void + public function retry(int $limit = null): void { - $pids = []; + $start = \time(); + $processed = 0; while (true) { - $jobIds = $this->connection->listRange("{$this->namespace}.failed.{$this->queue}", 100, count($pids)); + $pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5); - foreach ($jobIds as $jobId) { - $pids[] = $jobId; - } - - if (count($jobIds) < 100) { + if ($pid === false) { break; } - } - foreach ($pids as $pid) { $job = $this->getJob($pid); if ($job === false) { - continue; + break; + } + + if ($job->getTimestamp() >= $start) { + break; + } + + if ($limit !== null && $processed >= $limit) { + break; } - $this->connection->listRemove("{$this->namespace}.failed.{$this->queue}", $pid); $this->enqueue($job->getPayload()); + $processed++; } } diff --git a/tests/Queue/e2e/AdapterTest.php b/tests/Queue/e2e/AdapterTest.php index dc7848a..401d8b0 100644 --- a/tests/Queue/e2e/AdapterTest.php +++ b/tests/Queue/e2e/AdapterTest.php @@ -140,7 +140,7 @@ public function testRetrySwoole(): void $this->assertEquals(0, $client->sumFailedJobs()); $this->assertEquals(0, $client->sumSuccessfulJobs()); - $client->retryFailedJobs(); + $client->retry(); sleep(1); @@ -189,7 +189,7 @@ public function testRetryEvents(): void $this->assertEquals(0, $client->sumFailedJobs()); $this->assertEquals(0, $client->sumSuccessfulJobs()); - $client->retryFailedJobs(); + $client->retry(); sleep(1); From 406eee333c4333986a4c7914240ab8c07a37c5b8 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 14:18:23 +0000 Subject: [PATCH 04/15] Refactor Tests --- composer.json | 2 +- phpunit.xml | 2 +- tests/Queue/e2e/Adapter/Base.php | 125 ++++++++++++++ tests/Queue/e2e/Adapter/SwooleTest.php | 45 +++++ tests/Queue/e2e/Adapter/WorkermanTest.php | 17 ++ tests/Queue/e2e/AdapterTest.php | 201 ---------------------- 6 files changed, 189 insertions(+), 203 deletions(-) create mode 100644 tests/Queue/e2e/Adapter/Base.php create mode 100644 tests/Queue/e2e/Adapter/SwooleTest.php create mode 100644 tests/Queue/e2e/Adapter/WorkermanTest.php delete mode 100644 tests/Queue/e2e/AdapterTest.php diff --git a/composer.json b/composer.json index 43ca961..c7db275 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "psr-4": {"Utopia\\Queue\\": "src/Queue"} }, "autoload-dev": { - "psr-4": {"Utopia\\Tests\\": "tests/Database"} + "psr-4": {"Tests\\E2E\\": "tests/Queue/e2e"} }, "scripts":{ "test": "phpunit", diff --git a/phpunit.xml b/phpunit.xml index b0e7e34..4b64bc0 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -10,7 +10,7 @@ > - ./tests/Queue/e2e/ + ./tests/Queue/e2e/Adapter \ No newline at end of file diff --git a/tests/Queue/e2e/Adapter/Base.php b/tests/Queue/e2e/Adapter/Base.php new file mode 100644 index 0000000..9218d7a --- /dev/null +++ b/tests/Queue/e2e/Adapter/Base.php @@ -0,0 +1,125 @@ +payloads = []; + $this->payloads[] = [ + 'type' => 'test_string', + 'value' => 'lorem ipsum' + ]; + $this->payloads[] = [ + 'type' => 'test_number', + 'value' => 123 + ]; + $this->payloads[] = [ + 'type' => 'test_number', + 'value' => 123.456 + ]; + $this->payloads[] = [ + 'type' => 'test_bool', + 'value' => true + ]; + $this->payloads[] = [ + 'type' => 'test_null', + 'value' => null + ]; + $this->payloads[] = [ + 'type' => 'test_array', + 'value' => [ + 1, + 2, + 3 + ] + ]; + $this->payloads[] = [ + 'type' => 'test_assoc', + 'value' => [ + 'string' => 'ipsum', + 'number' => 123, + 'bool' => true, + 'null' => null + ] + ]; + $this->payloads[] = [ + 'type' => 'test_exception' + ]; + } + + /** + * @return Client + */ + abstract protected function getClient(): Client; + + public function testEvents(): void + { + $client = $this->getClient(); + $client->resetStats(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($client->enqueue($payload)); + } + + sleep(1); + + $this->assertEquals(8, $client->sumTotalJobs()); + $this->assertEquals(0, $client->getQueueSize()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(1, $client->sumFailedJobs()); + $this->assertEquals(7, $client->sumSuccessfulJobs()); + } + + /** + * @depends testEvents + */ + public function testRetry(): void + { + $client = $this->getClient(); + $client->resetStats(); + + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 1 + ]); + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 2 + ]); + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 3 + ]); + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 4 + ]); + + sleep(1); + + $this->assertEquals(4, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(4, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + + $client->resetStats(); + + $client->retry(); + + sleep(1); + + // Retry will retry ALL failed jobs regardless of if they are still tracked in stats + // Meaning this test has 5 failed jobs due to the previous tests. + $this->assertEquals(5, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(5, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); + } +} diff --git a/tests/Queue/e2e/Adapter/SwooleTest.php b/tests/Queue/e2e/Adapter/SwooleTest.php new file mode 100644 index 0000000..289f1c0 --- /dev/null +++ b/tests/Queue/e2e/Adapter/SwooleTest.php @@ -0,0 +1,45 @@ +resetStats(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($client->enqueue($payload)); + } + + sleep(1); + + $this->assertEquals(8, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(1, $client->sumFailedJobs()); + $this->assertEquals(7, $client->sumSuccessfulJobs()); + }); + }); + } +} \ No newline at end of file diff --git a/tests/Queue/e2e/Adapter/WorkermanTest.php b/tests/Queue/e2e/Adapter/WorkermanTest.php new file mode 100644 index 0000000..6764faa --- /dev/null +++ b/tests/Queue/e2e/Adapter/WorkermanTest.php @@ -0,0 +1,17 @@ +payloads = []; - $this->payloads[] = [ - 'type' => 'test_string', - 'value' => 'lorem ipsum' - ]; - $this->payloads[] = [ - 'type' => 'test_number', - 'value' => 123 - ]; - $this->payloads[] = [ - 'type' => 'test_number', - 'value' => 123.456 - ]; - $this->payloads[] = [ - 'type' => 'test_bool', - 'value' => true - ]; - $this->payloads[] = [ - 'type' => 'test_null', - 'value' => null - ]; - $this->payloads[] = [ - 'type' => 'test_array', - 'value' => [ - 1, - 2, - 3 - ] - ]; - $this->payloads[] = [ - 'type' => 'test_assoc', - 'value' => [ - 'string' => 'ipsum', - 'number' => 123, - 'bool' => true, - 'null' => null - ] - ]; - $this->payloads[] = [ - 'type' => 'test_exception' - ]; - } - - public function testEvents(): void - { - $connection = new Redis('redis', 6379); - - $this->assertTrue($connection->ping()); - - $client = new Client('workerman', $connection); - $client->resetStats(); - - - foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); - } - - sleep(1); - - $this->assertEquals(8, $client->sumTotalJobs()); - $this->assertEquals(0, $client->getQueueSize()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); - } - - public function testSwoole(): void - { - $connection = new Redis('redis', 6379); - - run(function () use ($connection) { - $client = new Client('swoole', $connection); - go(function () use ($client) { - $client->resetStats(); - - foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); - } - - sleep(1); - - $this->assertEquals(8, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); - }); - }); - } - - /** - * @depends testSwoole - */ - public function testRetrySwoole(): void - { - $connection = new Redis('redis', 6379); - $client = new Client('swoole', $connection); - $client->resetStats(); - - $client->enqueue([ - 'type' => 'test_exception' - ]); - $client->enqueue([ - 'type' => 'test_exception' - ]); - $client->enqueue([ - 'type' => 'test_exception' - ]); - $client->enqueue([ - 'type' => 'test_exception' - ]); - - sleep(1); - - $this->assertEquals(4, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(4, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); - - $client->resetStats(); - - $this->assertEquals(0, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(0, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); - - $client->retry(); - - sleep(1); - - // Retry will retry ALL failed jobs regardless of if they are still tracked in stats - // Meaning this test has 5 failed jobs due to the previous tests. - $this->assertEquals(5, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(5, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); - } - - /** - * @depends testEvents - */ - public function testRetryEvents(): void - { - $connection = new Redis('redis', 6379); - - $client = new Client('workerman', $connection); - $client->resetStats(); - - $client->enqueue([ - 'type' => 'test_exception' - ]); - $client->enqueue([ - 'type' => 'test_exception' - ]); - $client->enqueue([ - 'type' => 'test_exception' - ]); - $client->enqueue([ - 'type' => 'test_exception' - ]); - - sleep(1); - - $this->assertEquals(4, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(4, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); - - $client->resetStats(); - - $this->assertEquals(0, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(0, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); - - $client->retry(); - - sleep(1); - - $this->assertEquals(5, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(5, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); - } -} From 7d5d9ff9d6acf68168ec4e82f3b44d2c463bc9d8 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 14:20:04 +0000 Subject: [PATCH 05/15] Run Linter --- tests/Queue/e2e/Adapter/SwooleTest.php | 2 +- tests/Queue/e2e/Adapter/WorkermanTest.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Queue/e2e/Adapter/SwooleTest.php b/tests/Queue/e2e/Adapter/SwooleTest.php index 289f1c0..2a70e44 100644 --- a/tests/Queue/e2e/Adapter/SwooleTest.php +++ b/tests/Queue/e2e/Adapter/SwooleTest.php @@ -42,4 +42,4 @@ protected function testSwooleConcurrency(): void }); }); } -} \ No newline at end of file +} diff --git a/tests/Queue/e2e/Adapter/WorkermanTest.php b/tests/Queue/e2e/Adapter/WorkermanTest.php index 6764faa..e0cd1ed 100644 --- a/tests/Queue/e2e/Adapter/WorkermanTest.php +++ b/tests/Queue/e2e/Adapter/WorkermanTest.php @@ -14,4 +14,4 @@ protected function getClient(): Client return $client; } -} \ No newline at end of file +} From 97c77dae7c53184c3b820e308ec84a97e24d7a80 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 14:30:44 +0000 Subject: [PATCH 06/15] Update Base.php --- tests/Queue/e2e/Adapter/Base.php | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/Queue/e2e/Adapter/Base.php b/tests/Queue/e2e/Adapter/Base.php index 9218d7a..ccc73d6 100644 --- a/tests/Queue/e2e/Adapter/Base.php +++ b/tests/Queue/e2e/Adapter/Base.php @@ -121,5 +121,16 @@ public function testRetry(): void $this->assertEquals(0, $client->sumProcessingJobs()); $this->assertEquals(5, $client->sumFailedJobs()); $this->assertEquals(0, $client->sumSuccessfulJobs()); + + $client->resetStats(); + + $client->retry(2); + + sleep(1); + + $this->assertEquals(2, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(2, $client->sumFailedJobs()); + $this->assertEquals(0, $client->sumSuccessfulJobs()); } } From 7dba0c8dba3a5c88397e0e740d6d8e6ebf8808e4 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 14:37:12 +0000 Subject: [PATCH 07/15] Address Eldad's Comments --- composer.json | 2 +- tests/Queue/e2e/Adapter/Base.php | 23 +++++++++++++++++++++++ tests/Queue/e2e/Adapter/SwooleTest.php | 26 -------------------------- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/composer.json b/composer.json index c7db275..3d2dcb3 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "psr-4": {"Utopia\\Queue\\": "src/Queue"} }, "autoload-dev": { - "psr-4": {"Tests\\E2E\\": "tests/Queue/e2e"} + "psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"} }, "scripts":{ "test": "phpunit", diff --git a/tests/Queue/e2e/Adapter/Base.php b/tests/Queue/e2e/Adapter/Base.php index ccc73d6..b9718f5 100644 --- a/tests/Queue/e2e/Adapter/Base.php +++ b/tests/Queue/e2e/Adapter/Base.php @@ -5,6 +5,8 @@ use PHPUnit\Framework\TestCase; use Utopia\Queue\Client; +use function Co\run; + abstract class Base extends TestCase { protected array $payloads; @@ -77,6 +79,27 @@ public function testEvents(): void $this->assertEquals(7, $client->sumSuccessfulJobs()); } + protected function testConcurrency(): void + { + run(function () { + $client = $this->getClient(); + go(function () use ($client) { + $client->resetStats(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($client->enqueue($payload)); + } + + sleep(1); + + $this->assertEquals(8, $client->sumTotalJobs()); + $this->assertEquals(0, $client->sumProcessingJobs()); + $this->assertEquals(1, $client->sumFailedJobs()); + $this->assertEquals(7, $client->sumSuccessfulJobs()); + }); + }); + } + /** * @depends testEvents */ diff --git a/tests/Queue/e2e/Adapter/SwooleTest.php b/tests/Queue/e2e/Adapter/SwooleTest.php index 2a70e44..a265381 100644 --- a/tests/Queue/e2e/Adapter/SwooleTest.php +++ b/tests/Queue/e2e/Adapter/SwooleTest.php @@ -16,30 +16,4 @@ protected function getClient(): Client return $client; } - - /** - * @depends testRetry - */ - protected function testSwooleConcurrency(): void - { - $connection = new Redis('redis', 6379); - - run(function () use ($connection) { - $client = new Client('swoole', $connection); - go(function () use ($client) { - $client->resetStats(); - - foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); - } - - sleep(1); - - $this->assertEquals(8, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); - }); - }); - } } From af0b8c25158100e5940a9ec6cd4aae800ce921e1 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 14:45:04 +0000 Subject: [PATCH 08/15] Run Linter --- tests/Queue/e2e/Adapter/SwooleTest.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/Queue/e2e/Adapter/SwooleTest.php b/tests/Queue/e2e/Adapter/SwooleTest.php index a265381..e940c98 100644 --- a/tests/Queue/e2e/Adapter/SwooleTest.php +++ b/tests/Queue/e2e/Adapter/SwooleTest.php @@ -5,8 +5,6 @@ use Utopia\Queue\Client; use Utopia\Queue\Connection\Redis; -use function Co\run; - class SwooleTest extends Base { protected function getClient(): Client From 4c0397b06c25997fcbef046e64b0af90bdddbd15 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 15:03:07 +0000 Subject: [PATCH 09/15] Git Gymnastics Git ignored when I renamed it from e2e to E2E so I had to manually mv it using the cli --- tests/Queue/{e2e => E2E}/Adapter/Base.php | 0 tests/Queue/{e2e => E2E}/Adapter/SwooleTest.php | 0 tests/Queue/{e2e => E2E}/Adapter/WorkermanTest.php | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename tests/Queue/{e2e => E2E}/Adapter/Base.php (100%) rename tests/Queue/{e2e => E2E}/Adapter/SwooleTest.php (100%) rename tests/Queue/{e2e => E2E}/Adapter/WorkermanTest.php (100%) diff --git a/tests/Queue/e2e/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php similarity index 100% rename from tests/Queue/e2e/Adapter/Base.php rename to tests/Queue/E2E/Adapter/Base.php diff --git a/tests/Queue/e2e/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php similarity index 100% rename from tests/Queue/e2e/Adapter/SwooleTest.php rename to tests/Queue/E2E/Adapter/SwooleTest.php diff --git a/tests/Queue/e2e/Adapter/WorkermanTest.php b/tests/Queue/E2E/Adapter/WorkermanTest.php similarity index 100% rename from tests/Queue/e2e/Adapter/WorkermanTest.php rename to tests/Queue/E2E/Adapter/WorkermanTest.php From 9930184b3e9f5c92b5298763968f4b1d92fbe6e2 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Fri, 29 Dec 2023 15:06:54 +0000 Subject: [PATCH 10/15] Update phpunit.xml --- phpunit.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phpunit.xml b/phpunit.xml index 4b64bc0..1b8f40d 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -10,7 +10,7 @@ > - ./tests/Queue/e2e/Adapter + ./tests/Queue/E2E/Adapter \ No newline at end of file From cd6d836800dda8827fa069bc06c37bd8c2ef38b2 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Tue, 9 Jan 2024 15:30:01 +0000 Subject: [PATCH 11/15] Update Client.php --- src/Queue/Client.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Queue/Client.php b/src/Queue/Client.php index 2f237c9..273e209 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -34,20 +34,24 @@ public function retry(int $limit = null): void while (true) { $pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5); + // No more jobs to retry if ($pid === false) { break; } $job = $this->getJob($pid); + // Job doesn't exist if ($job === false) { break; } + // Job was already retried if ($job->getTimestamp() >= $start) { break; } + // We're reached the max amount of jobs to retry if ($limit !== null && $processed >= $limit) { break; } From 49194d02453dd5302f2bea791f4f793890763859 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Tue, 16 Jan 2024 11:01:21 +0000 Subject: [PATCH 12/15] Update Client.php --- src/Queue/Client.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Queue/Client.php b/src/Queue/Client.php index 273e209..f2872ed 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -26,6 +26,10 @@ public function enqueue(array $payload): bool return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload); } + /** + * This function will take jobs from the failed queue and re-enqueue them. + * the limit param will limit the amount of jobs to retry, if not set it will retry all jobs. + */ public function retry(int $limit = null): void { $start = \time(); From 07f9ce945cd459a306bc367abe72ac14f3cdcf91 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Wed, 17 Jan 2024 11:06:19 +0000 Subject: [PATCH 13/15] Update sum methods to count methods --- src/Queue/Client.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Queue/Client.php b/src/Queue/Client.php index f2872ed..1ff9ab7 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -88,22 +88,22 @@ public function getQueueSize(): int return $this->connection->listSize("{$this->namespace}.queue.{$this->queue}"); } - public function sumTotalJobs(): int + public function countTotalJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.total") ?? 0); } - public function sumSuccessfulJobs(): int + public function countSuccessfulJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.success") ?? 0); } - public function sumFailedJobs(): int + public function countFailedJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.failed") ?? 0); } - public function sumProcessingJobs(): int + public function countProcessingJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.processing") ?? 0); } From 88f2b402cd421e771ec395a8c17e1306f9658d56 Mon Sep 17 00:00:00 2001 From: Bradley Schofield Date: Wed, 17 Jan 2024 12:33:06 +0000 Subject: [PATCH 14/15] Update tests and docblock --- src/Queue/Client.php | 4 +-- tests/Queue/E2E/Adapter/Base.php | 43 +++++++++++++++----------------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/src/Queue/Client.php b/src/Queue/Client.php index 1ff9ab7..ac55bf8 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -27,8 +27,8 @@ public function enqueue(array $payload): bool } /** - * This function will take jobs from the failed queue and re-enqueue them. - * the limit param will limit the amount of jobs to retry, if not set it will retry all jobs. + * Take all jobs from the failed queue and re-enqueue them. + * @param int|null $limit The amount of jobs to retry */ public function retry(int $limit = null): void { diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index b9718f5..8b6f49e 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -51,9 +51,6 @@ public function setUp(): void 'null' => null ] ]; - $this->payloads[] = [ - 'type' => 'test_exception' - ]; } /** @@ -72,11 +69,11 @@ public function testEvents(): void sleep(1); - $this->assertEquals(8, $client->sumTotalJobs()); + $this->assertEquals(7, $client->countTotalJobs()); $this->assertEquals(0, $client->getQueueSize()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(0, $client->countFailedJobs()); + $this->assertEquals(7, $client->countSuccessfulJobs()); } protected function testConcurrency(): void @@ -92,10 +89,10 @@ protected function testConcurrency(): void sleep(1); - $this->assertEquals(8, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); + $this->assertEquals(7, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(0, $client->countFailedJobs()); + $this->assertEquals(7, $client->countSuccessfulJobs()); }); }); } @@ -127,10 +124,10 @@ public function testRetry(): void sleep(1); - $this->assertEquals(4, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(4, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); + $this->assertEquals(4, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(4, $client->countFailedJobs()); + $this->assertEquals(0, $client->countSuccessfulJobs()); $client->resetStats(); @@ -140,10 +137,10 @@ public function testRetry(): void // Retry will retry ALL failed jobs regardless of if they are still tracked in stats // Meaning this test has 5 failed jobs due to the previous tests. - $this->assertEquals(5, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(5, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); + $this->assertEquals(4, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(4, $client->countFailedJobs()); + $this->assertEquals(0, $client->countSuccessfulJobs()); $client->resetStats(); @@ -151,9 +148,9 @@ public function testRetry(): void sleep(1); - $this->assertEquals(2, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(2, $client->sumFailedJobs()); - $this->assertEquals(0, $client->sumSuccessfulJobs()); + $this->assertEquals(2, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(2, $client->countFailedJobs()); + $this->assertEquals(0, $client->countSuccessfulJobs()); } } From f42207a971fc5146d1de890bf09da5ae3686a7cb Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Wed, 17 Jan 2024 16:34:28 +0400 Subject: [PATCH 15/15] Update tests/Queue/E2E/Adapter/Base.php --- tests/Queue/E2E/Adapter/Base.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index 8b6f49e..ab5b6f1 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -136,7 +136,6 @@ public function testRetry(): void sleep(1); // Retry will retry ALL failed jobs regardless of if they are still tracked in stats - // Meaning this test has 5 failed jobs due to the previous tests. $this->assertEquals(4, $client->countTotalJobs()); $this->assertEquals(0, $client->countProcessingJobs()); $this->assertEquals(4, $client->countFailedJobs());