diff --git a/components/backends/vllm/src/dynamo/vllm/main.py b/components/backends/vllm/src/dynamo/vllm/main.py index decae44828..43d7c5181f 100644 --- a/components/backends/vllm/src/dynamo/vllm/main.py +++ b/components/backends/vllm/src/dynamo/vllm/main.py @@ -131,7 +131,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config): """ Instantiate and serve """ - component = runtime.namespace(config.namespace).component(config.component) + + component = ( + runtime.namespace(config.namespace) + .component(config.component) + .add_labels([("model", config.model)]) + ) await component.create_service() generate_endpoint = component.endpoint(config.endpoint) @@ -164,7 +169,11 @@ async def init(runtime: DistributedRuntime, config: Config): Instantiate and serve """ - component = runtime.namespace(config.namespace).component(config.component) + component = ( + runtime.namespace(config.namespace) + .component(config.component) + .add_labels([("model", config.model)]) + ) await component.create_service() generate_endpoint = component.endpoint(config.endpoint) diff --git a/components/metrics/src/lib.rs b/components/metrics/src/lib.rs index bf925b96e3..5f9ddec6c4 100644 --- a/components/metrics/src/lib.rs +++ b/components/metrics/src/lib.rs @@ -145,6 +145,7 @@ impl MetricsMode { pub struct LLMWorkerLoadCapacityConfig { pub component_name: String, pub endpoint_name: String, + pub model_name: Option, } /// Metrics collector for exposing metrics to prometheus/grafana diff --git a/components/metrics/src/main.rs b/components/metrics/src/main.rs index 58a2979012..cbbf36fd3a 100644 --- a/components/metrics/src/main.rs +++ b/components/metrics/src/main.rs @@ -31,6 +31,7 @@ use dynamo_llm::kv_router::scheduler::KVHitRateEvent; use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT; use dynamo_runtime::{ error, logging, + metrics::MetricsRegistry, traits::events::{EventPublisher, EventSubscriber}, utils::{Duration, Instant}, DistributedRuntime, ErrorContext, Result, Runtime, Worker, @@ -60,6 +61,10 @@ struct Args { #[arg(long)] endpoint: String, + /// Model name for the target component (optional) + #[arg(long)] + model_name: Option, + /// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second) #[arg(long, default_value = "1")] poll_interval: u64, @@ -109,6 +114,7 @@ fn get_config(args: &Args) -> Result { Ok(LLMWorkerLoadCapacityConfig { component_name: args.component.clone(), endpoint_name: args.endpoint.clone(), + model_name: args.model_name.clone(), }) } @@ -131,7 +137,14 @@ async fn app(runtime: Runtime) -> Result<()> { .await .context("Unable to create unique instance of Count; possibly one already exists")?; - let target_component = namespace.component(&config.component_name)?; + let target_component = { + let c = namespace.component(&config.component_name)?; + if let Some(ref model) = config.model_name { + c.add_labels(&[("model", model.as_str())])? + } else { + c + } + }; let target_endpoint = target_component.endpoint(&config.endpoint_name); let service_path = target_endpoint.path(); diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index 39ff3fd1a8..615a1dca8a 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -480,6 +480,21 @@ impl Component { Ok(()) }) } + + /// Add constant labels to this component (for metrics). Returns a new Component with labels. + /// labels: list of (key, value) tuples. + fn add_labels(&self, labels: Vec<(String, String)>) -> PyResult { + use rs::metrics::MetricsRegistry as _; + let pairs: Vec<(&str, &str)> = labels + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + let inner = self.inner.clone().add_labels(&pairs).map_err(to_pyerr)?; + Ok(Component { + inner, + event_loop: self.event_loop.clone(), + }) + } } #[pymethods] diff --git a/lib/llm/src/discovery/watcher.rs b/lib/llm/src/discovery/watcher.rs index 18fa84809c..66082c8218 100644 --- a/lib/llm/src/discovery/watcher.rs +++ b/lib/llm/src/discovery/watcher.rs @@ -7,6 +7,7 @@ use anyhow::Context as _; use tokio::sync::{mpsc::Receiver, Notify}; use dynamo_runtime::{ + metrics::MetricsRegistry, pipeline::{ network::egress::push_router::PushRouter, ManyOut, Operator, RouterMode, SegmentSource, ServiceBackend, SingleIn, Source, @@ -169,7 +170,8 @@ impl ModelWatcher { let component = self .drt .namespace(&endpoint_id.namespace)? - .component(&endpoint_id.component)?; + .component(&endpoint_id.component) + .and_then(|c| c.add_labels(&[("model", &model_entry.name)]))?; let client = component.endpoint(&endpoint_id.name).client().await?; let Some(etcd_client) = self.drt.etcd_client() else { diff --git a/lib/llm/src/entrypoint/input/common.rs b/lib/llm/src/entrypoint/input/common.rs index ee3e8fe22d..ff0e86f798 100644 --- a/lib/llm/src/entrypoint/input/common.rs +++ b/lib/llm/src/entrypoint/input/common.rs @@ -22,10 +22,12 @@ use crate::{ Annotated, }, }; + use dynamo_runtime::{ component::Client, distributed::DistributedConfig, engine::{AsyncEngineStream, Data}, + metrics::MetricsRegistry, pipeline::{ Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend, ServiceEngine, ServiceFrontend, SingleIn, Source, @@ -109,7 +111,9 @@ pub async fn prepare_engine( let endpoint_id = local_model.endpoint_id(); let component = distributed_runtime .namespace(&endpoint_id.namespace)? - .component(&endpoint_id.component)?; + .component(&endpoint_id.component) + .and_then(|c| c.add_labels(&[("model", card.slug().to_string().as_str())]))?; + let client = component.endpoint(&endpoint_id.name).client().await?; let kv_chooser = if router_mode == RouterMode::KV { diff --git a/lib/llm/src/entrypoint/input/endpoint.rs b/lib/llm/src/entrypoint/input/endpoint.rs index 467fff6027..7b4996f002 100644 --- a/lib/llm/src/entrypoint/input/endpoint.rs +++ b/lib/llm/src/entrypoint/input/endpoint.rs @@ -15,7 +15,9 @@ use crate::{ Annotated, }, }; + use dynamo_runtime::engine::AsyncEngineStream; +use dynamo_runtime::metrics::MetricsRegistry; use dynamo_runtime::pipeline::{ network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source, }; @@ -31,9 +33,25 @@ pub async fn run( let cancel_token = distributed_runtime.primary_token().clone(); let endpoint_id: EndpointId = path.parse()?; + let model_name = match &engine_config { + EngineConfig::StaticFull { model, .. } | EngineConfig::StaticCore { model, .. } => { + Some(model.service_name().to_string()) + } + EngineConfig::StaticRemote(model) | EngineConfig::Dynamic(model) => { + Some(model.service_name().to_string()) + } + }; + let component = distributed_runtime .namespace(&endpoint_id.namespace)? - .component(&endpoint_id.component)?; + .component(&endpoint_id.component) + .and_then(|c| { + if let Some(ref name) = model_name { + c.add_labels(&[("model", name.as_str())]) + } else { + Ok(c) + } + })?; let endpoint = component .service_builder() .create() diff --git a/lib/runtime/examples/hello_world/src/bin/client.rs b/lib/runtime/examples/hello_world/src/bin/client.rs index 9ab2805357..46bfd44b5a 100644 --- a/lib/runtime/examples/hello_world/src/bin/client.rs +++ b/lib/runtime/examples/hello_world/src/bin/client.rs @@ -14,8 +14,8 @@ // limitations under the License. use dynamo_runtime::{ - logging, pipeline::PushRouter, protocols::annotated::Annotated, stream::StreamExt, - DistributedRuntime, Result, Runtime, Worker, + logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated, + stream::StreamExt, DistributedRuntime, Result, Runtime, Worker, }; use hello_world::DEFAULT_NAMESPACE; @@ -31,6 +31,7 @@ async fn app(runtime: Runtime) -> Result<()> { let client = distributed .namespace(DEFAULT_NAMESPACE)? .component("backend")? + .add_labels(&[("model", "hello_world_model")])? .endpoint("generate") .client() .await?; diff --git a/lib/runtime/examples/hello_world/src/bin/server.rs b/lib/runtime/examples/hello_world/src/bin/server.rs index e1e22ce897..153f3df0bc 100644 --- a/lib/runtime/examples/hello_world/src/bin/server.rs +++ b/lib/runtime/examples/hello_world/src/bin/server.rs @@ -15,6 +15,7 @@ use dynamo_runtime::{ logging, + metrics::MetricsRegistry, pipeline::{ async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, @@ -69,6 +70,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> { runtime .namespace(DEFAULT_NAMESPACE)? .component("backend")? + .add_labels(&[("model", "hello_world_model")])? .service_builder() .create() .await? diff --git a/lib/runtime/examples/service_metrics/src/bin/service_client.rs b/lib/runtime/examples/service_metrics/src/bin/service_client.rs index 7952fc3576..b1d90672d7 100644 --- a/lib/runtime/examples/service_metrics/src/bin/service_client.rs +++ b/lib/runtime/examples/service_metrics/src/bin/service_client.rs @@ -17,8 +17,8 @@ use futures::StreamExt; use service_metrics::DEFAULT_NAMESPACE; use dynamo_runtime::{ - logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration, - DistributedRuntime, Result, Runtime, Worker, + logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated, + utils::Duration, DistributedRuntime, Result, Runtime, Worker, }; fn main() -> Result<()> { @@ -31,7 +31,9 @@ async fn app(runtime: Runtime) -> Result<()> { let distributed = DistributedRuntime::from_settings(runtime.clone()).await?; let namespace = distributed.namespace(DEFAULT_NAMESPACE)?; - let component = namespace.component("backend")?; + let component = namespace + .component("backend")? + .add_labels(&[("model", "service_metrics_model")])?; let client = component.endpoint("generate").client().await?; diff --git a/lib/runtime/examples/service_metrics/src/bin/service_server.rs b/lib/runtime/examples/service_metrics/src/bin/service_server.rs index c8ab9ed13c..54fc295ff6 100644 --- a/lib/runtime/examples/service_metrics/src/bin/service_server.rs +++ b/lib/runtime/examples/service_metrics/src/bin/service_server.rs @@ -17,6 +17,7 @@ use service_metrics::{MyStats, DEFAULT_NAMESPACE}; use dynamo_runtime::{ logging, + metrics::MetricsRegistry, pipeline::{ async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, @@ -71,6 +72,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> { runtime .namespace(DEFAULT_NAMESPACE)? .component("backend")? + .add_labels(&[("model", "service_metrics_model")])? .service_builder() .create() .await? diff --git a/lib/runtime/examples/system_metrics/src/lib.rs b/lib/runtime/examples/system_metrics/src/lib.rs index bfada2858d..2f131c1abe 100644 --- a/lib/runtime/examples/system_metrics/src/lib.rs +++ b/lib/runtime/examples/system_metrics/src/lib.rs @@ -16,6 +16,7 @@ use std::sync::Arc; pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace"; pub const DEFAULT_COMPONENT: &str = "dyn_example_component"; pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint"; +pub const DEFAULT_MODEL_NAME: &str = "dyn_example_model"; /// Stats structure returned by the endpoint's stats handler #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] @@ -90,6 +91,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Re let endpoint = drt .namespace(DEFAULT_NAMESPACE)? .component(DEFAULT_COMPONENT)? + .add_labels(&[("model", DEFAULT_MODEL_NAME)])? .service_builder() .create() .await? diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index 2e0366cc8b..e00e1268ea 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -125,6 +125,10 @@ pub struct Component { #[validate(custom(function = "validate_allowed_chars"))] name: String, + /// Additional labels for metrics + #[builder(default = "Vec::new()")] + labels: Vec<(String, String)>, + // todo - restrict the namespace to a-z0-9-_A-Z /// Namespace #[builder(setter(into))] @@ -183,6 +187,16 @@ impl MetricsRegistry for Component { ] .concat() } + + fn stored_labels(&self) -> Vec<(&str, &str)> { + let mut all_labels = self.namespace.stored_labels(); + all_labels.extend(self.labels.iter().map(|(k, v)| (k.as_str(), v.as_str()))); + all_labels + } + + fn labels_mut(&mut self) -> &mut Vec<(String, String)> { + &mut self.labels + } } impl Component { @@ -220,6 +234,7 @@ impl Component { component: self.clone(), name: endpoint.into(), is_static: self.is_static, + labels: Vec::new(), } } @@ -285,6 +300,9 @@ pub struct Endpoint { name: String, is_static: bool, + + /// Additional labels for metrics + labels: Vec<(String, String)>, } impl Hash for Endpoint { @@ -329,6 +347,16 @@ impl MetricsRegistry for Endpoint { ] .concat() } + + fn stored_labels(&self) -> Vec<(&str, &str)> { + let mut all_labels = self.component.stored_labels(); + all_labels.extend(self.labels.iter().map(|(k, v)| (k.as_str(), v.as_str()))); + all_labels + } + + fn labels_mut(&mut self) -> &mut Vec<(String, String)> { + &mut self.labels + } } impl Endpoint { @@ -447,6 +475,10 @@ pub struct Namespace { #[builder(default = "None")] parent: Option>, + + /// Additional labels for metrics + #[builder(default = "Vec::new()")] + labels: Vec<(String, String)>, } impl DistributedRuntimeProvider for Namespace { diff --git a/lib/runtime/src/component/namespace.rs b/lib/runtime/src/component/namespace.rs index a7c1bfe7a8..c1beee55a9 100644 --- a/lib/runtime/src/component/namespace.rs +++ b/lib/runtime/src/component/namespace.rs @@ -86,6 +86,18 @@ impl MetricsRegistry for Namespace { fn parent_hierarchy(&self) -> Vec { vec![self.drt().basename()] } + + fn stored_labels(&self) -> Vec<(&str, &str)> { + // Convert Vec<(String, String)> to Vec<(&str, &str)> + self.labels + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect() + } + + fn labels_mut(&mut self) -> &mut Vec<(String, String)> { + &mut self.labels + } } #[cfg(feature = "integration")] diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index c4bb7c965c..28b154a748 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -40,6 +40,18 @@ impl MetricsRegistry for DistributedRuntime { fn parent_hierarchy(&self) -> Vec { vec![] // drt is the root, so no parent hierarchy } + + fn stored_labels(&self) -> Vec<(&str, &str)> { + // Convert Vec<(String, String)> to Vec<(&str, &str)> + self.labels + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect() + } + + fn labels_mut(&mut self) -> &mut Vec<(String, String)> { + &mut self.labels + } } impl DistributedRuntime { @@ -111,6 +123,7 @@ impl DistributedRuntime { prometheus::Registry, >::new())), system_health, + labels: Vec::new(), }; // Start metrics server if enabled diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 351b2aee6c..6cd2f2281c 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -178,4 +178,7 @@ pub struct DistributedRuntime { // This map associates metric prefixes with their corresponding Prometheus registries. prometheus_registries_by_prefix: Arc>>, + + // Additional labels for metrics + labels: Vec<(String, String)>, } diff --git a/lib/runtime/src/metrics.rs b/lib/runtime/src/metrics.rs index 78169bc431..a3a499c78c 100644 --- a/lib/runtime/src/metrics.rs +++ b/lib/runtime/src/metrics.rs @@ -65,6 +65,21 @@ fn lint_prometheus_name(name: &str) -> anyhow::Result { Ok(sanitized) } +/// Validate that a label slice has no duplicate keys. +/// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key. +fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> { + let mut seen_keys = std::collections::HashSet::new(); + for (key, _) in labels { + if !seen_keys.insert(*key) { + return Err(anyhow::anyhow!( + "Duplicate label key '{}' found in labels", + key + )); + } + } + Ok(()) +} + /// Trait that defines common behavior for Prometheus metric types pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static { /// Create a new metric with the given options @@ -196,7 +211,16 @@ fn create_metric( const_labels: Option<&[&str]>, ) -> anyhow::Result { // Validate that user-provided labels don't have duplicate keys - let mut seen_keys = std::collections::HashSet::new(); + validate_no_duplicate_label_keys(labels)?; + // Validate that user-provided labels don't conflict with stored labels + for (key, _) in registry.stored_labels() { + if labels.iter().any(|(k, _)| *k == key) { + return Err(anyhow::anyhow!( + "Label key '{}' already exists in registry.", + key + )); + } + } let basename = registry.basename(); let parent_hierarchy = registry.parent_hierarchy(); @@ -206,16 +230,7 @@ fn create_metric( let metric_name = build_metric_name(metric_name); - // Validate that user-provided labels don't have duplicate keys - for (key, _) in labels { - if !seen_keys.insert(*key) { - return Err(anyhow::anyhow!( - "Duplicate label key '{}' found in labels", - key - )); - } - } - // Build updated_labels: auto-labels first, then user labels + // Build updated_labels: auto-labels first, then `labels` + stored labels let mut updated_labels: Vec<(String, String)> = Vec::new(); if USE_AUTO_LABELS { @@ -266,6 +281,13 @@ fn create_metric( .iter() .map(|(k, v)| ((*k).to_string(), (*v).to_string())), ); + // Add stored labels (safe because overlaps were rejected above) + updated_labels.extend( + registry + .stored_labels() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())), + ); // Handle different metric types let metric = if std::any::TypeId::of::() == std::any::TypeId::of::() { @@ -386,6 +408,47 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid // Get the name of this registry (without any prefix) fn basename(&self) -> String; + /// Get any stored labels for this registry + fn stored_labels(&self) -> Vec<(&str, &str)> { + Vec::new() + } + + /// Get mutable access to the labels storage - implementors must provide this + fn labels_mut(&mut self) -> &mut Vec<(String, String)>; + + /// Add labels to this registry and return a new instance with the labels. + /// This allows for method chaining like: runtime.namespace(...).add_labels(...)? + /// Fails if: + /// - Provided `labels` contains duplicate keys, or + /// - Any provided key already exists in the registry's stored labels. + fn add_labels(mut self, labels: &[(&str, &str)]) -> anyhow::Result + where + Self: Sized, + { + validate_no_duplicate_label_keys(labels)?; + + // 2) Validate no overlap with existing stored labels + let existing: std::collections::HashSet<&str> = + self.stored_labels().into_iter().map(|(k, _)| k).collect(); + if let Some(conflict) = labels + .iter() + .map(|(k, _)| *k) + .find(|k| existing.contains(k)) + { + return Err(anyhow::anyhow!( + "Label key '{}' already exists in registry; refusing to overwrite", + conflict + )); + } + + // 3) Safe to append + let labels_storage = self.labels_mut(); + for (key, value) in labels { + labels_storage.push((key.to_string(), value.to_string())); + } + Ok(self) + } + /// Retrieve the complete hierarchy and basename for this registry. Currently, the prefix for drt is an empty string, /// so we must account for the leading underscore. The existing code remains unchanged to accommodate any future /// scenarios where drt's prefix might be assigned a value. @@ -848,6 +911,33 @@ mod test_simple_metricsregistry_trait { use prometheus::Counter; use std::sync::Arc; + #[test] + fn test_component_prometheus_output_contains_custom_label() { + // Arrange: DRT → namespace → component with a custom label + let drt = create_test_drt(); + let namespace = drt.namespace("testnamespace").unwrap(); + let component = namespace + .component("testcomponent") + .unwrap() + .add_labels(&[("service", "api")]) + .unwrap(); + + // Act: create a simple gauge and render Prometheus text + let gauge = component + .create_gauge("with_label", "Gauge with custom label", &[]) + .unwrap(); + gauge.set(1.0); + + let output = component.prometheus_metrics_fmt().unwrap(); + + // Assert: custom label is present (don’t rely on label ordering) + assert!( + output.contains("dynamo_component_with_label{") && output.contains(r#"service="api""#), + "Expected custom label service=\"api\" in Prometheus output:\n{}", + output + ); + } + #[test] fn test_factory_methods_via_registry_trait() { // Setup real DRT and registry using the test-friendly constructor