Skip to content

yougov/rjq

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

57 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Redis Job Queue

A simple Redis-backed job queue.

crates.io Build Status

Documentation

https://docs.rs/rjq/

Enqueue jobs

use std::time::Duration;
use std::thread::sleep;

// Connect to the Redis server at localhost and use the queue named "rjq".
let queue = rjq::Queue::new("redis://localhost/", "rjq");
let mut uuids = Vec::new();

// Enqueue ten jobs with no arguments, one every 100 ms, keeping each returned UUID.
// NOTE(review): this call passes three arguments, while the `enqueue` signature
// documented below lists only `args` and `expire` — confirm which one is current.
for _ in 0..10 {
    sleep(Duration::from_millis(100));
    let uuid = queue.enqueue(None, vec![], 30).unwrap();
    uuids.push(uuid);
}

// Pause for three seconds before looking the jobs up.
sleep(Duration::from_millis(3000));

// Print every job's status and result, or the error if the result lookup failed.
for uuid in &uuids {
    let status = queue.status(uuid).unwrap();
    match queue.result(uuid) {
        Ok(result) => println!("{} {:?} {:?}", uuid, status, result),
        failure => println!("{:?}", failure),
    }
}

Queue worker

use std::time::Duration;
use std::thread::sleep;

/// Error type used by the example worker function as the error parameter of
/// `rjq::JobResult`.
///
/// Every variant wraps one underlying error through `#[from]`, and its display
/// message is simply that of the wrapped error (`"{0}"`).
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// An error coming from the `redis` crate.
    #[error("{0}")]
    Redis(#[from] redis::RedisError),
    /// An error coming from `serde_json`.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// An error coming from `rjq` itself.
    #[error("{0}")]
    Rjq(#[from] rjq::errors::Error),
}

/// Example worker function: handles a single job and returns a greeting as its result.
///
/// Sleeps for one second, prints the job's UUID to stdout, and then returns
/// `Ok(Some("hi from <uuid>"))`. The job arguments are ignored.
fn process(uuid: String, _: Vec<String>) -> rjq::JobResult<Error> {
    let pause = Duration::from_millis(1000);
    sleep(pause);

    println!("{}", uuid);

    let greeting = format!("hi from {}", uuid);
    Ok(Some(greeting))
}

// Connect to the Redis server at localhost and use the queue named "rjq".
let queue = rjq::Queue::new("redis://localhost/", "rjq");

// Name each positional argument of `work` so the call reads on its own.
let wait = Some(5); // time to wait for the next job
let timeout = Some(10); // seconds the worker function has to finish
let freq = Some(1); // job status checks per second
let expire = Some(30); // seconds until the job result expires
let fall = Some(false); // do not panic when a job is lost
let infinite = Some(false); // process only one job
queue.work(process, wait, timeout, freq, expire, fall, infinite).unwrap();

Job status

QUEUED - job queued for further processing

RUNNING - job is being processed by a worker

LOST - job was not finished in time

FINISHED - job has been successfully finished

FAILED - job failed due to an error

Queue methods

Init queue

fn new(url: &str, name: &str) -> Queue;

url - redis URL

name - queue name

Returns queue

Drop queue jobs

fn drop(&self) -> Result<(), Box<Error>>;

Enqueue job

fn enqueue(&self, args: Vec<String>, expire: usize) -> Result<String, Box<Error>>;

args - job arguments

expire - time (in seconds) after which the job expires if no worker has started it

Returns job UUID

Get job status

fn status(&self, uuid: &str) -> Result<Status, Box<Error>>;

uuid - job unique identifier

Returns job status

Work on queue

fn work<F: Fn(String, Vec<String>) -> Result<String, Box<Error>> + Send + Sync + 'static>
    (&self,
     fun: F,
     wait: Option<usize>,
     timeout: Option<usize>,
     freq: Option<usize>,
     expire: Option<usize>,
     fall: Option<bool>,
     infinite: Option<bool>)
     -> Result<(), Box<Error>>;

fun - worker function

wait - time to wait for the next job to be popped from the queue

timeout - worker function should finish in timeout (in seconds)

freq - job status check frequency (times per second)

expire - job result will expire in this time (in seconds)

fall - if set, panic to terminate the process when a job has been lost

infinite - if set, process jobs indefinitely, one after another; otherwise only one job is processed

Get job result

fn result(&self, uuid: &str) -> Result<Option<String>, Box<Error>>;

uuid - job unique identifier

Returns job result

Run tests

cargo test

Packages

No packages published

Languages

  • Rust 100.0%