Skip to content

Commit

Permalink
Queue mechanism to stop processing jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ani2amigos committed Mar 22, 2024
1 parent c8b28e7 commit 837afd9
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 5 deletions.
25 changes: 25 additions & 0 deletions src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,29 @@ protected function createPayload(MailJobInterface $mailJob)
'message' => $mailJob->getMessage(),
]);
}

public function removeFailedJobs()
{
try {
$pheanstalk = $this->getConnection()->getInstance()->watch($this->queueName);
while (true) {
$job = $pheanstalk->reserveWithTimeout($this->reserveTimeout);

if (!$job) {
break;
}

$jobData = json_decode($job->getData(), true);

if($jobData['attempt'] >= $_ENV['MAX_ATTEMPTS_DEFAULT']){
$pheanstalk->delete($job);
}

}

return true;
} catch (\Exception $exception) {
return false;
}
}
}
15 changes: 15 additions & 0 deletions src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,19 @@ public function isEmpty()
$query->execute();
return intval($query->fetchColumn(0)) === 0;
}

public function removeFailedJobs()
{
try {
$sqlText = 'DELETE FROM mail_queue WHERE attempt >= :max_attempt;';
$sql = sprintf($sqlText, $this->tableName);
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':max_attempt', $_ENV['MAX_ATTEMPTS_DEFAULT']);
$query->execute();

return true;
} catch (\Exception $exception) {
return false;
}
}
}
2 changes: 2 additions & 0 deletions src/Queue/Backend/QueueStoreAdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public function ack(MailJobInterface $mailJob);
* @return bool
*/
public function isEmpty();

// public function removeFailedJobs();
}
20 changes: 20 additions & 0 deletions src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,24 @@ protected function createPayload(MailJobInterface $mailJob)
'delivery_tag' => null,
]);
}

public function removeFailedJobs()
{
try {
/** @var AMQPChannel $channel */
$channel = $this->getConnection()->getInstance();

while ($message = $channel->basic_get($this->queueName, false)) {
$body = json_decode($message->body, true);
if ($body['attempt'] >= $_ENV['MAX_ATTEMPTS_DEFAULT']) {

$channel->basic_ack($message->delivery_info['delivery_tag'], false);
}
}

return true;
} catch (\Exception $exception) {
return false;
}
}
}
17 changes: 17 additions & 0 deletions src/Queue/Backend/Redis/RedisQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,21 @@ protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
{
call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
}

public function removeFailedJobs()
{
try {
$members = $this->getConnection()->getInstance()->zrange($this->queueName . ':reserved', 0, -1);
foreach($members as $member){
$memberValue = json_decode($member);
if ($memberValue->attempt >= $_ENV['MAX_ATTEMPTS_DEFAULT']){
$this->getConnection()->getInstance()->zrem($this->queueName . ':reserved', $member);
}
}

return true;
} catch (\Exception $exception) {
return false;
}
}
}
35 changes: 32 additions & 3 deletions src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public function enqueue(MailJobInterface $mailJob)
{
$result = $this->getConnection()->getInstance()->sendMessage([
'QueueUrl' => $this->queueUrl,
'MessageBody' => $mailJob->getMessage(),
'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]),
'DelaySeconds' => $mailJob->getDelaySeconds(),
'Attempt' => $mailJob->getAttempt(),
]);

$messageId = $result['MessageId'];
return $messageId !== null && is_string($messageId);
}
Expand All @@ -91,7 +91,7 @@ public function dequeue()
'id' => $result['MessageId'],
'receiptHandle' => $result['ReceiptHandle'],
'message' => $result['Body'],
'attempt' => $result['Attempt'],
// 'attempt' => $result['Attempt'],
]);
}

Expand Down Expand Up @@ -135,4 +135,33 @@ public function isEmpty(): bool
]);
return $response['Attributes']['ApproximateNumberOfMessages'] === 0;
}

public function removeFailedJobs()
{
try {
do {
$result = $this->getConnection()->getInstance()->receiveMessage([
'QueueUrl' => $this->queueUrl,
'MaxNumberOfMessages' => 10,
]);

if(is_array($result->get('Messages'))){
foreach ($result->get('Messages') as $message) {
$messageBody = json_decode($message['Body']);
if(isset($messageBody->attempt) && $messageBody->attempt >= $_ENV['MAX_ATTEMPTS_DEFAULT']){
$this->getConnection()->getInstance()->deleteMessage([
'QueueUrl' => $this->queueUrl,
'ReceiptHandle' => $message['ReceiptHandle'],
]);
}
}
}

} while (is_array($result->get('Messages')));

return true;
} catch (\Exception $exception) {
return false;
}
}
}
6 changes: 4 additions & 2 deletions src/Queue/Backend/Sqs/SqsQueueStoreConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public function connect()
$secret = $this->getConfigurationValue('secret');
$region = $this->getConfigurationValue('region');
$this->instance = new SqsClient([
'key' => $key,
'secret' => $secret,
'credentials' => [
'key' => $key,
'secret' => $secret,
],
'region' => $region,
]);
return $this;
Expand Down

0 comments on commit 837afd9

Please sign in to comment.