From 48ca6214d6ad699e3f00b685931d4974865e457b Mon Sep 17 00:00:00 2001 From: Mudit Mathur <85603888+heisenberglit@users.noreply.github.com> Date: Fri, 18 Jul 2025 13:43:57 +0000 Subject: [PATCH 1/2] feat(health): extend /health endpoint to include instances (#1312) --- lib/llm/src/entrypoint/input/http.rs | 10 +++++--- lib/llm/src/http/service/health.rs | 19 ++++++++++++-- lib/llm/src/http/service/service_v2.rs | 26 +++++++++++++++++++- lib/runtime/src/instances.rs | 34 ++++++++++++++++++++++++++ lib/runtime/src/lib.rs | 1 + 5 files changed, 84 insertions(+), 6 deletions(-) create mode 100644 lib/runtime/src/instances.rs diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs index 01bfa61c696..9dcb5f53cbf 100644 --- a/lib/llm/src/entrypoint/input/http.rs +++ b/lib/llm/src/entrypoint/input/http.rs @@ -22,18 +22,22 @@ use dynamo_runtime::{DistributedRuntime, Runtime}; /// Build and run an HTTP service pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> { + let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?; + let etcd_client = distributed_runtime.etcd_client().clone(); + let http_service = service_v2::HttpService::builder() .port(engine_config.local_model().http_port()) .enable_chat_endpoints(true) .enable_cmpl_endpoints(true) .enable_embeddings_endpoints(true) .with_request_template(engine_config.local_model().request_template()) + .with_etcd_client(etcd_client.clone()) .build()?; + match engine_config { EngineConfig::Dynamic(_) => { - let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?; - match distributed_runtime.etcd_client() { - Some(etcd_client) => { + match etcd_client{ + Some(ref etcd_client) => { let router_config = engine_config.local_model().router_config(); // Listen for models registering themselves in etcd, add them to HTTP service run_watcher( diff --git a/lib/llm/src/http/service/health.rs b/lib/llm/src/http/service/health.rs index c3dfdfcd638..cef349e7497 100644 --- a/lib/llm/src/http/service/health.rs +++ b/lib/llm/src/http/service/health.rs @@ -30,6 +30,7 @@ use super::{service_v2, RouteDoc}; use axum::{http::Method, http::StatusCode, response::IntoResponse, routing::get, Json, Router}; +use dynamo_runtime::instances::list_all_instances; use serde_json::json; use std::sync::Arc; @@ -79,13 +80,26 @@ async fn health_handler( axum::extract::State(state): axum::extract::State>, ) -> impl IntoResponse { let model_entries = state.manager().get_model_entries(); + let instances = if let Some(etcd_client) = state.etcd_client() { + match list_all_instances(etcd_client).await { + Ok(instances) => instances, + Err(err) => { + tracing::warn!("Failed to fetch instances from etcd: {}", err); + vec![] + } + } + } else { + vec![] + }; + if model_entries.is_empty() { ( StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "status": "unhealthy", - "message": "No endpoints available" + "message": "No endpoints available", + "instances": instances })), ) } else { @@ -97,7 +111,8 @@ async fn health_handler( StatusCode::OK, Json(json!({ "status": "healthy", - "endpoints": endpoints + "endpoints": endpoints, + "instances": instances })), ) } diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index 0b2af7763cc..ab762b6d430 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -14,11 +14,14 @@ use anyhow::Result; use derive_builder::Builder; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use dynamo_runtime::transports::etcd; + /// HTTP service shared state pub struct State { metrics: Arc, manager: Arc, + etcd_client: Option, } impl State { @@ -26,6 +29,15 @@ impl State { Self { manager, metrics: Arc::new(Metrics::default()), + etcd_client: None, + } + } + + pub fn new_with_etcd(manager: Arc, etcd_client: Option) -> Self { + Self { + manager, + metrics: Arc::new(Metrics::default()), + etcd_client, } } @@ -42,6 +54,10 @@ impl State { self.manager.clone() } + pub fn etcd_client(&self) -> Option<&etcd::Client> { + self.etcd_client.as_ref() + } + // TODO pub fn sse_keep_alive(&self) -> Option { None @@ -84,6 +100,9 @@ pub struct HttpServiceConfig { #[builder(default = "None")] request_template: Option, + + #[builder(default = "None")] + etcd_client: Option, } impl HttpService { @@ -155,7 +174,7 @@ impl HttpServiceConfigBuilder { let config: HttpServiceConfig = self.build_internal()?; let model_manager = Arc::new(ModelManager::new()); - let state = Arc::new(State::new(model_manager)); + let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client)); // enable prometheus metrics let registry = metrics::Registry::new(); @@ -225,4 +244,9 @@ impl HttpServiceConfigBuilder { self.request_template = Some(request_template); self } + + pub fn with_etcd_client(mut self, etcd_client: Option) -> Self { + self.etcd_client = Some(etcd_client); + self + } } diff --git a/lib/runtime/src/instances.rs b/lib/runtime/src/instances.rs new file mode 100644 index 00000000000..d436a0bd387 --- /dev/null +++ b/lib/runtime/src/instances.rs @@ -0,0 +1,34 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Instance management functions for the distributed runtime. +//! +//! This module provides functionality to list and manage instances across +//! the entire distributed system, complementing the component-specific +//! instance listing in `component.rs`. + +use crate::component::{Instance, INSTANCE_ROOT_PATH}; +use crate::transports::etcd::Client as EtcdClient; + +pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result> { + let mut instances = Vec::new(); + + for kv in etcd_client + .kv_get_prefix(format!("{}/", INSTANCE_ROOT_PATH)) + .await? + { + match serde_json::from_slice::(kv.value()) { + Ok(instance) => instances.push(instance), + Err(err) => { + tracing::warn!( + "Failed to parse instance from etcd: {}. Key: {}, Value: {}", + err, + kv.key_str().unwrap_or("invalid_key"), + kv.value_str().unwrap_or("invalid_value") + ); + } + } + } + + Ok(instances) +} diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 925c12b7ffb..30fcd02fdd1 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -52,6 +52,7 @@ pub mod traits; pub mod transports; pub mod utils; pub mod worker; +pub mod instances; pub mod distributed; pub use futures::stream; From e6df2e0c152ccbc100bb7147ade73ea6eaf2269d Mon Sep 17 00:00:00 2001 From: Mudit Mathur <85603888+heisenberglit@users.noreply.github.com> Date: Fri, 18 Jul 2025 19:50:20 +0530 Subject: [PATCH 2/2] chore: fix formatting to pass cargo fmt check --- lib/llm/src/entrypoint/input/http.rs | 2 +- lib/llm/src/http/service/health.rs | 1 - lib/llm/src/http/service/service_v2.rs | 3 +-- lib/runtime/src/lib.rs | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs index 9dcb5f53cbf..8c0536af48c 100644 --- a/lib/llm/src/entrypoint/input/http.rs +++ b/lib/llm/src/entrypoint/input/http.rs @@ -36,7 +36,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul match engine_config { EngineConfig::Dynamic(_) => { - match etcd_client{ + match etcd_client { Some(ref etcd_client) => { let router_config = engine_config.local_model().router_config(); // Listen for models registering themselves in etcd, add them to HTTP service diff --git a/lib/llm/src/http/service/health.rs b/lib/llm/src/http/service/health.rs index cef349e7497..bb4f55245d0 100644 --- a/lib/llm/src/http/service/health.rs +++ b/lib/llm/src/http/service/health.rs @@ -92,7 +92,6 @@ async fn health_handler( vec![] }; - if model_entries.is_empty() { ( StatusCode::SERVICE_UNAVAILABLE, diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index ab762b6d430..817ce0fdbd8 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -12,10 +12,9 @@ use crate::discovery::ModelManager; use crate::request_template::RequestTemplate; use anyhow::Result; use derive_builder::Builder; +use dynamo_runtime::transports::etcd; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use dynamo_runtime::transports::etcd; - /// HTTP service shared state pub struct State { diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 30fcd02fdd1..83de1432a4f 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -38,6 +38,7 @@ pub mod discovery; pub mod engine; pub mod http_server; pub use http_server::HttpServerInfo; +pub mod instances; pub mod logging; pub mod metrics; pub mod pipeline; @@ -52,7 +53,6 @@ pub mod traits; pub mod transports; pub mod utils; pub mod worker; -pub mod instances; pub mod distributed; pub use futures::stream;