From b4055af55e3991ca39db9d3bfae3efa7d9e953db Mon Sep 17 00:00:00 2001 From: Damien Date: Tue, 8 Nov 2022 12:34:54 +0000 Subject: [PATCH] fix: add timeout to health checks (#468) * fix: add timeout to health checks * Parameterise the timeout * rollback on inadvertent circleci change * misc: log out a warning when a task has been running for a long time * longer timeouts * Address review * Fmt --- common/src/models/error.rs | 5 +++++ gateway/src/api/latest.rs | 18 +++++++++++++++--- gateway/src/lib.rs | 2 +- gateway/src/project.rs | 7 +++++-- gateway/src/task.rs | 32 +++++++++++++++++++++++++++----- gateway/src/worker.rs | 2 +- 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/common/src/models/error.rs b/common/src/models/error.rs index 5e5677925..da5b77fa5 100644 --- a/common/src/models/error.rs +++ b/common/src/models/error.rs @@ -47,6 +47,7 @@ pub enum ErrorKind { InvalidOperation, Internal, NotReady, + ServiceUnavailable, } impl From for ApiError { @@ -54,6 +55,10 @@ impl From for ApiError { let (status, error_message) = match kind { ErrorKind::Internal => (StatusCode::INTERNAL_SERVER_ERROR, "internal server error"), ErrorKind::KeyMissing => (StatusCode::UNAUTHORIZED, "request is missing a key"), + ErrorKind::ServiceUnavailable => ( + StatusCode::SERVICE_UNAVAILABLE, + "we're experiencing a high workload right now, please try again in a little bit", + ), ErrorKind::KeyMalformed => (StatusCode::BAD_REQUEST, "request has an invalid key"), ErrorKind::BadHost => (StatusCode::BAD_REQUEST, "the 'Host' header is invalid"), ErrorKind::UserNotFound => (StatusCode::NOT_FOUND, "user not found"), diff --git a/gateway/src/api/latest.rs b/gateway/src/api/latest.rs index 48114fa8e..733a9c4b0 100644 --- a/gateway/src/api/latest.rs +++ b/gateway/src/api/latest.rs @@ -17,12 +17,16 @@ use tracing::{debug, debug_span, field, Span}; use crate::auth::{Admin, ScopedUser, User}; use crate::task::{self, BoxedTask}; +use crate::worker::WORKER_QUEUE_SIZE; use crate::{AccountName, Error, GatewayService, ProjectName}; +pub const SVC_DEGRADED_THRESHOLD: usize = 128; + #[derive(Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum GatewayStatus { Healthy, + Degraded, Unhealthy, } @@ -38,6 +42,12 @@ impl StatusResponse { } } + pub fn degraded() -> Self { + Self { + status: GatewayStatus::Degraded, + } + } + pub fn unhealthy() -> Self { Self { status: GatewayStatus::Unhealthy, @@ -138,13 +148,15 @@ async fn route_project( } async fn get_status(Extension(sender): Extension>) -> Response { - let (status, body) = if !sender.is_closed() && sender.capacity() > 0 { - (StatusCode::OK, StatusResponse::healthy()) - } else { + let (status, body) = if sender.is_closed() || sender.capacity() == 0 { ( StatusCode::INTERNAL_SERVER_ERROR, StatusResponse::unhealthy(), ) + } else if sender.capacity() < WORKER_QUEUE_SIZE - SVC_DEGRADED_THRESHOLD { + (StatusCode::OK, StatusResponse::degraded()) + } else { + (StatusCode::OK, StatusResponse::healthy()) }; let body = serde_json::to_vec(&body).unwrap(); diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index 687bec738..0faf2e7aa 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -77,7 +77,7 @@ impl From for Error { impl From> for Error { fn from(_: SendError) -> Self { - Self::from(ErrorKind::NotReady) + Self::from(ErrorKind::ServiceUnavailable) } } diff --git a/gateway/src/project.rs b/gateway/src/project.rs index 8ac610158..a4a70ad85 100644 --- a/gateway/src/project.rs +++ b/gateway/src/project.rs @@ -15,7 +15,7 @@ use hyper::Client; use once_cell::sync::Lazy; use rand::distributions::{Alphanumeric, DistString}; use serde::{Deserialize, Serialize}; -use tokio::time; +use tokio::time::{self, timeout}; use tracing::{debug, error}; use crate::{ @@ -63,6 +63,8 @@ const MAX_RESTARTS: i64 = 3; // Client used for health checks static CLIENT: Lazy> = Lazy::new(Client::new); +// Health check must succeed within 10 seconds +static IS_HEALTHY_TIMEOUT: Duration = Duration::from_secs(10); #[async_trait] impl Refresh for ContainerInspectResponse @@ -640,7 +642,8 @@ impl Service { pub async fn is_healthy(&mut self) -> bool { let uri = self.uri(format!("/projects/{}/status", self.name)).unwrap(); - let is_healthy = matches!(CLIENT.get(uri).await, Ok(res) if res.status().is_success()); + let resp = timeout(IS_HEALTHY_TIMEOUT, CLIENT.get(uri)).await; + let is_healthy = matches!(resp, Ok(Ok(res)) if res.status().is_success()); self.last_check = Some(HealthCheckRecord::new(is_healthy)); is_healthy } diff --git a/gateway/src/task.rs b/gateway/src/task.rs index 2a1d38f1f..2f552a995 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -3,8 +3,8 @@ use std::collections::VecDeque; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; +use tokio::time::{sleep, timeout}; use tracing::warn; use uuid::Uuid; @@ -12,7 +12,12 @@ use crate::project::*; use crate::service::{GatewayContext, GatewayService}; use crate::{AccountName, EndState, Error, ErrorKind, ProjectName, Refresh, State}; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); +// Default maximum _total_ time a task is allowed to run +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); +// Maximum time we'll wait for a task to successfully be sent down the channel +pub const TASK_SEND_TIMEOUT: Duration = Duration::from_secs(9); +// Maximum time before a task is considered degraded +pub const PROJECT_TASK_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60); #[async_trait] pub trait Task: Send { @@ -183,8 +188,11 @@ impl TaskBuilder { )) } - pub async fn send(self, sender: &Sender) -> Result<(), SendError> { - sender.send(self.build()).await + pub async fn send(self, sender: &Sender) -> Result<(), Error> { + match timeout(TASK_SEND_TIMEOUT, sender.send(self.build())).await { + Ok(Ok(_)) => Ok(()), + _ => Err(Error::from_kind(ErrorKind::ServiceUnavailable)), + } } } @@ -338,7 +346,21 @@ where let task = self.tasks.front_mut().unwrap(); - let res = task.poll(project_ctx).await; + let timeout = sleep(PROJECT_TASK_MAX_IDLE_TIMEOUT); + let res = { + let mut poll = task.poll(project_ctx); + tokio::select! { + res = &mut poll => res, + _ = timeout => { + warn!( + project_name = ?self.project_name, + account_name = ?self.account_name, + "a task has been idling for a long time" + ); + poll.await + } + } + }; if let Some(update) = res.as_ref().ok() { match self diff --git a/gateway/src/worker.rs b/gateway/src/worker.rs index 9584fa245..25a3914ba 100644 --- a/gateway/src/worker.rs +++ b/gateway/src/worker.rs @@ -4,7 +4,7 @@ use tracing::{debug, info}; use crate::task::{BoxedTask, TaskResult}; use crate::Error; -const WORKER_QUEUE_SIZE: usize = 2048; +pub const WORKER_QUEUE_SIZE: usize = 2048; pub struct Worker { send: Option>,