Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ use dynamo_runtime::{DistributedRuntime, Runtime};

/// Build and run an HTTP service
pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let etcd_client = distributed_runtime.etcd_client().clone();

let http_service = service_v2::HttpService::builder()
.port(engine_config.local_model().http_port())
.enable_chat_endpoints(true)
.enable_cmpl_endpoints(true)
.enable_embeddings_endpoints(true)
.with_request_template(engine_config.local_model().request_template())
.with_etcd_client(etcd_client.clone())
.build()?;

match engine_config {
EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
match distributed_runtime.etcd_client() {
Some(etcd_client) => {
match etcd_client {
Some(ref etcd_client) => {
let router_config = engine_config.local_model().router_config();
// Listen for models registering themselves in etcd, add them to HTTP service
run_watcher(
Expand Down
18 changes: 16 additions & 2 deletions lib/llm/src/http/service/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

use super::{service_v2, RouteDoc};
use axum::{http::Method, http::StatusCode, response::IntoResponse, routing::get, Json, Router};
use dynamo_runtime::instances::list_all_instances;
use serde_json::json;
use std::sync::Arc;

Expand Down Expand Up @@ -79,13 +80,25 @@ async fn health_handler(
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
) -> impl IntoResponse {
let model_entries = state.manager().get_model_entries();
let instances = if let Some(etcd_client) = state.etcd_client() {
match list_all_instances(etcd_client).await {
Ok(instances) => instances,
Err(err) => {
tracing::warn!("Failed to fetch instances from etcd: {}", err);
vec![]
}
}
} else {
vec![]
};

if model_entries.is_empty() {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({
"status": "unhealthy",
"message": "No endpoints available"
"message": "No endpoints available",
"instances": instances
})),
)
} else {
Expand All @@ -97,7 +110,8 @@ async fn health_handler(
StatusCode::OK,
Json(json!({
"status": "healthy",
"endpoints": endpoints
"endpoints": endpoints,
"instances": instances
})),
)
}
Expand Down
25 changes: 24 additions & 1 deletion lib/llm/src/http/service/service_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,31 @@ use crate::discovery::ModelManager;
use crate::request_template::RequestTemplate;
use anyhow::Result;
use derive_builder::Builder;
use dynamo_runtime::transports::etcd;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// HTTP service shared state
///
/// Bundles the handles that request handlers read through the accessor
/// methods on this type.
pub struct State {
// Metrics handle; the constructors initialise it from `Metrics::default()`.
metrics: Arc<Metrics>,
// Model manager supplied by the caller at construction time.
manager: Arc<ModelManager>,
// Optional etcd client; `None` when the state is built via `State::new`.
etcd_client: Option<etcd::Client>,
}

impl State {
pub fn new(manager: Arc<ModelManager>) -> Self {
Self {
manager,
metrics: Arc::new(Metrics::default()),
etcd_client: None,
}
}

/// Create shared state for the HTTP service with an optional etcd client.
///
/// `manager` is stored as given; metrics are freshly created from
/// `Metrics::default()`. Pass `None` for `etcd_client` when no etcd
/// connection is available.
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
    let metrics = Arc::new(Metrics::default());
    Self {
        metrics,
        manager,
        etcd_client,
    }
}

Expand All @@ -42,6 +53,10 @@ impl State {
self.manager.clone()
}

pub fn etcd_client(&self) -> Option<&etcd::Client> {
self.etcd_client.as_ref()
}

// TODO
pub fn sse_keep_alive(&self) -> Option<Duration> {
None
Expand Down Expand Up @@ -84,6 +99,9 @@ pub struct HttpServiceConfig {

#[builder(default = "None")]
request_template: Option<RequestTemplate>,

#[builder(default = "None")]
etcd_client: Option<etcd::Client>,
}

impl HttpService {
Expand Down Expand Up @@ -155,7 +173,7 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?;

let model_manager = Arc::new(ModelManager::new());
let state = Arc::new(State::new(model_manager));
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));

// enable prometheus metrics
let registry = metrics::Registry::new();
Expand Down Expand Up @@ -225,4 +243,9 @@ impl HttpServiceConfigBuilder {
self.request_template = Some(request_template);
self
}

pub fn with_etcd_client(mut self, etcd_client: Option<etcd::Client>) -> Self {
self.etcd_client = Some(etcd_client);
self
}
}
34 changes: 34 additions & 0 deletions lib/runtime/src/instances.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Instance management functions for the distributed runtime.
//!
//! This module provides functionality to list and manage instances across
//! the entire distributed system, complementing the component-specific
//! instance listing in `component.rs`.

use crate::component::{Instance, INSTANCE_ROOT_PATH};
use crate::transports::etcd::Client as EtcdClient;

/// List every instance stored in etcd under the `INSTANCE_ROOT_PATH` prefix.
///
/// Each value found under the prefix is deserialized as JSON into an
/// [`Instance`]. Entries that fail to deserialize are logged at `warn` level
/// (with their key and value) and skipped, so one malformed entry does not
/// fail the whole listing.
///
/// # Errors
///
/// Returns an error if the prefix read from etcd fails.
pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result<Vec<Instance>> {
    let prefix = format!("{}/", INSTANCE_ROOT_PATH);
    let entries = etcd_client.kv_get_prefix(prefix).await?;

    let instances: Vec<Instance> = entries
        .into_iter()
        .filter_map(
            |entry| match serde_json::from_slice::<Instance>(entry.value()) {
                Ok(instance) => Some(instance),
                Err(parse_err) => {
                    // Best effort: report the bad entry and keep going.
                    tracing::warn!(
                        "Failed to parse instance from etcd: {}. Key: {}, Value: {}",
                        parse_err,
                        entry.key_str().unwrap_or("invalid_key"),
                        entry.value_str().unwrap_or("invalid_value")
                    );
                    None
                }
            },
        )
        .collect();

    Ok(instances)
}
1 change: 1 addition & 0 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod discovery;
pub mod engine;
pub mod http_server;
pub use http_server::HttpServerInfo;
pub mod instances;
pub mod logging;
pub mod metrics;
pub mod pipeline;
Expand Down
Loading