Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(REF) Queues - Expand docblocks. Extract SQL trait. #22681

Merged
merged 4 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 2 additions & 92 deletions CRM/Queue/Queue/Sql.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class CRM_Queue_Queue_Sql extends CRM_Queue_Queue {

use CRM_Queue_Queue_SqlTrait;

/**
* Create a reference to queue. After constructing the queue, one should
* usually call createQueue (if it's a new queue) or loadQueue (if it's
Expand All @@ -32,74 +34,6 @@ public function __construct($queueSpec) {
parent::__construct($queueSpec);
}

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
* @return bool
*/
public function existsQueue() {
return ($this->numberOfItems() > 0);
}

/**
* Add a new item to the queue.
*
* @param mixed $data
* Serializable PHP object or array.
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
*/
public function createItem($data, $options = []) {
$dao = new CRM_Queue_DAO_QueueItem();
$dao->queue_name = $this->getName();
$dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
$dao->data = serialize($data);
$dao->weight = CRM_Utils_Array::value('weight', $options, 0);
$dao->save();
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Get the next item.
*
Expand Down Expand Up @@ -185,28 +119,4 @@ public function stealItem($lease_time = 3600) {
}
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
}

/**
* Return an item that could not be processed.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = [
1 => [$dao->id, 'Integer'],
];
CRM_Core_DAO::executeQuery($sql, $params);
}

}
96 changes: 2 additions & 94 deletions CRM/Queue/Queue/SqlParallel.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {

use CRM_Queue_Queue_SqlTrait;

/**
* Create a reference to queue. After constructing the queue, one should
* usually call createQueue (if it's a new queue) or loadQueue (if it's
Expand All @@ -32,74 +34,6 @@ public function __construct($queueSpec) {
parent::__construct($queueSpec);
}

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
* @return bool
*/
public function existsQueue() {
return ($this->numberOfItems() > 0);
}

/**
* Add a new item to the queue.
*
* @param mixed $data
* Serializable PHP object or array.
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
*/
public function createItem($data, $options = []) {
$dao = new CRM_Queue_DAO_QueueItem();
$dao->queue_name = $this->getName();
$dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
$dao->data = serialize($data);
$dao->weight = CRM_Utils_Array::value('weight', $options, 0);
$dao->save();
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Get the next item.
*
Expand Down Expand Up @@ -182,30 +116,4 @@ public function stealItem($lease_time = 3600) {
}
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
$dao->free();
}

/**
* Return an item that could not be processed.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = [
1 => [$dao->id, 'Integer'],
];
CRM_Core_DAO::executeQuery($sql, $params);
$dao->free();
}

}
112 changes: 112 additions & 0 deletions CRM/Queue/Queue/SqlTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php

/*
+--------------------------------------------------------------------+
| Copyright CiviCRM LLC. All rights reserved. |
| |
| This work is published under the GNU AGPLv3 license with some |
| permitted exceptions and without any warranty. For full license |
| and copyright information, see https://civicrm.org/licensing |
+--------------------------------------------------------------------+
*/

/**
* Trait defines methods that are commonly used to implement a SQL-backed queue.
*/
trait CRM_Queue_Queue_SqlTrait {

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
* @return bool
*/
public function existsQueue() {
return ($this->numberOfItems() > 0);
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Add a new item to the queue.
*
* @param mixed $data
* Serializable PHP object or array.
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
*/
public function createItem($data, $options = []) {
$dao = new CRM_Queue_DAO_QueueItem();
$dao->queue_name = $this->getName();
$dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
$dao->data = serialize($data);
$dao->weight = CRM_Utils_Array::value('weight', $options, 0);
$dao->save();
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO|stdClass $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
$dao->free();
}

/**
* Return an item that could not be processed.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = [
1 => [$dao->id, 'Integer'],
];
CRM_Core_DAO::executeQuery($sql, $params);
$dao->free();
}

}
24 changes: 19 additions & 5 deletions CRM/Queue/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,25 @@
*/

/**
* The queue runner is a helper which runs all jobs in a queue.
* `CRM_Queue_Runner` runs a list tasks from a queue. It originally supported the database-upgrade
* queue. Consequently, this runner is optimal for queues which are:
*
* The queue runner is most useful for one-off queues (such as an upgrade);
* if the intention is to develop a dedicated, long-running worker thread,
* then one should consider writing a new queue consumer.
* - Short lived and discrete. You have a fixed list of tasks that will be run to completion.
* - Strictly linear. Tasks must run 1-by-1. Often, one task depends on the success of a previous task.
* - Slightly dangerous. An error, omission, or mistake indicates that the database is in an
* inconsistent state. Errors call for skilled human intervention.
*
* This runner supports a few modes of operation, eg
*
* - `runAllViaWeb()`: Use a web-browser and a series of AJAX requests to perform all steps.
* If there is an error, prompt the sysadmin/user to decide how to handle it.
* - `runAll()`: Run all tasks, 1-by-1, back-to-back. If there is an error, give up.
* This is used by some CLI upgrades.
*
* This runner is not appropriate for all queues or workloads, so you might choose or create
* a different runner. For example, `CRM_Queue_Autorunner` is geared toward background task lists.
*
* @see CRM_Queue_Autorunner
*/
class CRM_Queue_Runner {

Expand Down Expand Up @@ -203,7 +217,7 @@ public function runNext($useSteal = FALSE) {
$exception = new Exception('Task returned false');
}
}
catch (Exception$e) {
catch (Exception $e) {
$isOK = FALSE;
$exception = $e;
}
Expand Down
9 changes: 4 additions & 5 deletions CRM/Queue/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ public function __construct($callback, $arguments, $title = NULL) {
/**
* Perform the task.
*
* @param array $taskCtx
* Array with keys:
* - log: object 'Log'
*
* @param \CRM_Queue_TaskContext $taskCtx
* @throws Exception
* @return bool, TRUE if task completes successfully
* @return bool
* TRUE if task completes successfully.
* FALSE or exception if task fails.
*/
public function run($taskCtx) {
$args = $this->arguments;
Expand Down