Merged
Commits
38 commits
b1e6eb4
first commit
jorgeantonio21 Aug 5, 2025
8ffe717
register runtime config after engine initialization
jorgeantonio21 Aug 5, 2025
58d73d2
add sglang runtime config values retrieval
jorgeantonio21 Aug 5, 2025
dfc9154
merge main and resolve conflicts
jorgeantonio21 Aug 5, 2025
87865fc
Merge branch 'main' into feat/ja/runtime-configs-mdc
jorgeantonio21 Aug 6, 2025
5707890
address comments in the PR
jorgeantonio21 Aug 7, 2025
9770e75
Merge branch 'main' into feat/ja/runtime-configs-mdc
jorgeantonio21 Aug 7, 2025
61f6424
refactor logic to pass in engine initialization runtime args directly…
jorgeantonio21 Aug 7, 2025
6fbe951
merge main and resolve conflicts
jorgeantonio21 Aug 11, 2025
b376cfb
resolve _core.py import issues
jorgeantonio21 Aug 11, 2025
9d3cbb1
resolve runtime issues
jorgeantonio21 Aug 11, 2025
d1b87f5
resolve import issues
jorgeantonio21 Aug 11, 2025
d18881b
resolve import issues
jorgeantonio21 Aug 11, 2025
24712cb
resolve vllm cache config issues
jorgeantonio21 Aug 11, 2025
c20f0e1
resolve non-int gpu_mem_integer issue
jorgeantonio21 Aug 11, 2025
af94e4b
resolve non-int gpu_mem_integer issue
jorgeantonio21 Aug 11, 2025
57e12c2
remove unneeded async in python code
jorgeantonio21 Aug 12, 2025
3304c8d
Merge branch 'main' into feat/ja/runtime-configs-mdc
jorgeantonio21 Aug 12, 2025
acddc6b
Merge branch 'main' into feat/ja/runtime-configs-mdc
PeaBrane Aug 12, 2025
d4b1edf
revert llama-cpp version in Cargo.lock
PeaBrane Aug 12, 2025
b7ca2f5
move runtime config into local_model
PeaBrane Aug 12, 2025
becb754
put runtime config in ModelEntry so it gets registered to etcd
PeaBrane Aug 12, 2025
5adaeb1
fmt
PeaBrane Aug 12, 2025
950e6a4
if mocker, override runtime configs
PeaBrane Aug 13, 2025
cbbd03b
router listens to runtime configs (kv total blocks)
PeaBrane Aug 13, 2025
e697253
clippy
PeaBrane Aug 13, 2025
b0dc6f3
mv runtime config bindings to new file local_model.rs
PeaBrane Aug 13, 2025
8004bbd
tensorrtllm support (vibe coded)
PeaBrane Aug 13, 2025
ef3d419
max_num_batched_tokens instead
PeaBrane Aug 13, 2025
6842494
fix sglang server_info args
PeaBrane Aug 13, 2025
3b175cf
direct access to server_args
PeaBrane Aug 13, 2025
10773c7
sglang: access total num tokens via scheduler info
PeaBrane Aug 13, 2025
69d5d80
isort
PeaBrane Aug 13, 2025
e6de5a2
trtllm: extract directly from config
PeaBrane Aug 13, 2025
09f1cb0
trtllm: get total_kv_blocks from get_stats_async
PeaBrane Aug 13, 2025
36a6fbb
Merge branch 'feat/ja/runtime-configs-mdc' of https://github.com/jorg…
jorgeantonio21 Aug 13, 2025
280e98a
ceil division for sglang total_kv_blocks calculation
jorgeantonio21 Aug 13, 2025
3f8bcdd
hooks
jorgeantonio21 Aug 13, 2025
76 changes: 68 additions & 8 deletions components/backends/sglang/src/dynamo/sglang/worker/main.py
@@ -15,9 +15,11 @@
from sglang.srt.server_args import ServerArgs
from sglang.srt.utils import get_ip, get_zmq_socket

from dynamo._core import Endpoint
from dynamo.llm import (
ForwardPassMetrics,
KvStats,
ModelRuntimeConfig,
ModelType,
WorkerMetricsPublisher,
WorkerStats,
@@ -333,13 +335,8 @@ async def init(
await component.create_service()

endpoint = component.endpoint("generate")
await register_llm(
ModelType.Backend,
endpoint,
server_args.model_path,
server_args.served_model_name,
kv_cache_block_size=server_args.page_size,
migration_limit=migration_limit,
await register_llm_with_runtime_config(
engine, endpoint, server_args, migration_limit
)

if server_args.disaggregation_mode != "null":
@@ -364,12 +361,75 @@
_ = ZmqKvEventPublisher(component=component, config=zmq_config)

tasks = [endpoint.serve_endpoint(handler.generate)]

tasks.extend(setup_native_endpoints(server_args, component, handler))

await asyncio.gather(*tasks)


async def register_llm_with_runtime_config(
engine: sgl.Engine,
endpoint: Endpoint,
server_args: ServerArgs,
migration_limit: int,
):
"""Register LLM with runtime config"""
runtime_config = await _get_runtime_config(engine)
try:
await register_llm(
ModelType.Backend,
endpoint,
server_args.model_path,
server_args.served_model_name,
kv_cache_block_size=server_args.page_size,
migration_limit=migration_limit,
runtime_config=runtime_config,
)
except Exception as e:
logging.error(f"Failed to register with runtime config: {e}")
return None


async def _get_runtime_config(engine: sgl.Engine) -> Optional[ModelRuntimeConfig]:
"""Get runtime config from SGLang engine"""
try:
        # Check whether the engine exposes scheduler_info with the computed values
if hasattr(engine, "scheduler_info") and engine.scheduler_info is not None:
runtime_config = ModelRuntimeConfig()

# Get max_total_num_tokens from scheduler_info
if "max_total_num_tokens" in engine.scheduler_info:
max_total_tokens = engine.scheduler_info["max_total_num_tokens"]
if max_total_tokens and hasattr(
engine.tokenizer_manager, "server_args"
):
page_size = engine.tokenizer_manager.server_args.page_size
if page_size:
runtime_config.total_kv_blocks = (
max_total_tokens + page_size - 1
) // page_size
logging.info(
f"Got total KV blocks from scheduler: {runtime_config.total_kv_blocks} "
f"(max_total_tokens={max_total_tokens}, page_size={page_size})"
)

# Note: max_running_requests and max_prefill_tokens are NOT available in scheduler_info
# TODO: figure out where they are

return runtime_config

# If scheduler approach doesn't work, log and return None to indicate we'll skip runtime config
logging.warning(
"Could not access runtime config from SGLang engine. "
"The engine may compute these values internally after initialization. "
"Proceeding without runtime config - SGLang will use its internal defaults."
)
return None

except Exception as e:
logging.warning(f"Failed to get runtime config: {e}. Proceeding without it.")
return None


def main():
uvloop.install()
asyncio.run(worker())
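The ceiling division above ensures a partial page still consumes a whole KV block. A minimal sketch with hypothetical values (not taken from a real engine):

max_total_tokens = 163841  # hypothetical scheduler_info["max_total_num_tokens"]
page_size = 16             # hypothetical server_args.page_size
# Ceiling division: round up so a partially filled page counts as a block.
total_kv_blocks = (max_total_tokens + page_size - 1) // page_size
assert total_kv_blocks == 10241  # 163841 / 16 = 10240.06..., rounded up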
43 changes: 40 additions & 3 deletions components/backends/trtllm/src/dynamo/trtllm/main.py
@@ -20,10 +20,10 @@
from torch.cuda import device_count
from transformers import AutoConfig

from dynamo.llm import ModelType, register_llm
from dynamo.llm import ModelRuntimeConfig, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.trtllm.engine import get_llm_engine
from dynamo.trtllm.engine import TensorRTLLMEngine, get_llm_engine
from dynamo.trtllm.multimodal_processor import MultimodalRequestProcessor
from dynamo.trtllm.publisher import get_publisher
from dynamo.trtllm.request_handlers.handlers import (
@@ -49,6 +49,39 @@ async def graceful_shutdown(runtime):
logging.info("DistributedRuntime shutdown complete")


async def get_engine_runtime_config(
engine: TensorRTLLMEngine, config: Config
) -> ModelRuntimeConfig:
"""Retrieve runtime configuration from TensorRT-LLM engine."""
runtime_config = ModelRuntimeConfig()

try:
# Extract total_kv_blocks from engine stats
stats = engine.llm.get_stats_async(timeout=5)
stat = await anext(stats)
runtime_config.total_kv_blocks = stat["kvCacheStats"]["maxNumBlocks"]
logging.info(
f"Set runtime config total_kv_blocks: {runtime_config.total_kv_blocks}"
)

# Extract max number of sequences
runtime_config.max_num_seqs = config.max_batch_size
logging.info(f"Set runtime config max_num_seqs: {runtime_config.max_num_seqs}")

# Get max_num_batched_tokens from config
runtime_config.max_num_batched_tokens = config.max_num_tokens
logging.info(
f"Set runtime config max_num_batched_tokens: {runtime_config.max_num_batched_tokens}"
)

return runtime_config

except Exception as e:
logging.error(f"Failed to get runtime config from TensorRT-LLM engine: {e}")
# Return config with default/None values if retrieval fails
return runtime_config


@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
# Set up signal handler for graceful shutdown
@@ -196,14 +229,18 @@ async def init(runtime: DistributedRuntime, config: Config):
endpoint = component.endpoint(config.endpoint)

if is_first_worker(config):
# Register the model with the endpoint if only the worker is first in the disaggregation chain.
# Get runtime configuration from the engine
runtime_config = await get_engine_runtime_config(engine, config)

# Register the model with runtime config
await register_llm(
modelType,
endpoint,
config.model_path,
config.served_model_name,
kv_cache_block_size=config.kv_block_size,
migration_limit=config.migration_limit,
runtime_config=runtime_config, # Add runtime config here
)
# publisher will be set later if publishing is enabled.
handler_config = RequestHandlerConfig(
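get_stats_async above returns an async iterator, so the handler pulls a single snapshot with anext. A self-contained sketch of that consumption pattern, with a stub standing in for the engine (no real TensorRT-LLM instance is assumed):

import asyncio

async def fake_stats():
    # Stub for engine.llm.get_stats_async(timeout=5); yields one snapshot
    # shaped like the kvCacheStats payload read above.
    yield {"kvCacheStats": {"maxNumBlocks": 4096}}

async def main():
    stat = await anext(fake_stats())  # first snapshot only, as in the handler
    print(stat["kvCacheStats"]["maxNumBlocks"])  # -> 4096

asyncio.run(main())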
39 changes: 39 additions & 0 deletions components/backends/vllm/src/dynamo/vllm/main.py
@@ -12,6 +12,7 @@
from vllm.v1.engine.async_llm import AsyncLLM

from dynamo.llm import (
ModelRuntimeConfig,
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
@@ -213,13 +214,25 @@ async def init(runtime: DistributedRuntime, config: Config):
handler.kv_publisher = kv_publisher

if not config.engine_args.data_parallel_rank: # if rank is 0 or None then register
runtime_config = ModelRuntimeConfig()

# make a `collective_rpc` call to get runtime configuration values
logging.info(
"Getting engine runtime configuration metadata from vLLM engine..."
)
runtime_values = get_engine_cache_info(engine_client)
runtime_config.total_kv_blocks = runtime_values["num_gpu_blocks"]
runtime_config.max_num_seqs = runtime_values["max_num_seqs"]
runtime_config.max_num_batched_tokens = runtime_values["max_num_batched_tokens"]

await register_llm(
ModelType.Backend,
generate_endpoint,
config.model,
config.served_model_name,
kv_cache_block_size=config.engine_args.block_size,
migration_limit=config.migration_limit,
runtime_config=runtime_config,
)

try:
@@ -237,6 +250,32 @@ async def init(runtime: DistributedRuntime, config: Config):
handler.cleanup()


def get_engine_cache_info(engine: AsyncLLM):
"""Retrieve cache configuration information from [`AsyncLLM`] engine."""

try:
# Get values directly from vllm_config instead of collective_rpc
cache_values = {
"num_gpu_blocks": engine.vllm_config.cache_config.num_gpu_blocks,
}

scheduler_values = {
"max_num_seqs": engine.vllm_config.scheduler_config.max_num_seqs,
"max_num_batched_tokens": engine.vllm_config.scheduler_config.max_num_batched_tokens,
}

logging.info(f"Cache config values: {cache_values}")
logging.info(f"Scheduler config values: {scheduler_values}")
return {
"num_gpu_blocks": cache_values["num_gpu_blocks"],
"max_num_seqs": scheduler_values["max_num_seqs"],
"max_num_batched_tokens": scheduler_values["max_num_batched_tokens"],
}
except Exception as e:
logging.error(f"Failed to get configuration values from vLLM config: {e}")
raise


def main():
uvloop.run(worker())

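For reference, the dict returned by get_engine_cache_info mirrors two vllm_config sub-configs; the values below are hypothetical, shown only to illustrate the shape:

runtime_values = {
    "num_gpu_blocks": 8192,          # from cache_config.num_gpu_blocks
    "max_num_seqs": 256,             # from scheduler_config.max_num_seqs
    "max_num_batched_tokens": 8192,  # from scheduler_config.max_num_batched_tokens
}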
5 changes: 3 additions & 2 deletions components/router/src/main.rs
@@ -20,6 +20,7 @@
// 2. Update the backend component to produce a config in a standard location.
// 3. Update the KvRouter to read the config from the backend component.

use std::collections::HashMap;
use std::sync::Arc;

use clap::Parser;
@@ -29,7 +30,7 @@ use dynamo_llm::kv_router::{
scheduler::{DefaultWorkerSelector, KvSchedulerError, SchedulingRequest},
KvRouter, WorkerSelector,
};
use dynamo_runtime::component::Instance;
use dynamo_llm::local_model::runtime_config::ModelRuntimeConfig;
use dynamo_runtime::{
logging, pipeline::network::Ingress, DistributedRuntime, Result, Runtime, Worker,
};
@@ -86,7 +87,7 @@ pub struct CustomWorkerSelector(DefaultWorkerSelector);
impl WorkerSelector for CustomWorkerSelector {
fn select_worker(
&self,
workers: &[Instance],
workers: &HashMap<i64, Option<ModelRuntimeConfig>>,
request: &SchedulingRequest,
block_size: u32,
) -> Result<WorkerSelectionResult, KvSchedulerError> {
7 changes: 6 additions & 1 deletion lib/bindings/python/rust/lib.rs
@@ -25,6 +25,8 @@ use dynamo_runtime::{
use dynamo_llm::{self as llm_rs};
use dynamo_llm::{entrypoint::RouterConfig, kv_router::KvRouterConfig};

use crate::llm::local_model::ModelRuntimeConfig;

#[pyclass(eq, eq_int)]
#[derive(Clone, Debug, PartialEq)]
pub enum RouterMode {
@@ -82,6 +84,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::entrypoint::KvRouterConfig>()?;
m.add_class::<llm::kv::WorkerMetricsPublisher>()?;
m.add_class::<llm::model_card::ModelDeploymentCard>()?;
m.add_class::<llm::local_model::ModelRuntimeConfig>()?;
m.add_class::<llm::preprocessor::OAIChatPreprocessor>()?;
m.add_class::<llm::backend::Backend>()?;
m.add_class::<llm::kv::OverlapScores>()?;
@@ -131,7 +134,7 @@ fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32)
}

#[pyfunction]
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0, user_data=None))]
#[pyo3(signature = (model_type, endpoint, model_path, model_name=None, context_length=None, kv_cache_block_size=None, router_mode=None, migration_limit=0, runtime_config=None, user_data=None))]
#[allow(clippy::too_many_arguments)]
fn register_llm<'p>(
py: Python<'p>,
@@ -143,6 +146,7 @@ fn register_llm(
kv_cache_block_size: Option<u32>,
router_mode: Option<RouterMode>,
migration_limit: u32,
runtime_config: Option<ModelRuntimeConfig>,
user_data: Option<&Bound<'p, PyDict>>,
) -> PyResult<Bound<'p, PyAny>> {
let model_type_obj = match model_type {
@@ -173,6 +177,7 @@ fn register_llm(
.kv_cache_block_size(kv_cache_block_size)
.router_config(Some(router_config))
.migration_limit(Some(migration_limit))
.runtime_config(runtime_config.unwrap_or_default().inner)
.user_data(user_data_json);
// Download from HF, load the ModelDeploymentCard
let mut local_model = builder.build().await.map_err(to_pyerr)?;
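With the new runtime_config keyword in the binding signature above, a worker-side caller can pass the config through register_llm. A hedged sketch; paths, names, and sizes are placeholders, not values from this PR:

from dynamo.llm import ModelRuntimeConfig, ModelType, register_llm

async def register_example(endpoint):
    # Placeholder values; a real worker fills these from its engine.
    runtime_config = ModelRuntimeConfig()
    runtime_config.total_kv_blocks = 8192
    runtime_config.max_num_seqs = 256
    runtime_config.max_num_batched_tokens = 8192

    await register_llm(
        ModelType.Backend,
        endpoint,                 # an Endpoint, e.g. component.endpoint("generate")
        "/models/example",        # placeholder model path
        "example-model",          # placeholder served model name
        kv_cache_block_size=16,   # placeholder
        migration_limit=0,
        runtime_config=runtime_config,  # keyword added in this PR
    )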
1 change: 1 addition & 0 deletions lib/bindings/python/rust/llm.rs
@@ -31,6 +31,7 @@
pub mod disagg_router;
pub mod entrypoint;
pub mod kv;
pub mod local_model;
pub mod model_card;
pub mod nats;
pub mod preprocessor;
3 changes: 2 additions & 1 deletion lib/bindings/python/rust/llm/entrypoint.rs
@@ -164,7 +164,8 @@ pub fn make_engine<'p>(
.kv_cache_block_size(args.kv_cache_block_size)
.router_config(args.router_config.clone().map(|rc| rc.into()))
.http_port(args.http_port)
.is_mocker(matches!(args.engine_type, EngineType::Mocker));
.is_mocker(matches!(args.engine_type, EngineType::Mocker))
.extra_engine_args(args.extra_engine_args.clone());
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let local_model = builder.build().await.map_err(to_pyerr)?;
let inner = select_engine(distributed_runtime, args, local_model)