From a907ae9b3f627c2bcf17a029b41415f5832a3ac5 Mon Sep 17 00:00:00 2001
From: Keiven Chang
Date: Wed, 24 Sep 2025 19:35:34 +0000
Subject: [PATCH 1/2] feat: replace polling with event-driven metrics updates

- Replace periodic polling task with event-driven metrics updates triggered by ModelWatcher
- Add update_model_metrics function that updates metrics when models are added
- Remove start_runtime_config_polling_task and related polling infrastructure
- Update integration test to use ModelWatcher directly instead of run_watcher
- Add etcd_client to HttpServiceConfig for health endpoint functionality

This change improves performance by eliminating unnecessary polling and makes
metrics updates more responsive to actual model lifecycle events.

Signed-off-by: Keiven Chang
---
 lib/llm/src/entrypoint/input/http.rs   |  67 +++-
 lib/llm/src/http/service/metrics.rs    | 154 +--------
 lib/llm/src/http/service/service_v2.rs |  24 +-
 lib/llm/tests/http_metrics.rs          | 441 ++++++++++++-------------
 4 files changed, 291 insertions(+), 395 deletions(-)

diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs
index 0eef4c57f52..8e4dbb16e7f 100644
--- a/lib/llm/src/entrypoint/input/http.rs
+++ b/lib/llm/src/entrypoint/input/http.rs
@@ -55,9 +55,9 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
         EngineConfig::Dynamic(_) => {
             let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
             let etcd_client = distributed_runtime.etcd_client();
-            // This allows the /health endpoint to query etcd for active instances
-            http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone());
-            let http_service = http_service_builder.build()?;
+            let http_service = http_service_builder
+                .with_etcd_client(etcd_client.clone())
+                .build()?;
             match etcd_client {
                 Some(ref etcd_client) => {
                     let router_config = engine_config.local_model().router_config();
@@ -80,6 +80,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
                         router_config.busy_threshold,
                         target_namespace,
                         Arc::new(http_service.clone()),
+                        http_service.state().metrics_clone(),
                     )
                     .await?;
                 }
@@ -217,7 +218,11 @@ async fn run_watcher(
     busy_threshold: Option<f64>,
     target_namespace: Option<String>,
     http_service: Arc<HttpService>,
+    metrics: Arc<Metrics>,
 ) -> anyhow::Result<()> {
+    // Clone model_manager before it's moved into ModelWatcher
+    let model_manager_clone = model_manager.clone();
+
     let mut watch_obj = ModelWatcher::new(
         runtime,
         model_manager,
@@ -234,11 +239,22 @@ async fn run_watcher(
     watch_obj.set_notify_on_model_update(tx);

-    // Spawn a task to watch for model type changes and update HTTP service endpoints
+    // Spawn a task to watch for model type changes and update HTTP service endpoints and metrics
    let _endpoint_enabler_task = tokio::spawn(async move {
         while let Some(model_type) = rx.recv().await {
             tracing::debug!("Received model type update: {:?}", model_type);
+
+            // Update HTTP endpoints (existing functionality)
             update_http_endpoints(http_service.clone(), model_type);
+
+            // Update metrics (only for added models)
+            update_model_metrics(
+                model_type,
+                model_manager_clone.clone(),
+                metrics.clone(),
+                Some(etcd_client.clone()),
+            )
+            .await;
         }
     });

@@ -271,3 +287,46 @@ fn update_http_endpoints(service: Arc<HttpService>, model_type: ModelUpdate) {
         }
     }
 }
+
+/// Updates metrics for model type changes
+async fn update_model_metrics(
+    model_type: ModelUpdate,
+    model_manager: Arc<ModelManager>,
+    metrics: Arc<Metrics>,
+    etcd_client: Option<etcd::Client>,
+) {
+    match model_type {
+        ModelUpdate::Added(model_type) => {
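+            // A ModelUpdate::Added event carries a ModelType, not a single model, so
+            // every registered entry of that type is rescanned below; runtime-config
+            // gauges and MDC gauges are refreshed independently because either source
+            // may be missing for a given entry.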
+            tracing::debug!("Updating metrics for added model type: {:?}", model_type);
+
+            // Get all model entries and update metrics for matching types
+            let model_entries = model_manager.get_model_entries();
+            for entry in model_entries {
+                if entry.model_type == model_type {
+                    // Update runtime config metrics if available
+                    if let Some(runtime_config) = &entry.runtime_config {
+                        metrics.update_runtime_config_metrics(&entry.name, runtime_config);
+                    }
+
+                    // Update MDC metrics if etcd is available
+                    if let Some(ref etcd) = etcd_client
+                        && let Err(e) = metrics
+                            .update_metrics_from_model_entry_with_mdc(&entry, etcd)
+                            .await
+                    {
+                        tracing::debug!(
+                            model = %entry.name,
+                            error = %e,
+                            "Failed to update MDC metrics for newly added model"
+                        );
+                    }
+                }
+            }
+        }
+        ModelUpdate::Removed(model_type) => {
+            tracing::debug!("Model type removed: {:?}", model_type);
+            // Note: Metrics are typically not removed to preserve historical data
+            // This matches the behavior of the removed polling task
+        }
+    }
+}
diff --git a/lib/llm/src/http/service/metrics.rs b/lib/llm/src/http/service/metrics.rs
index 95c01e15bd8..cccd1d4bf7c 100644
--- a/lib/llm/src/http/service/metrics.rs
+++ b/lib/llm/src/http/service/metrics.rs
@@ -472,37 +472,6 @@ impl Metrics {
         }
     }

-    /// Update model deployment card metrics for a model
-    /// This should be called when model deployment card information is available
-    pub fn update_mdc_metrics(
-        &self,
-        model_name: &str,
-        context_length: u32,
-        kv_cache_block_size: u32,
-        migration_limit: u32,
-    ) {
-        self.model_context_length
-            .with_label_values(&[model_name])
-            .set(context_length as i64);
-
-        self.model_kv_cache_block_size
-            .with_label_values(&[model_name])
-            .set(kv_cache_block_size as i64);
-
-        self.model_migration_limit
-            .with_label_values(&[model_name])
-            .set(migration_limit as i64);
-    }
-
-    /// Update metrics from a ModelEntry
-    /// This is a convenience method that extracts runtime config from a ModelEntry
-    /// and updates the appropriate metrics
-    pub fn update_metrics_from_model_entry(&self, model_entry: &ModelEntry) {
-        if let Some(runtime_config) = &model_entry.runtime_config {
-            self.update_runtime_config_metrics(&model_entry.name, runtime_config);
-        }
-    }
-
     /// Update metrics from a ModelEntry and its ModelDeploymentCard
     /// This updates both runtime config metrics and MDC-specific metrics
     pub async fn update_metrics_from_model_entry_with_mdc(
@@ -525,12 +494,19 @@ impl Metrics {
             .await
         {
             Ok(Some(mdc)) => {
-                self.update_mdc_metrics(
-                    &model_entry.name,
-                    mdc.context_length,
-                    mdc.kv_cache_block_size,
-                    mdc.migration_limit,
-                );
+                // Inline MDC metrics update
+                self.model_context_length
+                    .with_label_values(&[&model_entry.name])
+                    .set(mdc.context_length as i64);
+
+                self.model_kv_cache_block_size
+                    .with_label_values(&[&model_entry.name])
+                    .set(mdc.kv_cache_block_size as i64);
+
+                self.model_migration_limit
+                    .with_label_values(&[&model_entry.name])
+                    .set(mdc.migration_limit as i64);
+
                 tracing::debug!(
                     model = %model_entry.name,
                     "Successfully updated MDC metrics"
@@ -554,110 +530,6 @@ impl Metrics {
         Ok(())
     }

-    /// Start a background task that periodically updates runtime config metrics
-    ///
-    /// ## Why Polling is Required
-    ///
-    /// Polling is necessary because new models may come online at any time through the distributed
-    /// discovery system. The ModelManager is continuously updated as workers register/deregister
-    /// with etcd, and we need to periodically check for these changes to expose their metrics.
-    ///
-    /// ## Behavior
-    ///
-    /// - Polls the ModelManager for current models and updates metrics accordingly
-    /// - Models are never removed from metrics to preserve historical data
-    /// - If multiple model instances have the same name, only the first instance's metrics are used
-    /// - Subsequent instances with duplicate names will be skipped
-    ///
-    /// ## MDC (Model Deployment Card) Behavior
-    ///
-    /// Currently, we don't overwrite an MDC. The first worker to start wins, and we assume
-    /// that all other workers claiming to serve that model really are using the same configuration.
-    /// Later, every worker will have its own MDC, and the frontend will validate that they
-    /// checksum the same. For right now, you can assume they have the same MDC, because
-    /// they aren't allowed to change it.
-    ///
-    /// The task will run until the provided cancellation token is cancelled.
-    pub fn start_runtime_config_polling_task(
-        metrics: Arc<Metrics>,
-        manager: Arc<ModelManager>,
-        etcd_client: Option<etcd::Client>,
-        poll_interval: Duration,
-        cancel_token: tokio_util::sync::CancellationToken,
-    ) -> tokio::task::JoinHandle<()> {
-        tokio::spawn(async move {
-            let mut interval = tokio::time::interval(poll_interval);
-            let mut known_models = std::collections::HashSet::new();
-
-            tracing::info!(
-                interval_secs = poll_interval.as_secs(),
-                "Starting runtime config metrics polling task (metrics never removed)"
-            );
-
-            loop {
-                tokio::select! {
-                    _ = cancel_token.cancelled() => {
-                        tracing::info!("Runtime config metrics polling task cancelled");
-                        break;
-                    }
-                    _ = interval.tick() => {
-                        // Continue with polling logic
-                    }
-                }
-
-                // Get current model entries from the manager
-                let current_entries = manager.get_model_entries();
-                let mut current_models = std::collections::HashSet::new();
-
-                // Note: If multiple model instances have the same name, only the first instance's config metrics are recorded.
-                // Subsequent instances with duplicate names will be skipped for config updates.
-                // This is based on the assumption that all workers serving the same model have identical
-                // configuration values (MDC content, runtime config, etc.). This assumption holds because
-                // workers are not allowed to change their configuration after registration.
-
-                // Update configuration metrics for current models
-                for entry in current_entries {
-                    // Skip config processing if we've already seen this model name
-                    if !current_models.insert(entry.name.clone()) {
-                        tracing::debug!(
-                            model_name = %entry.name,
-                            endpoint = ?entry.endpoint_id,
-                            "Skipping duplicate model instance - only first instance config metrics are recorded"
-                        );
-                        continue;
-                    }
-
-                    // Update runtime config metrics if available
-                    if let Some(runtime_config) = &entry.runtime_config {
-                        metrics.update_runtime_config_metrics(&entry.name, runtime_config);
-                    }
-
-                    // Optionally load MDC for additional metrics if etcd is available
-                    if let Some(ref etcd) = etcd_client
-                        && let Err(e) = metrics
-                            .update_metrics_from_model_entry_with_mdc(&entry, etcd)
-                            .await
-                    {
-                        tracing::debug!(
-                            model = %entry.name,
-                            error = %e,
-                            "Failed to update MDC metrics (this is normal if MDC is not available)"
-                        );
-                    }
-                }
-
-                // Update our known models set
-                known_models.extend(current_models.iter().cloned());
-
-                tracing::trace!(
-                    active_models = current_models.len(),
-                    total_known_models = known_models.len(),
-                    "Updated runtime config metrics for active models"
-                );
-            }
-        })
-    }
-
     /// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
     /// and the kind of endpoint that was hit
     ///
diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs
index 866a0fe704e..21d3e8422cf 100644
--- a/lib/llm/src/http/service/service_v2.rs
+++ b/lib/llm/src/http/service/service_v2.rs
@@ -133,9 +133,6 @@ pub struct HttpService {
     tls_cert_path: Option<PathBuf>,
     tls_key_path: Option<PathBuf>,
     route_docs: Vec<RouteDoc>,
-
-    // Metrics polling configuration
-    etcd_client: Option<etcd::Client>,
 }

 #[derive(Clone, Builder)]
@@ -204,22 +201,6 @@ impl HttpService {
         let protocol = if self.enable_tls { "HTTPS" } else { "HTTP" };
         tracing::info!(protocol, address, "Starting HTTP(S) service");

-        // Start background task to poll runtime config metrics with proper cancellation
-        let poll_interval_secs = std::env::var("DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS")
-            .ok()
-            .and_then(|s| s.parse::<f64>().ok())
-            .filter(|&secs| secs > 0.0) // Guard against zero or negative values
-            .unwrap_or(8.0);
-        let poll_interval = Duration::from_secs_f64(poll_interval_secs);
-
-        let _polling_task = super::metrics::Metrics::start_runtime_config_polling_task(
-            self.state.metrics_clone(),
-            self.state.manager_clone(),
-            self.etcd_client.clone(),
-            poll_interval,
-            cancel_token.child_token(),
-        );
-
         let router = self.router.clone();

         let observer = cancel_token.child_token();
@@ -313,8 +294,8 @@ impl HttpServiceConfigBuilder {
         let config: HttpServiceConfig = self.build_internal()?;

         let model_manager = Arc::new(ModelManager::new());
-        let etcd_client = config.etcd_client.clone();
-        let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
+        let etcd_client = config.etcd_client;
+        let state = Arc::new(State::new_with_etcd(model_manager, etcd_client));

         state
             .flags
@@ -366,7 +347,6 @@ impl HttpServiceConfigBuilder {
             tls_cert_path: config.tls_cert_path,
             tls_key_path: config.tls_key_path,
             route_docs: all_docs,
-            etcd_client,
         })
     }

diff --git a/lib/llm/tests/http_metrics.rs b/lib/llm/tests/http_metrics.rs
index 5d76ab53f47..e926512b7e0 100644
--- a/lib/llm/tests/http_metrics.rs
+++ b/lib/llm/tests/http_metrics.rs
@@ -293,278 +293,263 @@ async fn test_metrics_with_mock_model() {
 mod integration_tests {
     use super::*;
     use dynamo_llm::{
-        discovery::ModelEntry, engines::make_echo_engine, entrypoint::EngineConfig,
+ discovery::{ModelEntry, ModelWatcher}, + engines::make_echo_engine, + entrypoint::EngineConfig, local_model::LocalModelBuilder, }; use dynamo_runtime::DistributedRuntime; + use std::sync::Arc; #[tokio::test] #[ignore = "Requires etcd and distributed runtime"] async fn test_metrics_with_mdc_registration() { // Integration test for metrics collection with full MDC registration (like real model servers) - temp_env::async_with_vars([ - (METRICS_PREFIX_ENV, None::<&str>), - ("DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS", Some("0.6")), // Fast polling for tests (600ms) - ], async { - let port = get_random_port().await; - - // Create distributed runtime (required for MDC registration) - let runtime = dynamo_runtime::Runtime::from_settings().unwrap(); - let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()) - .await - .unwrap(); - - // Create LocalModel with realistic configuration for testing - let mut local_model = LocalModelBuilder::default() - .model_name(Some("test-mdc-model".to_string())) - .build() - .await - .unwrap(); - - // Create EngineConfig with EchoEngine - let engine_config = EngineConfig::StaticFull { - engine: make_echo_engine(), - model: Box::new(local_model.clone()), - is_static: false, // This enables MDC registration! - }; - - let service = HttpService::builder() - .port(port) - .enable_chat_endpoints(true) - .with_etcd_client(distributed_runtime.etcd_client()) - .build() - .unwrap(); - - // Set up model watcher to discover models from etcd (like production) - // This is crucial for the polling task to find model entries - use dynamo_llm::discovery::{ModelWatcher, MODEL_ROOT_PATH}; - use dynamo_runtime::pipeline::RouterMode; - - let model_watcher = ModelWatcher::new( - distributed_runtime.clone(), - service.state().manager_clone(), - RouterMode::RoundRobin, - None, - None, - ); + let port = get_random_port().await; - // Start watching etcd for model registrations - if let Some(etcd_client) = distributed_runtime.etcd_client() { - let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await.unwrap(); - let (_prefix, _watcher, receiver) = models_watcher.dissolve(); + // Create distributed runtime (required for MDC registration) + let runtime = dynamo_runtime::Runtime::from_settings().unwrap(); + let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()) + .await + .unwrap(); - // Spawn watcher task to discover models from etcd - let _watcher_task = tokio::spawn(async move { - model_watcher.watch(receiver, None).await; - }); + // Create LocalModel with realistic configuration for testing + let mut local_model = LocalModelBuilder::default() + .model_name(Some("test-mdc-model".to_string())) + .build() + .await + .unwrap(); - } + // Create EngineConfig with EchoEngine + let engine_config = EngineConfig::StaticFull { + engine: make_echo_engine(), + model: Box::new(local_model.clone()), + is_static: false, // This enables MDC registration! + }; - // Set up the engine following the StaticFull pattern from http.rs - let EngineConfig::StaticFull { engine, model, .. 
} = engine_config else {
-            panic!("Expected StaticFull config");
-        };
+    let service = HttpService::builder()
+        .port(port)
+        .enable_chat_endpoints(true)
+        .build()
+        .unwrap();

-        let engine = Arc::new(dynamo_llm::engines::StreamingEngineAdapter::new(engine));
-        let manager = service.model_manager();
-        manager
-            .add_chat_completions_model(model.service_name(), engine.clone())
-            .unwrap();
+    // Set up model watcher to discover models from etcd (like production)
+    // This is crucial for the event-driven metrics updates to find model entries
+    use dynamo_llm::discovery::{MODEL_ROOT_PATH, ModelWatcher};
+    use dynamo_runtime::pipeline::RouterMode;
+
+    let model_watcher = ModelWatcher::new(
+        distributed_runtime.clone(),
+        service.state().manager_clone(),
+        RouterMode::RoundRobin,
+        None,
+        None,
+    );

-        // Now do the proper MDC registration via LocalModel::attach()
-        // Create a component and endpoint for proper registration
-        let namespace = distributed_runtime.namespace("test-namespace").unwrap();
-        let test_component = namespace.component("test-mdc-component").unwrap();
-        let test_endpoint = test_component.endpoint("test-mdc-endpoint");
-
-        // This will store the MDC in etcd and create the ModelEntry for discovery
-        local_model
-            .attach(
-                &test_endpoint,
-                dynamo_llm::model_type::ModelType::Chat,
-                dynamo_llm::model_type::ModelInput::Text,
-            )
+    // Start watching etcd for model registrations
+    if let Some(etcd_client) = distributed_runtime.etcd_client() {
+        let models_watcher = etcd_client
+            .kv_get_and_watch_prefix(MODEL_ROOT_PATH)
             .await
             .unwrap();
+        let (_prefix, _watcher, receiver) = models_watcher.dissolve();
+        // Spawn watcher task to discover models from etcd
+        let _watcher_task = tokio::spawn(async move {
+            model_watcher.watch(receiver, None).await;
+        });
+    }

-        // Start the HTTP service
-        let token = CancellationToken::new();
-        let cancel_token = token.clone();
-        let service_for_task = service.clone();
-        let task = tokio::spawn(async move { service_for_task.run(token.clone()).await });
+    // Set up the engine following the StaticFull pattern from http.rs
+    let EngineConfig::StaticFull { engine, model, ..
} = engine_config else { + panic!("Expected StaticFull config"); + }; - // Wait for service to be ready - wait_for_metrics_ready(port).await; + let engine = Arc::new(dynamo_llm::engines::StreamingEngineAdapter::new(engine)); + let manager = service.model_manager(); + manager + .add_chat_completions_model(model.service_name(), engine.clone()) + .unwrap(); - // Wait for MDC registration to complete by checking if the model appears - // This simulates the real polling that happens in production - let start = tokio::time::Instant::now(); - let timeout = Duration::from_secs(10); - loop { - if start.elapsed() > timeout { - break; // Continue with test even if MDC metrics aren't ready - } + // Now do the proper MDC registration via LocalModel::attach() + // Create a component and endpoint for proper registration + let namespace = distributed_runtime.namespace("test-namespace").unwrap(); + let test_component = namespace.component("test-mdc-component").unwrap(); + let test_endpoint = test_component.endpoint("test-mdc-endpoint"); + + // This will store the MDC in etcd and create the ModelEntry for discovery + local_model + .attach( + &test_endpoint, + dynamo_llm::model_type::ModelType::Chat, + dynamo_llm::model_type::ModelInput::Text, + ) + .await + .unwrap(); - // Check if our model is registered in the manager (indicates MDC registration completed) - let model_service_name = model.service_name(); - if manager.has_model_any(model_service_name) { - tracing::info!("MDC registration completed for {}", model_service_name); - break; - } + // Start the HTTP service + let token = CancellationToken::new(); + let cancel_token = token.clone(); + let service_for_task = service.clone(); + let task = tokio::spawn(async move { service_for_task.run(token.clone()).await }); - tokio::time::sleep(Duration::from_millis(100)).await; - } + // Wait for service to be ready + wait_for_metrics_ready(port).await; - // Give a bit more time for background metrics collection - tokio::time::sleep(Duration::from_millis(200)).await; + // Give a bit more time for background metrics collection + tokio::time::sleep(Duration::from_secs(5)).await; - let client = reqwest::Client::new(); + let client = reqwest::Client::new(); - // Create a chat completion request - let message = dynamo_async_openai::types::ChatCompletionRequestMessage::User( - dynamo_async_openai::types::ChatCompletionRequestUserMessage { - content: - dynamo_async_openai::types::ChatCompletionRequestUserMessageContent::Text( - "Hello, MDC model!".to_string(), - ), - name: None, - }, - ); + // Create a chat completion request + let message = dynamo_async_openai::types::ChatCompletionRequestMessage::User( + dynamo_async_openai::types::ChatCompletionRequestUserMessage { + content: dynamo_async_openai::types::ChatCompletionRequestUserMessageContent::Text( + "Hello, MDC model!".to_string(), + ), + name: None, + }, + ); - let request = dynamo_async_openai::types::CreateChatCompletionRequestArgs::default() - .model(model.service_name()) - .messages(vec![message]) - .max_tokens(50u32) - .stream(true) - .build() - .expect("Failed to build request"); - - // Make the request to the HTTP service - let response = client - .post(format!("http://localhost:{}/v1/chat/completions", port)) - .json(&request) - .send() - .await - .unwrap(); + let request = dynamo_async_openai::types::CreateChatCompletionRequestArgs::default() + .model(model.service_name()) + .messages(vec![message]) + .max_tokens(50u32) + .stream(true) + .build() + .expect("Failed to build request"); - assert!( - 
response.status().is_success(),
-                "Request failed: {:?}",
-                response
-            );
+    // Make the request to the HTTP service
+    let response = client
+        .post(format!("http://localhost:{}/v1/chat/completions", port))
+        .json(&request)
+        .send()
+        .await
+        .unwrap();
+
+    assert!(
+        response.status().is_success(),
+        "Request failed: {:?}",
+        response
+    );

-            // Consume the response stream to complete the request
-            let _response_body = response.bytes().await.unwrap();
+    // Consume the response stream to complete the request
+    let _response_body = response.bytes().await.unwrap();

     // Wait for the fast polling interval (600ms) for MDC metrics
     tokio::time::sleep(Duration::from_millis(5)).await;

-            // Fetch and verify metrics
-            let metrics_response = client
-                .get(format!("http://localhost:{}/metrics", port))
-                .send()
-                .await
-                .unwrap();
-
-            assert!(metrics_response.status().is_success());
-            let metrics_body = metrics_response.text().await.unwrap();
-
-            println!("=== METRICS WITH FULL MDC REGISTRATION ===");
-            println!("{}", metrics_body);
-            println!("=== END METRICS ===");
-
-            // Assert basic metrics are present (using service_name from the model)
-            let model_name = model.service_name();
-            assert!(metrics_body.contains("dynamo_frontend_requests_total"));
-            assert!(metrics_body.contains(&format!("model=\"{}\"", model_name)));
-            assert!(metrics_body.contains("dynamo_frontend_inflight_requests_total"));
-            assert!(metrics_body.contains("dynamo_frontend_request_duration_seconds"));
-            assert!(metrics_body.contains("dynamo_frontend_output_sequence_tokens"));
-            assert!(metrics_body.contains("dynamo_frontend_queued_requests_total"));
-
-            // Assert MDC-based model configuration metrics are present
-            // These MUST be present for the test to pass
-            assert!(metrics_body.contains("dynamo_frontend_model_context_length"),
-                "MDC metrics not found! Metrics body: {}", metrics_body);
-
-            assert!(metrics_body.contains("dynamo_frontend_model_kv_cache_block_size"));
-            assert!(metrics_body.contains("dynamo_frontend_model_migration_limit"));
-
-            // Note: The following metrics are not present in this test because they require
-            // actual inference engines (vllm/sglang/trtllm *.py) with real runtime configurations:
-            // - dynamo_frontend_model_total_kv_blocks (requires actual KV cache from real engines)
-            // - dynamo_frontend_model_max_num_seqs (requires actual batching config from real engines)
-            // - dynamo_frontend_model_max_num_batched_tokens (requires actual batching config from real engines)
-
-
-            // Verify specific request counter incremented
-            assert!(metrics_body.contains("endpoint=\"chat_completions\""));
-            assert!(metrics_body.contains("request_type=\"stream\""));
-            assert!(metrics_body.contains("status=\"success\""));
-
-            // Now test the complete lifecycle: remove the model from etcd
-
-            // Get all model entries to find the one we need to delete
-            if let Some(etcd_client) = distributed_runtime.etcd_client() {
-                let kvs = etcd_client.kv_get_prefix("models").await.unwrap();
-
-                // Find our model's etcd key
-                let mut model_key_to_delete = None;
-                for kv in kvs {
-                    if let Ok(model_entry) = serde_json::from_slice::<ModelEntry>(kv.value())
-                        && model_entry.name == "test-mdc-model"
-                    {
-                        model_key_to_delete = Some(kv.key_str().unwrap().to_string());
-                        break;
-                    }
-                }
+    // Fetch and verify metrics
+    let metrics_response = client
+        .get(format!("http://localhost:{}/metrics", port))
+        .send()
+        .await
+        .unwrap();

-                if let Some(key) = model_key_to_delete {
-                    etcd_client.kv_delete(key.as_str(), None).await.unwrap();
+    assert!(metrics_response.status().is_success());
+    let metrics_body = metrics_response.text().await.unwrap();

-                    // Poll every 80ms for up to 2 seconds to check when worker count drops to 0
+    println!("=== METRICS WITH FULL MDC REGISTRATION ===");
+    println!("{}", metrics_body);
+    println!("=== END METRICS ===");

-                    let start_time = tokio::time::Instant::now();
-                    let timeout = Duration::from_millis(2000);
-                    let mut worker_count_dropped = false;
+    // Assert basic metrics are present (using service_name from the model)
+    let model_name = model.service_name();
+    assert!(metrics_body.contains("dynamo_frontend_requests_total"));
+    assert!(metrics_body.contains(&format!("model=\"{}\"", model_name)));
+    assert!(metrics_body.contains("dynamo_frontend_inflight_requests_total"));
+    assert!(metrics_body.contains("dynamo_frontend_request_duration_seconds"));
+    assert!(metrics_body.contains("dynamo_frontend_output_sequence_tokens"));
+    assert!(metrics_body.contains("dynamo_frontend_queued_requests_total"));

-                    while start_time.elapsed() < timeout {
-                        // Check if the model was removed from the manager
-                        let has_model = manager.has_model_any(model.service_name());
+    // Assert MDC-based model configuration metrics are present
+    // These MUST be present for the test to pass
+    assert!(
+        metrics_body.contains("dynamo_frontend_model_context_length"),
+        "MDC metrics not found! Metrics body: {}",
+        metrics_body
+    );

-                        // Fetch current metrics
-                        let metrics_response = client
-                            .get(format!("http://localhost:{}/metrics", port))
-                            .send()
-                            .await
-                            .unwrap();
+    assert!(metrics_body.contains("dynamo_frontend_model_kv_cache_block_size"));
+    assert!(metrics_body.contains("dynamo_frontend_model_migration_limit"));

-                        if metrics_response.status().is_success() {
+    // Note: The following metrics are not present in this test because they require
+    // actual inference engines (vllm/sglang/trtllm *.py) with real runtime configurations:
+    // - dynamo_frontend_model_total_kv_blocks (requires actual KV cache from real engines)
+    // - dynamo_frontend_model_max_num_seqs (requires actual batching config from real engines)
+    // - dynamo_frontend_model_max_num_batched_tokens (requires actual batching config from real engines)

-                            // Since model_workers metric was removed, just check if model is gone from manager
-                            if !has_model {
-                                worker_count_dropped = true;
-                                break;
-                            }
-                        }
+    // Verify specific request counter incremented
+    assert!(metrics_body.contains("endpoint=\"chat_completions\""));
+    assert!(metrics_body.contains("request_type=\"stream\""));
+    assert!(metrics_body.contains("status=\"success\""));

-                        tokio::time::sleep(Duration::from_millis(80)).await;
-                    }
+    // Now test the complete lifecycle: remove the model from etcd

-                    // Assert that model was removed from manager
-                    assert!(worker_count_dropped,
-                        "Model should be removed from manager after etcd removal and polling cycles");
+    // Remove the model using the cleaner ModelWatcher approach
+    if let Some(etcd_client) = distributed_runtime.etcd_client() {
+        // Use ModelWatcher to find and remove the model (following ModelWatcher::handle_delete pattern)
+        let watcher = ModelWatcher::new(
+            distributed_runtime.clone(),
+            service.state().manager_clone(),
+            RouterMode::RoundRobin,
+            None,
+            None,
+        );

-                } else {
+        // Get all model entries for our test model
+        let model_entries = watcher.entries_for_model("test-mdc-model").await.unwrap();
+
+        if !model_entries.is_empty() {
+            // For each model entry, we need to find its etcd key and remove it
+            // This follows the same pattern as ModelWatcher::handle_delete
+            for model_entry in model_entries {
+                // Find the etcd key for this specific model entry
+                // etcd keys follow pattern: "models/{UUID}"
+                // Example: "models/11dff335-316d-4c9f-8229-88ad8e8dac5e"
+                let kvs = etcd_client.kv_get_prefix("models").await.unwrap();
+
+                // Find the key by matching ModelEntry JSON structure:
+                // {
+                //     "name": "test-mdc-model",
+                //     "endpoint": { "namespace": "...", "component": "...", "name": "..." },
+                //     "model_type": "Chat",
+                //     "runtime_config": { ... },
+                //     "model_input": "Text"
+                // }
+                let key = kvs
+                    .iter()
+                    .find(|kv| {
+                        serde_json::from_slice::<ModelEntry>(kv.value())
+                            .map(|entry| {
+                                entry.name == model_entry.name
+                                    && entry.endpoint_id == model_entry.endpoint_id
+                            })
+                            .unwrap_or(false)
+                    })
+                    .map(|kv| kv.key_str().unwrap().to_string());
+
+                if let Some(key) = key {
+                    // Remove from ModelManager first (this returns the ModelEntry)
+                    if let Some(_removed_entry) = manager.remove_model_entry(&key) {
+                        // Remove engines (following ModelWatcher::handle_delete pattern)
+                        manager
+                            .remove_chat_completions_model(&model_entry.name)
+                            .ok();
+                        manager.remove_completions_model(&model_entry.name).ok();
+                        manager.remove_embeddings_model(&model_entry.name).ok();
+                        manager.remove_tensor_model(&model_entry.name).ok();
+
+                        // Then delete from etcd
+                        etcd_client.kv_delete(key.as_str(), None).await.unwrap();
+                    }
+                }
             }
         }
+    }

-
-        // Clean up
-        cancel_token.cancel();
-        task.await.unwrap().unwrap();
-    })
-    .await;
+    // Clean up
+    cancel_token.cancel();
+    task.await.unwrap().unwrap();
 }
 }

From de1647104a27590ade49b225d113172d5017e08b Mon Sep 17 00:00:00 2001
From: Keiven Chang
Date: Wed, 24 Sep 2025 22:37:16 +0000
Subject: [PATCH 2/2] refactor: revert test organization changes

Signed-off-by: Keiven Chang
---
 lib/llm/src/entrypoint/input/http.rs | 6 +++---
 lib/llm/tests/http_metrics.rs        | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs
index 8e4dbb16e7f..cc266057b4e 100644
--- a/lib/llm/src/entrypoint/input/http.rs
+++ b/lib/llm/src/entrypoint/input/http.rs
@@ -55,9 +55,9 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
         EngineConfig::Dynamic(_) => {
             let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
             let etcd_client = distributed_runtime.etcd_client();
-            let http_service = http_service_builder
-                .with_etcd_client(etcd_client.clone())
-                .build()?;
+            // This allows the /health endpoint to query etcd for active instances
+            http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone());
+            let http_service = http_service_builder.build()?;
             match etcd_client {
                 Some(ref etcd_client) => {
                     let router_config = engine_config.local_model().router_config();
diff --git a/lib/llm/tests/http_metrics.rs b/lib/llm/tests/http_metrics.rs
index e926512b7e0..70315ed045f 100644
--- a/lib/llm/tests/http_metrics.rs
+++ b/lib/llm/tests/http_metrics.rs
@@ -436,8 +436,8 @@ mod integration_tests {
-    // Wait for the fast polling interval (600ms) for MDC metrics
-    tokio::time::sleep(Duration::from_millis(5)).await;
+    // Wait briefly (50ms) for the event-driven MDC metrics update
+    tokio::time::sleep(Duration::from_millis(50)).await;

     // Fetch and verify metrics
     let metrics_response = client