Skip to content

Commit

Permalink
Factor out health status checks to own service
Browse files Browse the repository at this point in the history
  • Loading branch information
blizzardc0der committed Apr 5, 2024
1 parent 2ae7cab commit 450bf36
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 64 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion nativelink-config/examples/basic_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@
"worker_api": {
"scheduler": "MAIN_SCHEDULER",
},
"admin": {}
"admin": {},
"health": {},
}
}],
"global": {
Expand Down
15 changes: 15 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ pub struct AdminConfig {
pub path: String,
}

/// Configuration for the health status check service.
///
/// Every field has a default, so an empty object (`"health": {}`) is a valid
/// configuration. Unknown fields are rejected during deserialization.
#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct HealthConfig {
/// Path to register the health status check. If path is "/status", and your
/// domain is "example.com", you can reach the endpoint with:
/// <http://example.com/status>.
///
/// When omitted, this deserializes to an empty string; the server binary
/// treats an empty path as a request for the default.
///
/// Default: "/status"
#[serde(default)]
pub path: String,
}

#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct ServicesConfig {
Expand Down Expand Up @@ -228,6 +240,9 @@ pub struct ServicesConfig {
/// This is the service for any administrative tasks.
/// It provides a REST API endpoint for administrative purposes.
pub admin: Option<AdminConfig>,

/// This is the service for health status checks.
/// It is only registered when this section is present in the config.
pub health: Option<HealthConfig>,
}

#[derive(Deserialize, Debug)]
Expand Down
4 changes: 4 additions & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust_library(
"src/capabilities_server.rs",
"src/cas_server.rs",
"src/execution_server.rs",
"src/health_server.rs",
"src/lib.rs",
"src/worker_api_server.rs",
],
Expand All @@ -25,12 +26,15 @@ rust_library(
"//nativelink-scheduler",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:futures",
"@crates//:hyper",
"@crates//:log",
"@crates//:parking_lot",
"@crates//:prost",
"@crates//:rand",
"@crates//:serde_json5",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tonic",
Expand Down
3 changes: 3 additions & 0 deletions nativelink-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ nativelink-util = { path = "../nativelink-util" }
nativelink-store = { path = "../nativelink-store" }
nativelink-scheduler = { path = "../nativelink-scheduler" }

async-lock = "3.3.0"
bytes = "1.6.0"
futures = "0.3.30"
hyper = { version = "0.14.28" }
serde_json5 = "0.1.0"
log = "0.4.21"
parking_lot = "0.12.1"
prost = "0.12.3"
Expand Down
88 changes: 88 additions & 0 deletions nativelink-service/src/health_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_lock::{Mutex as AsyncMutex, MutexGuard};
use futures::StreamExt;
use hyper::{Response, StatusCode};
use nativelink_error::Error;
use nativelink_util::health_utils::{
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};

/// Content type header value for JSON.
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";

/// Service that owns the health registry builder and renders the current
/// health status report as an HTTP response.
///
/// Clones share the same underlying builder through the `Arc`.
#[derive(Clone)]
pub struct HealthServer {
// Shared builder guarded by an async mutex; handed out to callers through
// `get_health_registry` and used to build the registry for status checks.
health_registry_builder: Arc<AsyncMutex<HealthRegistryBuilder>>,
}

impl HealthServer {
pub async fn new(namespace: &str) -> Result<Self, Error> {
let namespace = namespace.to_string();
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
namespace.into(),
)));

Ok(HealthServer {
health_registry_builder,
})
}

pub async fn get_health_registry(&self) -> Result<MutexGuard<HealthRegistryBuilder>, Error> {
let health_registry_lock = self.health_registry_builder.lock().await;
Ok(health_registry_lock)
}

pub async fn check_health_status(&self) -> Response<String> {
let health_registry_status = self.get_health_registry().await.unwrap().build();
let health_status_descriptions: Vec<HealthStatusDescription> = health_registry_status
.health_status_report()
.collect()
.await;

match serde_json5::to_string(&health_status_descriptions) {
Ok(body) => {
let contains_failed_report = health_status_descriptions
.iter()
.any(|description| matches!(description.status, HealthStatus::Failed { .. }));
let status_code = if contains_failed_report {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::OK
};

Response::builder()
.status(status_code)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
)
.body(body)
.unwrap()
}

Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
)
.body(format!("Internal Failure: {e:?}"))
.unwrap(),
}
}
}
1 change: 1 addition & 0 deletions nativelink-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ pub mod bytestream_server;
pub mod capabilities_server;
pub mod cas_server;
pub mod execution_server;
pub mod health_server;
pub mod worker_api_server;
88 changes: 25 additions & 63 deletions src/bin/nativelink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex;
use axum::Router;
use clap::Parser;
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use hyper::server::conn::Http;
use hyper::{Response, StatusCode};
use mimalloc::MiMalloc;
Expand All @@ -37,14 +37,12 @@ use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_service::capabilities_server::CapabilitiesServer;
use nativelink_service::cas_server::CasServer;
use nativelink_service::execution_server::ExecutionServer;
use nativelink_service::health_server::HealthServer;
use nativelink_service::worker_api_server::WorkerApiServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
use nativelink_util::health_utils::{
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};
use nativelink_util::metrics_utils::{
set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent,
Registry,
Expand Down Expand Up @@ -79,12 +77,12 @@ const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics";
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
const DEFAULT_ADMIN_API_PATH: &str = "/admin";

/// Default path of the health status check endpoint, used when none is configured.
/// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";

/// Name of environment variable to disable metrics.
const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS";

/// Content type header value for JSON.
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";

/// Backend for bazel remote execution / cache API.
#[derive(Parser, Debug)]
#[clap(
Expand All @@ -104,13 +102,11 @@ async fn inner_main(
server_start_timestamp: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let mut root_metrics_registry = <Registry>::with_prefix("nativelink");
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
"nativelink".into(),
)));
let health_server = HealthServer::new("nativelink").await?;

let store_manager = Arc::new(StoreManager::new());
{
let mut health_registry_lock = health_registry_builder.lock().await;
let mut health_registry_lock = health_server.get_health_registry().await?;
let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");

for (name, store_cfg) in cfg.stores {
Expand Down Expand Up @@ -397,68 +393,34 @@ async fn inner_main(
);

let root_metrics_registry = root_metrics_registry.clone();
let health_registry_status = health_registry_builder.lock().await.build();

let mut svc = Router::new()
// This is the default service that executes if no other endpoint matches.
.fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}")))
.route_service(
"/status",
axum::routing::get(move || async move {
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
let mut response = Response::new(format!("Error: {e:?}"));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
response
}
.fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}")));

if let Some(health_cfg) = services.health {
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
let mut response = Response::new(format!("Error: {e:?}"));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
response
}
let path = if health_cfg.path.is_empty() {
DEFAULT_HEALTH_STATUS_CHECK_PATH
} else {
&health_cfg.path
};
let health_server_cloned = health_server.clone();
svc = svc.route_service(
path,
axum::routing::get(move || async move {
spawn_blocking(move || {
futures::executor::block_on(async {
let health_status_descriptions: Vec<HealthStatusDescription> =
health_registry_status
.health_status_report()
.collect()
.await;

match serde_json5::to_string(&health_status_descriptions) {
Ok(body) => {
let contains_failed_report =
health_status_descriptions.iter().any(|description| {
matches!(
description.status,
HealthStatus::Failed { .. }
)
});
let status_code = if contains_failed_report {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::OK
};
Response::builder()
.status(status_code)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(
JSON_CONTENT_TYPE,
),
)
.body(body)
.unwrap()
}
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
)
.body(format!("Internal Failure: {e:?}"))
.unwrap(),
}
})
futures::executor::block_on(health_server_cloned.check_health_status())
})
.await
.unwrap_or_else(error_to_response)
}),
);
}

if let Some(prometheus_cfg) = services.experimental_prometheus {
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
Expand Down

0 comments on commit 450bf36

Please sign in to comment.