diff --git a/src/Console/stubs/model/queue.stub b/src/Console/stubs/model/queue.stub index 845302cd..066d2c40 100644 --- a/src/Console/stubs/model/queue.stub +++ b/src/Console/stubs/model/queue.stub @@ -11,10 +11,17 @@ class {className} extends Migration public function up(): void { $this->create("queues", function (SQLGenerator $table) { - $table->addColumn('id', 'string', ['primary' => true]); - $table->addColumn('time', 'timestamp'); - $table->addColumn('data', 'text'); - $table->addColumn('ip', 'string'); + $table->addString('id', ["primary" => true]); + $table->addString('queue'); + $table->addText('payload'); + $table->addInteger('attempts', ["default" => 3]); + $table->addEnum('status', [ + "size" => ["waiting", "processing", "reserved", "failed", "done"], + "default" => "waiting", + ]); + $table->addDatetime('avalaibled_at'); + $table->addDatetime('reserved_at', ["nullable" => true, "default" => null]); + $table->addDatetime('created_at'); }); } @@ -23,6 +30,6 @@ class {className} extends Migration */ public function rollback(): void { - $this->dropIfExists("sessions"); + $this->dropIfExists("queues"); } } diff --git a/src/Queue/Adapters/BeanstalkdAdapter.php b/src/Queue/Adapters/BeanstalkdAdapter.php index fc95f7d7..d0bb2c10 100644 --- a/src/Queue/Adapters/BeanstalkdAdapter.php +++ b/src/Queue/Adapters/BeanstalkdAdapter.php @@ -19,27 +19,6 @@ class BeanstalkdAdapter extends QueueAdapter */ private Pheanstalk $pheanstalk; - /** - * Determine the default watch name - * - * @var string - */ - private string $queue = "default"; - - /** - * The number of working attempts - * - * @var int - */ - private int $tries; - - /** - * Define the sleep time - * - * @var int - */ - private int $sleep = 5; - /** * Configure Beanstalkd driver * diff --git a/src/Queue/Adapters/DatabaseAdapter.php b/src/Queue/Adapters/DatabaseAdapter.php index 15dbee0f..f4264b4b 100644 --- a/src/Queue/Adapters/DatabaseAdapter.php +++ b/src/Queue/Adapters/DatabaseAdapter.php @@ -5,7 +5,6 @@ use Bow\Database\Database; use Bow\Database\QueryBuilder; use Bow\Queue\ProducerService; -use RuntimeException; class DatabaseAdapter extends QueueAdapter { @@ -16,27 +15,6 @@ class DatabaseAdapter extends QueueAdapter */ private QueryBuilder $table; - /** - * Determine the default watch name - * - * @var string - */ - private string $queue = "default"; - - /** - * The number of working attempts - * - * @var int - */ - private int $tries; - - /** - * Define the sleep time - * - * @var int - */ - private int $sleep = 5; - /** * Configure Beanstalkd driver * @@ -50,50 +28,6 @@ public function configure(array $queue): DatabaseAdapter return $this; } - /** - * Get connexion - * - * @param string $name - * @return void - */ - public function setWatch(string $name): void - { - $this->queue = $name; - } - - /** - * Set job tries - * - * @param int $tries - * @return void - */ - public function setTries(int $tries): void - { - $this->tries = $tries; - } - - /** - * Set sleep time - * - * @param int $sleep - * @return void - */ - public function setSleep(int $sleep): void - { - $this->sleep = $sleep; - } - - /** - * Get the queue or return the default. - * - * @param ?string $queue - * @return string - */ - public function getQueue(?string $queue = null): string - { - return $queue ?: $this->queue; - } - /** * Get the size of the queue. * @@ -116,12 +50,12 @@ public function size(?string $queue = null): int public function push(ProducerService $producer): void { $this->table->insert([ - "id" => $producer->getId(), + "id" => $this->generateId(), "queue" => $this->getQueue(), - "payload" => $this->serializeProducer($producer), - "attempts" => $producer->getRetry(), - "status" => "pending", - "available_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()), + "payload" => base64_encode($this->serializeProducer($producer)), + "attempts" => $this->tries, + "status" => "waiting", + "avalaibled_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()), "reserved_at" => null, "created_at" => date("Y-m-d H:i:s"), ]); @@ -139,7 +73,7 @@ public function run(string $queue = null): void $queue = $this->getQueue($queue); $queues = $this->table ->where("queue", $queue) - ->whereIn("status", ["pending", "reserved"]) + ->whereIn("status", ["waiting", "reserved"]) ->get(); if (count($queues) == 0) { @@ -149,29 +83,35 @@ public function run(string $queue = null): void foreach ($queues as $job) { try { - $producer = $this->unserializeProducer($job->payload); - $delay = $producer->getDelay(); - if ($job->delay == 0) { + $producer = $this->unserializeProducer(base64_decode($job->payload)); + if (strtotime($job->avalaibled_at) >= time()) { + if (!is_null($job->reserved_at) && strtotime($job->reserved_at) < time()) { + continue; + } + $this->table->where("id", $job->id)->update([ + "status" => "processing", + ]); $this->execute($producer, $job); continue; } - $execute_time = time() + $job->delay; - if ($execute_time >= time()) { - $this->execute($producer, $job); - } } catch (\Exception $e) { error_log($e->getMessage()); app('logger')->error($e->getMessage(), $e->getTrace()); cache("failed:job:" . $job->id, $job->payload); - if ($producer->jobShouldBeDelete() || $job->retry <= 0) { + if (!isset($producer)) { + $this->sleep(1); + continue; + } + if ($producer->jobShouldBeDelete() || $job->attempts <= 0) { $this->table->where("id", $job->id)->delete(); $this->sleep(1); continue; } $this->table->where("id", $job->id)->update([ "status" => "reserved", - "retry" => $job->tries - 1, - 'delay' => $delay + "attempts" => $job->attempts - 1, + "avalaibled_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()), + "reserved_at" => date("Y-m-d H:i:s", time() + $producer->getRetry()) ]); $this->sleep(1); } @@ -188,8 +128,23 @@ private function execute(ProducerService $producer, mixed $job) { call_user_func([$producer, "process"]); $this->table->where("id", $job->id)->update([ - "status" => "processed" + "status" => "done" ]); sleep($this->sleep ?? 5); } + + /** + * Flush the queue table + * + * @param ?string $queue + * @return void + */ + public function flush(?string $queue = null): void + { + if (is_null($queue)) { + $this->table->truncate(); + } else { + $this->table->where("queue", $queue)->delete(); + } + } } diff --git a/src/Queue/Adapters/QueueAdapter.php b/src/Queue/Adapters/QueueAdapter.php index 5e203c78..0a1aa8c4 100644 --- a/src/Queue/Adapters/QueueAdapter.php +++ b/src/Queue/Adapters/QueueAdapter.php @@ -17,21 +17,28 @@ abstract class QueueAdapter * * @var int */ - private int $start_time; + protected int $start_time; /** - * Determine if the worker should quit + * Determine the default watch name * - * @var bool + * @var string */ - protected bool $should_quit = false; + protected string $queue = "default"; /** - * Determine if the worker is paused + * The number of working attempts * - * @var bool + * @var int + */ + protected int $tries = 3; + + /** + * Define the sleep time + * + * @var int */ - protected bool $paused = false; + protected int $sleep = 5; /** * Create producer serialization @@ -160,34 +167,57 @@ protected function supportsAsyncSignals() return extension_loaded('pcntl'); } + /** - * Make adapter configuration + * Set job tries * - * @param array $config - * @return QueueAdapter + * @param int $tries + * @return void */ - abstract public function configure(array $config): QueueAdapter; + public function setTries(int $tries): void + { + $this->tries = $tries; + } /** - * Watch the the queue name + * Set sleep time * - * @param string $queue + * @param int $sleep + * @return void */ - abstract public function setWatch(string $queue): void; + public function setSleep(int $sleep): void + { + $this->sleep = $sleep; + } /** - * Set the tries value + * Get the queue or return the default. * - * @param int $tries + * @param ?string $queue + * @return string */ - abstract public function setTries(int $tries): void; + public function getQueue(?string $queue = null): string + { + return $queue ?: $this->queue; + } /** - * Set the sleep value + * Generate the job id * - * @param int $sleep + * @return string + */ + public function generateId(): string + { + return sha1(uniqid(str_shuffle("abcdefghijklmnopqrstuvwxyz0123456789"), true)); + } + + /** + * Make adapter configuration + * + * @param array $config + * @return QueueAdapter */ - abstract public function setSleep(int $sleep): void; + abstract public function configure(array $config): QueueAdapter; /** * Push new producer @@ -204,14 +234,6 @@ abstract public function push(ProducerService $producer): void; */ abstract public function size(string $queue): int; - /** - * Get the queue or return the default. - * - * @param ?string $queue - * @return string - */ - abstract public function getQueue(?string $queue = null): string; - /** * Start the worker server * diff --git a/src/Queue/Adapters/SQSAdapter.php b/src/Queue/Adapters/SQSAdapter.php index c0e00e92..cbb4ef22 100644 --- a/src/Queue/Adapters/SQSAdapter.php +++ b/src/Queue/Adapters/SQSAdapter.php @@ -24,27 +24,6 @@ class SQSAdapter extends QueueAdapter */ private array $config = []; - /** - * Determine the default watch name - * - * @var string - */ - private string $queue = "default"; - - /** - * The number of working attempts - * - * @var int - */ - private int $tries; - - /** - * Define the sleep time - * - * @var int - */ - private int $sleep = 5; - /** * Configure the queue. * @@ -64,61 +43,6 @@ public function configure(array $config): QueueAdapter return $this; } - /** - * Set the watch queue. - * - * @param string $queue - * @return void - */ - public function setWatch(string $queue): void - { - $this->queue = $queue; - } - - /** - * Set the number of times to attempt a job. - * - * @param int $tries - * @return void - */ - public function setTries(int $tries): void - { - $this->tries = $tries; - } - - /** - * Set the number of seconds to sleep between jobs. - * - * @param int $sleep - * @return void - */ - public function setSleep(int $sleep): void - { - $this->sleep = $sleep; - } - - /** - * Get the queue or return the default. - * - * @param ?string $queue - * @return string - */ - public function getQueue(?string $queue = null): string - { - return $queue ?: $this->queue; - } - - /** - * Set the number of seconds to wait before retrying a job. - * - * @param int $retry - * @return void - */ - public function setRetries(int $tries) - { - $this->tries = $tries; - } - /** * Push a job onto the queue. * @@ -134,6 +58,10 @@ public function push(ProducerService $producer): void 'DataType' => "String", 'StringValue' => get_class($producer) ], + "Id" => [ + "DataType" => "String", + "StringValue" => $this->generateId(), + ] ], 'MessageBody' => base64_encode($this->serializeProducer($producer)), 'QueueUrl' => $this->config["url"] @@ -229,6 +157,5 @@ public function run(?string $queue = null): void */ public function flush(?string $queue = null): void { - } } diff --git a/src/Queue/Connection.php b/src/Queue/Connection.php index 979d4410..65ceff42 100644 --- a/src/Queue/Connection.php +++ b/src/Queue/Connection.php @@ -6,6 +6,7 @@ use Bow\Queue\Adapters\QueueAdapter; use Bow\Queue\Adapters\BeanstalkdAdapter; +use Bow\Queue\Adapters\DatabaseAdapter; use Bow\Queue\Adapters\SQSAdapter; use ErrorException; @@ -33,6 +34,7 @@ class Connection private static array $connections = [ "beanstalkd" => BeanstalkdAdapter::class, "sqs" => SQSAdapter::class, + "database" => DatabaseAdapter::class, ]; /** diff --git a/src/Queue/ProducerService.php b/src/Queue/ProducerService.php index 18ff747d..31941a97 100644 --- a/src/Queue/ProducerService.php +++ b/src/Queue/ProducerService.php @@ -50,7 +50,14 @@ abstract class ProducerService * * @return integer */ - protected string $id; + protected ?string $id = null; + + /** + * Define the job attempts + * + * @return integer + */ + protected int $attemps = 2; /** * ProducerService constructor @@ -82,6 +89,16 @@ public function getId(): string return $this->id; } + /** + * Get the producer attemps + * + * @return int + */ + public function getAttemps(): int + { + return $this->attemps; + } + /** * Get the producer retry * diff --git a/tests/Config/stubs/queue.php b/tests/Config/stubs/queue.php index b65f6cf2..36fcc733 100644 --- a/tests/Config/stubs/queue.php +++ b/tests/Config/stubs/queue.php @@ -39,7 +39,7 @@ 'secret' => getenv('AWS_SECRET'), ], ], - + /** * The sqs connexion */ diff --git a/tests/Queue/QueueTest.php b/tests/Queue/QueueTest.php index 7e4261dd..47a29f0e 100644 --- a/tests/Queue/QueueTest.php +++ b/tests/Queue/QueueTest.php @@ -3,13 +3,13 @@ namespace Bow\Tests\Queue; use Bow\Cache\Adapter\RedisAdapter; -use Bow\Cache\Cache; use Bow\Cache\CacheConfiguration; use Bow\Configuration\EnvConfiguration; use Bow\Configuration\LoggerConfiguration; use Bow\Database\Database; use Bow\Database\DatabaseConfiguration; use Bow\Queue\Adapters\BeanstalkdAdapter; +use Bow\Queue\Adapters\DatabaseAdapter; use Bow\Queue\Adapters\SQSAdapter; use Bow\Tests\Config\TestingConfiguration; use Bow\Tests\Queue\Stubs\PetModelStub; @@ -39,6 +39,15 @@ public static function setUpBeforeClass(): void Database::connection('mysql'); Database::statement('drop table if exists pets'); Database::statement('create table pets (id int primary key auto_increment, name varchar(255))'); + Database::statement('create table if not exists queues ( + id int primary key auto_increment, + queue varchar(255), + payload text, + status varchar(100), + attempts int, + reserved_at datetime null default null, + created_at datetime + )'); } /** @@ -117,7 +126,7 @@ public function getConnection(): array return [ ["beanstalkd"], ["sqs"], - // ["database"], + ["database"], // ["redis"], // ["rabbitmq"], ];