Skip to content

Commit 81162df

Browse files
authored
chore(discovery): Watch/publish ModelDeploymentCard instead of ModelEntry (#3350)
Signed-off-by: Graham King <[email protected]>
1 parent ddbb4f5 commit 81162df

File tree

23 files changed

+537
-329
lines changed

23 files changed

+537
-329
lines changed

lib/bindings/python/examples/openai_service/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async def worker(runtime: DistributedRuntime):
6969
host: str = "localhost"
7070
port: int = 8000
7171
service: HttpService = HttpService(port=port)
72-
service.add_chat_completions_model(served_model_name, engine)
72+
service.add_chat_completions_model(served_model_name, "mdcsum", engine)
7373

7474
print("Starting service...")
7575
shutdown_signal = service.run(runtime.child_token())

lib/bindings/python/rust/http.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,29 @@ impl HttpService {
3030
Ok(Self { inner })
3131
}
3232

33-
pub fn add_completions_model(&self, model: String, engine: HttpAsyncEngine) -> PyResult<()> {
33+
pub fn add_completions_model(
34+
&self,
35+
model: String,
36+
checksum: String,
37+
engine: HttpAsyncEngine,
38+
) -> PyResult<()> {
3439
let engine = Arc::new(engine);
3540
self.inner
3641
.model_manager()
37-
.add_completions_model(&model, engine)
42+
.add_completions_model(&model, &checksum, engine)
3843
.map_err(to_pyerr)
3944
}
4045

4146
pub fn add_chat_completions_model(
4247
&self,
4348
model: String,
49+
checksum: String,
4450
engine: HttpAsyncEngine,
4551
) -> PyResult<()> {
4652
let engine = Arc::new(engine);
4753
self.inner
4854
.model_manager()
49-
.add_chat_completions_model(&model, engine)
55+
.add_chat_completions_model(&model, &checksum, engine)
5056
.map_err(to_pyerr)
5157
}
5258

lib/bindings/python/tests/test_http_server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ async def http_server(runtime: DistributedRuntime):
8585
model_name = "test_model"
8686
start_done = asyncio.Event()
8787
child_token = runtime.child_token()
88+
checksum = "abc123" # Checksum of ModelDeplomentCard for that model
8889

8990
async def worker():
9091
"""The server worker task."""
@@ -94,7 +95,7 @@ async def worker():
9495
engine = HttpAsyncEngine(python_engine.generate, loop)
9596

9697
service = HttpService(port=port)
97-
service.add_chat_completions_model(model_name, engine)
98+
service.add_chat_completions_model(model_name, checksum, engine)
9899
service.enable_endpoint("chat", True)
99100

100101
shutdown_signal = service.run(child_token)

lib/llm/src/common/checked_file.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct Checksum {
3333
algorithm: CryptographicHashMethods,
3434
}
3535

36-
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
36+
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
3737
pub enum CryptographicHashMethods {
3838
#[serde(rename = "blake3")]
3939
BLAKE3,
@@ -259,6 +259,15 @@ impl TryFrom<&str> for Checksum {
259259
}
260260
}
261261

262+
impl Default for Checksum {
263+
fn default() -> Self {
264+
Self {
265+
hash: "".to_string(),
266+
algorithm: CryptographicHashMethods::BLAKE3,
267+
}
268+
}
269+
}
270+
262271
impl FromStr for CryptographicHashMethods {
263272
type Err = String;
264273

lib/llm/src/discovery.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,8 @@
44
mod model_manager;
55
pub use model_manager::{ModelManager, ModelManagerError};
66

7-
mod model_entry;
8-
pub use model_entry::ModelEntry;
9-
107
mod watcher;
118
pub use watcher::{ModelUpdate, ModelWatcher};
129

13-
/// The root etcd path for ModelEntry
14-
pub const MODEL_ROOT_PATH: &str = "models";
15-
1610
/// The root etcd path for KV Router registrations
1711
pub const KV_ROUTERS_ROOT_PATH: &str = "kv_routers";

lib/llm/src/discovery/model_entry.rs

Lines changed: 0 additions & 30 deletions
This file was deleted.

lib/llm/src/discovery/model_manager.rs

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use parking_lot::{Mutex, RwLock};
1111
use dynamo_runtime::component::Component;
1212
use dynamo_runtime::prelude::DistributedRuntimeProvider;
1313

14-
use crate::kv_router::{KvRouterConfig, scheduler::DefaultWorkerSelector};
1514
use crate::{discovery::KV_ROUTERS_ROOT_PATH, model_card::ModelDeploymentCard};
1615
use crate::{
1716
kv_router::KvRouter,
@@ -21,6 +20,10 @@ use crate::{
2120
completions::OpenAICompletionsStreamingEngine, embeddings::OpenAIEmbeddingsStreamingEngine,
2221
},
2322
};
23+
use crate::{
24+
kv_router::{KvRouterConfig, scheduler::DefaultWorkerSelector},
25+
model_type::ModelType,
26+
};
2427

2528
#[derive(Debug, thiserror::Error)]
2629
pub enum ModelManagerError {
@@ -39,7 +42,7 @@ pub struct ModelManager {
3942
embeddings_engines: RwLock<ModelEngines<OpenAIEmbeddingsStreamingEngine>>,
4043
tensor_engines: RwLock<ModelEngines<TensorStreamingEngine>>,
4144

42-
// These two are Mutex because we read and write rarely and equally
45+
// These are Mutex because we read and write rarely and equally
4346
cards: Mutex<HashMap<String, ModelDeploymentCard>>,
4447
kv_choosers: Mutex<HashMap<String, Arc<KvRouter>>>,
4548
}
@@ -62,6 +65,43 @@ impl ModelManager {
6265
}
6366
}
6467

68+
pub fn is_valid_checksum(
69+
&self,
70+
model_type: ModelType,
71+
model_name: &str,
72+
candidate_checksum: &str,
73+
) -> Option<bool> {
74+
let mut results = vec![];
75+
for unit in model_type.units() {
76+
let maybe_valid_checksum = match unit {
77+
ModelType::Chat => self.chat_completion_engines.read().checksum(model_name),
78+
ModelType::Completions => self.completion_engines.read().checksum(model_name),
79+
ModelType::Embedding => self.embeddings_engines.read().checksum(model_name),
80+
ModelType::TensorBased => self.tensor_engines.read().checksum(model_name),
81+
_ => {
82+
continue;
83+
}
84+
};
85+
if let Some(is_valid) = maybe_valid_checksum.map(|valid_checksum| {
86+
tracing::debug!(
87+
model_name,
88+
valid_checksum,
89+
candidate_checksum,
90+
"is_valid_checksum: check case"
91+
);
92+
valid_checksum == candidate_checksum
93+
}) {
94+
results.push(is_valid)
95+
}
96+
}
97+
if results.is_empty() {
98+
None
99+
} else {
100+
// The checksum is valid if it is correct for all the ModelType in the bitflag.
101+
Some(results.into_iter().all(|x| x))
102+
}
103+
}
104+
65105
pub fn get_model_cards(&self) -> Vec<ModelDeploymentCard> {
66106
self.cards.lock().values().cloned().collect()
67107
}
@@ -99,37 +139,41 @@ impl ModelManager {
99139
pub fn add_completions_model(
100140
&self,
101141
model: &str,
142+
card_checksum: &str,
102143
engine: OpenAICompletionsStreamingEngine,
103144
) -> Result<(), ModelManagerError> {
104145
let mut clients = self.completion_engines.write();
105-
clients.add(model, engine)
146+
clients.add(model, card_checksum, engine)
106147
}
107148

108149
pub fn add_chat_completions_model(
109150
&self,
110151
model: &str,
152+
card_checksum: &str,
111153
engine: OpenAIChatCompletionsStreamingEngine,
112154
) -> Result<(), ModelManagerError> {
113155
let mut clients = self.chat_completion_engines.write();
114-
clients.add(model, engine)
156+
clients.add(model, card_checksum, engine)
115157
}
116158

117159
pub fn add_embeddings_model(
118160
&self,
119161
model: &str,
162+
card_checksum: &str,
120163
engine: OpenAIEmbeddingsStreamingEngine,
121164
) -> Result<(), ModelManagerError> {
122165
let mut clients = self.embeddings_engines.write();
123-
clients.add(model, engine)
166+
clients.add(model, card_checksum, engine)
124167
}
125168

126169
pub fn add_tensor_model(
127170
&self,
128171
model: &str,
172+
card_checksum: &str,
129173
engine: TensorStreamingEngine,
130174
) -> Result<(), ModelManagerError> {
131175
let mut clients = self.tensor_engines.write();
132-
clients.add(model, engine)
176+
clients.add(model, card_checksum, engine)
133177
}
134178

135179
pub fn remove_completions_model(&self, model: &str) -> Result<(), ModelManagerError> {
@@ -196,10 +240,11 @@ impl ModelManager {
196240
.ok_or(ModelManagerError::ModelNotFound(model.to_string()))
197241
}
198242

199-
/// Save a ModelDeploymentCard from an instance's etcd `models/` key so we can fetch it later when the key is
200-
/// deleted from etcd.
201-
pub fn save_model_card(&self, key: &str, entry: ModelDeploymentCard) {
202-
self.cards.lock().insert(key.to_string(), entry);
243+
/// Save a ModelDeploymentCard from an instance's ModelDeploymentCard key so we can fetch it later when the key is
244+
/// deleted.
245+
pub fn save_model_card(&self, key: &str, card: ModelDeploymentCard) -> anyhow::Result<()> {
246+
self.cards.lock().insert(key.to_string(), card);
247+
Ok(())
203248
}
204249

205250
/// Remove and return model card for this instance's etcd key. We do this when the instance stops.
@@ -291,13 +336,17 @@ pub struct ModelEngines<E> {
291336
/// Optional default model name
292337
default: Option<String>,
293338
engines: HashMap<String, E>,
339+
/// Key: Model name, value: Checksum of the ModelDeploymentCard. New instances must have the
340+
/// same card.
341+
checksums: HashMap<String, String>,
294342
}
295343

296344
impl<E> Default for ModelEngines<E> {
297345
fn default() -> Self {
298346
Self {
299347
default: None,
300348
engines: HashMap::new(),
349+
checksums: HashMap::new(),
301350
}
302351
}
303352
}
@@ -313,18 +362,21 @@ impl<E> ModelEngines<E> {
313362
self.default = None;
314363
}
315364

316-
fn add(&mut self, model: &str, engine: E) -> Result<(), ModelManagerError> {
365+
fn add(&mut self, model: &str, checksum: &str, engine: E) -> Result<(), ModelManagerError> {
317366
if self.engines.contains_key(model) {
318367
return Err(ModelManagerError::ModelAlreadyExists(model.to_string()));
319368
}
320369
self.engines.insert(model.to_string(), engine);
370+
self.checksums
371+
.insert(model.to_string(), checksum.to_string());
321372
Ok(())
322373
}
323374

324375
fn remove(&mut self, model: &str) -> Result<(), ModelManagerError> {
325376
if self.engines.remove(model).is_none() {
326377
return Err(ModelManagerError::ModelNotFound(model.to_string()));
327378
}
379+
let _ = self.checksums.remove(model);
328380
Ok(())
329381
}
330382

@@ -339,4 +391,10 @@ impl<E> ModelEngines<E> {
339391
pub fn list(&self) -> Vec<String> {
340392
self.engines.keys().map(|k| k.to_owned()).collect()
341393
}
394+
395+
/// Returns a newly allocated String for called convenience. All the places I use
396+
/// this I need a String.
397+
pub fn checksum(&self, model: &str) -> Option<String> {
398+
self.checksums.get(model).map(|s| s.to_string())
399+
}
342400
}

0 commit comments

Comments
 (0)