From 5061fd7f8f4ae31a467d44e62e41fb904c5ec61f Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Wed, 19 Nov 2025 10:45:58 -0800 Subject: [PATCH 1/5] ENH symlink nemo-run files inside the experiment directory Signed-off-by: George Armstrong --- .../gpt_oss_python_aime25/run_test.py | 13 ++++- .../slurm-tests/omr_simple_recipe/run_test.py | 11 +++- tests/slurm-tests/qwen3_4b_evals/run_test.py | 17 ++++-- .../qwen3coder_30b_swebench/run_test.py | 13 ++++- tests/slurm-tests/super_49b_evals/run_test.py | 17 ++++-- tests/slurm-tests/utils.py | 55 +++++++++++++++++++ 6 files changed, 113 insertions(+), 13 deletions(-) diff --git a/tests/slurm-tests/gpt_oss_python_aime25/run_test.py b/tests/slurm-tests/gpt_oss_python_aime25/run_test.py index 866f604f17..78088bde73 100644 --- a/tests/slurm-tests/gpt_oss_python_aime25/run_test.py +++ b/tests/slurm-tests/gpt_oss_python_aime25/run_test.py @@ -13,6 +13,12 @@ # limitations under the License. import argparse +import sys +from pathlib import Path + +# Add parent directory to path to import utils +sys.path.insert(0, str(Path(__file__).parents[1])) +from utils import prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -57,11 +63,14 @@ def main(): args = parser.parse_args() + # Prepare cluster config with job_dir set to workspace + cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + prepare_data(ctx=wrap_arguments("aime25")) eval_expname = eval_gpt_oss_python( workspace=args.workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=args.expname_prefix, wandb_project=args.wandb_project, ) @@ -71,7 +80,7 @@ def main(): run_cmd( ctx=wrap_arguments(checker_cmd), - cluster=args.cluster, + cluster=cluster, expname=args.expname_prefix + "-check-results", log_dir=f"{args.workspace}/check-results-logs", run_after=eval_expname, diff --git a/tests/slurm-tests/omr_simple_recipe/run_test.py b/tests/slurm-tests/omr_simple_recipe/run_test.py index 209e66b5b7..7002c1eca3 100644 --- a/tests/slurm-tests/omr_simple_recipe/run_test.py +++ b/tests/slurm-tests/omr_simple_recipe/run_test.py @@ -13,6 +13,12 @@ import argparse import subprocess +import sys +from pathlib import Path + +# Add parent directory to path to import utils +sys.path.insert(0, str(Path(__file__).parents[1])) +from utils import prepare_cluster_config_for_test from nemo_skills.pipeline.cli import run_cmd, wrap_arguments @@ -33,6 +39,9 @@ def main(): ) args = ap.parse_args() + # Prepare cluster config with job_dir set to workspace + cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + cmd = ( f"python -m recipes.openmathreasoning.scripts.simplified_recipe " f" --cluster {args.cluster} " @@ -54,7 +63,7 @@ def main(): run_cmd( ctx=wrap_arguments(checker_cmd), - cluster=args.cluster, + cluster=cluster, expname=args.expname_prefix + "-check-results", log_dir=f"{args.workspace}/check-results-logs", # these are launched in simplified recipe diff --git a/tests/slurm-tests/qwen3_4b_evals/run_test.py b/tests/slurm-tests/qwen3_4b_evals/run_test.py index 7877a2a238..27d644ffde 100644 --- a/tests/slurm-tests/qwen3_4b_evals/run_test.py +++ b/tests/slurm-tests/qwen3_4b_evals/run_test.py @@ -13,6 +13,12 @@ # limitations under the License. import argparse +import sys +from pathlib import Path + +# Add parent directory to path to import utils +sys.path.insert(0, str(Path(__file__).parents[1])) +from utils import prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -142,11 +148,14 @@ def main(): args = parser.parse_args() + # Prepare cluster config with job_dir set to workspace + cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + prepare_data(ctx=wrap_arguments("bfcl_v3 aime24")) bfcl_expname = eval_qwen3_bfcl( workspace=args.workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=args.expname_prefix, wandb_project=args.wandb_project, ) @@ -154,14 +163,14 @@ def main(): # GenSelect Tests online_genselect_expname = eval_qwen3_online_genselect( workspace=args.workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=args.expname_prefix, wandb_project=args.wandb_project, ) offline_genselect_expname = eval_qwen3_offline_genselect( workspace=args.workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=args.expname_prefix, wandb_project=args.wandb_project, ) @@ -171,7 +180,7 @@ def main(): run_cmd( ctx=wrap_arguments(checker_cmd), - cluster=args.cluster, + cluster=cluster, expname=args.expname_prefix + "-check-results", log_dir=f"{args.workspace}/check-results-logs", run_after=[bfcl_expname, online_genselect_expname, offline_genselect_expname], diff --git a/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py b/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py index 8bd0d93b7b..aa4378d82c 100644 --- a/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py +++ b/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py @@ -13,6 +13,12 @@ # limitations under the License. import argparse +import sys +from pathlib import Path + +# Add parent directory to path to import utils +sys.path.insert(0, str(Path(__file__).parents[1])) +from utils import prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -52,6 +58,9 @@ def main(): args = parser.parse_args() + # Prepare cluster config with job_dir set to workspace + cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + if args.container_formatter is None: prepare_data_args = "swe-bench" else: @@ -64,7 +73,7 @@ def main(): eval_qwen3coder( workspace=workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=expname_prefix, wandb_project=args.wandb_project, agent_framework=agent_framework, @@ -79,7 +88,7 @@ def main(): run_cmd( ctx=wrap_arguments(checker_cmd), - cluster=args.cluster, + cluster=cluster, expname=f"{expname_prefix}-check-results", log_dir=f"{workspace}/check-results-logs", run_after=expname_prefix, diff --git a/tests/slurm-tests/super_49b_evals/run_test.py b/tests/slurm-tests/super_49b_evals/run_test.py index d98000e733..ab211616fd 100644 --- a/tests/slurm-tests/super_49b_evals/run_test.py +++ b/tests/slurm-tests/super_49b_evals/run_test.py @@ -13,6 +13,12 @@ # limitations under the License. import argparse +import sys +from pathlib import Path + +# Add parent directory to path to import utils +sys.path.insert(0, str(Path(__file__).parents[1])) +from utils import prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -319,22 +325,25 @@ def main(): args = parser.parse_args() + # Prepare cluster config with job_dir set to workspace and get normalized expname prefix + cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + prepare_data( ctx=wrap_arguments("gpqa mmlu-pro hle livecodebench scicode bfcl_v3 math-500 aime24 aime25"), ) - setup(workspace=args.workspace, cluster=args.cluster, expname_prefix=args.expname_prefix) + setup(workspace=args.workspace, cluster=cluster, expname_prefix=args.expname_prefix) reasoning_on_expnames = eval_reasoning_on( workspace=args.workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=args.expname_prefix, wandb_project=args.wandb_project, ) reasoning_off_expnames = eval_reasoning_off( workspace=args.workspace, - cluster=args.cluster, + cluster=cluster, expname_prefix=args.expname_prefix, wandb_project=args.wandb_project, ) @@ -344,7 +353,7 @@ def main(): run_cmd( ctx=wrap_arguments(checker_cmd), - cluster=args.cluster, + cluster=cluster, expname=args.expname_prefix + "-check-results", log_dir=f"{args.workspace}/check-results-logs", run_after=reasoning_on_expnames + reasoning_off_expnames, diff --git a/tests/slurm-tests/utils.py b/tests/slurm-tests/utils.py index 0dd782eeb3..4d45dd202e 100644 --- a/tests/slurm-tests/utils.py +++ b/tests/slurm-tests/utils.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import json +from nemo_skills.pipeline.utils.cluster import get_cluster_config +from nemo_skills.pipeline.utils.mounts import get_mounts_from_config + def load_json(path): """Load a JSON file from the given path.""" @@ -56,3 +60,54 @@ def assert_all(): for i, msg in enumerate(_soft_assert_failures, 1): print(f"{i:3d}. {msg}") raise SystemExit(1) + + +def prepare_cluster_config_for_test(cluster, workspace, config_dir=None): + """Prepare a cluster config for testing by overriding job_dir to be within the test workspace. + + This ensures that nemo-run experiment artifacts are stored in {workspace}/nemo-run-experiments + instead of the global cluster job_dir, making it easier to correlate experiment IDs with + test runs and workspace locations. + + Note: This function resolves the workspace mount path to its actual filesystem path since + job_dir needs to be set before any containers are launched (it's sent over SSH, not inside + a container). + + Args: + cluster: Cluster name or config dict + workspace: Test workspace directory path (may be a mount path like /workspace/...) + config_dir: Optional directory to search for cluster configs + + Returns: + dict: Modified cluster config with job_dir set to {workspace_source}/nemo-run-experiments + """ + # Load the cluster config + cluster_config = get_cluster_config(cluster, config_dir) + + # Deep copy to avoid modifying original + cluster_config = copy.deepcopy(cluster_config) + + # Resolve workspace mount path to actual source path + # workspace might be a mount destination like /workspace/..., but job_dir needs + # the actual filesystem path (mount source) since it's set before containers are created + workspace_source = workspace + if "mounts" in cluster_config: + mounts = get_mounts_from_config(cluster_config) + for mount in mounts: + if ":" in mount: + source, dest = mount.split(":", 1) + # Check if workspace is under this mount destination + if workspace.startswith(dest): + # Replace the mount destination prefix with source prefix + workspace_source = workspace.replace(dest, source, 1) + break + + # Override job_dir to be within workspace (using the resolved source path) + test_job_dir = f"{workspace_source}/nemo-run-experiments" + + if "ssh_tunnel" in cluster_config: + cluster_config["ssh_tunnel"]["job_dir"] = test_job_dir + else: + cluster_config["job_dir"] = test_job_dir + + return cluster_config From 09f0acaea1248fc8f57687f4d4fe96382a95498a Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Wed, 19 Nov 2025 11:51:07 -0800 Subject: [PATCH 2/5] ENH add cluster config with image resolution to experiment Signed-off-by: George Armstrong --- .../gpt_oss_python_aime25/run_test.py | 13 +- .../slurm-tests/omr_simple_recipe/run_test.py | 13 +- tests/slurm-tests/qwen3_4b_evals/run_test.py | 13 +- .../qwen3coder_30b_swebench/run_test.py | 13 +- tests/slurm-tests/super_49b_evals/run_test.py | 13 +- tests/slurm-tests/utils.py | 254 +++++++++++++++++- 6 files changed, 287 insertions(+), 32 deletions(-) diff --git a/tests/slurm-tests/gpt_oss_python_aime25/run_test.py b/tests/slurm-tests/gpt_oss_python_aime25/run_test.py index 78088bde73..826abad720 100644 --- a/tests/slurm-tests/gpt_oss_python_aime25/run_test.py +++ b/tests/slurm-tests/gpt_oss_python_aime25/run_test.py @@ -18,7 +18,7 @@ # Add parent directory to path to import utils sys.path.insert(0, str(Path(__file__).parents[1])) -from utils import prepare_cluster_config_for_test +from utils import add_common_args, prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -56,15 +56,16 @@ def eval_gpt_oss_python(workspace, cluster, expname_prefix, wandb_project): def main(): parser = argparse.ArgumentParser() - parser.add_argument("--workspace", required=True, help="Workspace directory containing all experiment data") - parser.add_argument("--cluster", required=True, help="Cluster name") - parser.add_argument("--expname_prefix", required=True, help="Experiment name prefix") - parser.add_argument("--wandb_project", default="nemo-skills-slurm-ci", help="W&B project name") + add_common_args(parser) args = parser.parse_args() # Prepare cluster config with job_dir set to workspace - cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + cluster = prepare_cluster_config_for_test( + args.cluster, + args.workspace, + cluster_config_mode=args.cluster_config_mode, + ) prepare_data(ctx=wrap_arguments("aime25")) diff --git a/tests/slurm-tests/omr_simple_recipe/run_test.py b/tests/slurm-tests/omr_simple_recipe/run_test.py index 7002c1eca3..30538dcbdd 100644 --- a/tests/slurm-tests/omr_simple_recipe/run_test.py +++ b/tests/slurm-tests/omr_simple_recipe/run_test.py @@ -18,17 +18,14 @@ # Add parent directory to path to import utils sys.path.insert(0, str(Path(__file__).parents[1])) -from utils import prepare_cluster_config_for_test +from utils import add_common_args, prepare_cluster_config_for_test from nemo_skills.pipeline.cli import run_cmd, wrap_arguments def main(): ap = argparse.ArgumentParser() - ap.add_argument("--cluster", required=True) - ap.add_argument("--workspace", required=True, help="Workspace path") - ap.add_argument("--wandb_project", default="nemo-skills-slurm-ci", help="W&B project name") - ap.add_argument("--expname_prefix", required=True, help="Experiment name prefix used inside the recipe") + add_common_args(ap) ap.add_argument("--disable_wandb", action="store_true", help="Disable W&B logging in the recipe") ap.add_argument( "--backend", @@ -40,7 +37,11 @@ def main(): args = ap.parse_args() # Prepare cluster config with job_dir set to workspace - cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + cluster = prepare_cluster_config_for_test( + args.cluster, + args.workspace, + cluster_config_mode=args.cluster_config_mode, + ) cmd = ( f"python -m recipes.openmathreasoning.scripts.simplified_recipe " diff --git a/tests/slurm-tests/qwen3_4b_evals/run_test.py b/tests/slurm-tests/qwen3_4b_evals/run_test.py index 27d644ffde..3f7b12f393 100644 --- a/tests/slurm-tests/qwen3_4b_evals/run_test.py +++ b/tests/slurm-tests/qwen3_4b_evals/run_test.py @@ -18,7 +18,7 @@ # Add parent directory to path to import utils sys.path.insert(0, str(Path(__file__).parents[1])) -from utils import prepare_cluster_config_for_test +from utils import add_common_args, prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -141,15 +141,16 @@ def eval_qwen3_offline_genselect(workspace, cluster, expname_prefix, wandb_proje def main(): parser = argparse.ArgumentParser() - parser.add_argument("--workspace", required=True, help="Workspace directory containing all experiment data") - parser.add_argument("--cluster", required=True, help="Cluster name, e.g. oci") - parser.add_argument("--expname_prefix", required=True, help="Experiment name prefix") - parser.add_argument("--wandb_project", default="nemo-skills-slurm-ci", help="W&B project name") + add_common_args(parser) args = parser.parse_args() # Prepare cluster config with job_dir set to workspace - cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + cluster = prepare_cluster_config_for_test( + args.cluster, + args.workspace, + cluster_config_mode=args.cluster_config_mode, + ) prepare_data(ctx=wrap_arguments("bfcl_v3 aime24")) diff --git a/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py b/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py index aa4378d82c..3aac559299 100644 --- a/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py +++ b/tests/slurm-tests/qwen3coder_30b_swebench/run_test.py @@ -18,7 +18,7 @@ # Add parent directory to path to import utils sys.path.insert(0, str(Path(__file__).parents[1])) -from utils import prepare_cluster_config_for_test +from utils import add_common_args, prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -50,16 +50,17 @@ def eval_qwen3coder(workspace, cluster, expname_prefix, wandb_project, agent_fra def main(): parser = argparse.ArgumentParser() - parser.add_argument("--workspace", required=True, help="Workspace directory containing all experiment data") - parser.add_argument("--cluster", required=True, help="Cluster name, e.g. oci") - parser.add_argument("--expname_prefix", required=True, help="Experiment name prefix") - parser.add_argument("--wandb_project", default="nemo-skills-slurm-ci", help="W&B project name") + add_common_args(parser) parser.add_argument("--container_formatter", default=None, help="Container formatter for SWE-bench") args = parser.parse_args() # Prepare cluster config with job_dir set to workspace - cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + cluster = prepare_cluster_config_for_test( + args.cluster, + args.workspace, + cluster_config_mode=args.cluster_config_mode, + ) if args.container_formatter is None: prepare_data_args = "swe-bench" diff --git a/tests/slurm-tests/super_49b_evals/run_test.py b/tests/slurm-tests/super_49b_evals/run_test.py index ab211616fd..c4f2bd5415 100644 --- a/tests/slurm-tests/super_49b_evals/run_test.py +++ b/tests/slurm-tests/super_49b_evals/run_test.py @@ -18,7 +18,7 @@ # Add parent directory to path to import utils sys.path.insert(0, str(Path(__file__).parents[1])) -from utils import prepare_cluster_config_for_test +from utils import add_common_args, prepare_cluster_config_for_test from nemo_skills.pipeline.cli import eval, prepare_data, run_cmd, wrap_arguments @@ -318,15 +318,16 @@ def eval_reasoning_off(workspace, cluster, expname_prefix, wandb_project): def main(): parser = argparse.ArgumentParser() - parser.add_argument("--workspace", required=True, help="Workspace directory containing all experiment data") - parser.add_argument("--cluster", required=True, help="Cluster name") - parser.add_argument("--expname_prefix", required=True, help="Experiment name prefix") - parser.add_argument("--wandb_project", default="nemo-skills-slurm-ci", help="W&B project name") + add_common_args(parser) args = parser.parse_args() # Prepare cluster config with job_dir set to workspace and get normalized expname prefix - cluster = prepare_cluster_config_for_test(args.cluster, args.workspace) + cluster = prepare_cluster_config_for_test( + args.cluster, + args.workspace, + cluster_config_mode=args.cluster_config_mode, + ) prepare_data( ctx=wrap_arguments("gpqa mmlu-pro hle livecodebench scicode bfcl_v3 math-500 aime24 aime25"), diff --git a/tests/slurm-tests/utils.py b/tests/slurm-tests/utils.py index 4d45dd202e..097db436f8 100644 --- a/tests/slurm-tests/utils.py +++ b/tests/slurm-tests/utils.py @@ -14,10 +14,28 @@ import copy import json +import os +import subprocess +import tempfile +from datetime import datetime +from pathlib import Path -from nemo_skills.pipeline.utils.cluster import get_cluster_config +import yaml + +from nemo_skills.pipeline.utils import ( + cluster_download_file, + cluster_path_exists, + cluster_upload, + create_remote_directory, + get_cluster_config, +) from nemo_skills.pipeline.utils.mounts import get_mounts_from_config +_SUPPORTED_CLUSTER_CONFIG_MODES = {"assert", "overwrite", "reuse"} +_DEFAULT_CLUSTER_CONFIG_FILENAME = "cluster_config.yaml" +_DEFAULT_COMMIT_FILENAME = "nemo_skills_commit.json" +_REPO_ROOT = Path(__file__).resolve().parents[3] + def load_json(path): """Load a JSON file from the given path.""" @@ -62,7 +80,15 @@ def assert_all(): raise SystemExit(1) -def prepare_cluster_config_for_test(cluster, workspace, config_dir=None): +def prepare_cluster_config_for_test( + cluster, + workspace, + config_dir=None, + *, + cluster_config_mode: str = "assert", + cluster_config_filename: str = _DEFAULT_CLUSTER_CONFIG_FILENAME, + commit_metadata_filename: str = _DEFAULT_COMMIT_FILENAME, +): """Prepare a cluster config for testing by overriding job_dir to be within the test workspace. This ensures that nemo-run experiment artifacts are stored in {workspace}/nemo-run-experiments @@ -77,10 +103,23 @@ def prepare_cluster_config_for_test(cluster, workspace, config_dir=None): cluster: Cluster name or config dict workspace: Test workspace directory path (may be a mount path like /workspace/...) config_dir: Optional directory to search for cluster configs + cluster_config_mode: How to handle existing snapshots inside job_dir. + - "assert": require the saved config (and commit metadata) to match the newly generated one. + - "overwrite": replace the saved files with the newly generated versions. + - "reuse": load the previously saved config and use it for this run without modifying the files. + cluster_config_filename: Name of the snapshot file saved under job_dir. + commit_metadata_filename: Name of the commit metadata file saved under job_dir. Returns: dict: Modified cluster config with job_dir set to {workspace_source}/nemo-run-experiments """ + cluster_config_mode = cluster_config_mode.lower() + if cluster_config_mode not in _SUPPORTED_CLUSTER_CONFIG_MODES: + raise ValueError( + f"Unsupported cluster_config_mode '{cluster_config_mode}'. " + f"Supported values: {sorted(_SUPPORTED_CLUSTER_CONFIG_MODES)}" + ) + # Load the cluster config cluster_config = get_cluster_config(cluster, config_dir) @@ -107,7 +146,218 @@ def prepare_cluster_config_for_test(cluster, workspace, config_dir=None): if "ssh_tunnel" in cluster_config: cluster_config["ssh_tunnel"]["job_dir"] = test_job_dir + job_dir = cluster_config["ssh_tunnel"]["job_dir"] else: cluster_config["job_dir"] = test_job_dir + job_dir = cluster_config["job_dir"] + + cluster_config["job_dir"] = cluster_config.get("job_dir", test_job_dir) + _resolve_container_image_paths(cluster_config) + + return _sync_cluster_config_snapshot( + cluster_config, + job_dir=job_dir, + mode=cluster_config_mode, + cluster_config_filename=cluster_config_filename, + commit_metadata_filename=commit_metadata_filename, + ) + + +def _resolve_container_image_paths(cluster_config: dict): + """Resolve local symlinks for container image paths so snapshots capture canonical targets.""" + containers = cluster_config.get("containers") + if not isinstance(containers, dict): + return + + resolved = {} + for name, path in containers.items(): + if isinstance(path, str): + resolved[name] = os.path.realpath(path) + else: + resolved[name] = path + cluster_config["containers"] = resolved + + +def _sync_cluster_config_snapshot( + cluster_config: dict, + *, + job_dir: str, + mode: str, + cluster_config_filename: str, + commit_metadata_filename: str, +): + """Persist the cluster config / commit metadata according to the selected mode.""" + job_dir = str(Path(job_dir)) + config_remote_path = str(Path(job_dir) / cluster_config_filename) + commit_remote_path = str(Path(job_dir) / commit_metadata_filename) + + if mode == "reuse": + if not cluster_path_exists(cluster_config, config_remote_path): + raise FileNotFoundError( + f"cluster_config_mode 'reuse' requires an existing snapshot at {config_remote_path}" + ) + persisted = _download_remote_yaml(cluster_config, config_remote_path) + if not isinstance(persisted, dict): + raise ValueError(f"Existing cluster config at {config_remote_path} is not a valid mapping.") + _ensure_job_dir(persisted, job_dir) + _resolve_container_image_paths(persisted) + _sync_commit_metadata(cluster_config, commit_remote_path, mode) + return persisted + create_remote_directory(job_dir, cluster_config) + existing_remote = cluster_path_exists(cluster_config, config_remote_path) + if existing_remote: + persisted = _download_remote_yaml(cluster_config, config_remote_path) + if mode == "assert": + if not _cluster_configs_equal(persisted, cluster_config): + raise AssertionError( + "Existing cluster config snapshot does not match the newly generated config. " + "Use --cluster_config_mode overwrite to update the snapshot or reuse to keep the existing one." + ) + _sync_commit_metadata(cluster_config, commit_remote_path, mode) + return cluster_config + + _upload_yaml(cluster_config, cluster_config, config_remote_path) + _sync_commit_metadata(cluster_config, commit_remote_path, mode) return cluster_config + + +def add_common_args(parser, *, include_wandb: bool = True, wandb_default: str = "nemo-skills-slurm-ci"): + """Register the shared CLI arguments used by slurm test entrypoints.""" + + parser.add_argument( + "--workspace", + required=True, + help="Workspace directory containing all experiment data", + ) + parser.add_argument( + "--cluster", + required=True, + help="Cluster config name or path (same semantics as --cluster on nemo-skills CLI).", + ) + parser.add_argument( + "--expname_prefix", + required=True, + help="Experiment name prefix used to group nemo-run jobs for this test.", + ) + if include_wandb: + parser.add_argument( + "--wandb_project", + default=wandb_default, + help="W&B project name used for logging (set to empty string to disable).", + ) + + parser.add_argument( + "--cluster_config_mode", + choices=sorted(_SUPPORTED_CLUSTER_CONFIG_MODES), + default="assert", + help="Controls how existing cluster config snapshots under the workspace job_dir are handled.", + ) + + return parser + + +def _cluster_configs_equal(config_a: dict, config_b: dict) -> bool: + """Compare two configs after normalizing container image paths.""" + + def _normalize(config: dict): + config_copy = copy.deepcopy(config) + _resolve_container_image_paths(config_copy) + return config_copy + + return _normalize(config_a) == _normalize(config_b) + + +def _ensure_job_dir(cluster_config: dict, job_dir: str): + """Ensure the provided cluster config uses the expected workspace job_dir.""" + if "ssh_tunnel" in cluster_config: + cluster_config["ssh_tunnel"]["job_dir"] = job_dir + else: + cluster_config["job_dir"] = job_dir + + +def _sync_commit_metadata(cluster_config: dict, remote_path: str, mode: str): + """Persist commit metadata using the same mode semantics as the config snapshot.""" + metadata = _collect_repo_metadata() + remote_exists = cluster_path_exists(cluster_config, remote_path) + + if mode == "reuse": + if not remote_exists: + raise FileNotFoundError(f"cluster_config_mode 'reuse' requires existing commit metadata at {remote_path}") + return + + if remote_exists and mode == "assert": + existing = _download_remote_json(cluster_config, remote_path) + if existing != metadata: + raise AssertionError( + "Existing commit metadata does not match the current repository state. " + "Use --cluster_config_mode overwrite to refresh the snapshot." + ) + return + + _upload_json(cluster_config, metadata, remote_path) + + +def _collect_repo_metadata() -> dict: + """Gather information about the current NeMo-Skills checkout.""" + metadata = { + "repo_root": str(_REPO_ROOT), + "timestamp_utc": datetime.utcnow().isoformat(timespec="seconds") + "Z", + } + + def _run_git(*args): + result = subprocess.run( + ["git", *args], + cwd=_REPO_ROOT, + check=False, + capture_output=True, + text=True, + ) + return result.stdout.strip() if result.returncode == 0 else None + + metadata["commit"] = _run_git("rev-parse", "HEAD") + metadata["describe"] = _run_git("describe", "--always", "--dirty") + metadata["is_dirty"] = bool(_run_git("status", "--short")) + return metadata + + +def _download_remote_yaml(cluster_config: dict, remote_path: str) -> dict: + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp_path = tmp.name + try: + cluster_download_file(cluster_config, remote_path, tmp_path) + with open(tmp_path, "rt", encoding="utf-8") as fin: + return yaml.safe_load(fin) or {} + finally: + os.remove(tmp_path) + + +def _download_remote_json(cluster_config: dict, remote_path: str) -> dict: + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp_path = tmp.name + try: + cluster_download_file(cluster_config, remote_path, tmp_path) + return load_json(tmp_path) + finally: + os.remove(tmp_path) + + +def _upload_yaml(cluster_config: dict, data: dict, remote_path: str): + with tempfile.NamedTemporaryFile(mode="wt", encoding="utf-8", delete=False) as tmp: + yaml.safe_dump(data, tmp, sort_keys=True) + tmp_path = tmp.name + try: + cluster_upload(cluster_config, tmp_path, remote_path) + finally: + os.remove(tmp_path) + + +def _upload_json(cluster_config: dict, data: dict, remote_path: str): + with tempfile.NamedTemporaryFile(mode="wt", encoding="utf-8", delete=False) as tmp: + json.dump(data, tmp, indent=2, sort_keys=True) + tmp.write("\n") + tmp_path = tmp.name + try: + cluster_upload(cluster_config, tmp_path, remote_path) + finally: + os.remove(tmp_path) From 3a3483e9d4f00abf751b5ff9699e3155e67cab4d Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Wed, 19 Nov 2025 12:02:57 -0800 Subject: [PATCH 3/5] WIP fix canonicalization Signed-off-by: George Armstrong --- tests/slurm-tests/utils.py | 74 +++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 12 deletions(-) diff --git a/tests/slurm-tests/utils.py b/tests/slurm-tests/utils.py index 097db436f8..24d36fda6e 100644 --- a/tests/slurm-tests/utils.py +++ b/tests/slurm-tests/utils.py @@ -15,9 +15,11 @@ import copy import json import os +import shlex import subprocess import tempfile from datetime import datetime +from functools import lru_cache from pathlib import Path import yaml @@ -28,13 +30,32 @@ cluster_upload, create_remote_directory, get_cluster_config, + get_tunnel, ) from nemo_skills.pipeline.utils.mounts import get_mounts_from_config _SUPPORTED_CLUSTER_CONFIG_MODES = {"assert", "overwrite", "reuse"} _DEFAULT_CLUSTER_CONFIG_FILENAME = "cluster_config.yaml" _DEFAULT_COMMIT_FILENAME = "nemo_skills_commit.json" -_REPO_ROOT = Path(__file__).resolve().parents[3] + + +@lru_cache(maxsize=1) +def _get_repo_root(): + """Return the git repository root if available, otherwise fallback to project root.""" + current_dir = Path(__file__).resolve().parent + result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + cwd=current_dir, + check=False, + capture_output=True, + text=True, + ) + if result.returncode == 0: + root = result.stdout.strip() + if root: + return Path(root) + # Fallback to the NeMo-Skills directory relative to this file + return Path(__file__).resolve().parents[3] def load_json(path): @@ -144,6 +165,8 @@ def prepare_cluster_config_for_test( # Override job_dir to be within workspace (using the resolved source path) test_job_dir = f"{workspace_source}/nemo-run-experiments" + snapshot_dir = workspace_source + if "ssh_tunnel" in cluster_config: cluster_config["ssh_tunnel"]["job_dir"] = test_job_dir job_dir = cluster_config["ssh_tunnel"]["job_dir"] @@ -157,6 +180,7 @@ def prepare_cluster_config_for_test( return _sync_cluster_config_snapshot( cluster_config, job_dir=job_dir, + snapshot_dir=snapshot_dir, mode=cluster_config_mode, cluster_config_filename=cluster_config_filename, commit_metadata_filename=commit_metadata_filename, @@ -164,32 +188,56 @@ def prepare_cluster_config_for_test( def _resolve_container_image_paths(cluster_config: dict): - """Resolve local symlinks for container image paths so snapshots capture canonical targets.""" + """Resolve local/remote symlinks for container image paths so snapshots capture canonical targets.""" containers = cluster_config.get("containers") if not isinstance(containers, dict): return resolved = {} for name, path in containers.items(): - if isinstance(path, str): - resolved[name] = os.path.realpath(path) - else: - resolved[name] = path + resolved[name] = _resolve_path_with_remote(cluster_config, path) cluster_config["containers"] = resolved +def _resolve_path_with_remote(cluster_config: dict, path: str): + """Resolve the provided path locally, and fallback to remote resolution if needed.""" + if not isinstance(path, str) or not path: + return path + + local_resolved = os.path.realpath(path) + if os.path.exists(local_resolved): + return local_resolved + + if cluster_config.get("executor") != "slurm": + return local_resolved + + tunnel = None + try: + tunnel = get_tunnel(cluster_config) + result = tunnel.run(f"readlink -f {shlex.quote(path)}", hide=True, warn=True) + resolved_remote = result.stdout.strip() if result.exited == 0 else "" + return resolved_remote or local_resolved + except Exception: + return local_resolved + finally: + if tunnel is not None: + tunnel.cleanup() + + def _sync_cluster_config_snapshot( cluster_config: dict, *, job_dir: str, + snapshot_dir: str, mode: str, cluster_config_filename: str, commit_metadata_filename: str, ): """Persist the cluster config / commit metadata according to the selected mode.""" job_dir = str(Path(job_dir)) - config_remote_path = str(Path(job_dir) / cluster_config_filename) - commit_remote_path = str(Path(job_dir) / commit_metadata_filename) + snapshot_dir = str(Path(snapshot_dir)) + config_remote_path = str(Path(snapshot_dir) / cluster_config_filename) + commit_remote_path = str(Path(snapshot_dir) / commit_metadata_filename) if mode == "reuse": if not cluster_path_exists(cluster_config, config_remote_path): @@ -204,7 +252,7 @@ def _sync_cluster_config_snapshot( _sync_commit_metadata(cluster_config, commit_remote_path, mode) return persisted - create_remote_directory(job_dir, cluster_config) + create_remote_directory([job_dir, snapshot_dir], cluster_config) existing_remote = cluster_path_exists(cluster_config, config_remote_path) if existing_remote: persisted = _download_remote_yaml(cluster_config, config_remote_path) @@ -300,15 +348,16 @@ def _sync_commit_metadata(cluster_config: dict, remote_path: str, mode: str): def _collect_repo_metadata() -> dict: """Gather information about the current NeMo-Skills checkout.""" + repo_root = _get_repo_root() metadata = { - "repo_root": str(_REPO_ROOT), + "repo_root": str(repo_root), "timestamp_utc": datetime.utcnow().isoformat(timespec="seconds") + "Z", } def _run_git(*args): result = subprocess.run( ["git", *args], - cwd=_REPO_ROOT, + cwd=repo_root, check=False, capture_output=True, text=True, @@ -317,7 +366,8 @@ def _run_git(*args): metadata["commit"] = _run_git("rev-parse", "HEAD") metadata["describe"] = _run_git("describe", "--always", "--dirty") - metadata["is_dirty"] = bool(_run_git("status", "--short")) + status_output = _run_git("status", "--short") + metadata["is_dirty"] = bool(status_output) if status_output is not None else None return metadata From 0a566d9bd4493868ed89a63c06492484fc0a6f55 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Fri, 21 Nov 2025 10:10:17 -0800 Subject: [PATCH 4/5] MAKE tests check respect uncommitted changes check Signed-off-by: George Armstrong --- tests/slurm-tests/utils.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/tests/slurm-tests/utils.py b/tests/slurm-tests/utils.py index 24d36fda6e..4e8e4fcdfe 100644 --- a/tests/slurm-tests/utils.py +++ b/tests/slurm-tests/utils.py @@ -37,11 +37,25 @@ _SUPPORTED_CLUSTER_CONFIG_MODES = {"assert", "overwrite", "reuse"} _DEFAULT_CLUSTER_CONFIG_FILENAME = "cluster_config.yaml" _DEFAULT_COMMIT_FILENAME = "nemo_skills_commit.json" +_UNCOMMITTED_ENV = "NEMO_SKILLS_DISABLE_UNCOMMITTED_CHANGES_CHECK" +_UNCOMMITTED_SKIP_VALUES = {"1", "true", "yes"} +_UNCOMMITTED_ERROR_MSG = ( + "The NeMo-Skills checkout you're using to launch this Slurm test has uncommitted changes.\n" + "We snapshot the repo state into each test workspace for reproducibility, but we cannot do so " + "while the working tree is dirty.\n" + "Please commit or stash your changes, or set NEMO_SKILLS_DISABLE_UNCOMMITTED_CHANGES_CHECK=1 " + "if you intentionally want to snapshot an in-progress state (note: this also disables the " + "global nemo-skills submission check)." +) + + +def _is_uncommitted_check_disabled() -> bool: + return os.environ.get(_UNCOMMITTED_ENV, "0").lower() in _UNCOMMITTED_SKIP_VALUES @lru_cache(maxsize=1) -def _get_repo_root(): - """Return the git repository root if available, otherwise fallback to project root.""" +def _get_repo_root() -> Path: + """Return the git repository root for the current checkout.""" current_dir = Path(__file__).resolve().parent result = subprocess.run( ["git", "rev-parse", "--show-toplevel"], @@ -335,6 +349,8 @@ def _sync_commit_metadata(cluster_config: dict, remote_path: str, mode: str): return if remote_exists and mode == "assert": + if _is_uncommitted_check_disabled(): + return existing = _download_remote_json(cluster_config, remote_path) if existing != metadata: raise AssertionError( @@ -364,9 +380,12 @@ def _run_git(*args): ) return result.stdout.strip() if result.returncode == 0 else None + status_output = _run_git("status", "--short") + if status_output and not _is_uncommitted_check_disabled(): + raise RuntimeError(_UNCOMMITTED_ERROR_MSG) + metadata["commit"] = _run_git("rev-parse", "HEAD") metadata["describe"] = _run_git("describe", "--always", "--dirty") - status_output = _run_git("status", "--short") metadata["is_dirty"] = bool(status_output) if status_output is not None else None return metadata From e47504f9c1ffe7ff25ae25b9d3ebc010480a8bb4 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Fri, 21 Nov 2025 10:55:23 -0800 Subject: [PATCH 5/5] ENH passthrough cluster config mode Signed-off-by: George Armstrong --- tests/slurm-tests/run_all.sh | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/tests/slurm-tests/run_all.sh b/tests/slurm-tests/run_all.sh index 200cf5e3fc..5d4c2bde85 100755 --- a/tests/slurm-tests/run_all.sh +++ b/tests/slurm-tests/run_all.sh @@ -2,16 +2,39 @@ CLUSTER=$1 RUN_NAME=${2:-$(date +%Y-%m-%d)} +# Parse --cluster_config_mode flag with default 'assert' +CLUSTER_CONFIG_MODE="assert" +POSITIONAL_ARGS=() + +while [[ $# -gt 0 ]]; do + case "$1" in + --cluster_config_mode) + CLUSTER_CONFIG_MODE="$2" + shift 2 + ;; + *) + POSITIONAL_ARGS+=("$1") + shift + ;; + esac +done + +# Restore positional parameters (so $1 = cluster, $2 = run_name, etc.) +set -- "${POSITIONAL_ARGS[@]}" + +CLUSTER=$1 +RUN_NAME=${2:-$(date +%Y-%m-%d)} + # TODO: change back to parallel submission after fixing https://github.com/NVIDIA-NeMo/Skills/issues/964 -python tests/slurm-tests/gpt_oss_python_aime25/run_test.py --cluster $CLUSTER --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/gpt_oss_python_aime25 --expname_prefix gpt_oss_python_aime25_$RUN_NAME +python tests/slurm-tests/gpt_oss_python_aime25/run_test.py --cluster $CLUSTER --cluster_config_mode $CLUSTER_CONFIG_MODE --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/gpt_oss_python_aime25 --expname_prefix gpt_oss_python_aime25_$RUN_NAME # sleep 10 -python tests/slurm-tests/super_49b_evals/run_test.py --cluster $CLUSTER --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/super_49b_evals --expname_prefix super_49b_evals_$RUN_NAME +python tests/slurm-tests/super_49b_evals/run_test.py --cluster $CLUSTER --cluster_config_mode $CLUSTER_CONFIG_MODE --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/super_49b_evals --expname_prefix super_49b_evals_$RUN_NAME # sleep 10 -python tests/slurm-tests/qwen3_4b_evals/run_test.py --cluster $CLUSTER --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/qwen3_4b_evals --expname_prefix qwen3_4b_evals_$RUN_NAME +python tests/slurm-tests/qwen3_4b_evals/run_test.py --cluster $CLUSTER --cluster_config_mode $CLUSTER_CONFIG_MODE --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/qwen3_4b_evals --expname_prefix qwen3_4b_evals_$RUN_NAME # sleep 10 -python tests/slurm-tests/omr_simple_recipe/run_test.py --cluster $CLUSTER --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/omr_simple_recipe/nemo-rl --expname_prefix omr_simple_recipe_nemo_rl_$RUN_NAME +python tests/slurm-tests/omr_simple_recipe/run_test.py --cluster $CLUSTER --cluster_config_mode $CLUSTER_CONFIG_MODE --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/omr_simple_recipe/nemo-rl --expname_prefix omr_simple_recipe_nemo_rl_$RUN_NAME # sleep 10 -python tests/slurm-tests/qwen3coder_30b_swebench/run_test.py --cluster $CLUSTER --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/qwen3coder_30b_swebench --expname_prefix qwen3coder_30b_swebench_$RUN_NAME --container_formatter '/swe-bench-images/swebench_sweb.eval.x86_64.{instance_id}.sif' +python tests/slurm-tests/qwen3coder_30b_swebench/run_test.py --cluster $CLUSTER --cluster_config_mode $CLUSTER_CONFIG_MODE --workspace /workspace/nemo-skills-slurm-ci/$RUN_NAME/qwen3coder_30b_swebench --expname_prefix qwen3coder_30b_swebench_$RUN_NAME --container_formatter '/swe-bench-images/swebench_sweb.eval.x86_64.{instance_id}.sif' # wait