diff --git a/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K.yaml b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K.yaml new file mode 100644 index 00000000..256797d0 --- /dev/null +++ b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K.yaml @@ -0,0 +1,72 @@ +name: "h100-dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K" + +model: + path: "dsr1-0528" + container: "docker://lmsysorg/sglang:v0.5.8-cu130-runtime" + precision: "fp8" + +resources: + gpu_type: "h100" + gpus_per_node: 8 + # TP*PP = 8*2 = 16 GPUs total → 2 nodes @ 8 GPUs each + agg_nodes: 2 + agg_workers: 1 +slurm: + time_limit: "02:00:00" + +sbatch_directives: + # Prevent automatic cancellation during long model-load / warmup periods with idle GPUs. + # (Cluster reaper expects this JSON under --comment) + comment: >- + '{"OccupiedIdleGPUsJobReaper":{"exemptIdleTimeMins":"60","reason":"data_loading","description":"DeepSeek-R1 FP8 model load + warmup can keep some GPUs idle initially"}}' + +frontend: + # Stock SGLang: + # - agg_workers=1 => worker-only (direct-to-worker) handled automatically under the hood + # - agg_workers>1 or disagg => router + type: sglang + +backend: + type: sglang + + aggregated_environment: + TORCH_CUDA_ARCH_LIST: "9.0" + TORCH_DISTRIBUTED_DEFAULT_TIMEOUT: "1800" + + sglang_config: + aggregated: + # srtctl mounts host model dir -> /model inside container + model-path: "/model/" + tokenizer-path: "/model/" + served-model-name: "deepseek-ai/DeepSeek-R1-0528" + trust-remote-code: true + + tensor-parallel-size: 8 + data-parallel-size: 1 + pipeline-parallel-size: 2 + + disable-radix-cache: true + max-running-requests: 128 + cuda-graph-max-bs: 128 + chunked-prefill-size: 16000 + max-prefill-tokens: 16000 + mem-fraction-static: 0.70 + kv-cache-dtype: "auto" + attention-backend: "flashinfer" + stream-interval: 10 + decode-log-interval: 1 + +benchmark: + type: "sa-bench" + isl: 1024 + osl: 1024 + concurrencies: "1x2x4x8x16x32x64x128x256x512" + 
req_rate: "inf" + +# DSR1 can take a long time to load weights across 2 nodes. +# Health timeout controls how long srtctl waits for the worker to become ready (independent of SLURM time_limit). +health_check: + max_attempts: 720 # 720 * 10s = 7200s (matches 2:00:00 time limit) + interval_seconds: 10 + + diff --git a/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K_nsys.yaml b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K_nsys.yaml new file mode 100644 index 00000000..5cd22f43 --- /dev/null +++ b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K_nsys.yaml @@ -0,0 +1,88 @@ +name: "h100-dsr1-fp8-agg-workeronly-tp8-pp2_1K_1K_nsys" + +model: + path: "dsr1-0528" + container: "docker://lmsysorg/sglang:v0.5.8-cu130-runtime" + precision: "fp8" + +resources: + gpu_type: "h100" + gpus_per_node: 8 + # TP*PP = 8*2 = 16 GPUs total → 2 nodes @ 8 GPUs each + agg_nodes: 2 + agg_workers: 1 +slurm: + time_limit: "02:00:00" + +sbatch_directives: + # Prevent automatic cancellation during long model-load / warmup periods with idle GPUs. + # (Cluster reaper expects this JSON under --comment) + comment: >- + '{"OccupiedIdleGPUsJobReaper":{"exemptIdleTimeMins":"60","reason":"data_loading","description":"DeepSeek-R1 FP8 model load + warmup can keep some GPUs idle initially"}}' + +# Nsight Systems (nsys) is not shipped in the SGLang runtime container. +# Mount the site-provided Nsight Systems CLI into the container and put it on PATH. 
+environment: + PATH: "/opt/nsight/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" + LD_LIBRARY_PATH: "/opt/nsight/target-linux-x64:/usr/local/lib:/usr/lib/x86_64-linux-gnu:/lib/x86_64-linux-gnu" + +container_mounts: + "/lustre/fsw/portfolios/general/users/yeswanthk/nsight2025.6.1/opt/nvidia/nsight-systems-cli/2025.6.1": "/opt/nsight" + +frontend: + # Stock SGLang: + # - agg_workers=1 => worker-only (direct-to-worker) handled automatically under the hood + # - agg_workers>1 or disagg => router + type: sglang + +backend: + type: sglang + + aggregated_environment: + TORCH_CUDA_ARCH_LIST: "9.0" + TORCH_DISTRIBUTED_DEFAULT_TIMEOUT: "1800" + + sglang_config: + aggregated: + # srtctl mounts host model dir -> /model inside container + model-path: "/model/" + tokenizer-path: "/model/" + served-model-name: "deepseek-ai/DeepSeek-R1-0528" + trust-remote-code: true + + tensor-parallel-size: 8 + data-parallel-size: 1 + pipeline-parallel-size: 2 + + disable-radix-cache: true + max-running-requests: 128 + cuda-graph-max-bs: 128 + chunked-prefill-size: 16000 + max-prefill-tokens: 16000 + mem-fraction-static: 0.70 + kv-cache-dtype: "auto" + attention-backend: "flashinfer" + stream-interval: 10 + decode-log-interval: 1 + +# Profiling and benchmarking are mutually exclusive. For nsys, set benchmark to manual. +benchmark: + type: "manual" + +profiling: + type: "nsys" + isl: 1024 + osl: 1024 + # Keep this modest for profiling (nsys output size grows quickly with concurrency). + concurrency: 16 + aggregated: + start_step: 10 + stop_step: 30 + +# DSR1 can take a long time to load weights across 2 nodes. +# Health timeout controls how long srtctl waits for the worker to become ready (independent of SLURM time_limit). 
+health_check: + max_attempts: 720 # 720 * 10s = 7200s (matches 2:00:00 time limit) + interval_seconds: 10 + + diff --git a/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_8K.yaml b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_8K.yaml new file mode 100644 index 00000000..ee07c6ea --- /dev/null +++ b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_1K_8K.yaml @@ -0,0 +1,73 @@ +name: "h100-dsr1-fp8-agg-workeronly-tp8-pp2_1K_8K" + +model: + path: "dsr1-0528" + container: "docker://lmsysorg/sglang:v0.5.8-cu130-runtime" + precision: "fp8" + +resources: + gpu_type: "h100" + gpus_per_node: 8 + # TP*PP = 8*2 = 16 GPUs total → 2 nodes @ 8 GPUs each + agg_nodes: 2 + agg_workers: 1 + +slurm: + time_limit: "03:00:00" + +sbatch_directives: + # Prevent automatic cancellation during long model-load / warmup periods with idle GPUs. + # (Cluster reaper expects this JSON under --comment) + comment: >- + '{"OccupiedIdleGPUsJobReaper":{"exemptIdleTimeMins":"60","reason":"data_loading","description":"DeepSeek-R1 FP8 model load + warmup can keep some GPUs idle initially"}}' + +frontend: + # Stock SGLang: + # - agg_workers=1 => worker-only (direct-to-worker) handled automatically under the hood + # - agg_workers>1 or disagg => router + type: sglang + +backend: + type: sglang + + aggregated_environment: + TORCH_CUDA_ARCH_LIST: "9.0" + TORCH_DISTRIBUTED_DEFAULT_TIMEOUT: "1800" + + sglang_config: + aggregated: + # srtctl mounts host model dir -> /model inside container + model-path: "/model/" + tokenizer-path: "/model/" + served-model-name: "deepseek-ai/DeepSeek-R1-0528" + trust-remote-code: true + + tensor-parallel-size: 8 + data-parallel-size: 1 + pipeline-parallel-size: 2 + + disable-radix-cache: true + max-running-requests: 128 + cuda-graph-max-bs: 128 + chunked-prefill-size: 16000 + max-prefill-tokens: 16000 + mem-fraction-static: 0.70 + kv-cache-dtype: "auto" + attention-backend: "flashinfer" + stream-interval: 10 + 
decode-log-interval: 1 + +benchmark: + type: "sa-bench" + isl: 1024 + osl: 8192 + concurrencies: "1x2x4x8x16x32x64x128x176x256" + req_rate: "inf" + +# DSR1 can take a long time to load weights across 2 nodes. +# Default health timeout (max_attempts * interval_seconds) was too short and caused premature job failure. +health_check: + max_attempts: 720 # 720 * 10s = 7200s (fits within the 03:00:00 time limit) + interval_seconds: 10 + + diff --git a/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_8K_1K.yaml b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_8K_1K.yaml new file mode 100644 index 00000000..51804b26 --- /dev/null +++ b/recipies/h100/aggdsr1sglangfp8/dsr1-fp8-agg-workeronly-tp8-pp2_8K_1K.yaml @@ -0,0 +1,70 @@ +name: "h100-dsr1-fp8-agg-workeronly-tp8-pp2_8K_1K" + +model: + path: "dsr1-0528" + container: "docker://lmsysorg/sglang:v0.5.8-cu130-runtime" + precision: "fp8" + +resources: + gpu_type: "h100" + gpus_per_node: 8 + # TP*PP = 8*2 = 16 GPUs total → 2 nodes @ 8 GPUs each + agg_nodes: 2 + agg_workers: 1 + +sbatch_directives: + # Prevent automatic cancellation during long model-load / warmup periods with idle GPUs.
+ # (Cluster reaper expects this JSON under --comment) + comment: >- + '{"OccupiedIdleGPUsJobReaper":{"exemptIdleTimeMins":"60","reason":"data_loading","description":"DeepSeek-R1 FP8 model load + warmup can keep some GPUs idle initially"}}' + +frontend: + # Stock SGLang: + # - agg_workers=1 => worker-only (direct-to-worker) handled automatically under the hood + # - agg_workers>1 or disagg => router + type: sglang + +backend: + type: sglang + + aggregated_environment: + TORCH_CUDA_ARCH_LIST: "9.0" + TORCH_DISTRIBUTED_DEFAULT_TIMEOUT: "1800" + + sglang_config: + aggregated: + # srtctl mounts host model dir -> /model inside container + model-path: "/model/" + tokenizer-path: "/model/" + served-model-name: "deepseek-ai/DeepSeek-R1-0528" + trust-remote-code: true + + tensor-parallel-size: 8 + data-parallel-size: 1 + pipeline-parallel-size: 2 + + disable-radix-cache: true + max-running-requests: 128 + cuda-graph-max-bs: 128 + chunked-prefill-size: 16000 + max-prefill-tokens: 16000 + mem-fraction-static: 0.70 + kv-cache-dtype: "auto" + attention-backend: "flashinfer" + stream-interval: 10 + decode-log-interval: 1 + +benchmark: + type: "sa-bench" + isl: 8192 + osl: 1024 + concurrencies: "1x2x64" + req_rate: "inf" + +# DSR1 can take a long time to load weights across 2 nodes. +# Health timeout controls how long srtctl waits for the worker to become ready (independent of SLURM time_limit). 
+health_check: + max_attempts: 720 # 720 * 10s = 7200s of readiness budget + interval_seconds: 10 + + diff --git a/src/srtctl/backends/sglang.py b/src/srtctl/backends/sglang.py index 62635017..1d908cc8 100644 --- a/src/srtctl/backends/sglang.py +++ b/src/srtctl/backends/sglang.py @@ -220,7 +220,7 @@ def build_worker_command( process: The process to start endpoint_processes: All processes for this endpoint (for multi-node) runtime: Runtime context with paths and settings - frontend_type: Frontend type - "sglang" uses sglang.launch_server, "dynamo" uses dynamo.sglang + frontend_type: Frontend type - "sglang"/"direct"/"none" use sglang.launch_server, "dynamo" uses dynamo.sglang profiling_enabled: Whether profiling is enabled (forces sglang.launch_server) nsys_prefix: Optional nsys profiling command prefix dump_config_path: Path to dump config JSON @@ -240,7 +240,7 @@ def build_worker_command( # Choose Python module # When profiling is enabled, always use sglang.launch_server (not dynamo.sglang) - use_sglang = frontend_type == "sglang" or profiling_enabled + use_sglang = frontend_type in ("sglang", "direct", "none") or profiling_enabled python_module = "sglang.launch_server" if use_sglang else "dynamo.sglang" # Get served model name from config @@ -289,8 +289,9 @@ def build_worker_command( ] ) - # Add config dump path (not when using sglang frontend) - if dump_config_path and frontend_type != "sglang": + # Add config dump path (only for dynamo.sglang). + # sglang.launch_server does not support --dump-config-to.
+ if dump_config_path and frontend_type not in ("sglang", "direct", "none"): cmd.extend(["--dump-config-to", str(dump_config_path)]) # Add kv-events-config if enabled for this mode and we have an allocated port diff --git a/src/srtctl/cli/do_sweep.py b/src/srtctl/cli/do_sweep.py index 6c7542d0..9ff2fb8d 100644 --- a/src/srtctl/cli/do_sweep.py +++ b/src/srtctl/cli/do_sweep.py @@ -80,7 +80,16 @@ def endpoints(self) -> list[Endpoint]: @functools.cached_property def backend_processes(self) -> list[Process]: """Compute physical process topology from endpoints (cached).""" - return self.backend.endpoints_to_processes(self.endpoints) + # NOTE: On shared clusters, fixed DYN_SYSTEM_PORT ranges can collide across jobs + # and crash dynamo.sglang with "Address already in use". Use a job-specific base. + try: + return self.backend.endpoints_to_processes( + self.endpoints, + base_sys_port=self.runtime.sys_port_base, + ) + except TypeError: + # Backends that don't accept base_sys_port keep their default behavior. + return self.backend.endpoints_to_processes(self.endpoints) def start_head_infrastructure(self, registry: ProcessRegistry) -> ManagedProcess: """Start NATS and etcd on the infra node. @@ -130,14 +139,16 @@ def start_head_infrastructure(self, registry: ProcessRegistry) -> ManagedProcess critical=True, ) - # 300s timeout to handle slow container imports on first run + # NOTE: Starting infra requires an `srun` into the container image. + # On busy clusters, `pyxis` image import can easily exceed 60s, so keep this + # timeout comfortably larger than the container startup overhead. 
logger.info("Waiting for NATS (port 4222) on %s...", infra_node) - if not wait_for_port(infra_node, 4222, timeout=300): + if not wait_for_port(self.runtime.infra_node_ip, 4222, timeout=300): raise RuntimeError("NATS failed to start") logger.info("NATS is ready") logger.info("Waiting for etcd (port 2379) on %s...", infra_node) - if not wait_for_port(infra_node, 2379, timeout=300): + if not wait_for_port(self.runtime.infra_node_ip, 2379, timeout=300): raise RuntimeError("etcd failed to start") logger.info("etcd is ready") @@ -154,7 +165,10 @@ def _print_connection_info(self) -> None: logger.info("=" * 60) logger.info("Connection Commands") logger.info("=" * 60) - logger.info("Frontend URL: http://%s:8000", self.runtime.nodes.head) + if self.runtime.effective_frontend_type == "direct": + logger.info("Worker URL: http://%s:%d", self.runtime.nodes.head, self.runtime.frontend_port) + else: + logger.info("Frontend URL: http://%s:%d", self.runtime.nodes.head, self.runtime.frontend_port) logger.info("") logger.info("To connect to head node (%s):", self.runtime.nodes.head) logger.info( @@ -211,8 +225,9 @@ def run(self) -> int: try: # Stage 1: Head infrastructure (NATS, etcd) reporter.report(JobStatus.STARTING, JobStage.HEAD_INFRASTRUCTURE, "Starting head infrastructure") - head_proc = self.start_head_infrastructure(registry) - registry.add_process(head_proc) + if self.runtime.effective_frontend_type != "direct": + head_proc = self.start_head_infrastructure(registry) + registry.add_process(head_proc) # Stage 2: Workers reporter.report(JobStatus.WORKERS, JobStage.WORKERS, "Starting workers") diff --git a/src/srtctl/cli/mixins/benchmark_stage.py b/src/srtctl/cli/mixins/benchmark_stage.py index 489f75b8..14001ca7 100644 --- a/src/srtctl/cli/mixins/benchmark_stage.py +++ b/src/srtctl/cli/mixins/benchmark_stage.py @@ -78,13 +78,13 @@ def run_benchmark( hc = self.config.health_check if not wait_for_model( host=self.runtime.nodes.head, - port=8000, + port=self.runtime.frontend_port, 
n_prefill=n_prefill, n_decode=n_decode, poll_interval=float(hc.interval_seconds), timeout=float(hc.max_attempts * hc.interval_seconds), report_every=60.0, - frontend_type=self.config.frontend.type, + frontend_type=self.runtime.effective_frontend_type, stop_event=stop_event, ): logger.error("Server did not become healthy") @@ -114,7 +114,7 @@ def run_benchmark( if benchmark_type == "manual": logger.info("Benchmark type is 'manual' - server is ready for testing") - logger.info("Frontend URL: http://%s:8000", self.runtime.nodes.head) + logger.info("Frontend URL: http://%s:%d", self.runtime.nodes.head, self.runtime.frontend_port) logger.info("Press Ctrl+C to stop the job") while not stop_event.is_set(): diff --git a/src/srtctl/cli/mixins/frontend_stage.py b/src/srtctl/cli/mixins/frontend_stage.py index 25a59dff..eab493bd 100644 --- a/src/srtctl/cli/mixins/frontend_stage.py +++ b/src/srtctl/cli/mixins/frontend_stage.py @@ -183,6 +183,11 @@ def start_frontend(self, registry: "ProcessRegistry") -> list[ManagedProcess]: List of ManagedProcess instances for all frontend processes. 
""" logger.info("Starting frontend layer") + # Direct-to-worker mode: skip nginx/router/frontend entirely + if self.runtime.effective_frontend_type == "direct": + logger.info("Frontend disabled (effective_frontend_type=direct)") + return [] + topology = self._compute_frontend_topology() processes: list[ManagedProcess] = [] @@ -191,8 +196,8 @@ def start_frontend(self, registry: "ProcessRegistry") -> list[ManagedProcess]: nginx_proc = self._start_nginx(topology) processes.append(nginx_proc) - # Get frontend implementation based on config type - frontend_impl = get_frontend(self.config.frontend.type) + # Get frontend implementation based on effective type + frontend_impl = get_frontend(self.runtime.effective_frontend_type) frontend_procs = frontend_impl.start_frontends( topology=topology, runtime=self.runtime, diff --git a/src/srtctl/cli/mixins/worker_stage.py b/src/srtctl/cli/mixins/worker_stage.py index 7367d539..e00991d5 100644 --- a/src/srtctl/cli/mixins/worker_stage.py +++ b/src/srtctl/cli/mixins/worker_stage.py @@ -57,7 +57,7 @@ def _build_worker_preamble(self) -> str | None: Runs (in order): 1. Custom setup script from /configs/ (if config.setup_script set) - 2. Dynamo installation (if frontend type is dynamo and not profiling) + 2. Dynamo installation (if effective frontend type is dynamo and not profiling) """ parts = [] @@ -72,7 +72,11 @@ def _build_worker_preamble(self) -> str | None: # 2. 
Dynamo installation (required for dynamo.sglang when using dynamo frontend and not profiling) # When profiling is enabled, we use sglang.launch_server directly (no dynamo) # Skip if dynamo.install is False (container already has dynamo installed) - if self.config.frontend.type == "dynamo" and not self.config.profiling.enabled and self.config.dynamo.install: + if ( + self.runtime.effective_frontend_type == "dynamo" + and not self.config.profiling.enabled + and self.config.dynamo.install + ): parts.append(self.config.dynamo.get_install_commands()) if not parts: @@ -103,7 +107,7 @@ def start_worker(self, process: "Process", endpoint_processes: list["Process"]) process=process, endpoint_processes=endpoint_processes, runtime=self.runtime, - frontend_type=self.config.frontend.type, + frontend_type=self.runtime.effective_frontend_type, profiling_enabled=profiling.enabled, nsys_prefix=nsys_prefix, dump_config_path=config_dump, @@ -219,7 +223,7 @@ def start_endpoint_worker(self, endpoint_processes: list["Process"]) -> ManagedP process=leader, endpoint_processes=endpoint_processes, runtime=self.runtime, - frontend_type=self.config.frontend.type, + frontend_type=self.runtime.effective_frontend_type, profiling_enabled=profiling.enabled, nsys_prefix=nsys_prefix, dump_config_path=config_dump, diff --git a/src/srtctl/cli/setup_head.py b/src/srtctl/cli/setup_head.py index 91582e3f..54d04d2b 100644 --- a/src/srtctl/cli/setup_head.py +++ b/src/srtctl/cli/setup_head.py @@ -119,10 +119,11 @@ def setup_logging(): ) -def start_nats(binary_path: str = "/configs/nats-server") -> subprocess.Popen: +def start_nats(host_ip: str, binary_path: str = "/configs/nats-server") -> subprocess.Popen: """Start NATS server. 
Args: + host_ip: IP address of this node (for binding) binary_path: Path to nats-server binary Returns: @@ -139,7 +140,21 @@ def start_nats(binary_path: str = "/configs/nats-server") -> subprocess.Popen: os.makedirs(nats_store_dir, exist_ok=True) logger.info("Starting NATS server...") - cmd = [binary_path, "-js", "-sd", nats_store_dir] + # IMPORTANT: + # The orchestrator and workers connect via `nats://:4222`. + # If NATS only listens on localhost, `wait_for_port(...)` will fail. + # Bind explicitly to the node's cluster/private interface so it is reachable via hostname/IP + # without exposing NATS on all interfaces. + cmd = [ + binary_path, + "-js", + "-sd", + nats_store_dir, + "-a", + host_ip, + "-p", + str(NATS_PORT), + ] proc = subprocess.Popen( cmd, @@ -264,21 +279,21 @@ def main(): etcd_proc = None try: - nats_proc = start_nats(args.nats_binary) + nats_proc = start_nats(host_ip, args.nats_binary) etcd_proc = start_etcd(host_ip, args.etcd_binary, log_dir) # Wait for services - if not wait_for_service("localhost", NATS_PORT, "NATS"): + if not wait_for_service(host_ip, NATS_PORT, "NATS"): logger.error("NATS failed to start") sys.exit(1) - if not wait_for_service("localhost", ETCD_CLIENT_PORT, "etcd"): + if not wait_for_service(host_ip, ETCD_CLIENT_PORT, "etcd"): logger.error("etcd failed to start") sys.exit(1) logger.info("Head node infrastructure is ready") - logger.info(" NATS: nats://localhost:%d", NATS_PORT) - logger.info(" etcd: http://localhost:%d", ETCD_CLIENT_PORT) + logger.info(" NATS: nats://%s:%d", host_ip, NATS_PORT) + logger.info(" etcd: http://%s:%d", host_ip, ETCD_CLIENT_PORT) # Keep running - wait for either process to exit while True: diff --git a/src/srtctl/core/health.py b/src/srtctl/core/health.py index c362945c..f4cb1c78 100644 --- a/src/srtctl/core/health.py +++ b/src/srtctl/core/health.py @@ -366,6 +366,54 @@ def wait_for_model( Returns: True if model is ready with expected workers, False if timeout/aborted """ + if frontend_type in 
("direct", "none"): + # Direct-to-worker mode (no router/frontend). + # Treat model as ready when /health is 200 and /v1/models returns at least one model. + health_url = f"http://{host}:{port}/health" + models_url = f"http://{host}:{port}/v1/models" + logger.info( + "Polling %s every %.1fs for direct worker readiness (frontend=direct)", + health_url, + poll_interval, + ) + + start_time = time.time() + last_report_time = start_time + + while True: + if stop_event and stop_event.is_set(): + logger.warning("Wait for model aborted by stop event") + return False + + elapsed = time.time() - start_time + if elapsed >= timeout: + logger.error("Model did not get healthy in %.0f seconds", timeout) + return False + + try: + r = requests.get(health_url, timeout=5.0) + if r.status_code == 200: + mr = requests.get(models_url, timeout=5.0) + if mr.status_code == 200: + data = mr.json() + models = data.get("data", []) + if models: + logger.info("Model is ready (direct worker). Found %d model(s).", len(models)) + return True + + if time.time() - last_report_time >= report_every: + logger.info("Waiting for direct worker to become ready...") + last_report_time = time.time() + + except requests.exceptions.RequestException as e: + if time.time() - last_report_time >= report_every: + logger.debug("Health check failed: %s", e) + last_report_time = time.time() + except Exception as e: + logger.debug("Unexpected error during health check: %s", e) + + time.sleep(poll_interval) + if frontend_type == "sglang": health_url = f"http://{host}:{port}/workers" logger.info( diff --git a/src/srtctl/core/ip_utils/__init__.py b/src/srtctl/core/ip_utils/__init__.py index 19a39dd0..a5593679 100644 --- a/src/srtctl/core/ip_utils/__init__.py +++ b/src/srtctl/core/ip_utils/__init__.py @@ -11,6 +11,7 @@ import logging import subprocess +import re from pathlib import Path logger = logging.getLogger(__name__) @@ -99,8 +100,26 @@ def get_node_ip( ) if success and output: - logger.debug("Resolved IP for %s: 
%s", node, output) - return output + # Some clusters emit step banners/prolog output into stdout/stderr for srun + # (e.g., "GpuFreq=control_disabled"). Robustly extract a usable IPv4. + candidates = re.findall(r"(?:\d{1,3}\.){3}\d{1,3}", output) + if candidates: + # Prefer RFC1918 private addresses first. + private = [ + ip + for ip in candidates + if ip.startswith("10.") + or ip.startswith("192.168.") + or re.match(r"^172\.(1[6-9]|2\d|3[0-1])\.", ip) + ] + ip = (private[0] if private else candidates[0]).strip() + logger.debug("Resolved IP for %s: %s", node, ip) + return ip + + # Fallback: return raw output (may already be an IP/hostname) + cleaned = output.strip() + logger.debug("Resolved IP for %s (no IPv4 found): %s", node, cleaned) + return cleaned else: logger.error("Failed to get IP for node %s: %s", node, output) return None diff --git a/src/srtctl/core/runtime.py b/src/srtctl/core/runtime.py index 3e68bdd5..9a7e3720 100644 --- a/src/srtctl/core/runtime.py +++ b/src/srtctl/core/runtime.py @@ -97,8 +97,8 @@ class RuntimeContext: # Computed paths (all absolute) log_dir: Path - model_path: Path # For HF models (hf:prefix), this is the HF model ID as a Path - container_image: Path + model_path: Path + container_image: str | Path # Resource configuration gpus_per_node: int @@ -117,9 +117,21 @@ class RuntimeContext: # Environment variables environment: dict[str, str] = field(default_factory=dict) + # Effective frontend type used by the orchestrator. + # This is inferred under the hood based on backend + serving mode. + # Examples: + # - "dynamo": dynamo frontend (/health) + # - "sglang": sglang router (/workers) + # - "direct": no router; benchmark/health hit the worker OAI server directly + effective_frontend_type: str = "dynamo" + # Frontend port (for benchmark endpoint) frontend_port: int = 8000 + # Base port for Dynamo system status servers (DYN_SYSTEM_PORT). + # Must be unique per-job on shared clusters to avoid "Address already in use" collisions. 
+ sys_port_base: int = 8081 + @classmethod def from_config( cls, @@ -149,6 +161,44 @@ def from_config( head_node_ip = get_hostname_ip(nodes.head) infra_node_ip = get_hostname_ip(nodes.infra) + # Infer frontend behavior under the hood. + # Stock SGLang variants (frontend.type: "sglang"): + # - Aggregated, agg_workers=1: run only sglang.launch_server (no router) -> hit worker directly + # - Aggregated, agg_workers>1: use sglang router + # - Disaggregated: use sglang router + # + # Dynamo runs (frontend.type: "dynamo"): keep existing behavior. + r = config.resources + backend_type = getattr(config.backend, "type", "sglang") + if backend_type == "sglang" and config.frontend.type == "sglang": + # Stock SGLang behavior + if (not r.is_disaggregated) and r.num_agg == 1: + effective_frontend_type = "direct" + else: + # Disagg OR multi-agg => router + effective_frontend_type = "sglang" + else: + # Dynamo (and any other backend/frontends): preserve user config + effective_frontend_type = config.frontend.type + + # Endpoint port that benchmarks/health checks should target. + # - Router/frontends listen on 8000 (or nginx public port) + # - Direct-to-worker mode: hit the agg worker leader HTTP port (allocated from 30000) + frontend_port = 30000 if effective_frontend_type == "direct" else 8000 + + # Choose a job-specific system-port base to avoid collisions across jobs on shared nodes. + # We reserve a small contiguous range per job and let process allocation increment within it. + # + # IMPORTANT: Dynamo expects DYN_SYSTEM_PORT to fit in a signed 16-bit int (i16), + # so keep the base <= 32767. We also reserve 100 ports per job. + # + # Range: 10000..29900 (step 100). This gives 100 ports per job and stays < 32767. 
+ try: + job_seed = int(job_id) + except Exception: + job_seed = abs(hash(job_id)) + sys_port_base = 10000 + (job_seed % 200) * 100 + # Compute log directory using FormattablePath or default logic # Check for SRTCTL_OUTPUT_DIR from sbatch script first (ensures consistency) output_dir_env = os.environ.get("SRTCTL_OUTPUT_DIR") @@ -193,8 +243,9 @@ def from_config( if not container_image.is_file(): raise ValueError(f"Container image path is not a file: {container_image}") else: - # Image name (e.g., nvcr.io/nvidia/pytorch:23.12) - keep as string, convert to Path for type compatibility - container_image = Path(container_image_str) + # Image name (e.g., nvcr.io/nvidia/pytorch:23.12 or docker://lmsysorg/sglang:v0.5.5) + # Keep as a plain string; Path(...) would mangle schemes like docker:// into docker:/ + container_image = container_image_str # Build container mounts container_mounts: dict[Path, Path] = { @@ -212,6 +263,17 @@ def from_config( if configs_dir.exists(): container_mounts[configs_dir.resolve()] = Path("/configs") + # Workaround for some pyxis/enroot environments that source /root/.cargo/env on container start. + # Provide an empty file so container startup does not fail if it is missing. + cargo_env = log_dir / "cargo_env" + try: + cargo_env.parent.mkdir(parents=True, exist_ok=True) + cargo_env.touch(exist_ok=True) + container_mounts[cargo_env.resolve()] = Path("/root/.cargo/env") + except Exception: + # Best-effort; if we cannot create/mount this file, proceed and let pyxis report the error. 
+ pass + # Mount srtctl benchmark scripts from srtctl.benchmarks.base import SCRIPTS_DIR @@ -249,6 +311,9 @@ def from_config( srun_options=dict(config.srun_options), environment=dict(config.environment), is_hf_model=is_hf_model, + effective_frontend_type=effective_frontend_type, + frontend_port=frontend_port, + sys_port_base=sys_port_base, ) # Expand FormattablePath mounts @@ -272,6 +337,9 @@ def from_config( srun_options=dict(config.srun_options), environment=dict(config.environment), is_hf_model=is_hf_model, + effective_frontend_type=effective_frontend_type, + frontend_port=frontend_port, + sys_port_base=sys_port_base, ) def format_string(self, template: str, **extra_kwargs) -> str: