Skip to content
This repository has been archived by the owner on Sep 10, 2024. It is now read-only.

Commit

Permalink
Upgrade apalis
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed May 17, 2024
1 parent 94792ed commit a63ba4c
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 674 deletions.
114 changes: 60 additions & 54 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ oauth2-types = { path = "./crates/oauth2-types/", version = "=0.9.0" }

# Async job queue
[workspace.dependencies.apalis]
version = "0.4.9"
version = "0.5.1"
features = ["cron"]

# GraphQL server
Expand Down
3 changes: 1 addition & 2 deletions crates/cli/src/commands/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ impl Options {

info!(worker_name, "Starting task worker");
let monitor =
mas_tasks::init(&worker_name, &pool, &mailer, homeserver_connection.clone())
.await?;
mas_tasks::init(&worker_name, &pool, &mailer, homeserver_connection.clone());
// TODO: grab the handle
tokio::spawn(monitor.run());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Options {
let worker_name = Alphanumeric.sample_string(&mut rng, 10);

info!(worker_name, "Starting task scheduler");
let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn).await?;
let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn);

span.exit();

Expand Down
6 changes: 4 additions & 2 deletions crates/email/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ mod mailer;
mod transport;

pub use lettre::{
message::Mailbox, transport::smtp::authentication::Credentials as SmtpCredentials, Address,
address::{Address, AddressError},
message::Mailbox,
transport::smtp::authentication::Credentials as SmtpCredentials,
};
pub use mas_templates::EmailVerificationContext;

pub use self::{
mailer::Mailer,
mailer::{Error as MailerError, Mailer},
transport::{SmtpMode, Transport as MailTransport},
};
6 changes: 6 additions & 0 deletions crates/email/src/mailer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ pub struct Mailer {
reply_to: Mailbox,
}

/// Errors that can occur when sending emails
#[derive(Debug, Error)]
#[error(transparent)]
pub enum Error {
/// Mail failed to send through the transport
Transport(#[from] crate::transport::Error),

/// Failed to render email templates
Templates(#[from] mas_templates::TemplateError),

/// Email built was invalid
Content(#[from] lettre::error::Error),
}

Expand Down
4 changes: 1 addition & 3 deletions crates/storage-pg/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ impl<'c> JobRepository for PgJobRepository<'c> {
) -> Result<(), Self::Error> {
let now = clock.now();
let id = Ulid::from_datetime_with_source(now.into(), rng);
// XXX: this is what apalis_core::job::JobId does
let id = format!("JID-{id}");
tracing::Span::current().record("job.id", &id);
tracing::Span::current().record("job.id", tracing::field::display(id));

let res = sqlx::query!(
r#"
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct JobSubmission {
payload: Value,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
struct SerializableSpanContext {
trace_id: String,
span_id: String,
Expand Down Expand Up @@ -65,7 +65,7 @@ impl TryFrom<&SerializableSpanContext> for SpanContext {
}

/// A wrapper for [`Job`] which adds the span context in the payload.
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct JobWithSpanContext<T> {
#[serde(skip_serializing_if = "Option::is_none")]
span_context: Option<SerializableSpanContext>,
Expand Down
3 changes: 2 additions & 1 deletion crates/tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ workspace = true
[dependencies]
anyhow.workspace = true
apalis.workspace = true
apalis-core = "0.4.9"
apalis-core = "0.5.1"
apalis-sql = { version = "0.5.1", features = ["postgres"] }
async-stream = "0.3.5"
async-trait.workspace = true
chrono.workspace = true
Expand Down
20 changes: 10 additions & 10 deletions crates/tasks/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@ use std::str::FromStr;

use apalis::{
cron::CronStream,
prelude::{
timer::TokioTimer, Job, JobContext, Monitor, TokioExecutor, WorkerBuilder, WorkerFactoryFn,
},
prelude::{Data, Job, Monitor, TokioExecutor, WorkerBuilder, WorkerFactoryFn},
};
use chrono::{DateTime, Utc};
use mas_storage::{oauth2::OAuth2AccessTokenRepository, RepositoryAccess};
use tracing::{debug, info};

use crate::{
utils::{metrics_layer, trace_layer, TracedJob},
JobContextExt, State,
State,
};

#[derive(Default, Clone)]
Expand All @@ -50,13 +48,15 @@ impl TracedJob for CleanupExpiredTokensJob {}

pub async fn cleanup_expired_tokens(
job: CleanupExpiredTokensJob,
ctx: JobContext,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
state: Data<State>,
) -> Result<(), mas_storage::RepositoryError> {
debug!("cleanup expired tokens job scheduled at {}", job.scheduled);

let state = ctx.state();
let clock = state.clock();
let mut repo = state.repository().await?;
let mut repo = state
.repository()
.await
.map_err(mas_storage::RepositoryError::from_error)?;

let count = repo.oauth2_access_token().cleanup_expired(&clock).await?;
repo.save().await?;
Expand All @@ -78,10 +78,10 @@ pub(crate) fn register(
let schedule = apalis::cron::Schedule::from_str("*/15 * * * * *").unwrap();
let worker_name = format!("{job}-{suffix}", job = CleanupExpiredTokensJob::NAME);
let worker = WorkerBuilder::new(worker_name)
.stream(CronStream::new(schedule).timer(TokioTimer).to_stream())
.layer(state.inject())
.data(state.clone())
.layer(metrics_layer())
.layer(trace_layer())
.stream(CronStream::new(schedule).into_stream())
.build_fn(cleanup_expired_tokens);

monitor.register(worker)
Expand Down
Loading

0 comments on commit a63ba4c

Please sign in to comment.