Skip to content
Merged
13 changes: 11 additions & 2 deletions components/backends/vllm/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)

component = (
runtime.namespace(config.namespace)
.component(config.component)
.add_labels([("model", config.model)])
)
await component.create_service()

generate_endpoint = component.endpoint(config.endpoint)
Expand Down Expand Up @@ -164,7 +169,11 @@ async def init(runtime: DistributedRuntime, config: Config):
Instantiate and serve
"""

component = runtime.namespace(config.namespace).component(config.component)
component = (
runtime.namespace(config.namespace)
.component(config.component)
.add_labels([("model", config.model)])
)
await component.create_service()

generate_endpoint = component.endpoint(config.endpoint)
Expand Down
1 change: 1 addition & 0 deletions components/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl MetricsMode {
pub struct LLMWorkerLoadCapacityConfig {
pub component_name: String,
pub endpoint_name: String,
pub model_name: Option<String>,
}

/// Metrics collector for exposing metrics to prometheus/grafana
Expand Down
15 changes: 14 additions & 1 deletion components/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
error, logging,
metrics::MetricsRegistry,
traits::events::{EventPublisher, EventSubscriber},
utils::{Duration, Instant},
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
Expand Down Expand Up @@ -60,6 +61,10 @@ struct Args {
#[arg(long)]
endpoint: String,

/// Model name for the target component (optional)
#[arg(long)]
model_name: Option<String>,

/// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second)
#[arg(long, default_value = "1")]
poll_interval: u64,
Expand Down Expand Up @@ -109,6 +114,7 @@ fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
Ok(LLMWorkerLoadCapacityConfig {
component_name: args.component.clone(),
endpoint_name: args.endpoint.clone(),
model_name: args.model_name.clone(),
})
}

Expand All @@ -131,7 +137,14 @@ async fn app(runtime: Runtime) -> Result<()> {
.await
.context("Unable to create unique instance of Count; possibly one already exists")?;

let target_component = namespace.component(&config.component_name)?;
let target_component = {
let c = namespace.component(&config.component_name)?;
if let Some(ref model) = config.model_name {
c.add_labels(&[("model", model.as_str())])?
} else {
c
}
};
let target_endpoint = target_component.endpoint(&config.endpoint_name);

let service_path = target_endpoint.path();
Expand Down
15 changes: 15 additions & 0 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,21 @@ impl Component {
Ok(())
})
}

/// Add constant labels to this component (for metrics). Returns a new Component with labels.
/// labels: list of (key, value) tuples.
fn add_labels(&self, labels: Vec<(String, String)>) -> PyResult<Component> {
use rs::metrics::MetricsRegistry as _;
let pairs: Vec<(&str, &str)> = labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let inner = self.inner.clone().add_labels(&pairs).map_err(to_pyerr)?;
Ok(Component {
inner,
event_loop: self.event_loop.clone(),
})
}
}

#[pymethods]
Expand Down
4 changes: 3 additions & 1 deletion lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::Context as _;
use tokio::sync::{mpsc::Receiver, Notify};

use dynamo_runtime::{
metrics::MetricsRegistry,
pipeline::{
network::egress::push_router::PushRouter, ManyOut, Operator, RouterMode, SegmentSource,
ServiceBackend, SingleIn, Source,
Expand Down Expand Up @@ -169,7 +170,8 @@ impl ModelWatcher {
let component = self
.drt
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component)
.and_then(|c| c.add_labels(&[("model", &model_entry.name)]))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question about client here- will take offline.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this in: #2292

let client = component.endpoint(&endpoint_id.name).client().await?;

let Some(etcd_client) = self.drt.etcd_client() else {
Expand Down
6 changes: 5 additions & 1 deletion lib/llm/src/entrypoint/input/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ use crate::{
Annotated,
},
};

use dynamo_runtime::{
component::Client,
distributed::DistributedConfig,
engine::{AsyncEngineStream, Data},
metrics::MetricsRegistry,
pipeline::{
Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
ServiceEngine, ServiceFrontend, SingleIn, Source,
Expand Down Expand Up @@ -109,7 +111,9 @@ pub async fn prepare_engine(
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component)
.and_then(|c| c.add_labels(&[("model", card.slug().to_string().as_str())]))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keivenchang , @grahamking , @tzulingk - similar question here - is this relevant on a client?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this in #2292


let client = component.endpoint(&endpoint_id.name).client().await?;

let kv_chooser = if router_mode == RouterMode::KV {
Expand Down
20 changes: 19 additions & 1 deletion lib/llm/src/entrypoint/input/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use crate::{
Annotated,
},
};

use dynamo_runtime::engine::AsyncEngineStream;
use dynamo_runtime::metrics::MetricsRegistry;
use dynamo_runtime::pipeline::{
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
};
Expand All @@ -31,9 +33,25 @@ pub async fn run(
let cancel_token = distributed_runtime.primary_token().clone();
let endpoint_id: EndpointId = path.parse()?;

let model_name = match &engine_config {
EngineConfig::StaticFull { model, .. } | EngineConfig::StaticCore { model, .. } => {
Some(model.card().slug().to_string())
}
EngineConfig::StaticRemote(model) | EngineConfig::Dynamic(model) => {
Some(model.card().slug().to_string())
}
};

let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question:

@grahamking is this for the in process engines?

@keivenchang do we have these hooked up into the frontend metrics?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this in #2292

.and_then(|c| {
if let Some(ref name) = model_name {
c.add_labels(&[("model", name.as_str())])
} else {
Ok(c)
}
})?;
let endpoint = component
.service_builder()
.create()
Expand Down
5 changes: 3 additions & 2 deletions lib/runtime/examples/hello_world/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
// limitations under the License.

use dynamo_runtime::{
logging, pipeline::PushRouter, protocols::annotated::Annotated, stream::StreamExt,
DistributedRuntime, Result, Runtime, Worker,
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
stream::StreamExt, DistributedRuntime, Result, Runtime, Worker,
};
use hello_world::DEFAULT_NAMESPACE;

Expand All @@ -31,6 +31,7 @@ async fn app(runtime: Runtime) -> Result<()> {
let client = distributed
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "hello_world_model")])?
.endpoint("generate")
.client()
.await?;
Expand Down
2 changes: 2 additions & 0 deletions lib/runtime/examples/hello_world/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
Expand Down Expand Up @@ -69,6 +70,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
runtime
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "hello_world_model")])?
.service_builder()
.create()
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use futures::StreamExt;
use service_metrics::DEFAULT_NAMESPACE;

use dynamo_runtime::{
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
DistributedRuntime, Result, Runtime, Worker,
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
utils::Duration, DistributedRuntime, Result, Runtime, Worker,
};

fn main() -> Result<()> {
Expand All @@ -31,7 +31,9 @@ async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;

let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
let component = namespace.component("backend")?;
let component = namespace
.component("backend")?
.add_labels(&[("model", "service_metrics_model")])?;

let client = component.endpoint("generate").client().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use service_metrics::{MyStats, DEFAULT_NAMESPACE};

use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
Expand Down Expand Up @@ -71,6 +72,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
runtime
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "service_metrics_model")])?
.service_builder()
.create()
.await?
Expand Down
2 changes: 2 additions & 0 deletions lib/runtime/examples/system_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;
pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
pub const DEFAULT_MODEL_NAME: &str = "dyn_example_model";

/// Stats structure returned by the endpoint's stats handler
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -90,6 +91,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Re
let endpoint = drt
.namespace(DEFAULT_NAMESPACE)?
.component(DEFAULT_COMPONENT)?
.add_labels(&[("model", DEFAULT_MODEL_NAME)])?
.service_builder()
.create()
.await?
Expand Down
36 changes: 36 additions & 0 deletions lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ pub struct Component {
#[validate(custom(function = "validate_allowed_chars"))]
name: String,

/// Additional labels for metrics
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,

// todo - restrict the namespace to a-z0-9-_A-Z
/// Namespace
#[builder(setter(into))]
Expand Down Expand Up @@ -183,6 +187,18 @@ impl MetricsRegistry for Component {
]
.concat()
}

fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}

fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}

impl Component {
Expand Down Expand Up @@ -220,6 +236,7 @@ impl Component {
component: self.clone(),
name: endpoint.into(),
is_static: self.is_static,
labels: self.labels.clone(),
}
}

Expand Down Expand Up @@ -285,6 +302,9 @@ pub struct Endpoint {
name: String,

is_static: bool,

/// Additional labels for metrics
labels: Vec<(String, String)>,
}

impl Hash for Endpoint {
Expand Down Expand Up @@ -329,6 +349,18 @@ impl MetricsRegistry for Endpoint {
]
.concat()
}

fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}

fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}

impl Endpoint {
Expand Down Expand Up @@ -447,6 +479,10 @@ pub struct Namespace {

#[builder(default = "None")]
parent: Option<Arc<Namespace>>,

/// Additional labels for metrics
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,
}

impl DistributedRuntimeProvider for Namespace {
Expand Down
12 changes: 12 additions & 0 deletions lib/runtime/src/component/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ impl MetricsRegistry for Namespace {
fn parent_hierarchy(&self) -> Vec<String> {
vec![self.drt().basename()]
}

fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}

fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}

#[cfg(feature = "integration")]
Expand Down
13 changes: 13 additions & 0 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ impl MetricsRegistry for DistributedRuntime {
fn parent_hierarchy(&self) -> Vec<String> {
vec![] // drt is the root, so no parent hierarchy
}

fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}

fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}

impl DistributedRuntime {
Expand Down Expand Up @@ -111,6 +123,7 @@ impl DistributedRuntime {
prometheus::Registry,
>::new())),
system_health,
labels: Vec::new(),
};

// Start metrics server if enabled
Expand Down
3 changes: 3 additions & 0 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,7 @@ pub struct DistributedRuntime {

// This map associates metric prefixes with their corresponding Prometheus registries.
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,

// Additional labels for metrics
labels: Vec<(String, String)>,
}
Loading
Loading