Skip to content

Commit 85c2be7

Browse files
grahamking, nv-tusharma
authored and committed
feat(etcd): Version the etcd keys (#3458)
Signed-off-by: Graham King <[email protected]>
1 parent 289ea20 commit 85c2be7

File tree

13 files changed

+68
-167
lines changed

13 files changed

+68
-167
lines changed

lib/bindings/python/rust/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ fn bind_tcp_port(port: u16) -> std::io::Result<socket2::Socket> {
594594
}
595595

596596
fn make_port_key(namespace: &str, node_ip: IpAddr, port: u16) -> anyhow::Result<String> {
597-
Ok(format!("dyn://{namespace}/ports/{node_ip}/{port}"))
597+
Ok(format!("v1/{namespace}/ports/{node_ip}/{port}"))
598598
}
599599

600600
fn local_ip() -> Result<IpAddr, local_ip_address::Error> {

lib/bindings/python/rust/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl VirtualConnectorCoordinator {
102102
let prefix = root_key(&self.0.namespace);
103103
let inner = self.0.clone();
104104
pyo3_async_runtimes::tokio::future_into_py(py, async move {
105-
let kv_cache = KvCache::new(inner.etcd_client.clone(), prefix, HashMap::new())
105+
let kv_cache = KvCache::new(inner.etcd_client.clone(), "v1", prefix, HashMap::new())
106106
.await
107107
.map_err(to_pyerr)?;
108108
*inner.kv_cache.lock() = Some(Arc::new(kv_cache));
@@ -497,5 +497,5 @@ fn load(a: &AtomicUsize) -> usize {
497497
}
498498

499499
fn root_key(namespace: &str) -> String {
500-
format!("/{namespace}/planner/")
500+
format!("{namespace}/planner/")
501501
}

lib/llm/src/discovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ mod watcher;
88
pub use watcher::{ModelUpdate, ModelWatcher};
99

1010
/// The root etcd path for KV Router registrations
11-
pub const KV_ROUTERS_ROOT_PATH: &str = "kv_routers";
11+
pub const KV_ROUTERS_ROOT_PATH: &str = "v1/kv_routers";

lib/llm/src/discovery/watcher.rs

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,6 @@ impl ModelWatcher {
318318
namespace = endpoint_id.namespace,
319319
"New endpoint for existing model"
320320
);
321-
//self.notify_on_model.notify_waiters();
322321
return Ok(());
323322
}
324323

@@ -413,7 +412,7 @@ impl ModelWatcher {
413412
.await?;
414413
let engine = Arc::new(push_router);
415414
self.manager
416-
.add_embeddings_model(&model_entry.name, engine)?;
415+
.add_embeddings_model(card.name(), checksum, engine)?;
417416
} else if card.model_input == ModelInput::Text && card.model_type.supports_chat() {
418417
// Case 3: Text + Chat
419418
let push_router = PushRouter::<
@@ -560,26 +559,20 @@ impl ModelWatcher {
560559
/// The ModelDeploymentCard is published in etcd with a key like "v1/mdc/dynamo/backend/generate/694d9981145a61ad".
561560
/// Extract the EndpointId and instance_id from that.
562561
fn etcd_key_extract(s: &str) -> anyhow::Result<(EndpointId, String)> {
562+
if !s.starts_with(model_card::ROOT_PATH) {
563+
anyhow::bail!("Invalid format: expected model card ROOT_PATH segment in {s}");
564+
}
563565
let parts: Vec<&str> = s.split('/').collect();
564-
let start_idx = if !parts.is_empty() && parts[0] == "v1" {
565-
1
566-
} else {
567-
0
568-
};
569566

570-
// Need at least prefix model_card::ROOT_PATH + 3 parts: namespace, component, name
571-
if parts.len() <= start_idx + 3 {
567+
// Need at least prefix model_card::ROOT_PATH (2 parts) + namespace, component, name (3 parts)
568+
if parts.len() <= 5 {
572569
anyhow::bail!("Invalid format: not enough path segments in {s}");
573570
}
574571

575-
if parts.get(start_idx) != Some(&model_card::ROOT_PATH) {
576-
anyhow::bail!("Invalid format: expected model card ROOT_PATH segment in {s}");
577-
}
578-
579572
let endpoint_id = EndpointId {
580-
namespace: parts[start_idx + 1].to_string(),
581-
component: parts[start_idx + 2].to_string(),
582-
name: parts[start_idx + 3].to_string(),
573+
namespace: parts[2].to_string(),
574+
component: parts[3].to_string(),
575+
name: parts[4].to_string(),
583576
};
584577
Ok((endpoint_id, parts[parts.len() - 1].to_string()))
585578
}
@@ -590,16 +583,6 @@ mod tests {
590583

591584
#[test]
592585
fn test_etcd_key_extract() {
593-
let input = format!(
594-
"v1/{}/dynamo/backend/generate/694d9981145a61ad",
595-
model_card::ROOT_PATH
596-
);
597-
let (endpoint_id, instance_id) = etcd_key_extract(&input).unwrap();
598-
assert_eq!(endpoint_id.namespace, "dynamo");
599-
assert_eq!(endpoint_id.component, "backend");
600-
assert_eq!(endpoint_id.name, "generate");
601-
assert_eq!(instance_id, "694d9981145a61ad");
602-
603586
let input = format!(
604587
"{}/dynamo/backend/generate/694d9981145a61ad",
605588
model_card::ROOT_PATH

lib/llm/src/model_card.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::gguf::{Content, ContentConfig, ModelConfigLike};
3434
use crate::protocols::TokenIdType;
3535

3636
/// Identify model deployment cards in the key-value store
37-
pub const ROOT_PATH: &str = "mdc";
37+
pub const ROOT_PATH: &str = "v1/mdc";
3838

3939
#[derive(Serialize, Deserialize, Clone, Debug)]
4040
#[serde(rename_all = "snake_case")]

lib/runtime/src/component.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
discovery::Lease,
3535
metrics::{MetricsRegistry, prometheus_names},
3636
service::ServiceSet,
37-
transports::etcd::EtcdPath,
37+
transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
3838
};
3939

4040
use super::{
@@ -72,10 +72,7 @@ pub use client::{Client, InstanceSource};
7272

7373
/// The root etcd path where each instance registers itself in etcd.
7474
/// An instance is namespace+component+endpoint+lease_id and must be unique.
75-
pub const INSTANCE_ROOT_PATH: &str = "instances";
76-
77-
/// The root etcd path where each namespace is registered in etcd.
78-
pub const ETCD_ROOT_PATH: &str = "dynamo://";
75+
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
7976

8077
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
8178
#[serde(rename_all = "snake_case")]
@@ -601,7 +598,7 @@ impl Namespace {
601598
}
602599

603600
pub fn etcd_path(&self) -> String {
604-
format!("{}{}", ETCD_ROOT_PATH, self.name())
601+
format!("{ETCD_ROOT_PATH}{}", self.name())
605602
}
606603

607604
pub fn name(&self) -> String {

lib/runtime/src/storage/key_value_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ mod tests {
252252
use super::*;
253253
use futures::{StreamExt, pin_mut};
254254

255-
const BUCKET_NAME: &str = "mdc";
255+
const BUCKET_NAME: &str = "v1/mdc";
256256

257257
/// Convert the value returned by `watch()` into a broadcast stream that multiple
258258
/// clients can listen to.

lib/runtime/src/storage/key_value_store/etcd.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::collections::HashMap;
55
use std::pin::Pin;
66
use std::time::Duration;
77

8-
use crate::{slug::Slug, storage::key_value_store::Key, transports::etcd::Client};
8+
use crate::{storage::key_value_store::Key, transports::etcd::Client};
99
use async_stream::stream;
1010
use async_trait::async_trait;
1111
use etcd_client::{Compare, CompareOp, EventType, PutOptions, Txn, TxnOp, WatchOptions};
@@ -240,7 +240,7 @@ impl EtcdBucket {
240240
}
241241

242242
fn make_key(bucket_name: &str, key: &Key) -> String {
243-
[Slug::slugify(bucket_name).to_string(), key.to_string()].join("/")
243+
[bucket_name.to_string(), key.to_string()].join("/")
244244
}
245245

246246
#[cfg(feature = "integration")]

lib/runtime/src/transports/etcd.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ pub use path::*;
2828

2929
use super::utils::build_in_runtime;
3030

31-
//pub use etcd::ConnectOptions as EtcdConnectOptions;
32-
3331
/// ETCD Client
3432
#[derive(Clone)]
3533
pub struct Client {
@@ -517,11 +515,14 @@ impl KvCache {
517515
/// Create a new KV cache for the given prefix
518516
pub async fn new(
519517
client: Client,
518+
version: &str,
520519
prefix: String,
521520
initial_values: HashMap<String, Vec<u8>>,
522521
) -> Result<Self> {
523522
let mut cache = HashMap::new();
524523

524+
let prefix = format!("{version}/{prefix}");
525+
525526
// First get all existing keys with this prefix
526527
let existing_kvs = client.kv_get_prefix(&prefix).await?;
527528
for kv in existing_kvs {
@@ -575,21 +576,21 @@ impl KvCache {
575576
let key = String::from_utf8_lossy(kv.key()).to_string();
576577
let value = kv.value().to_vec();
577578

578-
tracing::debug!("KvCache update: {} = {:?}", key, value);
579+
tracing::trace!("KvCache update: {} = {:?}", key, value);
579580
let mut cache_write = cache.write().await;
580581
cache_write.insert(key, value);
581582
}
582583
WatchEvent::Delete(kv) => {
583584
let key = String::from_utf8_lossy(kv.key()).to_string();
584585

585-
tracing::debug!("KvCache delete: {}", key);
586+
tracing::trace!("KvCache delete: {}", key);
586587
let mut cache_write = cache.write().await;
587588
cache_write.remove(&key);
588589
}
589590
}
590591
}
591592

592-
tracing::info!("KvCache watcher for prefix '{}' stopped", prefix);
593+
tracing::debug!("KvCache watcher for prefix '{}' stopped", prefix);
593594
});
594595
}
595596

@@ -719,7 +720,7 @@ mod tests {
719720
initial_values.insert("key2".to_string(), b"value2".to_vec());
720721

721722
// Create the KV cache
722-
let kv_cache = KvCache::new(client.clone(), prefix.clone(), initial_values).await?;
723+
let kv_cache = KvCache::new(client.clone(), "v1", prefix.clone(), initial_values).await?;
723724

724725
// Test get
725726
let value1 = kv_cache.get("key1").await;

0 commit comments

Comments
 (0)