Skip to content

Commit

Permalink
fix: add timeout to health checks (#468)
Browse files Browse the repository at this point in the history
* fix: add timeout to health checks

* Parameterise the timeout

* rollback on inadvertent circleci change

* misc: log out a warning when a task has been running for a long time

* longer timeouts

* Address review

* Fmt
  • Loading branch information
brokad authored Nov 8, 2022
1 parent b6bd64c commit b4055af
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 12 deletions.
5 changes: 5 additions & 0 deletions common/src/models/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,18 @@ pub enum ErrorKind {
InvalidOperation,
Internal,
NotReady,
ServiceUnavailable,
}

impl From<ErrorKind> for ApiError {
fn from(kind: ErrorKind) -> Self {
let (status, error_message) = match kind {
ErrorKind::Internal => (StatusCode::INTERNAL_SERVER_ERROR, "internal server error"),
ErrorKind::KeyMissing => (StatusCode::UNAUTHORIZED, "request is missing a key"),
ErrorKind::ServiceUnavailable => (
StatusCode::SERVICE_UNAVAILABLE,
"we're experiencing a high workload right now, please try again in a little bit",
),
ErrorKind::KeyMalformed => (StatusCode::BAD_REQUEST, "request has an invalid key"),
ErrorKind::BadHost => (StatusCode::BAD_REQUEST, "the 'Host' header is invalid"),
ErrorKind::UserNotFound => (StatusCode::NOT_FOUND, "user not found"),
Expand Down
18 changes: 15 additions & 3 deletions gateway/src/api/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ use tracing::{debug, debug_span, field, Span};

use crate::auth::{Admin, ScopedUser, User};
use crate::task::{self, BoxedTask};
use crate::worker::WORKER_QUEUE_SIZE;
use crate::{AccountName, Error, GatewayService, ProjectName};

pub const SVC_DEGRADED_THRESHOLD: usize = 128;

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum GatewayStatus {
Healthy,
Degraded,
Unhealthy,
}

Expand All @@ -38,6 +42,12 @@ impl StatusResponse {
}
}

pub fn degraded() -> Self {
Self {
status: GatewayStatus::Degraded,
}
}

pub fn unhealthy() -> Self {
Self {
status: GatewayStatus::Unhealthy,
Expand Down Expand Up @@ -138,13 +148,15 @@ async fn route_project(
}

async fn get_status(Extension(sender): Extension<Sender<BoxedTask>>) -> Response<Body> {
let (status, body) = if !sender.is_closed() && sender.capacity() > 0 {
(StatusCode::OK, StatusResponse::healthy())
} else {
let (status, body) = if sender.is_closed() || sender.capacity() == 0 {
(
StatusCode::INTERNAL_SERVER_ERROR,
StatusResponse::unhealthy(),
)
} else if sender.capacity() < WORKER_QUEUE_SIZE - SVC_DEGRADED_THRESHOLD {
(StatusCode::OK, StatusResponse::degraded())
} else {
(StatusCode::OK, StatusResponse::healthy())
};

let body = serde_json::to_vec(&body).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl From<ErrorKind> for Error {

impl<T> From<SendError<T>> for Error {
fn from(_: SendError<T>) -> Self {
Self::from(ErrorKind::NotReady)
Self::from(ErrorKind::ServiceUnavailable)
}
}

Expand Down
7 changes: 5 additions & 2 deletions gateway/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use hyper::Client;
use once_cell::sync::Lazy;
use rand::distributions::{Alphanumeric, DistString};
use serde::{Deserialize, Serialize};
use tokio::time;
use tokio::time::{self, timeout};
use tracing::{debug, error};

use crate::{
Expand Down Expand Up @@ -63,6 +63,8 @@ const MAX_RESTARTS: i64 = 3;

// Client used for health checks
static CLIENT: Lazy<Client<HttpConnector>> = Lazy::new(Client::new);
// Health check must succeed within 10 seconds
static IS_HEALTHY_TIMEOUT: Duration = Duration::from_secs(10);

#[async_trait]
impl<Ctx> Refresh<Ctx> for ContainerInspectResponse
Expand Down Expand Up @@ -640,7 +642,8 @@ impl Service {

pub async fn is_healthy(&mut self) -> bool {
let uri = self.uri(format!("/projects/{}/status", self.name)).unwrap();
let is_healthy = matches!(CLIENT.get(uri).await, Ok(res) if res.status().is_success());
let resp = timeout(IS_HEALTHY_TIMEOUT, CLIENT.get(uri)).await;
let is_healthy = matches!(resp, Ok(Ok(res)) if res.status().is_success());
self.last_check = Some(HealthCheckRecord::new(is_healthy));
is_healthy
}
Expand Down
32 changes: 27 additions & 5 deletions gateway/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ use std::collections::VecDeque;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, timeout};
use tracing::warn;
use uuid::Uuid;

use crate::project::*;
use crate::service::{GatewayContext, GatewayService};
use crate::{AccountName, EndState, Error, ErrorKind, ProjectName, Refresh, State};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
// Default maximum _total_ time a task is allowed to run
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
// Maximum time we'll wait for a task to successfully be sent down the channel
pub const TASK_SEND_TIMEOUT: Duration = Duration::from_secs(9);
// Maximum time before a task is considered degraded
pub const PROJECT_TASK_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

#[async_trait]
pub trait Task<Ctx>: Send {
Expand Down Expand Up @@ -183,8 +188,11 @@ impl TaskBuilder {
))
}

pub async fn send(self, sender: &Sender<BoxedTask>) -> Result<(), SendError<BoxedTask>> {
sender.send(self.build()).await
pub async fn send(self, sender: &Sender<BoxedTask>) -> Result<(), Error> {
match timeout(TASK_SEND_TIMEOUT, sender.send(self.build())).await {
Ok(Ok(_)) => Ok(()),
_ => Err(Error::from_kind(ErrorKind::ServiceUnavailable)),
}
}
}

Expand Down Expand Up @@ -338,7 +346,21 @@ where

let task = self.tasks.front_mut().unwrap();

let res = task.poll(project_ctx).await;
let timeout = sleep(PROJECT_TASK_MAX_IDLE_TIMEOUT);
let res = {
let mut poll = task.poll(project_ctx);
tokio::select! {
res = &mut poll => res,
_ = timeout => {
warn!(
project_name = ?self.project_name,
account_name = ?self.account_name,
"a task has been idling for a long time"
);
poll.await
}
}
};

if let Some(update) = res.as_ref().ok() {
match self
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tracing::{debug, info};
use crate::task::{BoxedTask, TaskResult};
use crate::Error;

const WORKER_QUEUE_SIZE: usize = 2048;
pub const WORKER_QUEUE_SIZE: usize = 2048;

pub struct Worker<W = BoxedTask> {
send: Option<Sender<W>>,
Expand Down

0 comments on commit b4055af

Please sign in to comment.