Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ fn bind_tcp_port(port: u16) -> std::io::Result<socket2::Socket> {
}

fn make_port_key(namespace: &str, node_ip: IpAddr, port: u16) -> anyhow::Result<String> {
Ok(format!("dyn://{namespace}/ports/{node_ip}/{port}"))
Ok(format!("v1/{namespace}/ports/{node_ip}/{port}"))
}

fn local_ip() -> Result<IpAddr, local_ip_address::Error> {
Expand Down
4 changes: 2 additions & 2 deletions lib/bindings/python/rust/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl VirtualConnectorCoordinator {
let prefix = root_key(&self.0.namespace);
let inner = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let kv_cache = KvCache::new(inner.etcd_client.clone(), prefix, HashMap::new())
let kv_cache = KvCache::new(inner.etcd_client.clone(), "v1", prefix, HashMap::new())
.await
.map_err(to_pyerr)?;
*inner.kv_cache.lock() = Some(Arc::new(kv_cache));
Expand Down Expand Up @@ -497,5 +497,5 @@ fn load(a: &AtomicUsize) -> usize {
}

fn root_key(namespace: &str) -> String {
format!("/{namespace}/planner/")
format!("{namespace}/planner/")
}
2 changes: 1 addition & 1 deletion lib/llm/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ mod watcher;
pub use watcher::{ModelUpdate, ModelWatcher};

/// The root etcd path for KV Router registrations
pub const KV_ROUTERS_ROOT_PATH: &str = "kv_routers";
pub const KV_ROUTERS_ROOT_PATH: &str = "v1/kv_routers";
35 changes: 9 additions & 26 deletions lib/llm/src/discovery/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ impl ModelWatcher {
namespace = endpoint_id.namespace,
"New endpoint for existing model"
);
//self.notify_on_model.notify_waiters();
return Ok(());
}

Expand Down Expand Up @@ -413,7 +412,7 @@ impl ModelWatcher {
.await?;
let engine = Arc::new(push_router);
self.manager
.add_embeddings_model(&model_entry.name, engine)?;
.add_embeddings_model(card.name(), checksum, engine)?;
} else if card.model_input == ModelInput::Text && card.model_type.supports_chat() {
// Case 3: Text + Chat
let push_router = PushRouter::<
Expand Down Expand Up @@ -560,26 +559,20 @@ impl ModelWatcher {
/// The ModelDeploymentCard is published in etcd with a key like "v1/mdc/dynamo/backend/generate/694d9981145a61ad".
/// Extract the EndpointId and instance_id from that.
fn etcd_key_extract(s: &str) -> anyhow::Result<(EndpointId, String)> {
if !s.starts_with(model_card::ROOT_PATH) {
anyhow::bail!("Invalid format: expected model card ROOT_PATH segment in {s}");
}
let parts: Vec<&str> = s.split('/').collect();
let start_idx = if !parts.is_empty() && parts[0] == "v1" {
1
} else {
0
};

// Need at least prefix model_card::ROOT_PATH + 3 parts: namespace, component, name
if parts.len() <= start_idx + 3 {
// Need at least prefix model_card::ROOT_PATH (2 parts) + namespace, component, name (3 parts)
if parts.len() <= 5 {
anyhow::bail!("Invalid format: not enough path segments in {s}");
}

if parts.get(start_idx) != Some(&model_card::ROOT_PATH) {
anyhow::bail!("Invalid format: expected model card ROOT_PATH segment in {s}");
}

let endpoint_id = EndpointId {
namespace: parts[start_idx + 1].to_string(),
component: parts[start_idx + 2].to_string(),
name: parts[start_idx + 3].to_string(),
namespace: parts[2].to_string(),
component: parts[3].to_string(),
name: parts[4].to_string(),
};
Ok((endpoint_id, parts[parts.len() - 1].to_string()))
}
Expand All @@ -590,16 +583,6 @@ mod tests {

#[test]
fn test_etcd_key_extract() {
let input = format!(
"v1/{}/dynamo/backend/generate/694d9981145a61ad",
model_card::ROOT_PATH
);
let (endpoint_id, instance_id) = etcd_key_extract(&input).unwrap();
assert_eq!(endpoint_id.namespace, "dynamo");
assert_eq!(endpoint_id.component, "backend");
assert_eq!(endpoint_id.name, "generate");
assert_eq!(instance_id, "694d9981145a61ad");

let input = format!(
"{}/dynamo/backend/generate/694d9981145a61ad",
model_card::ROOT_PATH
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/model_card.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::gguf::{Content, ContentConfig, ModelConfigLike};
use crate::protocols::TokenIdType;

/// Identify model deployment cards in the key-value store
pub const ROOT_PATH: &str = "mdc";
pub const ROOT_PATH: &str = "v1/mdc";

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "snake_case")]
Expand Down
9 changes: 3 additions & 6 deletions lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
discovery::Lease,
metrics::{MetricsRegistry, prometheus_names},
service::ServiceSet,
transports::etcd::EtcdPath,
transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
};

use super::{
Expand Down Expand Up @@ -72,10 +72,7 @@ pub use client::{Client, InstanceSource};

/// The root etcd path where each instance registers itself in etcd.
/// An instance is namespace+component+endpoint+lease_id and must be unique.
pub const INSTANCE_ROOT_PATH: &str = "instances";

/// The root etcd path where each namespace is registered in etcd.
pub const ETCD_ROOT_PATH: &str = "dynamo://";
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -601,7 +598,7 @@ impl Namespace {
}

pub fn etcd_path(&self) -> String {
format!("{}{}", ETCD_ROOT_PATH, self.name())
format!("{ETCD_ROOT_PATH}{}", self.name())
}

pub fn name(&self) -> String {
Expand Down
2 changes: 1 addition & 1 deletion lib/runtime/src/storage/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ mod tests {
use super::*;
use futures::{StreamExt, pin_mut};

const BUCKET_NAME: &str = "mdc";
const BUCKET_NAME: &str = "v1/mdc";

/// Convert the value returned by `watch()` into a broadcast stream that multiple
/// clients can listen to.
Expand Down
4 changes: 2 additions & 2 deletions lib/runtime/src/storage/key_value_store/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::time::Duration;

use crate::{slug::Slug, storage::key_value_store::Key, transports::etcd::Client};
use crate::{storage::key_value_store::Key, transports::etcd::Client};
use async_stream::stream;
use async_trait::async_trait;
use etcd_client::{Compare, CompareOp, EventType, PutOptions, Txn, TxnOp, WatchOptions};
Expand Down Expand Up @@ -240,7 +240,7 @@ impl EtcdBucket {
}

fn make_key(bucket_name: &str, key: &Key) -> String {
[Slug::slugify(bucket_name).to_string(), key.to_string()].join("/")
[bucket_name.to_string(), key.to_string()].join("/")
}

#[cfg(feature = "integration")]
Expand Down
13 changes: 7 additions & 6 deletions lib/runtime/src/transports/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub use path::*;

use super::utils::build_in_runtime;

//pub use etcd::ConnectOptions as EtcdConnectOptions;

/// ETCD Client
#[derive(Clone)]
pub struct Client {
Expand Down Expand Up @@ -517,11 +515,14 @@ impl KvCache {
/// Create a new KV cache for the given prefix
pub async fn new(
client: Client,
version: &str,
prefix: String,
initial_values: HashMap<String, Vec<u8>>,
) -> Result<Self> {
let mut cache = HashMap::new();

let prefix = format!("{version}/{prefix}");

// First get all existing keys with this prefix
let existing_kvs = client.kv_get_prefix(&prefix).await?;
for kv in existing_kvs {
Expand Down Expand Up @@ -575,21 +576,21 @@ impl KvCache {
let key = String::from_utf8_lossy(kv.key()).to_string();
let value = kv.value().to_vec();

tracing::debug!("KvCache update: {} = {:?}", key, value);
tracing::trace!("KvCache update: {} = {:?}", key, value);
let mut cache_write = cache.write().await;
cache_write.insert(key, value);
}
WatchEvent::Delete(kv) => {
let key = String::from_utf8_lossy(kv.key()).to_string();

tracing::debug!("KvCache delete: {}", key);
tracing::trace!("KvCache delete: {}", key);
let mut cache_write = cache.write().await;
cache_write.remove(&key);
}
}
}

tracing::info!("KvCache watcher for prefix '{}' stopped", prefix);
tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
});
}

Expand Down Expand Up @@ -719,7 +720,7 @@ mod tests {
initial_values.insert("key2".to_string(), b"value2".to_vec());

// Create the KV cache
let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;
let kv_cache = KvCache::new(client.clone(), "v1", prefix.clone(), initial_values).await?;

// Test get
let value1 = kv_cache.get("key1").await;
Expand Down
Loading
Loading