Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b317939
no errors for now
Oct 2, 2025
6d25ddc
bump
ishandhanani Oct 3, 2025
f90eb88
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 3, 2025
3b492f4
refactor
ishandhanani Oct 3, 2025
2124d91
found
ishandhanani Oct 3, 2025
ac9357e
nice
ishandhanani Oct 4, 2025
999c716
err
ishandhanani Oct 4, 2025
889f0c5
cleanup
ishandhanani Oct 5, 2025
9562483
bump
ishandhanani Oct 5, 2025
48bc524
go
ishandhanani Oct 5, 2025
ad934a4
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 5, 2025
c2d10ee
working
ishandhanani Oct 5, 2025
f3d74b8
go
ishandhanani Oct 5, 2025
3820e80
profile
ishandhanani Oct 5, 2025
a791ca7
bump:
ishandhanani Oct 5, 2025
5886ae4
redundant space
ishandhanani Oct 5, 2025
5166638
bump
ishandhanani Oct 5, 2025
35aa49e
slightly logic change since only agg endpoints supported now
ishandhanani Oct 6, 2025
b0ae2d4
reshuffle
ishandhanani Oct 6, 2025
c5307e3
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 6, 2025
f50c54b
selective pieces return a 404
ishandhanani Oct 6, 2025
3e6bf2c
singleton drt
ishandhanani Oct 6, 2025
f6f7c91
bump
ishandhanani Oct 6, 2025
e774001
bump
ishandhanani Oct 6, 2025
4502810
bump
ishandhanani Oct 6, 2025
18f590d
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 7, 2025
0cb978a
bump
ishandhanani Oct 7, 2025
145fb06
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 7, 2025
95e3a4f
swap etcd with drt in http builder
ishandhanani Oct 7, 2025
cdc4da0
gp
ishandhanani Oct 7, 2025
3d79298
lel
ishandhanani Oct 7, 2025
7ffe3e2
test
ishandhanani Oct 7, 2025
28e268d
no panicgit add .!
ishandhanani Oct 7, 2025
7852920
bruh
ishandhanani Oct 7, 2025
a3e2839
bump
ishandhanani Oct 7, 2025
11e8439
simple
ishandhanani Oct 7, 2025
2b7d61a
full refactor
ishandhanani Oct 8, 2025
6f194ee
try
ishandhanani Oct 8, 2025
c6c39cf
handling
ishandhanani Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions components/backends/sglang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ uv pip install maturin
cd $DYNAMO_HOME/lib/bindings/python
maturin develop --uv
cd $DYNAMO_HOME
# installs sglang supported version along with dynamo
# include the prerelease flag to install flashinfer rc versions
uv pip install --prerelease=allow -e .[sglang]
uv pip install -e .[sglang]
```

</details>
Expand Down
13 changes: 8 additions & 5 deletions components/src/dynamo/sglang/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
MultimodalPrefillWorkerHandler,
MultimodalProcessorHandler,
MultimodalWorkerHandler,
NativeApiHandler,
PrefillWorkerHandler,
)

Expand Down Expand Up @@ -73,9 +74,10 @@ async def init(runtime: DistributedRuntime, config: Config):

generate_endpoint = component.endpoint(dynamo_args.endpoint)

# TODO: think about implementing DisaggregationStrategy for P->D
# TODO: implement a `next` field in the config to dynamically set the next client
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component)

prefill_client = None
native_api_tasks = []
if config.serving_mode == DisaggregationMode.DECODE:
logging.info("Initializing prefill client")
prefill_client = (
Expand All @@ -84,8 +86,9 @@ async def init(runtime: DistributedRuntime, config: Config):
.endpoint("generate")
.client()
)

publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component)
else:
native_api_handler = NativeApiHandler(component, engine, metrics_labels)
native_api_tasks = await native_api_handler.init_native_apis()

kv_publisher = None
if server_args.kv_events_config:
Expand Down Expand Up @@ -129,7 +132,6 @@ async def register_model():
health_check_payload = SglangHealthCheckPayload(engine).to_dict()

try:
# Start endpoint immediately and register model concurrently
# Requests queue until ready_event is set
await asyncio.gather(
generate_endpoint.serve_endpoint(
Expand All @@ -139,6 +141,7 @@ async def register_model():
health_check_payload=health_check_payload,
),
register_model(),
*native_api_tasks,
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
Expand Down
2 changes: 2 additions & 0 deletions components/src/dynamo/sglang/request_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
MultimodalPrefillWorkerHandler,
MultimodalWorkerHandler,
)
from .native_api_handler import NativeApiHandler
from .prefill_handler import PrefillWorkerHandler

__all__ = [
Expand All @@ -23,4 +24,5 @@
"MultimodalEncodeWorkerHandler",
"MultimodalWorkerHandler",
"MultimodalPrefillWorkerHandler",
"NativeApiHandler",
]
118 changes: 118 additions & 0 deletions components/src/dynamo/sglang/request_handlers/native_api_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# SGLang Native APIs: https://docs.sglang.ai/basic_usage/native_api.html
# Code: https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/entrypoints/http_server.py

import asyncio
import logging
from typing import List, Optional, Tuple

import sglang as sgl
from sglang.srt.managers.io_struct import ProfileReqInput

from dynamo._core import Component


class NativeApiHandler:
"""Handler to add sglang native API endpoints to workers"""

def __init__(
self,
component: Component,
engine: sgl.Engine,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
):
self.component = component
self.engine = engine
self.metrics_labels = metrics_labels
self.native_api_tasks = []

async def init_native_apis(
self,
) -> List[asyncio.Task]:
"""
Initialize and register native API endpoints.
Returns list of tasks to be gathered.
"""
logging.info("Initializing native SGLang API endpoints")

self.tm = self.engine.tokenizer_manager

tasks = []

model_info_ep = self.component.endpoint("get_model_info")
start_profile_ep = self.component.endpoint("start_profile")
stop_profile_ep = self.component.endpoint("stop_profile")
tasks.extend(
[
model_info_ep.serve_endpoint(
self.get_model_info,
graceful_shutdown=True,
metrics_labels=self.metrics_labels,
http_endpoint_path="/get_model_info",
),
start_profile_ep.serve_endpoint(
self.start_profile,
graceful_shutdown=True,
metrics_labels=self.metrics_labels,
http_endpoint_path="/start_profile",
),
stop_profile_ep.serve_endpoint(
self.stop_profile,
graceful_shutdown=True,
metrics_labels=self.metrics_labels,
http_endpoint_path="/stop_profile",
),
]
)

self.native_api_tasks = tasks
logging.info(f"Registered {len(tasks)} native API endpoints")
return tasks

async def get_model_info(self, request: dict):
result = {
"model_path": self.tm.server_args.model_path,
"tokenizer_path": self.tm.server_args.tokenizer_path,
"preferred_sampling_params": self.tm.server_args.preferred_sampling_params,
"weight_version": self.tm.server_args.weight_version,
}

yield {"data": [result]}

async def start_profile(self, request: dict):
try:
obj = ProfileReqInput.model_validate(request)
except Exception:
obj = None

if obj is None:
obj = ProfileReqInput()

output_dir = obj.output_dir or f"profile_{self.tm.server_args.model_path}"

await self.tm.start_profile(
output_dir=output_dir,
start_step=obj.start_step,
num_steps=obj.num_steps,
activities=obj.activities,
with_stack=obj.with_stack,
record_shapes=obj.record_shapes,
profile_by_stage=obj.profile_by_stage,
)

yield {"data": [{"status": "started profile"}]}

async def stop_profile(self, request: dict):
asyncio.create_task(self.tm.stop_profile())
yield {
"data": [
{
"status": (
"Stopped profile. This might take a long time to complete. "
f"Results should be available in the 'profile_{self.tm.server_args.model_path}' directory."
)
}
]
}
7 changes: 6 additions & 1 deletion lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,14 +643,15 @@ impl Component {

#[pymethods]
impl Endpoint {
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None))]
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None, http_endpoint_path = None))]
fn serve_endpoint<'p>(
&self,
py: Python<'p>,
generator: PyObject,
graceful_shutdown: Option<bool>,
metrics_labels: Option<Vec<(String, String)>>,
health_check_payload: Option<&Bound<'p, PyDict>>,
http_endpoint_path: Option<&str>,
) -> PyResult<Bound<'p, PyAny>> {
let engine = Arc::new(engine::PythonAsyncEngine::new(
generator,
Expand Down Expand Up @@ -688,6 +689,10 @@ impl Endpoint {
builder = builder.health_check_payload(payload);
}

if let Some(http_endpoint_path) = http_endpoint_path {
builder = builder.http_endpoint_path(http_endpoint_path);
}

let graceful_shutdown = graceful_shutdown.unwrap_or(true);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
builder
Expand Down
2 changes: 1 addition & 1 deletion lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Endpoint:

...

async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None) -> None:
async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None, http_endpoint_path: Optional[str] = None) -> None:
"""
Serve an endpoint discoverable by all connected clients at
`{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}`
Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
mod openai;

pub mod disconnect;
pub mod dynamic_endpoint;
pub mod error;
pub mod health;
pub mod metrics;
Expand Down
112 changes: 112 additions & 0 deletions lib/llm/src/http/service/dynamic_endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{RouteDoc, service_v2};
use crate::types::Annotated;
use axum::{
Json, Router,
http::{Method, StatusCode},
response::IntoResponse,
routing::post,
};
use dynamo_runtime::instances::list_all_instances;
use dynamo_runtime::{DistributedRuntime, Runtime, component::Client};
use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt};
use std::sync::Arc;

pub fn dynamic_endpoint_router(
state: Arc<service_v2::State>,
path: Option<String>,
) -> (Vec<RouteDoc>, Router) {
let wildcard_path = "/{*path}";
let path = path.unwrap_or_else(|| wildcard_path.to_string());

let docs: Vec<RouteDoc> = vec![RouteDoc::new(Method::POST, &path)];

let router = Router::new()
.route(&path, post(dynamic_endpoint_handler))
.with_state(state);

(docs, router)
}

async fn inner_dynamic_endpoint_handler(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general design question - correct me if I'm understanding the changes properly or not

Currently, this change more or less lets you call an arbitrary route curl -X POST localhost:8000/my_custom_route.

  • If the route is something the exists from another handler (ex: /health, /v1/models, etc.), presumably that specific handler will be invoked instead? Or will it also match this one?
  • If the route doesn't match any of the other handlers, it will hit this one. At that point in time, per request, we query ETCD to see if any endpoint instances have set an http_endpoint_path key, and if so we will try to route the request to all of those matched instances round robin

This is my understanding of the current changes.

Assuming it's roughly accurate - my next question is why not use something like the watcher pattern we have elsewhere for discovery, that will be watching in background for specific keys (like /http_endpoint_path), and maybe dynamically adds/removes routes to the http server and associates corresponding endpoints/instances in some map for them there, rather than on a per request basis here?

I'm a little hesitant about the additional per-request checking here rather than something more discovery-oriented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a blocking comment yet, just looking to understand the approach here better, and get more context on use case, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreeing with Ryan here. I would rather attach a route for the endpoint when it appears, instead of attaching a wildcard route and hitting etcd on every request.

Copy link
Contributor Author

@ishandhanani ishandhanani Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the route is something the exists from another handler (ex: /health, /v1/models, etc.), presumably that specific handler will be invoked instead? Or will it also match this one?

I believe it will go to the specific handler.

my next question is why not use something like the watcher pattern we have elsewhere for discovery
associates corresponding endpoints/instances in some map for them there, rather than on a per request basis here?

This is actually how I implemented it before. But I feel like it made things a little bit messy. Why have a separate map when it can all just be under a single endpoint entry? If the endpoint goes down, this will also take the etcd entry down which will also take the http_endpoint section as well.

hitting etcd on every request.

Endpoints that are implemented in this fashion are not meant to be endpoints where we serve heavy traffic by any means. Checking etcd here doesn't seem like it costs much. Why have another watcher?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Endpoints that are implemented in this fashion are not meant to be endpoints where we serve heavy traffic by any means. Checking etcd here doesn't seem like it costs much. Why have another watcher?

I think right now it's a "hey you can expose any endpoint here" feature - so I won't be surprised at all if someone tries calling some native endpoint for some other use case that maybe dynamo doesn't natively support yet but the framework does as a stopgap solution until we do support it. And if so, then we lose this assumption right?

I think I'm less worried optimiznig for the extreme heavy load case on one of these custom endpoints (in terms of expecting it to happen) and moreso just general code smell of doing unnecessary work and checking something on every request if we can instead only act to do the bare minimum when necessary (on discovery). At the end of the day we have limited resources (threads, CPUs, etc.) and the less we use them, the more resources the heavy load endpoints (chat, completions, nats, etcd, etc.) have to work freely with and less we have to worry about later.

For example, if any of these native endpoints are things that may not get heavy load, but may get polled say every second or every few seconds, that could be non trivial at some point. Though this implementation is completely custom support for anything, so I can't really guess what all it would be used for.

This is actually how I implemented it before. But I feel like it made things a little bit messy.

Do you have a draft/commit to refer to the original solution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not complete but check out b317939

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke to Ryan offline. I think my approach before stemmed from a lack of understanding the watcher pattern and how our ModelWatcher worked.

Refactored in latest commit

state: Arc<service_v2::State>,
path: String,
) -> Result<impl IntoResponse, &'static str> {
let etcd_client = state.etcd_client().ok_or("Failed to get etcd client")?;

let instances = list_all_instances(etcd_client)
.await
.map_err(|_| "Failed to get instances")?;

let dynamic_endpoints = instances
.iter()
.filter_map(|instance| instance.http_endpoint_path.clone())
.collect::<Vec<String>>();

let fmt_path = format!("/{}", &path);
if !dynamic_endpoints.contains(&fmt_path) {
return Err("Dynamic endpoint not found");
}

let rt = Runtime::from_current().map_err(|_| "Failed to get runtime")?;
let drt = DistributedRuntime::from_settings(rt)
.await
.map_err(|_| "Failed to get distributed runtime")?;

let target_instances = instances
.iter()
.filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone()))
.collect::<Vec<_>>();

let mut target_clients: Vec<Client> = Vec::new();
for instance in target_instances {
let ns = drt
.namespace(instance.namespace.clone())
.map_err(|_| "Failed to get namespace")?;
let c = ns
.component(instance.component.clone())
.map_err(|_| "Failed to get component")?;
let ep = c.endpoint(path.clone());
let client = ep.client().await.map_err(|_| "Failed to get client")?;
target_clients.push(client);
}

let mut all_responses = Vec::new();
for client in target_clients {
let router =
PushRouter::<(), Annotated<serde_json::Value>>::from_client(client, Default::default())
.await
.map_err(|_| "Failed to get router")?;

let mut stream = router
.round_robin(().into())
.await
.map_err(|_| "Failed to route")?;

while let Some(resp) = stream.next().await {
all_responses.push(resp);
}
}

Ok(Json(serde_json::json!({
"responses": all_responses
})))
}

async fn dynamic_endpoint_handler(
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
axum::extract::Path(path): axum::extract::Path<String>,
) -> impl IntoResponse {
inner_dynamic_endpoint_handler(state, path)
.await
.map_err(|err_string| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"message": err_string
})),
)
})
}
1 change: 1 addition & 0 deletions lib/llm/src/http/service/service_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ impl HttpServiceConfigBuilder {
super::openai::list_models_router(state.clone(), var(HTTP_SVC_MODELS_PATH_ENV).ok()),
super::health::health_check_router(state.clone(), var(HTTP_SVC_HEALTH_PATH_ENV).ok()),
super::health::live_check_router(state.clone(), var(HTTP_SVC_LIVE_PATH_ENV).ok()),
super::dynamic_endpoint::dynamic_endpoint_router(state.clone(), None),
];

let endpoint_routes =
Expand Down
4 changes: 3 additions & 1 deletion lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ pub struct Instance {
pub namespace: String,
pub instance_id: i64,
pub transport: TransportType,
#[serde(skip_serializing_if = "Option::is_none")]
pub http_endpoint_path: Option<String>,
}

impl Instance {
Expand Down Expand Up @@ -463,7 +465,7 @@ impl Endpoint {
.expect("Endpoint name and component name should be valid")
}

/// The fully path of an instance in etcd
/// The full path of an instance in etcd
pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String {
format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id))
}
Expand Down
Loading
Loading