Skip to content

Commit

Permalink
Integration of database adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
papac committed Sep 22, 2023
1 parent 8bc13ff commit b06c964
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 218 deletions.
17 changes: 12 additions & 5 deletions src/Console/stubs/model/queue.stub
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@ class {className} extends Migration
public function up(): void
{
$this->create("queues", function (SQLGenerator $table) {
$table->addColumn('id', 'string', ['primary' => true]);
$table->addColumn('time', 'timestamp');
$table->addColumn('data', 'text');
$table->addColumn('ip', 'string');
$table->addString('id', ["primary" => true]);
$table->addString('queue');
$table->addText('payload');
$table->addInteger('attempts', ["default" => 3]);
$table->addEnum('status', [
"size" => ["waiting", "processing", "reserved", "failed", "done"],
"default" => "waiting",
]);
$table->addDatetime('avalaibled_at');
$table->addDatetime('reserved_at', ["nullable" => true, "default" => null]);
$table->addDatetime('created_at');
});
}

Expand All @@ -23,6 +30,6 @@ class {className} extends Migration
*/
public function rollback(): void
{
$this->dropIfExists("sessions");
$this->dropIfExists("queues");
}
}
21 changes: 0 additions & 21 deletions src/Queue/Adapters/BeanstalkdAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@ class BeanstalkdAdapter extends QueueAdapter
*/
private Pheanstalk $pheanstalk;

/**
* Determine the default watch name
*
* @var string
*/
private string $queue = "default";

/**
* The number of working attempts
*
* @var int
*/
private int $tries;

/**
* Define the sleep time
*
* @var int
*/
private int $sleep = 5;

/**
* Configure Beanstalkd driver
*
Expand Down
121 changes: 38 additions & 83 deletions src/Queue/Adapters/DatabaseAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use Bow\Database\Database;
use Bow\Database\QueryBuilder;
use Bow\Queue\ProducerService;
use RuntimeException;

class DatabaseAdapter extends QueueAdapter
{
Expand All @@ -16,27 +15,6 @@ class DatabaseAdapter extends QueueAdapter
*/
private QueryBuilder $table;

/**
* Determine the default watch name
*
* @var string
*/
private string $queue = "default";

/**
* The number of working attempts
*
* @var int
*/
private int $tries;

/**
* Define the sleep time
*
* @var int
*/
private int $sleep = 5;

/**
* Configure Beanstalkd driver
*
Expand All @@ -50,50 +28,6 @@ public function configure(array $queue): DatabaseAdapter
return $this;
}

/**
* Get connexion
*
* @param string $name
* @return void
*/
public function setWatch(string $name): void
{
$this->queue = $name;
}

/**
* Set job tries
*
* @param int $tries
* @return void
*/
public function setTries(int $tries): void
{
$this->tries = $tries;
}

/**
* Set sleep time
*
* @param int $sleep
* @return void
*/
public function setSleep(int $sleep): void
{
$this->sleep = $sleep;
}

/**
* Get the queue or return the default.
*
* @param ?string $queue
* @return string
*/
public function getQueue(?string $queue = null): string
{
return $queue ?: $this->queue;
}

/**
* Get the size of the queue.
*
Expand All @@ -116,12 +50,12 @@ public function size(?string $queue = null): int
public function push(ProducerService $producer): void
{
$this->table->insert([
"id" => $producer->getId(),
"id" => $this->generateId(),
"queue" => $this->getQueue(),
"payload" => $this->serializeProducer($producer),
"attempts" => $producer->getRetry(),
"status" => "pending",
"available_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()),
"payload" => base64_encode($this->serializeProducer($producer)),
"attempts" => $this->tries,
"status" => "waiting",
"avalaibled_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()),
"reserved_at" => null,
"created_at" => date("Y-m-d H:i:s"),
]);
Expand All @@ -139,7 +73,7 @@ public function run(string $queue = null): void
$queue = $this->getQueue($queue);
$queues = $this->table
->where("queue", $queue)
->whereIn("status", ["pending", "reserved"])
->whereIn("status", ["waiting", "reserved"])
->get();

if (count($queues) == 0) {
Expand All @@ -149,29 +83,35 @@ public function run(string $queue = null): void

foreach ($queues as $job) {
try {
$producer = $this->unserializeProducer($job->payload);
$delay = $producer->getDelay();
if ($job->delay == 0) {
$producer = $this->unserializeProducer(base64_decode($job->payload));
if (strtotime($job->avalaibled_at) >= time()) {
if (!is_null($job->reserved_at) && strtotime($job->reserved_at) < time()) {
continue;
}
$this->table->where("id", $job->id)->update([
"status" => "processing",
]);
$this->execute($producer, $job);
continue;
}
$execute_time = time() + $job->delay;
if ($execute_time >= time()) {
$this->execute($producer, $job);
}
} catch (\Exception $e) {
error_log($e->getMessage());
app('logger')->error($e->getMessage(), $e->getTrace());
cache("failed:job:" . $job->id, $job->payload);
if ($producer->jobShouldBeDelete() || $job->retry <= 0) {
if (!isset($producer)) {
$this->sleep(1);
continue;
}
if ($producer->jobShouldBeDelete() || $job->attempts <= 0) {
$this->table->where("id", $job->id)->delete();
$this->sleep(1);
continue;
}
$this->table->where("id", $job->id)->update([
"status" => "reserved",
"retry" => $job->tries - 1,
'delay' => $delay
"attempts" => $job->attempts - 1,
"avalaibled_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()),
"reserved_at" => date("Y-m-d H:i:s", time() + $producer->getRetry())
]);
$this->sleep(1);
}
Expand All @@ -188,8 +128,23 @@ private function execute(ProducerService $producer, mixed $job)
{
call_user_func([$producer, "process"]);
$this->table->where("id", $job->id)->update([
"status" => "processed"
"status" => "done"
]);
sleep($this->sleep ?? 5);
}

/**
* Flush the queue table
*
* @param ?string $queue
* @return void
*/
public function flush(?string $queue = null): void
{
if (is_null($queue)) {
$this->table->truncate();
} else {
$this->table->where("queue", $queue)->delete();
}
}
}
78 changes: 50 additions & 28 deletions src/Queue/Adapters/QueueAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,28 @@ abstract class QueueAdapter
*
* @var int
*/
private int $start_time;
protected int $start_time;

/**
* Determine if the worker should quit
* Determine the default watch name
*
* @var bool
* @var string
*/
protected bool $should_quit = false;
protected string $queue = "default";

/**
* Determine if the worker is paused
* The number of working attempts
*
* @var bool
* @var int
*/
protected int $tries = 3;

/**
* Define the sleep time
*
* @var int
*/
protected bool $paused = false;
protected int $sleep = 5;

/**
* Create producer serialization
Expand Down Expand Up @@ -160,34 +167,57 @@ protected function supportsAsyncSignals()
return extension_loaded('pcntl');
}


/**
* Make adapter configuration
* Set job tries
*
* @param array $config
* @return QueueAdapter
* @param int $tries
* @return void
*/
abstract public function configure(array $config): QueueAdapter;
public function setTries(int $tries): void
{
$this->tries = $tries;
}

/**
* Watch the the queue name
* Set sleep time
*
* @param string $queue
* @param int $sleep
* @return void
*/
abstract public function setWatch(string $queue): void;
public function setSleep(int $sleep): void
{
$this->sleep = $sleep;
}

/**
* Set the tries value
* Get the queue or return the default.
*
* @param int $tries
* @param ?string $queue
* @return string
*/
abstract public function setTries(int $tries): void;
public function getQueue(?string $queue = null): string
{
return $queue ?: $this->queue;
}

/**
* Set the sleep value
* Generate the job id
*
* @param int $sleep
* @return string
*/
public function generateId(): string
{
return sha1(uniqid(str_shuffle("abcdefghijklmnopqrstuvwxyz0123456789"), true));
}

/**
* Make adapter configuration
*
* @param array $config
* @return QueueAdapter
*/
abstract public function setSleep(int $sleep): void;
abstract public function configure(array $config): QueueAdapter;

/**
* Push new producer
Expand All @@ -204,14 +234,6 @@ abstract public function push(ProducerService $producer): void;
*/
abstract public function size(string $queue): int;

/**
* Get the queue or return the default.
*
* @param ?string $queue
* @return string
*/
abstract public function getQueue(?string $queue = null): string;

/**
* Start the worker server
*
Expand Down
Loading

0 comments on commit b06c964

Please sign in to comment.