Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ use dynamo_runtime::{DistributedRuntime, Runtime};

/// Build and run an HTTP service
pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let etcd_client = distributed_runtime.etcd_client().clone();

let http_service = service_v2::HttpService::builder()
let mut http_service_builder = service_v2::HttpService::builder()
.port(engine_config.local_model().http_port())
.enable_chat_endpoints(true)
.enable_cmpl_endpoints(true)
.enable_embeddings_endpoints(true)
.with_request_template(engine_config.local_model().request_template())
.with_etcd_client(etcd_client.clone())
.build()?;
.with_request_template(engine_config.local_model().request_template());

match engine_config {
let http_service = match engine_config {
EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let etcd_client = distributed_runtime.etcd_client();
// This allows the /health endpoint to query etcd for active instances
http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone());
let http_service = http_service_builder.build()?;
match etcd_client {
Some(ref etcd_client) => {
let router_config = engine_config.local_model().router_config();
Expand All @@ -52,13 +52,15 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
// Static endpoints don't need discovery
}
}
http_service
}
EngineConfig::StaticRemote(local_model) => {
let card = local_model.card();
let router_mode = local_model.router_config().router_mode;

let dst_config = DistributedConfig::from_settings(true);
let dst_config = DistributedConfig::from_settings(true); // true means static
let distributed_runtime = DistributedRuntime::new(runtime.clone(), dst_config).await?;
let http_service = http_service_builder.build()?;
let manager = http_service.model_manager();

let endpoint_id = local_model.endpoint_id();
Expand Down Expand Up @@ -95,18 +97,23 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
>(card, &client, router_mode, kv_chooser)
.await?;
manager.add_completions_model(local_model.display_name(), completions_engine)?;

http_service
}
EngineConfig::StaticFull { engine, model, .. } => {
let http_service = http_service_builder.build()?;
let engine = Arc::new(StreamingEngineAdapter::new(engine));
let manager = http_service.model_manager();
manager.add_completions_model(model.service_name(), engine.clone())?;
manager.add_chat_completions_model(model.service_name(), engine)?;
http_service
}
EngineConfig::StaticCore {
engine: inner_engine,
model,
..
} => {
let http_service = http_service_builder.build()?;
let manager = http_service.model_manager();

let chat_pipeline = common::build_pipeline::<
Expand All @@ -122,8 +129,9 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
>(model.card(), inner_engine)
.await?;
manager.add_completions_model(model.service_name(), cmpl_pipeline)?;
http_service
}
}
};
tracing::debug!(
"Supported routes: {:?}",
http_service
Expand Down
Loading