Skip to content

Commit

Permalink
Factor out health status checks
Browse files Browse the repository at this point in the history
  • Loading branch information
blizzardc0der committed Apr 1, 2024
1 parent 7997f03 commit c4b136b
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 51 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust_library(
"src/capabilities_server.rs",
"src/cas_server.rs",
"src/execution_server.rs",
"src/health_server.rs",
"src/lib.rs",
"src/worker_api_server.rs",
],
Expand All @@ -25,12 +26,15 @@ rust_library(
"//nativelink-scheduler",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bytes",
"@crates//:futures",
"@crates//:hyper",
"@crates//:log",
"@crates//:parking_lot",
"@crates//:prost",
"@crates//:rand",
"@crates//:serde_json5",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tonic",
Expand Down
3 changes: 3 additions & 0 deletions nativelink-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ nativelink-util = { path = "../nativelink-util" }
nativelink-store = { path = "../nativelink-store" }
nativelink-scheduler = { path = "../nativelink-scheduler" }

async-lock = "3.3.0"
bytes = "1.6.0"
futures = "0.3.30"
hyper = { version = "0.14.28" }
serde_json5 = "0.1.0"
log = "0.4.21"
parking_lot = "0.12.1"
prost = "0.12.3"
Expand Down
85 changes: 85 additions & 0 deletions nativelink-service/src/health_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2023 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_lock::{Mutex as AsyncMutex, MutexGuard};
use futures::StreamExt;
use hyper::{Response, StatusCode};
use nativelink_error::Error;
use nativelink_util::health_utils::{
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};

#[derive(Clone)]
pub struct HealthServer {
health_registry_builder: Arc<AsyncMutex<HealthRegistryBuilder>>,
}

impl HealthServer {
pub async fn new(namespace: &str) -> Result<Self, Error> {
let namespace = namespace.to_string();
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
namespace.into(),
)));

Ok(HealthServer {
health_registry_builder,
})
}

pub async fn get_health_registry(&self) -> Result<MutexGuard<HealthRegistryBuilder>, Error> {
let health_registry_lock = self.health_registry_builder.lock().await;
Ok(health_registry_lock)
}

pub async fn check_health_status(&self, json_content_type: &'static str) -> Response<String> {
let health_registry_status = self.get_health_registry().await.unwrap().build();
let health_status_descriptions: Vec<HealthStatusDescription> = health_registry_status
.health_status_report()
.collect()
.await;

match serde_json5::to_string(&health_status_descriptions) {
Ok(body) => {
let contains_failed_report = health_status_descriptions
.iter()
.any(|description| matches!(description.status, HealthStatus::Failed { .. }));
let status_code = if contains_failed_report {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::OK
};

Response::builder()
.status(status_code)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(json_content_type),
)
.body(body)
.unwrap()
}

Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(json_content_type),
)
.body(format!("Internal Failure: {e:?}"))
.unwrap(),
}
}
}
1 change: 1 addition & 0 deletions nativelink-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ pub mod bytestream_server;
pub mod capabilities_server;
pub mod cas_server;
pub mod execution_server;
pub mod health_server;
pub mod worker_api_server;
59 changes: 8 additions & 51 deletions src/bin/nativelink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex;
use axum::Router;
use clap::Parser;
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use hyper::server::conn::Http;
use hyper::{Response, StatusCode};
use mimalloc::MiMalloc;
Expand All @@ -37,14 +37,12 @@ use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_service::capabilities_server::CapabilitiesServer;
use nativelink_service::cas_server::CasServer;
use nativelink_service::execution_server::ExecutionServer;
use nativelink_service::health_server::HealthServer;
use nativelink_service::worker_api_server::WorkerApiServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
use nativelink_util::health_utils::{
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
};
use nativelink_util::metrics_utils::{
set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent,
Registry,
Expand Down Expand Up @@ -104,13 +102,11 @@ async fn inner_main(
server_start_timestamp: u64,
) -> Result<(), Box<dyn std::error::Error>> {
let mut root_metrics_registry = <Registry>::with_prefix("nativelink");
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
"nativelink".into(),
)));
let health_server = HealthServer::new("nativelink").await?;

let store_manager = Arc::new(StoreManager::new());
{
let mut health_registry_lock = health_registry_builder.lock().await;
let mut health_registry_lock = health_server.get_health_registry().await?;
let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");

for (name, store_cfg) in cfg.stores {
Expand Down Expand Up @@ -397,7 +393,7 @@ async fn inner_main(
);

let root_metrics_registry = root_metrics_registry.clone();
let health_registry_status = health_registry_builder.lock().await.build();
let health_server_cloned = health_server.clone();

let mut svc = Router::new()
// This is the default service that executes if no other endpoint matches.
Expand All @@ -412,48 +408,9 @@ async fn inner_main(
}

spawn_blocking(move || {
futures::executor::block_on(async {
let health_status_descriptions: Vec<HealthStatusDescription> =
health_registry_status
.health_status_report()
.collect()
.await;

match serde_json5::to_string(&health_status_descriptions) {
Ok(body) => {
let contains_failed_report =
health_status_descriptions.iter().any(|description| {
matches!(
description.status,
HealthStatus::Failed { .. }
)
});
let status_code = if contains_failed_report {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::OK
};
Response::builder()
.status(status_code)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(
JSON_CONTENT_TYPE,
),
)
.body(body)
.unwrap()
}
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
)
.body(format!("Internal Failure: {e:?}"))
.unwrap(),
}
})
futures::executor::block_on(
health_server_cloned.check_health_status(JSON_CONTENT_TYPE),
)
})
.await
.unwrap_or_else(error_to_response)
Expand Down

0 comments on commit c4b136b

Please sign in to comment.