Skip to content
Merged
9 changes: 7 additions & 2 deletions components/backends/vllm/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)

component = runtime.namespace(config.namespace).component(
config.component, config.model
)
await component.create_service()

generate_endpoint = component.endpoint(config.endpoint)
Expand Down Expand Up @@ -164,7 +167,9 @@ async def init(runtime: DistributedRuntime, config: Config):
Instantiate and serve
"""

component = runtime.namespace(config.namespace).component(config.component)
component = runtime.namespace(config.namespace).component(
config.component, config.model
)
await component.create_service()

generate_endpoint = component.endpoint(config.endpoint)
Expand Down
2 changes: 1 addition & 1 deletion components/metrics/src/bin/mock_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
let namespace = runtime.namespace("dynamo")?;
// we must first create a service, then we can attach one more more endpoints
let component = namespace
.component("MyComponent")?
.component("MyComponent", Some("MyModel".to_string()))?
.service_builder()
.create()
.await?;
Expand Down
1 change: 1 addition & 0 deletions components/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl MetricsMode {
pub struct LLMWorkerLoadCapacityConfig {
pub component_name: String,
pub endpoint_name: String,
pub model_name: Option<String>,
}

/// Metrics collector for exposing metrics to prometheus/grafana
Expand Down
11 changes: 9 additions & 2 deletions components/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ struct Args {
#[arg(long)]
endpoint: String,

/// Model name for the target component (optional)
#[arg(long)]
model_name: String,

/// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second)
#[arg(long, default_value = "1")]
poll_interval: u64,
Expand Down Expand Up @@ -109,6 +113,7 @@ fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
Ok(LLMWorkerLoadCapacityConfig {
component_name: args.component.clone(),
endpoint_name: args.endpoint.clone(),
model_name: Some(args.model_name.clone()),
})
}

Expand All @@ -120,7 +125,9 @@ async fn app(runtime: Runtime) -> Result<()> {
let drt = DistributedRuntime::from_settings(runtime.clone()).await?;

let namespace = drt.namespace(args.namespace)?;
let component = namespace.component("count")?;
// The metrics aggregator operates independently of any model,
// hence the model name is set to None.
let component = namespace.component("count", None)?;

// Create unique instance of Count
let key = format!("{}/instance", component.etcd_root());
Expand All @@ -131,7 +138,7 @@ async fn app(runtime: Runtime) -> Result<()> {
.await
.context("Unable to create unique instance of Count; possibly one already exists")?;

let target_component = namespace.component(&config.component_name)?;
let target_component = namespace.component(&config.component_name, config.model_name.clone())?;
let target_endpoint = target_component.endpoint(&config.endpoint_name);

let service_path = target_endpoint.path();
Expand Down
4 changes: 3 additions & 1 deletion components/router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ async fn app(runtime: Runtime) -> Result<()> {
let args = Args::parse();
let runtime = DistributedRuntime::from_settings(runtime).await?;

// The metrics aggregator operates independently of any model,
// hence the model name is set to None.
let component = runtime
.namespace(&args.namespace)?
.component(&args.component)?;
.component(&args.component, None)?;

let selector = Box::new(CustomWorkerSelector::default());

Expand Down
2 changes: 1 addition & 1 deletion launch/dynamo-run/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn engine_for(
let endpoint = local_model.endpoint_id().clone();

let engine =
dynamo_llm::mocker::engine::make_mocker_engine(drt, endpoint, args).await?;
dynamo_llm::mocker::engine::make_mocker_engine(drt, endpoint, Some(local_model.card().slug().to_string()), args).await?;

Ok(EngineConfig::StaticCore {
engine,
Expand Down
4 changes: 3 additions & 1 deletion lib/bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ fn dynamo_create_kv_publisher(
.ok_or(anyhow::Error::msg("Could not get Distributed Runtime"))
{
Ok(drt) => {
let backend = drt.namespace(namespace)?.component(component)?;
// The model publisher isn't tied to a specific model. Thus, the model name
// is set to None.
let backend = drt.namespace(namespace)?.component(component, None)?;
KvEventPublisher::new(backend, worker_id, kv_block_size, None)
}
Err(e) => Err(e),
Expand Down
5 changes: 3 additions & 2 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,9 @@ impl Endpoint {

#[pymethods]
impl Namespace {
fn component(&self, name: String) -> PyResult<Component> {
let inner = self.inner.component(name).map_err(to_pyerr)?;
#[pyo3(signature = (name, model = None))]
fn component(&self, name: String, model: Option<String>) -> PyResult<Component> {
let inner = self.inner.component(name, model).map_err(to_pyerr)?;
Ok(Component {
inner,
event_loop: self.event_loop.clone(),
Expand Down
1 change: 1 addition & 0 deletions lib/bindings/python/rust/llm/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ async fn select_engine(
let engine = dynamo_llm::mocker::engine::make_mocker_engine(
distributed_runtime.inner,
endpoint,
Some(local_model.card().service_name.clone()),
mocker_args,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ModelWatcher {
let component = self
.drt
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component, Some(model_entry.name.clone()))?;
let client = component.endpoint(&endpoint_id.name).client().await?;

let Some(etcd_client) = self.drt.etcd_client() else {
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/entrypoint/input/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub async fn prepare_engine(
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component, Some(card.slug().to_string()))?;
let client = component.endpoint(&endpoint_id.name).client().await?;

let kv_chooser = if router_mode == RouterMode::KV {
Expand Down
9 changes: 8 additions & 1 deletion lib/llm/src/entrypoint/input/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ pub async fn run(
let cancel_token = distributed_runtime.primary_token().clone();
let endpoint_id: EndpointId = path.parse()?;

let model_name = match &engine_config {
EngineConfig::StaticFull { model, .. } | EngineConfig::StaticCore { model, .. } => {
Some(model.card().slug().to_string())
}
EngineConfig::StaticRemote(model) | EngineConfig::Dynamic(model) => Some(model.card().slug().to_string()),
};

let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component, model_name)?;
let endpoint = component
.service_builder()
.create()
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)?;
.component(&endpoint_id.component, Some(card.slug().to_string()))?;
let client = component.endpoint(&endpoint_id.name).client().await?;

let kv_chooser = if router_mode == RouterMode::KV {
Expand Down
6 changes: 4 additions & 2 deletions lib/llm/src/kv_router/prefill_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,9 @@ mod integration_tests {

// Create namespace and components with same names
let namespace = distributed.namespace("test_prefill_multiworker")?;
// Counters component is not associated with any model, hence model name is None.
let component = namespace
.component("counters")?
.component("counters", None)?
.service_builder()
.create()
.await?;
Expand Down Expand Up @@ -430,8 +431,9 @@ mod integration_tests {

// Create namespace and components with same names
let namespace = distributed.namespace("test_prefill_multiworker")?;
// Counters component is not associated with any model, hence model name is None.
let component = namespace
.component("counters")?
.component("counters", None)?
.service_builder()
.create()
.await?;
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/kv_router/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ mod tests {
// Create namespace and component with same names as thread 2
let namespace = distributed.namespace("test_multiworker_sequences")?;
let component = namespace
.component("sequences")?
.component("sequences", None)?
.service_builder()
.create()
.await?;
Expand Down Expand Up @@ -914,7 +914,7 @@ mod tests {
// Create namespace and component with same names as thread 1
let namespace = distributed.namespace("test_multiworker_sequences")?;
let component = namespace
.component("sequences")?
.component("sequences", None)?
.service_builder()
.create()
.await?;
Expand Down
11 changes: 7 additions & 4 deletions lib/llm/src/mocker/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

pub const MOCKER_COMPONENT: &str = "mocker";
pub const MOCKER_MODEL: &str = "mocker_model";

/// Generate a random token ID from 1k to 5k
fn generate_random_token() -> TokenIdType {
Expand Down Expand Up @@ -441,6 +442,7 @@ impl AnnotatedMockEngine {
inner: MockVllmEngine,
distributed_runtime: DistributedRuntime,
endpoint: dynamo_runtime::protocols::Endpoint,
model_name: Option<String>,
) -> Self {
let inner = Arc::new(inner);
let inner_clone = inner.clone();
Expand All @@ -455,7 +457,7 @@ impl AnnotatedMockEngine {
continue;
};

let Ok(component) = namespace.component(&endpoint.component) else {
let Ok(component) = namespace.component(&endpoint.component, model_name.clone()) else {
tracing::debug!("Component not available yet, retrying...");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
Expand Down Expand Up @@ -510,12 +512,13 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
pub async fn make_mocker_engine(
distributed_runtime: DistributedRuntime,
endpoint: dynamo_runtime::protocols::Endpoint,
model_name: Option<String>,
args: MockEngineArgs,
) -> Result<crate::backend::ExecutionContext, Error> {
// Create the mocker engine
tracing::info!("Creating mocker engine with config: {args:?}");
let annotated_engine =
AnnotatedMockEngine::new(MockVllmEngine::new(args), distributed_runtime, endpoint);
AnnotatedMockEngine::new(MockVllmEngine::new(args), distributed_runtime, endpoint, model_name);

Ok(Arc::new(annotated_engine))
}
Expand Down Expand Up @@ -551,7 +554,7 @@ mod integration_tests {
// Create component for MockVllmEngine (needed for publishers)
let test_component = distributed
.namespace("test")?
.component(MOCKER_COMPONENT)?
.component(MOCKER_COMPONENT, Some(MOCKER_MODEL.to_string()))?
.service_builder()
.create()
.await?;
Expand Down Expand Up @@ -622,7 +625,7 @@ mod integration_tests {
// Create client
let client = distributed
.namespace("test")?
.component(MOCKER_COMPONENT)?
.component(MOCKER_COMPONENT, Some(MOCKER_MODEL.to_string()))?
.endpoint("generate")
.client()
.await?;
Expand Down
19 changes: 18 additions & 1 deletion lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ pub struct Component {
#[validate(custom(function = "validate_allowed_chars"))]
name: String,

/// Name of the model
#[builder(setter(into), default)]
model: Option<String>,

// todo - restrict the namespace to a-z0-9-_A-Z
/// Namespace
#[builder(setter(into))]
Expand Down Expand Up @@ -176,6 +180,10 @@ impl MetricsRegistry for Component {
self.name.clone()
}

fn model(&self) -> Option<String> {
self.model.clone()
}

fn parent_hierarchy(&self) -> Vec<String> {
[
self.namespace.parent_hierarchy(),
Expand Down Expand Up @@ -215,6 +223,10 @@ impl Component {
self.name.clone()
}

pub fn model(&self) -> Option<String> {
self.model.clone()
}

pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
Endpoint {
component: self.clone(),
Expand Down Expand Up @@ -322,6 +334,10 @@ impl MetricsRegistry for Endpoint {
self.name.clone()
}

fn model(&self) -> Option<String> {
self.component.model()
}

fn parent_hierarchy(&self) -> Vec<String> {
[
self.component.parent_hierarchy(),
Expand Down Expand Up @@ -487,9 +503,10 @@ impl Namespace {
}

/// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
pub fn component(&self, name: impl Into<String>) -> Result<Component> {
pub fn component(&self, name: impl Into<String>, model: Option<String>) -> Result<Component> {
Ok(ComponentBuilder::from_runtime(self.runtime.clone())
.name(name)
.model(model)
.namespace(self.clone())
.is_static(self.is_static)
.build()?)
Expand Down
13 changes: 13 additions & 0 deletions lib/runtime/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
}
}
}
if let Some(model) = registry.model() {
if !model.is_empty() {
let valid_model = lint_prometheus_name(&model)?;
if !valid_model.is_empty() {
updated_labels.push(("model".to_string(), valid_model));
}
}
}
if hierarchy.len() > 3 {
let endpoint = &hierarchy[3];
if !endpoint.is_empty() {
Expand Down Expand Up @@ -386,6 +394,11 @@ pub trait MetricsRegistry: Send + Sync + crate::traits::DistributedRuntimeProvid
// Get the name of this registry (without any prefix)
fn basename(&self) -> String;

// Get the model name for this registry
fn model(&self) -> Option<String> {
None
}

/// Retrieve the complete hierarchy and basename for this registry. Currently, the prefix for drt is an empty string,
/// so we must account for the leading underscore. The existing code remains unchanged to accommodate any future
/// scenarios where drt's prefix might be assigned a value.
Expand Down
Loading