Skip to content

Commit 72ec5f5

Browse files
authored
feat: Allow an endpoint to serve multiple models (#2418)
1 parent ebc84d6 commit 72ec5f5

File tree

8 files changed: 46 additions and 99 deletions.

components/metrics/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ async fn app(runtime: Runtime) -> Result<()> {
127127
tracing::debug!("Creating unique instance of Count at {key}");
128128
drt.etcd_client()
129129
.expect("Unreachable because of DistributedRuntime::from_settings above")
130-
.kv_create(key, serde_json::to_vec_pretty(&config)?, None)
130+
.kv_create(&key, serde_json::to_vec_pretty(&config)?, None)
131131
.await
132132
.context("Unable to create unique instance of Count; possibly one already exists")?;
133133

lib/bindings/python/rust/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ impl EtcdClient {
557557
let client = self.inner.clone();
558558
pyo3_async_runtimes::tokio::future_into_py(py, async move {
559559
client
560-
.kv_create(key, value, lease_id)
560+
.kv_create(&key, value, lease_id)
561561
.await
562562
.map_err(to_pyerr)?;
563563
Ok(())

lib/llm/src/local_model.rs

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use dynamo_runtime::protocols::Endpoint as EndpointId;
1010
use dynamo_runtime::slug::Slug;
1111
use dynamo_runtime::traits::DistributedRuntimeProvider;
1212
use dynamo_runtime::{
13-
component::{Component, Endpoint},
13+
component::Endpoint,
1414
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
1515
};
1616

@@ -302,8 +302,6 @@ impl LocalModel {
302302
let Some(etcd_client) = endpoint.drt().etcd_client() else {
303303
anyhow::bail!("Cannot attach to static endpoint");
304304
};
305-
self.ensure_unique(endpoint.component(), self.display_name())
306-
.await?;
307305

308306
// Store model config files in NATS object store
309307
let nats_client = endpoint.drt().nats_client();
@@ -319,7 +317,7 @@ impl LocalModel {
319317

320318
// Publish our ModelEntry to etcd. This allows ingress to find the model card.
321319
// (Why don't we put the model card directly under this key?)
322-
let network_name = ModelNetworkName::from_local(endpoint, etcd_client.lease_id());
320+
let network_name = ModelNetworkName::new();
323321
tracing::debug!("Registering with etcd as {network_name}");
324322
let model_registration = ModelEntry {
325323
name: self.display_name().to_string(),
@@ -328,35 +326,12 @@ impl LocalModel {
328326
};
329327
etcd_client
330328
.kv_create(
331-
network_name.to_string(),
329+
&network_name,
332330
serde_json::to_vec_pretty(&model_registration)?,
333331
None, // use primary lease
334332
)
335333
.await
336334
}
337-
338-
/// Ensure that each component serves only one model.
339-
/// We can have multiple instances of the same model running using the same component name
340-
/// (they get load balanced, and are differentiated in etcd by their lease_id).
341-
/// We cannot have multiple models with the same component name.
342-
///
343-
/// Returns an error if there is already a component by this name serving a different model.
344-
async fn ensure_unique(&self, component: &Component, model_name: &str) -> anyhow::Result<()> {
345-
let Some(etcd_client) = component.drt().etcd_client() else {
346-
// A static component is necessarily unique, it cannot register
347-
return Ok(());
348-
};
349-
for endpoint_info in component.list_instances().await? {
350-
let network_name: ModelNetworkName = (&endpoint_info).into();
351-
352-
if let Ok(entry) = network_name.load_entry(&etcd_client).await {
353-
if entry.name != model_name {
354-
anyhow::bail!("Duplicate component. Attempt to register model {model_name} at {component}, which is already used by {network_name} running model {}.", entry.name);
355-
}
356-
}
357-
}
358-
Ok(())
359-
}
360335
}
361336

362337
/// A random endpoint to use for internal communication
Lines changed: 18 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,38 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use anyhow::Context as _;
5-
6-
use crate::discovery::{ModelEntry, MODEL_ROOT_PATH};
7-
use dynamo_runtime::component::{self, Instance};
8-
use dynamo_runtime::slug::Slug;
9-
use dynamo_runtime::transports::etcd;
4+
use crate::discovery::MODEL_ROOT_PATH;
105

116
#[derive(Debug, Clone)]
127
pub struct ModelNetworkName(String);
138

149
impl ModelNetworkName {
15-
/// Key to store this model entry in networked key-value store (etcd).
16-
///
17-
/// It looks like this:
18-
/// ns.cp.ep-694d967ca5efd804
19-
fn from_parts(namespace: &str, component: &str, endpoint: &str, lease_id: i64) -> Self {
20-
let model_root = MODEL_ROOT_PATH;
21-
let slug = Slug::slugify(&format!("{namespace}.{component}.{endpoint}-{lease_id:x}"));
22-
ModelNetworkName(format!("{model_root}/{slug}"))
23-
}
24-
25-
// We can't do From<&component::Endpoint> here because we also need the lease_id
26-
pub fn from_local(endpoint: &component::Endpoint, lease_id: i64) -> Self {
27-
Self::from_parts(
28-
&endpoint.component().namespace().to_string(),
29-
&endpoint.component().name(),
30-
endpoint.name(),
31-
lease_id,
32-
)
10+
pub fn new() -> Self {
11+
ModelNetworkName(format!("{MODEL_ROOT_PATH}/{}", uuid::Uuid::new_v4()))
3312
}
13+
}
3414

35-
pub fn from_entry(entry: &ModelEntry, lease_id: i64) -> Self {
36-
Self::from_parts(
37-
&entry.endpoint.namespace,
38-
&entry.endpoint.component,
39-
&entry.endpoint.name,
40-
lease_id,
41-
)
15+
impl Default for ModelNetworkName {
16+
fn default() -> Self {
17+
Self::new()
4218
}
19+
}
4320

44-
/// Fetch the ModelEntry from etcd.
45-
pub async fn load_entry(&self, etcd_client: &etcd::Client) -> anyhow::Result<ModelEntry> {
46-
let mut model_entries = etcd_client.kv_get(self.to_string(), None).await?;
47-
if model_entries.is_empty() {
48-
anyhow::bail!("No ModelEntry in etcd for key {self}");
49-
}
50-
let model_entry = model_entries.remove(0);
51-
serde_json::from_slice(model_entry.value()).with_context(|| {
52-
format!(
53-
"Error deserializing JSON. Key={self}. JSON={}",
54-
model_entry.value_str().unwrap_or("INVALID UTF-8")
55-
)
56-
})
21+
impl std::fmt::Display for ModelNetworkName {
22+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23+
write!(f, "{}", self.0)
5724
}
5825
}
5926

60-
impl From<&Instance> for ModelNetworkName {
61-
fn from(cei: &Instance) -> Self {
62-
Self::from_parts(
63-
&cei.namespace,
64-
&cei.component,
65-
&cei.endpoint,
66-
cei.instance_id,
67-
)
27+
impl AsRef<str> for ModelNetworkName {
28+
fn as_ref(&self) -> &str {
29+
&self.0
6830
}
6931
}
7032

71-
impl std::fmt::Display for ModelNetworkName {
72-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73-
write!(f, "{}", self.0)
33+
impl std::ops::Deref for ModelNetworkName {
34+
type Target = str;
35+
fn deref(&self) -> &Self::Target {
36+
&self.0
7437
}
7538
}

lib/runtime/examples/Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/runtime/src/component/endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ impl EndpointConfigBuilder {
144144
if let Some(etcd_client) = &endpoint.component.drt.etcd_client {
145145
if let Err(e) = etcd_client
146146
.kv_create(
147-
endpoint.etcd_path_with_lease_id(lease_id),
147+
&endpoint.etcd_path_with_lease_id(lease_id),
148148
info,
149149
Some(lease_id),
150150
)

lib/runtime/src/transports/etcd.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,20 +170,15 @@ impl Client {
170170
.await?
171171
}
172172

173-
pub async fn kv_create(
174-
&self,
175-
key: String,
176-
value: Vec<u8>,
177-
lease_id: Option<i64>,
178-
) -> Result<()> {
173+
pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
179174
let id = lease_id.unwrap_or(self.lease_id());
180175
let put_options = PutOptions::new().with_lease(id);
181176

182177
// Build the transaction
183178
let txn = Txn::new()
184-
.when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) // Ensure the lock does not exist
179+
.when(vec![Compare::version(key, CompareOp::Equal, 0)]) // Ensure the lock does not exist
185180
.and_then(vec![
186-
TxnOp::put(key.as_str(), value, Some(put_options)), // Create the object
181+
TxnOp::put(key, value, Some(put_options)), // Create the object
187182
]);
188183

189184
// Execute the transaction

lib/runtime/src/utils/leader_worker_barrier.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ fn handle_watch_event<T: DeserializeOwned>(
9595
/// Creates a key-value pair in etcd, returning a specific error if the key already exists
9696
async fn create_barrier_key<T: Serialize>(
9797
client: &Client,
98-
key: String,
98+
key: &str,
9999
data: T,
100100
lease_id: Option<i64>,
101101
) -> Result<(), LeaderWorkerBarrierError> {
@@ -193,7 +193,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
193193
lease_id: i64,
194194
) -> Result<(), LeaderWorkerBarrierError> {
195195
let key = barrier_key(&self.barrier_id, BARRIER_DATA);
196-
create_barrier_key(client, key, data, Some(lease_id)).await
196+
create_barrier_key(client, &key, data, Some(lease_id)).await
197197
}
198198

199199
async fn wait_for_workers(
@@ -216,10 +216,10 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
216216

217217
let workers = worker_result.keys().collect::<HashSet<_>>();
218218

219-
create_barrier_key(client, key, workers, Some(lease_id)).await?;
219+
create_barrier_key(client, &key, workers, Some(lease_id)).await?;
220220
} else {
221221
let key = barrier_key(&self.barrier_id, BARRIER_ABORT);
222-
create_barrier_key(client, key, (), Some(lease_id)).await?;
222+
create_barrier_key(client, &key, (), Some(lease_id)).await?;
223223
}
224224

225225
Ok(())
@@ -302,7 +302,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
302302
&self.barrier_id,
303303
&format!("{}/{}", BARRIER_WORKER, self.worker_id),
304304
);
305-
create_barrier_key(client, key.clone(), data, Some(lease_id)).await?;
305+
create_barrier_key(client, &key, data, Some(lease_id)).await?;
306306
Ok(key)
307307
}
308308

0 commit comments

Comments
 (0)