From caf1ed719d286c2409edc46786efa6568fbcc190 Mon Sep 17 00:00:00 2001 From: blizzardc0der Date: Sat, 13 Apr 2024 14:01:34 -0400 Subject: [PATCH] [Breaking] Factor out health status checks to its own service Health status checks are now a proper service and no longer enabled by default. Add `"health": {},` under your services config to enable. --- Cargo.lock | 3 + nativelink-config/examples/basic_cas.json | 3 +- nativelink-config/src/cas_server.rs | 15 ++++ nativelink-service/BUILD.bazel | 5 ++ nativelink-service/Cargo.toml | 4 + nativelink-service/src/health_server.rs | 94 +++++++++++++++++++++++ nativelink-service/src/lib.rs | 1 + nativelink-util/src/health_utils.rs | 8 +- src/bin/nativelink.rs | 82 +++++--------------- 9 files changed, 149 insertions(+), 66 deletions(-) create mode 100644 nativelink-service/src/health_server.rs diff --git a/Cargo.lock b/Cargo.lock index 4ef1e9f0f..a1df00085 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,6 +1815,7 @@ dependencies = [ name = "nativelink-service" version = "0.3.0" dependencies = [ + "async-lock", "bytes", "futures", "hyper", @@ -1832,9 +1833,11 @@ dependencies = [ "prost", "prost-types", "rand", + "serde_json5", "tokio", "tokio-stream", "tonic 0.11.0", + "tower", "tracing", "uuid", ] diff --git a/nativelink-config/examples/basic_cas.json b/nativelink-config/examples/basic_cas.json index a0403caf6..173951deb 100644 --- a/nativelink-config/examples/basic_cas.json +++ b/nativelink-config/examples/basic_cas.json @@ -154,7 +154,8 @@ "worker_api": { "scheduler": "MAIN_SCHEDULER", }, - "admin": {} + "admin": {}, + "health": {}, } }], "global": { diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 42e18c6d9..a8461d6da 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -184,6 +184,18 @@ pub struct AdminConfig { pub path: String, } +#[derive(Deserialize, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct HealthConfig { + /// Path 
to register the health status check. If path is "/status", and your + /// domain is "example.com", you can reach the endpoint with: + /// <http://example.com/status>. + /// + /// Default: "/status" + #[serde(default)] + pub path: String, +} + #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct ServicesConfig { @@ -228,6 +240,9 @@ pub struct ServicesConfig { /// This is the service for any administrative tasks. /// It provides a REST API endpoint for administrative purposes. pub admin: Option<AdminConfig>, + + /// This is the service for health status check. + pub health: Option<HealthConfig>, } #[derive(Deserialize, Debug)] diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 06e1d4ef2..3b45ca37e 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -14,6 +14,7 @@ rust_library( "src/capabilities_server.rs", "src/cas_server.rs", "src/execution_server.rs", + "src/health_server.rs", "src/lib.rs", "src/worker_api_server.rs", ], @@ -25,15 +26,19 @@ rust_library( "//nativelink-scheduler", "//nativelink-store", "//nativelink-util", + "@crates//:async-lock", "@crates//:bytes", "@crates//:futures", + "@crates//:hyper", "@crates//:log", "@crates//:parking_lot", "@crates//:prost", "@crates//:rand", + "@crates//:serde_json5", "@crates//:tokio", "@crates//:tokio-stream", "@crates//:tonic", + "@crates//:tower", "@crates//:tracing", "@crates//:uuid", ], diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index a8f7adbe6..74e98c282 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -11,8 +11,11 @@ nativelink-util = { path = "../nativelink-util" } nativelink-store = { path = "../nativelink-store" } nativelink-scheduler = { path = "../nativelink-scheduler" } +async-lock = "3.3.0" bytes = "1.6.0" futures = "0.3.30" +hyper = { version = "0.14.28" } +serde_json5 = "0.1.0" log = "0.4.21" parking_lot = "0.12.1" prost = "0.12.4" @@ -20,6 +23,7 @@ rand = "0.8.5" tokio = { version = "1.37.0", features = ["sync", 
"rt"] } tokio-stream = { version = "0.1.15", features = ["sync"] } tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tower = "0.4.13" tracing = "0.1.40" uuid = { version = "1.8.0", features = ["v4"] } diff --git a/nativelink-service/src/health_server.rs b/nativelink-service/src/health_server.rs new file mode 100644 index 000000000..4dd1da5fa --- /dev/null +++ b/nativelink-service/src/health_server.rs @@ -0,0 +1,94 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use futures::StreamExt; +use hyper::{Body, Request, Response, StatusCode}; +use nativelink_error::Error; +use nativelink_util::health_utils::{ + HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter, +}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; + +/// Content type header value for JSON. 
+const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; + +#[derive(Clone)] +pub struct HealthServer { + pub health_registry_status: HealthRegistry, +} + +impl HealthServer { + pub async fn new(health_registry_status: HealthRegistry) -> Result<Self, Error> { + Ok(Self { + health_registry_status, + }) + } +} + +impl Service<Request<Body>> for HealthServer { + type Response = Response<Body>; + type Error = std::convert::Infallible; + type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Request<Body>) -> Self::Future { + let health_registry_status = self.health_registry_status.clone(); + let fut = async move { + let health_status_descriptions: Vec<HealthStatusDescription> = health_registry_status + .health_status_report() + .collect() + .await; + + match serde_json5::to_string(&health_status_descriptions) { + Ok(body) => { + let contains_failed_report = + health_status_descriptions.iter().any(|description| { + matches!(description.status, HealthStatus::Failed { .. 
}) + }); + let status_code = if contains_failed_report { + StatusCode::SERVICE_UNAVAILABLE + } else { + StatusCode::OK + }; + + Ok(Response::builder() + .status(status_code) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(Body::from(body)) + .unwrap()) + } + + Err(e) => Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(Body::from(format!("Internal Failure: {e:?}"))) + .unwrap()), + } + }; + + // Return the response as an immediate future + Box::pin(fut) + } +} diff --git a/nativelink-service/src/lib.rs b/nativelink-service/src/lib.rs index c1891af2b..ba78350b7 100644 --- a/nativelink-service/src/lib.rs +++ b/nativelink-service/src/lib.rs @@ -17,4 +17,5 @@ pub mod bytestream_server; pub mod capabilities_server; pub mod cas_server; pub mod execution_server; +pub mod health_server; pub mod worker_api_server; diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs index f0a4e43a8..75f92b672 100644 --- a/nativelink-util/src/health_utils.rs +++ b/nativelink-util/src/health_utils.rs @@ -162,13 +162,17 @@ pub struct HealthRegistry { } pub trait HealthStatusReporter { - fn health_status_report(&self) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + '_>>; + fn health_status_report( + &self, + ) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + '_ + Send>>; } /// Health status reporter implementation for the health registry that provides a stream /// of health status descriptions. 
+impl HealthStatusReporter for HealthRegistry { - fn health_status_report(&self) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + '_>> { + fn health_status_report( + &self, + ) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + '_ + Send>> { Box::pin(futures::stream::iter(self.indicators.iter()).then( |(namespace, indicator)| async move { HealthStatusDescription { diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 787cc5858..ab790a64e 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use mimalloc::MiMalloc; @@ -37,14 +37,13 @@ use nativelink_service::bytestream_server::ByteStreamServer; use nativelink_service::capabilities_server::CapabilitiesServer; use nativelink_service::cas_server::CasServer; use nativelink_service::execution_server::ExecutionServer; +use nativelink_service::health_server::HealthServer; use nativelink_service::worker_api_server::WorkerApiServer; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; -use nativelink_util::health_utils::{ - HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter, -}; +use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, @@ -79,12 +78,12 @@ const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics"; /// Note: This must be kept in sync with the documentation in `AdminConfig::path`. 
const DEFAULT_ADMIN_API_PATH: &str = "/admin"; +// Note: This must be kept in sync with the documentation in `HealthConfig::path`. +const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status"; + /// Name of environment variable to disable metrics. const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS"; -/// Content type header value for JSON. -const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; - /// Backend for bazel remote execution / cache API. #[derive(Parser, Debug)] #[clap( @@ -401,64 +400,21 @@ async fn inner_main( let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. - .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - .route_service( - "/status", - axum::routing::get(move || async move { - fn error_to_response(e: E) -> Response { - let mut response = Response::new(format!("Error: {e:?}")); - *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - response - } + .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))); - spawn_blocking(move || { - futures::executor::block_on(async { - let health_status_descriptions: Vec = - health_registry_status - .health_status_report() - .collect() - .await; - - match serde_json5::to_string(&health_status_descriptions) { - Ok(body) => { - let contains_failed_report = - health_status_descriptions.iter().any(|description| { - matches!( - description.status, - HealthStatus::Failed { .. 
} - ) - }); - let status_code = if contains_failed_report { - StatusCode::SERVICE_UNAVAILABLE - } else { - StatusCode::OK - }; - Response::builder() - .status(status_code) - .header( - hyper::header::CONTENT_TYPE, - hyper::header::HeaderValue::from_static( - JSON_CONTENT_TYPE, - ), - ) - .body(body) - .unwrap() - } - Err(e) => Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .header( - hyper::header::CONTENT_TYPE, - hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), - ) - .body(format!("Internal Failure: {e:?}")) - .unwrap(), - } - }) - }) - .await - .unwrap_or_else(error_to_response) - }), + if let Some(health_cfg) = services.health { + let path = if health_cfg.path.is_empty() { + DEFAULT_HEALTH_STATUS_CHECK_PATH + } else { + &health_cfg.path + }; + svc = svc.route_service( + path, + HealthServer { + health_registry_status, + }, ); + } if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response {