Add job-based port offset to avoid cross-job port conflicts #195

YAMY1234 wants to merge 4 commits into ishandhanani:main
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
📝 Walkthrough

Threads a SLURM-job-derived port offset through runtime, topology, backends, CLI orchestration, and infra startup; adds job-aware NodePortAllocator support, makes endpoints_to_processes accept an optional port allocator/job_id, and replaces fixed NATS/etcd/frontend ports with per-job dynamic ports.
Sequence Diagram

```mermaid
sequenceDiagram
    participant SLURM as SLURM Job
    participant Runtime as Runtime Setup
    participant Orchestrator as Sweep Orchestrator
    participant Allocator as NodePortAllocator
    participant Infrastructure as Head Infra
    participant Services as NATS/etcd
    participant Backend as Backend/Frontend
    SLURM->>Runtime: provide job_id
    Runtime->>Runtime: offset = get_port_offset(job_id)
    Runtime->>Runtime: frontend_port = 8000 + offset
    Orchestrator->>Allocator: NodePortAllocator.with_job_offset(job_id)
    Allocator->>Allocator: initialize ports with port_offset
    Orchestrator->>Infrastructure: start_head_infrastructure(port_offset=offset)
    Infrastructure->>Services: start_nats(port=4222+offset)
    Infrastructure->>Services: start_etcd(client_port=2379+offset, peer_port=2380+offset)
    Services-->>Infrastructure: ready
    Orchestrator->>Backend: endpoints_to_processes(port_allocator=Allocator, job_id)
    Backend->>Allocator: request HTTP/bootstrap/KV ports
    Allocator-->>Backend: return ports (base + offset)
    Backend-->>Orchestrator: Process list with offset ports
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
src/srtctl/cli/setup_head.py (1)
201-225: ⚠️ Potential issue | 🔴 Critical

`start_etcd` computes a unique `data_dir` but still runs on shared `/tmp/etcd`.

At lines 204-211 you create a per-job directory, but lines 223-224 still point etcd to `/tmp/etcd` (and lines 216-217 delete it). This defeats job isolation and risks cross-job data loss.

🛠️ Proposed fix

```diff
-    if os.path.exists("/tmp/etcd"):
-        shutil.rmtree("/tmp/etcd")
-    etcd_data_dir = "/tmp/etcd"
-    os.makedirs(etcd_data_dir, exist_ok=True)
+    etcd_data_dir = str(data_dir)
@@
         "--data-dir",
         etcd_data_dir,
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/setup_head.py` around lines 201-225: the code computes a per-job Path named `data_dir` but then overrides it by hardcoding `/tmp/etcd` (and even removing `/tmp/etcd`); update the `start_etcd` logic to use the computed `data_dir` for etcd storage: remove the `shutil.rmtree("/tmp/etcd")` and the hardcoded `etcd_data_dir` assignment, ensure `data_dir` is created (`data_dir.mkdir(...)`) and set `etcd_data_dir` to `str(data_dir)` before building `cmd` (the list that includes `"--data-dir"` and `binary_path`) so etcd uses the per-job directory instead of a shared `/tmp/etcd`.

src/srtctl/core/topology.py (1)
107-113: ⚠️ Potential issue | 🟠 Major

NIXL ports are not job-offset-aware yet.

`next_nixl_port()` starts from `base_nixl_port` only, so concurrent jobs can still collide on NIXL ports.

🛠️ Proposed fix

```diff
         if self._next_nixl_port == 0:
-            self._next_nixl_port = self.base_nixl_port
+            self._next_nixl_port = self.base_nixl_port + self.port_offset
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/core/topology.py` around lines 107-113, `next_nixl_port` currently always seeds `_next_nixl_port` from `base_nixl_port`, causing port collisions across concurrent jobs; change it to be job-offset-aware by initializing `_next_nixl_port` to `base_nixl_port + (self.job_offset * self.nixl_port_stride)` (or a similar per-job offset) instead of plain `base_nixl_port`, and ensure the class exposes/initializes `self.job_offset` and a constant `self.nixl_port_stride` (or use an existing per-job identifier like `self.job_id` or `self.node_index` to compute a unique offset) so each job allocates a disjoint NIXL port range before incrementing in `next_nixl_port`.

src/srtctl/cli/mixins/frontend_stage.py (1)
68-70: ⚠️ Potential issue | 🟠 Major

`backend_processes` property has an empty body with an unfulfilled return contract.

At lines 68-70, this property is typed as `list["Process"]` but contains only an ellipsis, causing it to implicitly return `None`. This violates the type contract and will fail `ty` type checking. Align with the pattern used in `worker_stage.py` and `benchmark_stage.py` by raising `NotImplementedError`:

```diff
     @property
     def backend_processes(self) -> list["Process"]:
         """Compute physical process topology from endpoints (cached)."""
-        ...
+        raise NotImplementedError
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/mixins/frontend_stage.py` around lines 68-70, the `backend_processes` method currently contains only an ellipsis and returns `None` contrary to its declared return type `list["Process"]`; replace the ellipsis body with an explicit `raise NotImplementedError` to match the pattern used in `worker_stage.py` and `benchmark_stage.py` so callers and static typing know this method is intentionally unimplemented.

src/srtctl/core/slurm.py (1)
168-187: ⚠️ Potential issue | 🔴 Critical

`start_srun_process` has a type mismatch that breaks `ty` type checking.

The parameter at line 177 declares `dict[Path, Path]`, but `src/srtctl/cli/mixins/postprocess_stage.py` passes `dict[str, str]` (e.g., `{str(self.runtime.log_dir): "/logs"}`). The implementation converts both to strings regardless, so runtime works, but type checking fails. Widen the annotation to accept both patterns:

Fix

```diff
-from collections.abc import Sequence
+from collections.abc import Mapping, Sequence
@@
-    container_mounts: dict[Path, Path] | None = None,
+    container_mounts: Mapping[Path | str, Path | str] | None = None,
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/core/slurm.py` around lines 168-187, the type for the `container_mounts` parameter in `start_srun_process` is too narrow (`dict[Path, Path]`) and conflicts with callers that pass `dict[str, str]`; widen the annotation to accept both string and Path keys/values (e.g., `dict[Path | str, Path | str]` or `Mapping[Path | str, Path | str]`) and adjust imports if needed so type checkers accept it; keep the existing runtime behavior that converts keys/values to str.
🧹 Nitpick comments (2)
src/srtctl/cli/do_sweep.py (1)

117-121: Pass `job_id` explicitly to `endpoints_to_processes` for deterministic offset behavior.

Right now the allocator offset can fall back to an env lookup when `port_allocator` is `None`; `self.runtime.job_id` is already available and should be passed directly.

♻️ Proposed refactor

```diff
         processes = self.backend.endpoints_to_processes(
             self.endpoints,
             base_sys_port=base_sys_port,
             port_allocator=port_allocator,
+            job_id=self.runtime.job_id,
         )
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/do_sweep.py` around lines 117-121, the call to `backend.endpoints_to_processes` should receive the job id explicitly to avoid the `port_allocator` offset falling back to an env lookup; update the call in `do_sweep` (where `processes` is assigned) to pass `job_id=self.runtime.job_id` so allocator offset behavior is deterministic even when `port_allocator` is `None`.

src/srtctl/backends/sglang.py (1)
273-291: Validation logic is correct but the flow is non-obvious.

The validation ensures the user-configured port matches the topology-allocated port. When validation succeeds, the flag is added via `_config_to_cli_args(config)` at line 319 rather than explicitly here. This works correctly, but the implicit handoff could confuse future maintainers.

Consider adding a brief comment noting that `_config_to_cli_args` handles the flag when user-specified:

```diff
     if user_port_int != process.bootstrap_port:
         raise ValueError(
             "disaggregation-bootstrap-port mismatch for sglang prefill worker: "
             f"config={user_port_int}, topology={process.bootstrap_port}. "
             "For sglang router frontend, router and prefill workers must use the same bootstrap port. "
             "If you run multiple prefill workers on the same node, do not set a fixed "
             "disaggregation-bootstrap-port in the recipe."
         )
+    # Validated user port will be added by _config_to_cli_args below
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/backends/sglang.py` around lines 273-291, the validation block for `frontend_type == "sglang"` correctly checks that a user-provided disaggregation-bootstrap-port matches `process.bootstrap_port`, but the code path that actually appends the CLI flag is handled later by `_config_to_cli_args(config)`, which is non-obvious; add a short clarifying comment inside the current branch (near the `user_bootstrap_port` handling / the try/except and mismatch checks) stating that when validation passes the flag will be added later by `_config_to_cli_args(config)` (and that only when `user_bootstrap_port` is `None` do we append the topology port immediately), so future maintainers understand the implicit handoff between this validation and the `_config_to_cli_args` function.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/srtctl/cli/do_sweep.py`:
- Around line 101-115: The code is calling int() on an object-typed value and
omitting the job-aware port offset; remove the unnecessary "type:
ignore[assignment]" on the prefill_cfg assignment, narrow/validate
user_bootstrap_port (e.g., if isinstance(user_bootstrap_port, (str, int)) or
convert via str(user_bootstrap_port) then int(...)) before casting to int to
satisfy typing, and when constructing NodePortAllocator pass the existing
port_offset variable (NodePortAllocator(..., port_offset=port_offset)) so port
allocation remains job-id-aware.
In `@src/srtctl/cli/mixins/frontend_stage.py`:
- Around line 96-97: The internal frontend port (base_internal_port) can collide
with worker sys-port windows; change its computation so it never falls inside
the worker window defined by base_sys_port = 9000 + port_offset*20 — e.g.
replace base_internal_port = 9090 + port_offset with a value guaranteed above
the reserved window such as base_internal_port = 9100 + port_offset (or compute
base_internal_port = base_sys_port + 20 + 90 if base_sys_port is available) so
base_internal_port, base_public_port, base_sys_port and port_offset are
consistent and non-overlapping.
In `@src/srtctl/cli/mixins/worker_stage.py`:
- Around line 113-136: start_endpoint_worker() still uses hardcoded infra ports
and a fixed host profiling path, which breaks parity with start_worker(); update
start_endpoint_worker() to mirror the env construction in start_worker(): import
and call get_port_offset(self.runtime.job_id) to compute port_offset, derive
nats_port and etcd_port (4222/2379 + port_offset), set ETCD_ENDPOINTS and
NATS_SERVER using self.runtime.infra_node_ip, set DYN_SYSTEM_PORT from
process.sys_port, and set DYN_REQUEST_PLANE from self.config.frontend.env
(falling back to "nats"); also replace the hardcoded host profiling path with
the runtime-backed profiling path (use the same runtime attribute start_worker
uses). Apply the same changes to the other endpoint-launch occurrences noted
(the other block around start_endpoint_worker usage).
In `@src/srtctl/cli/setup_head.py`:
- Line 209: There is a whitespace-only blank line triggering Ruff W293 in the
src/srtctl/cli/setup_head.py module; remove the blank line (trailing
spaces/newline containing only spaces/tabs) so the file has no whitespace-only
lines (e.g., in the setup_head module or the surrounding function where the
blank line appears) and re-save the file to clear the linter error.
- Around line 157-166: The code currently force-removes the global path
"/tmp/nats" via nats_store_dir and shutil.rmtree which can delete other jobs'
JetStream stores; change to create a job-unique temporary directory (e.g., use
tempfile.mkdtemp or tempfile.TemporaryDirectory with a prefix like "nats_" or
include the port/PID) instead of deleting "/tmp/nats", stop calling
shutil.rmtree on the global path, and ensure nats_store_dir points to this
unique temp directory (also update the cmd list usage of nats_store_dir);
optionally register a cleanup on process exit so the per-job dir is removed when
this server stops.
In `@src/srtctl/core/topology.py`:
- Around line 83-89: The next_http_port method increments per-node HTTP ports by
1000, causing allocations like base_http_port + port_offset + 1000 to collide
with the bootstrap range; update next_http_port (and its use of self._http_ports
and initial value computed from base_http_port and port_offset) to increment by
1 (or the correct small step for sequential ports) instead of 1000 so subsequent
allocations advance by a single port and do not overlap bootstrap ports.
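The two allocator fixes requested in the prompts above (seed with the job offset, advance by one port rather than 1000) can be sketched together. The class and attribute names follow the review's vocabulary, but the defaults and structure are illustrative assumptions, not the repo's actual implementation:

```python
class NodePortAllocator:
    """Illustrative job-offset-aware sequential allocator (assumed defaults)."""

    def __init__(self, base_http_port: int = 30000, port_offset: int = 0) -> None:
        self.base_http_port = base_http_port
        self.port_offset = port_offset
        self._next_http_port = 0  # 0 acts as a "not yet seeded" sentinel

    def next_http_port(self) -> int:
        if self._next_http_port == 0:
            # Seed once per job: base port plus the job-derived offset.
            self._next_http_port = self.base_http_port + self.port_offset
        port = self._next_http_port
        self._next_http_port += 1  # advance by 1, not 1000
        return port
```

Seeding lazily inside the getter keeps the offset application in one place; incrementing by 1 keeps consecutive allocations inside the job's own window instead of jumping into the bootstrap range.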
---
Outside diff comments:
In `@src/srtctl/cli/mixins/frontend_stage.py`:
- Around line 68-70: The backend_processes method currently contains only an
ellipsis and returns None contrary to its declared return type list["Process"];
replace the ellipsis body with an explicit raise NotImplementedError to match
the pattern used in worker_stage.py and benchmark_stage.py so callers and static
typing know this method is intentionally unimplemented (refer to
backend_processes for placement).
In `@src/srtctl/cli/setup_head.py`:
- Around line 201-225: The code computes a per-job Path named data_dir but then
overrides it by hardcoding /tmp/etcd (and even removing /tmp/etcd); update the
start_etcd logic to use the computed data_dir for etcd storage: remove the
shutil.rmtree("/tmp/etcd") and the hardcoded etcd_data_dir assignment, ensure
data_dir is created (data_dir.mkdir(...)) and set etcd_data_dir to str(data_dir)
before building cmd (the list that includes "--data-dir" and binary_path) so
etcd uses the per-job directory instead of a shared /tmp/etcd.
In `@src/srtctl/core/slurm.py`:
- Around line 168-187: The type for the container_mounts parameter in
start_srun_process is too narrow (dict[Path, Path]) and conflicts with callers
that pass dict[str, str]; widen the annotation to accept both string and Path
keys/values (e.g., dict[Path | str, Path | str] or Mapping[Path | str, Path |
str]) and adjust imports if needed so type checkers accept it; keep the existing
runtime behavior that converts keys/values to str.
In `@src/srtctl/core/topology.py`:
- Around line 107-113: next_nixl_port currently always seeds _next_nixl_port
from base_nixl_port, causing port collisions across concurrent jobs; change it
to be job-offset-aware by initializing _next_nixl_port to base_nixl_port +
(self.job_offset * self.nixl_port_stride) (or similar per-job offset) instead of
plain base_nixl_port, and ensure the class exposes/initializes self.job_offset
and a constant self.nixl_port_stride (or use an existing per-job identifier like
self.job_id or self.node_index to compute a unique offset) so each job allocates
a disjoint NIXL port range before incrementing in next_nixl_port.
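The per-job storage isolation requested for etcd above (and for NATS in the inline comments) can be sketched as below; the helper name, path layout, and the `atexit` cleanup registration are assumptions for illustration:

```python
import atexit
import shutil
import tempfile
from pathlib import Path


def make_job_store_dir(service: str, job_id: str) -> Path:
    """Create a job-unique data directory under the temp root.

    Only this job's directory is ever removed, never a shared path like
    /tmp/etcd or /tmp/nats, so concurrent jobs cannot wipe each other.
    """
    store_dir = Path(tempfile.gettempdir()) / f"{service}-{job_id}"
    if store_dir.exists():
        shutil.rmtree(store_dir)  # stale dir from a previous run of this job
    store_dir.mkdir(parents=True, exist_ok=True)
    # Best-effort cleanup when this process exits (ignore_errors=True).
    atexit.register(shutil.rmtree, store_dir, True)
    return store_dir
```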
---
Nitpick comments:
In `@src/srtctl/backends/sglang.py`:
- Around line 273-291: The validation block for frontend_type == "sglang" in
sglang.py correctly checks that a user-provided disaggregation-bootstrap-port
matches process.bootstrap_port, but the code path that actually appends the CLI
flag is handled later by _config_to_cli_args(config), which is non-obvious; add
a short clarifying comment inside the current branch (near the
user_bootstrap_port handling / the try/except and mismatch checks) stating that
when validation passes the flag will be added later by
_config_to_cli_args(config) (and that only when user_bootstrap_port is None we
append the topology port immediately), so future maintainers understand the
implicit handoff between this validation and the _config_to_cli_args function.
In `@src/srtctl/cli/do_sweep.py`:
- Around line 117-121: The call to backend.endpoints_to_processes should receive
the job id explicitly to avoid port_allocator offset falling back to env lookup;
update the call in do_sweep (where processes is assigned) to pass
job_id=self.runtime.job_id (i.e., endpoints_to_processes(...,
port_allocator=port_allocator, job_id=self.runtime.job_id)) so allocator offset
behavior is deterministic even when port_allocator is None.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- src/srtctl/backends/base.py
- src/srtctl/backends/sglang.py
- src/srtctl/cli/do_sweep.py
- src/srtctl/cli/mixins/frontend_stage.py
- src/srtctl/cli/mixins/worker_stage.py
- src/srtctl/cli/setup_head.py
- src/srtctl/core/runtime.py
- src/srtctl/core/slurm.py
- src/srtctl/core/topology.py
- src/srtctl/frontends/dynamo.py
Reviewed hunk in src/srtctl/cli/mixins/worker_stage.py:

```python
env_to_set: dict[str, str] = {"HEAD_NODE_IP": self.runtime.head_node_ip}

# Only Dynamo workers require etcd/NATS + system status server port.
if self.config.frontend.type == "dynamo":
    from srtctl.core.slurm import get_port_offset

    port_offset = get_port_offset(self.runtime.job_id)
    nats_port = 4222 + port_offset
    etcd_port = 2379 + port_offset

    env_to_set.update(
        {
            "ETCD_ENDPOINTS": f"http://{self.runtime.infra_node_ip}:{etcd_port}",
            "NATS_SERVER": f"nats://{self.runtime.infra_node_ip}:{nats_port}",
            "DYN_SYSTEM_PORT": str(process.sys_port),
        }
    )

    # Keep request-plane consistent across frontend/workers
    frontend_plane = None
    if self.config.frontend.env:
        frontend_plane = self.config.frontend.env.get("DYN_REQUEST_PLANE")
    env_to_set["DYN_REQUEST_PLANE"] = frontend_plane if frontend_plane else "nats"
```
Offset/profile fixes are incomplete across worker launch modes.

`start_worker()` is updated, but `start_endpoint_worker()` still uses fixed infra ports (lines 250-251) and a host profiling path (line 263). That can reintroduce cross-job conflicts in endpoint-launch mode.
🛠️ Proposed parity fix for endpoint-launch mode

```diff
@@
-        env_to_set = {
-            "HEAD_NODE_IP": self.runtime.head_node_ip,
-            "ETCD_ENDPOINTS": f"http://{self.runtime.nodes.infra}:2379",
-            "NATS_SERVER": f"nats://{self.runtime.nodes.infra}:4222",
-            "DYN_SYSTEM_PORT": str(leader.sys_port),
-        }
+        env_to_set = {"HEAD_NODE_IP": self.runtime.head_node_ip, "DYN_SYSTEM_PORT": str(leader.sys_port)}
+        if self.config.frontend.type == "dynamo":
+            from srtctl.core.slurm import get_port_offset
+            port_offset = get_port_offset(self.runtime.job_id)
+            env_to_set["ETCD_ENDPOINTS"] = f"http://{self.runtime.infra_node_ip}:{2379 + port_offset}"
+            env_to_set["NATS_SERVER"] = f"nats://{self.runtime.infra_node_ip}:{4222 + port_offset}"
@@
-        if profiling.enabled:
-            profile_dir = str(self.runtime.log_dir / "profiles")
-            env_to_set.update(profiling.get_env_vars(mode, profile_dir))
+        if profiling.enabled:
+            env_to_set.update(profiling.get_env_vars(mode, "/logs/profiles"))
```

Also applies to: 158-160
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/srtctl/cli/mixins/worker_stage.py` around lines 113 - 136,
start_endpoint_worker() still uses hardcoded infra ports and a fixed host
profiling path, which breaks parity with start_worker(); update
start_endpoint_worker() to mirror the env construction in start_worker(): import
and call get_port_offset(self.runtime.job_id) to compute port_offset, derive
nats_port and etcd_port (4222/2379 + port_offset), set ETCD_ENDPOINTS and
NATS_SERVER using self.runtime.infra_node_ip, set DYN_SYSTEM_PORT from
process.sys_port, and set DYN_REQUEST_PLANE from self.config.frontend.env
(falling back to "nats"); also replace the hardcoded host profiling path with
the runtime-backed profiling path (use the same runtime attribute start_worker
uses). Apply the same changes to the other endpoint-launch occurrences noted
(the other block around start_endpoint_worker usage).
Added TODO for trt-llm
@YAMY1234, thanks for adding the TODO — that's a good placeholder to track the gap. Just worth noting that the cross-job port conflict risk remains active for TRTLLM (endpoint-launch mode) until the TODO is resolved, so if TRTLLM jobs ever share nodes, they'll still collide on the hardcoded 2379/4222 ports. Whenever you revisit it, the proposed diff in the original comment should be a straightforward starting point.
🧠 Learnings used
Learnt from: qiching
Repo: ishandhanani/srt-slurm PR: 223
File: src/srtctl/backends/vllm.py:334-346
Timestamp: 2026-03-17T19:20:45.629Z
Learning: In ishandhanani/srt-slurm vLLM backend (src/srtctl/backends/vllm.py), the unconditional use of dynamo 1.0.0+ flags (--disaggregation-mode, --kv-transfer-config) without version gating is intentional. The default Dynamo version bump in src/srtctl/core/schema.py is deferred to a coordinated repo-wide 1.0 migration PR. Users running disaggregated vLLM are expected to specify the dynamo version explicitly in their job YAML. Adding version gating here was considered temporary scaffolding that would require plumbing dynamo_version through build_worker_command — complexity that belongs in the migration PR, not the vLLM connector/disagg-mode update.
Force-pushed 994f645 to 2c891ca
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/srtctl/core/topology.py (1)

107-113: ⚠️ Potential issue | 🟠 Major

NIXL ports missing job-based offset may cause cross-job collisions.

`next_nixl_port()` initializes from `base_nixl_port` without adding `port_offset`, unlike `next_kv_events_port()` which applies the offset. This inconsistency means NIXL ports will collide across concurrent SLURM jobs on the same nodes.

🛠️ Proposed fix

```diff
     def next_nixl_port(self) -> int:
         """Get next available NIXL side channel port (globally unique across all nodes)."""
         if self._next_nixl_port == 0:
-            self._next_nixl_port = self.base_nixl_port
+            self._next_nixl_port = self.base_nixl_port + self.port_offset
         port = self._next_nixl_port
         self._next_nixl_port += 1
         return port
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/core/topology.py` around lines 107-113, the `next_nixl_port` method initializes `_next_nixl_port` from `base_nixl_port` but fails to apply the job-specific `port_offset`, causing cross-job collisions; update `next_nixl_port` to initialize `_next_nixl_port` as `self.base_nixl_port + self.port_offset` (same approach used in `next_kv_events_port`) so each job gets its offset-applied NIXL port range and then continue incrementing `_next_nixl_port` as before.

src/srtctl/cli/mixins/frontend_stage.py (1)
67-70: ⚠️ Potential issue | 🟡 Minor

Pipeline failure: abstract property needs implementation stub.

The CI reports this property has an empty body but a non-None return type. For mixin abstract properties, use `...` or raise `NotImplementedError`.

🛠️ Proposed fix

```diff
     @property
     def backend_processes(self) -> list["Process"]:
         """Compute physical process topology from endpoints (cached)."""
-        ...
+        raise NotImplementedError
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/mixins/frontend_stage.py` around lines 67-70, the property `backend_processes` currently has an empty body while declaring a non-None return type; replace the empty implementation with an explicit abstract stub so type checkers/CI are happy: update the `backend_processes` `@property` to either contain an Ellipsis (`...`) or raise `NotImplementedError()` (prefer raising `NotImplementedError` for clarity) instead of an empty body.

src/srtctl/cli/setup_head.py (1)
201-224: ⚠️ Potential issue | 🔴 Critical

`data_dir` is computed but never used; etcd still uses the global `/tmp/etcd`.

Lines 203-210 correctly compute a job-isolated `data_dir`, but lines 216-219 override this by defining `etcd_data_dir = "/tmp/etcd"` and passing that to `--data-dir` on line 224. The per-job directory logic is dead code.

This means concurrent jobs will still delete each other's etcd data via the `shutil.rmtree("/tmp/etcd")` call.

🛠️ Proposed fix: use the computed `data_dir` and remove the dead code

```diff
     data_dir.mkdir(parents=True, exist_ok=True)
     logger.info("etcd data directory: %s", data_dir)
-    # Use /tmp for etcd data directory - this is typically on fast local storage
-    # (often tmpfs on HPC systems). Without this, etcd uses "default.etcd" in CWD
-    # which may be on slow network storage, causing Raft consensus timeouts.
-    if os.path.exists("/tmp/etcd"):
-        shutil.rmtree("/tmp/etcd")
-    etcd_data_dir = "/tmp/etcd"
-    os.makedirs(etcd_data_dir, exist_ok=True)
+    # Clean existing data directory for fresh start
+    if data_dir.exists():
+        shutil.rmtree(data_dir)
+    data_dir.mkdir(parents=True, exist_ok=True)
     cmd = [
         binary_path,
         "--data-dir",
-        etcd_data_dir,
+        str(data_dir),
         "--listen-client-urls",
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/setup_head.py` around lines 201-224, the computed per-job Path variable `data_dir` is never used and instead the hardcoded `etcd_data_dir` "/tmp/etcd" is created and passed to the command; fix by replacing uses of `etcd_data_dir` with the previously computed `data_dir` (ensure `data_dir` is str when inserted into `cmd`), remove the unconditional `shutil.rmtree("/tmp/etcd")` and any creation of the hardcoded /tmp/etcd, and update any `os.makedirs` call to create `data_dir` (use `data_dir.mkdir(parents=True, exist_ok=True)` or `os.makedirs(str(data_dir), exist_ok=True)`); ensure the `cmd` list passes the job-specific path (`data_dir`) to `--data-dir` and that `logger.info` still reports `data_dir`.
♻️ Duplicate comments (1)

src/srtctl/cli/setup_head.py (1)

157-165: ⚠️ Potential issue | 🔴 Critical

Shared `/tmp/nats` cleanup still breaks concurrent jobs.

Lines 159-161 delete and recreate a global `/tmp/nats` path. When multiple SLURM jobs run on the same node, one job's `shutil.rmtree("/tmp/nats")` will wipe another job's JetStream store mid-operation.

Since `port` is now job-specific, use it (or a `job_id` parameter) to isolate the storage directory.

🛠️ Proposed fix

```diff
-def start_nats(binary_path: str = "/configs/nats-server", port: int = 4222) -> subprocess.Popen:
+def start_nats(binary_path: str = "/configs/nats-server", port: int = 4222, job_id: str | None = None) -> subprocess.Popen:
     """Start NATS server.

     Args:
         binary_path: Path to nats-server binary
         port: Port for NATS to listen on
+        job_id: SLURM job ID for unique storage directory

     Returns:
         Popen object for the NATS process
     """
     if not os.path.exists(binary_path):
         raise FileNotFoundError(f"NATS binary not found: {binary_path}")

-    # Use /tmp for JetStream storage - avoids "Temporary storage directory" warning
-    # and ensures we're using fast local storage
-    if os.path.exists("/tmp/nats"):
-        shutil.rmtree("/tmp/nats")
-    nats_store_dir = "/tmp/nats"
+    # Use job-specific directory for JetStream storage to avoid conflicts
+    nats_store_dir = f"/tmp/nats-{job_id or port}"
+    if os.path.exists(nats_store_dir):
+        shutil.rmtree(nats_store_dir)
     os.makedirs(nats_store_dir, exist_ok=True)
```

Then update the call site at line 324:

```diff
-    nats_proc = start_nats(args.nats_binary, port=nats_port)
+    nats_proc = start_nats(args.nats_binary, port=nats_port, job_id=job_id)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/setup_head.py` around lines 157-165, the cleanup uses a shared path "/tmp/nats" which can race across concurrent jobs; change the `nats_store_dir` construction to be unique per job (e.g., include the port or a `job_id`) instead of hardcoding "/tmp/nats", stop calling `shutil.rmtree` on a global path, and only create/remove the job-specific directory; update the variables used here (`nats_store_dir`, `port`, `binary_path`) so `cmd` uses the new unique `nats_store_dir` and ensure any removal logic targets only that directory to avoid wiping other jobs' stores.
🧹 Nitpick comments (1)

src/srtctl/cli/setup_head.py (1)

29-49: Remove `get_port_offset_from_job_id()` and import `get_port_offset()` from `slurm.py`.

The function `get_port_offset()` already exists in `src/srtctl/core/slurm.py` with identical logic. Both functions perform the same calculation `(job_id % 100) * 10` and handle environment variables identically. The `slurm.py` version has better logging and documentation. Consolidate by importing from the core module to maintain a single source of truth.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/srtctl/cli/setup_head.py` around lines 29-49, remove the duplicate `get_port_offset_from_job_id` function and instead import the canonical `get_port_offset` from the core slurm module: delete the `get_port_offset_from_job_id` definition in this file and add an import like `from srtctl.core.slurm import get_port_offset` (or an equivalent relative import) so existing callers in this module use `get_port_offset`; ensure any references to `get_port_offset_from_job_id` are updated to call `get_port_offset` and keep existing behavior and logging intact by relying on the core `slurm.get_port_offset` implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/srtctl/cli/mixins/frontend_stage.py`:
- Around line 67-70: The property backend_processes currently has an empty body
while declaring a non-None return type; replace the empty implementation with an
explicit abstract stub so type checkers/CI are happy—update the
backend_processes `@property` in src/srtctl/cli/mixins/frontend_stage.py to either
contain an Ellipsis (...) or raise NotImplementedError() (prefer raising
NotImplementedError for clarity) instead of an empty body.
In `@src/srtctl/cli/setup_head.py`:
- Around line 201-224: The computed per-job Path variable data_dir is never used
and instead the hardcoded etcd_data_dir "/tmp/etcd" is created and passed to the
command; fix by replacing uses of etcd_data_dir with the previously computed
data_dir (ensure data_dir is str when inserted into cmd), remove the
unconditional shutil.rmtree("/tmp/etcd") and any creation of the hardcoded
/tmp/etcd, and update any os.makedirs call to create data_dir (use
data_dir.mkdir(parents=True, exist_ok=True) or os.makedirs(str(data_dir),
exist_ok=True)); ensure the cmd list passes the job-specific path (data_dir) to
--data-dir and that logger.info still reports data_dir.
In `@src/srtctl/core/topology.py`:
- Around line 107-113: The next_nixl_port method initializes _next_nixl_port
from base_nixl_port but fails to apply the job-specific port_offset, causing
cross-job collisions; update next_nixl_port to initialize _next_nixl_port as
self.base_nixl_port + self.port_offset (same approach used in
next_kv_events_port) so each job gets its offset-applied NIXL port range and
then continue incrementing _next_nixl_port as before.
---
Duplicate comments:
In `@src/srtctl/cli/setup_head.py`:
- Around line 157-165: The cleanup uses a shared path "/tmp/nats" which can race
across concurrent jobs; change the nats_store_dir construction to be unique per
job (e.g., include the port or a job_id) instead of hardcoding "/tmp/nats", stop
calling shutil.rmtree on a global path, and only create/remove the job-specific
directory; update the variables used here (nats_store_dir, port, binary_path) so
cmd uses the new unique nats_store_dir and ensure any removal logic targets only
that directory to avoid wiping other jobs' stores.
---
Nitpick comments:
In `@src/srtctl/cli/setup_head.py`:
- Around line 29-49: Remove the duplicate get_port_offset_from_job_id function
and instead import and re-export the canonical get_port_offset from the core
slurm module: delete the get_port_offset_from_job_id definition in this file and
add an import like "from srtctl.core.slurm import get_port_offset" (or
equivalent relative import) so existing callers in this module use
get_port_offset; ensure any references to get_port_offset_from_job_id are
updated to call get_port_offset and keep existing behavior and logging intact by
relying on the core slurm.get_port_offset implementation.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- src/srtctl/backends/base.py
- src/srtctl/backends/sglang.py
- src/srtctl/cli/do_sweep.py
- src/srtctl/cli/mixins/frontend_stage.py
- src/srtctl/cli/mixins/worker_stage.py
- src/srtctl/cli/setup_head.py
- src/srtctl/core/runtime.py
- src/srtctl/core/slurm.py
- src/srtctl/core/topology.py
- src/srtctl/frontends/dynamo.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/srtctl/core/slurm.py
Force-pushed 2c891ca to 2de46f1, then 2de46f1 to c8c3ec6.
♻️ Duplicate comments (3)
src/srtctl/cli/do_sweep.py (1)
99-116: ⚠️ Potential issue | 🟠 Major

Fix the typed bootstrap-port parsing path and keep the allocator offset-aware.

Line 101 has an unnecessary suppression, line 108 casts an object-typed value directly, and line 115 drops `port_offset` when constructing `NodePortAllocator`, which can defeat job-level isolation for that allocator path.

🛠️ Suggested patch

```diff
         prefill_cfg: dict[str, object] = {}
         try:
-            prefill_cfg = self.backend.get_config_for_mode("prefill")  # type: ignore[assignment]
+            prefill_cfg = self.backend.get_config_for_mode("prefill")
         except Exception:
             prefill_cfg = {}
         user_bootstrap_port = prefill_cfg.get("disaggregation-bootstrap-port")
         if user_bootstrap_port is not None:
             try:
-                base_bootstrap_port = int(user_bootstrap_port)
+                base_bootstrap_port = int(str(user_bootstrap_port))
             except (TypeError, ValueError):
                 logger.warning(
                     "Invalid disaggregation-bootstrap-port=%r; falling back to default bootstrap port allocation",
                     user_bootstrap_port,
                 )
             else:
-                port_allocator = NodePortAllocator(base_bootstrap_port=base_bootstrap_port)
+                port_allocator = NodePortAllocator(
+                    base_bootstrap_port=base_bootstrap_port,
+                    port_offset=port_offset,
+                )
```

Run this to verify the issue/fix points:

```bash
#!/bin/bash
set -euo pipefail
echo "Inspect bootstrap-port parsing block:"
nl -ba src/srtctl/cli/do_sweep.py | sed -n '96,118p'
echo
echo "Inspect NodePortAllocator API for port_offset support:"
rg -n "class NodePortAllocator|port_offset|base_bootstrap_port" src/srtctl/core/topology.py -C3
```

Expected: the parser block no longer uses a blanket `type: ignore`, the cast is type-safe, and the allocator wiring includes `port_offset`.
113-136: ⚠️ Potential issue | 🟠 Major

Port/profile isolation rollout is incomplete across worker launch modes.

`start_worker()` is offset-aware now, but `start_endpoint_worker()` still uses fixed `ETCD_ENDPOINTS`/`NATS_SERVER` and the host profile path (lines 248-264). That reintroduces cross-job conflicts in MPI/per-endpoint mode.

🛠️ Parity fix sketch for endpoint-launch mode

```diff
-        env_to_set = {
-            "HEAD_NODE_IP": self.runtime.head_node_ip,
-            "ETCD_ENDPOINTS": f"http://{self.runtime.nodes.infra}:2379",
-            "NATS_SERVER": f"nats://{self.runtime.nodes.infra}:4222",
-            "DYN_SYSTEM_PORT": str(leader.sys_port),
-        }
+        env_to_set: dict[str, str] = {"HEAD_NODE_IP": self.runtime.head_node_ip}
+        if self.config.frontend.type == "dynamo":
+            from srtctl.core.slurm import get_port_offset
+
+            port_offset = get_port_offset(self.runtime.job_id)
+            env_to_set.update(
+                {
+                    "ETCD_ENDPOINTS": f"http://{self.runtime.infra_node_ip}:{2379 + port_offset}",
+                    "NATS_SERVER": f"nats://{self.runtime.infra_node_ip}:{4222 + port_offset}",
+                    "DYN_SYSTEM_PORT": str(leader.sys_port),
+                }
+            )
+            frontend_plane = self.config.frontend.env.get("DYN_REQUEST_PLANE") if self.config.frontend.env else None
+            env_to_set["DYN_REQUEST_PLANE"] = frontend_plane if frontend_plane else "nats"
 @@
-        if profiling.enabled:
-            profile_dir = str(self.runtime.log_dir / "profiles")
-            env_to_set.update(profiling.get_env_vars(mode, profile_dir))
+        if profiling.enabled:
+            env_to_set.update(profiling.get_env_vars(mode, "/logs/profiles"))
```

Also applies to: 158-160
157-162: ⚠️ Potential issue | 🔴 Critical

Shared `/tmp` store cleanup still breaks concurrent jobs.

Lines 159-161 and 216-224 still delete/reuse global NATS/etcd paths. Also, the per-job `data_dir` computed at lines 203-211 is not used in the etcd command. This can wipe or conflict with another active job on the same node.

🛠️ Suggested patch

```diff
-def start_nats(binary_path: str = "/configs/nats-server", port: int = 4222) -> subprocess.Popen:
+def start_nats(
+    binary_path: str = "/configs/nats-server",
+    port: int = BASE_NATS_PORT,
+    job_id: str | None = None,
+) -> subprocess.Popen:
 @@
-    if os.path.exists("/tmp/nats"):
-        shutil.rmtree("/tmp/nats")
-    nats_store_dir = "/tmp/nats"
+    nats_store_dir = f"/tmp/nats-{job_id or port}"
     os.makedirs(nats_store_dir, exist_ok=True)
 @@
-    if os.path.exists("/tmp/etcd"):
-        shutil.rmtree("/tmp/etcd")
-    etcd_data_dir = "/tmp/etcd"
-    os.makedirs(etcd_data_dir, exist_ok=True)
-
     cmd = [
         binary_path,
         "--data-dir",
-        etcd_data_dir,
+        str(data_dir),
 @@
-    nats_proc = start_nats(args.nats_binary, port=nats_port)
+    nats_proc = start_nats(args.nats_binary, port=nats_port, job_id=job_id)
```

Also applies to: 201-224
🧹 Nitpick comments (1)
tests/test_frontend_topology.py (1)
44-59: Align the runtime fixture with the production `frontend_port` derivation.

`make_runtime()` still relies on the `RuntimeContext` default (8000). Setting `frontend_port` explicitly to `EXPECTED_PUBLIC_PORT` makes the fixture behavior closer to the real `RuntimeContext.from_config()` and avoids masking regressions in callers that read `runtime.frontend_port`.

♻️ Suggested patch

```diff
 def make_runtime(nodes: list[str]) -> RuntimeContext:
     """Create a minimal RuntimeContext for testing."""
     return RuntimeContext(
         job_id=TEST_JOB_ID,
         run_name="test-run",
         nodes=Nodes(head=nodes[0], bench=nodes[0], infra=nodes[0], worker=tuple(nodes)),
         head_node_ip="10.0.0.1",
         infra_node_ip="10.0.0.1",
         log_dir=Path("/tmp/logs"),
         model_path=Path("/models/test-model"),
         container_image=Path("/path/to/container.sqsh"),
         gpus_per_node=8,
         network_interface=None,
         container_mounts={},
         environment={},
+        frontend_port=EXPECTED_PUBLIC_PORT,
     )
```
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f3359736-c4af-43c4-8a7f-da9cec2d2f56
📒 Files selected for processing (11)
- src/srtctl/backends/base.py
- src/srtctl/backends/sglang.py
- src/srtctl/cli/do_sweep.py
- src/srtctl/cli/mixins/frontend_stage.py
- src/srtctl/cli/mixins/worker_stage.py
- src/srtctl/cli/setup_head.py
- src/srtctl/core/runtime.py
- src/srtctl/core/slurm.py
- src/srtctl/core/topology.py
- src/srtctl/frontends/dynamo.py
- tests/test_frontend_topology.py
🚧 Files skipped from review as they are similar to previous changes (3)
- src/srtctl/cli/mixins/frontend_stage.py
- src/srtctl/core/topology.py
- src/srtctl/backends/sglang.py
- Fix etcd data_dir: use computed per-job path instead of hardcoded /tmp/etcd
- Fix NIXL port allocation: add port_offset to avoid cross-job collisions
- Fix HTTP port step: change +1000 to +1 to avoid overlapping bootstrap range
- Fix health check port: use runtime.frontend_port instead of hardcoded 8000
- Fix container_mounts type hint: widen to Mapping[Path|str, Path|str]
- Fix FrontendStageMixin.backend_processes: use raise NotImplementedError
- Add TODO comment on start_endpoint_worker for TRTLLM port-offset gap
- Clarify NATS/etcd /tmp comments: container-local, no cross-job conflict
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/srtctl/cli/mixins/frontend_stage.py (1)
158-163: ⚠️ Potential issue | 🟠 Major

Widen the `container_mounts` annotation in `RuntimeContext` to match the parameter type accepted by `start_srun_process()`.

Line 112 of `src/srtctl/core/runtime.py` declares `container_mounts: dict[Path, Path]`, but `start_srun_process()` expects `Mapping[Path | str, Path | str]`. Change the annotation to `container_mounts: dict[Path | str, Path | str]` to resolve the type incompatibility at all call sites, including `src/srtctl/cli/mixins/frontend_stage.py:163` and `src/srtctl/cli/mixins/benchmark_stage.py:168`.
🧹 Nitpick comments (3)
src/srtctl/cli/mixins/frontend_stage.py (1)
84-97: Reuse `RuntimeContext.frontend_port` instead of re-deriving it here.

Line 96 is now a second copy of the public-port formula. Reusing `self.runtime.frontend_port` keeps frontend startup, health checks, and any future port overrides from drifting apart. As per coding guidelines, "Create context objects (like `RuntimeContext`) that compute all derived paths/values once at startup rather than recomputing".

♻️ Proposed refactor

```diff
-        base_public_port = 8000 + port_offset
+        base_public_port = self.runtime.frontend_port
         base_internal_port = 9090 + port_offset
```

src/srtctl/cli/setup_head.py (2)
144-145: Use the new base-port constants as the helper defaults.

These signatures still repeat 4222/2379/2380, so a later base-port change can desync `main()` from callers that rely on the defaults. Reuse `BASE_NATS_PORT`, `BASE_ETCD_CLIENT_PORT`, and `BASE_ETCD_PEER_PORT` here to keep the file single-sourced.

♻️ Proposed cleanup

```diff
-def start_nats(binary_path: str = "/configs/nats-server", port: int = 4222) -> subprocess.Popen:
+def start_nats(binary_path: str = "/configs/nats-server", port: int = BASE_NATS_PORT) -> subprocess.Popen:
 @@
-    client_port: int = 2379,
-    peer_port: int = 2380,
+    client_port: int = BASE_ETCD_CLIENT_PORT,
+    peer_port: int = BASE_ETCD_PEER_PORT,
```

Also applies to: 179-180
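Single-sourcing the base ports amounts to defining module-level constants and deriving every resolved port from them. A sketch, assuming the constant names match the comment (the `resolved_ports` helper is illustrative, not part of the PR):

```python
# Single source of truth for service base ports; per-job offsets add on top.
BASE_NATS_PORT = 4222
BASE_ETCD_CLIENT_PORT = 2379
BASE_ETCD_PEER_PORT = 2380


def resolved_ports(port_offset: int = 0) -> dict[str, int]:
    """Ports one job actually binds, derived from the shared constants.

    With offsets in steps of 10, concurrent jobs land on disjoint ports.
    """
    return {
        "nats": BASE_NATS_PORT + port_offset,
        "etcd_client": BASE_ETCD_CLIENT_PORT + port_offset,
        "etcd_peer": BASE_ETCD_PEER_PORT + port_offset,
    }
```

Defaulting the helper signatures to these constants (as the patch above does) means `start_nats()`/`start_etcd()` and `main()` can never disagree about the base values.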
29-48: Reuse the shared SLURM port-offset helper here.

`src/srtctl/core/slurm.py` already owns this calculation. Keeping a second copy in `setup_head.py` means head setup can drift away from the orchestrator's port math, and `main()` only captures `SLURM_JOB_ID`, so a `SLURM_JOBID`-only environment logs `N/A` and skips the job-id-specific etcd path even though the offset still resolves. Import the canonical `get_slurm_job_id()`/`get_port_offset()` pair instead of maintaining a local clone.

Also applies to: 300-303
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 93a24df6-8dfc-4757-a39d-38e739eae116
📒 Files selected for processing (6)
- src/srtctl/cli/mixins/benchmark_stage.py
- src/srtctl/cli/mixins/frontend_stage.py
- src/srtctl/cli/mixins/worker_stage.py
- src/srtctl/cli/setup_head.py
- src/srtctl/core/slurm.py
- src/srtctl/core/topology.py
🚧 Files skipped from review as they are similar to previous changes (3)
- src/srtctl/core/slurm.py
- src/srtctl/core/topology.py
- src/srtctl/cli/mixins/worker_stage.py
The manual benchmark mode log message was still hardcoding port 8000, causing operators to see the wrong endpoint URL when the port offset is non-zero. Made-with: Cursor
This PR is crucial for Wide-EP srt-slurm submissions on certain clusters, and for running multiple jobs on a single rack.
Summary
When multiple SLURM jobs run on the same nodes, they share the same port space and can conflict on NATS, etcd, HTTP, and worker system ports. This PR adds a job-based port offset `(job_id % 100) * 10` applied consistently across all services:

- `get_port_offset()` utility function
- `NodePortAllocator` gains a `port_offset` field and a `with_job_offset()` factory
- `frontend_port` calculated with the offset
- `endpoints_to_processes` accepts `port_allocator` for offset-aware port assignment

Test plan