Skip to content

Commit 0875af1

Browse files
committed
Civi\Api4\Queue - Add APIs for claiming and running enqueued tasks
1 parent 087efff commit 0875af1

File tree

4 files changed

+648
-0
lines changed

4 files changed

+648
-0
lines changed

Civi/Api4/Action/Queue/ClaimItems.php

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
/*
4+
+--------------------------------------------------------------------+
5+
| Copyright CiviCRM LLC. All rights reserved. |
6+
| |
7+
| This work is published under the GNU AGPLv3 license with some |
8+
| permitted exceptions and without any warranty. For full license |
9+
| and copyright information, see https://civicrm.org/licensing |
10+
+--------------------------------------------------------------------+
11+
*/
12+
13+
namespace Civi\Api4\Action\Queue;
14+
15+
use Civi\Api4\Generic\Traits\SelectParamTrait;
16+
17+
/**
18+
* Claim an item from the queue. Returns zero or one items.
19+
*
20+
* @method ?string setQueue
21+
* @method $this setQueue(?string $queue)
22+
*/
23+
class ClaimItems extends \Civi\Api4\Generic\AbstractAction {
24+
25+
use SelectParamTrait;
26+
27+
/**
28+
* Name of the target queue.
29+
*
30+
* @var string|null
31+
*/
32+
protected $queue;
33+
34+
public function _run(\Civi\Api4\Generic\Result $result) {
35+
$this->select = empty($this->select) ? ['id', 'data', 'queue'] : $this->select;
36+
$queue = $this->queue();
37+
if (!$queue->isActive()) {
38+
return;
39+
}
40+
41+
$isBatch = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface;
42+
$limit = $queue->getSpec('batch_limit') ?: 1;
43+
if ($limit > 1 && !$isBatch) {
44+
throw new \API_Exception(sprintf('Queue "%s" (%s) does not support batching.', $queue->getName(), get_class($queue)));
45+
// Note 1: Simply looping over `claimItem()` is unlikley to help the consumer b/c
46+
// drivers like Sql+Memory are linear+blocking.
47+
// Note 2: The default is batch_limit=1. So someone has specifically chosen an invalid configuration...
48+
}
49+
$items = $isBatch ? $queue->claimItems($limit) : [$queue->claimItem()];
50+
51+
foreach ($items as $item) {
52+
if ($item) {
53+
$result[] = $this->convertItemToStub($item);
54+
}
55+
}
56+
}
57+
58+
/**
59+
* @param \CRM_Queue_DAO_QueueItem|\stdClass $item
60+
* @return array
61+
*/
62+
protected function convertItemToStub(object $item): array {
63+
$array = [];
64+
foreach ($this->select as $field) {
65+
switch ($field) {
66+
case 'id':
67+
$array['id'] = $item->id;
68+
break;
69+
70+
case 'data':
71+
$array['data'] = (array) $item->data;
72+
break;
73+
74+
case 'queue':
75+
$array['queue'] = $this->queue;
76+
break;
77+
78+
}
79+
}
80+
return $array;
81+
}
82+
83+
protected function queue(): \CRM_Queue_Queue {
84+
if (empty($this->queue)) {
85+
throw new \API_Exception('Missing required parameter: $queue');
86+
}
87+
return \Civi::queue($this->queue);
88+
}
89+
90+
}

Civi/Api4/Action/Queue/RunItems.php

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
<?php
2+
3+
namespace Civi\Api4\Action\Queue;
4+
5+
/**
6+
* Run an enqueued item (task).
7+
*
8+
* You must either:
9+
*
10+
* - (a) Give the target queue-item specifically (`setItem()`). Useful if you called `claimItem()` separately.
11+
* - (b) Give the name of the queue from which to find an item (`setQueue()`).
12+
*
13+
* Note: If you use `setItem()`, the inputted will be validated (refetched) to ensure authenticity of all details.
14+
*
15+
* Returns 0 or 1 records which indicate the outcome of running the chosen task.
16+
*
17+
* ```php
18+
* $todo = Civi\Api4\Queue::claimItem()->setQueue($item)->setLeaseTime(600)->execute()->single();
19+
* $result = Civi\Api4\Queue::runItem()->setItem($todo)->execute()->single();
20+
* assert(in_array($result['outcome'], ['ok', 'retry', 'fail']))
21+
*
22+
* $result = Civi\Api4\Queue::runItem()->setQueue('foo')->execute()->first();
23+
* assert(in_array($result['outcome'], ['ok', 'retry', 'fail']))
24+
* ```
25+
*
26+
* Valid outcomes are:
27+
* - 'ok': Task executed normally. Removed from queue.
28+
* - 'retry': Task encountered an error. Will try again later.
29+
* - 'fail': Task encountered an error. Will not try again later. Removed from queue.
30+
*
31+
* @method $this setItem(?array $item)
32+
* @method ?array getItem()
33+
* @method ?string setQueue
34+
* @method $this setQueue(?string $queue)
35+
*/
36+
class RunItems extends \Civi\Api4\Generic\AbstractAction {
37+
38+
/**
39+
* Previously claimed item - which should now be released.
40+
*
41+
* @var array|null
42+
* Fields: {id: scalar, queue: string}
43+
*/
44+
protected $items;
45+
46+
/**
47+
* Name of the target queue.
48+
*
49+
* @var string|null
50+
*/
51+
protected $queue;
52+
53+
public function _run(\Civi\Api4\Generic\Result $result) {
54+
if (!empty($this->items)) {
55+
$this->validateItemStubs();
56+
$queue = \Civi::queue($this->items[0]['queue']);
57+
$ids = \CRM_Utils_Array::collect('id', $this->items);
58+
if (count($ids) > 1 && !($queue instanceof \CRM_Queue_Queue_BatchQueueInterface)) {
59+
throw new \API_Exception("runItems: Error: Running multiple items requires BatchQueueInterface");
60+
}
61+
if (count($ids) > 1) {
62+
$items = $queue->fetchItems($ids);
63+
}
64+
else {
65+
$items = [$queue->fetchItem($ids[0])];
66+
}
67+
}
68+
elseif (!empty($this->queue)) {
69+
$queue = \Civi::queue($this->queue);
70+
if (!$queue->isActive()) {
71+
return;
72+
}
73+
$items = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface
74+
? $queue->claimItems($queue->getSpec('batch_limit') ?: 1)
75+
: [$queue->claimItem()];
76+
}
77+
else {
78+
throw new \API_Exception("runItems: Requires either 'queue' or 'item'.");
79+
}
80+
81+
if (empty($items)) {
82+
return;
83+
}
84+
85+
$outcomes = [];
86+
\CRM_Utils_Hook::queueRun($queue, $items, $outcomes);
87+
if (empty($outcomes)) {
88+
throw new \API_Exception(sprintf('Failed to run queue items (name=%s, runner=%s, itemCount=%d, outcomeCount=%d)',
89+
$queue->getName(), $queue->getSpec('runner'), count($items), count($outcomes)));
90+
}
91+
foreach ($items as $itemPos => $item) {
92+
$result[] = ['outcome' => $outcomes[$itemPos], 'item' => $this->createItemStub($item)];
93+
}
94+
}
95+
96+
private function validateItemStubs(): void {
97+
$queueNames = [];
98+
if (!isset($this->items[0])) {
99+
throw new \API_Exception("Queue items must be given as numeric array.");
100+
}
101+
foreach ($this->items as $item) {
102+
if (empty($item['queue'])) {
103+
throw new \API_Exception("Queue item requires property 'queue'.");
104+
}
105+
if (empty($item['id'])) {
106+
throw new \API_Exception("Queue item requires property 'id'.");
107+
}
108+
$queueNames[$item['queue']] = 1;
109+
}
110+
if (count($queueNames) > 1) {
111+
throw new \API_Exception("Queue items cannot be mixed. Found queues: " . implode(', ', array_keys($queueNames)));
112+
}
113+
}
114+
115+
private function createItemStub($item): array {
116+
return ['id' => $item->id, 'queue' => $item->queue_name];
117+
}
118+
119+
}

Civi/Api4/Queue.php

+30
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
*/
1111
namespace Civi\Api4;
1212

13+
use Civi\Api4\Action\Queue\ClaimItems;
14+
use Civi\Api4\Action\Queue\RunItems;
15+
1316
/**
1417
* Track a list of durable/scannable queues.
1518
*
@@ -31,7 +34,34 @@ public static function permissions() {
3134
return [
3235
'meta' => ['access CiviCRM'],
3336
'default' => ['administer queues'],
37+
'runItem' => [\CRM_Core_Permission::ALWAYS_DENY_PERMISSION],
3438
];
3539
}
3640

41+
/**
42+
* Claim an item from the queue. Returns zero or one items.
43+
*
44+
* Note: This is appropriate for persistent, auto-run queues.
45+
*
46+
* @param bool $checkPermissions
47+
* @return \Civi\Api4\Action\Queue\ClaimItems
48+
*/
49+
public static function claimItems($checkPermissions = TRUE) {
50+
return (new ClaimItems(static::getEntityName(), __FUNCTION__))
51+
->setCheckPermissions($checkPermissions);
52+
}
53+
54+
/**
55+
* Run an item from the queue.
56+
*
57+
* Note: This is appropriate for persistent, auto-run queues.
58+
*
59+
* @param bool $checkPermissions
60+
* @return \Civi\Api4\Action\Queue\RunItems
61+
*/
62+
public static function runItems($checkPermissions = TRUE) {
63+
return (new RunItems(static::getEntityName(), __FUNCTION__))
64+
->setCheckPermissions($checkPermissions);
65+
}
66+
3767
}

0 commit comments

Comments
 (0)