Skip to content
This repository was archived by the owner on Apr 20, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/srtctl/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pathlib import Path

from srtctl.core.runtime import RuntimeContext
from srtctl.core.topology import Endpoint, Process
from srtctl.core.topology import Endpoint, NodePortAllocator, Process


class BackendType(str, Enum):
Expand Down Expand Up @@ -90,6 +90,7 @@ def endpoints_to_processes(
self,
endpoints: list["Endpoint"],
base_sys_port: int = 8081,
port_allocator: "NodePortAllocator | None" = None,
) -> list["Process"]:
"""Convert logical endpoints to physical processes."""
...
Expand Down
26 changes: 22 additions & 4 deletions src/srtctl/backends/sglang.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
if TYPE_CHECKING:
from srtctl.backends.base import SrunConfig
from srtctl.core.runtime import RuntimeContext
from srtctl.core.topology import Endpoint, Process
from srtctl.core.topology import Endpoint, NodePortAllocator, Process

# Type alias for worker modes
WorkerMode = Literal["prefill", "decode", "agg"]
Expand Down Expand Up @@ -198,11 +198,12 @@ def endpoints_to_processes(
self,
endpoints: list["Endpoint"],
base_sys_port: int = 8081,
port_allocator: "NodePortAllocator | None" = None,
) -> list["Process"]:
"""Convert endpoints to processes."""
from srtctl.core.topology import endpoints_to_processes

return endpoints_to_processes(endpoints, base_sys_port=base_sys_port)
return endpoints_to_processes(endpoints, base_sys_port=base_sys_port, port_allocator=port_allocator)

def build_worker_command(
self,
Expand Down Expand Up @@ -270,7 +271,24 @@ def build_worker_command(
cmd.extend(["--disaggregation-mode", mode])
# Bootstrap port only needed for sglang frontend (dynamo handles internally)
if frontend_type == "sglang" and mode == "prefill" and process.bootstrap_port is not None:
cmd.extend(["--disaggregation-bootstrap-port", str(process.bootstrap_port)])
user_bootstrap_port = config.get("disaggregation-bootstrap-port")
if user_bootstrap_port is None:
cmd.extend(["--disaggregation-bootstrap-port", str(process.bootstrap_port)])
else:
try:
user_port_int = int(user_bootstrap_port)
except (TypeError, ValueError) as e:
raise ValueError(
f"Invalid disaggregation-bootstrap-port={user_bootstrap_port!r} in sglang_config.prefill"
) from e
if user_port_int != process.bootstrap_port:
raise ValueError(
"disaggregation-bootstrap-port mismatch for sglang prefill worker: "
f"config={user_port_int}, topology={process.bootstrap_port}. "
"For sglang router frontend, router and prefill workers must use the same bootstrap port. "
"If you run multiple prefill workers on the same node, do not set a fixed "
"disaggregation-bootstrap-port in the recipe."
)

# Add multi-node coordination flags
if is_multi_node:
Expand All @@ -297,7 +315,7 @@ def build_worker_command(
kv_cfg["endpoint"] = f"tcp://*:{process.kv_events_port}"
cmd.extend(["--kv-events-config", json.dumps(kv_cfg)])

# Add all config flags
# Add all config flags.
cmd.extend(_config_to_cli_args(config))

return cmd
Expand Down
91 changes: 76 additions & 15 deletions src/srtctl/cli/do_sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
)
from srtctl.core.runtime import RuntimeContext
from srtctl.core.schema import SrtConfig
from srtctl.core.slurm import get_slurm_job_id, start_srun_process
from srtctl.core.slurm import get_port_offset, get_slurm_job_id, start_srun_process
from srtctl.core.status import JobStage, JobStatus, StatusReporter
from srtctl.core.topology import Endpoint, Process
from srtctl.core.topology import Endpoint, NodePortAllocator, Process
from srtctl.logging_utils import setup_logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -80,14 +80,67 @@ def endpoints(self) -> list[Endpoint]:
@functools.cached_property
def backend_processes(self) -> list[Process]:
"""Compute physical process topology from endpoints (cached)."""
return self.backend.endpoints_to_processes(self.endpoints)

def start_head_infrastructure(self, registry: ProcessRegistry) -> ManagedProcess:
"""Start NATS and etcd on the infra node.

When etcd_nats_dedicated_node is enabled, services run on a dedicated node.
Otherwise, they run on the head node (default behavior).
# DYN_SYSTEM_PORT is parsed as i16 by dynamo runtime, so keep ports < 32768.
# Also avoid collisions across concurrent jobs by offsetting from the job id.
#
# Note: srtctl allocates one sys_port per Process and increments sequentially from base_sys_port.
# Therefore, base_sys_port must reserve a sufficiently large consecutive port window per job
# to avoid collisions with other jobs running concurrently.
#
# Use get_port_offset() for consistency with other services (NATS, etcd, frontend).
# get_port_offset returns 0-990 in steps of 10, giving 100 slots.
# Each slot needs ~200 ports for sys_port allocation, so we multiply offset by 20.
port_offset = get_port_offset(self.runtime.job_id)
sys_port_stride = 200 # Reserved consecutive sys ports per job.
base_sys_port = 9000 + (port_offset * 20) # Range: 9000-28800, step 200

port_allocator: NodePortAllocator | None = None
if self.config.frontend.type == "sglang" and getattr(self.backend, "type", None) == "sglang":
prefill_cfg: dict[str, object] = {}
try:
prefill_cfg = self.backend.get_config_for_mode("prefill") # type: ignore[assignment]
except Exception:
prefill_cfg = {}

user_bootstrap_port = prefill_cfg.get("disaggregation-bootstrap-port")
if user_bootstrap_port is not None:
try:
base_bootstrap_port = int(user_bootstrap_port)
except (TypeError, ValueError):
logger.warning(
"Invalid disaggregation-bootstrap-port=%r; falling back to default bootstrap port allocation",
user_bootstrap_port,
)
else:
port_allocator = NodePortAllocator(base_bootstrap_port=base_bootstrap_port)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

processes = self.backend.endpoints_to_processes(
self.endpoints,
base_sys_port=base_sys_port,
port_allocator=port_allocator,
)
if len(processes) > sys_port_stride:
logger.warning(
"This job allocates %d processes, which may exceed the reserved sys_port window (%d). "
"Consider increasing sys_port_stride to reduce cross-job collision risk.",
len(processes),
sys_port_stride,
)
return processes

def start_head_infrastructure(self, registry: ProcessRegistry) -> ManagedProcess | None:
"""Start head node infrastructure when required by the chosen frontend.

Dynamo frontend requires NATS+etcd for discovery/control planes.
SGLang frontend uses direct worker connections and does not require these services.
"""
if self.config.frontend.type != "dynamo":
logger.info(
"Skipping head node infrastructure (frontend.type=%s does not require NATS/etcd)",
self.config.frontend.type,
)
return None

infra_node = self.runtime.nodes.infra
logger.info("Starting infrastructure services (NATS, etcd)")
logger.info("Infra node: %s", infra_node)
Expand Down Expand Up @@ -130,14 +183,19 @@ def start_head_infrastructure(self, registry: ProcessRegistry) -> ManagedProcess
critical=True,
)

port_offset = get_port_offset(self.runtime.job_id)
nats_port = 4222 + port_offset
etcd_port = 2379 + port_offset
logger.info("Port offset for this job: %d (job_id: %s)", port_offset, self.runtime.job_id)

# 300s timeout to handle slow container imports on first run
logger.info("Waiting for NATS (port 4222) on %s...", infra_node)
if not wait_for_port(infra_node, 4222, timeout=300):
logger.info("Waiting for NATS (port %d) on %s...", nats_port, infra_node)
if not wait_for_port(infra_node, nats_port, timeout=300):
raise RuntimeError("NATS failed to start")
logger.info("NATS is ready")

logger.info("Waiting for etcd (port 2379) on %s...", infra_node)
if not wait_for_port(infra_node, 2379, timeout=300):
logger.info("Waiting for etcd (port %d) on %s...", etcd_port, infra_node)
if not wait_for_port(infra_node, etcd_port, timeout=300):
raise RuntimeError("etcd failed to start")
logger.info("etcd is ready")

Expand All @@ -150,11 +208,13 @@ def _print_connection_info(self) -> None:
if mounts_str:
container_args += f" --container-mounts={mounts_str}"

public_port = self.runtime.frontend_port

logger.info("")
logger.info("=" * 60)
logger.info("Connection Commands")
logger.info("=" * 60)
logger.info("Frontend URL: http://%s:8000", self.runtime.nodes.head)
logger.info("Frontend URL: http://%s:%d", self.runtime.nodes.head, public_port)
logger.info("")
logger.info("To connect to head node (%s):", self.runtime.nodes.head)
logger.info(
Expand Down Expand Up @@ -206,7 +266,8 @@ def run(self) -> int:
# Stage 1: Head infrastructure (NATS, etcd)
reporter.report(JobStatus.STARTING, JobStage.HEAD_INFRASTRUCTURE, "Starting head infrastructure")
head_proc = self.start_head_infrastructure(registry)
registry.add_process(head_proc)
if head_proc is not None:
registry.add_process(head_proc)

# Stage 2: Workers
reporter.report(JobStatus.WORKERS, JobStage.WORKERS, "Starting workers")
Expand Down
4 changes: 2 additions & 2 deletions src/srtctl/cli/mixins/benchmark_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run_benchmark(
hc = self.config.health_check
if not wait_for_model(
host=self.runtime.nodes.head,
port=8000,
port=self.runtime.frontend_port,
Comment thread
YAMY1234 marked this conversation as resolved.
n_prefill=n_prefill,
n_decode=n_decode,
poll_interval=float(hc.interval_seconds),
Expand Down Expand Up @@ -106,7 +106,7 @@ def run_benchmark(

if benchmark_type == "manual":
logger.info("Benchmark type is 'manual' - server is ready for testing")
logger.info("Frontend URL: http://%s:8000", self.runtime.nodes.head)
logger.info("Frontend URL: http://%s:%d", self.runtime.nodes.head, self.runtime.frontend_port)
logger.info("Press Ctrl+C to stop the job")

while not stop_event.is_set():
Expand Down
28 changes: 22 additions & 6 deletions src/srtctl/cli/mixins/frontend_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def backend(self) -> Any:
@property
def backend_processes(self) -> list["Process"]:
"""Compute physical process topology from endpoints (cached)."""
...
raise NotImplementedError

def _compute_frontend_topology(self) -> FrontendTopology:
"""Determine where nginx and frontends should run.
Expand All @@ -76,20 +76,33 @@ def _compute_frontend_topology(self) -> FrontendTopology:
- Single node OR multiple_frontends disabled: 1 frontend on head, no nginx
- 2+ nodes AND multiple_frontends enabled: nginx on head, frontends on other nodes

Port offset based on job_id avoids conflicts between different SLURM jobs.

Returns:
FrontendTopology describing where to run nginx and frontends.
"""
from srtctl.core.slurm import get_port_offset

nodes = self.runtime.nodes.worker
head = self.runtime.nodes.head
fe_config = self.config.frontend

# Calculate port offset to avoid conflicts between jobs
port_offset = get_port_offset(self.runtime.job_id)

# Base ports with offset
# Note: base_internal_port must not conflict with DYN_SYSTEM_PORT (8081+offset+worker_idx)
# With many workers, DYN_SYSTEM_PORT can reach 8081+offset+N, so use 9090 to stay clear
base_public_port = 8000 + port_offset
base_internal_port = 9090 + port_offset
Comment thread
YAMY1234 marked this conversation as resolved.

# Single node or multiple frontends disabled: single frontend, no nginx
if len(nodes) == 1 or not fe_config.enable_multiple_frontends:
return FrontendTopology(
nginx_node=None,
frontend_nodes=[head],
frontend_port=8000,
public_port=8000,
frontend_port=base_public_port,
public_port=base_public_port,
)

# Multiple nodes with multiple frontends enabled:
Expand All @@ -104,17 +117,19 @@ def _compute_frontend_topology(self) -> FrontendTopology:
frontend_nodes = other_nodes[:max_frontends]

logger.info(
"Frontend topology: nginx on %s, %d frontends on %s",
"Frontend topology: nginx on %s (port %d), %d frontends on %s (port %d)",
head,
base_public_port,
len(frontend_nodes),
frontend_nodes,
base_internal_port,
)

return FrontendTopology(
nginx_node=head,
frontend_nodes=frontend_nodes,
frontend_port=8180, # Internal port behind nginx
public_port=8000, # Public port exposed by nginx
frontend_port=base_internal_port, # Internal port behind nginx
public_port=base_public_port, # Public port exposed by nginx
)

def _start_nginx(self, topology: FrontendTopology) -> ManagedProcess:
Expand All @@ -132,6 +147,7 @@ def _start_nginx(self, topology: FrontendTopology) -> ManagedProcess:

# Install nginx and run it (daemon off keeps nginx in foreground so srun can manage it)
# Use container path (/logs) since log_dir is mounted there
# Add retry logic for apt-get in case of mirror sync issues
container_config_path = "/logs/nginx.conf"
cmd = [
"bash",
Expand Down
38 changes: 29 additions & 9 deletions src/srtctl/cli/mixins/worker_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,29 @@ def start_worker(self, process: "Process", endpoint_processes: list["Process"])
)

# Environment variables
env_to_set = {
"HEAD_NODE_IP": self.runtime.head_node_ip,
"ETCD_ENDPOINTS": f"http://{self.runtime.nodes.infra}:2379",
"NATS_SERVER": f"nats://{self.runtime.nodes.infra}:4222",
"DYN_SYSTEM_PORT": str(process.sys_port),
"DYN_REQUEST_PLANE": "nats",
}
env_to_set: dict[str, str] = {"HEAD_NODE_IP": self.runtime.head_node_ip}

# Only Dynamo workers require etcd/NATS + system status server port.
if self.config.frontend.type == "dynamo":
from srtctl.core.slurm import get_port_offset

port_offset = get_port_offset(self.runtime.job_id)
nats_port = 4222 + port_offset
etcd_port = 2379 + port_offset

env_to_set.update(
{
"ETCD_ENDPOINTS": f"http://{self.runtime.infra_node_ip}:{etcd_port}",
"NATS_SERVER": f"nats://{self.runtime.infra_node_ip}:{nats_port}",
"DYN_SYSTEM_PORT": str(process.sys_port),
}
)

# Keep request-plane consistent across frontend/workers
frontend_plane = None
if self.config.frontend.env:
frontend_plane = self.config.frontend.env.get("DYN_REQUEST_PLANE")
env_to_set["DYN_REQUEST_PLANE"] = frontend_plane if frontend_plane else "nats"

Comment on lines +113 to 136
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Offset/profile fixes are incomplete across worker launch modes.

start_worker() is updated, but start_endpoint_worker() still uses fixed infra ports (Line 250/Line 251) and host profiling path (Line 263). That can reintroduce cross-job conflicts in endpoint-launch mode.

🛠️ Proposed parity fix for endpoint-launch mode
@@
-        env_to_set = {
-            "HEAD_NODE_IP": self.runtime.head_node_ip,
-            "ETCD_ENDPOINTS": f"http://{self.runtime.nodes.infra}:2379",
-            "NATS_SERVER": f"nats://{self.runtime.nodes.infra}:4222",
-            "DYN_SYSTEM_PORT": str(leader.sys_port),
-        }
+        env_to_set = {"HEAD_NODE_IP": self.runtime.head_node_ip, "DYN_SYSTEM_PORT": str(leader.sys_port)}
+        if self.config.frontend.type == "dynamo":
+            from srtctl.core.slurm import get_port_offset
+            port_offset = get_port_offset(self.runtime.job_id)
+            env_to_set["ETCD_ENDPOINTS"] = f"http://{self.runtime.infra_node_ip}:{2379 + port_offset}"
+            env_to_set["NATS_SERVER"] = f"nats://{self.runtime.infra_node_ip}:{4222 + port_offset}"
@@
-        if profiling.enabled:
-            profile_dir = str(self.runtime.log_dir / "profiles")
-            env_to_set.update(profiling.get_env_vars(mode, profile_dir))
+        if profiling.enabled:
+            env_to_set.update(profiling.get_env_vars(mode, "/logs/profiles"))

Also applies to: 158-160

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/srtctl/cli/mixins/worker_stage.py` around lines 113 - 136,
start_endpoint_worker() still uses hardcoded infra ports and a fixed host
profiling path, which breaks parity with start_worker(); update
start_endpoint_worker() to mirror the env construction in start_worker(): import
and call get_port_offset(self.runtime.job_id) to compute port_offset, derive
nats_port and etcd_port (4222/2379 + port_offset), set ETCD_ENDPOINTS and
NATS_SERVER using self.runtime.infra_node_ip, set DYN_SYSTEM_PORT from
process.sys_port, and set DYN_REQUEST_PLANE from self.config.frontend.env
(falling back to "nats"); also replace the hardcoded host profiling path with
the runtime-backed profiling path (use the same runtime attribute start_worker
uses). Apply the same changes to the other endpoint-launch occurrences noted
(the other block around start_endpoint_worker usage).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TODO for trt-llm

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YAMY1234, thanks for adding the TODO — that's a good placeholder to track the gap. Just worth noting that the cross-job port conflict risk remains active for TRTLLM (endpoint-launch mode) until the TODO is resolved, so if TRTLLM jobs ever share nodes, they'll still collide on the hardcoded 2379/4222 ports. Whenever you revisit it, the proposed diff in the original comment should be a straightforward starting point.


🧠 Learnings used
Learnt from: qiching
Repo: ishandhanani/srt-slurm PR: 223
File: src/srtctl/backends/vllm.py:334-346
Timestamp: 2026-03-17T19:20:45.629Z
Learning: In ishandhanani/srt-slurm vLLM backend (src/srtctl/backends/vllm.py), the unconditional use of dynamo 1.0.0+ flags (--disaggregation-mode, --kv-transfer-config) without version gating is intentional. The default Dynamo version bump in src/srtctl/core/schema.py is deferred to a coordinated repo-wide 1.0 migration PR. Users running disaggregated vLLM are expected to specify the dynamo version explicitly in their job YAML. Adding version gating here was considered temporary scaffolding that would require plumbing dynamo_version through build_worker_command — complexity that belongs in the migration PR, not the vLLM connector/disagg-mode update.

# Add mode-specific environment variables from backend
# Support simple {node} and {node_id} templating
Expand All @@ -139,8 +155,9 @@ def __missing__(self, key: str) -> str:

# Add profiling environment variables
if profiling.enabled:
profile_dir = str(self.runtime.log_dir / "profiles")
env_to_set.update(profiling.get_env_vars(mode, profile_dir))
# /logs is the mounted host log directory inside the container.
profile_dir_in_container = "/logs/profiles"
env_to_set.update(profiling.get_env_vars(mode, profile_dir_in_container))

# Set CUDA_VISIBLE_DEVICES if not using all GPUs
if len(process.gpu_indices) < self.runtime.gpus_per_node:
Expand Down Expand Up @@ -228,6 +245,9 @@ def start_endpoint_worker(self, endpoint_processes: list["Process"]) -> ManagedP
)

# Environment variables
# TODO: port-offset is only applied in start_worker() (SGLang path).
# This MPI-style path (TRTLLM) still uses hardcoded NATS/etcd ports.
# If TRTLLM needs port-offset support, mirror the dynamo env logic from start_worker().
env_to_set = {
"HEAD_NODE_IP": self.runtime.head_node_ip,
"ETCD_ENDPOINTS": f"http://{self.runtime.nodes.infra}:2379",
Expand Down
Loading
Loading