Refactor task construction for generation#888
Conversation
Walkthrough: Adds a declarative pipeline system (Command, HardwareConfig, CommandGroup, Pipeline) and new command builders.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor User
    participant Generate as generate.py
    participant CmdUtils as utils/commands.py
    participant Decl as utils/declarative.py
    participant Executor as Nemo-Run
    User->>Generate: start generation (config)
    Generate->>CmdUtils: request server/sandbox command
    CmdUtils-->>Generate: (command_str, metadata)
    Generate->>Decl: build Command(s) & CommandGroup(s)
    Generate->>Decl: construct Pipeline(jobs, dependencies, reuse_code)
    Decl->>Decl: plan jobs, resolve containers/env, compute HW
    Decl->>Executor: create experiments / add tasks
    Executor-->>Decl: experiment handles / job ids
    Decl-->>Generate: run result or job names (if reuse)
    Generate-->>User: return outcome
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
🧰 Additional context used
🧬 Code graph analysis (2)
nemo_skills/pipeline/generate.py (5)
nemo_skills/pipeline/utils/declarative.py (6)

🪛 Ruff (0.13.3)
nemo_skills/pipeline/generate.py
52-52: Unused function argument: (ARG001)
nemo_skills/pipeline/utils/declarative.py
179-182: Avoid specifying long messages outside the exception class (TRY003)
185-185: Unused method argument: (ARG002)
304-304: Avoid specifying long messages outside the exception class (TRY003)
309-309: Avoid specifying long messages outside the exception class (TRY003)
313-313: Avoid specifying long messages outside the exception class (TRY003)
315-315: Avoid specifying long messages outside the exception class (TRY003)
321-325: Avoid specifying long messages outside the exception class (TRY003)
327-327: Avoid specifying long messages outside the exception class (TRY003)
395-395: Avoid specifying long messages outside the exception class (TRY003)
402-405: Avoid specifying long messages outside the exception class (TRY003)
448-448: Avoid specifying long messages outside the exception class (TRY003)
557-557: Avoid specifying long messages outside the exception class (TRY003)
566-566: Loop control variable (B007)
648-648: Do not catch blind exception: (BLE001)
651-651: Do not catch blind exception: (BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
nemo_skills/pipeline/generate.py (1)
237-241: `keep_mounts_for_sandbox` is ignored (regression). The CLI flag exists but is never used; the sandbox currently always runs with empty mounts. Pass this flag into the helper and honor it.
```python
with_sandbox: bool = typer.Option(False, help="If True, will start a sandbox container alongside this job"),
keep_mounts_for_sandbox: bool = typer.Option(
    False,
    help="If True, will keep the mounts for the sandbox container. Note that it is risky given that the sandbox "
    "executes LLM commands and could potentially lead to data loss, so we advise not to use this unless "
    "absolutely necessary.",
),
```

And at the call site:
```diff
-    cmd_group = _create_commandgroup_from_config(
+    cmd_group = _create_commandgroup_from_config(
         generation_cmd=cmd,
         server_config=server_config.copy() if server_config else None,
         with_sandbox=with_sandbox,
         sandbox_port=None if get_random_port else 6000,
         cluster_config=cluster_config,
         installation_command=installation_command,
         get_server_command_fn=generation_task.get_server_command_fn(),
         partition=partition,
         time_min=time_min,
         exclusive=exclusive,
         task_name=task_name,
         log_dir=log_dir,
+        keep_mounts_for_sandbox=keep_mounts_for_sandbox,
     )
```

Also update the helper signature (see next comment) and, within `make_sandbox_cmd`, if `keep_mounts_for_sandbox` is set, remove `metadata["mounts"]` so the default mounts apply.
🧹 Nitpick comments (9)
tests/test_declarative_pipeline.py (1)
107-118: Handle empty metadata gracefully in tests. `meta_ref` raises `KeyError` on missing keys by design. Instead of calling `Command(command="echo test", name="test")` (no metadata) and asserting a `KeyError`, craft the command with `metadata={}` to avoid the wrapping logic altering `self.command`, ensuring the test reflects the intended control path.

nemo_skills/pipeline/generate.py (4)
84-91: Avoid double cd/export wrapping for server command. `get_server_command()` already includes `cd /nemo_run/code` and the `PYTHONPATH` export. `Command` will wrap again due to the default `working_dir`. Set `working_dir=""` to prevent redundant wrapping.

```diff
 server_cmd = Command(
     command=make_server_cmd,
     container=server_container,
     gpus=server_config["num_gpus"],
     nodes=server_config["num_nodes"],
     name=task_name,
+    working_dir="",
 )
```
94-101: Avoid double cd/export wrapping for client command. You pre-wrap via `wrap_python_path`, and `Command` will wrap again because of the default `working_dir`. Set `working_dir=""` to avoid "cd /nemo_run/code && cd /nemo_run/code …".

```diff
 client_cmd = Command(
     command=generation_cmd,
     container=cluster_config["containers"]["nemo-skills"],
     name=task_name,
     installation_command=installation_command,
     metadata={"log_prefix": "main"},
+    working_dir="",
 )
```
64-72: Don't mutate server_config; handle missing container safely. Using `pop` mutates the dict and relies on `cluster_config["containers"][server_type]` existing. Also ensure "container" isn't passed to `get_server_command`.

```diff
-    if server_config is not None and int(server_config["num_gpus"]) > 0:
+    if server_config is not None and int(server_config.get("num_gpus") or 0) > 0:
         server_type = server_config["server_type"]
-        server_container = server_config.pop("container", cluster_config["containers"][server_type])
+        server_container = server_config.get("container") or cluster_config["containers"].get(server_type)
+        if not server_container:
+            raise ValueError(
+                f"No container configured for server_type='{server_type}'. "
+                "Provide --server_container or define containers[server_type] in the cluster config."
+            )
         # Create server command builder that defers execution until cluster_config is available
-        server_config_copy = server_config.copy()
+        server_config_copy = {k: v for k, v in server_config.items() if k != "container"}
```
40-53: Extend helper signature to accept `keep_mounts_for_sandbox`. Wire the CLI flag through; the default of False maintains current behavior.

```diff
 def _create_commandgroup_from_config(
     generation_cmd: str,
     server_config: Optional[Dict],
     with_sandbox: bool,
     sandbox_port: Optional[int],
     cluster_config: Dict,
     installation_command: Optional[str],
     get_server_command_fn: Callable,
     partition: Optional[str],
     time_min: Optional[str],
     exclusive: bool,
     task_name: str,
     log_dir: str,
+    keep_mounts_for_sandbox: bool = False,
 ) -> CommandGroup:
```

And inside `make_sandbox_cmd` after building metadata:

```diff
     # Merge metadata
     metadata = initial_metadata.copy()
     ...
+    if keep_mounts_for_sandbox:
+        # None -> use default mounts from cluster config; [] -> no mounts
+        metadata.pop("mounts", None)
```

nemo_skills/pipeline/utils/commands.py (2)
36-37: Silence unused kwargs without breaking call sites. Rename `**kwargs` to `**_` to placate linters while keeping a flexible signature.

```diff
-    entrypoint: Optional[str] = None,
-    **kwargs,
+    entrypoint: Optional[str] = None,
+    **_,
```

79-79: Same nit: unused kwargs in `sandbox_command`.

```diff
-def sandbox_command(port: Optional[int] = None, **kwargs) -> Tuple[Callable, Dict]:
+def sandbox_command(port: Optional[int] = None, **_) -> Tuple[Callable, Dict]:
```
nemo_skills/pipeline/utils/declarative.py (2)

169-171: Consistent wrapping for callable commands (optional). Currently only string commands are auto-wrapped with working_dir/env_vars; callables aren't. Consider also wrapping final_command after evaluation to keep behavior consistent.
Option:
- Add a flag on Command (e.g., wrap_callable=True) and, in prepare_for_execution, if flag and (env_vars or working_dir), wrap final_command with wrap_command(). This avoids surprising differences between str vs callable commands.
Also applies to: 232-235
631-639: Narrow broad exceptions when resolving reuse experiment. Catching `Exception` hides actionable errors. Prefer specific exceptions from nemo_run (e.g., `FileNotFoundError`, `ValueError`) and log others.

```diff
     if isinstance(reuse_exp, str):
         try:
             reuse_exp = run.Experiment.from_id(reuse_exp)
-        except Exception:
+        except (FileNotFoundError, AssertionError):
             try:
                 reuse_exp = run.Experiment.from_title(reuse_exp)
-            except Exception:
+            except (FileNotFoundError, AssertionError):
                 LOG.warning(f"Failed to load experiment {reuse_exp} for code reuse")
                 reuse_exp = None
```
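The id-then-title fallback with narrowed exceptions can be exercised in isolation. Below is a minimal, self-contained sketch of the same pattern; the `from_id`/`from_title` loaders and the toy registry are hypothetical stand-ins for nemo_run's API, used only to make the control flow testable:

```python
import logging

LOG = logging.getLogger(__name__)

def load_with_fallback(ref, from_id, from_title, expected=(FileNotFoundError, ValueError)):
    """Try loading by id, then by title; swallow only the expected exception types."""
    try:
        return from_id(ref)
    except expected:
        try:
            return from_title(ref)
        except expected:
            LOG.warning("Failed to load experiment %s for code reuse", ref)
            return None

# Hypothetical loaders backed by a toy registry (not nemo_run's API).
registry = {"exp_123": "experiment-object"}

def from_id(ref):
    if ref not in registry:
        raise FileNotFoundError(ref)
    return registry[ref]

def from_title(ref):
    raise FileNotFoundError(ref)  # no titles registered in this sketch

assert load_with_fallback("exp_123", from_id, from_title) == "experiment-object"
assert load_with_fallback("missing", from_id, from_title) is None
```

With the narrowed tuple, an unexpected exception type (say, a `TypeError` from a programming bug) propagates instead of being silently converted into a `None` reuse experiment.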
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
nemo_skills/pipeline/generate.py(3 hunks)nemo_skills/pipeline/utils/commands.py(1 hunks)nemo_skills/pipeline/utils/declarative.py(1 hunks)tests/test_declarative_pipeline.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
tests/test_declarative_pipeline.py (1)
nemo_skills/pipeline/utils/declarative.py (9)
Command (124-250), CommandGroup (264-277), HardwareConfig (254-261), Pipeline (280-725), prepare_for_execution (195-247), meta_ref (186-193), hostname_ref (179-184), _get_cluster_config (323-330), run (332-464)
nemo_skills/pipeline/utils/commands.py (2)
nemo_skills/pipeline/utils/exp.py (1)
get_sandbox_command (105-108)
nemo_skills/pipeline/utils/server.py (2)
get_free_port (41-57), get_server_command (112-210)
nemo_skills/pipeline/generate.py (6)
nemo_skills/dataset/utils.py (1)
import_from_path (60-67)
nemo_skills/pipeline/utils/commands.py (1)
sandbox_command (79-116)
nemo_skills/pipeline/utils/declarative.py (5)
Command (124-250), CommandGroup (264-277), HardwareConfig (254-261), Pipeline (280-725), run (332-464)
nemo_skills/inference/generate.py (1)
get_server_command_fn (240-250)
nemo_skills/pipeline/utils/generation.py (3)
get_chunked_rs_filename (29-45), configure_client (305-365), get_generation_cmd (178-280)
nemo_skills/pipeline/utils/server.py (1)
wrap_python_path (64-65)
nemo_skills/pipeline/utils/declarative.py (6)
nemo_skills/pipeline/utils/cluster.py (5)
get_cluster_config (232-286), get_env_variables (108-200), get_tunnel (357-362), temporary_env_update (204-213), tunnel_hash (351-354)
nemo_skills/pipeline/utils/exp.py (5)
get_executor (154-282), get_exp (649-665), get_exp_handles (57-102), run_exp (615-646), install_packages_wrap (285-325)
nemo_skills/pipeline/utils/commands.py (1)
wrap_command (119-144)
nemo_skills/pipeline/utils/mounts.py (1)
is_mounted_filepath (27-46)
nemo_skills/pipeline/utils/packager.py (1)
get_registered_external_repo (64-76)
nemo_skills/utils.py (1)
get_logger_name (130-134)
🪛 Ruff (0.13.2)
tests/test_declarative_pipeline.py
74-74: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
102-102: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
275-275: Unused method argument: mock_get_config
(ARG002)
292-292: Unused method argument: mock_run_exp
(ARG002)
322-322: Unused method argument: mock_run_exp
(ARG002)
nemo_skills/pipeline/utils/commands.py
36-36: Unused function argument: kwargs
(ARG001)
79-79: Unused function argument: kwargs
(ARG001)
nemo_skills/pipeline/utils/declarative.py
189-192: Avoid specifying long messages outside the exception class
(TRY003)
310-310: Avoid specifying long messages outside the exception class
(TRY003)
319-319: Avoid specifying long messages outside the exception class
(TRY003)
327-327: Avoid specifying long messages outside the exception class
(TRY003)
363-367: Avoid specifying long messages outside the exception class
(TRY003)
369-369: Avoid specifying long messages outside the exception class
(TRY003)
447-447: Avoid specifying long messages outside the exception class
(TRY003)
546-546: Avoid specifying long messages outside the exception class
(TRY003)
555-555: Loop control variable het_idx not used within loop body
(B007)
634-634: Do not catch blind exception: Exception
(BLE001)
637-637: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (1)
nemo_skills/pipeline/utils/declarative.py (1)
553-559: Hetero env probing re-evaluates builders (side effects). `prepare_for_execution` is called once for probing and again for planning, potentially running builder logic twice. If builders have side effects (e.g., dynamic ports), this can diverge.
- Ensure builders are side‑effect free on repeated calls, or memoize their metadata during probing.
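The memoization option can be sketched independently of the pipeline code. The builder signature `(cfg) -> (command, metadata)` and the random-port side effect below are illustrative assumptions, not the repo's actual interfaces:

```python
import random

def memoize_builder(builder):
    """Cache a (cfg) -> (command, metadata) builder so repeated probing reuses the first result."""
    cache = {}

    def wrapped(cfg):
        key = id(cfg)  # assumes the same cfg object is passed for probe and plan passes
        if key not in cache:
            cache[key] = builder(cfg)
        return cache[key]

    return wrapped

def port_builder(cfg):
    # Side effect: a fresh random port on every raw call.
    port = random.randint(1024, 65535)
    return f"serve --port {port}", {"port": port}

cfg = {"cluster": "local"}
safe_builder = memoize_builder(port_builder)
probe = safe_builder(cfg)  # probing pass
plan = safe_builder(cfg)   # planning pass sees the identical command and metadata
assert probe == plan
```

Without the wrapper, the probing pass and the planning pass would each draw their own port, which is exactly the divergence described above.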
```python
from typing import Callable, Dict, List, Optional, Tuple, Union
```
Use typing.Any instead of built-in any.
Fix annotation and import.
```diff
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 ...
-    metadata: Dict[str, any] = field(default_factory=dict)  # Stores metadata from command builders
+    metadata: Dict[str, Any] = field(default_factory=dict)  # Stores metadata from command builders
```

Also applies to: 157-158
🤖 Prompt for AI Agents
In nemo_skills/pipeline/utils/declarative.py around lines 100-101 and 157-158,
the type annotation uses the built-in any instead of typing.Any; import Any from
typing (add it to the existing typing import list) and replace the occurrences
of any with Any in the annotations at those line ranges so the annotations use
the proper typing.Any type.
Actionable comments posted: 0
♻️ Duplicate comments (3)
nemo_skills/pipeline/generate.py (1)
106-116: Sandbox env merge drops required ports (breaks sandbox). You overwrite the initial `metadata["environment"]` with the runtime `PYTHONPATH`, losing `LISTEN_PORT`/`NGINX_PORT`. Result: `/start-with-nginx.sh` may not listen on the intended port.
Deep-merge environment instead of replacing it.
Apply this diff:
```diff
 def make_sandbox_cmd(cfg):
     # sandbox_command returns (callable, metadata), so we need to call the callable
     cmd_builder, initial_metadata = sandbox_command(port=sandbox_port)
     # Call the builder to get the actual command string
     cmd_string, runtime_metadata = cmd_builder(cfg)
-    # Merge metadata
-    metadata = initial_metadata.copy()
-    metadata.update(runtime_metadata)
+    # Merge metadata (deep-merge environment)
+    metadata = initial_metadata.copy()
+    if "environment" in runtime_metadata:
+        env = metadata.get("environment", {}).copy()
+        env.update(runtime_metadata["environment"])
+        metadata["environment"] = env
+    for k, v in runtime_metadata.items():
+        if k != "environment":
+            metadata[k] = v
     metadata["log_prefix"] = "sandbox"
     return (cmd_string, metadata)
```
95-100: Missing `typing.Any` import. Line 100 imports typing utilities but omits `Any`, which is needed at lines 157-158 for the `metadata` annotation. This was flagged in a previous review. Apply this diff:

```diff
 import inspect
 import logging
 import shlex
 from contextlib import nullcontext
 from dataclasses import dataclass, field
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
```
157-157: Use `typing.Any` instead of built-in `any`. Line 157 uses the built-in `any` instead of `typing.Any` for the type annotation. This was flagged in a previous review. Apply this diff:

```diff
     installation_command: Optional[str] = None
     port: Optional[int] = None  # Can be set from metadata
-    metadata: Dict[str, any] = field(default_factory=dict)  # Stores metadata from command builders
+    metadata: Dict[str, Any] = field(default_factory=dict)  # Stores metadata from command builders
     het_group_index: Optional[int] = None  # Set per-job by Pipeline (not global)
```
🧹 Nitpick comments (1)
nemo_skills/pipeline/utils/declarative.py (1)
578-581: Unused loop variable `het_idx`. The loop at line 578 enumerates over groups but never uses `het_idx` in the body. Consider using `_` to indicate the index is intentionally unused. Apply this diff:

```diff
 # In heterogeneous jobs, collect environment from all commands for cross-component refs
 shared_env_vars: Dict[str, str] = {}
 if heterogeneous:
-    for het_idx, group in enumerate(groups):
+    for _, group in enumerate(groups):
         for command in group.components:
             _, exec_config_probe = command.prepare_for_execution(cluster_config)
             shared_env_vars.update(exec_config_probe.get("environment", {}))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
nemo_skills/pipeline/generate.py(3 hunks)nemo_skills/pipeline/utils/declarative.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
nemo_skills/pipeline/generate.py (6)
nemo_skills/pipeline/app.py (1)
typer_unpacker (25-53)
nemo_skills/pipeline/utils/commands.py (1)
sandbox_command (79-116)
nemo_skills/pipeline/utils/declarative.py (5)
Command (124-250), CommandGroup (264-277), HardwareConfig (254-261), Pipeline (280-748), run (332-487)
nemo_skills/inference/generate.py (1)
get_server_command_fn (240-250)
nemo_skills/pipeline/utils/generation.py (2)
configure_client (305-365), get_generation_cmd (178-280)
nemo_skills/pipeline/utils/server.py (1)
wrap_python_path (64-65)
nemo_skills/pipeline/utils/declarative.py (6)
nemo_skills/pipeline/utils/cluster.py (5)
get_cluster_config(232-286)get_env_variables(108-200)get_tunnel(357-362)temporary_env_update(204-213)tunnel_hash(351-354)nemo_skills/pipeline/utils/exp.py (5)
get_executor(154-282)get_exp(649-665)get_exp_handles(57-102)run_exp(615-646)install_packages_wrap(285-325)nemo_skills/pipeline/utils/commands.py (1)
wrap_command(119-144)nemo_skills/pipeline/utils/mounts.py (1)
is_mounted_filepath(27-46)nemo_skills/pipeline/utils/packager.py (1)
get_registered_external_repo(64-76)nemo_skills/utils.py (1)
get_logger_name(130-134)
🪛 Ruff (0.13.2)
nemo_skills/pipeline/utils/declarative.py
189-192: Avoid specifying long messages outside the exception class
(TRY003)
310-310: Avoid specifying long messages outside the exception class
(TRY003)
319-319: Avoid specifying long messages outside the exception class
(TRY003)
327-327: Avoid specifying long messages outside the exception class
(TRY003)
365-369: Avoid specifying long messages outside the exception class
(TRY003)
371-371: Avoid specifying long messages outside the exception class
(TRY003)
465-465: Avoid specifying long messages outside the exception class
(TRY003)
569-569: Avoid specifying long messages outside the exception class
(TRY003)
578-578: Loop control variable het_idx not used within loop body
(B007)
657-657: Do not catch blind exception: Exception
(BLE001)
660-660: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: unit-tests
- GitHub Check: pre-commit
🔇 Additional comments (9)
nemo_skills/pipeline/generate.py (3)
17-17: Missing type import for Callable usage. Line 17 imports `Callable` from `typing`, which is used at line 47 in the function signature. However, this import is appropriate and correctly placed.

40-141: Function structure looks good overall. The function correctly constructs a CommandGroup from configuration, properly ordering components (server, client, sandbox) and computing maximum resource requirements.

368-478: Declarative pipeline refactor looks solid. The refactor properly converts the ad-hoc task expansion to a declarative pipeline approach:
- Builds job specifications with correct dependency chains for dependent_jobs
- First job in each chain receives external dependencies and run_after
- Subsequent jobs properly chain to previous job
- Pipeline.run() correctly passes _reuse_exp for integration with existing experiments
nemo_skills/pipeline/utils/declarative.py (6)
218-226: Excellent deep-merge pattern for environment metadata. The deep merge at lines 221-226 correctly preserves existing environment variables while adding new ones. This is the pattern that should be used in generate.py to fix the sandbox environment merge issue.
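Stripped of the surrounding classes, the pattern amounts to copying the nested dict before updating it. A standalone sketch (the port keys are taken from this review; the helper name is illustrative):

```python
def merge_metadata(base, update):
    """Merge update into base, deep-merging only the nested 'environment' dict."""
    merged = base.copy()
    env = dict(merged.get("environment", {}))
    env.update(update.get("environment", {}))
    merged.update({k: v for k, v in update.items() if k != "environment"})
    merged["environment"] = env
    return merged

initial = {"environment": {"LISTEN_PORT": "6000", "NGINX_PORT": "6001"}, "log_prefix": "sandbox"}
runtime = {"environment": {"PYTHONPATH": "/nemo_run/code:/app"}}

merged = merge_metadata(initial, runtime)
# The sandbox ports survive alongside the newly added PYTHONPATH.
assert merged["environment"] == {
    "LISTEN_PORT": "6000",
    "NGINX_PORT": "6001",
    "PYTHONPATH": "/nemo_run/code:/app",
}
assert merged["log_prefix"] == "sandbox"
```

A shallow `base.update(update)` would instead replace the whole `environment` dict, which is exactly the port-loss bug flagged earlier for the sandbox command.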
253-277: HardwareConfig and CommandGroup classes are well-structured. Both classes provide clean abstractions for hardware requirements and command grouping.

280-330: Pipeline initialization and cluster config loading are correct. The constructor properly validates mutually exclusive groups/jobs parameters and supports both legacy and new modes. The lazy cluster config loading is appropriate.

654-673: Blind exception handling is acceptable here. Lines 657 and 660 catch `Exception`, which static analysis flags. However, this is appropriate since the code attempts multiple fallback paths (`from_id`, then `from_title`) and logs failures. The broad catch prevents any unexpected error from breaking code reuse.

332-487: Pipeline.run() orchestration is well-designed. The run method properly:
- Validates cluster configuration and HF_HOME
- Resolves dependencies (internal, external experiments, task handles)
- Adds jobs with correct dependency chains
- Handles _reuse_exp for integration with existing experiments
- Returns appropriate result based on context
489-748: Helper methods provide clean separation of concerns. The helper methods (`_prepare_command`, `_resolve_container`, `_create_executor`, `_plan_and_add_job`, `_add_single_group_job`, `_add_multi_group_job`) properly encapsulate execution planning, resource allocation, and job submission logic.
Actionable comments posted: 1
🧹 Nitpick comments (1)
nemo_skills/pipeline/generate.py (1)
373-376: Remove unused `all_job_names` list. `all_job_names` is only initialized (line 375) and appended to (line 465) but never used; remove both to eliminate dead code.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
nemo_skills/pipeline/generate.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
nemo_skills/pipeline/generate.py (4)
nemo_skills/pipeline/utils/commands.py (1)
sandbox_command (79-116)
nemo_skills/pipeline/utils/declarative.py (5)
Command (124-250), CommandGroup (264-277), HardwareConfig (254-261), Pipeline (280-748), run (332-487)
nemo_skills/pipeline/utils/generation.py (3)
get_chunked_rs_filename (29-45), configure_client (305-365), get_generation_cmd (178-280)
nemo_skills/pipeline/utils/server.py (1)
wrap_python_path (64-65)
🪛 GitHub Actions: Lint and Format
nemo_skills/pipeline/generate.py
[error] 119-125: ruff-format: 1 file reformatted by the hook; pre-commit hook 'ruff-format' failed with exit code 1. Please re-run the commit to apply formatting.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (12)
nemo_skills/pipeline/generate.py (12)
17-17: LGTM! The new imports correctly support the declarative pipeline refactor: `Callable` for deferred command builders, and the declarative primitives (`Command`, `CommandGroup`, `HardwareConfig`, `Pipeline`) plus the `sandbox_command` utility. Also applies to: 25-26

40-60: LGTM! The function signature is well-defined with clear type hints, and the docstring effectively documents the component ordering convention (server, client, sandbox).

64-91: LGTM! The server command is correctly constructed using a lazy lambda to defer `cluster_config` access. The `server_config.pop("container", ...)` mutation is safe because the caller copies `server_config` before passing it (line 429).

93-101: LGTM! The client command is properly constructed with the generation command string, and the `installation_command` correctly targets only the main task (not server or sandbox).

103-127: LGTM! The sandbox command construction correctly implements the deep-merge for the `environment` dictionary (lines 113-116), preserving `LISTEN_PORT`/`NGINX_PORT` from `initial_metadata` while adding `PYTHONPATH` from `runtime_metadata`. This addresses the past review comment about environment variable loss.

129-145: LGTM! The hardware config correctly computes the maximum GPUs and nodes across all components (lines 131-132), ensuring the job-level resource request accommodates the most demanding component. This is essential for heterogeneous jobs.
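That sizing rule reduces to an element-wise maximum over the group's components. A minimal sketch with hypothetical component records (not the repo's `Command` class):

```python
from dataclasses import dataclass

@dataclass
class Component:
    name: str
    gpus: int = 0
    nodes: int = 1

def job_hardware(components):
    """The job must accommodate the most demanding component on each axis."""
    gpus = max((c.gpus for c in components), default=0)
    nodes = max((c.nodes for c in components), default=1)
    return gpus, nodes

components = [Component("server", gpus=8, nodes=2), Component("client"), Component("sandbox")]
assert job_hardware(components) == (8, 2)
```

Taking the maximum per axis (rather than, say, summing) matches colocated components that share the same allocation.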
377-385: LGTM! The loop structure correctly iterates over seeds and chunks, and properly limits wandb logging to the first seed (line 412: `seed_idx == 0`).

386-415: LGTM! The server/client configuration and command generation flow correctly reuses existing utilities (`configure_client`, `get_generation_cmd`, `wrap_python_path`).

417-420: LGTM! Task naming correctly incorporates seed and chunk identifiers when present.

422-465: LGTM! The dependency chain logic correctly handles the `dependent_jobs` parameter:

- The first job (dep_idx=0) inherits external dependencies (`_task_dependencies` or `run_after`)
- Subsequent jobs form a sequential chain by depending on the previous job

The internal naming strategy (`task_name-dep{idx}`) ensures unique job names for dependency resolution while preserving the user-facing `task_name` for all tasks in the chain.
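The chaining scheme described above can be sketched without the pipeline classes. The structure below is illustrative (plain dicts instead of `CommandGroup` jobs); note it references the previous job by its actual recorded name, which sidesteps the name-mismatch pitfall discussed elsewhere in this review:

```python
def build_dependent_jobs(task_name, dependent_jobs, external_deps=None):
    """First job carries the external dependencies; each later job depends on its predecessor."""
    jobs = []
    for dep_idx in range(dependent_jobs + 1):
        name = f"{task_name}-dep{dep_idx}" if dep_idx > 0 else task_name
        if dep_idx == 0:
            deps = list(external_deps) if external_deps else None
        else:
            deps = [jobs[-1]["name"]]  # use the recorded name, not a reconstructed one
        jobs.append({"name": name, "dependencies": deps})
    return jobs

jobs = build_dependent_jobs("generate", dependent_jobs=2, external_deps=["preprocess"])
assert [j["name"] for j in jobs] == ["generate", "generate-dep1", "generate-dep2"]
assert jobs[0]["dependencies"] == ["preprocess"]
assert jobs[1]["dependencies"] == ["generate"]  # not a non-existent "generate-dep0"
```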
467-469: LGTM! The early return when no jobs remain is correct and avoids unnecessary pipeline creation.

471-483: LGTM! The pipeline creation and execution correctly instantiates the `Pipeline` with the jobs list and passes the necessary parameters to `run()`. The result is appropriately returned.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
nemo_skills/pipeline/generate.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
nemo_skills/pipeline/generate.py (6)
nemo_skills/dataset/utils.py (1)
import_from_path (60-67)
nemo_skills/pipeline/utils/commands.py (1)
sandbox_command (79-116)
nemo_skills/pipeline/utils/declarative.py (5)
Command (124-250), CommandGroup (264-277), HardwareConfig (254-261), Pipeline (280-748), run (332-487)
nemo_skills/inference/generate.py (1)
get_server_command_fn (240-250)
nemo_skills/pipeline/utils/generation.py (3)
get_chunked_rs_filename (29-45), configure_client (305-365), get_generation_cmd (178-280)
nemo_skills/pipeline/utils/server.py (1)
wrap_python_path (64-65)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
```python
metadata = {
    "port": port,
    "log_prefix": "sandbox",
    "mounts": [],  # Sandbox doesn't mount anything
```
this has recently been changed, there is now a parameter that controls this
```python
        slurm_kwargs={"exclusive": hardware.exclusive} if (hardware and hardware.exclusive) else None,
    )


def _plan_and_add_job(
```
maybe we can split this file into some more modular utils? It's a bit too large currently, could probably benefit from separating some logic if possible
but we can keep it as is if there is no natural split
We definitely should! Perhaps the pipeline class itself could be its own file.
Actionable comments posted: 0
♻️ Duplicate comments (3)
nemo_skills/pipeline/utils/declarative.py (1)
159-159: Use `typing.Any` instead of built-in `any`. The type annotation should use `typing.Any` instead of the built-in `any`. Apply this diff:

```diff
-    metadata: Dict[str, any] = field(default_factory=dict)  # Stores metadata from command builders
+    metadata: Dict[str, Any] = field(default_factory=dict)  # Stores metadata from command builders
```

And add `Any` to the imports at line 112:

```diff
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
```

Based on past review comments, this issue was previously identified.
nemo_skills/pipeline/generate.py (2)
40-132: Restore `keep_mounts_for_sandbox` functionality. The CLI parameter `keep_mounts_for_sandbox` (lines 230-233) is exposed but never used. The new `_create_commandgroup_from_config` function doesn't accept or propagate this flag, so sandbox containers always run with empty mounts regardless of the user's setting. This is a functional regression from the previous implementation.

Based on past review comments, this issue was previously flagged but appears unresolved in the current code.
412-452: Fix dependent job chain to reference correct job names. The dependency chain is broken for `dependent_jobs > 0`. When `dep_idx=1`, line 442 creates a dependency on `f"{task_name}-dep0"`, but the first job (when `dep_idx=0`) is named just `task_name` (line 430). This causes the Pipeline to treat the missing name as an external experiment, and dependency resolution fails.

Apply this diff to track and use the previous job name:

```diff
+    previous_internal_job_name = None
     for dep_idx in range(dependent_jobs + 1):
         # Create CommandGroup for this task
         cmd_group = _create_commandgroup_from_config(
             generation_cmd=cmd,
             server_config=server_config.copy() if server_config else None,
             with_sandbox=with_sandbox,
             sandbox_port=None if get_random_port else 6000,
             cluster_config=cluster_config,
             installation_command=installation_command,
             get_server_command_fn=generation_task.get_server_command_fn(),
             partition=partition,
             time_min=time_min,
             exclusive=exclusive,
             task_name=task_name,
             log_dir=log_dir,
         )
         # Use unique internal job name for dependency tracking, but same task_name
         internal_job_name = f"{task_name}-dep{dep_idx}" if dep_idx > 0 else task_name
         # Build dependencies: first job in chain gets external dependencies, rest chain to previous
         if dep_idx == 0:
             # First job: add run_after if no task_dependencies
             job_deps = dependencies.copy() if dependencies else []
             if not dependencies and run_after:
                 run_after_list = run_after if isinstance(run_after, list) else [run_after]
                 job_deps.extend(run_after_list)
             job_deps = job_deps if job_deps else None
         else:
             # Subsequent jobs in chain depend on previous job
-            job_deps = [f"{task_name}-dep{dep_idx - 1}"]
+            job_deps = [previous_internal_job_name]
         jobs.append(
             {
                 "name": internal_job_name,
                 "group": cmd_group,
                 "dependencies": job_deps,
             }
         )
         all_job_names.append(internal_job_name)
+        previous_internal_job_name = internal_job_name
```

Based on past review comments, this critical bug was previously identified but remains unfixed.
🧹 Nitpick comments (2)
nemo_skills/pipeline/utils/commands.py (2)
28-74: Remove unused `**kwargs` parameter. The `**kwargs` parameter is not used in the function body and should be removed for clarity. Apply this diff:

```diff
 def vllm_server_command(
     cluster_config: Dict,
     model: str,
     port: Optional[int] = None,
     server_type: str = "vllm",
     gpus: int = 8,
     nodes: int = 1,
     args: str = "",
     entrypoint: Optional[str] = None,
-    **kwargs,
 ) -> Tuple[str, Dict]:
```
77-111: Improve PYTHONPATH parsing robustness. The PYTHONPATH extraction at lines 94-98 has two issues:

- The hardcoded offset `[11:]` at line 96 is fragile and assumes `"PYTHONPATH="` is exactly 11 characters.
- The `**kwargs` parameter at line 77 is unused.

Apply this diff:

```diff
-def sandbox_command(cluster_config: Dict, port: Optional[int] = None, **kwargs) -> Tuple[str, Dict]:
+def sandbox_command(cluster_config: Dict, port: Optional[int] = None) -> Tuple[str, Dict]:
     """Build sandbox command.

     Args:
         cluster_config: Cluster configuration dictionary
         port: Port to use for sandbox

     Returns:
         Tuple of (command_string, metadata_dict)
     """
     if port is None:
         port = get_free_port(strategy="random")

     cmd = get_sandbox_command(cluster_config)

     # Build PYTHONPATH from cluster config
     pythonpath_env = {}
     for env_var in cluster_config.get("env_vars", []):
         if "PYTHONPATH" in env_var:
-            pythonpath = env_var[11:] if env_var.startswith("PYTHONPATH=") else env_var
+            if "=" in env_var:
+                _, pythonpath = env_var.split("=", 1)
+            else:
+                pythonpath = env_var
             pythonpath_env["PYTHONPATH"] = pythonpath + ":/app"
             break
```
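The safer parsing suggested in the diff can be isolated as a small helper built on `str.partition`; the function name and the `:/app` suffix convention follow the snippet above, but the helper itself is a sketch, not the repo's code:

```python
def extract_pythonpath(env_vars):
    """Pull PYTHONPATH out of 'KEY=VALUE' entries without hardcoded offsets."""
    for env_var in env_vars:
        if "PYTHONPATH" in env_var:
            # partition splits on the first '=' only, so values containing '=' survive intact
            _, sep, value = env_var.partition("=")
            path = value if sep else env_var
            return path + ":/app"
    return None

assert extract_pythonpath(["FOO=1", "PYTHONPATH=/repo/src"]) == "/repo/src:/app"
assert extract_pythonpath(["PYTHONPATH=/a=b"]) == "/a=b:/app"
assert extract_pythonpath(["FOO=1"]) is None
```

Unlike the `[11:]` slice, this handles any key length and keeps values that themselves contain `=`.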
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- `nemo_skills/pipeline/generate.py` (3 hunks)
- `nemo_skills/pipeline/utils/commands.py` (1 hunks)
- `nemo_skills/pipeline/utils/declarative.py` (1 hunks)
- `tests/test_declarative_pipeline.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
nemo_skills/pipeline/generate.py (5)
- `nemo_skills/dataset/utils.py` (1): `import_from_path` (60-67)
- `nemo_skills/pipeline/utils/commands.py` (1): `sandbox_command` (77-111)
- `nemo_skills/pipeline/utils/declarative.py` (5): `Command` (135-229), `CommandGroup` (243-256), `HardwareConfig` (233-240), `Pipeline` (259-705), `run` (301-444)
- `nemo_skills/pipeline/utils/generation.py` (2): `configure_client` (305-365), `get_generation_cmd` (178-280)
- `nemo_skills/pipeline/utils/server.py` (1): `wrap_python_path` (64-65)

tests/test_declarative_pipeline.py (1)
- `nemo_skills/pipeline/utils/declarative.py` (8): `Command` (135-229), `CommandGroup` (243-256), `HardwareConfig` (233-240), `Pipeline` (259-705), `prepare_for_execution` (183-226), `meta_ref` (174-181), `hostname_ref` (167-172), `run` (301-444)

nemo_skills/pipeline/utils/commands.py (2)
- `nemo_skills/pipeline/utils/exp.py` (1): `get_sandbox_command` (105-108)
- `nemo_skills/pipeline/utils/server.py` (2): `get_free_port` (41-57), `get_server_command` (112-210)

nemo_skills/pipeline/utils/declarative.py (6)
- `nemo_skills/pipeline/utils/cluster.py` (4): `get_env_variables` (108-200), `get_tunnel` (357-362), `temporary_env_update` (204-213), `tunnel_hash` (351-354)
- `nemo_skills/pipeline/utils/exp.py` (4): `get_executor` (154-282), `get_exp` (649-665), `get_exp_handles` (57-102), `run_exp` (615-646)
- `nemo_skills/pipeline/utils/commands.py` (1): `wrap_command` (114-139)
- `nemo_skills/pipeline/utils/mounts.py` (1): `is_mounted_filepath` (27-46)
- `nemo_skills/pipeline/utils/packager.py` (1): `get_registered_external_repo` (64-76)
- `nemo_skills/utils.py` (1): `get_logger_name` (130-134)
🪛 Ruff (0.13.3)
tests/test_declarative_pipeline.py
74-74: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
271-271: Unused method argument: mock_run_exp
(ARG002)
301-301: Unused method argument: mock_run_exp
(ARG002)
nemo_skills/pipeline/utils/commands.py
37-37: Unused function argument: kwargs
(ARG001)
77-77: Unused function argument: kwargs
(ARG001)
nemo_skills/pipeline/utils/declarative.py
177-180: Avoid specifying long messages outside the exception class
(TRY003)
183-183: Unused method argument: cluster_config
(ARG002)
288-288: Avoid specifying long messages outside the exception class
(TRY003)
297-297: Avoid specifying long messages outside the exception class
(TRY003)
322-326: Avoid specifying long messages outside the exception class
(TRY003)
328-328: Avoid specifying long messages outside the exception class
(TRY003)
422-422: Avoid specifying long messages outside the exception class
(TRY003)
526-526: Avoid specifying long messages outside the exception class
(TRY003)
535-535: Loop control variable het_idx not used within loop body
(B007)
614-614: Do not catch blind exception: Exception
(BLE001)
617-617: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: unit-tests
- GitHub Check: pre-commit
🔇 Additional comments (5)
nemo_skills/pipeline/utils/commands.py (1)
114-138: LGTM! The function correctly wraps commands with environment variables and working directory setup.
tests/test_declarative_pipeline.py (1)
1-579: LGTM! Comprehensive test coverage. The test suite thoroughly covers the declarative pipeline system, including:
- Command construction and metadata handling
- CommandGroup and HardwareConfig integration
- Pipeline execution with dependencies and het_group_index management
- Error handling for missing configurations
- HF_HOME validation
The static analysis warnings about unused parameters at lines 74, 271, and 301 are false positives - these mock parameters are required for proper test setup even when not directly referenced in the test body.
nemo_skills/pipeline/utils/declarative.py (3)
232-256: LGTM! The `HardwareConfig` and `CommandGroup` classes are well-designed and provide clear abstractions for resource configuration and command grouping.
259-444: LGTM! Robust dependency resolution.

The Pipeline class correctly handles multiple dependency scenarios:

- Internal pipeline dependencies (job name references)
- External experiment dependencies (via `get_exp_handles` for SLURM)
- Direct task handle references
- Pipeline-level `run_after` applied to jobs without explicit dependencies

The broad exception catches at lines 614 and 617 (flagged by static analysis) are acceptable here as they provide fallback behavior when loading experiments for code reuse.
446-705: LGTM! Well-structured execution planning.

The internal methods correctly handle:

- Command preparation with mpirun wrapping for non-SLURM executors
- Container resolution from cluster config
- Executor creation with environment updates
- Code reuse across experiments
- Heterogeneous job planning with shared environment variables
- Per-job `het_group_index` assignment (not global)

The static analysis warning about unused `het_idx` at line 535 is a false positive: the variable is used later at line 559 to assign `command.het_group_index`.
Actionable comments posted: 0
🧹 Nitpick comments (3)
tests/test_declarative_pipeline.py (3)
74-76: Unused variable in test.

The `exec_config` variable is unpacked but never used in assertions. This is flagged by static analysis. Apply this diff to use underscore for unused values:

```diff
-    final_cmd, exec_config = cmd.prepare_for_execution(cluster_config)
+    final_cmd, _ = cmd.prepare_for_execution(cluster_config)
     assert final_cmd == "echo test"
```
270-270: Unused mock parameter.

The `mock_run_exp` parameter is passed to the mock decorator but never used in the test. This is flagged by static analysis. Apply this diff to prefix unused mocks:

```diff
-    def test_pipeline_run_basic(self, mock_run_exp, mock_env_vars, mock_get_exp):
+    def test_pipeline_run_basic(self, _mock_run_exp, mock_env_vars, mock_get_exp):
```
298-298: Unused mock parameter.

The `mock_run_exp` parameter is passed to the mock decorator but never used in the test. This is flagged by static analysis. Apply this diff to prefix unused mocks:

```diff
-    def test_pipeline_run_with_dependencies(self, mock_run_exp, mock_env_vars, mock_get_exp):
+    def test_pipeline_run_with_dependencies(self, _mock_run_exp, mock_env_vars, mock_get_exp):
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `tests/test_declarative_pipeline.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_declarative_pipeline.py (1)
nemo_skills/pipeline/utils/declarative.py (8)
`Command` (135-229), `CommandGroup` (243-256), `HardwareConfig` (233-240), `Pipeline` (259-705), `prepare_for_execution` (183-226), `meta_ref` (174-181), `hostname_ref` (167-172), `run` (301-444)
🪛 Ruff (0.13.3)
tests/test_declarative_pipeline.py
74-74: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
270-270: Unused method argument: mock_run_exp
(ARG002)
298-298: Unused method argument: mock_run_exp
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: unit-tests
- GitHub Check: pre-commit
🔇 Additional comments (6)
tests/test_declarative_pipeline.py (6)
147-178: LGTM! The CommandGroup tests provide good coverage of basic functionality, hardware configuration, and log directory handling.

180-262: LGTM! The Pipeline initialization tests comprehensively cover legacy mode, the jobs parameter, validation rules, run_after configuration, and cluster config handling.

334-386: LGTM! The HF_HOME validation tests thoroughly cover the different error scenarios (missing HF_HOME, non-mounted paths) and successful validation cases.

388-489: LGTM! The `het_group_index` tests comprehensively validate the behavior in both homogeneous and heterogeneous scenarios. The test at lines 452-488 is particularly important as it confirms that `het_group_index` is per-job rather than global across the pipeline.

491-540: LGTM! The dependency resolution tests validate critical scenarios: explicit None dependency handling and pipeline-level run_after application. These tests ensure the dependency system behaves correctly in both edge cases and typical usage patterns.

542-564: LGTM! The error handling tests validate important failure scenarios with clear error messages, ensuring users receive helpful feedback when configuration is invalid.
Actionable comments posted: 2
♻️ Duplicate comments (1)
nemo_skills/pipeline/utils/declarative.py (1)
112-159: Use `typing.Any` in the metadata annotation.

The annotation still uses the built-in `any`; import `Any` and update the type.

```diff
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@@
-    metadata: Dict[str, any] = field(default_factory=dict)  # Stores metadata from command builders
+    metadata: Dict[str, Any] = field(default_factory=dict)  # Stores metadata from command builders
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `nemo_skills/pipeline/utils/declarative.py` (1 hunks)
- `tests/test_declarative_pipeline.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/test_declarative_pipeline.py (1)
- `nemo_skills/pipeline/utils/declarative.py` (8): `Command` (135-229), `CommandGroup` (243-256), `HardwareConfig` (233-240), `Pipeline` (259-703), `prepare_for_execution` (183-226), `meta_ref` (174-181), `hostname_ref` (167-172), `run` (299-442)

nemo_skills/pipeline/utils/declarative.py (6)
- `nemo_skills/pipeline/utils/cluster.py` (4): `get_env_variables` (108-200), `get_tunnel` (357-362), `temporary_env_update` (204-213), `tunnel_hash` (351-354)
- `nemo_skills/pipeline/utils/exp.py` (5): `get_executor` (154-282), `get_exp` (649-665), `get_exp_handles` (57-102), `run_exp` (615-646), `install_packages_wrap` (285-325)
- `nemo_skills/pipeline/utils/commands.py` (1): `wrap_command` (114-139)
- `nemo_skills/pipeline/utils/mounts.py` (1): `is_mounted_filepath` (27-46)
- `nemo_skills/pipeline/utils/packager.py` (1): `get_registered_external_repo` (64-76)
- `nemo_skills/utils.py` (1): `get_logger_name` (130-134)
🪛 Ruff (0.13.3)
tests/test_declarative_pipeline.py
74-74: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
268-268: Unused method argument: mock_run_exp
(ARG002)
296-296: Unused method argument: mock_run_exp
(ARG002)
nemo_skills/pipeline/utils/declarative.py
177-180: Avoid specifying long messages outside the exception class
(TRY003)
183-183: Unused method argument: cluster_config
(ARG002)
288-288: Avoid specifying long messages outside the exception class
(TRY003)
295-295: Avoid specifying long messages outside the exception class
(TRY003)
320-324: Avoid specifying long messages outside the exception class
(TRY003)
326-326: Avoid specifying long messages outside the exception class
(TRY003)
420-420: Avoid specifying long messages outside the exception class
(TRY003)
524-524: Avoid specifying long messages outside the exception class
(TRY003)
533-533: Loop control variable het_idx not used within loop body
(B007)
612-612: Do not catch blind exception: Exception
(BLE001)
615-615: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: pre-commit
- GitHub Check: unit-tests
```python
# 1. Evaluate if callable (for cross-component references like hostname_ref)
if callable(self.command):
    result = self.command()

    if isinstance(result, tuple):
        final_command, runtime_metadata = result
        # Deep merge metadata, especially environment dict
        for key, value in runtime_metadata.items():
            if key == "environment" and key in self.metadata:
                # Merge environment dicts instead of replacing
                self.metadata[key].update(value)
            else:
                self.metadata[key] = value
    else:
        final_command = result
else:
    final_command = self.command

# 2. Wrap with installation_command if provided
if self.installation_command:
    final_command = install_packages_wrap(final_command, self.installation_command)
```
Restore env/working_dir wrapping for callable commands.

When `command` is a lambda, we never reapply `env_vars`/`working_dir`; the lambda result runs without the expected exports or `cd`, breaking every callable that relies on these settings. Wrap callable results the same way as string commands (without double-wrapping strings) so existing usages keep working.
```diff
 def prepare_for_execution(self, cluster_config: Dict) -> Tuple[str, Dict]:
     """Prepare command for execution.
@@
-    if callable(self.command):
+    was_callable = callable(self.command)
+    if was_callable:
         result = self.command()
@@
     else:
         final_command = self.command
+
+    if was_callable and isinstance(final_command, str) and (self.env_vars or self.working_dir):
+        final_command = wrap_command(final_command, self.working_dir, self.env_vars)
```
+ final_command = wrap_command(final_command, self.working_dir, self.env_vars)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def prepare_for_execution(self, cluster_config: Dict) -> Tuple[str, Dict]:
    """Prepare command for execution."""
    # 1. Evaluate if callable (for cross-component references like hostname_ref)
    was_callable = callable(self.command)
    if was_callable:
        result = self.command()
        if isinstance(result, tuple):
            final_command, runtime_metadata = result
            # Deep merge metadata, especially environment dict
            for key, value in runtime_metadata.items():
                if key == "environment" and key in self.metadata:
                    # Merge environment dicts instead of replacing
                    self.metadata[key].update(value)
                else:
                    self.metadata[key] = value
        else:
            final_command = result
    else:
        final_command = self.command

    # 1a. Wrap callable results with env_vars/working_dir
    if was_callable and isinstance(final_command, str) and (self.env_vars or self.working_dir):
        final_command = wrap_command(final_command, self.working_dir, self.env_vars)

    # 2. Wrap with installation_command if provided
    if self.installation_command:
        final_command = install_packages_wrap(final_command, self.installation_command)

    # ... rest of method ...
```
```python
shared_env_vars: Dict[str, str] = {}
if heterogeneous:
    for het_idx, group in enumerate(groups):
        for command in group.commands:
            _, exec_config_probe = command.prepare_for_execution(cluster_config)
            shared_env_vars.update(exec_config_probe.get("environment", {}))

# Share packager across executors for efficiency (single-group only)
shared_packager = None

# Build commands and executors
for het_idx, group in enumerate(groups):
    has_multiple_components = len(group.commands) > 1
    total_het_groups = (
        len(groups) if heterogeneous else (len(group.commands) if has_multiple_components else 1)
    )

    # For single-group jobs with multiple components, allow job-level GPU override for sbatch allocation
    job_level_gpus = (
        group.hardware.num_gpus if (not heterogeneous and has_multiple_components and group.hardware) else None
    )

    for comp_idx, command in enumerate(group.commands):
        # Assign het_group_index ONLY for heterogeneous jobs (per-job, not global)
        # Non-heterogeneous jobs use localhost, so het_group_index should remain None
        if heterogeneous:
            command.het_group_index = het_idx
        else:
            command.het_group_index = None

        final_cmd, exec_config = self._prepare_command(command, cluster_config)
        commands.append(final_cmd)

        # Adjust GPU allocation (first component gets job-level GPUs for sbatch) for single-group jobs
        exec_config["num_gpus"] = exec_config["num_gpus"] or 0
        if (not heterogeneous) and (comp_idx == 0) and (job_level_gpus is not None):
            exec_config["num_gpus"] = job_level_gpus

        # Merge shared environment for heterogeneous jobs
        if heterogeneous and shared_env_vars:
            exec_config["environment"].update(shared_env_vars)
```
Fix shared env propagation for heterogeneous jobs.

We probe commands before assigning `het_group_index`, so cross-component refs fall back to `127.0.0.1`. Later, `exec_config["environment"].update(shared_env_vars)` overwrites the correct values produced after `het_group_index` is set, so every heterogeneous job ends up exporting the localhost fallback. Assign the het index before probing and merge shared env vars without clobbering per-command values.
```diff
     shared_env_vars: Dict[str, str] = {}
     if heterogeneous:
         for het_idx, group in enumerate(groups):
             for command in group.commands:
-                _, exec_config_probe = command.prepare_for_execution(cluster_config)
-                shared_env_vars.update(exec_config_probe.get("environment", {}))
+                command.het_group_index = het_idx
+                _, exec_config_probe = command.prepare_for_execution(cluster_config)
+                for key, value in exec_config_probe.get("environment", {}).items():
+                    shared_env_vars.setdefault(key, value)
@@
             # Merge shared environment for heterogeneous jobs
             if heterogeneous and shared_env_vars:
-                exec_config["environment"].update(shared_env_vars)
+                for key, value in shared_env_vars.items():
+                    exec_config["environment"].setdefault(key, value)
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
shared_env_vars: Dict[str, str] = {}
if heterogeneous:
    for het_idx, group in enumerate(groups):
        for command in group.commands:
            # Ensure probes use the correct het_group_index
            command.het_group_index = het_idx
            _, exec_config_probe = command.prepare_for_execution(cluster_config)
            # Collect shared env vars without overwriting earlier groups
            for key, value in exec_config_probe.get("environment", {}).items():
                shared_env_vars.setdefault(key, value)

# Share packager across executors for efficiency (single-group only)
shared_packager = None

# Build commands and executors
for het_idx, group in enumerate(groups):
    has_multiple_components = len(group.commands) > 1
    total_het_groups = (
        len(groups) if heterogeneous else (len(group.commands) if has_multiple_components else 1)
    )

    # For single-group jobs with multiple components, allow job-level GPU override for sbatch allocation
    job_level_gpus = (
        group.hardware.num_gpus if (not heterogeneous and has_multiple_components and group.hardware) else None
    )

    for comp_idx, command in enumerate(group.commands):
        # Assign het_group_index ONLY for heterogeneous jobs
        if heterogeneous:
            command.het_group_index = het_idx
        else:
            command.het_group_index = None

        final_cmd, exec_config = self._prepare_command(command, cluster_config)
        commands.append(final_cmd)

        # Adjust GPU allocation (first component gets job-level GPUs for sbatch)
        exec_config["num_gpus"] = exec_config["num_gpus"] or 0
        if not heterogeneous and comp_idx == 0 and job_level_gpus is not None:
            exec_config["num_gpus"] = job_level_gpus

        # Merge shared environment for heterogeneous jobs without clobbering per-command vars
        if heterogeneous and shared_env_vars:
            for key, value in shared_env_vars.items():
                exec_config["environment"].setdefault(key, value)
```
🧰 Tools
🪛 Ruff (0.13.3)
533-533: Loop control variable het_idx not used within loop body
(B007)
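The difference between `dict.update` (which clobbers existing keys) and `dict.setdefault` (which preserves them) that the fix relies on can be seen in a standalone sketch; the env-var names and values here are illustrative:

```python
# Hypothetical per-command environment with the correct server hostname,
# plus shared vars probed earlier (which fell back to localhost).
per_command_env = {"SERVER_HOST": "het-group-0-node", "EXTRA": "1"}
shared_env_vars = {"SERVER_HOST": "127.0.0.1", "COMMON": "yes"}

# update() clobbers the correct per-command value:
clobbered = dict(per_command_env)
clobbered.update(shared_env_vars)
assert clobbered["SERVER_HOST"] == "127.0.0.1"  # wrong host wins

# setdefault() only fills in keys that are missing:
merged = dict(per_command_env)
for key, value in shared_env_vars.items():
    merged.setdefault(key, value)
assert merged["SERVER_HOST"] == "het-group-0-node"  # per-command value preserved
assert merged["COMMON"] == "yes"                    # shared var still added
```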
activatedgeek
left a comment
Looking really good! I like the general interface. Just some suggestions for how we can plan for a transition, and slowly phase out old code.
```python
if port is None:
    port = get_free_port(strategy="random")

cmd, num_tasks = get_server_command(
```
While we are at this, can we now isolate all the logic in `get_server_command` into this function? In the other function, we can throw a warning:

```python
import warnings
warnings.warn("deprecation message", DeprecationWarning)
```

In `get_server_command`, we can then just return the cmd from this function.

I think we'll need to do this to slowly transition away from old designed functions, and keep track of things that need to be updated or removed.
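A minimal sketch of the suggested pattern (function names and signatures here are illustrative, not the repository's actual API): the legacy entry point emits a `DeprecationWarning` and delegates to the new builder.

```python
import warnings

def build_server_command(model: str, port: int) -> str:
    """New-style builder that owns all the command-construction logic."""
    return f"serve --model {model} --port {port}"

def get_server_command(model: str, port: int) -> str:
    """Legacy entry point, kept only as a thin deprecated shim."""
    warnings.warn(
        "get_server_command is deprecated; use build_server_command instead",
        DeprecationWarning,
        stacklevel=2,  # point the warning at the caller, not this shim
    )
    return build_server_command(model, port)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    cmd = get_server_command("my-model", 8000)

print(cmd)  # -> serve --model my-model --port 8000
print(caught[0].category is DeprecationWarning)  # -> True
```

Grepping for `DeprecationWarning` later then gives a checklist of everything that still needs to be migrated or removed.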
```python
if port is None:
    port = get_free_port(strategy="random")

cmd = get_sandbox_command(cluster_config)
```
I suggest the same deprecation warning here. Whenever we get a chance next, we can then search for the deprecation warnings, and migrate away from those methods.
```python
def __post_init__(self):
    # Wrap plain strings with environment setup
    if isinstance(self.command, str) and (self.env_vars or self.working_dir):
        self.command = wrap_command(self.command, self.working_dir, self.env_vars)
```
Just a stylistic choice, but I will suggest methods like `wrap_command` be co-located with the `Command` data class, since they mostly won't have any other utility. If they do end up with a utility outside, we can always make them a `@staticmethod`.
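A rough sketch of the co-location idea (illustrative only; the real `Command` class has more fields and a different wrapping implementation):

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class Command:
    command: str
    working_dir: Optional[str] = None
    env_vars: Dict[str, str] = field(default_factory=dict)

    @staticmethod
    def wrap_command(command: str, working_dir: Optional[str], env_vars: Dict[str, str]) -> str:
        """Prefix a command with env exports and an optional cd."""
        exports = " ".join(f"export {k}={v} &&" for k, v in env_vars.items())
        cd = f"cd {working_dir} && " if working_dir else ""
        return f"{exports} {cd}{command}".strip()

    def __post_init__(self):
        # Wrap plain strings with environment setup, as in the reviewed code.
        if self.env_vars or self.working_dir:
            self.command = self.wrap_command(self.command, self.working_dir, self.env_vars)

cmd = Command("python run.py", working_dir="/workspace", env_vars={"FOO": "1"})
print(cmd.command)  # -> export FOO=1 && cd /workspace && python run.py
```

Keeping the helper as a `@staticmethod` means external callers can still reach it via `Command.wrap_command` if a use outside the class ever appears.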
|
|
||
```python
# 2. Wrap with installation_command if provided
if self.installation_command:
    final_command = install_packages_wrap(final_command, self.installation_command)
```
Here too, the same stylistic suggestion applies for co-locating `install_packages_wrap`.
```python
    log_dir: Optional[str] = None,
):
    self.commands = commands
    self.hardware = hardware or HardwareConfig()
```
How does this hardware config interact with the hardware-related arguments in `Command`?
```python
    reuse_code_exp: Optional[str] = None,
    skip_hf_home_check: bool = False,
    with_ray: bool = False,
    run_after: Optional[Union[str, List[str]]] = None,  # Pipeline-level dependency on other experiments
```
In this code's context, just confirming: by "experiments" you mean other "pipelines"?
I'll look into this: an experiment in this context is a nemo-run experiment handle, so you could build a pipeline as a dependency of a job not run through the Pipeline construction. I'll try to put together a convenient way to get the experiment handle of a pipeline so you can also easily make dependent pipelines.
```python
    slurm_kwargs={"exclusive": hardware.exclusive} if (hardware and hardware.exclusive) else None,
)

def _plan_and_add_job(
```
We definitely should! Perhaps the pipeline class itself could be its own file.
```python
reuse_exp = None
if reuse_exp is not None:
    LOG.info(f"Trying to reuse code from experiment {reuse_exp._title}")
    reuse_key = get_packaging_job_key(reuse_exp._id, "nemo-run")
```
Again, functions like these can perhaps be isolated along with the pipeline, since they likely have no other utility.
Are we confident enough in our tests related to generate?
If our test coverage is low, I will strongly recommend that we keep the old `generate.py` as `generate_legacy.py`, so that people getting the new version automatically use the new command (with warnings, of course). In case it does cause any bugs, they will always have an escape hatch for this transition phase.
We should run a few more tests, but I would probably just go ahead and switch to the new interface directly. As long as there are no silent issues (so it doesn't corrupt generations, but just throws an error), it should be easy to fix, and we will quickly resolve all problems. If we find that there are some really tricky things that we didn't handle, we can always roll this back or add `generate_legacy.py` whenever we face this situation.
Squashed commit history (each commit signed off by George Armstrong <georgea@nvidia.com>):

- WIP add more declarative syntax
- add crossgroup sandbox
- WIP make cross group built in
- add run_cmd
- small fixes
- WIP add variable definition and getting utilities
- get het group working again
- MAINT add simple example
- WIP fixing hetgroup -- better but host is broken
- multi hetgroup example
- cleanup
- FIX het group cross references
- MAINT simplify declarative
- Maint remove stuff
- MAINT more simplification and improvements
- MAINT make dependencies work
- make example simplistic but functional
- MAINT remove v2 examples
- WIP working send to remote
- add commands
- MAINT renaming and additional docs
- MAINT remove examples
- MAINT remove unused files
- MAINT fix the dep suffix
- FIXED timing issue on creation
- resolve more differences
- MAINT revert generate.py to refactored?
- MAINT respect log dir
- MAINT cleanup and FIX run_after
- MAINT rename stage back to pipeline
- MAINT rename stage back to pipeline
- FIX log dir and output dir setting
- MAINT simplify declarative
- MAINT remvoe extra abstraction leak
- move imports to top
- MAINT support using single group job if groups is only one item
- simplify unneeded path
- remove special sandbox handling
- refactor pipeline
- fix pipeline
- remove extra generate
- MAINT move import
Signed-off-by: George Armstrong <georgea@nvidia.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: George Armstrong <georgea@nvidia.com>
Actionable comments posted: 0
🧹 Nitpick comments (1)
tests/test_mcp_clients.py (1)
208-224: LGTM with a minor comment clarity suggestion.

The implementation correctly uses a class variable to track `list_tools()` invocations across all instances, which is appropriate for testing `ToolManager` caching behavior. The class methods provide proper encapsulation.

Minor note: the comment on line 211 states "track calls across instances", but the class variable tracks all invocations globally, including multiple calls on the same instance. Consider clarifying to "track total calls across all instances and invocations."
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `tests/test_mcp_clients.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_mcp_clients.py (2)
nemo_skills/mcp/tool_manager.py (3)
- `list_tools` (37-38), `ToolManager` (50-150), `list_all_tools` (97-137)

nemo_skills/mcp/tool_providers.py (1)
- `list_tools` (106-107)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (2)
tests/test_mcp_clients.py (2)
227-232: LGTM!

The implementation correctly returns duplicate tool names within the same provider to exercise `ToolManager`'s duplicate detection logic.

248-249: LGTM! Consider test isolation for parallel execution.

The test correctly uses `reset_count()` to initialize the counter and `get_count()` to verify caching behavior. The logic properly tests that:

- Two consecutive calls with `use_cache=True` invoke `list_tools()` only once (line 254)
- A subsequent call with `use_cache=False` invokes it again, bringing the total to 2 (line 258)

Minor consideration: if tests run in parallel (e.g., with `pytest-xdist`), the shared class variable could cause race conditions. However, this appears to be the only test using `CountingTool`, so isolation should be fine in practice.

Also applies to: 254-254, 258-258
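The caching contract that the test exercises can be sketched as follows. This is a hedged illustration, not `ToolManager` itself: a manager returns the cached result when `use_cache=True` and re-invokes the provider's `list_tools()` when `use_cache=False`.

```python
class CachingManager:
    """Illustrative sketch of the use_cache contract under test."""

    def __init__(self, provider):
        self._provider = provider
        self._cache = None

    def list_all_tools(self, use_cache=True):
        # Serve from cache unless the caller opts out or nothing is cached yet.
        if use_cache and self._cache is not None:
            return self._cache
        self._cache = self._provider.list_tools()
        return self._cache
```

With this shape, two `use_cache=True` calls hit the provider once, and a `use_cache=False` call forces a second invocation, matching the counts asserted in the test.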
Signed-off-by: George Armstrong <georgea@nvidia.com>
…in' of github.com:NVIDIA/NeMo-Skills into georgea/refactor-task-construction-refactor-generate-main
Actionable comments posted: 0
🧹 Nitpick comments (1)
tests/test_mcp_clients.py (1)
256-257: Good test isolation practice.

Resetting the counter before the test ensures proper isolation with module-level state.

For consistency, consider using a pytest fixture to handle state reset if more tests use this pattern:

```python
@pytest.fixture
def reset_counting_tool():
    _counting_tool_state["call_count"] = 0
    yield
```

Then use `@pytest.mark.usefixtures("reset_counting_tool")` on the test.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `tests/test_mcp_clients.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_mcp_clients.py (1)
nemo_skills/mcp/tool_manager.py (3)
- `list_tools` (51-52), `ToolManager` (64-164), `list_all_tools` (111-151)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (4)
tests/test_mcp_clients.py (4)
222-224: Well-documented shared state for dynamic instantiation.

The module-level mutable state is necessary because `ToolManager` uses `locate()` to dynamically instantiate tool classes, and all instances must share the same counter. The comment clearly explains this design choice.

227-232: LGTM!

The `CountingTool` class correctly tracks `list_tools` invocations via the shared state and is properly placed at module level for `locate()` compatibility.

235-239: LGTM!

The `DupTool` class correctly generates duplicate tool names to exercise the deduplication logic in `ToolManager.list_all_tools()`.

259-272: LGTM!

The test correctly verifies:

- Caching behavior: `list_all_tools(use_cache=True)` called twice only invokes `list_tools()` once (line 262)
- Cache bypass: `list_all_tools(use_cache=False)` invokes `list_tools()` again (line 266)
- Duplicate detection: `DupTool` returns duplicates but only unique names appear in results (line 271)

The refactor successfully maintains the original test behavior while making the helper classes reusable via `locate()`.
Signed-off-by: George Armstrong <georgea@nvidia.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/test_mcp_clients.py (1)
264-264: Replace `getattr` with direct attribute access.

Using `getattr` with constant attribute names provides no benefit over direct attribute access and reduces readability.

Apply this diff:

```diff
- CountingToolClass = getattr(this_module, "CountingTool")
+ CountingToolClass = this_module.CountingTool

- imported_class = getattr(imported_module, "CountingTool")
+ imported_class = imported_module.CountingTool
```

As per coding guidelines.
Also applies to: 269-269
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `tests/test_mcp_clients.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_mcp_clients.py (1)
nemo_skills/mcp/tool_manager.py (3)
- `list_tools` (51-52), `ToolManager` (64-164), `list_all_tools` (111-151)
🪛 Ruff (0.13.3)
tests/test_mcp_clients.py
264-264: Do not call getattr with a constant attribute value. It is not any safer than normal property access.
Replace getattr with attribute access
(B009)
269-269: Do not call getattr with a constant attribute value. It is not any safer than normal property access.
Replace getattr with attribute access
(B009)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: pre-commit
- GitHub Check: unit-tests
🔇 Additional comments (1)
tests/test_mcp_clients.py (1)
222-243: LGTM!

The module-level helper classes are correctly implemented. `CountingTool` uses a class variable to track invocations across instances, and `DupTool` returns duplicates to exercise detection logic. Module-level placement is necessary for `ToolManager`'s `locate()` to find these classes via module specs.
Signed-off-by: George Armstrong <georgea@nvidia.com>
Signed-off-by: George Armstrong <georgea@nvidia.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/test_generation.py (1)
156-191: Consider using tmp_path fixture for log_dir.

The test correctly validates metadata creation from the server command. However, the hardcoded `"/tmp/logs"` at line 185 is flagged by static analysis. While this doesn't affect test correctness, using the `tmp_path` fixture would be better practice.

Apply this diff:

```diff
-def test_server_metadata_from_num_tasks():
+def test_server_metadata_from_num_tasks(tmp_path):
     """Test that metadata dict is properly created from server command returning (cmd, num_tasks)."""
     mock_server_fn = MagicMock(return_value=("python server.py", 4))
     cluster_config = {
         "containers": {"vllm": "nvcr.io/nvidia/nemo:vllm", "nemo-skills": "nvcr.io/nvidia/nemo:skills"},
         "executor": "slurm",
     }
     server_config = {
         "server_type": "vllm",
         "num_gpus": 8,
         "num_nodes": 1,
         "model_path": "/models/test",
         "server_port": 5000,
     }
     cmd_group = _create_commandgroup_from_config(
         generation_cmd="python generate.py",
         server_config=server_config,
         with_sandbox=False,
         sandbox_port=None,
         cluster_config=cluster_config,
         installation_command=None,
         get_server_command_fn=mock_server_fn,
         partition=None,
         qos=None,
         time_min=None,
         exclusive=False,
         keep_mounts_for_sandbox=False,
         task_name="test-task",
-        log_dir="/tmp/logs",
+        log_dir=str(tmp_path / "logs"),
     )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `nemo_skills/pipeline/generate.py` (3 hunks)
- `tests/test_generation.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/test_generation.py (2)
nemo_skills/pipeline/generate.py (2)
- `generate` (139-476), `_create_commandgroup_from_config` (40-134)

nemo_skills/inference/generate.py (3)
- `generate` (675-702), `generate` (710-715), `get_server_command_fn` (253-263)

nemo_skills/pipeline/generate.py (6)
- nemo_skills/dataset/utils.py (1): `import_from_path` (60-67)
- nemo_skills/pipeline/utils/commands.py (1): `sandbox_command` (77-110)
- nemo_skills/pipeline/utils/declarative.py (5): `Command` (137-231), `CommandGroup` (246-259), `HardwareConfig` (235-243), `Pipeline` (262-723), `run` (326-461)
- nemo_skills/inference/generate.py (1): `get_server_command_fn` (253-263)
- nemo_skills/pipeline/utils/generation.py (3): `get_chunked_rs_filename` (29-45), `configure_client` (305-365), `get_generation_cmd` (178-280)
- nemo_skills/pipeline/utils/server.py (1): `wrap_python_path` (64-65)
🪛 Ruff (0.13.3)
tests/test_generation.py
185-185: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
nemo_skills/pipeline/generate.py
52-52: Unused function argument: keep_mounts_for_sandbox
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (1)
nemo_skills/pipeline/generate.py (1)
363-476: LGTM! Clean refactoring to declarative pipeline.

The refactoring successfully transitions from ad-hoc task creation to a declarative Job specification list with Pipeline-driven execution. The dependency chain handling (lines 416-458) correctly references job objects rather than string names, and the first job in each chain properly handles both `_task_dependencies` and `run_after`. The final Pipeline construction and execution model is clean and consistent.
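The chain-wiring behavior praised above can be sketched in a few lines. This is an illustrative stand-in (the `Job` dataclass and `build_chain` helper are hypothetical, not the repository's code): the first job in a chain inherits the external `run_after` dependencies, while each later job depends on the previous job object rather than on a string name.

```python
from dataclasses import dataclass, field

@dataclass
class Job:
    """Hypothetical stand-in for the pipeline's job specification."""
    name: str
    dependencies: list = field(default_factory=list)

def build_chain(names, run_after=None):
    """Wire a linear dependency chain: external deps go to the first job,
    and each subsequent job holds a reference to its predecessor object."""
    jobs = []
    for i, name in enumerate(names):
        deps = list(run_after or []) if i == 0 else [jobs[i - 1]]
        jobs.append(Job(name=name, dependencies=deps))
    return jobs
```

Referencing job objects instead of names avoids the lookup-by-string fragility the refactoring removed.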
```python
if with_sandbox:
    # Call sandbox command builder directly with cluster_config
    cmd, metadata = sandbox_command(cluster_config=cluster_config, port=sandbox_port)
    metadata["log_prefix"] = "sandbox"

    sandbox_cmd = Command(
        command=cmd,
        container=cluster_config["containers"]["sandbox"],
        name=task_name,
        metadata=metadata,
    )

    components.append(sandbox_cmd)
```
Implement `keep_mounts_for_sandbox` to restore mount propagation.

The `keep_mounts_for_sandbox` parameter is accepted but never used, causing a functional regression. The CLI flag is still exposed, but sandbox containers always run without mounts. To fix this, propagate mounts from `cluster_config` to sandbox metadata when the flag is enabled.
Apply this diff:

```diff
 # 3. Add sandbox if requested
 if with_sandbox:
     # Call sandbox command builder directly with cluster_config
     cmd, metadata = sandbox_command(cluster_config=cluster_config, port=sandbox_port)
     metadata["log_prefix"] = "sandbox"
+
+    # Propagate mounts to sandbox if enabled
+    if keep_mounts_for_sandbox and "mounts" in cluster_config:
+        metadata["mounts"] = cluster_config["mounts"]
     sandbox_cmd = Command(
         command=cmd,
         container=cluster_config["containers"]["sandbox"],
         name=task_name,
         metadata=metadata,
     )
     components.append(sandbox_cmd)
```
components.append(sandbox_cmd)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if with_sandbox:
    # Call sandbox command builder directly with cluster_config
    cmd, metadata = sandbox_command(cluster_config=cluster_config, port=sandbox_port)
    metadata["log_prefix"] = "sandbox"
    # Propagate mounts to sandbox if enabled
    if keep_mounts_for_sandbox and "mounts" in cluster_config:
        metadata["mounts"] = cluster_config["mounts"]
    sandbox_cmd = Command(
        command=cmd,
        container=cluster_config["containers"]["sandbox"],
        name=task_name,
        metadata=metadata,
    )
    components.append(sandbox_cmd)
```
🤖 Prompt for AI Agents
nemo_skills/pipeline/generate.py around lines 103 to 115: the
keep_mounts_for_sandbox parameter is accepted but never used so sandbox metadata
never receives mount info; when keep_mounts_for_sandbox is truthy, copy the
mounts configuration from cluster_config into the sandbox metadata (e.g.,
metadata["mounts"] = cluster_config.get("mounts") or the appropriate
container-specific mounts) and set a flag in metadata (e.g.,
metadata["keep_mounts_for_sandbox"] = True) so the sandbox Command is created
with mounts restored and mount propagation preserved.
Signed-off-by: George Armstrong <georgea@nvidia.com>
add test for dependency issue

Signed-off-by: George Armstrong <georgea@nvidia.com>
…actor-task-construction-refactor-generate-main
Actionable comments posted: 0
🧹 Nitpick comments (1)
tests/test_declarative_pipeline.py (1)
601-870: Excellent test coverage for complex dependency scenarios.

These tests verify critical dependency handling including:
- Multiple internal dependencies (lines 604-670)
- Proper separation of internal vs external dependencies (lines 672-767) with clear documentation of the bug fix
- Cross-experiment dependency chains (lines 768-840)
- Defensive handling of nonexistent experiments (lines 842-848)
- Experiment object handling (lines 850-869)
The docstrings and inline comments clearly explain what each test verifies, which is especially valuable for the bug fix verification test.
Optional: Consider using tmp_path consistently.
Lines 630-637 and 719-723 use hardcoded `/tmp/logs` paths, while the test at line 768 uses the `tmp_path` fixture. For consistency and to eliminate static analysis warnings, consider using `tmp_path` throughout:

```python
def test_multiple_internal_dependencies(self, tmp_path):
    # ... existing code ...
    log_dir = str(tmp_path / "logs")
    group1 = CommandGroup(commands=[cmd1], name="group1", log_dir=log_dir)
    # ... etc
```

This would also ensure proper cleanup and avoid potential conflicts in parallel test execution.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `tests/test_declarative_pipeline.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_declarative_pipeline.py (5)
nemo_skills/pipeline/cli.py (1)
- `wrap_arguments` (45-54)

nemo_skills/pipeline/generate.py (1)
- `generate` (139-476)

nemo_skills/pipeline/run_cmd.py (1)
- `run_cmd` (40-211)

nemo_skills/pipeline/utils/declarative.py (8)
- `Command` (137-231), `CommandGroup` (246-259), `HardwareConfig` (235-243), `Pipeline` (262-742), `prepare_for_execution` (185-228), `meta_ref` (176-183), `hostname_ref` (169-174), `run` (326-467)

nemo_skills/pipeline/utils/exp.py (1)
- `get_exp_handles` (57-102)
🪛 Ruff (0.13.3)
tests/test_declarative_pipeline.py
79-79: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
288-288: Unused method argument: mock_run_exp
(ARG002)
318-318: Unused method argument: mock_run_exp
(ARG002)
630-630: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
633-633: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
637-637: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
719-719: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
723-723: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: unit-tests
- GitHub Check: pre-commit
🔇 Additional comments (6)
tests/test_declarative_pipeline.py (6)
29-150: Comprehensive test coverage for Command class.

The test methods thoroughly cover Command functionality including basic creation, metadata handling, callable commands, `prepare_for_execution` with both strings and callables, `meta_ref`/`hostname_ref` methods, installation commands, and environment variable wrapping. The assertions are clear and verify the expected behavior.

Note: the static analysis warning about unused `exec_config` on line 79 is a false positive; the variable is used in assertions on lines 66-68.

152-183: LGTM! CommandGroup tests are clear and comprehensive.

The tests appropriately verify CommandGroup creation with various configurations including hardware specifications and log directory settings.

185-280: LGTM! Thorough testing of Pipeline initialization.

The tests cover various Pipeline initialization scenarios including single/multiple jobs, dependency handling, the `run_after` parameter (both string and list forms), and proper error handling for missing required parameters.

282-415: LGTM! Pipeline execution tests are thorough.

The tests verify pipeline execution behavior including basic execution, internal dependencies, and HF_HOME validation (both missing and non-mounted scenarios). The mocking strategy is appropriate for isolating the units under test.

Note: the unused `mock_run_exp` parameters flagged by static analysis (lines 288, 318) are intentionally included to prevent side effects during testing; this is a common and acceptable pattern.

417-520: LGTM! Het group index tests verify critical behavior.

The tests comprehensively verify het_group_index assignment including:
- Non-heterogeneous jobs remain None (localhost communication)
- Heterogeneous jobs get sequential indices (0, 1, ...)
- Indices are per-job, not global across the pipeline
The per-job indexing test (lines 483-520) is particularly valuable as it prevents a subtle bug where indices could incorrectly accumulate across jobs.
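The per-job numbering rule those tests pin down can be sketched as follows. This helper is purely illustrative (not the repository's implementation): groups in a heterogeneous job are numbered 0, 1, ... restarting for every job, while single-group jobs get `None` so their commands communicate over localhost.

```python
def assign_het_indices(job_groups):
    """For each job (a list of command groups), assign het_group_index
    values: None for non-heterogeneous jobs, else 0..n-1 per job."""
    indices = []
    for groups in job_groups:
        if len(groups) <= 1:
            # Single group: no heterogeneous setup, localhost communication.
            indices.append([None] * len(groups))
        else:
            # Indices restart at 0 for every job; they never accumulate
            # globally across the pipeline.
            indices.append(list(range(len(groups))))
    return indices
```

If the counter accumulated across jobs instead, a second heterogeneous job would start at a nonzero index, which is exactly the bug the per-job test guards against.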
522-599: LGTM! Edge case and error handling tests are appropriate.

The dependency resolution tests verify proper handling of None dependencies and run_after propagation, while the error-handling tests ensure meaningful errors are raised for invalid configurations (missing group/groups keys, missing log_dir).
Signed-off-by: George Armstrong <georgea@nvidia.com>
Actionable comments posted: 1
♻️ Duplicate comments (1)
nemo_skills/pipeline/utils/declarative.py (1)
161-161: Use `typing.Any` instead of built-in `any`.

The type annotation uses the built-in `any` instead of `typing.Any`.

Import `Any` from typing and update the annotation:

```diff
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union

-    metadata: Dict[str, any] = field(default_factory=dict)  # Stores metadata from command builders
+    metadata: Dict[str, Any] = field(default_factory=dict)  # Stores metadata from command builders
```
🧹 Nitpick comments (1)
nemo_skills/pipeline/utils/declarative.py (1)
645-650: Consider more specific exception handling.

Catching bare `Exception` makes it hard to distinguish between different failure modes. While the fallback is safe, catching specific exceptions (e.g., `run.ExperimentNotFoundError` or similar) would make debugging easier.

Based on static analysis hints.
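The narrowing suggested above looks like this in miniature. The exception class here is a made-up placeholder (the actual type nemo-run raises would need to be checked): only the expected failure triggers the fallback, and anything else propagates so real bugs stay visible.

```python
class ExperimentNotFound(Exception):
    """Hypothetical placeholder for the specific error nemo-run raises."""

def load_or_default(loader, default):
    """Run loader(); fall back to default only on the expected failure."""
    try:
        return loader()
    except ExperimentNotFound:
        # Expected failure mode: safe fallback.
        return default
    # Any other exception (e.g. ValueError from a bug) propagates.
```

Compared to `except Exception`, this keeps the fallback behavior while preserving tracebacks for unexpected errors.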
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `nemo_skills/pipeline/utils/declarative.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
nemo_skills/pipeline/utils/declarative.py (6)
nemo_skills/pipeline/utils/cluster.py (4)
- `get_env_variables` (108-200), `get_tunnel` (357-362), `temporary_env_update` (204-213), `tunnel_hash` (351-354)

nemo_skills/pipeline/utils/exp.py (3)
- `get_executor` (154-284), `run_exp` (620-651), `install_packages_wrap` (287-327)

nemo_skills/pipeline/utils/commands.py (1)
- `wrap_command` (113-138)

nemo_skills/pipeline/utils/mounts.py (1)
- `is_mounted_filepath` (27-46)

nemo_skills/pipeline/utils/packager.py (1)
- `get_registered_external_repo` (64-76)

nemo_skills/utils.py (1)
- `get_logger_name` (130-134)
🪛 Ruff (0.13.3)
nemo_skills/pipeline/utils/declarative.py
179-182: Avoid specifying long messages outside the exception class
(TRY003)
185-185: Unused method argument: cluster_config
(ARG002)
301-301: Avoid specifying long messages outside the exception class
(TRY003)
306-306: Avoid specifying long messages outside the exception class
(TRY003)
310-310: Avoid specifying long messages outside the exception class
(TRY003)
312-312: Avoid specifying long messages outside the exception class
(TRY003)
318-322: Avoid specifying long messages outside the exception class
(TRY003)
324-324: Avoid specifying long messages outside the exception class
(TRY003)
392-392: Avoid specifying long messages outside the exception class
(TRY003)
399-402: Avoid specifying long messages outside the exception class
(TRY003)
445-445: Avoid specifying long messages outside the exception class
(TRY003)
554-554: Avoid specifying long messages outside the exception class
(TRY003)
563-563: Loop control variable het_idx not used within loop body
(B007)
645-645: Do not catch blind exception: Exception
(BLE001)
648-648: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: unit-tests
- GitHub Check: pre-commit
🔇 Additional comments (1)
nemo_skills/pipeline/utils/declarative.py (1)
136-742: Overall architecture looks solid.

The declarative pipeline system provides a clean API for defining and executing complex job workflows. The separation of Command, CommandGroup, and Pipeline abstractions is well-designed. The dependency resolution logic correctly handles both internal and external dependencies, and the heterogeneous job support is comprehensive.

Key strengths:

- Clear separation of concerns between command definition and execution
- Flexible dependency model supporting both internal (task handles) and external (experiment names) references
- Good error messages and validation in `_validate()`
- Proper handling of code reuse across experiments
- Support for both single-group and multi-group heterogeneous SLURM jobs
Note: The critical issues flagged in past review comments regarding callable command wrapping (lines 195-216) and shared env propagation (lines 561-602) should be addressed before merging.
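The internal/external split mentioned among the strengths can be illustrated with a small sketch. This is not the repository's code: the dict of handles stands in for jobs already planned in the current experiment, while unmatched names are treated as external experiment names to be resolved at submission time.

```python
def split_dependencies(deps, internal_handles):
    """Separate dependencies into task handles within this experiment
    (internal) and experiment names resolved elsewhere (external)."""
    internal, external = [], []
    for dep in deps:
        if dep in internal_handles:
            internal.append(internal_handles[dep])  # handle in this experiment
        else:
            external.append(dep)  # looked up as a prior experiment later
    return internal, external
```

Keeping the two kinds separate is what lets the scheduler wait on in-flight tasks directly while still chaining onto previously submitted experiments.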
…-construction-refactor-generate-main
Signed-off-by: George Armstrong <georgea@nvidia.com>
Signed-off-by: George Armstrong <georgea@nvidia.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Igor Gitman <igitman@nvidia.com>
Signed-off-by: dgitman <dgitman@nvidia.com>
Summary by CodeRabbit
New Features
Tests