Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(dev/core#1304) Queues - Allow background worker to drive tasks via APIv4 #22762

Merged
merged 8 commits into from
Jun 3, 2022

Conversation

totten
Copy link
Member

@totten totten commented Feb 11, 2022

Overview

This adds two APIs, Queue.claimItems and Queue.runItems. These APIs may be used by background agents to reserve and execute tasks in a persistent queue. This PR is a major step towards providing generic support for background processing.

As described in previous PRs (esp #22812), application-developers can specify queue behavior by setting various options, eg:

$queue = Civi::queue('my-stuff', [
  'type' => 'SqlParallel',    // Multiple workers may claim concurrent tasks from this queue
  'runner' => 'myStuff',      // Execute tasks via `hook_civicrm_queueRun_myStuff`
  'lease_time' => 3600,       // If task doesn't succeed within 60min, assume it failed.
  'batch_limit' => 5,         // Grab batches of 0-5 items.
  'retry_limit' => 2,         // If a task fails, it may be retried (twice).
  'retry_interval' => 600,    // If a task fails, try again after 10min.
  'error' => 'abort',         // If a task repeatedly has unhandled exceptions, then stop queue from running new tasks
]);
$queue->createItem(...);
$queue->createItem(...);
$queue->createItem(...);

Background agents call the Queue.runItems API (and/or Queue.claimItems). The Queue API respects the options from above. Tasks may be executed like so:

# Example 1: Claim and run a task - single API call, bash
cv api4 Queue.runItems queue=my-stuff
# Example 2: Claim and run a task - single API call, PHP
$outcomes = \Civi\Api4\Queue::runItems(0)>setQueue('my-stuff')->execute();
# Example 3: Claim and run a task - two API calls, PHP
$claims = (array) \Civi\Api4\Queue::claimItems(0)->setQueue('my-stuff')->execute();
if ($claims) {
  $outcomes =  \Civi\Api4\Queue::runItems(0)->setItems($claims)->execute();
}

Cross-Reference

Before

The CRM_Queue allows you to build a queue or task-list. It encourages task-running in the foreground but not in the background:

  • Foreground: If you want to run tasks in the foreground, then use the AJAX-based CRM_Queue_Runner. (Note: it was primarily tuned for executing database upgrades in the web UI.)
  • Background: If you want to run tasks in the background, then you must create a new agent and use various APIs to poll+evaluate the queues. (This is what some extensions do.) Anyone who deploys must take extra steps to configure their agent.

After

You may still use all the same patterns as before. Additionally, you may register a queue that will run in the background. The Queue.runItems API executes them. Here are a few examples:

Technical Details

The present PR only adds the Queue APIs (building-blocks) -- there must also be some agent that uses the APIs. Here are a few ways to create an agent:

  • Bare mininum: Manually setup cron jobs or CLI scripts for each queue.
    cv api4 Queue.runItems queue=my-stuff
  • Generic agent (pseudocode): Implement a generic agent that calls Queue.get +w 'runner IS NOT EMPTY' and then Queue.runItems. Here is pseudocode: https://gist.github.com/totten/d7a7e29de238b7892d49366966787759
  • Generic agent (implementation): Use https://lab.civicrm.org/dev/coworker. This is an in-development implementation of that pseudocode above. It defines a number of options (such as configuring #workers and limiting the #tasks each worker handles). The current revision uses API pipe for low-latency execution -- but it also aims to support REST API for broader compatibility.

Further notes:

  • Background error handling: For database upgrades running in the foreground, an error will cause the whole queue to stop - and it will ask the user for instruction. But if you send tasks to run in the background, then there is no active user to ask. Instead, you need some policy to decide how to handle the error. This PR includes two mechanisms for error-handling:
    • Retry Policy: The retry policy specifies whether (and how) to retry tasks. This example will retry up to 4 times (at intervals of 10 minutes, aka 600 seconds). If it still doesn't succeed, then it is considered failed.
      $q = Civi::queue('my-stuff', [
        'type' => 'SqlParallel',
        'retry_limit' => 4,
        'retry_interval' => 600,
        'error' => 'delete',
      ]);
      $q->createItem(new CRM_Queue_Task('queuex_hello', [123]));
    • Error Hook: When a task encounters an error, it fires hook_civicrm_queueTaskError(). You can use this hook to inspect the error, to enqueue an alternative action, to send a notification, etc. The following example discriminates based on the specific error - for some errors, it retries; for others, it gives up:
      function example_civicrm_queueTaskError($queue, $item, &$outcome, $exception) {
        if (preg_match(';Email server unavailable;', $exception->getMessage()) {
          $outcome = 'retry'; // Maybe worth trying again.
        }
        else {
          $outcome = 'fail';
        }
      }
  • 1 Step/2 Step: The examples show how to run a task with 1-step (runItems()) or 2-steps (claimItems()+runItems()). Why? The 1-step protocol is more convenient in some cases (eg cv api4 Queue.runItems...). However, in coworkers architecture, the 2-step allows it to better balance work across multiple processes.

@civibot
Copy link

civibot bot commented Feb 11, 2022

(Standard links)

@totten
Copy link
Member Author

totten commented Mar 7, 2022

jenkins, test this please

@totten totten force-pushed the master-queue-api4-alt branch from 3b41c8e to 64cdd43 Compare March 9, 2022 10:52
@totten totten marked this pull request as ready for review March 9, 2022 10:54
@totten
Copy link
Member Author

totten commented Mar 9, 2022

@eileenmcnaughton @artfulrobot @seamuslee001 I'm removing the "WIP/Draft" flag from here, rebased, fleshed out the description, and added a couple examples. This should be ripe for a round of review:

@totten totten changed the title (WIP) (dev/core#1304) Queues - Allow background worker to drive tasks via APIv4 (dev/core#1304) Queues - Allow background worker to drive tasks via APIv4 Mar 9, 2022
@totten
Copy link
Member Author

totten commented Mar 9, 2022

More copy-editing on description. Added a pithier example https://gist.github.com/totten/168f15ccf7a1ab14a6a8b695f33074e8.

@eileenmcnaughton
Copy link
Contributor

I started digging into queuing today - a lot of the things I'm trying to make sense are around how it can (and can't be used) but one thing I hit that relates to the api in this PR is that I currently think claimItems is pretty confusing re dates.

What I'm seeing is that the relevant fields are all datetime and that the date that was written when I tried to claim the item is converted into GMT - 12 hours ago - I think that is being handled at other places too - so it seems weird that the item is claimed before it was submitted but it may still work

@@ -18,7 +18,7 @@
/**
* Track a list of known queues.
*/
class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue {
class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue implements \Civi\Test\HookInterface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is now \Civi\Core\HookInterface

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eileenmcnaughton Yup, that's prettier. Pushed update.

@eileenmcnaughton
Copy link
Contributor

test this please

2 similar comments
@eileenmcnaughton
Copy link
Contributor

test this please

@eileenmcnaughton
Copy link
Contributor

test this please

@eileenmcnaughton
Copy link
Contributor

eileenmcnaughton commented Apr 6, 2022

@totten running this locally I'm finding the hook is not invoked.

I am runnning it on 5.48 patched with this PR - I'm somewhat inclined to merge this and keep testing on a generated tarball (on the understanding that the review might still require follow up fixes) as I'm not seeing anything that would adversely affect pre-existing functionality.

My queue item IS being claimed - it just isn't doing anything as the hook is not firing

image

image

@eileenmcnaughton
Copy link
Contributor

Ah - system.flush did it

@eileenmcnaughton
Copy link
Contributor

OK - this is working for me as I think intended - in terms of usefulness I'm having issues though.

What I have is a queue with a bunch of items - this is somewhat analagous to an import job

  • the first one has a lower weight and if it fails the others will not success
  • the others all have the same weight - some might fail

The hope I had was

  1. it wouldn't continue to tasks with a higher weight until the ones with a lower weight had succeeded
  2. the tasks that failed would remain in the queue_item table so they could be viewed / dealt with through another process & would block higher weight items
  3. I could queue api tasks in a lightweight way - in my code I've created a civicrm_api4_queue function that just throughs away the first variable (the $queueContext) and passes the rest to v4 api & returns TRUE.

I'm still working through this use case & having the api functions be usable in practice may well be out of the scope of this PR.

@totten
Copy link
Member Author

totten commented Apr 7, 2022

Trying to recapture some points from MM discussion:

it wouldn't continue to tasks with a higher weight until the ones with a lower weight had succeeded

This could be done as a separate driver/type, eg CRM_Queue_Queue_SqlPhased.

Additionally, queue utilities (fork/branch and wait/merge) could address the same use-case. That might be more portable, though it may also be more verbose.

Both of those can be pursued as additional/follow-up topics that don't change the API semantics here.

I could queue api tasks in a lightweight way - in my code I've created a civicrm_api4_queue function that just throughs away the first variable (the $queueContext) and passes the rest to v4 api & returns TRUE.

Yeah, we should have a small library of tasks/task-adapters -- lke "run api4", "run api5", "fork/branch subqueue", "wait/merge for subqueue". But this is also something that can be improved in additional/follow-up work.

the tasks that failed would remain in the queue_item table so they could be viewed / dealt with through another process & would block higher weight items

We do need some different+well-defined patterns for error-handling. The retry mechanism here is fine -- but what happens when you exhaust all the retries (or don't use retries)? Some error-handling patterns that came up:

  • Fire a hook/event (current PR - hook_queueTaskError)
  • Remove the failed item from the queue (current default in PR, if no hook-listener intervenes)
  • Abort/block the queue (default behavior of the pre-existing ajax runner)
  • Mark the item as "failed" (explicitly or implicitly, via run_count) and teach claimItem() to skip failed items
  • Move the item to a "dead letter queue" - where the DLQ might be:
    • A global/default (Civi::queue('dead_letter') or Civi::queue('failed'))
    • An eponymous queue (Civi::queue("{$queueName}/dead_letter") or Civi::queue("{$queueName}/failed"))
    • A configurable name
  • Dump the item to a log file

It's ideal for the queue-dev to choose a policy for their queue, but that leaves discussion topics re: (a) defaults and (b) how to indicate the policy.

@eileenmcnaughton
Copy link
Contributor

So when I tried to use this the debug issue I hit was that I was 'losing' queue items and their associated data and not realising - ie they failed out & disappeared. I don't think our default error_handling should be to remove data from the queue because in my experience I was unknowingly losing data....

In the Ajax UI the error_mode is set up when the queue is run - but I think the error handling is actually implicit to the queue config - not how it is run. The 2 error modes currently in the runner are

  • ERROR_ABORT
  • ERROR_CONTINUE

In the latter case it deletes and continues.

At minimum I think we should make the error_mode part of the queue config and default to ERROR_ABORT such that people have to choose ERROR_CONTINUE deliberately (and understand the risk of data loss)

Separately there is the question of whether to re-queue - I prefer being able to configure the queue name to which it is requeued (and in practice can see that we would sometimes requeue to a generic failed/dead_letter queue and in other cases to specific ones). That does raise the question of how that config would be stored

Note that one the can-be-later things

  1. I think the firing of an event is an AND not an OR
  2. logging - I think ideally there would be add a Civi::log('queue-processor')->info() call - but I'm on the fence about whether we would actually log it anywhere by default or only with the addition of an extension like monolog.

@totten totten force-pushed the master-queue-api4-alt branch from cae4a9a to 88cf0c0 Compare May 25, 2022 10:30
@totten
Copy link
Member Author

totten commented May 25, 2022

The error policy stuff is WIP, but I rebased and added commits to define the schema for civicr_queue.error and civicrm_queue.status.

@eileenmcnaughton
Copy link
Contributor

@totten sadly the upgrade script is conflicted now - maybe you should pull that schema change into a separate PR for a quick more if the rest is WIP?

@totten totten force-pushed the master-queue-api4-alt branch 3 times, most recently from 1eaddd0 to cd22596 Compare May 31, 2022 01:20
$claimAndRun('abort', 1);
$this->assertEquals(FALSE, $queue->isActive());
break;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eileenmcnaughton This bit shows how the error=>delete and error=>abort lead to different outcomes (after a series of retries).

@eileenmcnaughton
Copy link
Contributor

Does this need rebasing now the other is merged - I'll try r-running tomorrow

totten added 8 commits June 2, 2022 13:31
Note: The return types are a bit wonky here.  Unfortunately, as I recall,
the original specification for all the methods here allowed backend-specific
return types -- anti-standardized return-types.
Before: Whenever you `claimItem()` from the queue, it marks the item `release_time`.

After: Whenever you `claimItem()` from the queue, it marks _both_ the
`release_time` and the `run_count`.

Comments:

* This is the basis for enforcing a `retry_limit` policy.

* This doesn't require any extra queries or joins - it fits into the
  existing update query.
Background:

* A queue runner should call `releaseItem()` if it tries and aborts some task.
* The `retry_interval` is defined as the extra time to wait before trying again.

Before: The `releaseItem()` always releases for immediate execution.

After: The `releaseItem()` checks `retry_interval`.  If it's set, then it
will add an extra delay before retrying.
Before: The lease-time is one of the following:

1. A value requested at runtime
2. The constant 3600

After: The lease-time is either supplied as

1. A value requested at runtime
2. A value configured for the queue
3. The constant 3600
Before
------

Each of the `CRM_Queue_Queue_*` drivers supports a set of methods for
claiming/releasing one queue-item at a time (eg `claimItem()`,
`releaseItem()`, `deleteItem()`).

After
-----

All drivers still support the same queue-item methods.  Additionally, the
`SqlParallel` driver supports batch-oriented equivalents (`claimItems()`,
`deleteItems()`, etc).

Comments
--------

I initially looked at updating all of the drivers to support queues.  There
were a few obstacles.  Firstly, batched-claims seem semantically
questionable for queues that run 1-by-1 (`Sql`, `Memory`).  Secondly, there
are a few extensions in contrib that extend these classes and override
methods (consequently, they're looking for stable signatures).

The approach here seemed to resolve those two concerns in an OOP-safe way:
define an optional interface (`BatchQueueInterface`) which can be used to
mark enhanced functionality on queues where it makes sense (eg
`SqlParallel`).
@totten totten force-pushed the master-queue-api4-alt branch from 0875af1 to 8369e17 Compare June 2, 2022 20:32
@totten
Copy link
Member Author

totten commented Jun 2, 2022

@eileenmcnaughton OK, rebased. Also added the required error property in the description.

@eileenmcnaughton
Copy link
Contributor

The issue that was blocking for me (the ability to non-deliberately create a queue that discards failed items) is resolved so I'm merging this now

@eileenmcnaughton eileenmcnaughton merged commit de39f93 into civicrm:master Jun 3, 2022
@totten totten deleted the master-queue-api4-alt branch June 6, 2022 23:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants