Revert "Use run.Script for generate pipeline (#1052)" #1125
Conversation
This reverts commit 9c5b68c. Signed-off-by: George Armstrong <georgea@nvidia.com>
Force-pushed from 8ab3681 to a3f35de
📝 Walkthrough

This pull request refactors the NeMo Skills pipeline from a Script-based architecture to a Command-based system, eliminating legacy Script classes (ServerScript, GenerationClientScript, SandboxScript) and introducing CommandGroup-driven composition. The generate.py entry point shifts from multi-model list parameters to scalar values, while GPU test workflows are simplified to delegate control flow to underlying scripts.
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
nemo_skills/pipeline/generate.py (1)
459-473: `server_config.copy()` is a shallow copy.

If `server_config` contains nested mutable objects (like dicts or lists), modifications could still affect other iterations. Looking at the function, line 77 does `server_config.pop("container")`, which mutates the dict.

Use `copy.deepcopy` to ensure nested structures are also copied:

```diff
+import copy
+
 # ... in the loop ...
 # Create CommandGroup for this task
 cmd_group = _create_commandgroup_from_config(
     generation_cmd=cmd,
-    server_config=server_config.copy() if server_config else None,
+    server_config=copy.deepcopy(server_config) if server_config else None,
     with_sandbox=with_sandbox,
```

Alternatively, avoid mutating `server_config` inside `_create_commandgroup_from_config` by using indexing instead of `.pop()`:

```diff
-    if "container" in server_config:
-        server_container = server_config.pop("container")
+    if "container" in server_config:
+        server_container = server_config["container"]
```

nemo_skills/pipeline/nemo_evaluator.py (1)
736-808: Type annotation mismatch for the `launcher_run_cfg` parameter.

The `_build_task_cmd` function signature (line 738) incorrectly annotates `launcher_run_cfg` as `DictConfig`, but the `_TaskCreationContext` dataclass (line ~513) correctly types it as `RunConfig`. Additionally, the docstring states "Global evaluator configuration from RunConfig," confirming the intended type. When `_build_task_cmd` is called from code using `_TaskCreationContext`, it receives a `RunConfig` instance despite the `DictConfig` type hint. Update the function signature to `launcher_run_cfg: RunConfig` to match the actual type being passed.
🧹 Nitpick comments (5)
tests/test_declarative_pipeline.py (1)
70-81: Consider prefixing the unused variable with an underscore.

The `exec_config` variable is intentionally unused here to demonstrate the return structure of `prepare_for_execution`. Prefixing it with an underscore (`_exec_config`) would silence the linter warning while maintaining readability.

```diff
 def test_command_prepare_for_execution_callable(self):
     """Test prepare_for_execution with callable command."""

     def make_cmd():
         return "echo test"

     cmd = Command(command=make_cmd, name="test")
     cluster_config = {"executor": "local", "containers": {}}
-    final_cmd, exec_config = cmd.prepare_for_execution(cluster_config)
+    final_cmd, _exec_config = cmd.prepare_for_execution(cluster_config)
     assert final_cmd == "echo test"
```

nemo_skills/pipeline/utils/declarative.py (3)
180-181: Type annotation uses lowercase `any` instead of `Any`.

The `metadata` field uses `Dict[str, any]`, which refers to the builtin `any` function rather than a type; it only passes at runtime because annotations are not evaluated eagerly, and static type checkers will flag it. The standard practice is to use `typing.Any`.

```diff
-    metadata: Dict[str, any] = field(default_factory=dict)  # Stores metadata from command builders
+    metadata: Dict[str, Any] = field(default_factory=dict)  # Stores metadata from command builders
```

Add `Any` to the imports:

```diff
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
```
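As a quick sanity check, a minimal dataclass sketch (illustrative names, not the real `Command` class) shows the corrected annotation in use:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class CommandSketch:
    # `Any` (the typing construct), not the builtin `any` function
    metadata: Dict[str, Any] = field(default_factory=dict)


cmd = CommandSketch()
cmd.metadata["port"] = 5000
cmd.metadata["log_prefix"] = "server"
print(cmd.metadata)  # {'port': 5000, 'log_prefix': 'server'}
```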
204-247: The `cluster_config` parameter is unused in `prepare_for_execution`.

The static analysis correctly identifies that `cluster_config` is passed but never used in this method. This appears to be a signature prepared for future use or left over from refactoring.

If `cluster_config` is not needed, consider removing it to reduce API surface:

```diff
-    def prepare_for_execution(self, cluster_config: Dict) -> Tuple[str, Dict]:
+    def prepare_for_execution(self) -> Tuple[str, Dict]:
```

Note: This would require updating all call sites (lines 487, 579).
574-615: Shared environment collection for heterogeneous jobs looks correct but has a redundant loop variable.

The static analysis notes that `het_idx` on line 577 is unused within the loop body. This is a minor code smell but doesn't affect functionality.

```diff
-        for het_idx, group in enumerate(groups):
-            for command in group.commands:
+        for group in groups:
+            for command in group.commands:
                 _, exec_config_probe = command.prepare_for_execution(cluster_config)
                 shared_env_vars.update(exec_config_probe.get("environment", {}))
```

nemo_skills/pipeline/generate.py (1)
56-56: Unused parameter `keep_mounts_for_sandbox`.

The static analysis correctly identifies that this parameter is never used within the function. It's declared, but the actual mount handling appears to happen elsewhere.

Either remove the unused parameter or implement the intended functionality:

```diff
 def _create_commandgroup_from_config(
     generation_cmd: str,
     server_config: Optional[Dict],
     with_sandbox: bool,
     sandbox_port: Optional[int],
     cluster_config: Dict,
     installation_command: Optional[str],
     get_server_command_fn: Callable,
     partition: Optional[str],
-    keep_mounts_for_sandbox: bool,
     task_name: str,
     log_dir: str,
     sbatch_kwargs: Optional[Dict] = None,
     sandbox_env_overrides: Optional[List[str]] = None,
 ) -> CommandGroup:
```

This would also require updating the call site at line 468.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)

- `.github/workflows/gpu_tests.yml` (1 hunks)
- `nemo_skills/pipeline/generate.py` (11 hunks)
- `nemo_skills/pipeline/nemo_evaluator.py` (7 hunks)
- `nemo_skills/pipeline/utils/__init__.py` (0 hunks)
- `nemo_skills/pipeline/utils/declarative.py` (9 hunks)
- `nemo_skills/pipeline/utils/generation.py` (1 hunks)
- `nemo_skills/pipeline/utils/scripts.py` (0 hunks)
- `tests/gpu-tests/test_eval.py` (1 hunks)
- `tests/test_declarative_pipeline.py` (24 hunks)
- `tests/test_generation.py` (2 hunks)
- `tests/test_nemo_evaluator_pipeline.py` (5 hunks)
💤 Files with no reviewable changes (2)
- nemo_skills/pipeline/utils/__init__.py
- nemo_skills/pipeline/utils/scripts.py
🧰 Additional context used
🧬 Code graph analysis (5)
tests/test_nemo_evaluator_pipeline.py (2)

- nemo_skills/pipeline/nemo_evaluator.py (1): `nemo_evaluator` (113-421)
- nemo_skills/pipeline/utils/declarative.py (1): `Command` (156-250)

tests/test_generation.py (1)

- nemo_skills/pipeline/generate.py (2): `generate` (163-519), `_create_commandgroup_from_config` (47-158)

nemo_skills/pipeline/utils/declarative.py (5)

- nemo_skills/pipeline/utils/cluster.py (1): `get_env_variables` (163-276)
- nemo_skills/pipeline/utils/exp.py (1): `install_packages_wrap` (368-408)
- nemo_skills/pipeline/utils/commands.py (1): `wrap_command` (114-139)
- nemo_skills/pipeline/utils/mounts.py (1): `is_mounted_filepath` (27-46)
- nemo_skills/pipeline/utils/packager.py (1): `get_registered_external_repo` (64-76)

nemo_skills/pipeline/nemo_evaluator.py (3)

- nemo_skills/pipeline/utils/commands.py (1): `vllm_server_command` (28-74)
- nemo_skills/pipeline/utils/declarative.py (3): `Command` (156-250), `hostname_ref` (188-193), `meta_ref` (195-202)
- nemo_skills/utils.py (1): `get_server_wait_cmd` (618-624)

tests/test_declarative_pipeline.py (1)

- nemo_skills/pipeline/utils/declarative.py (7): `Command` (156-250), `prepare_for_execution` (204-247), `meta_ref` (195-202), `hostname_ref` (188-193), `CommandGroup` (263-276), `Pipeline` (279-756), `run` (346-483)
🪛 Ruff (0.14.8)
tests/test_generation.py
182-182: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
nemo_skills/pipeline/utils/declarative.py
198-201: Avoid specifying long messages outside the exception class
(TRY003)
204-204: Unused method argument: cluster_config
(ARG002)
577-577: Loop control variable het_idx not used within loop body
(B007)
nemo_skills/pipeline/generate.py
56-56: Unused function argument: keep_mounts_for_sandbox
(ARG001)
193-193: Do not perform function call typer.Option in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
tests/test_declarative_pipeline.py
79-79: Unpacked variable exec_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
630-630: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
633-633: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
719-719: Probable insecure usage of temporary file or directory: "/tmp/logs"
(S108)
947-949: Possible SQL injection vector through string-based query construction
(S608)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: pre-commit
- GitHub Check: unit-tests
🔇 Additional comments (32)
.github/workflows/gpu_tests.yml (1)
47-55: Simplification looks good; verify tests still work as expected.

The removal of heartbeat loops, PID tracking, and manual EXIT_CODE handling in favor of direct script execution simplifies the workflow significantly. The essential safeguards remain in place:

- Step-level timeout (240 minutes)
- `pipefail` to propagate test failures

Since this reverts to a previous working state, the approach should be sound.
Please verify that the GPU tests execute successfully and that test failures are correctly detected and reported by the workflow. You can check recent workflow runs or trigger a test run with the 'run GPU tests' label.
tests/gpu-tests/test_eval.py (1)
47-47: LGTM - Dataset exclusion update with clear rationale.

The exclusion of "aalcr" with a documented reason ("Has tokenization mismatch issues") is appropriate. This helps maintain test stability while the underlying issue is addressed.
nemo_skills/pipeline/utils/generation.py (5)
29-45: LGTM - Clean path construction helper.

The function correctly handles optional `random_seed` and `chunk_id` parameters, constructing appropriate filenames for chunked generation outputs.
60-121: Robust batch processing with appropriate fallback mechanism.

The implementation correctly handles batch processing of file existence checks with a conservative batch size of 30 to avoid "Argument list too long" errors. The fallback to individual file checks when batches fail ensures reliability.
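The batch-then-fallback pattern described above can be sketched roughly as follows. This is a simplified stand-in, not the actual `generation.py` implementation; the function name and the use of `bash -c` per batch are illustrative assumptions:

```python
import shlex
import subprocess


def files_exist_batched(paths, batch_size=30):
    """Check file existence in shell batches, falling back to per-file checks."""
    exists = {}
    for i in range(0, len(paths), batch_size):
        batch = paths[i : i + batch_size]
        # one shell invocation per batch keeps the argument list short enough
        # to avoid "Argument list too long" (E2BIG) errors
        script = "; ".join(f"test -f {shlex.quote(p)} && echo {shlex.quote(p)}" for p in batch)
        try:
            out = subprocess.run(
                ["bash", "-c", script], capture_output=True, text=True, timeout=60
            )
            found = set(out.stdout.splitlines())
            for p in batch:
                exists[p] = p in found
        except (subprocess.SubprocessError, OSError):
            # conservative fallback: check each file individually
            for p in batch:
                exists[p] = subprocess.run(["test", "-f", p]).returncode == 0
    return exists
```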
178-281: Well-documented Hydra argument parsing.

The function has excellent docstring examples and correctly handles both `--flag=value` and `--flag value` formats for Hydra configuration flags.
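The two accepted spellings can be handled with a small helper like this (an illustrative sketch; the real parser in `generation.py` is more involved):

```python
def extract_flag(args, flag):
    """Return the value of `flag`, accepting both --flag=value and --flag value."""
    for i, arg in enumerate(args):
        if arg == flag and i + 1 < len(args):
            return args[i + 1]          # --flag value form
        if arg.startswith(flag + "="):
            return arg.split("=", 1)[1]  # --flag=value form
    return None


print(extract_flag(["--config-dir=/cfg", "++x=1"], "--config-dir"))   # /cfg
print(extract_flag(["--config-dir", "/cfg", "++x=1"], "--config-dir"))  # /cfg
```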
284-390: Simplified single-model generation command construction.

The function correctly validates inputs (mutual exclusivity of `input_file`/`input_dir`), handles chunking with proper merge command construction, and safely quotes the postprocess command using `shlex.quote`.
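Why `shlex.quote` matters for the postprocess command: without quoting, shell metacharacters in the command would be interpreted by the outer shell. A minimal illustration (the command string here is made up):

```python
import shlex

postprocess_cmd = "python postprocess.py --note 'all done; see logs'"
# quoting turns the whole command into one inert argument for bash -c,
# so the embedded semicolon cannot terminate the outer command
wrapped = f"bash -c {shlex.quote(postprocess_cmd)}"
print(wrapped)
```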
415-474: Clean client configuration utility.

The function properly handles both self-hosted servers (when `server_gpus` is set) and externally-hosted servers, returning appropriate configuration for each case.

tests/test_generation.py (1)
156-188: Well-structured test for Command-based metadata creation.

The test correctly validates that `_create_commandgroup_from_config` properly creates server command metadata with `num_tasks` and `gpus` values from the mocked server function. The assertions on `server_cmd.metadata` align with the Command-based architecture.
134-136: Correct Command attribute assertions for the no-server case.

The assertions properly verify that when using external URLs without hosted servers, the client command has `gpus=None` and `nodes=1`, matching the expected Command-based behavior.
184-193: Proper validation of server metadata and client callable.

The test correctly verifies server command metadata (`port`, `log_prefix`) and that the client command uses a callable for lazy evaluation of cross-component references (`hostname_ref`, `meta_ref`).
238-245: Judge server Command assertions are correct.

The test properly validates judge server command properties (`gpus=32`, `log_prefix="judge-server"`) and client command callable behavior.
303-315: Correct validation of two-group structure for both servers.

The test properly verifies that when both servers are hosted, separate groups are created with appropriate GPU allocations (8 for the server group, 32 for the judge group) and the client command uses a callable for cross-component references.
tests/test_declarative_pipeline.py (3)
152-279: Comprehensive CommandGroup and Pipeline tests.

The tests thoroughly cover CommandGroup creation with hardware configuration, Pipeline initialization with various job structures, and dependency handling (including the `run_after` parameter).
417-519: Critical tests for heterogeneous job index assignment.

These tests correctly validate that `het_group_index` is assigned per-job (not globally across the pipeline), ensuring proper SLURM heterogeneous job behavior with `hostname_ref()` returning appropriate SLURM variables.
934-962: Important integration test for sandbox environment variables.

This test validates that the sandbox integration correctly passes `NEMO_SKILLS_SANDBOX_PORT` to the client command and `LISTEN_PORT`/`NGINX_PORT` to the sandbox command. This ensures proper communication between components.

nemo_skills/pipeline/nemo_evaluator.py (5)
90-106: Import changes align with the Command-based refactor.

The imports are correctly updated to support the new architecture: `dataclass` for `_TaskCreationContext`, `DictConfig` from OmegaConf for config handling, and `vllm_server_command` replacing the previous `ServerScript` approach.
288-330: Task context initialization looks correct.

The `_TaskCreationContext` is properly initialized with `launcher_run_cfg` (RunConfig) and `task_cfg` (DictConfig) directly, which aligns with the updated dataclass definition. All server configuration parameters are correctly passed through.
468-503: Command construction for serving components is well-structured.

The `_create_serving_command_obj` function correctly:

- Builds the command via `vllm_server_command`
- Resolves container fallback from cluster config
- Sets appropriate log prefixes and metadata
- Returns a properly configured `Command` object

The metadata merging with `**meta` ensures port and other server metadata are preserved.
648-699: Lambda factory pattern for runtime URL resolution is correctly implemented.

The `_client_cmd_factory` closure captures `ctx`, `main_server_cmd`, and `judge_server_cmd` references, enabling lazy evaluation of `hostname_ref()` and `meta_ref()` when `het_group_index` is assigned at pipeline execution time. This is the correct pattern for cross-component communication in heterogeneous SLURM jobs.
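The essence of this closure pattern can be reduced to a few lines. The class, attribute, and URL format below are illustrative stand-ins, not the real `Command` API:

```python
class ServerCmd:
    """Toy stand-in for a Command whose het_group_index is assigned late."""

    def __init__(self):
        self.het_group_index = None

    def hostname_ref(self):
        return f"$SLURM_HOSTNAME_HET_GROUP_{self.het_group_index}"


def client_cmd_factory(server_cmd, port=5000):
    # the lambda defers hostname_ref() until the pipeline has resolved groups
    return lambda: f"curl http://{server_cmd.hostname_ref()}:{port}/health"


server = ServerCmd()
make_client_cmd = client_cmd_factory(server)
server.het_group_index = 1  # assigned at pipeline execution time
print(make_client_cmd())  # curl http://$SLURM_HOSTNAME_HET_GROUP_1:5000/health
```

Had the URL been built eagerly instead of inside the lambda, it would have baked in `het_group_index=None`.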
701-733: External server URL handling is straightforward.

When no servers are hosted, the code correctly builds static URLs from `server_base_url` and `judge_server_base_url`, bypassing the lambda factory pattern since runtime resolution isn't needed.

nemo_skills/pipeline/utils/declarative.py (6)
155-167: Well-documented Command class with clear lazy evaluation semantics.

The docstring clearly explains when lambdas are needed (cross-component references) and why (`het_group_index` isn't assigned until pipeline execution). This is helpful documentation for maintainers.
183-186: Command wrapping in `__post_init__` has a subtle issue.

When `self.command` is a string and either `env_vars` or `working_dir` is set, the command is wrapped. However, `working_dir` has a default value of `/nemo_run/code`, so this wrapping will always occur for string commands even when the user didn't explicitly set `working_dir`. This is likely intentional for consistency, but worth noting.
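The always-wrap behavior can be seen in a stripped-down sketch (hypothetical field names mirroring the review's description, not the actual class):

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class WrappedCommand:
    command: str
    working_dir: str = "/nemo_run/code"  # non-None default => always truthy
    env_vars: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        # env_vars may be empty, but working_dir never is, so string
        # commands are always wrapped
        if isinstance(self.command, str) and (self.env_vars or self.working_dir):
            exports = "".join(f"export {k}={v}; " for k, v in self.env_vars.items())
            self.command = f"cd {self.working_dir} && {exports}{self.command}"


cmd = WrappedCommand(command="echo hi")
print(cmd.command)  # cd /nemo_run/code && echo hi
```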
195-202: Clear error message for missing metadata keys.

The `meta_ref` method provides a helpful error message showing available keys when a key is not found. This aids debugging cross-component reference issues.
485-494: MPI wrapping for non-SLURM executors is correctly handled.

The `_prepare_command` method properly wraps commands with `mpirun` when running on non-SLURM executors with multiple tasks. The use of `shlex.quote` ensures the command is safely passed to bash.
596-644: Command processing loop handles GPU allocation and environment merging correctly.

The logic for:

- Assigning `het_group_index` only for heterogeneous jobs
- Applying job-level GPU overrides for single-group jobs
- Merging shared environment variables for heterogeneous jobs
- Sharing packagers across executors for efficiency

All appears correct and well-structured.
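The shared-environment merge step boils down to dictionary union with later-wins semantics. A toy stand-in using plain dicts in place of real probed exec configs:

```python
# each inner dict plays the role of one command's probed exec_config
groups = [
    [{"environment": {"HF_HOME": "/cache", "PORT": "5000"}}],
    [{"environment": {"PORT": "6000", "DEBUG": "1"}}],
]

shared_env_vars = {}
for group in groups:
    for exec_config in group:
        shared_env_vars.update(exec_config.get("environment", {}))

# later groups win on key collisions
print(shared_env_vars)  # {'HF_HOME': '/cache', 'PORT': '6000', 'DEBUG': '1'}
```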
696-712: Script creation and experiment addition is straightforward.

The code correctly creates `run.Script` objects with inline commands and adds them to the experiment. Single commands get a single executor, multiple commands get a list of executors. Ray metadata is only added to the first script in multi-script scenarios.
47-68: Clear component ordering documentation in `_create_commandgroup_from_config`.

The docstring clearly explains the ordering of components (Server → Client → Sandbox), which is important for understanding execution flow.
143-158: Hardware configuration uses maximum resources across components.

The logic correctly computes `max_gpus` and `max_nodes` from all components to ensure the job requests sufficient resources. This is the right approach for co-located components.
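The co-location rule — request the maximum of each resource across components — reduces to a couple of lines (the component specs here are hypothetical):

```python
# hypothetical component specs; gpus may be None for CPU-only components
components = [
    {"name": "server", "gpus": 8, "nodes": 2},
    {"name": "client", "gpus": None, "nodes": 1},
    {"name": "sandbox", "gpus": 0, "nodes": 1},
]

max_gpus = max((c["gpus"] or 0) for c in components)
max_nodes = max(c["nodes"] for c in components)
print(max_gpus, max_nodes)  # 8 2
```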
189-204: Simplified server configuration parameters.

The transition from list-based parameters (e.g., `models: List[str]`) to scalar values (`model: str`) aligns with the revert PR's goal of simplifying the interface. This makes the API more straightforward for single-model use cases.
309-312: Safe handling of server_type enum conversion.

The try/except block safely handles both enum and string values for `server_type`, which is good defensive programming.
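The enum-or-string pattern looks roughly like this (the `ServerType` members and helper name below are illustrative, not the actual codebase definitions):

```python
from enum import Enum


class ServerType(str, Enum):
    VLLM = "vllm"
    TRTLLM = "trtllm"


def normalize_server_type(server_type):
    try:
        return server_type.value  # enum member
    except AttributeError:
        return str(server_type)   # plain string passed through


print(normalize_server_type(ServerType.VLLM))  # vllm
print(normalize_server_type("sglang"))         # sglang
```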
410-422: Server configuration now uses scalar parameters.

The `configure_client` call correctly passes scalar values (`model`, `server_type`, etc.) instead of the previous list-based approach. This aligns with the simplified function signature.
451-473: Sandbox port allocation and CommandGroup creation look correct.

The code:

- Allocates a sandbox port before creating the CommandGroup (so the client knows the port)
- Copies `server_config` to avoid mutation across loop iterations
- Passes all necessary parameters to `_create_commandgroup_from_config`
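The shallow-copy caveat flagged in the outside-diff comment is easy to reproduce with a toy nested config (the keys here are made up for illustration):

```python
import copy

server_config = {"container": "vllm", "args": {"gpus": 8}}

shallow = server_config.copy()
shallow["args"]["gpus"] = 4           # nested dict is shared with the original
print(server_config["args"]["gpus"])  # 4

server_config["args"]["gpus"] = 8
deep = copy.deepcopy(server_config)
deep["args"]["gpus"] = 2
print(server_config["args"]["gpus"])  # 8, original unaffected
```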
```python
if sandbox_env_overrides:
    sandbox_env = metadata.get("environment", {})
    for override in sandbox_env_overrides:
        key, value = override.split("=", 1)
        sandbox_env[key] = value
    metadata["environment"] = sandbox_env
```
Missing validation for sandbox environment override format.
The code splits override on "=" with maxsplit=1, but if an override doesn't contain "=", this will raise a ValueError with an unhelpful message.
Add validation for the expected format:
```diff
 if sandbox_env_overrides:
     sandbox_env = metadata.get("environment", {})
     for override in sandbox_env_overrides:
-        key, value = override.split("=", 1)
+        if "=" not in override:
+            raise ValueError(
+                f"Invalid sandbox_env_override format: '{override}'. "
+                "Expected KEY=VALUE format."
+            )
+        key, value = override.split("=", 1)
         sandbox_env[key] = value
     metadata["environment"] = sandbox_env
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if sandbox_env_overrides:
    sandbox_env = metadata.get("environment", {})
    for override in sandbox_env_overrides:
        if "=" not in override:
            raise ValueError(
                f"Invalid sandbox_env_override format: '{override}'. "
                "Expected KEY=VALUE format."
            )
        key, value = override.split("=", 1)
        sandbox_env[key] = value
    metadata["environment"] = sandbox_env
```
🤖 Prompt for AI Agents
In nemo_skills/pipeline/generate.py around lines 127 to 132, the loop that
handles sandbox_env_overrides blindly calls override.split("=", 1) which raises
a ValueError for inputs without "="; add validation before splitting by checking
that "=" is present (and non-empty key after splitting), strip whitespace from
key/value, and either raise a clear ValueError describing the invalid override
and expected "KEY=VALUE" format or skip/log the invalid entry; then assign
sandbox_env[key] = value and set metadata["environment"] = sandbox_env as
before.
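A self-contained version of the validated override loop, wrapped in a helper whose name is ours rather than from the codebase:

```python
def apply_env_overrides(metadata, overrides):
    """Apply KEY=VALUE overrides to metadata['environment'], validating the format."""
    env = metadata.get("environment", {})
    for override in overrides:
        if "=" not in override:
            raise ValueError(
                f"Invalid sandbox_env_override format: '{override}'. Expected KEY=VALUE format."
            )
        key, value = override.split("=", 1)
        env[key] = value
    metadata["environment"] = env
    return metadata


print(apply_env_overrides({}, ["PORT=6000", "DEBUG=1"]))
# {'environment': {'PORT': '6000', 'DEBUG': '1'}}
```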
```python
# Handle executor="none" path replacements (single-group only)
if (not heterogeneous) and cluster_config["executor"] == "none":
    for idx in range(len(commands)):
        commands[idx] = commands[idx].replace(
            "/nemo_run/code/nemo_skills", str(get_registered_external_repo("nemo_skills").path)
        )
        commands[idx] = commands[idx].replace("/nemo_run/code", "./")
```
Path replacement for executor="none" assumes nemo_skills repo is registered.
The code calls get_registered_external_repo("nemo_skills").path without checking if the repo is registered. If the repo is not registered, this will raise an AttributeError when accessing .path on None.
Consider adding a guard:
```diff
 if (not heterogeneous) and cluster_config["executor"] == "none":
+    nemo_skills_repo = get_registered_external_repo("nemo_skills")
+    if nemo_skills_repo is None:
+        raise RuntimeError("nemo_skills repo not registered for executor='none' path replacement")
     for idx in range(len(commands)):
         commands[idx] = commands[idx].replace(
-            "/nemo_run/code/nemo_skills", str(get_registered_external_repo("nemo_skills").path)
+            "/nemo_run/code/nemo_skills", str(nemo_skills_repo.path)
         )
         commands[idx] = commands[idx].replace("/nemo_run/code", "./")
```

📝 Committable suggestion
commands[idx] = commands[idx].replace("/nemo_run/code", "./")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Handle executor="none" path replacements (single-group only)
if (not heterogeneous) and cluster_config["executor"] == "none":
    nemo_skills_repo = get_registered_external_repo("nemo_skills")
    if nemo_skills_repo is None:
        raise RuntimeError("nemo_skills repo not registered for executor='none' path replacement")
    for idx in range(len(commands)):
        commands[idx] = commands[idx].replace(
            "/nemo_run/code/nemo_skills", str(nemo_skills_repo.path)
        )
        commands[idx] = commands[idx].replace("/nemo_run/code", "./")
```
🤖 Prompt for AI Agents
In nemo_skills/pipeline/utils/declarative.py around lines 679–685, the path
replacement for executor="none" directly calls
get_registered_external_repo("nemo_skills").path which will raise if the repo is
not registered; guard the call by first retrieving the repo into a variable,
check if it is not None (or has a .path), and only perform the replacement using
repo.path when present; otherwise either skip the nemo_skills-specific
replacement (leave "/nemo_run/code/nemo_skills" unchanged or replace with a safe
default like "./") or raise a clear error describing that the external repo is
not registered so callers can register it before using executor="none".
This reverts commit 1c0722a.

FIX multi-node pipeline creation

Signed-off-by: George Armstrong <georgea@nvidia.com>

remove hostname ref change

Signed-off-by: George Armstrong <georgea@nvidia.com>

make param span_group_nodes

Signed-off-by: George Armstrong <georgea@nvidia.com>
…DIA-NeMo#1125) Signed-off-by: George Armstrong <georgea@nvidia.com> Signed-off-by: dlord <dlord@nvidia.com>
This reverts commit 9c5b68c.
Summary by CodeRabbit
New Features
Improvements
Tests