Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended functionality for Workers to specify queue to run on, and to perform after a duration #759

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion docs-site/content/docs/processing/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ To use a worker, we mainly think about adding a job to the queue, so you `use` t
.await
```

If you want to add the job be run after a specified time, you can use the `perform_in` method and specify a `std::time::Duration`, for example:

```rust
// .. in your controller ..
DownloadWorker::perform_in(
&ctx,
std::time::Duration::from_secs(60), // Start job after 60 seconds has passed
DownloadWorkerArgs {
user_guid: "foo".to_string(),
},
)
.await
```

Unlike Rails and Ruby, with Rust you can enjoy _strongly typed_ job arguments which gets serialized and pushed into the queue.

## Creating a new worker
Expand Down Expand Up @@ -169,9 +183,30 @@ workers:
mode: BackgroundQueue
```

By default, `loco` has 2 queues: `default` and `mailer`. If you want to specify other queues for your workers to use, you have to specify them. Adding a `custom` queue would look like this:

```yaml
mode: BackgroundQueue
queues:
- custom
```
And then you can specify which queue to use for each worker by implementing the `opts` function on the `Worker` trait.

```rust
#[async_trait]
impl Worker<DownloadWorkerArgs> for DownloadWorker {
//..
fn opts() -> worker::AppWorkerOpts<DownloadWorkerArgs, Self> {
// this won't run if the queue you supply is not in the config
worker::AppWorkerOpts::new().queue("custom")
}
//..
}
```

## Testing a Worker

You can easily test your worker background jobs using `Loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks.
You can easily test your worker background jobs using `loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks.

It's recommended to implement tests in the `tests/workers` directory to consolidate all your worker tests in one place.

Expand Down
2 changes: 1 addition & 1 deletion src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use crate::{
task::{self, Task, TaskInfo},
validation::{self, Validatable},
validator::Validate,
worker::{self, AppWorker},
worker::{self, AppWorker, AppWorkerOpts},
Result,
};

Expand Down
51 changes: 51 additions & 0 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub fn get_queues(config_queues: &Option<Vec<String>>) -> Vec<String> {
queues
}

pub type AppWorkerOpts<Args, T> = sidekiq::WorkerOpts<Args, T>;

#[async_trait]
#[allow(clippy::module_name_repetitions)]
pub trait AppWorker<T>: Worker<T>
Expand All @@ -32,6 +34,55 @@ where
T: Send + Sync + serde::Serialize + 'static,
{
fn build(ctx: &AppContext) -> Self;

async fn perform_in(ctx: &AppContext, duration: std::time::Duration, args: T) -> Result<()> {
match &ctx.config.workers.mode {
WorkerMode::BackgroundQueue => {
if let Some(queue) = &ctx.queue {
Self::opts()
.perform_in(queue, duration, args)
.await
.unwrap();
} else {
error!(
error.msg =
"worker mode requested but no queue connection supplied, skipping job",
"worker_error"
);
}
}
WorkerMode::ForegroundBlocking => {
std::thread::sleep(duration);
Self::build(ctx).perform(args).await.unwrap();
}
WorkerMode::BackgroundAsync => {
let dx = ctx.clone();
tokio::spawn(async move {
// If wait time is larger than a minute, wait in
// intervals of 1 minute to avoid long running tasks
if duration > std::time::Duration::from_secs(60) {
let num_sleeps = duration.as_secs() / 60;
let remaining_time = duration.as_secs() % 60;

for _ in 0..num_sleeps {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}

if remaining_time > 0 {
tokio::time::sleep(std::time::Duration::from_secs(remaining_time))
.await;
}
} else {
tokio::time::sleep(duration).await;
}

Self::build(&dx).perform(args).await
});
}
}
Ok(())
}

async fn perform_later(ctx: &AppContext, args: T) -> Result<()> {
match &ctx.config.workers.mode {
WorkerMode::BackgroundQueue => {
Expand Down
Loading