Skip to content

Commit

Permalink
Adding a new default job worker that can execute queue jobs via class…
Browse files Browse the repository at this point in the history
…es added to subject
  • Loading branch information
marcelfolaron committed Aug 23, 2024
1 parent 7cb0199 commit c64f09c
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
23 changes: 21 additions & 2 deletions app/Domain/Queue/Services/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Leantime\Core\Mailer as MailerCore;
use Leantime\Core\Language as LanguageCore;
use Leantime\Domain\Queue\Repositories\Queue as QueueRepository;
use Leantime\Domain\Queue\Workers\DefaultWorker;
use Leantime\Domain\Queue\Workers\EmailWorker;
use Leantime\Domain\Queue\Workers\HttpRequestWorker;
use Leantime\Domain\Queue\Workers\Workers;
Expand Down Expand Up @@ -73,21 +74,39 @@ public function processQueue(Workers $worker): bool
$worker->handleQueue($messages);
}

if($worker == Workers::DEFAULT){
$worker = app()->make(DefaultWorker::class);
$worker->handleQueue($messages);
}

return true;
}


public function addToQueue(Workers $channel, string $subject, string $message, $projectId) {


return $this->queue->addMessageToQueue(
return $this->queue->addMessageToQueue(
channel: $channel,
subject: $subject,
message: $message,
projectId: $projectId,
userId: session("userdata.id"));

}

public static function addJob(Workers $channel, string $subject, mixed $message, ?int $userId = null, ?int $projectId = null) {

$queue = app()->make(QueueRepository::class);

return $queue->addMessageToQueue(
channel: $channel,
subject: $subject,
message: serialize($message),
projectId: $projectId ?? session('currentProject'),
userId:$userId ?? session('userdata.id')
);

}
}

}
48 changes: 48 additions & 0 deletions app/Domain/Queue/Workers/DefaultWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

namespace Leantime\Domain\Queue\Workers;

use GuzzleHttp\Exception\GuzzleException;
use Leantime\Core\Mailer;
use Leantime\Domain\Queue\Repositories\Queue;
use Leantime\Domain\Setting\Repositories\Setting;
use Leantime\Domain\Users\Repositories\Users;
use GuzzleHttp\Client;
use PHPUnit\Exception;

class DefaultWorker
{
public function __construct(
private Users $userRepo,
private Setting $settingsRepo,
private Queue $queue,
private Client $client
) {
}

public function handleQueue($messages)
{


foreach ($messages as $message) {
try {
$payload = unserialize($message['message']);
$subjectClass = $message['subject'];

$jobClass = app()->make($subjectClass);

$result = $jobClass->handle($payload);

if ($result) {
$this->queue->deleteMessageInQueue($message['msghash']);
return true;
}

} catch (Exception $e) {
error_log($e);
}

return false;
}
}
}
4 changes: 1 addition & 3 deletions app/Domain/Queue/Workers/HttpRequestWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ public function handleQueue($messages) {
$messageArray
);

if($response->getStatusCode() == 200){
$this->queue->deleteMessageInQueue($request['msghash']);
}
$this->queue->deleteMessageInQueue($request['msghash']);

} catch (GuzzleException $e) {
report($e);
Expand Down
2 changes: 2 additions & 0 deletions app/Domain/Queue/Workers/Workers.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ enum Workers: string
{
case EMAILS = "email";
case HTTPREQUESTS = "httprequests";

case DEFAULT = "default";
}
4 changes: 4 additions & 0 deletions app/Domain/Queue/register.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@
$scheduler
->call(fn () => app()->make(Services\Queue::class)->processQueue(Workers::HTTPREQUESTS))
->everyFiveMinutes();

$scheduler
->call(fn () => app()->make(Services\Queue::class)->processQueue(Workers::DEFAULT))
->everyFiveMinutes();
});

0 comments on commit c64f09c

Please sign in to comment.