Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b317939
no errors for now
Oct 2, 2025
6d25ddc
bump
ishandhanani Oct 3, 2025
f90eb88
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 3, 2025
3b492f4
refactor
ishandhanani Oct 3, 2025
2124d91
found
ishandhanani Oct 3, 2025
ac9357e
nice
ishandhanani Oct 4, 2025
999c716
err
ishandhanani Oct 4, 2025
889f0c5
cleanup
ishandhanani Oct 5, 2025
9562483
bump
ishandhanani Oct 5, 2025
48bc524
go
ishandhanani Oct 5, 2025
ad934a4
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 5, 2025
c2d10ee
working
ishandhanani Oct 5, 2025
f3d74b8
go
ishandhanani Oct 5, 2025
3820e80
profile
ishandhanani Oct 5, 2025
a791ca7
bump:
ishandhanani Oct 5, 2025
5886ae4
redundant space
ishandhanani Oct 5, 2025
5166638
bump
ishandhanani Oct 5, 2025
35aa49e
slightly logic change since only agg endpoints supported now
ishandhanani Oct 6, 2025
b0ae2d4
reshuffle
ishandhanani Oct 6, 2025
c5307e3
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 6, 2025
f50c54b
selective pieces return a 404
ishandhanani Oct 6, 2025
3e6bf2c
singleton drt
ishandhanani Oct 6, 2025
f6f7c91
bump
ishandhanani Oct 6, 2025
e774001
bump
ishandhanani Oct 6, 2025
4502810
bump
ishandhanani Oct 6, 2025
18f590d
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 7, 2025
0cb978a
bump
ishandhanani Oct 7, 2025
145fb06
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 7, 2025
95e3a4f
swap etcd with drt in http builder
ishandhanani Oct 7, 2025
cdc4da0
gp
ishandhanani Oct 7, 2025
3d79298
lel
ishandhanani Oct 7, 2025
7ffe3e2
test
ishandhanani Oct 7, 2025
28e268d
no panicgit add .!
ishandhanani Oct 7, 2025
7852920
bruh
ishandhanani Oct 7, 2025
a3e2839
bump
ishandhanani Oct 7, 2025
11e8439
simple
ishandhanani Oct 7, 2025
2b7d61a
full refactor
ishandhanani Oct 8, 2025
6f194ee
try
ishandhanani Oct 8, 2025
c6c39cf
handling
ishandhanani Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions components/src/dynamo/sglang/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ async def register_model():

health_check_payload = SglangHealthCheckPayload(engine).to_dict()

test_endpoint = component.endpoint("test")

try:
# Start endpoint immediately and register model concurrently
# Requests queue until ready_event is set
Expand All @@ -139,6 +141,13 @@ async def register_model():
health_check_payload=health_check_payload,
),
register_model(),
test_endpoint.serve_endpoint(
handler.test,
graceful_shutdown=True,
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
http_endpoint_path="/test",
),
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def _build_sampling_params(self, request: dict) -> dict:

return {k: v for k, v in param_mapping.items() if v is not None}

async def test(self, request: dict):
yield {"message": "Hello, world!"}

async def generate(self, request: dict):
sampling_params = self._build_sampling_params(request)
input_param = self._get_input_param(request)
Expand Down
7 changes: 6 additions & 1 deletion lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,14 +644,15 @@ impl Component {

#[pymethods]
impl Endpoint {
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None))]
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None, http_endpoint_path = None))]
fn serve_endpoint<'p>(
&self,
py: Python<'p>,
generator: PyObject,
graceful_shutdown: Option<bool>,
metrics_labels: Option<Vec<(String, String)>>,
health_check_payload: Option<&Bound<'p, PyDict>>,
http_endpoint_path: Option<&str>,
) -> PyResult<Bound<'p, PyAny>> {
let engine = Arc::new(engine::PythonAsyncEngine::new(
generator,
Expand Down Expand Up @@ -689,6 +690,10 @@ impl Endpoint {
builder = builder.health_check_payload(payload);
}

if let Some(http_endpoint_path) = http_endpoint_path {
builder = builder.http_endpoint_path(http_endpoint_path);
}

let graceful_shutdown = graceful_shutdown.unwrap_or(true);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
builder
Expand Down
2 changes: 1 addition & 1 deletion lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Endpoint:

...

async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None) -> None:
async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None, http_endpoint_path: Optional[str] = None) -> None:
"""
Serve an endpoint discoverable by all connected clients at
`{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}`
Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mod openai;

pub mod disconnect;
pub mod dynamic_endpoint;
pub mod error;
pub mod health;
pub mod metrics;
Expand Down
183 changes: 183 additions & 0 deletions lib/llm/src/http/service/dynamic_endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{RouteDoc, service_v2};
use crate::types::Annotated;
use axum::{
Json, Router,
http::{Method, StatusCode},
response::IntoResponse,
routing::post,
};
use dynamo_runtime::instances::list_all_instances;
use dynamo_runtime::{DistributedRuntime, Runtime, component::Client};
use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt};
use std::sync::Arc;

pub const DYNAMIC_ENDPOINT_PATH: &str = "dynamic_endpoint";

pub fn dynamic_endpoint_router(
state: Arc<service_v2::State>,
path: Option<String>,
) -> (Vec<RouteDoc>, Router) {
let wildcard_path = "/{*path}";
let path = path.unwrap_or_else(|| wildcard_path.to_string());

let docs: Vec<RouteDoc> = vec![RouteDoc::new(Method::POST, &path)];

let router = Router::new()
.route(&path, post(dynamic_endpoint_handler))
.with_state(state);

(docs, router)
}

async fn dynamic_endpoint_handler(
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
axum::extract::Path(path): axum::extract::Path<String>,
) -> impl IntoResponse {
let Some(etcd_client) = state.etcd_client() else {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get etcd client"
})),
);
};

let instances = match list_all_instances(etcd_client).await {
Ok(instances) => instances,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get instances"
})),
);
}
};

let dynamic_endpoints = instances
.iter()
.filter_map(|instance| instance.http_endpoint_path.clone())
.collect::<Vec<String>>();

let fmt_path = format!("/{}", &path);
if !dynamic_endpoints.contains(&fmt_path) {
return (
StatusCode::NOT_FOUND,
Json(serde_json::json!({
"message": "Dynamic endpoint not found"
})),
);
}

let rt = match Runtime::from_current() {
Ok(rt) => rt,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get runtime"
})),
);
}
};
let drt = match DistributedRuntime::from_settings(rt).await {
Ok(drt) => drt,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get distributed runtime"
})),
);
}
};

// grab all instances that expose this endpoint
let target_instances = instances
.iter()
.filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone()))
.collect::<Vec<_>>();

// use pushrouter .direct to forward the request to the filtered instances sequentially
let mut target_clients: Vec<Client> = Vec::new();
for instance in target_instances {
let ns = match drt.namespace(instance.namespace.clone()) {
Ok(ns) => ns,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get namespace"
})),
);
}
};
let c = match ns.component(instance.component.clone()) {
Ok(c) => c,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get component"
})),
);
}
};
let ep = c.endpoint(path.clone());
let c = match ep.client().await {
Ok(c) => c,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get client"
})),
);
}
};
target_clients.push(c.clone());
}

let mut all_responses = Vec::new();
for client in target_clients {
let router = match PushRouter::<(), Annotated<serde_json::Value>>::from_client(
client,
Default::default(),
)
.await
{
Ok(router) => router,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": "Failed to get router"
})),
);
}
};
let mut stream = match router.round_robin(().into()).await {
Ok(s) => s,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"message": "Failed to route"})),
);
}
};

while let Some(resp) = stream.next().await {
all_responses.push(resp);
}
}

return (
StatusCode::OK,
Json(serde_json::json!({
"responses": all_responses
})),
);
}
1 change: 1 addition & 0 deletions lib/llm/src/http/service/service_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ impl HttpServiceConfigBuilder {
super::openai::list_models_router(state.clone(), var(HTTP_SVC_MODELS_PATH_ENV).ok()),
super::health::health_check_router(state.clone(), var(HTTP_SVC_HEALTH_PATH_ENV).ok()),
super::health::live_check_router(state.clone(), var(HTTP_SVC_LIVE_PATH_ENV).ok()),
super::dynamic_endpoint::dynamic_endpoint_router(state.clone(), None),
];

let endpoint_routes =
Expand Down
4 changes: 3 additions & 1 deletion lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ pub struct Instance {
pub namespace: String,
pub instance_id: i64,
pub transport: TransportType,
#[serde(skip_serializing_if = "Option::is_none")]
pub http_endpoint_path: Option<String>,
}

impl Instance {
Expand Down Expand Up @@ -463,7 +465,7 @@ impl Endpoint {
.expect("Endpoint name and component name should be valid")
}

/// The fully path of an instance in etcd
/// The full path of an instance in etcd
pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
}
Expand Down
8 changes: 8 additions & 0 deletions lib/runtime/src/component/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub struct EndpointConfig {
#[educe(Debug(ignore))]
#[builder(default, setter(into, strip_option))]
health_check_payload: Option<serde_json::Value>,

/// Expose this endpoint over HTTP at this path
#[builder(default, setter(into, strip_option))]
http_endpoint_path: Option<String>,
}

impl EndpointConfigBuilder {
Expand All @@ -67,6 +71,7 @@ impl EndpointConfigBuilder {
metrics_labels,
graceful_shutdown,
health_check_payload,
http_endpoint_path,
) = self.build_internal()?.dissolve();
let lease = lease.or(endpoint.drt().primary_lease());
let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
Expand Down Expand Up @@ -128,6 +133,7 @@ impl EndpointConfigBuilder {
let subject = endpoint.subject_to(lease_id);
let etcd_path = endpoint.etcd_path_with_lease_id(lease_id);
let etcd_client = endpoint.component.drt.etcd_client.clone();
let http_endpoint_path = http_endpoint_path.clone();

// Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload {
Expand All @@ -137,6 +143,7 @@ impl EndpointConfigBuilder {
namespace: namespace_name.clone(),
instance_id: lease_id,
transport: TransportType::NatsTcp(subject.clone()),
http_endpoint_path: http_endpoint_path.clone(),
};
tracing::debug!(subject = %subject, "Registering endpoint health check target");
let guard = system_health.lock().unwrap();
Expand Down Expand Up @@ -230,6 +237,7 @@ impl EndpointConfigBuilder {
namespace: namespace_name,
instance_id: lease_id,
transport: TransportType::NatsTcp(subject),
http_endpoint_path: http_endpoint_path,
};

let info = serde_json::to_vec_pretty(&info)?;
Expand Down
Loading