Skip to content

Commit

Permalink
[Breaking] Factor out health status checks into their own service
Browse files Browse the repository at this point in the history
Health status checks are now a proper service and are no longer enabled by
default.

Add `"health": {},` under your services config to enable them.
  • Loading branch information
blizzardc0der committed Apr 15, 2024
1 parent d854874 commit 0638fa2
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 68 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion nativelink-config/examples/basic_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@
"worker_api": {
"scheduler": "MAIN_SCHEDULER",
},
"admin": {}
"admin": {},
"health": {},
}
}],
"global": {
Expand Down
15 changes: 15 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ pub struct AdminConfig {
pub path: String,
}

/// Configuration for the health status check service.
///
/// The service is only registered when a `health` entry is present in the
/// services config; an empty object (`"health": {}`) is enough to enable it
/// with default settings.
#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct HealthConfig {
/// Path to register the health status check. If path is "/status", and your
/// domain is "example.com", you can reach the endpoint with:
/// <http://example.com/status>.
///
/// When omitted, this field deserializes to an empty string and the server
/// binary substitutes the default path below.
///
/// Default: "/status"
#[serde(default)]
pub path: String,
}

#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct ServicesConfig {
Expand Down Expand Up @@ -228,6 +240,9 @@ pub struct ServicesConfig {
/// This is the service for any administrative tasks.
/// It provides a REST API endpoint for administrative purposes.
pub admin: Option<AdminConfig>,

/// This is the service for health status checks.
/// It provides an HTTP endpoint that reports the health of registered components.
pub health: Option<HealthConfig>,
}

#[derive(Deserialize, Debug)]
Expand Down
8 changes: 8 additions & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust_library(
"src/capabilities_server.rs",
"src/cas_server.rs",
"src/execution_server.rs",
"src/health_server.rs",
"src/lib.rs",
"src/worker_api_server.rs",
],
Expand All @@ -25,15 +26,19 @@ rust_library(
"//nativelink-scheduler",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:futures",
"@crates//:hyper",
"@crates//:log",
"@crates//:parking_lot",
"@crates//:prost",
"@crates//:rand",
"@crates//:serde_json5",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tonic",
"@crates//:tower",
"@crates//:tracing",
"@crates//:uuid",
],
Expand All @@ -56,6 +61,7 @@ rust_test_suite(
"//nativelink-service",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:futures",
"@crates//:hyper",
Expand All @@ -64,9 +70,11 @@ rust_test_suite(
"@crates//:prometheus-client",
"@crates//:prost",
"@crates//:prost-types",
"@crates//:serde_json5",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tonic",
"@crates//:tower",
],
)

Expand Down
4 changes: 4 additions & 0 deletions nativelink-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ nativelink-util = { path = "../nativelink-util" }
nativelink-store = { path = "../nativelink-store" }
nativelink-scheduler = { path = "../nativelink-scheduler" }

async-lock = "3.3.0"
bytes = "1.6.0"
futures = "0.3.30"
hyper = { version = "0.14.28" }
serde_json5 = "0.1.0"
log = "0.4.21"
parking_lot = "0.12.1"
prost = "0.12.4"
rand = "0.8.5"
tokio = { version = "1.37.0", features = ["sync", "rt"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tonic = { version = "0.11.0", features = ["gzip", "tls"] }
tower = "0.4.13"
tracing = "0.1.40"
uuid = { version = "1.8.0", features = ["v4"] }

Expand Down
81 changes: 81 additions & 0 deletions nativelink-service/src/health_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use hyper::header::{HeaderValue, CONTENT_TYPE};
use hyper::{Body, Request, Response, StatusCode};
use nativelink_util::health_utils::{
HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;

/// Content type header value for JSON.
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";

/// HTTP service that reports the health status of every indicator held by a
/// [`HealthRegistry`].
///
/// The type is `Clone`, and each request handled by its `Service`
/// implementation works on its own clone of the registry.
#[derive(Clone)]
pub struct HealthServer {
// Registry queried for a fresh health report on every request.
health_registry: HealthRegistry,
}

impl HealthServer {
pub fn new(health_registry: HealthRegistry) -> Self {
Self { health_registry }
}
}

/// Serves the aggregated health report of the registry over HTTP.
///
/// The incoming request is ignored entirely, so every method and every path
/// routed to this service receives the same report.
///
/// Responses:
/// * `200 OK` with a JSON body when no indicator reports `HealthStatus::Failed`.
/// * `503 Service Unavailable` with the same JSON body when at least one
///   indicator reports `HealthStatus::Failed`.
/// * `500 Internal Server Error` with a plain-text body when the report cannot
///   be serialized.
impl Service<Request<hyper::Body>> for HealthServer {
    type Response = Response<hyper::Body>;
    type Error = std::convert::Infallible;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// The service keeps no per-request state and applies no back-pressure,
    /// so it is always ready.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Collects the current status of every indicator and renders it as an
    /// HTTP response. Failures are reported through the status code; the
    /// returned future itself never resolves to an error.
    fn call(&mut self, _req: Request<Body>) -> Self::Future {
        /// Content type for the diagnostic body sent when serialization fails.
        /// That body is free-form text, so it must not be labelled as JSON.
        const TEXT_CONTENT_TYPE: &str = "text/plain; charset=utf-8";

        // The returned future must own its data, so it works on a clone of the
        // registry rather than borrowing from `self`.
        let health_registry = self.health_registry.clone();
        Box::pin(async move {
            let health_status_descriptions: Vec<HealthStatusDescription> =
                health_registry.health_status_report().collect().await;

            let response = match serde_json5::to_string(&health_status_descriptions) {
                Ok(body) => {
                    // A single failed indicator marks the whole service as unavailable.
                    let contains_failed_report =
                        health_status_descriptions.iter().any(|description| {
                            matches!(description.status, HealthStatus::Failed { .. })
                        });
                    let status_code = if contains_failed_report {
                        StatusCode::SERVICE_UNAVAILABLE
                    } else {
                        StatusCode::OK
                    };

                    Response::builder()
                        .status(status_code)
                        .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE))
                        .body(Body::from(body))
                        .expect("constant status code and header always build a valid response")
                }

                Err(e) => Response::builder()
                    .status(StatusCode::INTERNAL_SERVER_ERROR)
                    .header(CONTENT_TYPE, HeaderValue::from_static(TEXT_CONTENT_TYPE))
                    .body(Body::from(format!("Internal Failure: {e:?}")))
                    .expect("constant status code and header always build a valid response"),
            };

            Ok(response)
        })
    }
}
1 change: 1 addition & 0 deletions nativelink-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ pub mod bytestream_server;
pub mod capabilities_server;
pub mod cas_server;
pub mod execution_server;
pub mod health_server;
pub mod worker_api_server;
8 changes: 6 additions & 2 deletions nativelink-util/src/health_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,17 @@ pub struct HealthRegistry {
}

pub trait HealthStatusReporter {
fn health_status_report(&self) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + '_>>;
fn health_status_report(
&self,
) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + Send + '_>>;
}

/// Health status reporter implementation for the health registry that provides a stream
/// of health status descriptions.
impl HealthStatusReporter for HealthRegistry {
fn health_status_report(&self) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + '_>> {
fn health_status_report(
&self,
) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + Send + '_>> {
Box::pin(futures::stream::iter(self.indicators.iter()).then(
|(namespace, indicator)| async move {
HealthStatusDescription {
Expand Down
81 changes: 16 additions & 65 deletions src/bin/nativelink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex;
use axum::Router;
use clap::Parser;
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use hyper::server::conn::Http;
use hyper::{Response, StatusCode};
use mimalloc::MiMalloc;
Expand All @@ -37,14 +37,13 @@ use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_service::capabilities_server::CapabilitiesServer;
use nativelink_service::cas_server::CasServer;
use nativelink_service::execution_server::ExecutionServer;
use nativelink_service::health_server::HealthServer;
use nativelink_service::worker_api_server::WorkerApiServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
use nativelink_util::health_utils::{
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};
use nativelink_util::health_utils::HealthRegistryBuilder;
use nativelink_util::metrics_utils::{
set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent,
Registry,
Expand Down Expand Up @@ -79,12 +78,12 @@ const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics";
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
const DEFAULT_ADMIN_API_PATH: &str = "/admin";

/// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";

/// Name of environment variable to disable metrics.
const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS";

/// Content type header value for JSON.
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";

/// Backend for bazel remote execution / cache API.
#[derive(Parser, Debug)]
#[clap(
Expand Down Expand Up @@ -397,68 +396,20 @@ async fn inner_main(
);

let root_metrics_registry = root_metrics_registry.clone();
let health_registry_status = health_registry_builder.lock().await.build();
let health_registry = health_registry_builder.lock().await.build();

let mut svc = Router::new()
// This is the default service that executes if no other endpoint matches.
.fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}")))
.route_service(
"/status",
axum::routing::get(move || async move {
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
let mut response = Response::new(format!("Error: {e:?}"));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
response
}
.fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}")));

spawn_blocking(move || {
futures::executor::block_on(async {
let health_status_descriptions: Vec<HealthStatusDescription> =
health_registry_status
.health_status_report()
.collect()
.await;

match serde_json5::to_string(&health_status_descriptions) {
Ok(body) => {
let contains_failed_report =
health_status_descriptions.iter().any(|description| {
matches!(
description.status,
HealthStatus::Failed { .. }
)
});
let status_code = if contains_failed_report {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::OK
};
Response::builder()
.status(status_code)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(
JSON_CONTENT_TYPE,
),
)
.body(body)
.unwrap()
}
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
)
.body(format!("Internal Failure: {e:?}"))
.unwrap(),
}
})
})
.await
.unwrap_or_else(error_to_response)
}),
);
if let Some(health_cfg) = services.health {
let path = if health_cfg.path.is_empty() {
DEFAULT_HEALTH_STATUS_CHECK_PATH
} else {
&health_cfg.path
};
svc = svc.route_service(path, HealthServer::new(health_registry));
}

if let Some(prometheus_cfg) = services.experimental_prometheus {
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
Expand Down

0 comments on commit 0638fa2

Please sign in to comment.