diff --git a/.github/workflows/gpu_tests.yml b/.github/workflows/gpu_tests.yml index 16f77633a8..a500fc59b2 100644 --- a/.github/workflows/gpu_tests.yml +++ b/.github/workflows/gpu_tests.yml @@ -52,7 +52,15 @@ jobs: cd ${{ github.run_id }} nvidia-smi set -o pipefail # this will make sure next line returns non-0 exit code if tests fail - ./tests/gpu-tests/run_qwen.sh + # Run heartbeat in background, capture its PID, and ensure cleanup + (while true; do sleep 60; echo "[HEARTBEAT] $(date '+%Y-%m-%d %H:%M:%S') - still running..."; done) & + HEARTBEAT_PID=$! + # Run tests and capture exit code + EXIT_CODE=0 + ./tests/gpu-tests/run_qwen.sh || EXIT_CODE=$? + # Kill heartbeat and exit with test result + kill $HEARTBEAT_PID 2>/dev/null || true + exit $EXIT_CODE - name: Cleanup if: always() run: | diff --git a/nemo_skills/pipeline/generate.py b/nemo_skills/pipeline/generate.py index f33796d05c..90ec987bca 100644 --- a/nemo_skills/pipeline/generate.py +++ b/nemo_skills/pipeline/generate.py @@ -14,7 +14,7 @@ import importlib import logging import os -from typing import Callable, Dict, List, Optional +from typing import Dict, List, Optional import typer @@ -23,14 +23,17 @@ from nemo_skills.inference import GENERATION_MODULE_MAP, GenerationType from nemo_skills.pipeline.app import app, typer_unpacker from nemo_skills.pipeline.utils.cluster import parse_kwargs -from nemo_skills.pipeline.utils.commands import sandbox_command from nemo_skills.pipeline.utils.declarative import ( Command, CommandGroup, HardwareConfig, Pipeline, ) -from nemo_skills.pipeline.utils.server import get_free_port +from nemo_skills.pipeline.utils.scripts import ( + GenerationClientScript, + SandboxScript, + ServerScript, +) from nemo_skills.utils import ( compute_chunk_ids, get_logger_name, @@ -44,118 +47,160 @@ # TODO: add num_jobs here for consistency with eval? 
-def _create_commandgroup_from_config( - generation_cmd: str, - server_config: Optional[Dict], - with_sandbox: bool, - sandbox_port: Optional[int], +def _create_job_unified( + models: List[str], + server_configs: List[Optional[Dict]], + generation_params: Dict, cluster_config: Dict, installation_command: Optional[str], - get_server_command_fn: Callable, + with_sandbox: bool, partition: Optional[str], keep_mounts_for_sandbox: bool, task_name: str, log_dir: str, sbatch_kwargs: Optional[Dict] = None, sandbox_env_overrides: Optional[List[str]] = None, -) -> CommandGroup: - """Create a CommandGroup from server_config. - - Component ordering: - 1. Server (if server_config provided) - 2. Client command - 3. Sandbox (if with_sandbox=True) +) -> List[CommandGroup]: """ + Create CommandGroups for n models (unified for n=1 and n>1). + + Structure: + - Group 0: Model 0 server + client + (optional sandbox) + - Group 1: Model 1 server (if n>1) + - Group N: Model N server (if n>1) + + For n=1, returns a single-element list. The Pipeline automatically + optimizes single-group lists to efficient single-group jobs. + + Args: + models: List of model paths + server_configs: List of server configurations (one per model, None if not hosting) + generation_params: Dict of parameters for generation (output_dir, etc.) 
+ cluster_config: Cluster configuration + installation_command: Installation command to run before client + with_sandbox: Whether to include sandbox + partition: Slurm partition + keep_mounts_for_sandbox: Whether to keep mounts for sandbox + task_name: Name for the task + log_dir: Directory for logs + sbatch_kwargs: Additional sbatch kwargs + + Returns: + List of CommandGroup objects (one per het group) + """ + num_models = len(models) + groups = [] + server_scripts = [] # Track server Script objects for cross-component references + + for model_idx, (model_path, server_config) in enumerate(zip(models, server_configs)): + components = [] + server_script = None + + # Track GPU/node requirements for this group (from server config) + group_gpus = 0 + group_nodes = 1 + + # 1. Add server if needed + if server_config is not None and int(server_config.get("num_gpus", 0)) > 0: + server_type = server_config["server_type"] + server_container = server_config.get("container") or cluster_config["containers"][server_type] - components = [] + # Create ServerScript + server_script = ServerScript( + server_type=server_type, + model_path=server_config["model_path"], + cluster_config=cluster_config, + num_gpus=server_config["num_gpus"], + num_nodes=server_config["num_nodes"], + server_args=server_config.get("server_args", ""), + server_entrypoint=server_config.get("server_entrypoint"), + port=server_config.get("server_port"), + allocate_port=(server_config.get("server_port") is None), + ) - # 1. 
Add server if server_config is provided - if server_config is not None and int(server_config["num_gpus"]) > 0: - server_type = server_config["server_type"] - # Get container from server_config if provided, otherwise fall back to cluster config - if "container" in server_config: - server_container = server_config.pop("container") + # Set group GPU/node requirements from server config + group_gpus = server_config["num_gpus"] + group_nodes = server_config["num_nodes"] + + server_cmd = Command( + script=server_script, + container=server_container, + name=f"{task_name}_model_{model_idx}_server" if num_models > 1 else f"{task_name}_server", + ) + components.append(server_cmd) + server_scripts.append(server_script) else: - server_container = cluster_config["containers"][server_type] + # No server for this model (pre-hosted) + server_scripts.append(None) + + # 2. Group 0 gets the client and sandbox + if model_idx == 0: + # Create sandbox script (if with_sandbox) + sandbox_script = None + if with_sandbox: + sandbox_script = SandboxScript( + cluster_config=cluster_config, + keep_mounts=keep_mounts_for_sandbox, + allocate_port=True, # Always allocate port for sandbox + env_overrides=sandbox_env_overrides, + ) + + sandbox_cmd = Command( + script=sandbox_script, + container=cluster_config["containers"]["sandbox"], + name=f"{task_name}_sandbox", + ) + components.append(sandbox_cmd) + + # Create client script with cross-component references to all servers + client_script = GenerationClientScript( + output_dir=generation_params["output_dir"], + input_file=generation_params.get("input_file"), + input_dir=generation_params.get("input_dir"), + extra_arguments=generation_params.get("extra_arguments", ""), + random_seed=generation_params.get("random_seed"), + chunk_id=generation_params.get("chunk_id"), + num_chunks=generation_params.get("num_chunks"), + preprocess_cmd=generation_params.get("preprocess_cmd"), + postprocess_cmd=generation_params.get("postprocess_cmd"), + 
wandb_parameters=generation_params.get("wandb_parameters"), + with_sandbox=with_sandbox, + script=generation_params.get("script", "nemo_skills.inference.generate"), + # Multi-server support (works for single and multi-model) + servers=server_scripts if server_scripts else None, + server_addresses_prehosted=generation_params.get("server_addresses_prehosted"), + model_names=generation_params.get("model_names"), + server_types=generation_params.get("server_types"), + sandbox=sandbox_script, + installation_command=installation_command, + ) - # Call server command builder directly with cluster_config - cmd, num_tasks = get_server_command_fn(**server_config, cluster_config=cluster_config) + client_cmd = Command( + script=client_script, + container=cluster_config["containers"]["nemo-skills"], + name=f"{task_name}", + ) + components.append(client_cmd) - # Create metadata dict - metadata = { - "num_tasks": num_tasks, - "gpus": server_config["num_gpus"], - "nodes": server_config["num_nodes"], - "log_prefix": "server", - } + # Only create group if it has components (skip empty groups for pre-hosted models) + if components: + group_tasks = server_script.num_tasks if (server_config and server_script) else 1 - server_cmd = Command( - command=cmd, - container=server_container, - gpus=server_config["num_gpus"], - nodes=server_config["num_nodes"], - name=task_name, - metadata=metadata, - ) - components.append(server_cmd) - - # 2. Add main generation command - # Note: General cluster config env vars are automatically added by get_env_variables() in get_executor() - client_env = {} - if with_sandbox and sandbox_port is not None: - client_env["NEMO_SKILLS_SANDBOX_PORT"] = str(sandbox_port) - - client_cmd = Command( - command=generation_cmd, - container=cluster_config["containers"]["nemo-skills"], - name=task_name, - installation_command=installation_command, - metadata={ - "log_prefix": "main", - "environment": client_env, - }, - ) - components.append(client_cmd) - - # 3. 
Add sandbox if requested - if with_sandbox: - # Call sandbox command builder directly with cluster_config - cmd, metadata = sandbox_command(cluster_config=cluster_config, port=sandbox_port) - metadata["log_prefix"] = "sandbox" - - # Apply user-specified environment overrides for the sandbox - if sandbox_env_overrides: - sandbox_env = metadata.get("environment", {}) - for override in sandbox_env_overrides: - key, value = override.split("=", 1) - sandbox_env[key] = value - metadata["environment"] = sandbox_env - - sandbox_cmd = Command( - command=cmd, - container=cluster_config["containers"]["sandbox"], - name=task_name, - metadata=metadata, - ) + group = CommandGroup( + commands=components, + hardware=HardwareConfig( + partition=partition, + num_gpus=group_gpus, + num_nodes=group_nodes, + num_tasks=group_tasks, + sbatch_kwargs=sbatch_kwargs, + ), + name=f"{task_name}_model_{model_idx}_group" if num_models > 1 else task_name, + log_dir=log_dir, + ) + groups.append(group) - components.append(sandbox_cmd) - - # Find maximum GPUs/nodes needed by any component for the HardwareConfig - # The job-level resource request must be the maximum across all components - max_gpus = max((comp.gpus or 0) for comp in components) - max_nodes = max((comp.nodes or 1) for comp in components) - - return CommandGroup( - commands=components, - hardware=HardwareConfig( - partition=partition, - num_gpus=max_gpus, - num_nodes=max_nodes, - sbatch_kwargs=sbatch_kwargs, - ), - name=task_name, - log_dir=log_dir, - ) + return groups @app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) @@ -186,21 +231,45 @@ def generate( "If not specified, will use the registered generation module for the " "generation type (which is required in this case).", ), - model: str = typer.Option(None, help="Path to the model or model name in API"), - server_address: str = typer.Option( - None, help="Use ip:port for self-hosted models or the API url if using model providers" + model: 
List[str] = typer.Option( + None, + help="Path to the model(s). CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models for multi-model generation.", + ), + server_address: List[str] = typer.Option( + None, + help="Server address(es). CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models.", + ), + server_type: List[pipeline_utils.SupportedServers] = typer.Option( + ..., + help="Server type(s). CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models.", ), - server_type: pipeline_utils.SupportedServers = typer.Option(..., help="Type of server to use"), - server_gpus: int = typer.Option(None, help="Number of GPUs to use if hosting the model"), - server_nodes: int = typer.Option(1, help="Number of nodes required for hosting LLM server"), - server_args: str = typer.Option("", help="Any extra arguments to pass to the server"), - server_entrypoint: str = typer.Option( + server_gpus: List[int] = typer.Option( None, - help="Path to the entrypoint of the server. " - "If not specified, will use the default entrypoint for the server type.", + help="Number of GPUs per model. CLI: space-separated ints. Python API: int or list. " + "Single value broadcasts to all models.", ), - server_container: str = typer.Option( - None, help="Override container image for the hosted server (if server_gpus is set)" + server_nodes: List[int] = typer.Option( + [1], + help="Number of nodes per model. CLI: space-separated ints. Python API: int or list. " + "Single value broadcasts to all models.", + ), + server_args: List[str] = typer.Option( + [""], + help="Server arguments per model. CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models.", + ), + server_entrypoint: List[str] = typer.Option( + None, + help="Server entrypoint(s). CLI: space-separated. Python API: string or list. 
" + "Single value broadcasts to all models.", + ), + server_container: List[str] = typer.Option( + None, + help="Container image(s). CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models.", ), dependent_jobs: int = typer.Option(0, help="Specify this to launch that number of dependent jobs"), mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"), @@ -296,7 +365,18 @@ def generate( None, help="Internal option to specify task dependencies.", hidden=True ), ): - """Generate LLM completions for a given input file. + """Generate LLM completions for single or multiple models. + + Supports both single-model and multi-model generation through a unified interface. + + Parameter Types: + Multi-model parameters (model, server_*, etc.) use List[T] type hints for Typer CLI + compatibility, but accept both scalars and lists when called from Python: + - CLI: --model m1 m2 (space-separated) → Typer converts to ["m1", "m2"] + - Python API: model="m1" or model=["m1", "m2"] → Both work (normalized internally) + - Single values broadcast to all models: server_gpus=8 → [8, 8, 8] for 3 models + + Multi-model usage requires either --generation-type or --generation-module. Run `python -m nemo_skills.inference.generate --help` for other supported arguments (need to be prefixed with ++, since we use Hydra for that script). 
@@ -306,10 +386,42 @@ def generate( LOG.info("Starting generation job") LOG.info("Extra arguments that will be passed to the underlying script: %s", extra_arguments) - try: - server_type = server_type.value - except AttributeError: - pass + # Normalize model configuration to list + models_list = pipeline_utils.normalize_models_config(model) + num_models = len(models_list) + + LOG.info(f"Number of models: {num_models}") + for model_idx, model_name in enumerate(models_list): + LOG.info(f" Model {model_idx}: {model_name}") + + # Convert server_type enum values to strings + def convert_server_type_to_string(server_type): + return server_type.value if hasattr(server_type, "value") else server_type + + if isinstance(server_type, list): + server_type = [convert_server_type_to_string(st) for st in server_type] + else: + server_type = convert_server_type_to_string(server_type) + + # Normalize all server parameters to per-model lists + server_types_list = pipeline_utils.normalize_parameter(server_type, num_models, "server_type") + server_gpus_list = pipeline_utils.normalize_parameter(server_gpus, num_models, "server_gpus") + server_nodes_list = pipeline_utils.normalize_parameter(server_nodes, num_models, "server_nodes") + server_args_list = pipeline_utils.normalize_parameter(server_args, num_models, "server_args") + server_entrypoints_list = pipeline_utils.normalize_parameter(server_entrypoint, num_models, "server_entrypoint") + server_containers_list = pipeline_utils.normalize_parameter(server_container, num_models, "server_container") + + if server_address is not None: + server_addresses_list = pipeline_utils.normalize_parameter(server_address, num_models, "server_address") + else: + server_addresses_list = [None] * num_models + + # Validate multi-model requirements + if num_models > 1: + if generation_type is None and generation_module is None: + raise ValueError( + "Multi-model generation requires either --generation-type or --generation-module to be specified" + ) if 
log_samples: wandb_parameters = { @@ -325,8 +437,6 @@ def generate( else: wandb_parameters = None - get_random_port = pipeline_utils.should_get_random_port(server_gpus, exclusive) - if random_seeds and num_random_seeds: raise ValueError("Cannot specify both random_seeds and num_random_seeds") if num_random_seeds: @@ -355,8 +465,6 @@ def generate( check_mounted_paths=check_mounted_paths, ) - original_server_address = server_address - if generation_module is not None and generation_type is not None: raise ValueError("Cannot specify both generation_module and generation_type. ") if generation_module is None: @@ -407,36 +515,36 @@ def generate( chunk_id=None, ) for chunk_id in chunk_ids: - # Configure client (same as before) - server_config, server_address, extra_arguments = pipeline_utils.configure_client( - model=model, - server_type=server_type, - server_address=original_server_address, - server_gpus=server_gpus, - server_nodes=server_nodes, - server_args=server_args, - server_entrypoint=server_entrypoint, - server_container=server_container, - extra_arguments=extra_arguments_original, - get_random_port=get_random_port, - ) + # Configure clients for each model + server_configs = [] + server_addresses_resolved = [] + # For single model: configure_client returns extra_args with server config appended + # For multi-model: use original extra_args (server config added as lists in get_generation_cmd) + extra_arguments = extra_arguments_original + + for model_idx in range(num_models): + get_random_port_for_server = pipeline_utils.should_get_random_port( + server_gpus_list[model_idx], exclusive + ) - # Build generation command (same as before) - cmd = pipeline_utils.get_generation_cmd( - input_file=input_file, - input_dir=input_dir, - random_seed=seed, - output_dir=output_dir, - extra_arguments=extra_arguments, - chunk_id=chunk_id, - num_chunks=num_chunks, - preprocess_cmd=preprocess_cmd, - postprocess_cmd=postprocess_cmd, - wandb_parameters=wandb_parameters if seed_idx == 
0 else None, - script=generation_module, - with_sandbox=with_sandbox, - ) - cmd = pipeline_utils.wrap_python_path(cmd=cmd) + srv_config, srv_address, srv_extra_args = pipeline_utils.configure_client( + model=models_list[model_idx], + server_type=server_types_list[model_idx], + server_address=server_addresses_list[model_idx], + server_gpus=server_gpus_list[model_idx], + server_nodes=server_nodes_list[model_idx], + server_args=server_args_list[model_idx], + server_entrypoint=server_entrypoints_list[model_idx], + server_container=server_containers_list[model_idx], + extra_arguments=extra_arguments_original if model_idx == 0 else "", + get_random_port=get_random_port_for_server, + ) + server_configs.append(srv_config) + server_addresses_resolved.append(srv_address) + + # For single model, capture the extra_args with server config from configure_client + if model_idx == 0 and num_models == 1: + extra_arguments = srv_extra_args # Base task name (shared across all dependent jobs in the chain) task_name = f"{expname}-rs{seed}" if seed is not None else expname @@ -448,22 +556,35 @@ def generate( prev_job = None for dep_idx in range(dependent_jobs + 1): - # Allocate sandbox port if needed - # This must be done BEFORE creating CommandGroup so client knows the port - if with_sandbox: - current_sandbox_port = get_free_port(strategy="random") if get_random_port else 6000 - else: - current_sandbox_port = None + # Build generation parameters dict for Script + generation_params = { + "output_dir": output_dir, + "input_file": input_file, + "input_dir": input_dir, + "extra_arguments": extra_arguments, + "random_seed": seed, + "chunk_id": chunk_id, + "num_chunks": num_chunks, + "preprocess_cmd": preprocess_cmd, + "postprocess_cmd": postprocess_cmd, + "wandb_parameters": wandb_parameters if seed_idx == 0 else None, + "script": generation_module, + # Multi-model specific fields + "server_addresses_prehosted": server_addresses_resolved, + "model_names": models_list, + "server_types": 
server_types_list, + } - # Create CommandGroup for this task - cmd_group = _create_commandgroup_from_config( - generation_cmd=cmd, - server_config=server_config.copy() if server_config else None, - with_sandbox=with_sandbox, - sandbox_port=current_sandbox_port, + # Create CommandGroup(s) using Script objects + # For multi-model, this creates multiple CommandGroups (one per model + one for client) + # For single-model, this creates a single CommandGroup + job_groups = _create_job_unified( + models=models_list, + server_configs=[cfg.copy() if cfg else None for cfg in server_configs], + generation_params=generation_params, cluster_config=cluster_config, installation_command=installation_command, - get_server_command_fn=generation_task.get_server_command_fn(), + with_sandbox=with_sandbox, partition=partition, keep_mounts_for_sandbox=keep_mounts_for_sandbox, task_name=task_name, @@ -487,11 +608,16 @@ def generate( # Subsequent jobs in chain depend on previous job (use job object, not string) job_deps = [prev_job] + # For multi-group jobs, use "groups" key; for single-group, use "group" key job_spec = { "name": internal_job_name, - "group": cmd_group, "dependencies": job_deps, } + if len(job_groups) > 1: + job_spec["groups"] = job_groups + else: + job_spec["group"] = job_groups[0] + jobs.append(job_spec) prev_job = job_spec # Track for next iteration diff --git a/nemo_skills/pipeline/nemo_evaluator.py b/nemo_skills/pipeline/nemo_evaluator.py index 020162692a..39838737ed 100644 --- a/nemo_skills/pipeline/nemo_evaluator.py +++ b/nemo_skills/pipeline/nemo_evaluator.py @@ -89,7 +89,7 @@ import copy import logging -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional @@ -97,12 +97,12 @@ from nemo_evaluator_launcher.api import RunConfig from nemo_evaluator_launcher.common.helpers import get_eval_factory_command from nemo_evaluator_launcher.common.mapping import get_task_from_mapping, 
load_tasks_mapping -from omegaconf import DictConfig, OmegaConf +from omegaconf import OmegaConf import nemo_skills.pipeline.utils as pipeline_utils from nemo_skills.pipeline.app import app, typer_unpacker -from nemo_skills.pipeline.utils.commands import vllm_server_command from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline +from nemo_skills.pipeline.utils.scripts import BaseJobScript, ServerScript from nemo_skills.utils import get_logger_name, setup_logging LOG = logging.getLogger(get_logger_name(__file__)) @@ -289,8 +289,8 @@ def nemo_evaluator( expname=expname, idx=idx, task_name=task.name, - launcher_run_cfg=launcher_run_cfg, - task_cfg=task, + launcher_run_cfg=OmegaConf.to_container(launcher_run_cfg, resolve=True), + task_cfg=OmegaConf.to_container(task, resolve=True), task_definition=task_definition, base_output_root=base_output_root, eval_image=eval_image, @@ -443,10 +443,8 @@ def _create_serving_command_obj( idx: int, task_name: str, ) -> Command: - """Create a Command object for a hosted serving component (main or judge server). + """Create a `Command` backed by a `ServerScript` for a hosted serving component. - This function wraps vllm_server_command and standardizes container selection, - logging prefixes, and metadata for both main and judge servers. Args: cluster_config: Cluster configuration dictionary @@ -464,54 +462,53 @@ def _create_serving_command_obj( task_name: Task name for naming Returns: - Command object configured for the serving component + Command: A Command object whose `script` is a configured `ServerScript`. 
""" stype = (server_type or "vllm").lower() - sargs = args or "" if stype != "vllm": LOG.warning("Only vllm server_type is supported currently; got %s", stype) - cmd_str, meta = vllm_server_command( + server_script = ServerScript( + server_type=stype, + model_path=model or "", cluster_config=cluster_config, - model=model, # type: ignore[arg-type] + num_gpus=gpus, + num_nodes=nodes or 1, + server_args=args or "", + server_entrypoint=entrypoint, port=port, - server_type=stype, - gpus=gpus, - nodes=nodes, - args=sargs, - entrypoint=entrypoint, + allocate_port=port is None, ) - # Resolve container fallback when not explicitly provided + # Judge servers get a distinct log prefix for clarity + if is_judge: + server_script.log_prefix = "judge-server" + if not container: container = cluster_config["containers"][stype] - log_prefix = "judge-server" if is_judge else "server" name_role = "judge-server" if is_judge else "server" return Command( - command=cmd_str, + script=server_script, container=container, - gpus=gpus, - nodes=nodes or 1, name=f"{expname}-{name_role}-{idx}-{task_name}", - metadata={ - **meta, - "gpus": gpus, - "log_prefix": log_prefix, - }, ) @dataclass class _TaskCreationContext: - """Local helper to pass around the information about the task and easier logic sharing.""" + """Local helper to pass around the information about the task and easier logic sharing. + + Note: launcher_run_cfg and task_cfg are stored as plain dicts (not OmegaConf) to allow + serialization by nemo_run/fiddle. Convert back to DictConfig if OmegaConf operations are needed. 
+ """ expname: str idx: int task_name: str - launcher_run_cfg: RunConfig - task_cfg: DictConfig + launcher_run_cfg: dict # Stored as plain dict for serialization compatibility + task_cfg: dict # Stored as plain dict for serialization compatibility task_definition: dict base_output_root: Optional[str] eval_image: str @@ -630,12 +627,7 @@ def _build_judge_server_if_needed(ctx: _TaskCreationContext) -> Optional[Command def _build_client_command( ctx: _TaskCreationContext, main_server_cmd: Optional[Command], judge_server_cmd: Optional[Command] ) -> Command: - """Build Command for evaluator client. - - The client command behavior depends on server hosting: - - If servers are co-hosted: Uses lambda factory to resolve runtime URLs via hostname_ref/meta_ref - - If using external servers: Uses static URLs from server_base_url/judge_server_base_url - - If no servers: Uses URLs from evaluator config or defaults + """Create the evaluator client `Command` using `EvaluatorClientScript`. Args: ctx: Task creation context with all configuration @@ -643,100 +635,26 @@ def _build_client_command( judge_server_cmd: Judge server Command if self-hosted, None otherwise Returns: - Command object for evaluator client + Command: A Command whose script builds the evaluator CLI at runtime """ - if ctx.hosting_server or ctx.hosting_judge: - # Co-hosted servers: Use lambda factory to resolve runtime URLs - # The lambda is evaluated at execution time when het_group_index is assigned - def _client_cmd_factory(): - waits: List[str] = [] - target_url: Optional[str] = None - judge_url: Optional[str] = None - # Build main server URL from runtime references - if ctx.hosting_server and main_server_cmd is not None: - server_host = main_server_cmd.hostname_ref() - server_port_val = main_server_cmd.meta_ref("port") - base_url = f"http://{server_host}:{server_port_val}" - waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{ctx.server_health_path}")) - target_url = f"{base_url}{ctx.server_api_path}" 
- - # Build judge server URL from runtime references - if ctx.hosting_judge and judge_server_cmd is not None: - jhost = judge_server_cmd.hostname_ref() - jport = judge_server_cmd.meta_ref("port") - jbase = f"http://{jhost}:{jport}" - waits.append(pipeline_utils.get_server_wait_cmd(f"{jbase}{ctx.judge_server_health_path}")) - judge_url = f"{jbase}{ctx.judge_server_api_path}" - - # Wait for servers to be ready, then run evaluator - wait_cmd = " && ".join(waits) if waits else "true" - cmd = _build_task_cmd( - task_name=ctx.task_name, - launcher_run_cfg=ctx.launcher_run_cfg, - task_cfg=ctx.task_cfg, - task_definition=ctx.task_definition, - expname=ctx.expname, - base_output_root=ctx.base_output_root, - url_override=target_url, - model_id=ctx.server_model, - judge_url_override=judge_url, - judge_model_id=ctx.judge_server_model, - ) - return f"{wait_cmd} && {cmd}" - - return Command( - command=_client_cmd_factory, - container=ctx.eval_image, - gpus=ctx.job_gpus or None, - nodes=ctx.job_nodes or 1, - name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}", - metadata={ - "log_prefix": "main", - "environment": ctx.env_vars, - "gpus": ctx.job_gpus or None, - }, - ) - - # No hosted servers: Use external URLs or config defaults - server_url = None - if ctx.with_external_server and ctx.server_base_url: - server_url = ctx.server_base_url.rstrip("/") + ctx.server_api_path - judge_url = None - if ctx.with_external_judge and ctx.judge_server_base_url: - judge_url = ctx.judge_server_base_url.rstrip("/") + ctx.judge_server_api_path - - eval_cmd = _build_task_cmd( - task_name=ctx.task_name, - launcher_run_cfg=ctx.launcher_run_cfg, - task_cfg=ctx.task_cfg, - task_definition=ctx.task_definition, - expname=ctx.expname, - base_output_root=ctx.base_output_root, - url_override=server_url, - model_id=ctx.server_model, - judge_url_override=judge_url, - judge_model_id=ctx.judge_server_model, + client_script = EvaluatorClientScript( + ctx=ctx, + main_server_script=main_server_cmd.script if 
main_server_cmd else None, + judge_server_script=judge_server_cmd.script if judge_server_cmd else None, ) return Command( - command=eval_cmd, + script=client_script, container=ctx.eval_image, - gpus=None, - nodes=ctx.job_nodes or 1, - name=f"{ctx.expname}-{ctx.idx}-{ctx.task_name}", - metadata={ - "log_prefix": "main", - "environment": ctx.env_vars, - "gpus": ctx.job_gpus or None, - }, + name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}", ) def _build_task_cmd( task_name: str, - launcher_run_cfg: DictConfig, - task_cfg: DictConfig, + launcher_run_cfg: dict, + task_cfg: dict, task_definition: dict, expname: str, base_output_root: Optional[str], @@ -752,8 +670,8 @@ def _build_task_cmd( Args: task_name: Task identifier (e.g., "ifeval", "gpqa_diamond") - launcher_run_cfg: Global evaluator configuration from RunConfig - task_cfg: Task-specific configuration (may include task-level overrides) + launcher_run_cfg: Global evaluator configuration (as plain dict) + task_cfg: Task-specific configuration (as plain dict, may include task-level overrides) task_definition: Task definition from mapping (container, harness info) expname: Experiment name for output directory structure base_output_root: Base directory for task outputs @@ -771,7 +689,9 @@ def _build_task_cmd( - Judge: config.params.extra.judge.url Output directory is set to: {base_output_root}/{expname}/nemo-evaluator-results/{task_name} """ - task_cfg_copy = copy.deepcopy(task_cfg) + # Convert back to DictConfig for OmegaConf operations + launcher_run_cfg = OmegaConf.create(launcher_run_cfg) + task_cfg_copy = OmegaConf.create(copy.deepcopy(task_cfg)) if url_override: OmegaConf.update(task_cfg_copy, "overrides", {"target.api_endpoint.url": url_override}, force_add=True) @@ -806,3 +726,56 @@ def _build_task_cmd( cmd_struct = get_eval_factory_command(launcher_run_cfg, task_cfg_copy, task_definition) return cmd_struct.cmd + + +@dataclass(kw_only=True) +class EvaluatorClientScript(BaseJobScript): + """run.Script 
implementation for nemo-evaluator client with runtime server resolution.""" + + ctx: _TaskCreationContext + main_server_script: Optional[ServerScript] = None + judge_server_script: Optional[ServerScript] = None + log_prefix: str = field(default="main", init=False) + + def __post_init__(self): + def build_command(): + waits: List[str] = [] + target_url: Optional[str] = None + judge_url: Optional[str] = None + + if self.ctx.hosting_server and self.main_server_script is not None: + server_host = self.main_server_script.hostname_ref() + base_url = f"http://{server_host}:{self.main_server_script.port}" + waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{self.ctx.server_health_path}")) + target_url = f"{base_url}{self.ctx.server_api_path}" + elif self.ctx.with_external_server and self.ctx.server_base_url: + target_url = self.ctx.server_base_url.rstrip("/") + self.ctx.server_api_path + + if self.ctx.hosting_judge and self.judge_server_script is not None: + judge_host = self.judge_server_script.hostname_ref() + judge_base = f"http://{judge_host}:{self.judge_server_script.port}" + waits.append(pipeline_utils.get_server_wait_cmd(f"{judge_base}{self.ctx.judge_server_health_path}")) + judge_url = f"{judge_base}{self.ctx.judge_server_api_path}" + elif self.ctx.with_external_judge and self.ctx.judge_server_base_url: + judge_url = self.ctx.judge_server_base_url.rstrip("/") + self.ctx.judge_server_api_path + + cmd = _build_task_cmd( + task_name=self.ctx.task_name, + launcher_run_cfg=self.ctx.launcher_run_cfg, + task_cfg=self.ctx.task_cfg, + task_definition=self.ctx.task_definition, + expname=self.ctx.expname, + base_output_root=self.ctx.base_output_root, + url_override=target_url, + model_id=self.ctx.server_model, + judge_url_override=judge_url, + judge_model_id=self.ctx.judge_server_model, + ) + + wait_cmd = " && ".join(waits) if waits else None + final_cmd = f"{wait_cmd} && {cmd}" if wait_cmd else cmd + env_vars = copy.deepcopy(self.ctx.env_vars) + return final_cmd, 
{"environment": env_vars} + + self.set_inline(build_command) + super().__post_init__() diff --git a/nemo_skills/pipeline/utils/__init__.py b/nemo_skills/pipeline/utils/__init__.py index 1e470f3539..3e738a530f 100644 --- a/nemo_skills/pipeline/utils/__init__.py +++ b/nemo_skills/pipeline/utils/__init__.py @@ -49,6 +49,8 @@ get_chunked_rs_filename, get_generation_cmd, get_remaining_jobs, + normalize_models_config, + normalize_parameter, wrap_cmd, ) from nemo_skills.pipeline.utils.mounts import ( diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py index e294a3ed82..7029dcc638 100644 --- a/nemo_skills/pipeline/utils/declarative.py +++ b/nemo_skills/pipeline/utils/declarative.py @@ -12,39 +12,71 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + +import logging +from contextlib import nullcontext +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple, Union + +import nemo_run as run + +from nemo_skills.pipeline.utils import ( + get_env_variables, + get_executor, + get_exp, + get_exp_handles, + get_registered_external_repo, + get_tunnel, + run_exp, + temporary_env_update, +) +from nemo_skills.pipeline.utils.exp import ( + REUSE_CODE_EXP, + get_packaging_job_key, + tunnel_hash, +) +from nemo_skills.pipeline.utils.mounts import is_mounted_filepath +from nemo_skills.pipeline.utils.server import wrap_python_path +from nemo_skills.utils import get_logger_name + """ -Simplified declarative pipeline system using only Command for all task types. +Simplified declarative pipeline system using Command with run.Script objects. 
Basic Example (Single job with multiple commands): - from nemo_skills.pipeline.utils.commands import vllm_server_command, sandbox_command + from nemo_skills.pipeline.utils.scripts import ServerScript, SandboxScript, GenerationClientScript from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline - from nemo_skills.pipeline.utils.server import get_free_port - - # Allocate ports for server and sandbox - server_port = get_free_port(strategy="random") - sandbox_port = get_free_port(strategy="random") - - # Commands that run together in one SLURM job - # Note: Lambdas are needed for cross-component references (hostname_ref, meta_ref) - # which aren't resolved until het_group_index is assigned at pipeline execution time. - server_cmd, server_meta = vllm_server_command(cluster_cfg, model="Qwen/Qwen3-8B", port=server_port) - server = Command(command=server_cmd, gpus=8, name="server", metadata=server_meta) - - sandbox_cmd, sandbox_meta = sandbox_command(cluster_cfg, port=sandbox_port) - sandbox = Command(command=sandbox_cmd, name="sandbox", metadata=sandbox_meta) - - # This lambda is ESSENTIAL - server.hostname_ref() and meta_ref() aren't available until runtime - # Client needs NEMO_SKILLS_SANDBOX_PORT to connect to sandbox - client = Command( - command=lambda: f"curl {server.hostname_ref()}:{server.meta_ref('port')}/health", - name="client", - metadata={"environment": {"NEMO_SKILLS_SANDBOX_PORT": str(sandbox_port)}} + + # Create Script objects for server and sandbox + # Scripts handle port allocation, cross-component references, and command building + server_script = ServerScript( + server_type="vllm", + model_path="Qwen/Qwen2.5-Math-7B-Instruct", + server_args="--tensor-parallel-size 1" ) + sandbox_script = SandboxScript() + + # Create generation client that references server and sandbox + # Cross-component references (hostname_ref, port) are resolved at runtime + client_script = GenerationClientScript( + 
output_dir="/results/inference", + extra_arguments="++prompt_config=math ++split=test", + servers=[server_script], # References server for hostname/port + model_names=["Qwen/Qwen2.5-Math-7B-Instruct"], + server_types=["vllm"], + sandbox=sandbox_script, # References sandbox for port + with_sandbox=True, + ) + + # Wrap Scripts in Commands with container and resource info + server = Command(script=server_script, container="vllm", name="server") + sandbox = Command(script=sandbox_script, container="nemo-skills", name="sandbox") + client = Command(script=client_script, container="nemo-skills", name="client") - # Group them together + # Group them together (they run in one SLURM job) inference_group = CommandGroup( commands=[server, sandbox, client], - hardware=HardwareConfig(partition="batch"), + hardware=HardwareConfig(partition="batch", num_gpus=1), name="inference" ) @@ -57,13 +89,27 @@ pipeline.run() Advanced Example (Multiple jobs with dependencies and heterogeneous components): + from nemo_skills.pipeline.utils.scripts import ServerScript, SandboxScript, GenerationClientScript + from nemo_run import Script + log_dir = "/experiments/full_pipeline/logs" - # Job 1: Preprocessing - preprocess = Command( - command="python preprocess.py --input data.jsonl --output processed.jsonl", - gpus=0, - name="preprocess" + + # Job 1: Preprocessing with custom Script + @dataclass(kw_only=True) + class PreprocessScript(Script): + input_file: str + output_file: str + + def __post_init__(self): + cmd = f"python preprocess.py --input {self.input_file} --output {self.output_file}" + self.inline = cmd + object.__setattr__(self, 'entrypoint', 'bash') + + preprocess_script = PreprocessScript( + input_file="data.jsonl", + output_file="processed.jsonl" ) + preprocess = Command(script=preprocess_script, name="preprocess") prep_group = CommandGroup( commands=[preprocess], hardware=HardwareConfig(partition="cpu"), @@ -72,39 +118,76 @@ ) prep_job = {"name": "prep", "group": prep_group} - # Job 
2: Two different model servers (HETEROGENEOUS SLURM job with 2 het components) - # Allocate ports for each server/sandbox pair - from nemo_skills.pipeline.utils.server import get_free_port - server_8b_port = get_free_port(strategy="random") - sandbox_8b_port = get_free_port(strategy="random") - server_32b_port = get_free_port(strategy="random") - sandbox_32b_port = get_free_port(strategy="random") - - # Build commands with cluster_config - server_8b_cmd, server_8b_meta = vllm_server_command(cluster_config, model="Qwen/Qwen3-8B", port=server_8b_port) - sandbox_8b_cmd, sandbox_8b_meta = sandbox_command(cluster_config, port=sandbox_8b_port) - server_32b_cmd, server_32b_meta = vllm_server_command(cluster_config, model="Qwen/Qwen3-32B", port=server_32b_port) - sandbox_32b_cmd, sandbox_32b_meta = sandbox_command(cluster_config, port=sandbox_32b_port) + # Job 2: Two different model servers (HETEROGENEOUS SLURM job with 2 het groups) + # 8B model group + server_8b = ServerScript( + server_type="vllm", + model_path="Qwen/Qwen2.5-Math-7B-Instruct", + server_args="--tensor-parallel-size 1" + ) + sandbox_8b = SandboxScript() + client_8b = GenerationClientScript( + output_dir="/results/eval_8b", + extra_arguments="++prompt_config=math", + servers=[server_8b], + model_names=["Qwen/Qwen2.5-Math-7B-Instruct"], + server_types=["vllm"], + sandbox=sandbox_8b, + with_sandbox=True, + ) - server_8b = Command(command=server_8b_cmd, gpus=8, name="server_8b", metadata=server_8b_meta) - sandbox_8b = Command(command=sandbox_8b_cmd, name="sandbox_8b", metadata=sandbox_8b_meta) - eval_8b = Command(command="python eval.py --model 8b", gpus=1, name="eval_8b") + group_8b = CommandGroup( + commands=[ + Command(script=server_8b, container="vllm", name="server_8b"), + Command(script=sandbox_8b, container="nemo-skills", name="sandbox_8b"), + Command(script=client_8b, container="nemo-skills", name="eval_8b"), + ], + hardware=HardwareConfig(partition="batch", num_gpus=1), + name="eval_8b", + 
log_dir=log_dir + ) - server_32b = Command(command=server_32b_cmd, gpus=8, name="server_32b", metadata=server_32b_meta) - sandbox_32b = Command(command=sandbox_32b_cmd, name="sandbox_32b", metadata=sandbox_32b_meta) - eval_32b = Command(command="python eval.py --model 32b", gpus=1, name="eval_32b") + # 32B model group + server_32b = ServerScript( + server_type="vllm", + model_path="Qwen/Qwen2.5-Math-32B-Instruct", + server_args="--tensor-parallel-size 4" + ) + sandbox_32b = SandboxScript() + client_32b = GenerationClientScript( + output_dir="/results/eval_32b", + extra_arguments="++prompt_config=math", + servers=[server_32b], + model_names=["Qwen/Qwen2.5-Math-32B-Instruct"], + server_types=["vllm"], + sandbox=sandbox_32b, + with_sandbox=True, + ) - group_8b = CommandGroup(commands=[server_8b, sandbox_8b, eval_8b], name="eval_8b", log_dir=log_dir) - group_32b = CommandGroup(commands=[server_32b, sandbox_32b, eval_32b], name="eval_32b", log_dir=log_dir) + group_32b = CommandGroup( + commands=[ + Command(script=server_32b, container="vllm", name="server_32b"), + Command(script=sandbox_32b, container="nemo-skills", name="sandbox_32b"), + Command(script=client_32b, container="nemo-skills", name="eval_32b"), + ], + hardware=HardwareConfig(partition="batch", num_gpus=4), + name="eval_32b", + log_dir=log_dir + ) evals_job = {"name": "evals", "groups": [group_8b, group_32b], "dependencies": [prep_job]} # Job 3: Report generation (depends on both evaluations) - report = Command( - command="python generate_report.py --output report.txt", - gpus=0, - name="report" - ) + @dataclass(kw_only=True) + class ReportScript(Script): + output_file: str + + def __post_init__(self): + self.inline = f"python generate_report.py --output {self.output_file}" + object.__setattr__(self, 'entrypoint', 'bash') + + report_script = ReportScript(output_file="report.txt") + report = Command(script=report_script, name="report") report_group = CommandGroup(commands=[report], name="report", 
log_dir=log_dir) # Create pipeline with dependency graph @@ -121,130 +204,56 @@ pipeline.run() """ -import logging -import shlex -from contextlib import nullcontext -from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional, Tuple, Union - -import nemo_run as run - -from nemo_skills.pipeline.utils import ( - get_env_variables, - get_executor, - get_exp, - get_exp_handles, - get_tunnel, - run_exp, - temporary_env_update, -) -from nemo_skills.pipeline.utils.commands import wrap_command -from nemo_skills.pipeline.utils.exp import ( - REUSE_CODE_EXP, - get_packaging_job_key, - install_packages_wrap, - tunnel_hash, -) -from nemo_skills.pipeline.utils.mounts import is_mounted_filepath -from nemo_skills.pipeline.utils.packager import get_registered_external_repo -from nemo_skills.utils import get_logger_name - LOG = logging.getLogger(get_logger_name(__file__)) @dataclass class Command: - """Declarative command for running tasks in containers. - - The command can be either: - - A string: evaluated immediately - - A callable (lambda): evaluated lazily when the task is prepared + """Declarative command for running tasks in containers using run.Script objects. - Lambdas are ONLY needed for cross-component references (hostname_ref, meta_ref). - The het_group_index isn't assigned until pipeline execution, so these must be lazy: - # Lambda is ESSENTIAL here - server.hostname_ref() and meta_ref() don't exist yet - client = Command(command=lambda: f"curl {server.hostname_ref()}:{server.meta_ref('port')}") + Example: + server = ServerScript(server_type="vllm", model_path="/models/llama", ...) + Command(script=server, container="vllm", name="my_server") """ - # Command can be a string or callable (lambda). - # Lambdas are primarily used for cross-component references (hostname_ref, meta_ref). 
- command: Union[str, Callable] + script: run.Script container: str = "nemo-skills" - gpus: Optional[int] = None - nodes: int = 1 name: str = "command" - working_dir: str = "/nemo_run/code" - env_vars: Dict[str, str] = field(default_factory=dict) - installation_command: Optional[str] = None - port: Optional[int] = None # Can be set from metadata - metadata: Dict[str, any] = field(default_factory=dict) # Stores metadata from command builders - het_group_index: Optional[int] = None # Set per-job by Pipeline (not global) - - def __post_init__(self): - # Wrap plain strings with environment setup - if isinstance(self.command, str) and (self.env_vars or self.working_dir): - self.command = wrap_command(self.command, self.working_dir, self.env_vars) - - def hostname_ref(self) -> str: - """Get hostname reference for hetjob cross-component communication.""" - if self.het_group_index is None: - return "127.0.0.1" # Local fallback - # For heterogeneous SLURM jobs, resolve nodelist to actual hostname - return f"$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_{self.het_group_index} | head -n1)" - - def meta_ref(self, key: str) -> str: - """Get metadata value (like port). Fails if key not found.""" - if key not in self.metadata: - raise KeyError( - f"Metadata key '{key}' not found in Command '{self.name}'. " - f"Available keys: {list(self.metadata.keys())}" - ) - return str(self.metadata[key]) - def prepare_for_execution(self, cluster_config: Dict) -> Tuple[str, Dict]: - """Prepare command for execution. + def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict]: + """Prepare script for execution. This method: - 1. Evaluates callables (resolves cross-component references) - 2. Wraps with installation_command if provided + 1. Evaluates lazy commands (if script.inline is callable) + 2. Builds execution config from Script fields Returns: - Tuple of (final_command, execution_config) + Tuple of (Script_object, execution_config) """ - # 1. 
Evaluate if callable (for cross-component references like hostname_ref) - if callable(self.command): - result = self.command() + runtime_metadata = {} + + # If script.inline is callable (lazy command building), evaluate it now + if callable(self.script.inline): + result = self.script.inline() if isinstance(result, tuple): - final_command, runtime_metadata = result - # Deep merge metadata, especially environment dict - for key, value in runtime_metadata.items(): - if key == "environment" and key in self.metadata: - # Merge environment dicts instead of replacing - self.metadata[key].update(value) - else: - self.metadata[key] = value + evaluated_command, runtime_metadata = result else: - final_command = result - else: - final_command = self.command + evaluated_command = result - # 2. Wrap with installation_command if provided - if self.installation_command: - final_command = install_packages_wrap(final_command, self.installation_command) + # Update script.inline with evaluated command + self.script.set_inline(evaluated_command) - # 3. 
Build execution config from metadata + # Build execution config from Script fields execution_config = { - "num_tasks": self.metadata.get("num_tasks", 1), - "num_gpus": self.metadata.get("gpus", self.gpus or 0), - "num_nodes": self.metadata.get("nodes", self.nodes), - "environment": self.metadata.get("environment", {}), - "log_prefix": self.metadata.get("log_prefix", "main"), - "mounts": self.metadata.get("mounts"), - "container": self.metadata.get("container", self.container), # Use container from metadata if available + "log_prefix": getattr(self.script, "log_prefix", "main"), + "environment": runtime_metadata.get("environment", {}), + "mounts": None, # Mounts not currently exposed by Scripts + "container": self.container, } - return final_command, execution_config + # Return the Script object itself + return self.script, execution_config def get_name(self) -> str: return self.name @@ -257,6 +266,7 @@ class HardwareConfig: partition: Optional[str] = None num_gpus: Optional[int] = None num_nodes: Optional[int] = None + num_tasks: Optional[int] = 1 sbatch_kwargs: Optional[dict] = None @@ -482,16 +492,49 @@ def run(self, dry_run: bool = False, log_dir: Optional[str] = None, _reuse_exp=N return exp - def _prepare_command(self, command, cluster_config: Dict) -> Tuple[str, Dict]: - """Prepare command and handle mpirun wrapping.""" - final_cmd, exec_config = command.prepare_for_execution(cluster_config) - - # Handle mpirun wrapping for non-SLURM executors - num_tasks = exec_config["num_tasks"] - if cluster_config["executor"] != "slurm" and num_tasks > 1: - final_cmd = f"mpirun --allow-run-as-root -np {num_tasks} bash -c {shlex.quote(final_cmd)}" + def _prepare_command(self, command, cluster_config: Dict) -> Tuple[run.Script, Dict]: + """Prepare command for execution. 
- return final_cmd, exec_config + Returns: + Tuple of (Script_object, exec_config) + """ + script, exec_config = command.prepare_for_execution(cluster_config) + # Only rewrite paths for "none" executor (native execution without containers) + # For "local" executor (Docker), paths should stay as /nemo_run/code/... since + # that's where the code is mounted inside the container + if cluster_config.get("executor") == "none": + script = self._rewrite_local_paths(script) + # Note: mpirun wrapping for multi-task scripts is handled by the executor + return script, exec_config + + def _rewrite_local_paths(self, script: run.Script) -> run.Script: + """For executor='none', replace /nemo_run/code paths with local repo paths.""" + nemo_repo = get_registered_external_repo("nemo_skills") + if nemo_repo is None: + return script + + pkg_path = str(nemo_repo.path) + repo_root = str(nemo_repo.path.parent) + + def _replace(cmd: str) -> str: + return cmd.replace("/nemo_run/code/nemo_skills", pkg_path).replace("/nemo_run/code", repo_root) + + inline_cmd = script.inline + if isinstance(inline_cmd, str): + script.set_inline(_replace(inline_cmd)) + elif callable(inline_cmd): + original_inline = inline_cmd + + def wrapped_inline(): + result = original_inline() + if isinstance(result, tuple): + cmd, metadata = result + return _replace(cmd), metadata + return _replace(result) + + script.set_inline(wrapped_inline) + + return script def _resolve_container(self, exec_config: Dict, command, cluster_config: Dict) -> str: """Resolve container name to image path.""" @@ -513,6 +556,7 @@ def _create_executor( total_het_groups: int, overlap: bool, dependencies: Optional[List] = None, + job_name_override: Optional[str] = None, ): """Create executor with optional environment update.""" env_context = ( @@ -521,14 +565,23 @@ def _create_executor( else nullcontext() ) + # Check if the script should span all nodes from the group's HardwareConfig. 
+ # Scripts with span_group_nodes=True (e.g., ServerScript) use the group's num_nodes. + # Scripts with span_group_nodes=False (default) run on 1 node - important for multi-node + # setups with --overlap where client/sandbox should only run on the master node. + span_group_nodes = getattr(command.script, "span_group_nodes", False) + num_nodes = 1 + if span_group_nodes and hardware and hardware.num_nodes is not None: + num_nodes = hardware.num_nodes + with env_context: return get_executor( cluster_config=cluster_config, container=container_image, - num_nodes=exec_config["num_nodes"], - tasks_per_node=exec_config["num_tasks"], - gpus_per_node=exec_config["num_gpus"], - job_name=command.name, + num_nodes=num_nodes, + tasks_per_node=hardware.num_tasks if hardware and hardware.num_tasks is not None else 1, + gpus_per_node=hardware.num_gpus if hardware and hardware.num_gpus is not None else 0, + job_name=job_name_override if job_name_override else command.name, log_dir=log_dir, log_prefix=exec_config["log_prefix"], partition=hardware.partition if hardware else None, @@ -567,81 +620,105 @@ def _plan_and_add_job( if log_dir is None: raise ValueError(f"CommandGroup '{groups[0].name}' must have log_dir set, or provide it to pipeline.run()") - commands: List[str] = [] + scripts: List[run.Script] = [] executors: List = [] het_group_indices: List[int] = [] - # In heterogeneous jobs, collect environment from all commands for cross-component refs - shared_env_vars: Dict[str, str] = {} - if heterogeneous: - for het_idx, group in enumerate(groups): - for command in group.commands: - _, exec_config_probe = command.prepare_for_execution(cluster_config) - shared_env_vars.update(exec_config_probe.get("environment", {})) + # Assign het_group_index values before evaluating any commands so cross-references + # (e.g., hostname_ref) see the correct indices regardless of processing order. 
+ for het_idx, group in enumerate(groups): + for command in group.commands: + command.script.het_group_index = het_idx if heterogeneous else None - # Share packager across executors for efficiency (single-group only) - shared_packager = None + # Prepare commands once and collect runtime data for a second pass where we + # construct executors. This ensures all scripts have resolved cross-references. + prepared_commands: List[Dict] = [] + shared_env_vars: Dict[str, str] = {} - # Build commands and executors for het_idx, group in enumerate(groups): has_multiple_components = len(group.commands) > 1 total_het_groups = ( len(groups) if heterogeneous else (len(group.commands) if has_multiple_components else 1) ) - # For single-group jobs with multiple components, allow job-level GPU override for sbatch allocation - job_level_gpus = ( - group.hardware.num_gpus if (not heterogeneous and has_multiple_components and group.hardware) else None - ) - for comp_idx, command in enumerate(group.commands): - # Assign het_group_index ONLY for heterogeneous jobs (per-job, not global) - # Non-heterogeneous jobs use localhost, so het_group_index should remain None - if heterogeneous: - command.het_group_index = het_idx - else: - command.het_group_index = None - - final_cmd, exec_config = self._prepare_command(command, cluster_config) - commands.append(final_cmd) - - # Adjust GPU allocation (first component gets job-level GPUs for sbatch) for single-group jobs - exec_config["num_gpus"] = exec_config["num_gpus"] or 0 - if (not heterogeneous) and (comp_idx == 0) and (job_level_gpus is not None): - exec_config["num_gpus"] = job_level_gpus - - # Merge shared environment for heterogeneous jobs - if heterogeneous and shared_env_vars: - exec_config["environment"].update(shared_env_vars) - - # Resolve container and create executor - container_image = self._resolve_container(exec_config, command, cluster_config) - # Pass external dependencies only to the first executor (SLURM doesn't support 
per-component dependencies in hetjobs) - exec_dependencies = external_deps if (het_idx == 0 and comp_idx == 0) else None - executor = self._create_executor( - command, - exec_config, - container_image, - cluster_config, - log_dir, - group.hardware, - heterogeneous, - het_idx if heterogeneous else comp_idx, - total_het_groups, - (len(group.commands) > 1), - dependencies=exec_dependencies, + script, exec_config = self._prepare_command(command, cluster_config) + + if isinstance(script.inline, str): + if cluster_config.get("executor") not in ("none", "local"): + script.set_inline(wrap_python_path(script.inline)) + + prepared_commands.append( + { + "het_idx": het_idx, + "comp_idx": comp_idx, + "group": group, + "command": command, + "script": script, + "exec_config": exec_config, + "total_het_groups": total_het_groups, + "overlap": len(group.commands) > 1, + } ) - # Share packager across executors for single-group jobs - if not heterogeneous: - if comp_idx == 0 and het_idx == 0: - shared_packager = executor.packager - else: - executor.packager = shared_packager - - executors.append(executor) if heterogeneous: - het_group_indices.append(het_idx) + shared_env_vars.update(exec_config.get("environment", {})) + + # Share packager across executors for efficiency (single-group only) + shared_packager = None + + # Build commands and executors using prepared data + for entry in prepared_commands: + het_idx = entry["het_idx"] + comp_idx = entry["comp_idx"] + group = entry["group"] + command = entry["command"] + script = entry["script"] + exec_config = entry["exec_config"] + total_het_groups = entry["total_het_groups"] + overlap = entry["overlap"] + + scripts.append(script) + + # Merge shared environment for heterogeneous jobs + if heterogeneous and shared_env_vars: + exec_config["environment"].update(shared_env_vars) + + # Resolve container and create executor + container_image = self._resolve_container(exec_config, command, cluster_config) + # Pass external dependencies only to 
the first executor (SLURM doesn't support per-component dependencies in hetjobs) + exec_dependencies = external_deps if (het_idx == 0 and comp_idx == 0) else None + + # Always use group.name for SLURM job name (consistent across all components) + # The group name is set to task_name in generate.py, without component suffixes + # Component names (like {task_name}_server, {task_name}_sandbox) are only used for log_prefix + job_name_for_slurm = group.name + + executor = self._create_executor( + command, + exec_config, + container_image, + cluster_config, + log_dir, + group.hardware, + heterogeneous, + het_idx if heterogeneous else comp_idx, + total_het_groups, + overlap, + dependencies=exec_dependencies, + job_name_override=job_name_for_slurm, + ) + + # Share packager across executors for single-group jobs + if not heterogeneous: + if comp_idx == 0 and het_idx == 0: + shared_packager = executor.packager + else: + executor.packager = shared_packager + + executors.append(executor) + if heterogeneous: + het_group_indices.append(het_idx) # For heterogeneous jobs, set het_group_indices on the first executor if heterogeneous and executors: @@ -676,13 +753,7 @@ def _plan_and_add_job( # If reuse_code=False, clear cache REUSE_CODE_EXP.pop(tunnel_hash(tunnel), None) - # Handle executor="none" path replacements (single-group only) - if (not heterogeneous) and cluster_config["executor"] == "none": - for idx in range(len(commands)): - commands[idx] = commands[idx].replace( - "/nemo_run/code/nemo_skills", str(get_registered_external_repo("nemo_skills").path) - ) - commands[idx] = commands[idx].replace("/nemo_run/code", "./") + # Note: Path replacements for executor="none" are no longer needed with Script interface # Ray metadata handling if self.with_ray and cluster_config["executor"] == "slurm": @@ -693,19 +764,24 @@ def _plan_and_add_job( # Add to experiment and return task ID # Note: Internal dependencies (task handles from same experiment) go to exp.add() # External 
dependencies (SLURM job IDs from other experiments) go to executor - if (not heterogeneous) and len(commands) == 1: + if (not heterogeneous) and len(scripts) == 1: + # Single script - pass directly to exp.add() + if metadata: + scripts[0].metadata = metadata task_id = exp.add( - run.Script(inline=commands[0], metadata=metadata), + scripts[0], executor=executors[0], name="nemo-run", dependencies=internal_deps, ) else: + # Multiple scripts or heterogeneous job + # Apply metadata to first script only + if metadata: + scripts[0].metadata = metadata + task_id = exp.add( - [ - run.Script(inline=cmd, metadata=(metadata if idx == 0 else None)) - for idx, cmd in enumerate(commands) - ], + scripts, executor=executors, name="nemo-run", dependencies=internal_deps, diff --git a/nemo_skills/pipeline/utils/generation.py b/nemo_skills/pipeline/utils/generation.py index cd576053c1..863b196e20 100644 --- a/nemo_skills/pipeline/utils/generation.py +++ b/nemo_skills/pipeline/utils/generation.py @@ -17,6 +17,7 @@ import shlex import subprocess from collections import defaultdict +from typing import Any, List, Optional, Union from nemo_skills.pipeline.utils.cluster import get_tunnel from nemo_skills.pipeline.utils.mounts import get_unmounted_path @@ -26,6 +27,81 @@ LOG = logging.getLogger(get_logger_name(__file__)) +def normalize_models_config( + model: Optional[Union[str, List[str]]], +) -> List[str]: + """ + Normalize model specification to list. 
+ + Handles both scalar and list inputs: + - CLI (Typer): Converts single values to single-element lists automatically + - Python API: Accepts both strings and lists + + Args: + model: Model path(s) - string or list from Python API, list from CLI + + Returns: + List of model paths + + Raises: + ValueError: If model is None or empty + """ + if model is None: + raise ValueError("Must specify --model") + + # Handle string (Python API with single model) + if isinstance(model, str): + return [model] + + # Handle list + if len(model) == 0: + raise ValueError("Must specify --model") + return list(model) + + +def normalize_parameter( + param_value: Any, + num_models: int, + param_name: str, +) -> List[Any]: + """ + Normalize a parameter to a per-model list. + + Handles both scalar and list inputs for flexible usage: + - CLI (Typer): Converts single values to single-element lists automatically + - Python API: Accepts both scalars and lists directly + + Broadcast logic: + - Scalar value: Broadcast to all models [value] * num_models + - Single-element list: Broadcast to all models + - Multi-element list: Must match num_models exactly + + Args: + param_value: Parameter value (scalar or list) + num_models: Number of models + param_name: Name of parameter (for error messages) + + Returns: + List of parameter values (one per model) + + Raises: + ValueError: If list length doesn't match num_models + """ + if not isinstance(param_value, list): + return [param_value] * num_models + + if len(param_value) == num_models: + return list(param_value) + + if len(param_value) == 1: + return param_value * num_models + + raise ValueError( + f"Parameter {param_name} has {len(param_value)} values but {num_models} models specified. " + f"Must be 1 value (broadcast) or {num_models} values (per-model)." 
+ ) + + def get_chunked_rs_filename( output_dir: str, random_seed: int = None, @@ -294,8 +370,20 @@ def get_generation_cmd( wandb_parameters=None, with_sandbox: bool = False, script: str = "nemo_skills.inference.generate", + # Optional: for multi-model generation + server_addresses: Optional[List[str]] = None, + model_names: Optional[List[str]] = None, + server_types: Optional[List[str]] = None, ): - """Construct the generation command for language model inference.""" + """Construct the generation command for language model inference. + + Supports both single-model and multi-model generation. For multi-model: + - server_addresses: List of server addresses (one per model) + - model_names: List of model names (one per model) + - server_types: List of server types (one per model) + + For single-model, server config is passed via extra_arguments. + """ if input_file is None and input_dir is None: raise ValueError("Either input_file or input_dir must be provided.") if input_file is not None and input_dir is not None: @@ -313,6 +401,7 @@ def get_generation_cmd( output_dir=output_dir, random_seed=random_seed, ) + # Preamble for generation commands: added at executor/declarative level cmd = "export HYDRA_FULL_ERROR=1 && " # Separate Hydra config args (--config-*) from override args (++) @@ -327,6 +416,21 @@ def get_generation_cmd( else: # It's a module name, use -m flag cmd += f"python -m {script} {hydra_config_args} {common_args} " + + # Add multi-model configuration if provided + if server_addresses is not None and model_names is not None: + num_models = len(model_names) + if num_models > 1: + # Multi-model: pass server configuration as lists + model_names_arg = ",".join(model_names) + cmd += f"++server.model=[{model_names_arg}] " + + server_types_arg = ",".join(server_types) + cmd += f"++server.server_type=[{server_types_arg}] " + + server_addresses_arg = ",".join(server_addresses) + cmd += f"++server.base_url=[{server_addresses_arg}] " + job_end_cmd = "" if random_seed 
is not None and input_dir is None: # if input_dir is not None, we default to greedy generations diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py new file mode 100644 index 0000000000..0657eec98b --- /dev/null +++ b/nemo_skills/pipeline/utils/scripts.py @@ -0,0 +1,428 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Script classes for NeMo-Skills pipeline components. + +These classes wrap NeMo-Run's run.Script interface to provide typed, reusable +job components (servers, clients, sandboxes) with explicit fields and +cross-component reference support for heterogeneous jobs. + +Example: + # Create a server script with automatic port allocation + server = ServerScript( + server_type="vllm", + model_path="/models/llama-8b", + cluster_config=cluster_config, + num_gpus=8, + ) + + # Create a client that references the server + client = GenerationClientScript( + output_dir="/results", + input_file="/data/input.jsonl", + server=server, # Cross-component reference + ) + + # Use in Command objects + Command(script=server, container="vllm", ...) + Command(script=client, container="nemo-skills", ...) 
@dataclass
class BaseJobScript(run.Script):
    """Base class for job component scripts with heterogeneous job support.

    This class provides:
    - het_group_index tracking for cross-component references in heterogeneous SLURM jobs
    - hostname_ref() method for getting hostnames in het jobs
    - Common pattern for Script initialization (installation-command wrapping)

    Attributes:
        het_group_index: Index in heterogeneous job group (set by Pipeline at runtime).
        span_group_nodes: Whether to span all nodes from the group's HardwareConfig.
            When False (default), the script runs on 1 node regardless of group config.
            When True, the script spans all nodes specified in the group's num_nodes.
            This is important for multi-node setups with --overlap where the server
            needs multiple nodes but client/sandbox should run on the master node only.
        installation_command: Optional shell command to run before the main command
            (wrapped around ``inline`` via install_packages_wrap in __post_init__).
        entrypoint: Interpreter used to run the script; fixed to "bash" (init=False).
    """

    # Runtime-assigned het group index; None means a non-heterogeneous job.
    het_group_index: Optional[int] = field(default=None, init=False, repr=False)
    span_group_nodes: bool = False  # Default: run on 1 node
    installation_command: Optional[str] = None
    entrypoint: str = field(default="bash", init=False)

    def __post_init__(self):
        """Wrap inline command with installation_command if provided.

        If ``inline`` is a callable it is re-wrapped so the installation command
        is applied lazily, preserving any (command, metadata) tuple the callable
        returns. If ``inline`` is a plain string it is wrapped eagerly.
        """
        if not self.installation_command:
            return

        if callable(self.inline):
            original_inline = self.inline

            def wrapped_inline():
                result = original_inline()
                if isinstance(result, tuple):
                    # Callable returned (command, metadata): wrap only the command,
                    # pass metadata (e.g. environment vars) through untouched.
                    command, metadata = result
                    return install_packages_wrap(command, self.installation_command), metadata
                return install_packages_wrap(result, self.installation_command)

            self.set_inline(wrapped_inline)
        elif isinstance(self.inline, str):
            self.set_inline(install_packages_wrap(self.inline, self.installation_command))

    def set_inline(self, command: Union[str, Callable, run.Script]) -> None:
        """Set the inline command safely on frozen dataclass.

        Uses object.__setattr__ to bypass any dataclass immutability enforced
        by the run.Script base — NOTE(review): assumes run.Script may be frozen;
        confirm against nemo-run's Script definition.
        """
        object.__setattr__(self, "inline", command)

    def hostname_ref(self) -> str:
        """Get hostname reference for hetjob cross-component communication.

        Returns a shell variable reference that resolves to the master node hostname
        for this het group. Uses environment variables automatically exported by nemo-run:
        SLURM_MASTER_NODE_HET_GROUP_0, SLURM_MASTER_NODE_HET_GROUP_1, etc.

        These are set via:
            export SLURM_MASTER_NODE_HET_GROUP_N=$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_N | head -n1)
        """
        if self.het_group_index is None:
            return "127.0.0.1"  # Local fallback for non-heterogeneous jobs

        # Use the environment variable exported by nemo-run; ":-localhost" gives
        # a shell-level fallback if the variable is unset at runtime.
        return f"${{SLURM_MASTER_NODE_HET_GROUP_{self.het_group_index}:-localhost}}"
@dataclass(kw_only=True)
class ServerScript(BaseJobScript):
    """Script for model inference servers (vLLM, TRT-LLM, SGLang, etc.).

    This script wraps server command builders and provides:
    - Automatic port allocation if not specified
    - Type-safe server configuration
    - Cross-component address sharing (get_address())
    - Resource requirement tracking (num_gpus, num_nodes, num_tasks)

    Attributes:
        server_type: Type of server (vllm, trtllm, sglang, megatron, openai, etc.)
        model_path: Path to model weights or model name for API services
        cluster_config: Cluster configuration dictionary
        num_gpus: Number of GPUs required (default: 8)
        num_nodes: Number of nodes required (default: 1)
        server_args: Additional server-specific arguments
        server_entrypoint: Custom server entrypoint script (optional)
        port: Server port (allocated automatically if None)
        allocate_port: Whether to allocate port automatically (default: True)
        num_tasks: Number of MPI tasks (computed in __post_init__)
        log_prefix: Prefix for log files (default: "server")

    Example:
        # Basic usage
        server = ServerScript(
            server_type="vllm",
            model_path="/models/llama-3-8b",
            cluster_config=cluster_config,
            num_gpus=8,
        )

        # Access allocated port
        print(f"Server will run on port {server.port}")

        # Get full address for client connection
        address = server.get_address()  # Returns "hostname:port"
    """

    server_type: str
    model_path: str
    cluster_config: Dict
    num_gpus: int = 8
    num_nodes: int = 1
    server_args: str = ""
    server_entrypoint: Optional[str] = None  # Custom server entrypoint script
    port: Optional[int] = None
    allocate_port: bool = True

    # Server spans all group nodes (e.g., for distributed inference);
    # overrides the BaseJobScript default of False.
    span_group_nodes: bool = True

    # Computed fields (set in __post_init__)
    num_tasks: int = field(init=False, repr=False)
    log_prefix: str = field(default="server", init=False)

    def __post_init__(self):
        """Initialize server script.

        - Allocates port if not provided
        - Builds server command using get_server_command()
        - Sets self.inline to the command string
        - Computes num_tasks from server command builder
        """
        # Allocate port if not provided. If allocate_port is False and port is
        # None, port stays None — NOTE(review): get_address() would then yield
        # "host:None"; confirm callers never combine those settings.
        if self.port is None and self.allocate_port:
            self.port = get_free_port(strategy="random")
            LOG.debug(f"Allocated port {self.port} for {self.server_type} server")

        # Build server command. NOTE(review): direct attribute assignment here
        # (unlike set_inline's object.__setattr__) implies the dataclass is not
        # actually frozen — confirm against run.Script.
        cmd, self.num_tasks = get_server_command(
            server_type=self.server_type,
            num_gpus=self.num_gpus,
            num_nodes=self.num_nodes,
            model_path=self.model_path,
            cluster_config=self.cluster_config,
            server_port=self.port,
            server_args=self.server_args,
            server_entrypoint=self.server_entrypoint,
        )

        self.set_inline(cmd)
        # Apply installation_command wrapping from the base class last, so it
        # wraps the fully-built server command.
        super().__post_init__()

    def get_address(self) -> str:
        """Get server address for client connections.

        Returns hostname:port string that clients can use to connect.
        In heterogeneous jobs, hostname_ref() returns a bash expression
        that resolves at runtime.

        Returns:
            Server address in format "hostname:port"

        Example:
            # Use in client command
            client_cmd = f"python client.py --server-url http://{server.get_address()}"
        """
        return f"{self.hostname_ref()}:{self.port}"
@dataclass(kw_only=True)
class SandboxScript(BaseJobScript):
    """Script for code execution sandbox container.

    The sandbox provides a secure environment for executing LLM-generated code.
    This script wraps sandbox command builders and provides:
    - Automatic port allocation
    - Mount configuration (can optionally keep mounts, though risky)
    - Type-safe sandbox configuration

    Attributes:
        cluster_config: Cluster configuration dictionary
        port: Sandbox port (allocated automatically if None)
        keep_mounts: Whether to keep filesystem mounts (default: False, risky if True).
            Note: This is stored for documentation but actually handled at
            the executor level, not in the sandbox command itself.
        allocate_port: Whether to allocate port automatically (default: True)
        env_overrides: Extra environment variables in ``KEY=VALUE`` form,
            applied on top of the sandbox's own environment (e.g. LISTEN_PORT).
        log_prefix: Prefix for log files (default: "sandbox")

    Raises:
        ValueError: At command-build time, if any env_overrides entry is not
            in ``KEY=VALUE`` form (missing '=' or empty key).

    Example:
        sandbox = SandboxScript(
            cluster_config=cluster_config,
            keep_mounts=False,  # Safer: sandbox has no access to mounted paths
        )

        # Client can reference sandbox port
        client = GenerationClientScript(..., sandbox=sandbox)
    """

    cluster_config: Dict
    port: Optional[int] = None
    keep_mounts: bool = False
    allocate_port: bool = True
    env_overrides: Optional[List[str]] = None  # Extra env vars in KEY=VALUE form
    log_prefix: str = field(default="sandbox", init=False)

    def __post_init__(self):
        """Initialize sandbox script.

        - Allocates port if not provided
        - Builds sandbox command using sandbox_command()
        - Sets self.inline to a callable that returns command and environment vars
        """
        # Allocate port if not provided
        if self.port is None and self.allocate_port:
            self.port = get_free_port(strategy="random")
            LOG.debug(f"Allocated port {self.port} for sandbox")

        # Build sandbox command and metadata (including environment vars)
        # Note: keep_mounts is handled at the executor level, not in the command itself
        cmd, metadata = sandbox_command(
            cluster_config=self.cluster_config,
            port=self.port,
        )

        # Use a callable to return both command and environment variables
        # This ensures the sandbox's LISTEN_PORT and NGINX_PORT are properly set
        def build_cmd() -> Tuple[str, Dict]:
            env = dict(metadata.get("environment", {}))
            # Apply user-specified environment overrides. Validate the
            # KEY=VALUE shape explicitly so a malformed entry produces a clear
            # error instead of an opaque "not enough values to unpack".
            if self.env_overrides:
                for override in self.env_overrides:
                    key, sep, value = override.partition("=")
                    if not sep or not key:
                        raise ValueError(
                            f"Invalid sandbox env override {override!r}: expected KEY=VALUE format"
                        )
                    env[key] = value
            return cmd, {"environment": env}

        self.set_inline(build_cmd)
        super().__post_init__()
@dataclass(kw_only=True)
class GenerationClientScript(BaseJobScript):
    """Script for LLM generation/inference client.

    This script wraps generation command builders and provides:
    - Cross-component references to multiple servers and sandbox
    - Lazy command building for runtime hostname resolution
    - Type-safe generation configuration
    - Environment variable handling for sandbox/server communication

    Attributes:
        output_dir: Directory for output files
        input_file: Input JSONL file (mutually exclusive with input_dir)
        input_dir: Input directory (mutually exclusive with input_file)
        extra_arguments: Additional arguments for generation script
        random_seed: Random seed for sampling (optional)
        chunk_id: Chunk ID for parallel processing (optional)
        num_chunks: Total number of chunks (required if chunk_id set)
        preprocess_cmd: Command to run before generation (optional)
        postprocess_cmd: Command to run after generation (optional)
        wandb_parameters: WandB logging configuration (optional)
        with_sandbox: Whether sandbox is enabled
        script: Module or file path for generation script (default: nemo_skills.inference.generate)
        servers: List of ServerScript references (None entries = pre-hosted servers)
        server_addresses_prehosted: Addresses for pre-hosted servers (parallel to servers list)
        model_names: Model names for multi-model generation (optional)
        server_types: Server types for multi-model generation (optional)
        sandbox: Reference to SandboxScript for cross-component communication (optional)
        log_prefix: Prefix for log files (default: "main")

    Raises:
        ValueError: At command-build time, if a ``servers`` entry is None
            (pre-hosted) but ``server_addresses_prehosted`` does not supply an
            address for that index.

    Examples:
        # Single server
        client = GenerationClientScript(
            output_dir="/results",
            input_file="/data/input.jsonl",
            servers=[server_script],
            model_names=["llama-8b"],
            server_types=["vllm"],
        )

        # Multi-model with self-hosted and pre-hosted servers
        client = GenerationClientScript(
            output_dir="/results",
            input_file="/data/input.jsonl",
            servers=[server1, server2, None],  # None = pre-hosted
            server_addresses_prehosted=["", "", "https://api.openai.com"],
            model_names=["llama-8b", "llama-70b", "gpt-4"],
            server_types=["vllm", "vllm", "openai"],
            sandbox=sandbox_script,
            with_sandbox=True,
        )
    """

    output_dir: str
    input_file: Optional[str] = None
    input_dir: Optional[str] = None
    extra_arguments: str = ""
    random_seed: Optional[int] = None
    chunk_id: Optional[int] = None
    num_chunks: Optional[int] = None
    preprocess_cmd: Optional[str] = None
    postprocess_cmd: Optional[str] = None
    wandb_parameters: Optional[Dict] = None
    with_sandbox: bool = False
    script: str = "nemo_skills.inference.generate"

    # Cross-component references for single/multi-model
    servers: Optional[List[Optional["ServerScript"]]] = None
    server_addresses_prehosted: Optional[List[str]] = None
    model_names: Optional[List[str]] = None
    server_types: Optional[List[str]] = None
    sandbox: Optional["SandboxScript"] = None

    log_prefix: str = field(default="main", init=False)

    def __post_init__(self):
        """Initialize generation client script with lazy command building.

        Builds command lazily via a callable that is evaluated when het_group_index
        is assigned, allowing hostname_ref() to resolve correctly for heterogeneous jobs.

        This works for both cases:
        - With cross-refs: Resolves server hostnames and sandbox ports at runtime
        - Without cross-refs: Just builds the command string (no runtime resolution needed)
        """

        def build_cmd() -> Tuple[str, Dict]:
            """Build command at runtime when cross-refs are resolved."""
            env_vars = {}

            # Add sandbox port to environment if sandbox is referenced
            if self.sandbox:
                env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port)

            # Build server addresses if servers are provided
            server_addresses = None
            if self.servers is not None:
                server_addresses = []
                for server_idx, server_script in enumerate(self.servers):
                    if server_script is not None:
                        # Self-hosted: delegate to ServerScript so address
                        # construction stays consistent in one place.
                        addr = server_script.get_address()
                    else:
                        # Pre-hosted: use the address from server_addresses_prehosted.
                        # Fail with a clear message instead of a TypeError/IndexError
                        # when the parallel list is missing or too short.
                        if (
                            self.server_addresses_prehosted is None
                            or server_idx >= len(self.server_addresses_prehosted)
                        ):
                            raise ValueError(
                                f"servers[{server_idx}] is pre-hosted (None) but no matching "
                                f"entry exists in server_addresses_prehosted"
                            )
                        addr = self.server_addresses_prehosted[server_idx]
                    server_addresses.append(addr)

            # Build generation command
            cmd = get_generation_cmd(
                output_dir=self.output_dir,
                input_file=self.input_file,
                input_dir=self.input_dir,
                extra_arguments=self.extra_arguments,
                random_seed=self.random_seed,
                chunk_id=self.chunk_id,
                num_chunks=self.num_chunks,
                preprocess_cmd=self.preprocess_cmd,
                postprocess_cmd=self.postprocess_cmd,
                wandb_parameters=self.wandb_parameters,
                with_sandbox=self.with_sandbox,
                script=self.script,
                # Multi-model parameters (None for single-model)
                server_addresses=server_addresses,
                model_names=self.model_names,
                server_types=self.server_types,
            )

            # Return command and runtime metadata (environment vars)
            return cmd, {"environment": env_vars}

        # Always use lazy command building
        self.set_inline(build_cmd)
        super().__post_init__()
"asr-leaderboard", - "aalcr", # Has tokenization mismatch issues + "mrcr", "audiobench", "librispeech-pc", } diff --git a/tests/test_declarative_pipeline.py b/tests/test_declarative_pipeline.py index 92117e403d..9d76fd4721 100644 --- a/tests/test_declarative_pipeline.py +++ b/tests/test_declarative_pipeline.py @@ -16,6 +16,7 @@ import json import os +from typing import Callable, Optional from unittest.mock import MagicMock, patch import pytest @@ -26,127 +27,83 @@ from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline -class TestCommand: - """Test Command class functionality.""" +class DummyScript: + """Minimal run.Script stand-in for unit tests.""" - def test_command_basic_string(self): - """Test creating a Command with a simple string.""" - cmd = Command(command="echo hello", name="test") - assert cmd.name == "test" - assert cmd.container == "nemo-skills" - assert cmd.gpus is None - assert cmd.nodes == 1 + def __init__(self, inline: str | Callable | None = "echo test"): + self.inline = inline + self.log_prefix = "main" + self.metadata = {} + self.het_group_index: Optional[int] = None - def test_command_with_metadata(self): - """Test Command with metadata passed separately.""" - cmd = Command(command="echo hello", name="server", metadata={"port": 8080, "log_prefix": "server"}) - assert cmd.metadata["port"] == 8080 - assert cmd.metadata["log_prefix"] == "server" - # Command gets wrapped with working_dir by default - assert "echo hello" in cmd.command + def set_inline(self, inline): + self.inline = inline - def test_command_with_callable(self): - """Test Command with callable that returns tuple.""" + def hostname_ref(self) -> str: + if self.het_group_index is None: + return "127.0.0.1" + return f"${{SLURM_MASTER_NODE_HET_GROUP_{self.het_group_index}:-localhost}}" - def make_cmd(): - return ("echo world", {"port": 5000}) - cmd = Command(command=make_cmd, name="dynamic") - assert callable(cmd.command) - assert cmd.name == 
"dynamic" +def make_command(*, inline: str | Callable | None = "echo test", name: str = "cmd", script: DummyScript | None = None): + """Helper to build Command objects with DummyScript instances.""" + script_obj = script or DummyScript(inline=inline) + return Command(script=script_obj, name=name) + + +class TestCommand: + """Tests for the new Script-based Command wrapper.""" + + def test_command_basic_script(self): + cmd = make_command(inline="echo hello", name="test") + assert cmd.name == "test" + assert cmd.container == "nemo-skills" + assert cmd.script.inline == "echo hello" def test_command_prepare_for_execution_string(self): - """Test prepare_for_execution with string command.""" - cmd = Command(command="python script.py", gpus=2, name="test") + cmd = make_command(inline="python script.py", name="test") cluster_config = {"executor": "local", "containers": {}} - final_cmd, exec_config = cmd.prepare_for_execution(cluster_config) + script_obj, exec_config = cmd.prepare_for_execution(cluster_config) - assert "python script.py" in final_cmd - assert exec_config["num_gpus"] == 2 - assert exec_config["num_nodes"] == 1 - assert exec_config["num_tasks"] == 1 + assert script_obj.inline == "python script.py" + assert exec_config["log_prefix"] == "main" + assert exec_config["environment"] == {} def test_command_prepare_for_execution_callable(self): - """Test prepare_for_execution with callable command.""" - - def make_cmd(): - return "echo test" - - cmd = Command(command=make_cmd, name="test") + script = DummyScript(inline=lambda: "echo test") + cmd = make_command(name="test", script=script) cluster_config = {"executor": "local", "containers": {}} - final_cmd, exec_config = cmd.prepare_for_execution(cluster_config) - - assert final_cmd == "echo test" + script_obj, _ = cmd.prepare_for_execution(cluster_config) + assert script_obj.inline == "echo test" def test_command_prepare_for_execution_callable_with_metadata(self): - """Test prepare_for_execution with callable 
returning tuple.""" - def make_cmd(): - return ("echo metadata", {"num_tasks": 4, "environment": {"VAR": "value"}}) + return ("echo metadata", {"environment": {"VAR": "value"}}) - cmd = Command(command=make_cmd, name="test") + script = DummyScript(inline=make_cmd) + cmd = make_command(name="test", script=script) cluster_config = {"executor": "local", "containers": {}} - final_cmd, exec_config = cmd.prepare_for_execution(cluster_config) + _, exec_config = cmd.prepare_for_execution(cluster_config) - assert final_cmd == "echo metadata" - assert exec_config["num_tasks"] == 4 assert exec_config["environment"]["VAR"] == "value" - def test_command_meta_ref(self): - """Test meta_ref for accessing metadata.""" - cmd = Command(command="echo test", name="server", metadata={"port": 8080, "host": "localhost"}) - - assert cmd.meta_ref("port") == "8080" - assert cmd.meta_ref("host") == "localhost" - - def test_command_meta_ref_missing_key(self): - """Test meta_ref with missing key raises KeyError.""" - cmd = Command(command="echo test", name="test") - - with pytest.raises(KeyError, match="Metadata key 'port' not found"): - cmd.meta_ref("port") - def test_command_hostname_ref_none(self): - """Test hostname_ref returns localhost when het_group_index is None.""" - cmd = Command(command="echo test", name="test") - assert cmd.het_group_index is None - assert cmd.hostname_ref() == "127.0.0.1" - - def test_command_hostname_ref_heterogeneous(self): - """Test hostname_ref returns SLURM variable when het_group_index is set.""" - cmd = Command(command="echo test", name="test") - cmd.het_group_index = 2 - - hostname = cmd.hostname_ref() - assert "$SLURM_JOB_NODELIST_HET_GROUP_2" in hostname - assert "scontrol" in hostname - - def test_command_with_installation_command(self): - """Test Command with installation_command.""" - cmd = Command(command="python script.py", installation_command="pip install package", name="test") - cluster_config = {"executor": "local", "containers": {}} - - 
final_cmd, _ = cmd.prepare_for_execution(cluster_config) + script = DummyScript() + cmd = make_command(name="test", script=script) - # Installation command should be wrapped around the main command - assert "pip install package" in final_cmd - assert "python script.py" in final_cmd + assert script.hostname_ref() == "127.0.0.1" + assert cmd.get_name() == "test" - def test_command_env_vars_wrapping(self): - """Test that env_vars and working_dir are applied to string commands.""" - cmd = Command( - command="python script.py", - env_vars={"MY_VAR": "value"}, - working_dir="/custom/path", - name="test", - ) + def test_command_hostname_ref_heterogeneous(self): + script = DummyScript() + script.het_group_index = 2 + make_command(name="test", script=script) - # The command should be wrapped with env setup - assert "export MY_VAR=value" in cmd.command - assert "cd /custom/path" in cmd.command + hostname = script.hostname_ref() + assert "${SLURM_MASTER_NODE_HET_GROUP_2" in hostname class TestCommandGroup: @@ -154,8 +111,8 @@ class TestCommandGroup: def test_commandgroup_basic(self): """Test creating a basic CommandGroup.""" - cmd1 = Command(command="echo 1", name="cmd1") - cmd2 = Command(command="echo 2", name="cmd2") + cmd1 = make_command(inline="echo 1", name="cmd1") + cmd2 = make_command(inline="echo 2", name="cmd2") group = CommandGroup(commands=[cmd1, cmd2], name="test_group") @@ -165,7 +122,7 @@ def test_commandgroup_basic(self): def test_commandgroup_with_hardware(self): """Test CommandGroup with HardwareConfig.""" - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") hardware = HardwareConfig(partition="batch", sbatch_kwargs={"time_min": "01:00:00"}, num_gpus=8) group = CommandGroup(commands=[cmd], hardware=hardware, name="gpu_group") @@ -176,7 +133,7 @@ def test_commandgroup_with_hardware(self): def test_commandgroup_with_log_dir(self): """Test CommandGroup with log_dir.""" - cmd = Command(command="echo test", 
name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], log_dir="/logs/test", name="group") assert group.log_dir == "/logs/test" @@ -187,7 +144,7 @@ class TestPipeline: def test_pipeline_with_single_job(self): """Test Pipeline with single job.""" - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -204,10 +161,10 @@ def test_pipeline_with_single_job(self): def test_pipeline_with_jobs(self): """Test Pipeline with jobs parameter (full format with dependencies).""" - cmd1 = Command(command="echo 1", name="cmd1") + cmd1 = make_command(inline="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = Command(command="echo 2", name="cmd2") + cmd2 = make_command(inline="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") job1 = {"name": "job1", "group": group1} @@ -232,7 +189,7 @@ def test_pipeline_requires_jobs(self): def test_pipeline_with_run_after(self): """Test Pipeline with run_after parameter.""" - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -248,7 +205,7 @@ def test_pipeline_with_run_after(self): def test_pipeline_with_run_after_list(self): """Test Pipeline with run_after as list.""" - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -264,7 +221,7 @@ def test_pipeline_with_run_after_list(self): def test_pipeline_cluster_config_passed_directly(self): """Test that cluster_config is passed directly (no more string resolution).""" - cmd = 
Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -299,7 +256,7 @@ def test_pipeline_run_basic(self, mock_run_exp, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create pipeline - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") pipeline = Pipeline( name="test", cluster_config=mock_config, jobs=[{"name": "job1", "group": group}], skip_hf_home_check=True @@ -329,10 +286,10 @@ def test_pipeline_run_with_dependencies(self, mock_run_exp, mock_env_vars, mock_ mock_get_exp.return_value.__enter__.return_value = mock_exp # Create pipeline with internal dependencies - cmd1 = Command(command="echo 1", name="cmd1") + cmd1 = make_command(inline="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = Command(command="echo 2", name="cmd2") + cmd2 = make_command(inline="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") job1 = {"name": "job1", "group": group1, "dependencies": []} @@ -375,7 +332,7 @@ def test_pipeline_hf_home_validation(self, mock_get_executor, mock_is_mounted, m mock_exp.add.return_value = "handle" mock_get_exp.return_value.__enter__.return_value = mock_exp - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") pipeline = Pipeline(name="test", cluster_config=mock_config, jobs=[{"name": "job1", "group": group}]) @@ -391,7 +348,7 @@ def test_pipeline_hf_home_missing(self, mock_env_vars): mock_config = {"executor": "slurm", "containers": {}} mock_env_vars.return_value = {} # No HF_HOME - cmd = Command(command="echo test", name="cmd") + cmd = 
make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") # Should raise in __init__ now, not run() @@ -406,7 +363,7 @@ def test_pipeline_hf_home_not_mounted(self, mock_is_mounted, mock_env_vars): mock_env_vars.return_value = {"HF_HOME": "/hf"} mock_is_mounted.return_value = False - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") # Should raise in __init__ now, not run() @@ -432,8 +389,8 @@ def test_het_group_index_non_heterogeneous(self, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create single-group job with multiple components - cmd1 = Command(command="echo 1", name="cmd1") - cmd2 = Command(command="echo 2", name="cmd2") + cmd1 = make_command(inline="echo 1", name="cmd1") + cmd2 = make_command(inline="echo 2", name="cmd2") group = CommandGroup(commands=[cmd1, cmd2], name="group", log_dir="/logs") pipeline = Pipeline( @@ -442,10 +399,10 @@ def test_het_group_index_non_heterogeneous(self, mock_env_vars, mock_get_exp): pipeline.run(dry_run=True) # Both commands should have None het_group_index (localhost communication) - assert cmd1.het_group_index is None - assert cmd2.het_group_index is None - assert cmd1.hostname_ref() == "127.0.0.1" - assert cmd2.hostname_ref() == "127.0.0.1" + assert cmd1.script.het_group_index is None + assert cmd2.script.het_group_index is None + assert cmd1.script.hostname_ref() == "127.0.0.1" + assert cmd2.script.hostname_ref() == "127.0.0.1" @patch("nemo_skills.pipeline.utils.declarative.get_exp") @patch("nemo_skills.pipeline.utils.declarative.get_env_variables") @@ -462,10 +419,10 @@ def test_het_group_index_heterogeneous(self, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create multi-group heterogeneous job - cmd1 = Command(command="echo 1", name="cmd1") + cmd1 = 
make_command(inline="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = Command(command="echo 2", name="cmd2") + cmd2 = make_command(inline="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") jobs = [{"name": "hetjob", "groups": [group1, group2]}] @@ -473,10 +430,10 @@ def test_het_group_index_heterogeneous(self, mock_env_vars, mock_get_exp): pipeline.run(dry_run=True) # Commands should have het_group_index 0 and 1 - assert cmd1.het_group_index == 0 - assert cmd2.het_group_index == 1 - assert "$SLURM_JOB_NODELIST_HET_GROUP_0" in cmd1.hostname_ref() - assert "$SLURM_JOB_NODELIST_HET_GROUP_1" in cmd2.hostname_ref() + assert cmd1.script.het_group_index == 0 + assert cmd2.script.het_group_index == 1 + assert "SLURM_MASTER_NODE_HET_GROUP_0" in cmd1.script.hostname_ref() + assert "SLURM_MASTER_NODE_HET_GROUP_1" in cmd2.script.hostname_ref() @patch("nemo_skills.pipeline.utils.declarative.get_exp") @patch("nemo_skills.pipeline.utils.declarative.get_env_variables") @@ -493,16 +450,16 @@ def test_het_group_index_per_job_not_global(self, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create two separate heterogeneous jobs - cmd1 = Command(command="echo 1", name="cmd1") + cmd1 = make_command(inline="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = Command(command="echo 2", name="cmd2") + cmd2 = make_command(inline="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") - cmd3 = Command(command="echo 3", name="cmd3") + cmd3 = make_command(inline="echo 3", name="cmd3") group3 = CommandGroup(commands=[cmd3], name="group3", log_dir="/logs") - cmd4 = Command(command="echo 4", name="cmd4") + cmd4 = make_command(inline="echo 4", name="cmd4") group4 = CommandGroup(commands=[cmd4], name="group4", log_dir="/logs") jobs = [ @@ -513,10 +470,10 @@ def 
test_het_group_index_per_job_not_global(self, mock_env_vars, mock_get_exp): pipeline.run(dry_run=True) # Both jobs should have het_group_index starting from 0 - assert cmd1.het_group_index == 0 - assert cmd2.het_group_index == 1 - assert cmd3.het_group_index == 0 # Starts from 0 again! - assert cmd4.het_group_index == 1 + assert cmd1.script.het_group_index == 0 + assert cmd2.script.het_group_index == 1 + assert cmd3.script.het_group_index == 0 # Starts from 0 again! + assert cmd4.script.het_group_index == 1 class TestDependencyResolution: @@ -536,7 +493,7 @@ def test_dependency_none_handling(self, mock_env_vars, mock_get_exp): mock_exp.add.return_value = "handle" mock_get_exp.return_value.__enter__.return_value = mock_exp - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") jobs = [{"name": "job", "group": group, "dependencies": None}] @@ -559,7 +516,7 @@ def test_pipeline_run_after_applies_to_jobs(self, mock_env_vars, mock_get_exp): mock_exp.add.return_value = "handle" mock_get_exp.return_value.__enter__.return_value = mock_exp - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") pipeline = Pipeline( @@ -589,7 +546,7 @@ def test_pipeline_job_missing_group_or_groups(self): def test_commandgroup_missing_log_dir(self): """Test that CommandGroup without log_dir raises error during execution.""" mock_config = {"executor": "none", "containers": {}} - cmd = Command(command="echo test", name="cmd") + cmd = make_command(inline="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") # No log_dir pipeline = Pipeline(name="test", cluster_config=mock_config, jobs=[{"name": "job1", "group": group}]) @@ -626,14 +583,14 @@ def test_multiple_internal_dependencies(self): } # Job 1 and Job 2: independent - cmd1 = 
Command(command="echo job1", name="job1") + cmd1 = make_command(inline="echo job1", name="job1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/tmp/logs") - cmd2 = Command(command="echo job2", name="job2") + cmd2 = make_command(inline="echo job2", name="job2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/tmp/logs") # Job 3: depends on both job1 and job2 - cmd3 = Command(command="echo job3", name="job3") + cmd3 = make_command(inline="echo job3", name="job3") group3 = CommandGroup(commands=[cmd3], name="group3", log_dir="/tmp/logs") job1_spec = {"name": "job1", "group": group1} @@ -715,11 +672,11 @@ def mock_get_executor(**kwargs): } # Job 1: depends on external experiment - cmd1 = Command(command="echo job1", name="job1") + cmd1 = make_command(inline="echo job1", name="job1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/tmp/logs") # Job 2: depends on job1 (internal) AND external experiment - cmd2 = Command(command="echo job2", name="job2") + cmd2 = make_command(inline="echo job2", name="job2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/tmp/logs") job1_spec = { @@ -931,35 +888,38 @@ def capture_env_update(cluster_config, updates): # Debug: print what we captured print(f"Captured env updates: {env_updates_captured}") - # Find the client and sandbox environment updates - client_env = None - sandbox_env = None + # Verify both sandbox and client environment variables are captured + assert len(env_updates_captured) >= 2, ( + f"Expected at least 2 environment updates (sandbox + client), got {len(env_updates_captured)}: {env_updates_captured}" + ) + # Find the sandbox and client environment updates + sandbox_env = None + client_env = None for env_update in env_updates_captured: + if "LISTEN_PORT" in env_update and "NGINX_PORT" in env_update: + sandbox_env = env_update if "NEMO_SKILLS_SANDBOX_PORT" in env_update: client_env = env_update - elif "LISTEN_PORT" in env_update and "NGINX_PORT" in 
env_update: - sandbox_env = env_update - # Verify client got NEMO_SKILLS_SANDBOX_PORT (old behavior: exp.py line 493) - # This is the key fix - ensuring sandbox port is passed to client - assert client_env is not None, ( - f"Client environment update not found. Captured updates: {env_updates_captured}\n" - f"This means NEMO_SKILLS_SANDBOX_PORT was not set for the client command, " - f"so the Sandbox class cannot connect to the sandbox server." + # Verify sandbox got LISTEN_PORT and NGINX_PORT + assert sandbox_env is not None, ( + f"LISTEN_PORT/NGINX_PORT not set for sandbox command: {env_updates_captured}" ) - assert "NEMO_SKILLS_SANDBOX_PORT" in client_env, ( - "NEMO_SKILLS_SANDBOX_PORT not set for client command" + assert sandbox_env["LISTEN_PORT"] == sandbox_env["NGINX_PORT"], ( + f"LISTEN_PORT and NGINX_PORT should match: {sandbox_env}" ) - # Verify sandbox got its environment vars (old behavior: exp.py lines 525-538) - assert sandbox_env is not None, ( - f"Sandbox environment update not found. 
Captured: {env_updates_captured}" + # Verify client got NEMO_SKILLS_SANDBOX_PORT + assert client_env is not None, ( + f"NEMO_SKILLS_SANDBOX_PORT not set for client command: {env_updates_captured}" ) - assert "LISTEN_PORT" in sandbox_env, "LISTEN_PORT not set for sandbox" - assert "NGINX_PORT" in sandbox_env, "NGINX_PORT not set for sandbox" - # This test verifies the fix works end-to-end through the actual generate() function + # Verify the ports match between sandbox and client + assert client_env["NEMO_SKILLS_SANDBOX_PORT"] == sandbox_env["LISTEN_PORT"], ( + f"Sandbox port mismatch: client has {client_env['NEMO_SKILLS_SANDBOX_PORT']}, " + f"sandbox has {sandbox_env['LISTEN_PORT']}" + ) if __name__ == "__main__": diff --git a/tests/test_generation.py b/tests/test_generation.py index b69b526a0e..2693d62241 100644 --- a/tests/test_generation.py +++ b/tests/test_generation.py @@ -16,12 +16,12 @@ # running most things through subprocess since that's how it's usually used import subprocess -from unittest.mock import MagicMock import pytest from nemo_skills.evaluation.metrics import ComputeMetrics -from nemo_skills.pipeline.generate import _create_commandgroup_from_config +from nemo_skills.pipeline.generate import _create_job_unified +from nemo_skills.pipeline.utils.scripts import ServerScript def test_eval_gsm8k_api(tmp_path): @@ -153,36 +153,42 @@ def test_generate_openai_format(tmp_path, format): assert len(data[1]["generation"]) > 0 -def test_server_metadata_from_num_tasks(): +def test_server_metadata_from_num_tasks(tmp_path): """Test that metadata dict is properly created from server command returning (cmd, num_tasks).""" - mock_server_fn = MagicMock(return_value=("python server.py", 4)) cluster_config = { - "containers": {"vllm": "nvcr.io/nvidia/nemo:vllm", "nemo-skills": "nvcr.io/nvidia/nemo:skills"}, - "executor": "slurm", + "containers": { + "vllm": "apitest/vllm", + "nemo-skills": "apitest/nemo-skills", + "sandbox": "apitest/sandbox", + }, + "executor": 
"none", } server_config = { "server_type": "vllm", "num_gpus": 8, "num_nodes": 1, - "model_path": "/models/test", + "model_path": str(tmp_path / "model"), "server_port": 5000, + "server_args": "", } + generation_params = {"output_dir": "/tmp/out"} - cmd_group = _create_commandgroup_from_config( - generation_cmd="python generate.py", - server_config=server_config, - with_sandbox=False, - sandbox_port=None, + groups = _create_job_unified( + models=[server_config["model_path"]], + server_configs=[server_config], + generation_params=generation_params, cluster_config=cluster_config, installation_command=None, - get_server_command_fn=mock_server_fn, + with_sandbox=False, partition=None, keep_mounts_for_sandbox=False, task_name="test-task", log_dir="/tmp/logs", ) - server_cmd = cmd_group.commands[0] - assert isinstance(server_cmd.metadata, dict) - assert server_cmd.metadata["num_tasks"] == 4 - assert server_cmd.metadata["gpus"] == 8 + server_cmd = groups[0].commands[0] + assert isinstance(server_cmd.script, ServerScript) + assert server_cmd.script.num_tasks >= 1 + assert server_cmd.script.num_gpus == server_config["num_gpus"] + assert groups[0].hardware.num_gpus == server_config["num_gpus"] + assert groups[0].hardware.num_tasks == server_cmd.script.num_tasks diff --git a/tests/test_nemo_evaluator_pipeline.py b/tests/test_nemo_evaluator_pipeline.py index 22ac250882..0f333ab748 100644 --- a/tests/test_nemo_evaluator_pipeline.py +++ b/tests/test_nemo_evaluator_pipeline.py @@ -17,8 +17,14 @@ import pytest -from nemo_skills.pipeline.nemo_evaluator import nemo_evaluator as nemo_evaluator_fn +from nemo_skills.pipeline.nemo_evaluator import ( + EvaluatorClientScript, +) +from nemo_skills.pipeline.nemo_evaluator import ( + nemo_evaluator as nemo_evaluator_fn, +) from nemo_skills.pipeline.utils.declarative import Command, CommandGroup +from nemo_skills.pipeline.utils.scripts import ServerScript @pytest.fixture @@ -131,9 +137,8 @@ def test_no_servers_external_urls( # Verify client 
command client_cmd = group.commands[0] assert isinstance(client_cmd, Command) - assert "evaluator-test-0" in client_cmd.name - assert client_cmd.gpus is None # No GPUs when no hosted servers - assert client_cmd.nodes == 1 + assert client_cmd.name.startswith("evaluator-test-client-0") + assert isinstance(client_cmd.script, EvaluatorClientScript) # Verify hardware config assert group.hardware is not None @@ -181,16 +186,17 @@ def test_main_server_hosted( server_cmd = group.commands[0] assert isinstance(server_cmd, Command) assert "server" in server_cmd.name - assert server_cmd.gpus == 8 - assert server_cmd.nodes == 1 - assert "port" in server_cmd.metadata - assert server_cmd.metadata["log_prefix"] == "server" + assert isinstance(server_cmd.script, ServerScript) + assert server_cmd.script.num_gpus == 8 + assert server_cmd.script.log_prefix == "server" + assert server_cmd.script.port is not None # Verify client command client_cmd = group.commands[1] assert isinstance(client_cmd, Command) assert "client" in client_cmd.name - assert callable(client_cmd.command) # Should be lambda for cross-component refs + assert isinstance(client_cmd.script, EvaluatorClientScript) + assert callable(client_cmd.script.inline) # Should be lambda for cross-component refs # Verify hardware config (should use server GPUs) assert group.hardware.num_gpus == 8 @@ -235,14 +241,16 @@ def test_judge_server_hosted( judge_cmd = group.commands[0] assert isinstance(judge_cmd, Command) assert "judge-server" in judge_cmd.name - assert judge_cmd.gpus == 32 - assert judge_cmd.metadata["log_prefix"] == "judge-server" + assert isinstance(judge_cmd.script, ServerScript) + assert judge_cmd.script.num_gpus == 32 + assert judge_cmd.script.log_prefix == "judge-server" # Verify client command client_cmd = group.commands[1] assert isinstance(client_cmd, Command) assert "client" in client_cmd.name - assert callable(client_cmd.command) # Should be lambda for cross-component refs + assert isinstance(client_cmd.script, 
EvaluatorClientScript) + assert callable(client_cmd.script.inline) # Should be lambda for cross-component refs # Verify hardware config (should use judge server GPUs) assert group.hardware.num_gpus == 32 @@ -300,19 +308,22 @@ def test_both_servers_hosted_separate_groups( server_cmd = server_group.commands[0] assert isinstance(server_cmd, Command) assert "server" in server_cmd.name - assert server_cmd.gpus == 8 + assert isinstance(server_cmd.script, ServerScript) + assert server_cmd.script.num_gpus == 8 # Verify client command in first group client_cmd = server_group.commands[1] assert isinstance(client_cmd, Command) assert "client" in client_cmd.name - assert callable(client_cmd.command) # Lambda for cross-component refs + assert isinstance(client_cmd.script, EvaluatorClientScript) + assert callable(client_cmd.script.inline) # Lambda for cross-component refs # Verify judge server command in second group judge_cmd = judge_group.commands[0] assert isinstance(judge_cmd, Command) assert "judge-server" in judge_cmd.name - assert judge_cmd.gpus == 32 + assert isinstance(judge_cmd.script, ServerScript) + assert judge_cmd.script.num_gpus == 32 @patch("nemo_skills.pipeline.nemo_evaluator.Pipeline")