diff --git a/Cargo.lock b/Cargo.lock index 994b458cd..fb00254c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5427,7 +5427,7 @@ dependencies = [ [[package]] name = "shuttle-common" -version = "0.7.2" +version = "0.7.3" dependencies = [ "anyhow", "async-trait", @@ -5448,7 +5448,7 @@ dependencies = [ [[package]] name = "shuttle-deployer" -version = "0.7.2" +version = "0.7.3" dependencies = [ "anyhow", "async-trait", @@ -5496,7 +5496,7 @@ dependencies = [ [[package]] name = "shuttle-gateway" -version = "0.7.2" +version = "0.7.3" dependencies = [ "acme2", "anyhow", @@ -5543,7 +5543,7 @@ dependencies = [ [[package]] name = "shuttle-proto" -version = "0.7.2" +version = "0.7.3" dependencies = [ "prost", "shuttle-common", @@ -5553,7 +5553,7 @@ dependencies = [ [[package]] name = "shuttle-provisioner" -version = "0.7.2" +version = "0.7.3" dependencies = [ "aws-config", "aws-sdk-rds", diff --git a/common/Cargo.toml b/common/Cargo.toml index 8e35b26fd..3d70c7cf0 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shuttle-common" -version = "0.7.2" +version = "0.7.3" edition = "2021" license = "Apache-2.0" description = "Common library for the shuttle platform (https://www.shuttle.rs/)" diff --git a/deployer/Cargo.toml b/deployer/Cargo.toml index e76dba8b0..19799bc50 100644 --- a/deployer/Cargo.toml +++ b/deployer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shuttle-deployer" -version = "0.7.2" +version = "0.7.3" edition = "2021" description = "Service with instances created per project for handling the compilation, loading, and execution of Shuttle services" diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index cdefc7333..1501ee589 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -174,7 +174,7 @@ async fn get_service( async fn get_service_summary( Extension(persistence): Extension, Extension(proxy_fqdn): Extension, - Path((project_name, service_name)): Path<(String, String)>, + Path((_, service_name)): Path<(String, String)>, ) -> Result> { if let Some(service) = persistence.get_service_by_name(&service_name).await? { let deployment = persistence @@ -189,7 +189,7 @@ async fn get_service_summary( .collect(); let response = shuttle_common::models::service::Summary { - uri: format!("https://{}.{proxy_fqdn}", project_name), + uri: format!("https://{proxy_fqdn}"), name: service.name, deployment, resources, diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index aa0ec05f5..2140336cf 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shuttle-gateway" -version = "0.7.2" +version = "0.7.3" edition = "2021" publish = false diff --git a/gateway/src/acme.rs b/gateway/src/acme.rs index 08067605e..4f448df1e 100644 --- a/gateway/src/acme.rs +++ b/gateway/src/acme.rs @@ -243,9 +243,9 @@ impl AcmeClient { let digest = order.key_authorization(challenge).dns_value(); warn!("dns-01 challenge: _acme-challenge.{domain} 300 IN TXT \"{digest}\""); - // Wait 120 secs to insert the record manually and for it to + // Wait 60 secs to insert the record manually and for it to // propagate before moving on - sleep(Duration::from_secs(120)).await; + sleep(Duration::from_secs(60)).await; order .set_challenge_ready(&challenge.url) diff --git a/gateway/src/main.rs b/gateway/src/main.rs index ff7f6e5e5..b96a36b08 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -4,7 +4,7 @@ use futures::prelude::*; use instant_acme::{AccountCredentials, ChallengeType}; use opentelemetry::global; use shuttle_gateway::acme::{AcmeClient, CustomDomain}; -use shuttle_gateway::api::latest::ApiBuilder; +use shuttle_gateway::api::latest::{ApiBuilder, SVC_DEGRADED_THRESHOLD}; use shuttle_gateway::args::StartArgs; use shuttle_gateway::args::{Args, Commands, InitArgs, UseTls}; use shuttle_gateway::auth::Key; @@ -12,14 +12,14 @@ use shuttle_gateway::proxy::UserServiceBuilder; use shuttle_gateway::service::{GatewayService, MIGRATIONS}; use shuttle_gateway::task; use shuttle_gateway::tls::{make_tls_acceptor, ChainAndPrivateKey}; -use shuttle_gateway::worker::Worker; +use shuttle_gateway::worker::{Worker, WORKER_QUEUE_SIZE}; use sqlx::migrate::MigrateDatabase; use sqlx::{query, Sqlite, SqlitePool}; use std::io::{self, Cursor}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, info_span, trace, warn}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main(flavor = "multi_thread")] @@ -108,14 +108,33 @@ async fn start(db: SqlitePool, fs: PathBuf, args: StartArgs) -> io::Result<()> { async move { loop { tokio::time::sleep(Duration::from_secs(60)).await; + if sender.capacity() < WORKER_QUEUE_SIZE - SVC_DEGRADED_THRESHOLD { + // if degraded, don't stack more health checks + warn!( + sender.capacity = sender.capacity(), + "skipping health checks" + ); + continue; + } + if let Ok(projects) = gateway.iter_projects().await { + let span = info_span!( + "running health checks", + healthcheck.num_projects = projects.len() + ); + let _ = span.enter(); for (project_name, _) in projects { - let _ = gateway + if let Ok(handle) = gateway .new_task() .project(project_name) .and_then(task::check_health()) .send(&sender) - .await; + .await + { + // we wait for the check to be done before + // queuing up the next one + handle.await + } } } } diff --git a/gateway/src/service.rs b/gateway/src/service.rs index 4fe54ff1c..0134a6e7d 100644 --- a/gateway/src/service.rs +++ b/gateway/src/service.rs @@ -246,7 +246,7 @@ impl GatewayService { pub async fn iter_projects( &self, - ) -> Result, Error> { + ) -> Result, Error> { let iter = query("SELECT project_name, account_name FROM projects") .fetch_all(&self.db) .await? diff --git a/gateway/src/task.rs b/gateway/src/task.rs index b6abedb21..5a23c64ac 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -1,11 +1,13 @@ use futures::Future; use std::collections::VecDeque; use std::marker::PhantomData; +use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; use tokio::time::{sleep, timeout}; -use tracing::{info, warn}; +use tracing::{error, info, info_span, warn}; use uuid::Uuid; use crate::project::*; @@ -66,6 +68,23 @@ impl TaskResult { } } + pub fn to_str(&self) -> &str { + match self { + Self::Pending(_) => "pending", + Self::Done(_) => "done", + Self::TryAgain => "try again", + Self::Cancelled => "cancelled", + Self::Err(_) => "error", + } + } + + pub fn is_done(&self) -> bool { + match self { + Self::Done(_) | Self::Cancelled | Self::Err(_) => true, + Self::TryAgain | Self::Pending(_) => false, + } + } + pub fn as_ref(&self) -> TaskResult<&R, &E> { match self { Self::Pending(r) => TaskResult::Pending(r), @@ -179,9 +198,10 @@ impl TaskBuilder { )) } - pub async fn send(self, sender: &Sender) -> Result<(), Error> { - match timeout(TASK_SEND_TIMEOUT, sender.send(self.build())).await { - Ok(Ok(_)) => Ok(()), + pub async fn send(self, sender: &Sender) -> Result { + let (task, handle) = AndThenNotify::after(self.build()); + match timeout(TASK_SEND_TIMEOUT, sender.send(Box::new(task))).await { + Ok(Ok(_)) => Ok(handle), _ => Err(Error::from_kind(ErrorKind::ServiceUnavailable)), } } @@ -225,6 +245,60 @@ impl Task for RunUntilDone { } } +pub struct TaskHandle { + rx: oneshot::Receiver<()>, +} + +impl Future for TaskHandle { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + Pin::new(&mut self.rx).poll(cx).map(|_| ()) + } +} + +pub struct AndThenNotify { + inner: T, + notify: Option>, +} + +impl AndThenNotify { + pub fn after(task: T) -> (Self, TaskHandle) { + let (tx, rx) = oneshot::channel(); + ( + Self { + inner: task, + notify: Some(tx), + }, + TaskHandle { rx }, + ) + } +} + +#[async_trait] +impl Task for AndThenNotify +where + Ctx: Send + 'static, + T: Task, +{ + type Output = T::Output; + + type Error = T::Error; + + async fn poll(&mut self, ctx: Ctx) -> TaskResult { + let out = self.inner.poll(ctx).await; + + if out.is_done() { + let _ = self.notify.take().unwrap().send(()); + } + + out + } +} + pub struct WithTimeout { inner: T, start: Option, @@ -322,8 +396,6 @@ where let ctx = self.service.context(); - info!(%self.project_name, "starting work on project"); - let project = match self.service.find_project(&self.project_name).await { Ok(project) => project, Err(err) => return TaskResult::Err(err), @@ -345,6 +417,14 @@ where state: project, }; + let span = info_span!( + "polling project", + ctx.project = ?project_ctx.project_name, + ctx.account = ?project_ctx.account_name, + ctx.state = project_ctx.state.state() + ); + let _ = span.enter(); + let task = self.tasks.front_mut().unwrap(); let timeout = sleep(PROJECT_TASK_MAX_IDLE_TIMEOUT); @@ -364,16 +444,24 @@ where }; if let Some(update) = res.as_ref().ok() { + info!(new_state = ?update.state(), "new state"); match self .service .update_project(&self.project_name, update) .await { - Ok(_) => {} - Err(err) => return TaskResult::Err(err), + Ok(_) => { + info!(new_state = ?update.state(), "successfully updated project state"); + } + Err(err) => { + error!(err = %err, "could not update project state"); + return TaskResult::Err(err); + } } } + info!(result = res.to_str(), "poll result"); + match res { TaskResult::Pending(_) => TaskResult::Pending(()), TaskResult::TryAgain => TaskResult::TryAgain, @@ -386,7 +474,10 @@ where } } TaskResult::Cancelled => TaskResult::Cancelled, - TaskResult::Err(err) => TaskResult::Err(err), + TaskResult::Err(err) => { + error!(err = %err, "project task failure"); + TaskResult::Err(err) + } } } } diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 48dcd0354..91d5352ee 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shuttle-proto" -version = "0.7.2" +version = "0.7.3" edition = "2021" publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/provisioner/Cargo.toml b/provisioner/Cargo.toml index 083426a45..057619eed 100644 --- a/provisioner/Cargo.toml +++ b/provisioner/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shuttle-provisioner" -version = "0.7.2" +version = "0.7.3" edition = "2021" description = "Service responsible for provisioning and managing resources for services" publish = false