From d86e6c97e03d5922a5a50d112173d87d43bbdb3e Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 10 Feb 2026 13:39:42 -0800 Subject: [PATCH 1/7] feat: add NeMo Gym rollouts pipeline and multi-node vLLM support Add `ns nemo_gym_rollouts` CLI command for orchestrating rollout collection with NeMo Gym, including self-hosted or pre-hosted vLLM servers, optional sandbox, and parallel seed-based scaling. New script classes: - NemoGymRolloutsScript: orchestrates ng_run + ng_collect_rollouts - MultiVLLMServerScript: data-parallel vLLM with multiple replicas per node - GymClientScript: multi-node vLLM server discovery via SLURM env vars Infrastructure enhancements: - Command: mounts, environment, workdir, avoid/force nemo_run_code controls - Per-script num_tasks_override for mixed task configurations - Allocation ordering fix for multi-component overlap jobs - NEMO_SKILLS_FORCE_PATTERN_PACKAGER env var for packaging control - --kill-on-bad-exit=1 srun flag for early failure detection Signed-off-by: George Armstrong --- nemo_skills/pipeline/cli.py | 1 + nemo_skills/pipeline/nemo_gym_rollouts.py | 374 ++++++++++++++ nemo_skills/pipeline/utils/__init__.py | 9 + nemo_skills/pipeline/utils/declarative.py | 108 ++++- nemo_skills/pipeline/utils/exp.py | 1 + nemo_skills/pipeline/utils/packager.py | 27 +- nemo_skills/pipeline/utils/scripts.py | 564 ++++++++++++++++++++++ 7 files changed, 1070 insertions(+), 14 deletions(-) create mode 100644 nemo_skills/pipeline/nemo_gym_rollouts.py diff --git a/nemo_skills/pipeline/cli.py b/nemo_skills/pipeline/cli.py index 76ea60a73d..2df7eb772c 100644 --- a/nemo_skills/pipeline/cli.py +++ b/nemo_skills/pipeline/cli.py @@ -26,6 +26,7 @@ from nemo_skills.pipeline.generate import generate from nemo_skills.pipeline.megatron_lm.train import train_megatron_lm from nemo_skills.pipeline.nemo_evaluator import nemo_evaluator +from nemo_skills.pipeline.nemo_gym_rollouts import nemo_gym_rollouts from nemo_skills.pipeline.nemo_rl.grpo import grpo_nemo_rl from nemo_skills.pipeline.nemo_rl.sft import sft_nemo_rl from nemo_skills.pipeline.prepare_data import prepare_data diff --git a/nemo_skills/pipeline/nemo_gym_rollouts.py b/nemo_skills/pipeline/nemo_gym_rollouts.py new file mode 100644 index 0000000000..80fcb4b070 --- /dev/null +++ b/nemo_skills/pipeline/nemo_gym_rollouts.py @@ -0,0 +1,374 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NeMo Gym Rollouts Pipeline. + +This pipeline command runs rollout collection with NeMo Gym, orchestrating: +- vLLM model server (optional, can use pre-hosted) +- Sandbox for code execution (optional) +- NeMo Gym servers (ng_run) +- Rollout collection client (ng_collect_rollouts) + +Example usage: + # Self-hosted vLLM server + ns nemo_gym_rollouts \\ + --cluster local \\ + --config_paths "ns_tools/configs/ns_tools.yaml,math_with_judge/configs/math_with_judge.yaml" \\ + --input_file data/example.jsonl \\ + --output_dir /results/rollouts \\ + --model /path/to/model \\ + --server_type vllm \\ + --server_gpus 1 \\ + --with_sandbox \\ + +agent_name=ns_tools_simple_agent \\ + +limit=10 \\ + +num_samples_in_parallel=3 \\ + +num_repeats=4 # Run each prompt 4 times for mean@4 + + # Pre-hosted server + ns nemo_gym_rollouts \\ + --cluster local \\ + --config_paths "ns_tools/configs/ns_tools.yaml" \\ + --input_file data/example.jsonl \\ + --output_dir /results/rollouts \\ + --server_address http://localhost:8000/v1 \\ + --policy_model_name nvidia/model-name \\ + +agent_name=ns_tools_simple_agent +""" + +import logging + +import typer + +import nemo_skills.pipeline.utils as pipeline_utils +from nemo_skills.pipeline.app import app, typer_unpacker +from nemo_skills.pipeline.utils.cluster import cluster_path_exists, parse_kwargs +from nemo_skills.pipeline.utils.declarative import ( + Command, + CommandGroup, + HardwareConfig, + Pipeline, +) +from nemo_skills.pipeline.utils.mounts import get_unmounted_path +from nemo_skills.pipeline.utils.scripts import ( + NemoGymRolloutsScript, + SandboxScript, + ServerScript, +) +from nemo_skills.utils import get_logger_name, setup_logging + +LOG = logging.getLogger(get_logger_name(__file__)) + + +@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +@typer_unpacker +def nemo_gym_rollouts( + ctx: typer.Context, + cluster: str = typer.Option( + None, + help="One of the configs inside config_dir or NEMO_SKILLS_CONFIG_DIR or ./cluster_configs. " + "Can also use NEMO_SKILLS_CONFIG instead of specifying as argument.", + ), + config_paths: str = typer.Option( + ..., + help="Comma-separated list of NeMo Gym config YAML files for ng_run. " + "E.g., 'ns_tools/configs/ns_tools.yaml,math_with_judge/configs/math_with_judge.yaml'", + ), + input_file: str = typer.Option(..., help="Path to input JSONL file for rollout collection"), + output_dir: str = typer.Option(..., help="Directory for rollout outputs. Output file will be rollouts.jsonl"), + expname: str = typer.Option("nemo_gym_rollouts", help="NeMo Run experiment name"), + model: str = typer.Option(None, help="Path to model for self-hosted vLLM server"), + server_address: str = typer.Option( + None, + help="Address of pre-hosted server (e.g., http://localhost:8000/v1). If provided, skips self-hosted server.", + ), + server_type: pipeline_utils.SupportedServers = typer.Option( + None, + help="Type of server (vllm, trtllm, sglang, etc.)", + ), + server_gpus: int = typer.Option(None, help="Number of GPUs for self-hosted server"), + server_nodes: int = typer.Option(1, help="Number of nodes for self-hosted server"), + server_args: str = typer.Option("", help="Additional arguments for the server"), + with_sandbox: bool = typer.Option(False, help="If True, start a sandbox container for code execution"), + gym_path: str = typer.Option( + "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym", + help="Path to NeMo Gym installation. Defaults to container built-in. Use for mounted/custom Gym.", + ), + policy_api_key: str = typer.Option( + "dummy", + help="API key for policy server. Use 'dummy' for local vLLM servers.", + ), + policy_model_name: str = typer.Option( + None, + help="Model name for policy server. Required for pre-hosted servers. " + "For self-hosted, defaults to the model path if not specified.", + ), + partition: str = typer.Option(None, help="Cluster partition to use"), + qos: str = typer.Option(None, help="Specify Slurm QoS"), + time_min: str = typer.Option(None, help="Slurm time-min parameter"), + config_dir: str = typer.Option(None, help="Custom directory for cluster configs"), + log_dir: str = typer.Option(None, help="Custom location for logs"), + exclusive: bool | None = typer.Option(None, help="Add exclusive flag to slurm job"), + num_random_seeds: int = typer.Option( + None, + help="Number of parallel rollout jobs to run. Each job writes to rollouts-rs{i}.jsonl. " + "Use this to scale rollout collection across multiple nodes.", + ), + random_seeds: str = typer.Option( + None, + help="Explicit list of seed indices to run (comma-separated, e.g., '0,2,5,7'). " + "Overrides num_random_seeds. Can provide a list directly when using through Python.", + ), + starting_seed: int = typer.Option( + 0, + help="Starting seed index when using num_random_seeds. " + "E.g., starting_seed=10 with num_random_seeds=4 creates seeds 10,11,12,13.", + ), + rerun_done: bool = typer.Option( + False, + help="If False (default), skip seeds that already have output files. " + "If True, re-run all seeds even if output files exist.", + ), + use_mounted_nemo_skills: bool = typer.Option( + True, + help="If True (default), use the nemo-skills code packaged to /nemo_run/code. " + "If False, avoid /nemo_run/code and use the container's installed nemo-skills instead. " + "Set to False when you want Gym to use its own bundled nemo-skills version.", + ), + dry_run: bool = typer.Option(False, help="Validate without executing"), + sbatch_kwargs: str = typer.Option("", help="Additional sbatch kwargs as JSON string"), +): + """Run NeMo Gym rollout collection pipeline. + + This command orchestrates running rollout collection with NeMo Gym: + 1. Optionally starts a vLLM model server (or uses pre-hosted) + 2. Optionally starts a sandbox for code execution + 3. Starts NeMo Gym servers via ng_run + 4. Runs ng_collect_rollouts to collect rollouts + + All Hydra arguments (prefixed with + or ++) are passed through to ng_run + and ng_collect_rollouts. Common arguments include: + - +agent_name=... (required for rollout collection) + - +limit=... (limit number of samples from input) + - +num_samples_in_parallel=... (concurrent requests) + - +num_repeats=N (run each prompt N times for mean@k evaluation) + + For large-scale rollout collection, use --num_random_seeds to create multiple + independent jobs. Each job has its own server and sandbox (unique ports to avoid + conflicts if scheduled on same node) and writes to rollouts-rs{i}.jsonl. + + Use --starting_seed to offset seed numbering (e.g., to continue from a previous run). + Use --random_seeds to specify explicit seeds (e.g., '0,2,5,7' to re-run specific seeds). + """ + setup_logging(disable_hydra_logs=False, use_rich=True) + extra_arguments = " ".join(ctx.args) + LOG.info("Starting NeMo Gym rollouts pipeline") + LOG.info(f"Extra arguments: {extra_arguments}") + + # Parse config paths + config_paths_list = [p.strip() for p in config_paths.split(",") if p.strip()] + LOG.info(f"Config paths: {config_paths_list}") + + # Validate server configuration + self_hosted = model is not None and server_gpus is not None + pre_hosted = server_address is not None + + if not self_hosted and not pre_hosted: + raise ValueError( + "Must provide either --model and --server_gpus for self-hosted server, " + "or --server_address for pre-hosted server" + ) + + if self_hosted and pre_hosted: + raise ValueError("Cannot specify both self-hosted (--model, --server_gpus) and pre-hosted (--server_address)") + + if self_hosted and server_type is None: + raise ValueError("--server_type is required when using self-hosted server") + + # Validate and set policy_model_name + if pre_hosted and policy_model_name is None: + raise ValueError("--policy_model_name is required when using a pre-hosted server (--server_address)") + + if self_hosted and policy_model_name is None: + # For self-hosted, default to the model path + policy_model_name = model + LOG.info(f"Using model path as policy_model_name: {policy_model_name}") + + # Get cluster config + cluster_config = pipeline_utils.get_cluster_config(cluster, config_dir) + + if not log_dir: + log_dir = f"{output_dir}/logs" + + # Parse sbatch kwargs + sbatch_kwargs_dict = parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min) + + # Determine seed indices for parallel jobs + if random_seeds is not None: + # Explicit seeds provided + if isinstance(random_seeds, str): + seed_indices = [int(s.strip()) for s in random_seeds.split(",")] + else: + seed_indices = list(random_seeds) + LOG.info(f"Using explicit seeds: {seed_indices}") + elif num_random_seeds: + seed_indices = list(range(starting_seed, starting_seed + num_random_seeds)) + LOG.info( + f"Creating {num_random_seeds} separate jobs (rs{starting_seed}..rs{starting_seed + num_random_seeds - 1})" + ) + else: + seed_indices = [None] # Single job, no seed suffix + + # Get server type string once if self-hosted + server_type_str = None + server_container = None + if self_hosted: + server_type_str = server_type.value if hasattr(server_type, "value") else server_type + server_container = cluster_config["containers"].get(server_type_str, server_type_str) + + # Filter out seeds with existing output files (unless rerun_done=True) + if not rerun_done and seed_indices != [None]: + filtered_seeds = [] + skipped_seeds = [] + for seed_idx in seed_indices: + output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl" + # Check if file exists on cluster + try: + unmounted_path = get_unmounted_path(cluster_config, output_file) + if cluster_path_exists(cluster_config, unmounted_path): + skipped_seeds.append(seed_idx) + else: + filtered_seeds.append(seed_idx) + except Exception as e: + LOG.warning(f"Could not check if {output_file} exists: {e}. Including seed {seed_idx}.") + filtered_seeds.append(seed_idx) + + if skipped_seeds: + LOG.info(f"Skipping seeds with existing output files: {skipped_seeds}") + seed_indices = filtered_seeds + + if not seed_indices: + LOG.info("All seeds already have output files. Nothing to run.") + return None + + # Build jobs - one per seed, each with its own server/sandbox (unique ports) + jobs = [] + for seed_idx in seed_indices: + components = [] + + # Determine naming suffix + if seed_idx is not None: + output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl" + job_suffix = f"_rs{seed_idx}" + else: + output_file = f"{output_dir}/rollouts.jsonl" + job_suffix = "" + + # 1. Server (optional, self-hosted) - each job gets its own server with unique port + server_script = None + if self_hosted: + server_script = ServerScript( + server_type=server_type_str, + model_path=model, + cluster_config=cluster_config, + num_gpus=server_gpus, + num_nodes=server_nodes, + server_args=server_args, + allocate_port=True, # Each job gets unique port + ) + + server_cmd = Command( + script=server_script, + container=server_container, + name=f"{expname}_server{job_suffix}", + ) + components.append(server_cmd) + LOG.info(f"Job{job_suffix}: server on port {server_script.port}") + + # 2. Sandbox (optional) - each job gets its own sandbox with unique port + sandbox_script = None + if with_sandbox: + sandbox_script = SandboxScript( + cluster_config=cluster_config, + allocate_port=True, # Each job gets unique port + ) + + sandbox_cmd = Command( + script=sandbox_script, + container=cluster_config["containers"]["sandbox"], + name=f"{expname}_sandbox{job_suffix}", + ) + components.append(sandbox_cmd) + LOG.info(f"Job{job_suffix}: sandbox on port {sandbox_script.port}") + + # 3. NeMo Gym rollouts + nemo_gym_script = NemoGymRolloutsScript( + config_paths=config_paths_list, + input_file=input_file, + output_file=output_file, + extra_arguments=extra_arguments, + server=server_script, + server_address=server_address, + sandbox=sandbox_script, + gym_path=gym_path, + policy_api_key=policy_api_key, + policy_model_name=policy_model_name, + ) + + nemo_gym_cmd = Command( + script=nemo_gym_script, + container=cluster_config["containers"]["nemo-rl"], + name=f"{expname}_nemo_gym{job_suffix}", + # If use_mounted_nemo_skills=False, avoid /nemo_run/code so Gym uses its bundled version + avoid_nemo_run_code=not use_mounted_nemo_skills, + ) + components.append(nemo_gym_cmd) + + # Create command group for this job + hardware = HardwareConfig( + partition=partition, + num_gpus=server_gpus if self_hosted else 0, + num_nodes=server_nodes if self_hosted else 1, + num_tasks=1, + sbatch_kwargs=sbatch_kwargs_dict, + ) + + group = CommandGroup( + commands=components, + hardware=hardware, + name=f"{expname}{job_suffix}", + log_dir=log_dir, + ) + + jobs.append({"name": f"{expname}{job_suffix}", "group": group}) + + LOG.info(f"Created {len(jobs)} job(s)") + + # Create and run pipeline + pipeline = Pipeline( + name=expname, + cluster_config=cluster_config, + jobs=jobs, + ) + + sequential = cluster_config["executor"] in ["local", "none"] + result = pipeline.run(dry_run=dry_run, sequential=sequential) + + LOG.info(f"Pipeline {'validated' if dry_run else 'submitted'} successfully") + return result + + +if __name__ == "__main__": + typer.main.get_command_name = lambda name: name + app() diff --git a/nemo_skills/pipeline/utils/__init__.py b/nemo_skills/pipeline/utils/__init__.py index 3e738a530f..37d16b734d 100644 --- a/nemo_skills/pipeline/utils/__init__.py +++ b/nemo_skills/pipeline/utils/__init__.py @@ -71,6 +71,15 @@ get_registered_external_repo, register_external_repo, ) +from nemo_skills.pipeline.utils.scripts import ( + BaseJobScript, + GenerationClientScript, + GymClientScript, + MultiVLLMServerScript, + NemoGymRolloutsScript, + SandboxScript, + ServerScript, +) from nemo_skills.pipeline.utils.server import ( SupportedServers, SupportedServersSelfHosted, diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py index 7029dcc638..fbe36487a0 100644 --- a/nemo_skills/pipeline/utils/declarative.py +++ b/nemo_skills/pipeline/utils/declarative.py @@ -36,7 +36,7 @@ get_packaging_job_key, tunnel_hash, ) -from nemo_skills.pipeline.utils.mounts import is_mounted_filepath +from nemo_skills.pipeline.utils.mounts import get_mounts_from_config, is_mounted_filepath from nemo_skills.pipeline.utils.server import wrap_python_path from nemo_skills.utils import get_logger_name @@ -219,6 +219,22 @@ class Command: script: run.Script container: str = "nemo-skills" name: str = "command" + # Optional extra mounts for this Command (e.g., "/dev/shm:/dev/shm"). + # These are merged with mounts from the cluster config when creating the executor. + mounts: Optional[List[str]] = None + # Optional per-command env var overrides (merged with Script-provided runtime env). + environment: Optional[Dict[str, str]] = None + # Runtime working directory to `cd` into before running the script body. + # This is useful because pyxis sets container-workdir=/nemo_run/code by default, + # which can cause imports from /nemo_run/code to shadow site-packages. + workdir: Optional[str] = None + # Control whether /nemo_run/code is used for Python imports for this command. + # - If avoid_nemo_run_code=True, we `cd` away from /nemo_run/code (default "/") and + # remove /nemo_run/code from PYTHONPATH if present. + # - If force_nemo_run_code=True, we prepend /nemo_run/code to PYTHONPATH even if the + # script later cd's elsewhere. + avoid_nemo_run_code: bool = False + force_nemo_run_code: bool = False def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict]: """Prepare script for execution. @@ -244,11 +260,46 @@ def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict] # Update script.inline with evaluated command self.script.set_inline(evaluated_command) + # Optionally wrap the command to control cwd/PYTHONPATH behavior (see fields above). + # This is done at the very end so it applies to both eager and lazy inline builders. + prelude_lines: List[str] = [] + + # If requested, force /nemo_run/code on PYTHONPATH (so mounted code is importable even after cd). + if self.force_nemo_run_code and self.avoid_nemo_run_code: + raise ValueError("Command cannot set both avoid_nemo_run_code=True and force_nemo_run_code=True") + if self.force_nemo_run_code: + prelude_lines.append('export PYTHONPATH="/nemo_run/code${PYTHONPATH:+:$PYTHONPATH}"') + + # If requested, avoid /nemo_run/code import shadowing (cd away + remove PYTHONPATH entry). + effective_workdir = self.workdir + if self.avoid_nemo_run_code and effective_workdir is None: + effective_workdir = "/" + if self.avoid_nemo_run_code: + prelude_lines.append('if [ -n "${PYTHONPATH:-}" ]; then') + prelude_lines.append( + " export PYTHONPATH=\"$(echo \"$PYTHONPATH\" | tr ':' '\\n' | grep -v '^/nemo_run/code$' | paste -sd: -)\"" + ) + prelude_lines.append("fi") + + if effective_workdir: + prelude_lines.append(f'cd "{effective_workdir}"') + + if prelude_lines: + prelude = "\n".join(prelude_lines) + "\n" + inline_cmd = self.script.inline + if isinstance(inline_cmd, str): + self.script.set_inline(prelude + inline_cmd) + # If inline_cmd is still callable here, we intentionally do not wrap it; it should + # have been evaluated above. This keeps behavior deterministic. + # Build execution config from Script fields + merged_env = dict(runtime_metadata.get("environment", {})) + if self.environment: + merged_env.update(self.environment) execution_config = { "log_prefix": getattr(self.script, "log_prefix", "main"), - "environment": runtime_metadata.get("environment", {}), - "mounts": None, # Mounts not currently exposed by Scripts + "environment": merged_env, + "mounts": self.mounts, "container": self.container, } @@ -574,12 +625,30 @@ def _create_executor( if span_group_nodes and hardware and hardware.num_nodes is not None: num_nodes = hardware.num_nodes + # Check if the script has a per-script num_tasks override. + # This allows different scripts in the same CommandGroup to have different + # task configurations (e.g., vLLM servers with 2 tasks per node, Gym with 1). + script_num_tasks = getattr(command.script, "num_tasks_override", None) + tasks_per_node = ( + script_num_tasks + if script_num_tasks is not None + else (hardware.num_tasks if hardware and hardware.num_tasks is not None else 1) + ) + + # Allow per-command extra mounts without requiring editing the cluster YAML. + # We treat exec_config["mounts"] as additive and merge it with mounts from cluster_config. + mounts = None + extra_mounts = exec_config.get("mounts") or None + if extra_mounts: + base_mounts = get_mounts_from_config(cluster_config) + mounts = base_mounts + [m for m in extra_mounts if m not in base_mounts] + with env_context: return get_executor( cluster_config=cluster_config, container=container_image, num_nodes=num_nodes, - tasks_per_node=hardware.num_tasks if hardware and hardware.num_tasks is not None else 1, + tasks_per_node=tasks_per_node, gpus_per_node=hardware.num_gpus if hardware and hardware.num_gpus is not None else 0, job_name=job_name_override if job_name_override else command.name, log_dir=log_dir, @@ -589,7 +658,7 @@ def _create_executor( het_group=het_group, total_het_groups=total_het_groups, overlap=overlap, - mounts=exec_config.get("mounts"), + mounts=mounts, with_ray=self.with_ray, sbatch_kwargs=hardware.sbatch_kwargs, dependencies=dependencies, @@ -664,7 +733,30 @@ def _plan_and_add_job( if heterogeneous: shared_env_vars.update(exec_config.get("environment", {})) - # Share packager across executors for efficiency (single-group only) + # IMPORTANT: For single-group jobs with multiple components (overlap), + # nemo-run effectively uses the FIRST executor to determine the SLURM allocation + # (sbatch nodes/gpus/ntasks-per-node). Components that only need to run on the + # master node (e.g., Gym client, sandbox) set span_group_nodes=False which would + # request 1 node if they appear first. That leads to allocating only 1 node even + # when a later component (e.g., multi-node vLLM servers) needs >1 nodes. + # + # To avoid this footgun, ensure that components which span the group's nodes are + # scheduled first so the allocation matches the maximal requirements. + if not heterogeneous: + + def _allocation_sort_key(entry: Dict) -> Tuple[int, int]: + group_hw = entry["group"].hardware + span = getattr(entry["command"].script, "span_group_nodes", False) + # Prefer spanning components first; then prefer larger node counts. + nodes = (group_hw.num_nodes or 1) if span else 1 + return (0 if span else 1, -nodes) + + prepared_commands.sort(key=_allocation_sort_key) + + # Share packager across executors for efficiency (single-group only). + # NOTE: We must NOT key this off of comp_idx/het_idx because we may reorder + # prepared_commands (e.g., to ensure spanning components drive the allocation). + # Otherwise we can end up assigning executor.packager=None for early entries. shared_packager = None # Build commands and executors using prepared data @@ -709,9 +801,9 @@ def _plan_and_add_job( job_name_override=job_name_for_slurm, ) - # Share packager across executors for single-group jobs + # Share packager across executors for single-group jobs (robust to reordering) if not heterogeneous: - if comp_idx == 0 and het_idx == 0: + if shared_packager is None: shared_packager = executor.packager else: executor.packager = shared_packager diff --git a/nemo_skills/pipeline/utils/exp.py b/nemo_skills/pipeline/utils/exp.py index 3bce2eb864..4bfbeefa10 100644 --- a/nemo_skills/pipeline/utils/exp.py +++ b/nemo_skills/pipeline/utils/exp.py @@ -314,6 +314,7 @@ def get_executor( "--no-container-mount-home", "--mpi=pmix", "--wait=10", + "--kill-on-bad-exit=1", # Fail entire job if any task exits with non-zero (e.g., vLLM crash) # we need to be explicit about this in srun as commands might need to run in parallel f"--ntasks-per-node={tasks_per_node}", f"--nodes={num_nodes}", diff --git a/nemo_skills/pipeline/utils/packager.py b/nemo_skills/pipeline/utils/packager.py index 4cb8c6552e..fa2d890b39 100644 --- a/nemo_skills/pipeline/utils/packager.py +++ b/nemo_skills/pipeline/utils/packager.py @@ -111,6 +111,16 @@ def get_git_repo_path(path: str | Path = None): def get_packager(extra_package_dirs: tuple[str] | None = None): """Will check if we are running from a git repo and use git packager or default packager otherwise.""" nemo_skills_dir = get_registered_external_repo("nemo_skills").path + # Controls for deterministic packaging behavior across different launch directories. + # + # By default, if we are running inside a git repo, we use GitArchivePackager (committed files only). + # If the current repo does not contain `nemo_skills/`, we additionally include the *installed* + # nemo_skills package directory to ensure remote tasks can import it. + # + # This can be surprising when users rely on a venv editable install / uncommitted changes. + # These flags let you force using the installed package tree (PatternPackager) regardless of CWD. + force_installed_nemo_skills = bool(int(os.getenv("NEMO_SKILLS_FORCE_INSTALLED_PACKAGE", "0"))) + force_pattern_packager = bool(int(os.getenv("NEMO_SKILLS_FORCE_PATTERN_PACKAGER", "0"))) if extra_package_dirs: include_patterns = [str(Path(d) / "*") for d in extra_package_dirs] @@ -121,15 +131,18 @@ def get_packager(extra_package_dirs: tuple[str] | None = None): check_uncommited_changes = not bool(int(os.getenv("NEMO_SKILLS_DISABLE_UNCOMMITTED_CHANGES_CHECK", 0))) - # are we in a git repo? If yes, we are uploading the current code - repo_path = get_git_repo_path(path=None) # check if we are in a git repo in pwd + # Are we in a git repo? If yes, we *normally* upload committed code from the current repo. + # force_pattern_packager overrides this and forces packaging from the installed package tree. + repo_path = None if force_pattern_packager else get_git_repo_path(path=None) if repo_path: - # Do we have nemo_skills package in this repo? If no, we need to pick it up from installed location - if not (Path(repo_path) / "nemo_skills").is_dir(): + # Do we have nemo_skills package in this repo? If no, we need to pick it up from installed location. + # If force_installed_nemo_skills is set, we always pick up the installed package. + if force_installed_nemo_skills or not (Path(repo_path) / "nemo_skills").is_dir(): LOG.info( - "Not running from Nemo-Skills repo, trying to upload installed package. " + "Using installed Nemo-Skills package for packaging (force_installed=%s). " "Make sure there are no extra files in %s", + force_installed_nemo_skills, str(nemo_skills_dir / "*"), ) include_patterns.append(str(nemo_skills_dir / "*")) @@ -150,7 +163,9 @@ def get_packager(extra_package_dirs: tuple[str] | None = None): ) else: LOG.info( - "Not running from a git repo, trying to upload installed package. Make sure there are no extra files in %s", + "Using PatternPackager for installed Nemo-Skills package (force_pattern=%s). " + "Make sure there are no extra files in %s", + force_pattern_packager, str(nemo_skills_dir / "*"), ) include_patterns.append(str(nemo_skills_dir / "*")) diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py index 3e74e438ef..f5af68ec34 100644 --- a/nemo_skills/pipeline/utils/scripts.py +++ b/nemo_skills/pipeline/utils/scripts.py @@ -75,10 +75,16 @@ class BaseJobScript(run.Script): When True, the script spans all nodes specified in the group's num_nodes. This is important for multi-node setups with --overlap where the server needs multiple nodes but client/sandbox should run on the master node only. + num_tasks_override: Override the group's num_tasks for this specific script. + When set, this script's srun will use this value for --ntasks-per-node + instead of the group's HardwareConfig.num_tasks. Useful when multiple + scripts in a CommandGroup need different task configurations (e.g., + vLLM servers needing 2 tasks per node while Gym client needs 1). """ het_group_index: Optional[int] = field(default=None, init=False, repr=False) span_group_nodes: bool = False # Default: run on 1 node + num_tasks_override: Optional[int] = None # Per-script task count override installation_command: Optional[str] = None entrypoint: str = field(default="bash", init=False) @@ -432,3 +438,561 @@ def build_cmd() -> Tuple[str, Dict]: # Always use lazy command building self.set_inline(build_cmd) super().__post_init__() + + +@dataclass(kw_only=True) +class NemoGymRolloutsScript(BaseJobScript): + """Script for running NeMo Gym rollout collection. + + This script orchestrates the full rollout collection workflow: + 1. Starts ng_run in background to spin up NeMo Gym servers + 2. Polls ng_status until all servers are healthy + 3. Runs ng_collect_rollouts to collect rollouts + 4. Keeps ng_run running (cleanup handled externally) + + Attributes: + config_paths: List of YAML config file paths for ng_run + input_file: Input JSONL file path for rollout collection + output_file: Output JSONL file path for rollouts + extra_arguments: Additional Hydra overrides passed to both ng_run and ng_collect_rollouts + server: Optional ServerScript reference for policy model server + server_address: Optional pre-hosted server address + sandbox: Optional SandboxScript reference for sandbox port + log_prefix: Prefix for log files (default: "nemo_gym") + + Example: + script = NemoGymRolloutsScript( + config_paths=[ + "resources_servers/ns_tools/configs/ns_tools.yaml", + "resources_servers/math_with_judge/configs/math_with_judge.yaml", + ], + input_file="/data/input.jsonl", + output_file="/data/rollouts.jsonl", + extra_arguments="+agent_name=ns_tools_simple_agent +limit=10", + server=server_script, + sandbox=sandbox_script, + ) + """ + + config_paths: List[str] + input_file: str + output_file: str + extra_arguments: str = "" + server: Optional["ServerScript"] = None + server_address: Optional[str] = None + sandbox: Optional["SandboxScript"] = None + gym_path: str = "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym" + policy_api_key: str = "dummy" # API key for policy server (can be dummy for local) + policy_model_name: Optional[str] = None # Model name override for policy server + + log_prefix: str = field(default="nemo_gym", init=False) + + def __post_init__(self): + """Initialize the combined ng_run + ng_collect_rollouts script.""" + + def build_cmd() -> Tuple[str, Dict]: + """Build the full rollout collection command.""" + # Build config_paths argument + config_paths_str = ",".join(self.config_paths) + + # Build ng_run command parts + ng_run_parts = [ + "ng_run", + f'"+config_paths=[{config_paths_str}]"', + ] + + # Add policy server URL if we have a server reference or address + if self.server is not None: + server_addr = f"http://{self.server.hostname_ref()}:{self.server.port}/v1" + ng_run_parts.append(f'+policy_base_url="{server_addr}"') + elif self.server_address is not None: + ng_run_parts.append(f'+policy_base_url="{self.server_address}"') + + # Add policy API key (required by some configs) + ng_run_parts.append(f'+policy_api_key="{self.policy_api_key}"') + + # Add policy model name (required by configs) + if self.policy_model_name: + ng_run_parts.append(f'+policy_model_name="{self.policy_model_name}"') + + # Add extra arguments to ng_run + if self.extra_arguments: + ng_run_parts.append(self.extra_arguments) + + ng_run_cmd = " ".join(ng_run_parts) + + # Build ng_collect_rollouts command + ng_collect_parts = [ + "ng_collect_rollouts", + f'+input_jsonl_fpath="{self.input_file}"', + f'+output_jsonl_fpath="{self.output_file}"', + ] + + # Add extra arguments to ng_collect_rollouts + if self.extra_arguments: + ng_collect_parts.append(self.extra_arguments) + + ng_collect_cmd = " ".join(ng_collect_parts) + + # Compute the vLLM server URL for the wait check + if self.server is not None: + vllm_server_url = f"http://{self.server.hostname_ref()}:{self.server.port}/v1" + elif self.server_address is not None: + vllm_server_url = self.server_address + else: + vllm_server_url = "" + + # Build the full bash script that: + # 1. Installs NeMo Gym from 3rdparty/Gym-workspace/Gym + # 2. Waits for vLLM server to be ready + # 3. Starts ng_run in background + # 4. Polls ng_status until healthy (with early failure detection) + # 5. Runs ng_collect_rollouts + cmd = f"""set -e +set -o pipefail + +echo "=== Installing NeMo Gym ===" +cd {self.gym_path} || {{ echo "ERROR: Failed to cd to Gym directory"; exit 1; }} +uv venv --python 3.12 --allow-existing .venv || {{ echo "ERROR: Failed to create venv"; exit 1; }} +source .venv/bin/activate || {{ echo "ERROR: Failed to activate venv"; exit 1; }} +uv sync --active --extra dev || {{ echo "ERROR: Failed to sync dependencies"; exit 1; }} +echo "NeMo Gym installed successfully" + +# Disable pipefail for the polling loop (grep may return non-zero) +set +o pipefail + +# Wait for vLLM server to be ready before starting ng_run +# Note: --kill-on-bad-exit in srun ensures job fails if vLLM crashes +VLLM_SERVER_URL="{vllm_server_url}" +if [ -n "$VLLM_SERVER_URL" ]; then + echo "=== Waiting for vLLM server at $VLLM_SERVER_URL ===" + while [ $(curl -s -o /dev/null -w "%{{http_code}}" "$VLLM_SERVER_URL/models" 2>/dev/null) -ne 200 ]; do + sleep 10 + done + echo "vLLM server is ready!" +fi + +echo "=== Starting NeMo Gym servers ===" +{ng_run_cmd} & +NG_RUN_PID=$! +echo "ng_run PID: $NG_RUN_PID" + +echo "Waiting for NeMo Gym servers..." +LAST_STATUS="" +while true; do + # Check if ng_run process died - let the failure cascade naturally + if ! kill -0 $NG_RUN_PID 2>/dev/null; then + echo "ERROR: ng_run process exited unexpectedly" + wait $NG_RUN_PID 2>/dev/null # Get exit code + exit 1 + fi + + STATUS_OUTPUT=$(ng_status 2>&1) + + if echo "$STATUS_OUTPUT" | grep -q "healthy, 0 unhealthy"; then + echo "All servers ready!" + break + fi + + # Only print status when it changes (reduce verbosity) + CURRENT_STATUS=$(echo "$STATUS_OUTPUT" | grep -oE '[0-9]+ healthy' | head -1 || echo "starting") + if [ "$CURRENT_STATUS" != "$LAST_STATUS" ]; then + echo "Server status: $CURRENT_STATUS" + LAST_STATUS="$CURRENT_STATUS" + fi + + sleep 10 +done + +echo "=== Running rollout collection ===" +echo "Input file: {self.input_file}" +echo "Output file: {self.output_file}" +mkdir -p "$(dirname "{self.output_file}")" +echo "Output directory created: $(dirname "{self.output_file}")" +echo "Running: {ng_collect_cmd}" +{ng_collect_cmd} || {{ echo "ERROR: ng_collect_rollouts failed"; kill $NG_RUN_PID 2>/dev/null || true; exit 1; }} + +echo "=== Rollout collection complete ===" +echo "Output: {self.output_file}" + +echo "=== Cleaning up ===" +kill $NG_RUN_PID 2>/dev/null || true +echo "Servers terminated." +""" + # Build environment variables for sandbox connection + # YAML configs use ${oc.env:NEMO_SKILLS_SANDBOX_HOST,127.0.0.1} and + # ${oc.env:NEMO_SKILLS_SANDBOX_PORT,6000} to resolve these + env_vars = {} + if self.sandbox is not None: + env_vars["NEMO_SKILLS_SANDBOX_HOST"] = self.sandbox.hostname_ref() + env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) + + return cmd.strip(), {"environment": env_vars} + + self.set_inline(build_cmd) + super().__post_init__() + + +@dataclass(kw_only=True) +class MultiVLLMServerScript(BaseJobScript): + """Script for deploying multiple independent vLLM servers for Gym routing. + + This script enables data-parallel vLLM deployments where Gym routes requests + across multiple independent vLLM server replicas. Each replica runs on its own + set of GPUs and handles requests independently. + + Supports configurations like: + - 1 server per node with TP=8 (full node per server) + - 2 servers per node with TP=4 each (2 servers sharing a node) + - 4 servers per node with TP=2 each + - 8 servers per node with TP=1 each + + The script uses SLURM environment variables to determine which server instance + to start on each task: + - SLURM_PROCID: Global task ID (0 to num_nodes * servers_per_node - 1) + - SLURM_LOCALID: Task ID within node (0 to servers_per_node - 1) + - SLURM_NODEID: Node ID (0 to num_nodes - 1) + + Attributes: + model_path: Path to model weights or HuggingFace model name + cluster_config: Cluster configuration dictionary + num_nodes: Number of nodes to use + servers_per_node: Number of vLLM servers per node + gpus_per_server: GPUs per server (tensor_parallel_size) + server_args: Additional vLLM server arguments + base_port: Starting port (servers use base_port + global_rank) + + Example: + # 4 nodes × 2 servers per node = 8 total replicas, each with TP=4 + servers = MultiVLLMServerScript( + model_path="/models/llama-70b", + cluster_config=cluster_config, + num_nodes=4, + servers_per_node=2, + gpus_per_server=4, + server_args="--dtype auto --max-model-len 8192", + ) + + # Use in CommandGroup + group = CommandGroup( + commands=[ + Command(script=servers, container="vllm", name="vllm"), + Command(script=gym_client, container="nemo-skills", name="gym"), + ], + hardware=HardwareConfig( + num_nodes=4, + num_gpus=8, + num_tasks=2, # servers_per_node + ), + ) + """ + + model_path: str + cluster_config: Dict + num_nodes: int = 1 + servers_per_node: int = 1 + gpus_per_server: int = 8 + server_args: str = "" + base_port: Optional[int] = None + # Mitigations for vLLM v1 shared-memory communicator instability on some clusters. + # When enabled, we export env vars that (if supported by the vLLM version) disable + # SHM-based broadcast paths, which are sensitive to /dev/shm sizing and IPC quirks. + disable_shm_broadcast: bool = False + # Print /dev/shm diagnostics at startup to help debug node-to-node differences. + print_shm_diagnostics: bool = False + + # Spans all nodes - each SLURM task runs one server + span_group_nodes: bool = True + + # Internal tracking + _total_replicas: int = field(init=False, repr=False, default=0) + _ports: List[int] = field(init=False, repr=False, default_factory=list) + log_prefix: str = field(default="vllm_servers", init=False) + + def __post_init__(self): + """Build command for distributed vLLM servers.""" + self._total_replicas = self.num_nodes * self.servers_per_node + + # Set num_tasks_override to ensure this script gets the right task count + self.num_tasks_override = self.servers_per_node + + # Allocate ports + if self.base_port is None: + self.base_port = get_free_port(strategy="random") + self._ports = [self.base_port + i for i in range(self._total_replicas)] + + ports_str = " ".join(str(p) for p in self._ports) + + # Build command that runs one server per SLURM task + disable_shm_broadcast = "true" if self.disable_shm_broadcast else "false" + print_shm_diagnostics = "true" if self.print_shm_diagnostics else "false" + cmd = f''' +#!/bin/bash +set -e + +# Configuration +PORTS=({ports_str}) +GPUS_PER_SERVER={self.gpus_per_server} +SERVERS_PER_NODE={self.servers_per_node} +DISABLE_SHM_BROADCAST={disable_shm_broadcast} +PRINT_SHM_DIAGNOSTICS={print_shm_diagnostics} + +# SLURM environment +# SLURM_PROCID: Global task ID (0 to num_nodes * servers_per_node - 1) +# SLURM_LOCALID: Task ID within this node (0 to servers_per_node - 1) +# SLURM_NODEID: Node ID (0 to num_nodes - 1) +GLOBAL_RANK=${{SLURM_PROCID:-0}} +LOCAL_RANK=${{SLURM_LOCALID:-0}} +NODE_ID=${{SLURM_NODEID:-0}} + +# Calculate which GPUs this server should use +# E.g., with 2 servers per node and 4 GPUs each: +# Local rank 0 → GPUs 0,1,2,3 +# Local rank 1 → GPUs 4,5,6,7 +GPU_START=$((LOCAL_RANK * GPUS_PER_SERVER)) +GPU_END=$((GPU_START + GPUS_PER_SERVER - 1)) +GPU_LIST=$(seq -s, $GPU_START $GPU_END) + +# Get port for this server +MY_PORT=${{PORTS[$GLOBAL_RANK]}} + +echo "=== vLLM Server Configuration ===" +echo "Global Rank: $GLOBAL_RANK" +echo "Node ID: $NODE_ID" +echo "Local Rank: $LOCAL_RANK" +echo "Port: $MY_PORT" +echo "GPUs: $GPU_LIST (CUDA_VISIBLE_DEVICES)" +echo "Tensor Parallel Size: {self.gpus_per_server}" +echo "Model: {self.model_path}" +echo "=================================" + +# Debug /dev/shm and IPC-related info (helps with node-to-node variance) +if [ "$PRINT_SHM_DIAGNOSTICS" = "true" ]; then + echo "" + echo "=== /dev/shm diagnostics (before vLLM start) ===" + df -h /dev/shm || true + ls -ld /dev/shm || true + mount | grep -E " /dev/shm " || true + echo "ulimit -n: $(ulimit -n || true)" + echo "===============================================" + echo "" +fi + +# Set GPU visibility +export CUDA_VISIBLE_DEVICES=$GPU_LIST + +# Mitigation: attempt to disable vLLM SHM broadcast transport (best-effort, version-dependent). +# These env vars are intentionally safe: if vLLM doesn't recognize them, they are ignored. +if [ "$DISABLE_SHM_BROADCAST" = "true" ]; then + export VLLM_DISABLE_SHM_BROADCAST=1 + export VLLM_USE_SHM_BROADCAST=0 +fi + +# Start vLLM server +python3 -m vllm.entrypoints.openai.api_server \\ + --model "{self.model_path}" \\ + --host "0.0.0.0" \\ + --port "$MY_PORT" \\ + --tensor-parallel-size {self.gpus_per_server} \\ + --trust-remote-code \\ + {self.server_args} +''' + + self.set_inline(cmd) + super().__post_init__() + + @property + def total_replicas(self) -> int: + """Total number of vLLM server replicas.""" + return self._total_replicas + + @property + def ports(self) -> List[int]: + """List of ports for all server replicas.""" + return self._ports + + +@dataclass(kw_only=True) +class GymClientScript(BaseJobScript): + """Script that starts Gym with routing to multiple vLLM servers. + + This script runs on a single node (the master node) and: + 1. Waits for all vLLM servers to be healthy + 2. Builds the list of server URLs + 3. Exports GYM_VLLM_BASE_URLS environment variable + 4. Starts Gym with the configured command + + Attributes: + servers: Reference to MultiVLLMServerScript for server URLs + gym_command: Command to run Gym (e.g., "python -m nemo_gym.server ...") + health_check_interval: Seconds between health check attempts (default: 10) + + Example: + gym_client = GymClientScript( + servers=vllm_servers, + gym_command="python -m nemo_gym.server --config /path/to/config.yaml", + ) + + # In same CommandGroup as vllm_servers + group = CommandGroup( + commands=[ + Command(script=vllm_servers, container="vllm", name="vllm"), + Command(script=gym_client, container="nemo-skills", name="gym"), + ], + ... + ) + """ + + servers: "MultiVLLMServerScript" + gym_command: str + health_check_interval: int = 10 # Seconds between health check attempts + + # Runs only on master node, single task + span_group_nodes: bool = False + num_tasks_override: int = 1 + + log_prefix: str = field(default="gym", init=False) + + def __post_init__(self): + """Build command that waits for servers then starts Gym.""" + + def build_cmd() -> Tuple[str, Dict]: + ports_str = " ".join(str(p) for p in self.servers.ports) + num_servers = self.servers.total_replicas + num_nodes = self.servers.num_nodes + servers_per_node = self.servers.servers_per_node + + cmd = f""" +#!/bin/bash +set -e + +echo "=== Gym Client Starting ===" +echo "Waiting for {num_servers} vLLM servers across {num_nodes} nodes..." +echo "No timeout - will wait until servers are ready or job is cancelled." +echo "If vLLM servers crash, the job will fail." + +# Configuration +PORTS=({ports_str}) +NUM_NODES={num_nodes} +SERVERS_PER_NODE={servers_per_node} +HEALTH_CHECK_INTERVAL={self.health_check_interval} + +# Expand SLURM_JOB_NODELIST without scontrol (not available in containers) +# Uses Python to parse the compressed node list format (e.g., "node[001-004]") +expand_nodelist() {{ + python3 -c " +import re +import sys + +nodelist = sys.argv[1] +nodes = [] + +# Handle comma-separated parts +for part in re.split(r',(?![^\\[]*\\])', nodelist): + # Check if it has a range like prefix[001-003,005] + match = re.match(r'(.*)\\[([^\\]]+)\\](.*)', part) + if match: + prefix, ranges, suffix = match.groups() + for r in ranges.split(','): + if '-' in r: + start, end = r.split('-') + width = len(start) + for i in range(int(start), int(end) + 1): + nodes.append(f'{{prefix}}{{i:0{{width}}d}}{{suffix}}') + else: + nodes.append(f'{{prefix}}{{r}}{{suffix}}') + else: + nodes.append(part) + +print(' '.join(nodes)) +" "$1" +}} + +# Get all node hostnames. +# IMPORTANT: depending on how the step is launched, some clusters populate only one of these. +# We'll pick the candidate that expands to the most nodes, and verify it matches NUM_NODES. +BEST_NODES_STR="" +BEST_COUNT=0 +for VAR_NAME in SLURM_JOB_NODELIST SLURM_NODELIST SLURM_STEP_NODELIST; do + NL="${{!VAR_NAME:-}}" + if [ -z "$NL" ]; then + continue + fi + EXPANDED="$(expand_nodelist "$NL" || true)" + COUNT="$(echo "$EXPANDED" | wc -w | tr -d ' ')" + if [ "$COUNT" -gt "$BEST_COUNT" ]; then + BEST_COUNT="$COUNT" + BEST_NODES_STR="$EXPANDED" + BEST_VAR_NAME="$VAR_NAME" + fi +done + +if [ "$BEST_COUNT" -lt "$NUM_NODES" ]; then + echo "ERROR: Could not discover enough node hostnames for NUM_NODES=$NUM_NODES" + echo " Best candidate: ${{BEST_VAR_NAME:-}} -> $BEST_COUNT nodes: '${{BEST_NODES_STR:-}}'" + echo " SLURM_JOB_NODELIST='${{SLURM_JOB_NODELIST:-}}'" + echo " SLURM_NODELIST='${{SLURM_NODELIST:-}}'" + echo " SLURM_STEP_NODELIST='${{SLURM_STEP_NODELIST:-}}'" + exit 1 +fi + +read -ra NODES <<< "$BEST_NODES_STR" +echo "Nodes (from $BEST_VAR_NAME): ${{NODES[*]}}" + +# Build URL list and wait for each server (no timeout - wait indefinitely) +URLS="" +GLOBAL_IDX=0 +START_TIME=$(date +%s) + +for NODE_IDX in $(seq 0 $((NUM_NODES - 1))); do + NODE=${{NODES[$NODE_IDX]}} + if [ -z "$NODE" ]; then + echo "ERROR: Empty NODE at index $NODE_IDX. Nodes: ${{NODES[*]}}" + exit 1 + fi + + for LOCAL_IDX in $(seq 0 $((SERVERS_PER_NODE - 1))); do + PORT=${{PORTS[$GLOBAL_IDX]}} + URL="http://${{NODE}}:${{PORT}}/v1" + + echo "Waiting for server $GLOBAL_IDX at $URL..." + ATTEMPT=0 + while true; do + ATTEMPT=$((ATTEMPT + 1)) + if curl -s "${{URL}}/health" > /dev/null 2>&1; then + ELAPSED=$(($(date +%s) - START_TIME)) + echo " ✓ Server $GLOBAL_IDX is ready (after ${{ELAPSED}}s total)" + break + fi + # Log progress every 60 seconds + if [ $((ATTEMPT % (60 / HEALTH_CHECK_INTERVAL))) -eq 0 ]; then + ELAPSED=$(($(date +%s) - START_TIME)) + echo " ... still waiting for server $GLOBAL_IDX (${{ELAPSED}}s elapsed)" + fi + sleep $HEALTH_CHECK_INTERVAL + done + + if [ -n "$URLS" ]; then URLS="$URLS,"; fi + URLS="${{URLS}}$URL" + GLOBAL_IDX=$((GLOBAL_IDX + 1)) + done +done + +TOTAL_TIME=$(($(date +%s) - START_TIME)) +echo "" +echo "=== All {num_servers} servers ready! (took ${{TOTAL_TIME}}s) ===" +echo "Server URLs: $URLS" +echo "" + +# Export for Gym to use +export GYM_VLLM_BASE_URLS="$URLS" + +# Run Gym +echo "Starting Gym..." +{self.gym_command} +""" + + return cmd.strip(), {"environment": {}} + + self.set_inline(build_cmd) + super().__post_init__() From 85ab39d0dae9edb9b4cd8f4588dcb5ff9795dec6 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 17 Feb 2026 13:55:11 -0800 Subject: [PATCH 2/7] fix: address review feedback on gym rollouts pipeline - Fix PYTHONPATH stripping to match /nemo_run/code prefixed paths - Re-enable pipefail before ng_collect_rollouts - Fix indentation in GymClientScript error block - Remove emoji from server ready message Signed-off-by: George Armstrong --- nemo_skills/pipeline/utils/declarative.py | 2 +- nemo_skills/pipeline/utils/scripts.py | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py index fbe36487a0..5497311195 100644 --- a/nemo_skills/pipeline/utils/declarative.py +++ b/nemo_skills/pipeline/utils/declarative.py @@ -277,7 +277,7 @@ def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict] if self.avoid_nemo_run_code: prelude_lines.append('if [ -n "${PYTHONPATH:-}" ]; then') prelude_lines.append( - " export PYTHONPATH=\"$(echo \"$PYTHONPATH\" | tr ':' '\\n' | grep -v '^/nemo_run/code$' | paste -sd: -)\"" + " export PYTHONPATH=\"$(echo \"$PYTHONPATH\" | tr ':' '\\n' | grep -v '^/nemo_run/code' | paste -sd: -)\"" ) prelude_lines.append("fi") diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py index f5af68ec34..61603d7f64 100644 --- a/nemo_skills/pipeline/utils/scripts.py +++ b/nemo_skills/pipeline/utils/scripts.py @@ -604,6 +604,9 @@ def build_cmd() -> Tuple[str, Dict]: sleep 10 done +# Re-enable pipefail for the actual rollout collection +set -o pipefail + echo "=== Running rollout collection ===" echo "Input file: {self.input_file}" echo "Output file: {self.output_file}" @@ -930,9 +933,9 @@ def build_cmd() -> Tuple[str, Dict]: if [ "$BEST_COUNT" -lt "$NUM_NODES" ]; then echo "ERROR: Could not discover enough node hostnames for NUM_NODES=$NUM_NODES" echo " Best candidate: ${{BEST_VAR_NAME:-}} -> $BEST_COUNT nodes: '${{BEST_NODES_STR:-}}'" - echo " SLURM_JOB_NODELIST='${{SLURM_JOB_NODELIST:-}}'" - echo " SLURM_NODELIST='${{SLURM_NODELIST:-}}'" - echo " SLURM_STEP_NODELIST='${{SLURM_STEP_NODELIST:-}}'" + echo " SLURM_JOB_NODELIST='${{SLURM_JOB_NODELIST:-}}'" + echo " SLURM_NODELIST='${{SLURM_NODELIST:-}}'" + echo " SLURM_STEP_NODELIST='${{SLURM_STEP_NODELIST:-}}'" exit 1 fi @@ -961,7 +964,7 @@ def build_cmd() -> Tuple[str, Dict]: ATTEMPT=$((ATTEMPT + 1)) if curl -s "${{URL}}/health" > /dev/null 2>&1; then ELAPSED=$(($(date +%s) - START_TIME)) - echo " ✓ Server $GLOBAL_IDX is ready (after ${{ELAPSED}}s total)" + echo " Server $GLOBAL_IDX is ready (after ${{ELAPSED}}s total)" break fi # Log progress every 60 seconds From a34c0d645236986a6b33f3b0a410a72f8e913ac6 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 3 Mar 2026 14:24:28 -0800 Subject: [PATCH 3/7] refactor: address PR review feedback for gym rollouts pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Split scripts.py into scripts/ package (base, server, generation, nemo_gym) - Remove unused MultiVLLMServerScript and GymClientScript classes - Remove server_address param; model now accepts URLs for pre-hosted servers - Use str_ids_to_list for seed parsing (consistency with generate pipeline) - Add server_container CLI option; use direct dict access for container lookup - Remove try/except on seed skip check — fail hard on cluster errors - Use get_server_wait_cmd for vLLM health check (reuse existing utility) - Add comment explaining why Gym venv install step is needed for mounted paths - Fix external_deps assignment after prepared_commands reordering - Replace .get() with direct dict access throughout Signed-off-by: George Armstrong --- nemo_skills/pipeline/nemo_gym_rollouts.py | 67 +- nemo_skills/pipeline/utils/__init__.py | 2 - nemo_skills/pipeline/utils/declarative.py | 10 +- nemo_skills/pipeline/utils/scripts.py | 1001 ----------------- .../pipeline/utils/scripts/__init__.py | 28 + nemo_skills/pipeline/utils/scripts/base.py | 91 ++ .../pipeline/utils/scripts/generation.py | 118 ++ .../pipeline/utils/scripts/nemo_gym.py | 200 ++++ nemo_skills/pipeline/utils/scripts/server.py | 145 +++ 9 files changed, 621 insertions(+), 1041 deletions(-) delete mode 100644 nemo_skills/pipeline/utils/scripts.py create mode 100644 nemo_skills/pipeline/utils/scripts/__init__.py create mode 100644 nemo_skills/pipeline/utils/scripts/base.py create mode 100644 nemo_skills/pipeline/utils/scripts/generation.py create mode 100644 nemo_skills/pipeline/utils/scripts/nemo_gym.py create mode 100644 nemo_skills/pipeline/utils/scripts/server.py diff --git a/nemo_skills/pipeline/nemo_gym_rollouts.py b/nemo_skills/pipeline/nemo_gym_rollouts.py index 80fcb4b070..99f971c6d6 100644 --- a/nemo_skills/pipeline/nemo_gym_rollouts.py +++ b/nemo_skills/pipeline/nemo_gym_rollouts.py @@ -42,7 +42,7 @@ --config_paths "ns_tools/configs/ns_tools.yaml" \\ --input_file data/example.jsonl \\ --output_dir /results/rollouts \\ - --server_address http://localhost:8000/v1 \\ + --model http://localhost:8000/v1 \\ --policy_model_name nvidia/model-name \\ +agent_name=ns_tools_simple_agent """ @@ -66,7 +66,7 @@ SandboxScript, ServerScript, ) -from nemo_skills.utils import get_logger_name, setup_logging +from nemo_skills.utils import get_logger_name, setup_logging, str_ids_to_list LOG = logging.getLogger(get_logger_name(__file__)) @@ -88,10 +88,9 @@ def nemo_gym_rollouts( input_file: str = typer.Option(..., help="Path to input JSONL file for rollout collection"), output_dir: str = typer.Option(..., help="Directory for rollout outputs. Output file will be rollouts.jsonl"), expname: str = typer.Option("nemo_gym_rollouts", help="NeMo Run experiment name"), - model: str = typer.Option(None, help="Path to model for self-hosted vLLM server"), - server_address: str = typer.Option( + model: str = typer.Option( None, - help="Address of pre-hosted server (e.g., http://localhost:8000/v1). If provided, skips self-hosted server.", + help="Model path for self-hosted server, or server URL (e.g., http://host:8000/v1) for pre-hosted.", ), server_type: pipeline_utils.SupportedServers = typer.Option( None, @@ -100,6 +99,11 @@ def nemo_gym_rollouts( server_gpus: int = typer.Option(None, help="Number of GPUs for self-hosted server"), server_nodes: int = typer.Option(1, help="Number of nodes for self-hosted server"), server_args: str = typer.Option("", help="Additional arguments for the server"), + server_container: str = typer.Option( + None, + help="Override the container image for the server. " + "If not specified, uses cluster_config['containers'][server_type].", + ), with_sandbox: bool = typer.Option(False, help="If True, start a sandbox container for code execution"), gym_path: str = typer.Option( "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym", @@ -180,28 +184,27 @@ def nemo_gym_rollouts( config_paths_list = [p.strip() for p in config_paths.split(",") if p.strip()] LOG.info(f"Config paths: {config_paths_list}") - # Validate server configuration - self_hosted = model is not None and server_gpus is not None - pre_hosted = server_address is not None + # Determine if model is a URL (pre-hosted) or a path (self-hosted) + pre_hosted = model is not None and model.startswith("http") + self_hosted = model is not None and not pre_hosted and server_gpus is not None + + if model is None: + raise ValueError("--model is required. Provide a model path for self-hosted or a URL for pre-hosted server.") if not self_hosted and not pre_hosted: raise ValueError( - "Must provide either --model and --server_gpus for self-hosted server, " - "or --server_address for pre-hosted server" + "--server_gpus is required when using a self-hosted server (model path). " + "Or provide a URL (http://...) for pre-hosted." ) - if self_hosted and pre_hosted: - raise ValueError("Cannot specify both self-hosted (--model, --server_gpus) and pre-hosted (--server_address)") - if self_hosted and server_type is None: raise ValueError("--server_type is required when using self-hosted server") # Validate and set policy_model_name if pre_hosted and policy_model_name is None: - raise ValueError("--policy_model_name is required when using a pre-hosted server (--server_address)") + raise ValueError("--policy_model_name is required when using a pre-hosted server") if self_hosted and policy_model_name is None: - # For self-hosted, default to the model path policy_model_name = model LOG.info(f"Using model path as policy_model_name: {policy_model_name}") @@ -216,9 +219,8 @@ def nemo_gym_rollouts( # Determine seed indices for parallel jobs if random_seeds is not None: - # Explicit seeds provided if isinstance(random_seeds, str): - seed_indices = [int(s.strip()) for s in random_seeds.split(",")] + seed_indices = str_ids_to_list(random_seeds) else: seed_indices = list(random_seeds) LOG.info(f"Using explicit seeds: {seed_indices}") @@ -230,12 +232,15 @@ def nemo_gym_rollouts( else: seed_indices = [None] # Single job, no seed suffix - # Get server type string once if self-hosted + # Get server type string and container if self-hosted server_type_str = None - server_container = None + resolved_server_container = None if self_hosted: server_type_str = server_type.value if hasattr(server_type, "value") else server_type - server_container = cluster_config["containers"].get(server_type_str, server_type_str) + if server_container is not None: + resolved_server_container = server_container + else: + resolved_server_container = cluster_config["containers"][server_type_str] # Filter out seeds with existing output files (unless rerun_done=True) if not rerun_done and seed_indices != [None]: @@ -243,15 +248,10 @@ def nemo_gym_rollouts( skipped_seeds = [] for seed_idx in seed_indices: output_file = f"{output_dir}/rollouts-rs{seed_idx}.jsonl" - # Check if file exists on cluster - try: - unmounted_path = get_unmounted_path(cluster_config, output_file) - if cluster_path_exists(cluster_config, unmounted_path): - skipped_seeds.append(seed_idx) - else: - filtered_seeds.append(seed_idx) - except Exception as e: - LOG.warning(f"Could not check if {output_file} exists: {e}. Including seed {seed_idx}.") + unmounted_path = get_unmounted_path(cluster_config, output_file) + if cluster_path_exists(cluster_config, unmounted_path): + skipped_seeds.append(seed_idx) + else: filtered_seeds.append(seed_idx) if skipped_seeds: @@ -285,12 +285,12 @@ def nemo_gym_rollouts( num_gpus=server_gpus, num_nodes=server_nodes, server_args=server_args, - allocate_port=True, # Each job gets unique port + allocate_port=True, ) server_cmd = Command( script=server_script, - container=server_container, + container=resolved_server_container, name=f"{expname}_server{job_suffix}", ) components.append(server_cmd) @@ -301,7 +301,7 @@ def nemo_gym_rollouts( if with_sandbox: sandbox_script = SandboxScript( cluster_config=cluster_config, - allocate_port=True, # Each job gets unique port + allocate_port=True, ) sandbox_cmd = Command( @@ -319,7 +319,7 @@ def nemo_gym_rollouts( output_file=output_file, extra_arguments=extra_arguments, server=server_script, - server_address=server_address, + server_address=model if pre_hosted else None, sandbox=sandbox_script, gym_path=gym_path, policy_api_key=policy_api_key, @@ -330,7 +330,6 @@ def nemo_gym_rollouts( script=nemo_gym_script, container=cluster_config["containers"]["nemo-rl"], name=f"{expname}_nemo_gym{job_suffix}", - # If use_mounted_nemo_skills=False, avoid /nemo_run/code so Gym uses its bundled version avoid_nemo_run_code=not use_mounted_nemo_skills, ) components.append(nemo_gym_cmd) diff --git a/nemo_skills/pipeline/utils/__init__.py b/nemo_skills/pipeline/utils/__init__.py index 37d16b734d..e419a38301 100644 --- a/nemo_skills/pipeline/utils/__init__.py +++ b/nemo_skills/pipeline/utils/__init__.py @@ -74,8 +74,6 @@ from nemo_skills.pipeline.utils.scripts import ( BaseJobScript, GenerationClientScript, - GymClientScript, - MultiVLLMServerScript, NemoGymRolloutsScript, SandboxScript, ServerScript, diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py index 5497311195..9fc159d1a9 100644 --- a/nemo_skills/pipeline/utils/declarative.py +++ b/nemo_skills/pipeline/utils/declarative.py @@ -638,7 +638,7 @@ def _create_executor( # Allow per-command extra mounts without requiring editing the cluster YAML. # We treat exec_config["mounts"] as additive and merge it with mounts from cluster_config. mounts = None - extra_mounts = exec_config.get("mounts") or None + extra_mounts = exec_config["mounts"] or None if extra_mounts: base_mounts = get_mounts_from_config(cluster_config) mounts = base_mounts + [m for m in extra_mounts if m not in base_mounts] @@ -760,7 +760,7 @@ def _allocation_sort_key(entry: Dict) -> Tuple[int, int]: shared_packager = None # Build commands and executors using prepared data - for entry in prepared_commands: + for entry_idx, entry in enumerate(prepared_commands): het_idx = entry["het_idx"] comp_idx = entry["comp_idx"] group = entry["group"] @@ -778,8 +778,10 @@ def _allocation_sort_key(entry: Dict) -> Tuple[int, int]: # Resolve container and create executor container_image = self._resolve_container(exec_config, command, cluster_config) - # Pass external dependencies only to the first executor (SLURM doesn't support per-component dependencies in hetjobs) - exec_dependencies = external_deps if (het_idx == 0 and comp_idx == 0) else None + # Pass external dependencies only to the first executor in iteration order. + # We use entry_idx rather than het_idx/comp_idx because prepared_commands may + # have been reordered (e.g., to put spanning components first for allocation). + exec_dependencies = external_deps if entry_idx == 0 else None # Always use group.name for SLURM job name (consistent across all components) # The group name is set to task_name in generate.py, without component suffixes diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py deleted file mode 100644 index 61603d7f64..0000000000 --- a/nemo_skills/pipeline/utils/scripts.py +++ /dev/null @@ -1,1001 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Script classes for NeMo-Skills pipeline components. - -These classes wrap NeMo-Run's run.Script interface to provide typed, reusable -job components (servers, clients, sandboxes) with explicit fields and -cross-component reference support for heterogeneous jobs. - -Example: - # Create a server script with automatic port allocation - server = ServerScript( - server_type="vllm", - model_path="/models/llama-8b", - cluster_config=cluster_config, - num_gpus=8, - ) - - # Create a client that references the server - client = GenerationClientScript( - output_dir="/results", - input_file="/data/input.jsonl", - server=server, # Cross-component reference - ) - - # Use in Command objects - Command(script=server, container="vllm", ...) - Command(script=client, container="nemo-skills", ...) -""" - -import logging -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union - -import nemo_run as run - -from nemo_skills.pipeline.utils.commands import sandbox_command -from nemo_skills.pipeline.utils.exp import install_packages_wrap -from nemo_skills.pipeline.utils.generation import get_generation_cmd -from nemo_skills.pipeline.utils.server import get_free_port, get_server_command -from nemo_skills.utils import get_logger_name - -if TYPE_CHECKING: - # Avoid circular imports for type hints - pass - -LOG = logging.getLogger(get_logger_name(__file__)) - - -@dataclass -class BaseJobScript(run.Script): - """Base class for job component scripts with heterogeneous job support. - - This class provides: - - het_group_index tracking for cross-component references in heterogeneous SLURM jobs - - hostname_ref() method for getting hostnames in het jobs - - Common pattern for Script initialization - - Attributes: - het_group_index: Index in heterogeneous job group (set by Pipeline at runtime) - span_group_nodes: Whether to span all nodes from the group's HardwareConfig. - When False (default), the script runs on 1 node regardless of group config. - When True, the script spans all nodes specified in the group's num_nodes. - This is important for multi-node setups with --overlap where the server - needs multiple nodes but client/sandbox should run on the master node only. - num_tasks_override: Override the group's num_tasks for this specific script. - When set, this script's srun will use this value for --ntasks-per-node - instead of the group's HardwareConfig.num_tasks. Useful when multiple - scripts in a CommandGroup need different task configurations (e.g., - vLLM servers needing 2 tasks per node while Gym client needs 1). - """ - - het_group_index: Optional[int] = field(default=None, init=False, repr=False) - span_group_nodes: bool = False # Default: run on 1 node - num_tasks_override: Optional[int] = None # Per-script task count override - installation_command: Optional[str] = None - entrypoint: str = field(default="bash", init=False) - - def __post_init__(self): - """Wrap inline command with installation_command if provided.""" - if not self.installation_command: - return - - if callable(self.inline): - original_inline = self.inline - - def wrapped_inline(): - result = original_inline() - if isinstance(result, tuple): - command, metadata = result - return install_packages_wrap(command, self.installation_command), metadata - return install_packages_wrap(result, self.installation_command) - - self.set_inline(wrapped_inline) - elif isinstance(self.inline, str): - self.set_inline(install_packages_wrap(self.inline, self.installation_command)) - - def set_inline(self, command: Union[str, Callable, run.Script]) -> None: - """Set the inline command safely on frozen dataclass.""" - object.__setattr__(self, "inline", command) - - def hostname_ref(self) -> str: - """Get hostname reference for hetjob cross-component communication. - - Returns a shell variable reference that resolves to the master node hostname - for this het group. Uses environment variables automatically exported by nemo-run: - SLURM_MASTER_NODE_HET_GROUP_0, SLURM_MASTER_NODE_HET_GROUP_1, etc. - - These are set via: - export SLURM_MASTER_NODE_HET_GROUP_N=$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_N | head -n1) - """ - if self.het_group_index is None: - return "127.0.0.1" # Local fallback for non-heterogeneous jobs - - # Use the environment variable exported by nemo-run - return f"${{SLURM_MASTER_NODE_HET_GROUP_{self.het_group_index}:-localhost}}" - - -@dataclass(kw_only=True) -class ServerScript(BaseJobScript): - """Script for model inference servers (vLLM, TRT-LLM, SGLang, etc.). - - This script wraps server command builders and provides: - - Automatic port allocation if not specified - - Type-safe server configuration - - Cross-component address sharing (get_address()) - - Resource requirement tracking (num_gpus, num_nodes, num_tasks) - - Attributes: - server_type: Type of server (vllm, trtllm, sglang, megatron, openai, etc.) - model_path: Path to model weights or model name for API services - cluster_config: Cluster configuration dictionary - num_gpus: Number of GPUs required (default: 8) - num_nodes: Number of nodes required (default: 1) - server_args: Additional server-specific arguments - server_entrypoint: Custom server entrypoint script (optional) - port: Server port (allocated automatically if None) - allocate_port: Whether to allocate port automatically (default: True) - num_tasks: Number of MPI tasks (computed in __post_init__) - log_prefix: Prefix for log files (default: "server") - - Example: - # Basic usage - server = ServerScript( - server_type="vllm", - model_path="/models/llama-3-8b", - cluster_config=cluster_config, - num_gpus=8, - ) - - # Access allocated port - print(f"Server will run on port {server.port}") - - # Get full address for client connection - address = server.get_address() # Returns "hostname:port" - """ - - server_type: str - model_path: str - cluster_config: Dict - num_gpus: int = 8 - num_nodes: int = 1 - server_args: str = "" - server_entrypoint: Optional[str] = None # Custom server entrypoint script - port: Optional[int] = None - allocate_port: bool = True - - # Server spans all group nodes (e.g., for distributed inference) - span_group_nodes: bool = True - - # Computed fields (set in __post_init__) - num_tasks: int = field(init=False, repr=False) - log_prefix: str = field(default="server", init=False) - - def __post_init__(self): - """Initialize server script. - - - Allocates port if not provided - - Builds server command using get_server_command() - - Sets self.inline to the command string - - Computes num_tasks from server command builder - """ - # Allocate port if not provided - if self.port is None and self.allocate_port: - self.port = get_free_port(strategy="random") - LOG.debug(f"Allocated port {self.port} for {self.server_type} server") - - # Build server command - cmd, self.num_tasks = get_server_command( - server_type=self.server_type, - num_gpus=self.num_gpus, - num_nodes=self.num_nodes, - model_path=self.model_path, - cluster_config=self.cluster_config, - server_port=self.port, - server_args=self.server_args, - server_entrypoint=self.server_entrypoint, - ) - - self.set_inline(cmd) - super().__post_init__() - - def get_address(self) -> str: - """Get server address for client connections. - - Returns hostname:port string that clients can use to connect. - In heterogeneous jobs, hostname_ref() returns a bash expression - that resolves at runtime. - - Returns: - Server address in format "hostname:port" - - Example: - # Use in client command - client_cmd = f"python client.py --server-url http://{server.get_address()}" - """ - return f"{self.hostname_ref()}:{self.port}" - - -@dataclass(kw_only=True) -class SandboxScript(BaseJobScript): - """Script for code execution sandbox container. - - The sandbox provides a secure environment for executing LLM-generated code. - This script wraps sandbox command builders and provides: - - Automatic port allocation - - Mount configuration (can optionally keep mounts, though risky) - - Type-safe sandbox configuration - - Attributes: - cluster_config: Cluster configuration dictionary - port: Sandbox port (allocated automatically if None) - keep_mounts: Whether to keep filesystem mounts (default: False, risky if True). - Note: This is stored for documentation but actually handled at - the executor level, not in the sandbox command itself. - allocate_port: Whether to allocate port automatically (default: True) - log_prefix: Prefix for log files (default: "sandbox") - - Example: - sandbox = SandboxScript( - cluster_config=cluster_config, - keep_mounts=False, # Safer: sandbox has no access to mounted paths - ) - - # Client can reference sandbox port - client = GenerationClientScript(..., sandbox=sandbox) - """ - - cluster_config: Dict - port: Optional[int] = None - keep_mounts: bool = False - allocate_port: bool = True - env_overrides: Optional[List[str]] = None # Extra env vars in KEY=VALUE form - - # Sandbox spans all group nodes (e.g., for multi-node generate jobs) - span_group_nodes: bool = True - - log_prefix: str = field(default="sandbox", init=False) - - def __post_init__(self): - """Initialize sandbox script. - - - Allocates port if not provided - - Builds sandbox command using sandbox_command() - - Sets self.inline to a callable that returns command and environment vars - """ - # Allocate port if not provided - if self.port is None and self.allocate_port: - self.port = get_free_port(strategy="random") - LOG.debug(f"Allocated port {self.port} for sandbox") - - # Build sandbox command and metadata (including environment vars) - # Note: keep_mounts is handled at the executor level, not in the command itself - cmd, metadata = sandbox_command( - cluster_config=self.cluster_config, - port=self.port, - ) - - # Use a callable to return both command and environment variables - # This ensures the sandbox's LISTEN_PORT and NGINX_PORT are properly set - def build_cmd() -> Tuple[str, Dict]: - env = dict(metadata.get("environment", {})) - # Apply user-specified environment overrides - if self.env_overrides: - for override in self.env_overrides: - key, value = override.split("=", 1) - env[key] = value - return cmd, {"environment": env} - - self.set_inline(build_cmd) - super().__post_init__() - - -@dataclass(kw_only=True) -class GenerationClientScript(BaseJobScript): - """Script for LLM generation/inference client. - - This script wraps generation command builders and provides: - - Cross-component references to multiple servers and sandbox - - Lazy command building for runtime hostname resolution - - Type-safe generation configuration - - Environment variable handling for sandbox/server communication - - Attributes: - output_dir: Directory for output files - input_file: Input JSONL file (mutually exclusive with input_dir) - input_dir: Input directory (mutually exclusive with input_file) - extra_arguments: Additional arguments for generation script - random_seed: Random seed for sampling (optional) - chunk_id: Chunk ID for parallel processing (optional) - num_chunks: Total number of chunks (required if chunk_id set) - preprocess_cmd: Command to run before generation (optional) - postprocess_cmd: Command to run after generation (optional) - wandb_parameters: WandB logging configuration (optional) - with_sandbox: Whether sandbox is enabled - script: Module or file path for generation script (default: nemo_skills.inference.generate) - servers: List of ServerScript references (None for pre-hosted servers) - server_addresses_prehosted: Addresses for pre-hosted servers (parallel to servers list) - model_names: Model names for multi-model generation (optional) - server_types: Server types for multi-model generation (optional) - sandbox: Reference to SandboxScript for cross-component communication (optional) - log_prefix: Prefix for log files (default: "main") - - Examples: - # Single server - client = GenerationClientScript( - output_dir="/results", - input_file="/data/input.jsonl", - servers=[server_script], - model_names=["llama-8b"], - server_types=["vllm"], - ) - - # Multi-model with self-hosted and pre-hosted servers - client = GenerationClientScript( - output_dir="/results", - input_file="/data/input.jsonl", - servers=[server1, server2, None], # None = pre-hosted - server_addresses_prehosted=["", "", "https://api.openai.com"], - model_names=["llama-8b", "llama-70b", "gpt-4"], - server_types=["vllm", "vllm", "openai"], - sandbox=sandbox_script, - with_sandbox=True, - ) - """ - - output_dir: str - input_file: Optional[str] = None - input_dir: Optional[str] = None - extra_arguments: str = "" - random_seed: Optional[int] = None - chunk_id: Optional[int] = None - num_chunks: Optional[int] = None - preprocess_cmd: Optional[str] = None - postprocess_cmd: Optional[str] = None - wandb_parameters: Optional[Dict] = None - with_sandbox: bool = False - script: str = "nemo_skills.inference.generate" - requirements: Optional[list[str]] = None - - # Cross-component references for single/multi-model - servers: Optional[List[Optional["ServerScript"]]] = None - server_addresses_prehosted: Optional[List[str]] = None - model_names: Optional[List[str]] = None - server_types: Optional[List[str]] = None - sandbox: Optional["SandboxScript"] = None - - log_prefix: str = field(default="main", init=False) - - def __post_init__(self): - """Initialize generation client script with lazy command building. - - Builds command lazily via a callable that is evaluated when het_group_index - is assigned, allowing hostname_ref() to resolve correctly for heterogeneous jobs. - - This works for both cases: - - With cross-refs: Resolves server hostnames and sandbox ports at runtime - - Without cross-refs: Just builds the command string (no runtime resolution needed) - """ - - def build_cmd() -> Tuple[str, Dict]: - """Build command at runtime when cross-refs are resolved.""" - env_vars = {} - - # Add sandbox port to environment if sandbox is referenced - if self.sandbox: - env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) - - # Build server addresses if servers are provided - server_addresses = None - if self.servers is not None: - server_addresses = [] - for server_idx, server_script in enumerate(self.servers): - if server_script is not None: - # Self-hosted: construct address from hostname and port refs - addr = f"{server_script.hostname_ref()}:{server_script.port}" - else: - # Pre-hosted: use the address from server_addresses_prehosted - addr = self.server_addresses_prehosted[server_idx] - server_addresses.append(addr) - - # Build generation command - cmd = get_generation_cmd( - output_dir=self.output_dir, - input_file=self.input_file, - input_dir=self.input_dir, - extra_arguments=self.extra_arguments, - random_seed=self.random_seed, - chunk_id=self.chunk_id, - num_chunks=self.num_chunks, - preprocess_cmd=self.preprocess_cmd, - postprocess_cmd=self.postprocess_cmd, - wandb_parameters=self.wandb_parameters, - with_sandbox=self.with_sandbox, - script=self.script, - requirements=self.requirements, - # Multi-model parameters (None for single-model) - server_addresses=server_addresses, - model_names=self.model_names, - server_types=self.server_types, - ) - - # Return command and runtime metadata (environment vars) - return cmd, {"environment": env_vars} - - # Always use lazy command building - self.set_inline(build_cmd) - super().__post_init__() - - -@dataclass(kw_only=True) -class NemoGymRolloutsScript(BaseJobScript): - """Script for running NeMo Gym rollout collection. - - This script orchestrates the full rollout collection workflow: - 1. Starts ng_run in background to spin up NeMo Gym servers - 2. Polls ng_status until all servers are healthy - 3. Runs ng_collect_rollouts to collect rollouts - 4. Keeps ng_run running (cleanup handled externally) - - Attributes: - config_paths: List of YAML config file paths for ng_run - input_file: Input JSONL file path for rollout collection - output_file: Output JSONL file path for rollouts - extra_arguments: Additional Hydra overrides passed to both ng_run and ng_collect_rollouts - server: Optional ServerScript reference for policy model server - server_address: Optional pre-hosted server address - sandbox: Optional SandboxScript reference for sandbox port - log_prefix: Prefix for log files (default: "nemo_gym") - - Example: - script = NemoGymRolloutsScript( - config_paths=[ - "resources_servers/ns_tools/configs/ns_tools.yaml", - "resources_servers/math_with_judge/configs/math_with_judge.yaml", - ], - input_file="/data/input.jsonl", - output_file="/data/rollouts.jsonl", - extra_arguments="+agent_name=ns_tools_simple_agent +limit=10", - server=server_script, - sandbox=sandbox_script, - ) - """ - - config_paths: List[str] - input_file: str - output_file: str - extra_arguments: str = "" - server: Optional["ServerScript"] = None - server_address: Optional[str] = None - sandbox: Optional["SandboxScript"] = None - gym_path: str = "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym" - policy_api_key: str = "dummy" # API key for policy server (can be dummy for local) - policy_model_name: Optional[str] = None # Model name override for policy server - - log_prefix: str = field(default="nemo_gym", init=False) - - def __post_init__(self): - """Initialize the combined ng_run + ng_collect_rollouts script.""" - - def build_cmd() -> Tuple[str, Dict]: - """Build the full rollout collection command.""" - # Build config_paths argument - config_paths_str = ",".join(self.config_paths) - - # Build ng_run command parts - ng_run_parts = [ - "ng_run", - f'"+config_paths=[{config_paths_str}]"', - ] - - # Add policy server URL if we have a server reference or address - if self.server is not None: - server_addr = f"http://{self.server.hostname_ref()}:{self.server.port}/v1" - ng_run_parts.append(f'+policy_base_url="{server_addr}"') - elif self.server_address is not None: - ng_run_parts.append(f'+policy_base_url="{self.server_address}"') - - # Add policy API key (required by some configs) - ng_run_parts.append(f'+policy_api_key="{self.policy_api_key}"') - - # Add policy model name (required by configs) - if self.policy_model_name: - ng_run_parts.append(f'+policy_model_name="{self.policy_model_name}"') - - # Add extra arguments to ng_run - if self.extra_arguments: - ng_run_parts.append(self.extra_arguments) - - ng_run_cmd = " ".join(ng_run_parts) - - # Build ng_collect_rollouts command - ng_collect_parts = [ - "ng_collect_rollouts", - f'+input_jsonl_fpath="{self.input_file}"', - f'+output_jsonl_fpath="{self.output_file}"', - ] - - # Add extra arguments to ng_collect_rollouts - if self.extra_arguments: - ng_collect_parts.append(self.extra_arguments) - - ng_collect_cmd = " ".join(ng_collect_parts) - - # Compute the vLLM server URL for the wait check - if self.server is not None: - vllm_server_url = f"http://{self.server.hostname_ref()}:{self.server.port}/v1" - elif self.server_address is not None: - vllm_server_url = self.server_address - else: - vllm_server_url = "" - - # Build the full bash script that: - # 1. Installs NeMo Gym from 3rdparty/Gym-workspace/Gym - # 2. Waits for vLLM server to be ready - # 3. Starts ng_run in background - # 4. Polls ng_status until healthy (with early failure detection) - # 5. Runs ng_collect_rollouts - cmd = f"""set -e -set -o pipefail - -echo "=== Installing NeMo Gym ===" -cd {self.gym_path} || {{ echo "ERROR: Failed to cd to Gym directory"; exit 1; }} -uv venv --python 3.12 --allow-existing .venv || {{ echo "ERROR: Failed to create venv"; exit 1; }} -source .venv/bin/activate || {{ echo "ERROR: Failed to activate venv"; exit 1; }} -uv sync --active --extra dev || {{ echo "ERROR: Failed to sync dependencies"; exit 1; }} -echo "NeMo Gym installed successfully" - -# Disable pipefail for the polling loop (grep may return non-zero) -set +o pipefail - -# Wait for vLLM server to be ready before starting ng_run -# Note: --kill-on-bad-exit in srun ensures job fails if vLLM crashes -VLLM_SERVER_URL="{vllm_server_url}" -if [ -n "$VLLM_SERVER_URL" ]; then - echo "=== Waiting for vLLM server at $VLLM_SERVER_URL ===" - while [ $(curl -s -o /dev/null -w "%{{http_code}}" "$VLLM_SERVER_URL/models" 2>/dev/null) -ne 200 ]; do - sleep 10 - done - echo "vLLM server is ready!" -fi - -echo "=== Starting NeMo Gym servers ===" -{ng_run_cmd} & -NG_RUN_PID=$! -echo "ng_run PID: $NG_RUN_PID" - -echo "Waiting for NeMo Gym servers..." -LAST_STATUS="" -while true; do - # Check if ng_run process died - let the failure cascade naturally - if ! kill -0 $NG_RUN_PID 2>/dev/null; then - echo "ERROR: ng_run process exited unexpectedly" - wait $NG_RUN_PID 2>/dev/null # Get exit code - exit 1 - fi - - STATUS_OUTPUT=$(ng_status 2>&1) - - if echo "$STATUS_OUTPUT" | grep -q "healthy, 0 unhealthy"; then - echo "All servers ready!" - break - fi - - # Only print status when it changes (reduce verbosity) - CURRENT_STATUS=$(echo "$STATUS_OUTPUT" | grep -oE '[0-9]+ healthy' | head -1 || echo "starting") - if [ "$CURRENT_STATUS" != "$LAST_STATUS" ]; then - echo "Server status: $CURRENT_STATUS" - LAST_STATUS="$CURRENT_STATUS" - fi - - sleep 10 -done - -# Re-enable pipefail for the actual rollout collection -set -o pipefail - -echo "=== Running rollout collection ===" -echo "Input file: {self.input_file}" -echo "Output file: {self.output_file}" -mkdir -p "$(dirname "{self.output_file}")" -echo "Output directory created: $(dirname "{self.output_file}")" -echo "Running: {ng_collect_cmd}" -{ng_collect_cmd} || {{ echo "ERROR: ng_collect_rollouts failed"; kill $NG_RUN_PID 2>/dev/null || true; exit 1; }} - -echo "=== Rollout collection complete ===" -echo "Output: {self.output_file}" - -echo "=== Cleaning up ===" -kill $NG_RUN_PID 2>/dev/null || true -echo "Servers terminated." -""" - # Build environment variables for sandbox connection - # YAML configs use ${oc.env:NEMO_SKILLS_SANDBOX_HOST,127.0.0.1} and - # ${oc.env:NEMO_SKILLS_SANDBOX_PORT,6000} to resolve these - env_vars = {} - if self.sandbox is not None: - env_vars["NEMO_SKILLS_SANDBOX_HOST"] = self.sandbox.hostname_ref() - env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) - - return cmd.strip(), {"environment": env_vars} - - self.set_inline(build_cmd) - super().__post_init__() - - -@dataclass(kw_only=True) -class MultiVLLMServerScript(BaseJobScript): - """Script for deploying multiple independent vLLM servers for Gym routing. - - This script enables data-parallel vLLM deployments where Gym routes requests - across multiple independent vLLM server replicas. Each replica runs on its own - set of GPUs and handles requests independently. - - Supports configurations like: - - 1 server per node with TP=8 (full node per server) - - 2 servers per node with TP=4 each (2 servers sharing a node) - - 4 servers per node with TP=2 each - - 8 servers per node with TP=1 each - - The script uses SLURM environment variables to determine which server instance - to start on each task: - - SLURM_PROCID: Global task ID (0 to num_nodes * servers_per_node - 1) - - SLURM_LOCALID: Task ID within node (0 to servers_per_node - 1) - - SLURM_NODEID: Node ID (0 to num_nodes - 1) - - Attributes: - model_path: Path to model weights or HuggingFace model name - cluster_config: Cluster configuration dictionary - num_nodes: Number of nodes to use - servers_per_node: Number of vLLM servers per node - gpus_per_server: GPUs per server (tensor_parallel_size) - server_args: Additional vLLM server arguments - base_port: Starting port (servers use base_port + global_rank) - - Example: - # 4 nodes × 2 servers per node = 8 total replicas, each with TP=4 - servers = MultiVLLMServerScript( - model_path="/models/llama-70b", - cluster_config=cluster_config, - num_nodes=4, - servers_per_node=2, - gpus_per_server=4, - server_args="--dtype auto --max-model-len 8192", - ) - - # Use in CommandGroup - group = CommandGroup( - commands=[ - Command(script=servers, container="vllm", name="vllm"), - Command(script=gym_client, container="nemo-skills", name="gym"), - ], - hardware=HardwareConfig( - num_nodes=4, - num_gpus=8, - num_tasks=2, # servers_per_node - ), - ) - """ - - model_path: str - cluster_config: Dict - num_nodes: int = 1 - servers_per_node: int = 1 - gpus_per_server: int = 8 - server_args: str = "" - base_port: Optional[int] = None - # Mitigations for vLLM v1 shared-memory communicator instability on some clusters. - # When enabled, we export env vars that (if supported by the vLLM version) disable - # SHM-based broadcast paths, which are sensitive to /dev/shm sizing and IPC quirks. - disable_shm_broadcast: bool = False - # Print /dev/shm diagnostics at startup to help debug node-to-node differences. - print_shm_diagnostics: bool = False - - # Spans all nodes - each SLURM task runs one server - span_group_nodes: bool = True - - # Internal tracking - _total_replicas: int = field(init=False, repr=False, default=0) - _ports: List[int] = field(init=False, repr=False, default_factory=list) - log_prefix: str = field(default="vllm_servers", init=False) - - def __post_init__(self): - """Build command for distributed vLLM servers.""" - self._total_replicas = self.num_nodes * self.servers_per_node - - # Set num_tasks_override to ensure this script gets the right task count - self.num_tasks_override = self.servers_per_node - - # Allocate ports - if self.base_port is None: - self.base_port = get_free_port(strategy="random") - self._ports = [self.base_port + i for i in range(self._total_replicas)] - - ports_str = " ".join(str(p) for p in self._ports) - - # Build command that runs one server per SLURM task - disable_shm_broadcast = "true" if self.disable_shm_broadcast else "false" - print_shm_diagnostics = "true" if self.print_shm_diagnostics else "false" - cmd = f''' -#!/bin/bash -set -e - -# Configuration -PORTS=({ports_str}) -GPUS_PER_SERVER={self.gpus_per_server} -SERVERS_PER_NODE={self.servers_per_node} -DISABLE_SHM_BROADCAST={disable_shm_broadcast} -PRINT_SHM_DIAGNOSTICS={print_shm_diagnostics} - -# SLURM environment -# SLURM_PROCID: Global task ID (0 to num_nodes * servers_per_node - 1) -# SLURM_LOCALID: Task ID within this node (0 to servers_per_node - 1) -# SLURM_NODEID: Node ID (0 to num_nodes - 1) -GLOBAL_RANK=${{SLURM_PROCID:-0}} -LOCAL_RANK=${{SLURM_LOCALID:-0}} -NODE_ID=${{SLURM_NODEID:-0}} - -# Calculate which GPUs this server should use -# E.g., with 2 servers per node and 4 GPUs each: -# Local rank 0 → GPUs 0,1,2,3 -# Local rank 1 → GPUs 4,5,6,7 -GPU_START=$((LOCAL_RANK * GPUS_PER_SERVER)) -GPU_END=$((GPU_START + GPUS_PER_SERVER - 1)) -GPU_LIST=$(seq -s, $GPU_START $GPU_END) - -# Get port for this server -MY_PORT=${{PORTS[$GLOBAL_RANK]}} - -echo "=== vLLM Server Configuration ===" -echo "Global Rank: $GLOBAL_RANK" -echo "Node ID: $NODE_ID" -echo "Local Rank: $LOCAL_RANK" -echo "Port: $MY_PORT" -echo "GPUs: $GPU_LIST (CUDA_VISIBLE_DEVICES)" -echo "Tensor Parallel Size: {self.gpus_per_server}" -echo "Model: {self.model_path}" -echo "=================================" - -# Debug /dev/shm and IPC-related info (helps with node-to-node variance) -if [ "$PRINT_SHM_DIAGNOSTICS" = "true" ]; then - echo "" - echo "=== /dev/shm diagnostics (before vLLM start) ===" - df -h /dev/shm || true - ls -ld /dev/shm || true - mount | grep -E " /dev/shm " || true - echo "ulimit -n: $(ulimit -n || true)" - echo "===============================================" - echo "" -fi - -# Set GPU visibility -export CUDA_VISIBLE_DEVICES=$GPU_LIST - -# Mitigation: attempt to disable vLLM SHM broadcast transport (best-effort, version-dependent). -# These env vars are intentionally safe: if vLLM doesn't recognize them, they are ignored. -if [ "$DISABLE_SHM_BROADCAST" = "true" ]; then - export VLLM_DISABLE_SHM_BROADCAST=1 - export VLLM_USE_SHM_BROADCAST=0 -fi - -# Start vLLM server -python3 -m vllm.entrypoints.openai.api_server \\ - --model "{self.model_path}" \\ - --host "0.0.0.0" \\ - --port "$MY_PORT" \\ - --tensor-parallel-size {self.gpus_per_server} \\ - --trust-remote-code \\ - {self.server_args} -''' - - self.set_inline(cmd) - super().__post_init__() - - @property - def total_replicas(self) -> int: - """Total number of vLLM server replicas.""" - return self._total_replicas - - @property - def ports(self) -> List[int]: - """List of ports for all server replicas.""" - return self._ports - - -@dataclass(kw_only=True) -class GymClientScript(BaseJobScript): - """Script that starts Gym with routing to multiple vLLM servers. - - This script runs on a single node (the master node) and: - 1. Waits for all vLLM servers to be healthy - 2. Builds the list of server URLs - 3. Exports GYM_VLLM_BASE_URLS environment variable - 4. Starts Gym with the configured command - - Attributes: - servers: Reference to MultiVLLMServerScript for server URLs - gym_command: Command to run Gym (e.g., "python -m nemo_gym.server ...") - health_check_interval: Seconds between health check attempts (default: 10) - - Example: - gym_client = GymClientScript( - servers=vllm_servers, - gym_command="python -m nemo_gym.server --config /path/to/config.yaml", - ) - - # In same CommandGroup as vllm_servers - group = CommandGroup( - commands=[ - Command(script=vllm_servers, container="vllm", name="vllm"), - Command(script=gym_client, container="nemo-skills", name="gym"), - ], - ... - ) - """ - - servers: "MultiVLLMServerScript" - gym_command: str - health_check_interval: int = 10 # Seconds between health check attempts - - # Runs only on master node, single task - span_group_nodes: bool = False - num_tasks_override: int = 1 - - log_prefix: str = field(default="gym", init=False) - - def __post_init__(self): - """Build command that waits for servers then starts Gym.""" - - def build_cmd() -> Tuple[str, Dict]: - ports_str = " ".join(str(p) for p in self.servers.ports) - num_servers = self.servers.total_replicas - num_nodes = self.servers.num_nodes - servers_per_node = self.servers.servers_per_node - - cmd = f""" -#!/bin/bash -set -e - -echo "=== Gym Client Starting ===" -echo "Waiting for {num_servers} vLLM servers across {num_nodes} nodes..." -echo "No timeout - will wait until servers are ready or job is cancelled." -echo "If vLLM servers crash, the job will fail." - -# Configuration -PORTS=({ports_str}) -NUM_NODES={num_nodes} -SERVERS_PER_NODE={servers_per_node} -HEALTH_CHECK_INTERVAL={self.health_check_interval} - -# Expand SLURM_JOB_NODELIST without scontrol (not available in containers) -# Uses Python to parse the compressed node list format (e.g., "node[001-004]") -expand_nodelist() {{ - python3 -c " -import re -import sys - -nodelist = sys.argv[1] -nodes = [] - -# Handle comma-separated parts -for part in re.split(r',(?![^\\[]*\\])', nodelist): - # Check if it has a range like prefix[001-003,005] - match = re.match(r'(.*)\\[([^\\]]+)\\](.*)', part) - if match: - prefix, ranges, suffix = match.groups() - for r in ranges.split(','): - if '-' in r: - start, end = r.split('-') - width = len(start) - for i in range(int(start), int(end) + 1): - nodes.append(f'{{prefix}}{{i:0{{width}}d}}{{suffix}}') - else: - nodes.append(f'{{prefix}}{{r}}{{suffix}}') - else: - nodes.append(part) - -print(' '.join(nodes)) -" "$1" -}} - -# Get all node hostnames. -# IMPORTANT: depending on how the step is launched, some clusters populate only one of these. -# We'll pick the candidate that expands to the most nodes, and verify it matches NUM_NODES. -BEST_NODES_STR="" -BEST_COUNT=0 -for VAR_NAME in SLURM_JOB_NODELIST SLURM_NODELIST SLURM_STEP_NODELIST; do - NL="${{!VAR_NAME:-}}" - if [ -z "$NL" ]; then - continue - fi - EXPANDED="$(expand_nodelist "$NL" || true)" - COUNT="$(echo "$EXPANDED" | wc -w | tr -d ' ')" - if [ "$COUNT" -gt "$BEST_COUNT" ]; then - BEST_COUNT="$COUNT" - BEST_NODES_STR="$EXPANDED" - BEST_VAR_NAME="$VAR_NAME" - fi -done - -if [ "$BEST_COUNT" -lt "$NUM_NODES" ]; then - echo "ERROR: Could not discover enough node hostnames for NUM_NODES=$NUM_NODES" - echo " Best candidate: ${{BEST_VAR_NAME:-}} -> $BEST_COUNT nodes: '${{BEST_NODES_STR:-}}'" - echo " SLURM_JOB_NODELIST='${{SLURM_JOB_NODELIST:-}}'" - echo " SLURM_NODELIST='${{SLURM_NODELIST:-}}'" - echo " SLURM_STEP_NODELIST='${{SLURM_STEP_NODELIST:-}}'" - exit 1 -fi - -read -ra NODES <<< "$BEST_NODES_STR" -echo "Nodes (from $BEST_VAR_NAME): ${{NODES[*]}}" - -# Build URL list and wait for each server (no timeout - wait indefinitely) -URLS="" -GLOBAL_IDX=0 -START_TIME=$(date +%s) - -for NODE_IDX in $(seq 0 $((NUM_NODES - 1))); do - NODE=${{NODES[$NODE_IDX]}} - if [ -z "$NODE" ]; then - echo "ERROR: Empty NODE at index $NODE_IDX. Nodes: ${{NODES[*]}}" - exit 1 - fi - - for LOCAL_IDX in $(seq 0 $((SERVERS_PER_NODE - 1))); do - PORT=${{PORTS[$GLOBAL_IDX]}} - URL="http://${{NODE}}:${{PORT}}/v1" - - echo "Waiting for server $GLOBAL_IDX at $URL..." - ATTEMPT=0 - while true; do - ATTEMPT=$((ATTEMPT + 1)) - if curl -s "${{URL}}/health" > /dev/null 2>&1; then - ELAPSED=$(($(date +%s) - START_TIME)) - echo " Server $GLOBAL_IDX is ready (after ${{ELAPSED}}s total)" - break - fi - # Log progress every 60 seconds - if [ $((ATTEMPT % (60 / HEALTH_CHECK_INTERVAL))) -eq 0 ]; then - ELAPSED=$(($(date +%s) - START_TIME)) - echo " ... still waiting for server $GLOBAL_IDX (${{ELAPSED}}s elapsed)" - fi - sleep $HEALTH_CHECK_INTERVAL - done - - if [ -n "$URLS" ]; then URLS="$URLS,"; fi - URLS="${{URLS}}$URL" - GLOBAL_IDX=$((GLOBAL_IDX + 1)) - done -done - -TOTAL_TIME=$(($(date +%s) - START_TIME)) -echo "" -echo "=== All {num_servers} servers ready! (took ${{TOTAL_TIME}}s) ===" -echo "Server URLs: $URLS" -echo "" - -# Export for Gym to use -export GYM_VLLM_BASE_URLS="$URLS" - -# Run Gym -echo "Starting Gym..." -{self.gym_command} -""" - - return cmd.strip(), {"environment": {}} - - self.set_inline(build_cmd) - super().__post_init__() diff --git a/nemo_skills/pipeline/utils/scripts/__init__.py b/nemo_skills/pipeline/utils/scripts/__init__.py new file mode 100644 index 0000000000..2f3f8a3a41 --- /dev/null +++ b/nemo_skills/pipeline/utils/scripts/__init__.py @@ -0,0 +1,28 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Script classes for NeMo-Skills pipeline components.""" + +from nemo_skills.pipeline.utils.scripts.base import BaseJobScript +from nemo_skills.pipeline.utils.scripts.generation import GenerationClientScript +from nemo_skills.pipeline.utils.scripts.nemo_gym import NemoGymRolloutsScript +from nemo_skills.pipeline.utils.scripts.server import SandboxScript, ServerScript + +__all__ = [ + "BaseJobScript", + "GenerationClientScript", + "NemoGymRolloutsScript", + "SandboxScript", + "ServerScript", +] diff --git a/nemo_skills/pipeline/utils/scripts/base.py b/nemo_skills/pipeline/utils/scripts/base.py new file mode 100644 index 0000000000..328b40d591 --- /dev/null +++ b/nemo_skills/pipeline/utils/scripts/base.py @@ -0,0 +1,91 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base script class for NeMo-Skills pipeline components.""" + +from dataclasses import dataclass, field +from typing import Callable, Optional, Union + +import nemo_run as run + +from nemo_skills.pipeline.utils.exp import install_packages_wrap + + +@dataclass +class BaseJobScript(run.Script): + """Base class for job component scripts with heterogeneous job support. + + This class provides: + - het_group_index tracking for cross-component references in heterogeneous SLURM jobs + - hostname_ref() method for getting hostnames in het jobs + - Common pattern for Script initialization + + Attributes: + het_group_index: Index in heterogeneous job group (set by Pipeline at runtime) + span_group_nodes: Whether to span all nodes from the group's HardwareConfig. + When False (default), the script runs on 1 node regardless of group config. + When True, the script spans all nodes specified in the group's num_nodes. + This is important for multi-node setups with --overlap where the server + needs multiple nodes but client/sandbox should run on the master node only. + num_tasks_override: Override the group's num_tasks for this specific script. + When set, this script's srun will use this value for --ntasks-per-node + instead of the group's HardwareConfig.num_tasks. Useful when multiple + scripts in a CommandGroup need different task configurations (e.g., + vLLM servers needing 2 tasks per node while Gym client needs 1). + """ + + het_group_index: Optional[int] = field(default=None, init=False, repr=False) + span_group_nodes: bool = False # Default: run on 1 node + num_tasks_override: Optional[int] = None # Per-script task count override + installation_command: Optional[str] = None + entrypoint: str = field(default="bash", init=False) + + def __post_init__(self): + """Wrap inline command with installation_command if provided.""" + if not self.installation_command: + return + + if callable(self.inline): + original_inline = self.inline + + def wrapped_inline(): + result = original_inline() + if isinstance(result, tuple): + command, metadata = result + return install_packages_wrap(command, self.installation_command), metadata + return install_packages_wrap(result, self.installation_command) + + self.set_inline(wrapped_inline) + elif isinstance(self.inline, str): + self.set_inline(install_packages_wrap(self.inline, self.installation_command)) + + def set_inline(self, command: Union[str, Callable, run.Script]) -> None: + """Set the inline command safely on frozen dataclass.""" + object.__setattr__(self, "inline", command) + + def hostname_ref(self) -> str: + """Get hostname reference for hetjob cross-component communication. + + Returns a shell variable reference that resolves to the master node hostname + for this het group. Uses environment variables automatically exported by nemo-run: + SLURM_MASTER_NODE_HET_GROUP_0, SLURM_MASTER_NODE_HET_GROUP_1, etc. + + These are set via: + export SLURM_MASTER_NODE_HET_GROUP_N=$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_N | head -n1) + """ + if self.het_group_index is None: + return "127.0.0.1" # Local fallback for non-heterogeneous jobs + + # Use the environment variable exported by nemo-run + return f"${{SLURM_MASTER_NODE_HET_GROUP_{self.het_group_index}:-localhost}}" diff --git a/nemo_skills/pipeline/utils/scripts/generation.py b/nemo_skills/pipeline/utils/scripts/generation.py new file mode 100644 index 0000000000..5c477ab02a --- /dev/null +++ b/nemo_skills/pipeline/utils/scripts/generation.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Generation client script for NeMo-Skills pipeline.""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple + +from nemo_skills.pipeline.utils.generation import get_generation_cmd +from nemo_skills.pipeline.utils.scripts.base import BaseJobScript +from nemo_skills.pipeline.utils.scripts.server import SandboxScript, ServerScript + + +@dataclass(kw_only=True) +class GenerationClientScript(BaseJobScript): + """Script for LLM generation/inference client. + + This script wraps generation command builders and provides: + - Cross-component references to multiple servers and sandbox + - Lazy command building for runtime hostname resolution + - Type-safe generation configuration + - Environment variable handling for sandbox/server communication + + Attributes: + output_dir: Directory for output files + input_file: Input JSONL file (mutually exclusive with input_dir) + input_dir: Input directory (mutually exclusive with input_file) + extra_arguments: Additional arguments for generation script + random_seed: Random seed for sampling (optional) + chunk_id: Chunk ID for parallel processing (optional) + num_chunks: Total number of chunks (required if chunk_id set) + preprocess_cmd: Command to run before generation (optional) + postprocess_cmd: Command to run after generation (optional) + wandb_parameters: WandB logging configuration (optional) + with_sandbox: Whether sandbox is enabled + script: Module or file path for generation script + servers: List of ServerScript references (None for pre-hosted servers) + server_addresses_prehosted: Addresses for pre-hosted servers + model_names: Model names for multi-model generation (optional) + server_types: Server types for multi-model generation (optional) + sandbox: Reference to SandboxScript (optional) + log_prefix: Prefix for log files (default: "main") + """ + + output_dir: str + input_file: Optional[str] = None + input_dir: Optional[str] = None + extra_arguments: str = "" + random_seed: Optional[int] = None + chunk_id: Optional[int] = None + num_chunks: Optional[int] = None + preprocess_cmd: Optional[str] = None + postprocess_cmd: Optional[str] = None + wandb_parameters: Optional[Dict] = None + with_sandbox: bool = False + script: str = "nemo_skills.inference.generate" + requirements: Optional[list[str]] = None + + # Cross-component references for single/multi-model + servers: Optional[List[Optional["ServerScript"]]] = None + server_addresses_prehosted: Optional[List[str]] = None + model_names: Optional[List[str]] = None + server_types: Optional[List[str]] = None + sandbox: Optional["SandboxScript"] = None + + log_prefix: str = field(default="main", init=False) + + def __post_init__(self): + def build_cmd() -> Tuple[str, Dict]: + env_vars = {} + + if self.sandbox: + env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) + + server_addresses = None + if self.servers is not None: + server_addresses = [] + for server_idx, server_script in enumerate(self.servers): + if server_script is not None: + addr = f"{server_script.hostname_ref()}:{server_script.port}" + else: + addr = self.server_addresses_prehosted[server_idx] + server_addresses.append(addr) + + cmd = get_generation_cmd( + output_dir=self.output_dir, + input_file=self.input_file, + input_dir=self.input_dir, + extra_arguments=self.extra_arguments, + random_seed=self.random_seed, + chunk_id=self.chunk_id, + num_chunks=self.num_chunks, + preprocess_cmd=self.preprocess_cmd, + postprocess_cmd=self.postprocess_cmd, + wandb_parameters=self.wandb_parameters, + with_sandbox=self.with_sandbox, + script=self.script, + requirements=self.requirements, + server_addresses=server_addresses, + model_names=self.model_names, + server_types=self.server_types, + ) + + return cmd, {"environment": env_vars} + + self.set_inline(build_cmd) + super().__post_init__() diff --git a/nemo_skills/pipeline/utils/scripts/nemo_gym.py b/nemo_skills/pipeline/utils/scripts/nemo_gym.py new file mode 100644 index 0000000000..2176026514 --- /dev/null +++ b/nemo_skills/pipeline/utils/scripts/nemo_gym.py @@ -0,0 +1,200 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NeMo Gym rollout collection script for NeMo-Skills pipeline.""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple + +from nemo_skills.pipeline.utils.scripts.base import BaseJobScript +from nemo_skills.pipeline.utils.scripts.server import SandboxScript, ServerScript +from nemo_skills.utils import get_server_wait_cmd + + +@dataclass(kw_only=True) +class NemoGymRolloutsScript(BaseJobScript): + """Script for running NeMo Gym rollout collection. + + This script orchestrates the full rollout collection workflow: + 1. Starts ng_run in background to spin up NeMo Gym servers + 2. Polls ng_status until all servers are healthy + 3. Runs ng_collect_rollouts to collect rollouts + 4. Keeps ng_run running (cleanup handled externally) + + Attributes: + config_paths: List of YAML config file paths for ng_run + input_file: Input JSONL file path for rollout collection + output_file: Output JSONL file path for rollouts + extra_arguments: Additional Hydra overrides passed to both ng_run and ng_collect_rollouts + server: Optional ServerScript reference for policy model server + server_address: Optional pre-hosted server address + sandbox: Optional SandboxScript reference for sandbox port + gym_path: Path to NeMo Gym installation + policy_api_key: API key for policy server + policy_model_name: Model name override for policy server + log_prefix: Prefix for log files (default: "nemo_gym") + """ + + config_paths: List[str] + input_file: str + output_file: str + extra_arguments: str = "" + server: Optional["ServerScript"] = None + server_address: Optional[str] = None + sandbox: Optional["SandboxScript"] = None + gym_path: str = "/opt/NeMo-RL/3rdparty/Gym-workspace/Gym" + policy_api_key: str = "dummy" + policy_model_name: Optional[str] = None + + log_prefix: str = field(default="nemo_gym", init=False) + + def __post_init__(self): + """Initialize the combined ng_run + ng_collect_rollouts script.""" + + def build_cmd() -> Tuple[str, Dict]: + """Build the full rollout collection command.""" + config_paths_str = ",".join(self.config_paths) + + # Build ng_run command parts + ng_run_parts = [ + "ng_run", + f'"+config_paths=[{config_paths_str}]"', + ] + + if self.server is not None: + server_addr = f"http://{self.server.hostname_ref()}:{self.server.port}/v1" + ng_run_parts.append(f'+policy_base_url="{server_addr}"') + elif self.server_address is not None: + ng_run_parts.append(f'+policy_base_url="{self.server_address}"') + + ng_run_parts.append(f'+policy_api_key="{self.policy_api_key}"') + + if self.policy_model_name: + ng_run_parts.append(f'+policy_model_name="{self.policy_model_name}"') + + if self.extra_arguments: + ng_run_parts.append(self.extra_arguments) + + ng_run_cmd = " ".join(ng_run_parts) + + # Build ng_collect_rollouts command + ng_collect_parts = [ + "ng_collect_rollouts", + f'+input_jsonl_fpath="{self.input_file}"', + f'+output_jsonl_fpath="{self.output_file}"', + ] + + if self.extra_arguments: + ng_collect_parts.append(self.extra_arguments) + + ng_collect_cmd = " ".join(ng_collect_parts) + + # Compute the vLLM server URL for the wait check + if self.server is not None: + vllm_server_url = f"http://{self.server.hostname_ref()}:{self.server.port}/v1" + elif self.server_address is not None: + vllm_server_url = self.server_address + else: + vllm_server_url = "" + + # Build server wait command using shared utility + if vllm_server_url: + server_wait_cmd = get_server_wait_cmd(f"{vllm_server_url}/models") + else: + server_wait_cmd = "" + + cmd = f"""set -e +set -o pipefail + +# Install/sync NeMo Gym venv. The nemo-rl container has Gym pre-installed, +# but when users mount a custom Gym path (e.g., from a dev branch or worktree), +# the mounted directory may not have a .venv. The --allow-existing flag makes +# this fast (~1s) when the venv already exists and is up to date. +echo "=== Installing NeMo Gym ===" +cd {self.gym_path} || {{ echo "ERROR: Failed to cd to Gym directory"; exit 1; }} +uv venv --python 3.12 --allow-existing .venv || {{ echo "ERROR: Failed to create venv"; exit 1; }} +source .venv/bin/activate || {{ echo "ERROR: Failed to activate venv"; exit 1; }} +uv sync --active --extra dev || {{ echo "ERROR: Failed to sync dependencies"; exit 1; }} +echo "NeMo Gym installed successfully" + +# Disable pipefail for the polling loop (grep may return non-zero) +set +o pipefail + +# Wait for vLLM server to be ready before starting ng_run +# Note: --kill-on-bad-exit in srun ensures job fails if vLLM crashes +if [ -n "{vllm_server_url}" ]; then + echo "=== Waiting for vLLM server at {vllm_server_url} ===" + {server_wait_cmd} + echo "vLLM server is ready!" +fi + +echo "=== Starting NeMo Gym servers ===" +{ng_run_cmd} & +NG_RUN_PID=$! +echo "ng_run PID: $NG_RUN_PID" + +echo "Waiting for NeMo Gym servers..." +LAST_STATUS="" +while true; do + # Check if ng_run process died - let the failure cascade naturally + if ! kill -0 $NG_RUN_PID 2>/dev/null; then + echo "ERROR: ng_run process exited unexpectedly" + wait $NG_RUN_PID 2>/dev/null # Get exit code + exit 1 + fi + + STATUS_OUTPUT=$(ng_status 2>&1) + + if echo "$STATUS_OUTPUT" | grep -q "healthy, 0 unhealthy"; then + echo "All servers ready!" + break + fi + + # Only print status when it changes (reduce verbosity) + CURRENT_STATUS=$(echo "$STATUS_OUTPUT" | grep -oE '[0-9]+ healthy' | head -1 || echo "starting") + if [ "$CURRENT_STATUS" != "$LAST_STATUS" ]; then + echo "Server status: $CURRENT_STATUS" + LAST_STATUS="$CURRENT_STATUS" + fi + + sleep 10 +done + +# Re-enable pipefail for the actual rollout collection +set -o pipefail + +echo "=== Running rollout collection ===" +echo "Input file: {self.input_file}" +echo "Output file: {self.output_file}" +mkdir -p "$(dirname "{self.output_file}")" +echo "Output directory created: $(dirname "{self.output_file}")" +echo "Running: {ng_collect_cmd}" +{ng_collect_cmd} || {{ echo "ERROR: ng_collect_rollouts failed"; kill $NG_RUN_PID 2>/dev/null || true; exit 1; }} + +echo "=== Rollout collection complete ===" +echo "Output: {self.output_file}" + +echo "=== Cleaning up ===" +kill $NG_RUN_PID 2>/dev/null || true +echo "Servers terminated." +""" + env_vars = {} + if self.sandbox is not None: + env_vars["NEMO_SKILLS_SANDBOX_HOST"] = self.sandbox.hostname_ref() + env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) + + return cmd.strip(), {"environment": env_vars} + + self.set_inline(build_cmd) + super().__post_init__() diff --git a/nemo_skills/pipeline/utils/scripts/server.py b/nemo_skills/pipeline/utils/scripts/server.py new file mode 100644 index 0000000000..57513c1062 --- /dev/null +++ b/nemo_skills/pipeline/utils/scripts/server.py @@ -0,0 +1,145 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Server and sandbox script classes for NeMo-Skills pipeline.""" + +import logging +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple + +from nemo_skills.pipeline.utils.commands import sandbox_command +from nemo_skills.pipeline.utils.scripts.base import BaseJobScript +from nemo_skills.pipeline.utils.server import get_free_port, get_server_command +from nemo_skills.utils import get_logger_name + +LOG = logging.getLogger(get_logger_name(__file__)) + + +@dataclass(kw_only=True) +class ServerScript(BaseJobScript): + """Script for model inference servers (vLLM, TRT-LLM, SGLang, etc.). + + This script wraps server command builders and provides: + - Automatic port allocation if not specified + - Type-safe server configuration + - Cross-component address sharing (get_address()) + - Resource requirement tracking (num_gpus, num_nodes, num_tasks) + + Attributes: + server_type: Type of server (vllm, trtllm, sglang, megatron, openai, etc.) + model_path: Path to model weights or model name for API services + cluster_config: Cluster configuration dictionary + num_gpus: Number of GPUs required (default: 8) + num_nodes: Number of nodes required (default: 1) + server_args: Additional server-specific arguments + server_entrypoint: Custom server entrypoint script (optional) + port: Server port (allocated automatically if None) + allocate_port: Whether to allocate port automatically (default: True) + num_tasks: Number of MPI tasks (computed in __post_init__) + log_prefix: Prefix for log files (default: "server") + + Example: + server = ServerScript( + server_type="vllm", + model_path="/models/llama-3-8b", + cluster_config=cluster_config, + num_gpus=8, + ) + print(f"Server will run on port {server.port}") + """ + + server_type: str + model_path: str + cluster_config: Dict + num_gpus: int = 8 + num_nodes: int = 1 + server_args: str = "" + server_entrypoint: Optional[str] = None + port: Optional[int] = None + allocate_port: bool = True + + # Server spans all group nodes (e.g., for distributed inference) + span_group_nodes: bool = True + + # Computed fields (set in __post_init__) + num_tasks: int = field(init=False, repr=False) + log_prefix: str = field(default="server", init=False) + + def __post_init__(self): + if self.port is None and self.allocate_port: + self.port = get_free_port(strategy="random") + LOG.debug(f"Allocated port {self.port} for {self.server_type} server") + + cmd, self.num_tasks = get_server_command( + server_type=self.server_type, + num_gpus=self.num_gpus, + num_nodes=self.num_nodes, + model_path=self.model_path, + cluster_config=self.cluster_config, + server_port=self.port, + server_args=self.server_args, + server_entrypoint=self.server_entrypoint, + ) + + self.set_inline(cmd) + super().__post_init__() + + def get_address(self) -> str: + """Get server address for client connections (hostname:port).""" + return f"{self.hostname_ref()}:{self.port}" + + +@dataclass(kw_only=True) +class SandboxScript(BaseJobScript): + """Script for code execution sandbox container. + + Attributes: + cluster_config: Cluster configuration dictionary + port: Sandbox port (allocated automatically if None) + keep_mounts: Whether to keep filesystem mounts (default: False, risky if True). + allocate_port: Whether to allocate port automatically (default: True) + log_prefix: Prefix for log files (default: "sandbox") + """ + + cluster_config: Dict + port: Optional[int] = None + keep_mounts: bool = False + allocate_port: bool = True + env_overrides: Optional[List[str]] = None + + # Sandbox spans all group nodes (e.g., for multi-node generate jobs) + span_group_nodes: bool = True + + log_prefix: str = field(default="sandbox", init=False) + + def __post_init__(self): + if self.port is None and self.allocate_port: + self.port = get_free_port(strategy="random") + LOG.debug(f"Allocated port {self.port} for sandbox") + + cmd, metadata = sandbox_command( + cluster_config=self.cluster_config, + port=self.port, + ) + + def build_cmd() -> Tuple[str, Dict]: + env = dict(metadata.get("environment", {})) + if self.env_overrides: + for override in self.env_overrides: + key, value = override.split("=", 1) + env[key] = value + return cmd, {"environment": env} + + self.set_inline(build_cmd) + super().__post_init__() From 278f370cc306e93ebfbe803bf6b98fc34b77ac87 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 3 Mar 2026 14:33:29 -0800 Subject: [PATCH 4/7] docs: document packaging override env vars Signed-off-by: George Armstrong --- docs/basics/code-packaging.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/basics/code-packaging.md b/docs/basics/code-packaging.md index de587c404c..1606ddfa04 100644 --- a/docs/basics/code-packaging.md +++ b/docs/basics/code-packaging.md @@ -40,6 +40,21 @@ running commands from it. ``` In all cases, uncommitted code will not be used. +!!! note + + You can override the default packaging behavior with the following environment variables: + + - `NEMO_SKILLS_FORCE_PATTERN_PACKAGER=1` — Skip git-based packaging entirely and always use the installed + `nemo_skills` package tree (PatternPackager). Useful when you have an editable install and don't want + packaging tied to the git state of your current directory. + - `NEMO_SKILLS_FORCE_INSTALLED_PACKAGE=1` — When running from a git repo, use the installed `nemo_skills` + package instead of the repo's `nemo_skills/` directory. The git repo is still packaged, but `nemo_skills` + is picked up from the installed location. Useful when your repo checkout has extra files you don't want + uploaded. + + Note that `NEMO_SKILLS_FORCE_INSTALLED_PACKAGE` has no effect when `NEMO_SKILLS_FORCE_PATTERN_PACKAGER` + is also set, since the latter bypasses the git repo branch entirely. + Finally, it's important to keep in mind that whenever you submit a new experiment, NeMo-Run will create a copy of your code package both locally (inside `~/.nemo_run`) and on cluster (inside `ssh_tunnel/job_dir` path in your cluster config). From aa2320ea68dc266b50ae959c37763af197d24297 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 3 Mar 2026 16:00:15 -0800 Subject: [PATCH 5/7] test: add GPU dry-run tests for nemo_gym_rollouts pipeline Signed-off-by: George Armstrong --- tests/gpu-tests/test_nemo_gym_rollouts.py | 71 +++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 tests/gpu-tests/test_nemo_gym_rollouts.py diff --git a/tests/gpu-tests/test_nemo_gym_rollouts.py b/tests/gpu-tests/test_nemo_gym_rollouts.py new file mode 100644 index 0000000000..cbbe0e96c2 --- /dev/null +++ b/tests/gpu-tests/test_nemo_gym_rollouts.py @@ -0,0 +1,71 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +import pytest +from utils import require_env_var + +from nemo_skills.pipeline.cli import nemo_gym_rollouts, wrap_arguments + + +@pytest.mark.gpu +def test_nemo_gym_rollouts_dry_run(): + """Test that nemo_gym_rollouts pipeline constructs correctly in dry-run mode.""" + model_path = require_env_var("NEMO_SKILLS_TEST_HF_MODEL") + + result = nemo_gym_rollouts( + ctx=wrap_arguments( + "+agent_name=math_with_judge_simple_agent " + "+num_samples_in_parallel=4 " + "+responses_create_params.max_output_tokens=256 " + "+responses_create_params.temperature=1.0 " + ), + cluster="test-local", + config_dir=str(Path(__file__).absolute().parent), + config_paths="resources_servers/math_with_judge/configs/math_with_judge.yaml,responses_api_models/vllm_model/configs/vllm_model.yaml", + input_file="resources_servers/math_with_judge/data/example.jsonl", + output_dir="/tmp/nemo-skills-tests/nemo-gym-rollouts-dry-run", + model=model_path, + server_type="vllm", + server_gpus=1, + server_args="--enforce-eager", + dry_run=True, + ) + + # dry_run returns the pipeline result without executing + assert result is not None + + +@pytest.mark.gpu +def test_nemo_gym_rollouts_dry_run_with_seeds(): + """Test that nemo_gym_rollouts creates separate jobs for each seed in dry-run mode.""" + model_path = require_env_var("NEMO_SKILLS_TEST_HF_MODEL") + + result = nemo_gym_rollouts( + ctx=wrap_arguments("+agent_name=math_with_judge_simple_agent +num_samples_in_parallel=4 "), + cluster="test-local", + config_dir=str(Path(__file__).absolute().parent), + config_paths="resources_servers/math_with_judge/configs/math_with_judge.yaml,responses_api_models/vllm_model/configs/vllm_model.yaml", + input_file="resources_servers/math_with_judge/data/example.jsonl", + output_dir="/tmp/nemo-skills-tests/nemo-gym-rollouts-dry-run-seeds", + model=model_path, + server_type="vllm", + server_gpus=1, + server_args="--enforce-eager", + num_random_seeds=3, + dry_run=True, + ) + + assert result is not None From 12169c560abd0f68955ae4edf2cccdf5ddf58650 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 3 Mar 2026 16:35:35 -0800 Subject: [PATCH 6/7] fix: align server_address/model params with ns generate pattern Keep --server_address as separate param (matching generate pipeline). Use server_gpus as the discriminator: truthy = self-hosted, falsy = pre-hosted. This matches configure_client() in generation.py. Signed-off-by: George Armstrong --- nemo_skills/pipeline/nemo_gym_rollouts.py | 40 +++++++++++------------ 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/nemo_skills/pipeline/nemo_gym_rollouts.py b/nemo_skills/pipeline/nemo_gym_rollouts.py index 99f971c6d6..b69119e1ad 100644 --- a/nemo_skills/pipeline/nemo_gym_rollouts.py +++ b/nemo_skills/pipeline/nemo_gym_rollouts.py @@ -42,8 +42,8 @@ --config_paths "ns_tools/configs/ns_tools.yaml" \\ --input_file data/example.jsonl \\ --output_dir /results/rollouts \\ - --model http://localhost:8000/v1 \\ - --policy_model_name nvidia/model-name \\ + --server_address http://localhost:8000/v1 \\ + --model nvidia/model-name \\ +agent_name=ns_tools_simple_agent """ @@ -88,9 +88,11 @@ def nemo_gym_rollouts( input_file: str = typer.Option(..., help="Path to input JSONL file for rollout collection"), output_dir: str = typer.Option(..., help="Directory for rollout outputs. Output file will be rollouts.jsonl"), expname: str = typer.Option("nemo_gym_rollouts", help="NeMo Run experiment name"), - model: str = typer.Option( + model: str = typer.Option(None, help="Path to model or model name for the policy server"), + server_address: str = typer.Option( None, - help="Model path for self-hosted server, or server URL (e.g., http://host:8000/v1) for pre-hosted.", + help="Address of pre-hosted server (e.g., http://localhost:8000/v1). " + "If provided, skips self-hosted server setup.", ), server_type: pipeline_utils.SupportedServers = typer.Option( None, @@ -184,29 +186,27 @@ def nemo_gym_rollouts( config_paths_list = [p.strip() for p in config_paths.split(",") if p.strip()] LOG.info(f"Config paths: {config_paths_list}") - # Determine if model is a URL (pre-hosted) or a path (self-hosted) - pre_hosted = model is not None and model.startswith("http") - self_hosted = model is not None and not pre_hosted and server_gpus is not None + # Determine hosting mode: server_gpus as discriminator (matches ns generate pattern) + self_hosted = server_gpus is not None and server_gpus > 0 + pre_hosted = not self_hosted - if model is None: - raise ValueError("--model is required. Provide a model path for self-hosted or a URL for pre-hosted server.") - - if not self_hosted and not pre_hosted: - raise ValueError( - "--server_gpus is required when using a self-hosted server (model path). " - "Or provide a URL (http://...) for pre-hosted." - ) + if self_hosted and model is None: + raise ValueError("--model is required when using self-hosted server") if self_hosted and server_type is None: raise ValueError("--server_type is required when using self-hosted server") + if pre_hosted and server_address is None: + raise ValueError("--server_address is required when not using self-hosted server (no --server_gpus)") + # Validate and set policy_model_name - if pre_hosted and policy_model_name is None: - raise ValueError("--policy_model_name is required when using a pre-hosted server") + if pre_hosted and policy_model_name is None and model is None: + raise ValueError("--policy_model_name or --model is required when using a pre-hosted server") - if self_hosted and policy_model_name is None: + if policy_model_name is None: policy_model_name = model - LOG.info(f"Using model path as policy_model_name: {policy_model_name}") + if policy_model_name: + LOG.info(f"Using model path as policy_model_name: {policy_model_name}") # Get cluster config cluster_config = pipeline_utils.get_cluster_config(cluster, config_dir) @@ -319,7 +319,7 @@ def nemo_gym_rollouts( output_file=output_file, extra_arguments=extra_arguments, server=server_script, - server_address=model if pre_hosted else None, + server_address=server_address if pre_hosted else None, sandbox=sandbox_script, gym_path=gym_path, policy_api_key=policy_api_key, From 14a71cdf1c2f058e0a590ecb15496f7502209f89 Mon Sep 17 00:00:00 2001 From: George Armstrong Date: Tue, 3 Mar 2026 16:46:41 -0800 Subject: [PATCH 7/7] fix: add input validation from CodeRabbit review - Reject self-hosted flags (server_type, server_args, etc.) in pre-hosted mode - Validate seed inputs: reject num_random_seeds <= 0 and duplicate seeds - Reject conflicting server + server_address in NemoGymRolloutsScript - Fail fast when allocate_port=False and port is None (ServerScript, SandboxScript) Signed-off-by: George Armstrong --- nemo_skills/pipeline/nemo_gym_rollouts.py | 20 ++++++++++++++++++- .../pipeline/utils/scripts/nemo_gym.py | 2 ++ nemo_skills/pipeline/utils/scripts/server.py | 4 ++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/nemo_skills/pipeline/nemo_gym_rollouts.py b/nemo_skills/pipeline/nemo_gym_rollouts.py index b69119e1ad..a82e156c79 100644 --- a/nemo_skills/pipeline/nemo_gym_rollouts.py +++ b/nemo_skills/pipeline/nemo_gym_rollouts.py @@ -199,6 +199,19 @@ def nemo_gym_rollouts( if pre_hosted and server_address is None: raise ValueError("--server_address is required when not using self-hosted server (no --server_gpus)") + if pre_hosted: + invalid_flags = [] + if server_type is not None: + invalid_flags.append("--server_type") + if server_nodes != 1: + invalid_flags.append("--server_nodes") + if server_args: + invalid_flags.append("--server_args") + if server_container is not None: + invalid_flags.append("--server_container") + if invalid_flags: + raise ValueError(f"{', '.join(invalid_flags)} are only valid for self-hosted mode (--server_gpus)") + # Validate and set policy_model_name if pre_hosted and policy_model_name is None and model is None: raise ValueError("--policy_model_name or --model is required when using a pre-hosted server") @@ -224,7 +237,9 @@ def nemo_gym_rollouts( else: seed_indices = list(random_seeds) LOG.info(f"Using explicit seeds: {seed_indices}") - elif num_random_seeds: + elif num_random_seeds is not None: + if num_random_seeds <= 0: + raise ValueError("--num_random_seeds must be > 0") seed_indices = list(range(starting_seed, starting_seed + num_random_seeds)) LOG.info( f"Creating {num_random_seeds} separate jobs (rs{starting_seed}..rs{starting_seed + num_random_seeds - 1})" @@ -232,6 +247,9 @@ def nemo_gym_rollouts( else: seed_indices = [None] # Single job, no seed suffix + if seed_indices != [None] and len(seed_indices) != len(set(seed_indices)): + raise ValueError("--random_seeds must not contain duplicate values") + # Get server type string and container if self-hosted server_type_str = None resolved_server_container = None diff --git a/nemo_skills/pipeline/utils/scripts/nemo_gym.py b/nemo_skills/pipeline/utils/scripts/nemo_gym.py index 2176026514..4a9116d0a4 100644 --- a/nemo_skills/pipeline/utils/scripts/nemo_gym.py +++ b/nemo_skills/pipeline/utils/scripts/nemo_gym.py @@ -61,6 +61,8 @@ class NemoGymRolloutsScript(BaseJobScript): def __post_init__(self): """Initialize the combined ng_run + ng_collect_rollouts script.""" + if self.server is not None and self.server_address is not None: + raise ValueError("Specify only one of `server` or `server_address`.") def build_cmd() -> Tuple[str, Dict]: """Build the full rollout collection command.""" diff --git a/nemo_skills/pipeline/utils/scripts/server.py b/nemo_skills/pipeline/utils/scripts/server.py index 57513c1062..e489caafa6 100644 --- a/nemo_skills/pipeline/utils/scripts/server.py +++ b/nemo_skills/pipeline/utils/scripts/server.py @@ -80,6 +80,8 @@ def __post_init__(self): if self.port is None and self.allocate_port: self.port = get_free_port(strategy="random") LOG.debug(f"Allocated port {self.port} for {self.server_type} server") + if self.port is None: + raise ValueError("ServerScript requires `port` when allocate_port=False") cmd, self.num_tasks = get_server_command( server_type=self.server_type, @@ -127,6 +129,8 @@ def __post_init__(self): if self.port is None and self.allocate_port: self.port = get_free_port(strategy="random") LOG.debug(f"Allocated port {self.port} for sandbox") + if self.port is None: + raise ValueError("SandboxScript requires `port` when allocate_port=False") cmd, metadata = sandbox_command( cluster_config=self.cluster_config,