diff --git a/.github/workflows/gpu_tests.yml b/.github/workflows/gpu_tests.yml index a500fc59b2..16f77633a8 100644 --- a/.github/workflows/gpu_tests.yml +++ b/.github/workflows/gpu_tests.yml @@ -52,15 +52,7 @@ jobs: cd ${{ github.run_id }} nvidia-smi set -o pipefail # this will make sure next line returns non-0 exit code if tests fail - # Run heartbeat in background, capture its PID, and ensure cleanup - (while true; do sleep 60; echo "[HEARTBEAT] $(date '+%Y-%m-%d %H:%M:%S') - still running..."; done) & - HEARTBEAT_PID=$! - # Run tests and capture exit code - EXIT_CODE=0 - ./tests/gpu-tests/run_qwen.sh || EXIT_CODE=$? - # Kill heartbeat and exit with test result - kill $HEARTBEAT_PID 2>/dev/null || true - exit $EXIT_CODE + ./tests/gpu-tests/run_qwen.sh - name: Cleanup if: always() run: | diff --git a/nemo_skills/pipeline/generate.py b/nemo_skills/pipeline/generate.py index 90ec987bca..f33796d05c 100644 --- a/nemo_skills/pipeline/generate.py +++ b/nemo_skills/pipeline/generate.py @@ -14,7 +14,7 @@ import importlib import logging import os -from typing import Dict, List, Optional +from typing import Callable, Dict, List, Optional import typer @@ -23,17 +23,14 @@ from nemo_skills.inference import GENERATION_MODULE_MAP, GenerationType from nemo_skills.pipeline.app import app, typer_unpacker from nemo_skills.pipeline.utils.cluster import parse_kwargs +from nemo_skills.pipeline.utils.commands import sandbox_command from nemo_skills.pipeline.utils.declarative import ( Command, CommandGroup, HardwareConfig, Pipeline, ) -from nemo_skills.pipeline.utils.scripts import ( - GenerationClientScript, - SandboxScript, - ServerScript, -) +from nemo_skills.pipeline.utils.server import get_free_port from nemo_skills.utils import ( compute_chunk_ids, get_logger_name, @@ -47,160 +44,118 @@ # TODO: add num_jobs here for consistency with eval? -def _create_job_unified( - models: List[str], - server_configs: List[Optional[Dict]], - generation_params: Dict, +def _create_commandgroup_from_config( + generation_cmd: str, + server_config: Optional[Dict], + with_sandbox: bool, + sandbox_port: Optional[int], cluster_config: Dict, installation_command: Optional[str], - with_sandbox: bool, + get_server_command_fn: Callable, partition: Optional[str], keep_mounts_for_sandbox: bool, task_name: str, log_dir: str, sbatch_kwargs: Optional[Dict] = None, sandbox_env_overrides: Optional[List[str]] = None, -) -> List[CommandGroup]: - """ - Create CommandGroups for n models (unified for n=1 and n>1). - - Structure: - - Group 0: Model 0 server + client + (optional sandbox) - - Group 1: Model 1 server (if n>1) - - Group N: Model N server (if n>1) - - For n=1, returns a single-element list. The Pipeline automatically - optimizes single-group lists to efficient single-group jobs. - - Args: - models: List of model paths - server_configs: List of server configurations (one per model, None if not hosting) - generation_params: Dict of parameters for generation (output_dir, etc.) - cluster_config: Cluster configuration - installation_command: Installation command to run before client - with_sandbox: Whether to include sandbox - partition: Slurm partition - keep_mounts_for_sandbox: Whether to keep mounts for sandbox - task_name: Name for the task - log_dir: Directory for logs - sbatch_kwargs: Additional sbatch kwargs - - Returns: - List of CommandGroup objects (one per het group) - """ - num_models = len(models) - groups = [] - server_scripts = [] # Track server Script objects for cross-component references - - for model_idx, (model_path, server_config) in enumerate(zip(models, server_configs)): - components = [] - server_script = None - - # Track GPU/node requirements for this group (from server config) - group_gpus = 0 - group_nodes = 1 - - # 1. Add server if needed - if server_config is not None and int(server_config.get("num_gpus", 0)) > 0: - server_type = server_config["server_type"] - server_container = server_config.get("container") or cluster_config["containers"][server_type] +) -> CommandGroup: + """Create a CommandGroup from server_config. - # Create ServerScript - server_script = ServerScript( - server_type=server_type, - model_path=server_config["model_path"], - cluster_config=cluster_config, - num_gpus=server_config["num_gpus"], - num_nodes=server_config["num_nodes"], - server_args=server_config.get("server_args", ""), - server_entrypoint=server_config.get("server_entrypoint"), - port=server_config.get("server_port"), - allocate_port=(server_config.get("server_port") is None), - ) + Component ordering: + 1. Server (if server_config provided) + 2. Client command + 3. Sandbox (if with_sandbox=True) + """ - # Set group GPU/node requirements from server config - group_gpus = server_config["num_gpus"] - group_nodes = server_config["num_nodes"] + components = [] - server_cmd = Command( - script=server_script, - container=server_container, - name=f"{task_name}_model_{model_idx}_server" if num_models > 1 else f"{task_name}_server", - ) - components.append(server_cmd) - server_scripts.append(server_script) + # 1. Add server if server_config is provided + if server_config is not None and int(server_config["num_gpus"]) > 0: + server_type = server_config["server_type"] + # Get container from server_config if provided, otherwise fall back to cluster config + if "container" in server_config: + server_container = server_config.pop("container") else: - # No server for this model (pre-hosted) - server_scripts.append(None) - - # 2. Group 0 gets the client and sandbox - if model_idx == 0: - # Create sandbox script (if with_sandbox) - sandbox_script = None - if with_sandbox: - sandbox_script = SandboxScript( - cluster_config=cluster_config, - keep_mounts=keep_mounts_for_sandbox, - allocate_port=True, # Always allocate port for sandbox - env_overrides=sandbox_env_overrides, - ) - - sandbox_cmd = Command( - script=sandbox_script, - container=cluster_config["containers"]["sandbox"], - name=f"{task_name}_sandbox", - ) - components.append(sandbox_cmd) - - # Create client script with cross-component references to all servers - client_script = GenerationClientScript( - output_dir=generation_params["output_dir"], - input_file=generation_params.get("input_file"), - input_dir=generation_params.get("input_dir"), - extra_arguments=generation_params.get("extra_arguments", ""), - random_seed=generation_params.get("random_seed"), - chunk_id=generation_params.get("chunk_id"), - num_chunks=generation_params.get("num_chunks"), - preprocess_cmd=generation_params.get("preprocess_cmd"), - postprocess_cmd=generation_params.get("postprocess_cmd"), - wandb_parameters=generation_params.get("wandb_parameters"), - with_sandbox=with_sandbox, - script=generation_params.get("script", "nemo_skills.inference.generate"), - # Multi-server support (works for single and multi-model) - servers=server_scripts if server_scripts else None, - server_addresses_prehosted=generation_params.get("server_addresses_prehosted"), - model_names=generation_params.get("model_names"), - server_types=generation_params.get("server_types"), - sandbox=sandbox_script, - installation_command=installation_command, - ) + server_container = cluster_config["containers"][server_type] - client_cmd = Command( - script=client_script, - container=cluster_config["containers"]["nemo-skills"], - name=f"{task_name}", - ) - components.append(client_cmd) + # Call server command builder directly with cluster_config + cmd, num_tasks = get_server_command_fn(**server_config, cluster_config=cluster_config) - # Only create group if it has components (skip empty groups for pre-hosted models) - if components: - group_tasks = server_script.num_tasks if (server_config and server_script) else 1 + # Create metadata dict + metadata = { + "num_tasks": num_tasks, + "gpus": server_config["num_gpus"], + "nodes": server_config["num_nodes"], + "log_prefix": "server", + } - group = CommandGroup( - commands=components, - hardware=HardwareConfig( - partition=partition, - num_gpus=group_gpus, - num_nodes=group_nodes, - num_tasks=group_tasks, - sbatch_kwargs=sbatch_kwargs, - ), - name=f"{task_name}_model_{model_idx}_group" if num_models > 1 else task_name, - log_dir=log_dir, - ) - groups.append(group) + server_cmd = Command( + command=cmd, + container=server_container, + gpus=server_config["num_gpus"], + nodes=server_config["num_nodes"], + name=task_name, + metadata=metadata, + ) + components.append(server_cmd) + + # 2. Add main generation command + # Note: General cluster config env vars are automatically added by get_env_variables() in get_executor() + client_env = {} + if with_sandbox and sandbox_port is not None: + client_env["NEMO_SKILLS_SANDBOX_PORT"] = str(sandbox_port) + + client_cmd = Command( + command=generation_cmd, + container=cluster_config["containers"]["nemo-skills"], + name=task_name, + installation_command=installation_command, + metadata={ + "log_prefix": "main", + "environment": client_env, + }, + ) + components.append(client_cmd) + + # 3. Add sandbox if requested + if with_sandbox: + # Call sandbox command builder directly with cluster_config + cmd, metadata = sandbox_command(cluster_config=cluster_config, port=sandbox_port) + metadata["log_prefix"] = "sandbox" + + # Apply user-specified environment overrides for the sandbox + if sandbox_env_overrides: + sandbox_env = metadata.get("environment", {}) + for override in sandbox_env_overrides: + key, value = override.split("=", 1) + sandbox_env[key] = value + metadata["environment"] = sandbox_env + + sandbox_cmd = Command( + command=cmd, + container=cluster_config["containers"]["sandbox"], + name=task_name, + metadata=metadata, + ) - return groups + components.append(sandbox_cmd) + + # Find maximum GPUs/nodes needed by any component for the HardwareConfig + # The job-level resource request must be the maximum across all components + max_gpus = max((comp.gpus or 0) for comp in components) + max_nodes = max((comp.nodes or 1) for comp in components) + + return CommandGroup( + commands=components, + hardware=HardwareConfig( + partition=partition, + num_gpus=max_gpus, + num_nodes=max_nodes, + sbatch_kwargs=sbatch_kwargs, + ), + name=task_name, + log_dir=log_dir, + ) @app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) @@ -231,45 +186,21 @@ def generate( "If not specified, will use the registered generation module for the " "generation type (which is required in this case).", ), - model: List[str] = typer.Option( - None, - help="Path to the model(s). CLI: space-separated. Python API: string or list. " - "Single value broadcasts to all models for multi-model generation.", - ), - server_address: List[str] = typer.Option( - None, - help="Server address(es). CLI: space-separated. Python API: string or list. " - "Single value broadcasts to all models.", - ), - server_type: List[pipeline_utils.SupportedServers] = typer.Option( - ..., - help="Server type(s). CLI: space-separated. Python API: string or list. " - "Single value broadcasts to all models.", + model: str = typer.Option(None, help="Path to the model or model name in API"), + server_address: str = typer.Option( + None, help="Use ip:port for self-hosted models or the API url if using model providers" ), - server_gpus: List[int] = typer.Option( + server_type: pipeline_utils.SupportedServers = typer.Option(..., help="Type of server to use"), + server_gpus: int = typer.Option(None, help="Number of GPUs to use if hosting the model"), + server_nodes: int = typer.Option(1, help="Number of nodes required for hosting LLM server"), + server_args: str = typer.Option("", help="Any extra arguments to pass to the server"), + server_entrypoint: str = typer.Option( None, - help="Number of GPUs per model. CLI: space-separated ints. Python API: int or list. " - "Single value broadcasts to all models.", + help="Path to the entrypoint of the server. " + "If not specified, will use the default entrypoint for the server type.", ), - server_nodes: List[int] = typer.Option( - [1], - help="Number of nodes per model. CLI: space-separated ints. Python API: int or list. " - "Single value broadcasts to all models.", - ), - server_args: List[str] = typer.Option( - [""], - help="Server arguments per model. CLI: space-separated. Python API: string or list. " - "Single value broadcasts to all models.", - ), - server_entrypoint: List[str] = typer.Option( - None, - help="Server entrypoint(s). CLI: space-separated. Python API: string or list. " - "Single value broadcasts to all models.", - ), - server_container: List[str] = typer.Option( - None, - help="Container image(s). CLI: space-separated. Python API: string or list. " - "Single value broadcasts to all models.", + server_container: str = typer.Option( + None, help="Override container image for the hosted server (if server_gpus is set)" ), dependent_jobs: int = typer.Option(0, help="Specify this to launch that number of dependent jobs"), mount_paths: str = typer.Option(None, help="Comma separated list of paths to mount on the remote machine"), @@ -365,18 +296,7 @@ def generate( None, help="Internal option to specify task dependencies.", hidden=True ), ): - """Generate LLM completions for single or multiple models. - - Supports both single-model and multi-model generation through a unified interface. - - Parameter Types: - Multi-model parameters (model, server_*, etc.) use List[T] type hints for Typer CLI - compatibility, but accept both scalars and lists when called from Python: - - CLI: --model m1 m2 (space-separated) → Typer converts to ["m1", "m2"] - - Python API: model="m1" or model=["m1", "m2"] → Both work (normalized internally) - - Single values broadcast to all models: server_gpus=8 → [8, 8, 8] for 3 models - - Multi-model usage requires either --generation-type or --generation-module. + """Generate LLM completions for a given input file. Run `python -m nemo_skills.inference.generate --help` for other supported arguments (need to be prefixed with ++, since we use Hydra for that script). @@ -386,42 +306,10 @@ def generate( LOG.info("Starting generation job") LOG.info("Extra arguments that will be passed to the underlying script: %s", extra_arguments) - # Normalize model configuration to list - models_list = pipeline_utils.normalize_models_config(model) - num_models = len(models_list) - - LOG.info(f"Number of models: {num_models}") - for model_idx, model_name in enumerate(models_list): - LOG.info(f" Model {model_idx}: {model_name}") - - # Convert server_type enum values to strings - def convert_server_type_to_string(server_type): - return server_type.value if hasattr(server_type, "value") else server_type - - if isinstance(server_type, list): - server_type = [convert_server_type_to_string(st) for st in server_type] - else: - server_type = convert_server_type_to_string(server_type) - - # Normalize all server parameters to per-model lists - server_types_list = pipeline_utils.normalize_parameter(server_type, num_models, "server_type") - server_gpus_list = pipeline_utils.normalize_parameter(server_gpus, num_models, "server_gpus") - server_nodes_list = pipeline_utils.normalize_parameter(server_nodes, num_models, "server_nodes") - server_args_list = pipeline_utils.normalize_parameter(server_args, num_models, "server_args") - server_entrypoints_list = pipeline_utils.normalize_parameter(server_entrypoint, num_models, "server_entrypoint") - server_containers_list = pipeline_utils.normalize_parameter(server_container, num_models, "server_container") - - if server_address is not None: - server_addresses_list = pipeline_utils.normalize_parameter(server_address, num_models, "server_address") - else: - server_addresses_list = [None] * num_models - - # Validate multi-model requirements - if num_models > 1: - if generation_type is None and generation_module is None: - raise ValueError( - "Multi-model generation requires either --generation-type or --generation-module to be specified" - ) + try: + server_type = server_type.value + except AttributeError: + pass if log_samples: wandb_parameters = { @@ -437,6 +325,8 @@ def convert_server_type_to_string(server_type): else: wandb_parameters = None + get_random_port = pipeline_utils.should_get_random_port(server_gpus, exclusive) + if random_seeds and num_random_seeds: raise ValueError("Cannot specify both random_seeds and num_random_seeds") if num_random_seeds: @@ -465,6 +355,8 @@ def convert_server_type_to_string(server_type): check_mounted_paths=check_mounted_paths, ) + original_server_address = server_address + if generation_module is not None and generation_type is not None: raise ValueError("Cannot specify both generation_module and generation_type. ") if generation_module is None: @@ -515,36 +407,36 @@ def convert_server_type_to_string(server_type): chunk_id=None, ) for chunk_id in chunk_ids: - # Configure clients for each model - server_configs = [] - server_addresses_resolved = [] - # For single model: configure_client returns extra_args with server config appended - # For multi-model: use original extra_args (server config added as lists in get_generation_cmd) - extra_arguments = extra_arguments_original - - for model_idx in range(num_models): - get_random_port_for_server = pipeline_utils.should_get_random_port( - server_gpus_list[model_idx], exclusive - ) - - srv_config, srv_address, srv_extra_args = pipeline_utils.configure_client( - model=models_list[model_idx], - server_type=server_types_list[model_idx], - server_address=server_addresses_list[model_idx], - server_gpus=server_gpus_list[model_idx], - server_nodes=server_nodes_list[model_idx], - server_args=server_args_list[model_idx], - server_entrypoint=server_entrypoints_list[model_idx], - server_container=server_containers_list[model_idx], - extra_arguments=extra_arguments_original if model_idx == 0 else "", - get_random_port=get_random_port_for_server, - ) - server_configs.append(srv_config) - server_addresses_resolved.append(srv_address) + # Configure client (same as before) + server_config, server_address, extra_arguments = pipeline_utils.configure_client( + model=model, + server_type=server_type, + server_address=original_server_address, + server_gpus=server_gpus, + server_nodes=server_nodes, + server_args=server_args, + server_entrypoint=server_entrypoint, + server_container=server_container, + extra_arguments=extra_arguments_original, + get_random_port=get_random_port, + ) - # For single model, capture the extra_args with server config from configure_client - if model_idx == 0 and num_models == 1: - extra_arguments = srv_extra_args + # Build generation command (same as before) + cmd = pipeline_utils.get_generation_cmd( + input_file=input_file, + input_dir=input_dir, + random_seed=seed, + output_dir=output_dir, + extra_arguments=extra_arguments, + chunk_id=chunk_id, + num_chunks=num_chunks, + preprocess_cmd=preprocess_cmd, + postprocess_cmd=postprocess_cmd, + wandb_parameters=wandb_parameters if seed_idx == 0 else None, + script=generation_module, + with_sandbox=with_sandbox, + ) + cmd = pipeline_utils.wrap_python_path(cmd=cmd) # Base task name (shared across all dependent jobs in the chain) task_name = f"{expname}-rs{seed}" if seed is not None else expname @@ -556,35 +448,22 @@ def convert_server_type_to_string(server_type): prev_job = None for dep_idx in range(dependent_jobs + 1): - # Build generation parameters dict for Script - generation_params = { - "output_dir": output_dir, - "input_file": input_file, - "input_dir": input_dir, - "extra_arguments": extra_arguments, - "random_seed": seed, - "chunk_id": chunk_id, - "num_chunks": num_chunks, - "preprocess_cmd": preprocess_cmd, - "postprocess_cmd": postprocess_cmd, - "wandb_parameters": wandb_parameters if seed_idx == 0 else None, - "script": generation_module, - # Multi-model specific fields - "server_addresses_prehosted": server_addresses_resolved, - "model_names": models_list, - "server_types": server_types_list, - } + # Allocate sandbox port if needed + # This must be done BEFORE creating CommandGroup so client knows the port + if with_sandbox: + current_sandbox_port = get_free_port(strategy="random") if get_random_port else 6000 + else: + current_sandbox_port = None - # Create CommandGroup(s) using Script objects - # For multi-model, this creates multiple CommandGroups (one per model + one for client) - # For single-model, this creates a single CommandGroup - job_groups = _create_job_unified( - models=models_list, - server_configs=[cfg.copy() if cfg else None for cfg in server_configs], - generation_params=generation_params, + # Create CommandGroup for this task + cmd_group = _create_commandgroup_from_config( + generation_cmd=cmd, + server_config=server_config.copy() if server_config else None, + with_sandbox=with_sandbox, + sandbox_port=current_sandbox_port, cluster_config=cluster_config, installation_command=installation_command, - with_sandbox=with_sandbox, + get_server_command_fn=generation_task.get_server_command_fn(), partition=partition, keep_mounts_for_sandbox=keep_mounts_for_sandbox, task_name=task_name, @@ -608,16 +487,11 @@ def convert_server_type_to_string(server_type): # Subsequent jobs in chain depend on previous job (use job object, not string) job_deps = [prev_job] - # For multi-group jobs, use "groups" key; for single-group, use "group" key job_spec = { "name": internal_job_name, + "group": cmd_group, "dependencies": job_deps, } - if len(job_groups) > 1: - job_spec["groups"] = job_groups - else: - job_spec["group"] = job_groups[0] - jobs.append(job_spec) prev_job = job_spec # Track for next iteration diff --git a/nemo_skills/pipeline/nemo_evaluator.py b/nemo_skills/pipeline/nemo_evaluator.py index 39838737ed..020162692a 100644 --- a/nemo_skills/pipeline/nemo_evaluator.py +++ b/nemo_skills/pipeline/nemo_evaluator.py @@ -89,7 +89,7 @@ import copy import logging -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional @@ -97,12 +97,12 @@ from nemo_evaluator_launcher.api import RunConfig from nemo_evaluator_launcher.common.helpers import get_eval_factory_command from nemo_evaluator_launcher.common.mapping import get_task_from_mapping, load_tasks_mapping -from omegaconf import OmegaConf +from omegaconf import DictConfig, OmegaConf import nemo_skills.pipeline.utils as pipeline_utils from nemo_skills.pipeline.app import app, typer_unpacker +from nemo_skills.pipeline.utils.commands import vllm_server_command from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline -from nemo_skills.pipeline.utils.scripts import BaseJobScript, ServerScript from nemo_skills.utils import get_logger_name, setup_logging LOG = logging.getLogger(get_logger_name(__file__)) @@ -289,8 +289,8 @@ def nemo_evaluator( expname=expname, idx=idx, task_name=task.name, - launcher_run_cfg=OmegaConf.to_container(launcher_run_cfg, resolve=True), - task_cfg=OmegaConf.to_container(task, resolve=True), + launcher_run_cfg=launcher_run_cfg, + task_cfg=task, task_definition=task_definition, base_output_root=base_output_root, eval_image=eval_image, @@ -443,8 +443,10 @@ def _create_serving_command_obj( idx: int, task_name: str, ) -> Command: - """Create a `Command` backed by a `ServerScript` for a hosted serving component. + """Create a Command object for a hosted serving component (main or judge server). + This function wraps vllm_server_command and standardizes container selection, + logging prefixes, and metadata for both main and judge servers. Args: cluster_config: Cluster configuration dictionary @@ -462,53 +464,54 @@ def _create_serving_command_obj( task_name: Task name for naming Returns: - Command: A Command object whose `script` is a configured `ServerScript`. + Command object configured for the serving component """ stype = (server_type or "vllm").lower() + sargs = args or "" if stype != "vllm": LOG.warning("Only vllm server_type is supported currently; got %s", stype) - server_script = ServerScript( - server_type=stype, - model_path=model or "", + cmd_str, meta = vllm_server_command( cluster_config=cluster_config, - num_gpus=gpus, - num_nodes=nodes or 1, - server_args=args or "", - server_entrypoint=entrypoint, + model=model, # type: ignore[arg-type] port=port, - allocate_port=port is None, + server_type=stype, + gpus=gpus, + nodes=nodes, + args=sargs, + entrypoint=entrypoint, ) - # Judge servers get a distinct log prefix for clarity - if is_judge: - server_script.log_prefix = "judge-server" - + # Resolve container fallback when not explicitly provided if not container: container = cluster_config["containers"][stype] + log_prefix = "judge-server" if is_judge else "server" name_role = "judge-server" if is_judge else "server" return Command( - script=server_script, + command=cmd_str, container=container, + gpus=gpus, + nodes=nodes or 1, name=f"{expname}-{name_role}-{idx}-{task_name}", + metadata={ + **meta, + "gpus": gpus, + "log_prefix": log_prefix, + }, ) @dataclass class _TaskCreationContext: - """Local helper to pass around the information about the task and easier logic sharing. - - Note: launcher_run_cfg and task_cfg are stored as plain dicts (not OmegaConf) to allow - serialization by nemo_run/fiddle. Convert back to DictConfig if OmegaConf operations are needed. - """ + """Local helper to pass around the information about the task and easier logic sharing.""" expname: str idx: int task_name: str - launcher_run_cfg: dict # Stored as plain dict for serialization compatibility - task_cfg: dict # Stored as plain dict for serialization compatibility + launcher_run_cfg: RunConfig + task_cfg: DictConfig task_definition: dict base_output_root: Optional[str] eval_image: str @@ -627,7 +630,12 @@ def _build_judge_server_if_needed(ctx: _TaskCreationContext) -> Optional[Command def _build_client_command( ctx: _TaskCreationContext, main_server_cmd: Optional[Command], judge_server_cmd: Optional[Command] ) -> Command: - """Create the evaluator client `Command` using `EvaluatorClientScript`. + """Build Command for evaluator client. + + The client command behavior depends on server hosting: + - If servers are co-hosted: Uses lambda factory to resolve runtime URLs via hostname_ref/meta_ref + - If using external servers: Uses static URLs from server_base_url/judge_server_base_url + - If no servers: Uses URLs from evaluator config or defaults Args: ctx: Task creation context with all configuration @@ -635,26 +643,100 @@ def _build_client_command( judge_server_cmd: Judge server Command if self-hosted, None otherwise Returns: - Command: A Command whose script builds the evaluator CLI at runtime + Command object for evaluator client """ + if ctx.hosting_server or ctx.hosting_judge: + # Co-hosted servers: Use lambda factory to resolve runtime URLs + # The lambda is evaluated at execution time when het_group_index is assigned + def _client_cmd_factory(): + waits: List[str] = [] + target_url: Optional[str] = None + judge_url: Optional[str] = None - client_script = EvaluatorClientScript( - ctx=ctx, - main_server_script=main_server_cmd.script if main_server_cmd else None, - judge_server_script=judge_server_cmd.script if judge_server_cmd else None, + # Build main server URL from runtime references + if ctx.hosting_server and main_server_cmd is not None: + server_host = main_server_cmd.hostname_ref() + server_port_val = main_server_cmd.meta_ref("port") + base_url = f"http://{server_host}:{server_port_val}" + waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{ctx.server_health_path}")) + target_url = f"{base_url}{ctx.server_api_path}" + + # Build judge server URL from runtime references + if ctx.hosting_judge and judge_server_cmd is not None: + jhost = judge_server_cmd.hostname_ref() + jport = judge_server_cmd.meta_ref("port") + jbase = f"http://{jhost}:{jport}" + waits.append(pipeline_utils.get_server_wait_cmd(f"{jbase}{ctx.judge_server_health_path}")) + judge_url = f"{jbase}{ctx.judge_server_api_path}" + + # Wait for servers to be ready, then run evaluator + wait_cmd = " && ".join(waits) if waits else "true" + cmd = _build_task_cmd( + task_name=ctx.task_name, + launcher_run_cfg=ctx.launcher_run_cfg, + task_cfg=ctx.task_cfg, + task_definition=ctx.task_definition, + expname=ctx.expname, + base_output_root=ctx.base_output_root, + url_override=target_url, + model_id=ctx.server_model, + judge_url_override=judge_url, + judge_model_id=ctx.judge_server_model, + ) + return f"{wait_cmd} && {cmd}" + + return Command( + command=_client_cmd_factory, + container=ctx.eval_image, + gpus=ctx.job_gpus or None, + nodes=ctx.job_nodes or 1, + name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}", + metadata={ + "log_prefix": "main", + "environment": ctx.env_vars, + "gpus": ctx.job_gpus or None, + }, + ) + + # No hosted servers: Use external URLs or config defaults + server_url = None + if ctx.with_external_server and ctx.server_base_url: + server_url = ctx.server_base_url.rstrip("/") + ctx.server_api_path + judge_url = None + if ctx.with_external_judge and ctx.judge_server_base_url: + judge_url = ctx.judge_server_base_url.rstrip("/") + ctx.judge_server_api_path + + eval_cmd = _build_task_cmd( + task_name=ctx.task_name, + launcher_run_cfg=ctx.launcher_run_cfg, + task_cfg=ctx.task_cfg, + task_definition=ctx.task_definition, + expname=ctx.expname, + base_output_root=ctx.base_output_root, + url_override=server_url, + model_id=ctx.server_model, + judge_url_override=judge_url, + judge_model_id=ctx.judge_server_model, ) return Command( - script=client_script, + command=eval_cmd, container=ctx.eval_image, - name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}", + gpus=None, + nodes=ctx.job_nodes or 1, + name=f"{ctx.expname}-{ctx.idx}-{ctx.task_name}", + metadata={ + "log_prefix": "main", + "environment": ctx.env_vars, + "gpus": ctx.job_gpus or None, + }, ) def _build_task_cmd( task_name: str, - launcher_run_cfg: dict, - task_cfg: dict, + launcher_run_cfg: DictConfig, + task_cfg: DictConfig, task_definition: dict, expname: str, base_output_root: Optional[str], @@ -670,8 +752,8 @@ def _build_task_cmd( Args: task_name: Task identifier (e.g., "ifeval", "gpqa_diamond") - launcher_run_cfg: Global evaluator configuration (as plain dict) - task_cfg: Task-specific configuration (as plain dict, may include task-level overrides) + launcher_run_cfg: Global evaluator configuration from RunConfig + task_cfg: Task-specific configuration (may include task-level overrides) task_definition: Task definition from mapping (container, harness info) expname: Experiment name for output directory structure base_output_root: Base directory for task outputs @@ -689,9 +771,7 @@ def _build_task_cmd( - Judge: config.params.extra.judge.url Output directory is set to: {base_output_root}/{expname}/nemo-evaluator-results/{task_name} """ - # Convert back to DictConfig for OmegaConf operations - launcher_run_cfg = OmegaConf.create(launcher_run_cfg) - task_cfg_copy = OmegaConf.create(copy.deepcopy(task_cfg)) + task_cfg_copy = copy.deepcopy(task_cfg) if url_override: OmegaConf.update(task_cfg_copy, "overrides", {"target.api_endpoint.url": url_override}, force_add=True) @@ -726,56 +806,3 @@ def _build_task_cmd( cmd_struct = get_eval_factory_command(launcher_run_cfg, task_cfg_copy, task_definition) return cmd_struct.cmd - - -@dataclass(kw_only=True) -class EvaluatorClientScript(BaseJobScript): - """run.Script implementation for nemo-evaluator client with runtime server resolution.""" - - ctx: _TaskCreationContext - main_server_script: Optional[ServerScript] = None - judge_server_script: Optional[ServerScript] = None - log_prefix: str = field(default="main", init=False) - - def __post_init__(self): - def build_command(): - waits: List[str] = [] - target_url: Optional[str] = None - judge_url: Optional[str] = None - - if self.ctx.hosting_server and self.main_server_script is not None: - server_host = self.main_server_script.hostname_ref() - base_url = f"http://{server_host}:{self.main_server_script.port}" - waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{self.ctx.server_health_path}")) - target_url = f"{base_url}{self.ctx.server_api_path}" - elif self.ctx.with_external_server and self.ctx.server_base_url: - target_url = self.ctx.server_base_url.rstrip("/") + self.ctx.server_api_path - - if self.ctx.hosting_judge and self.judge_server_script is not None: - judge_host = self.judge_server_script.hostname_ref() - judge_base = f"http://{judge_host}:{self.judge_server_script.port}" - waits.append(pipeline_utils.get_server_wait_cmd(f"{judge_base}{self.ctx.judge_server_health_path}")) - judge_url = f"{judge_base}{self.ctx.judge_server_api_path}" - elif self.ctx.with_external_judge and self.ctx.judge_server_base_url: - judge_url = self.ctx.judge_server_base_url.rstrip("/") + self.ctx.judge_server_api_path - - cmd = _build_task_cmd( - task_name=self.ctx.task_name, - launcher_run_cfg=self.ctx.launcher_run_cfg, - task_cfg=self.ctx.task_cfg, - task_definition=self.ctx.task_definition, - expname=self.ctx.expname, - base_output_root=self.ctx.base_output_root, - url_override=target_url, - model_id=self.ctx.server_model, - judge_url_override=judge_url, - judge_model_id=self.ctx.judge_server_model, - ) - - wait_cmd = " && ".join(waits) if waits else None - final_cmd = f"{wait_cmd} && {cmd}" if wait_cmd else cmd - env_vars = copy.deepcopy(self.ctx.env_vars) - return final_cmd, {"environment": env_vars} - - self.set_inline(build_command) - super().__post_init__() diff --git a/nemo_skills/pipeline/utils/__init__.py b/nemo_skills/pipeline/utils/__init__.py index 3e738a530f..1e470f3539 100644 --- a/nemo_skills/pipeline/utils/__init__.py +++ b/nemo_skills/pipeline/utils/__init__.py @@ -49,8 +49,6 @@ get_chunked_rs_filename, get_generation_cmd, get_remaining_jobs, - normalize_models_config, - normalize_parameter, wrap_cmd, ) from nemo_skills.pipeline.utils.mounts import ( diff --git a/nemo_skills/pipeline/utils/declarative.py b/nemo_skills/pipeline/utils/declarative.py index 51d0746c63..e294a3ed82 100644 --- a/nemo_skills/pipeline/utils/declarative.py +++ b/nemo_skills/pipeline/utils/declarative.py @@ -12,71 +12,39 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - -import logging -from contextlib import nullcontext -from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple, Union - -import nemo_run as run - -from nemo_skills.pipeline.utils import ( - get_env_variables, - get_executor, - get_exp, - get_exp_handles, - get_registered_external_repo, - get_tunnel, - run_exp, - temporary_env_update, -) -from nemo_skills.pipeline.utils.exp import ( - REUSE_CODE_EXP, - get_packaging_job_key, - tunnel_hash, -) -from nemo_skills.pipeline.utils.mounts import is_mounted_filepath -from nemo_skills.pipeline.utils.server import wrap_python_path -from nemo_skills.utils import get_logger_name - """ -Simplified declarative pipeline system using Command with run.Script objects. +Simplified declarative pipeline system using only Command for all task types. Basic Example (Single job with multiple commands): - from nemo_skills.pipeline.utils.scripts import ServerScript, SandboxScript, GenerationClientScript + from nemo_skills.pipeline.utils.commands import vllm_server_command, sandbox_command from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline - - # Create Script objects for server and sandbox - # Scripts handle port allocation, cross-component references, and command building - server_script = ServerScript( - server_type="vllm", - model_path="Qwen/Qwen2.5-Math-7B-Instruct", - server_args="--tensor-parallel-size 1" + from nemo_skills.pipeline.utils.server import get_free_port + + # Allocate ports for server and sandbox + server_port = get_free_port(strategy="random") + sandbox_port = get_free_port(strategy="random") + + # Commands that run together in one SLURM job + # Note: Lambdas are needed for cross-component references (hostname_ref, meta_ref) + # which aren't resolved until het_group_index is assigned at pipeline execution time. + server_cmd, server_meta = vllm_server_command(cluster_cfg, model="Qwen/Qwen3-8B", port=server_port) + server = Command(command=server_cmd, gpus=8, name="server", metadata=server_meta) + + sandbox_cmd, sandbox_meta = sandbox_command(cluster_cfg, port=sandbox_port) + sandbox = Command(command=sandbox_cmd, name="sandbox", metadata=sandbox_meta) + + # This lambda is ESSENTIAL - server.hostname_ref() and meta_ref() aren't available until runtime + # Client needs NEMO_SKILLS_SANDBOX_PORT to connect to sandbox + client = Command( + command=lambda: f"curl {server.hostname_ref()}:{server.meta_ref('port')}/health", + name="client", + metadata={"environment": {"NEMO_SKILLS_SANDBOX_PORT": str(sandbox_port)}} ) - sandbox_script = SandboxScript() - - # Create generation client that references server and sandbox - # Cross-component references (hostname_ref, port) are resolved at runtime - client_script = GenerationClientScript( - output_dir="/results/inference", - extra_arguments="++prompt_config=math ++split=test", - servers=[server_script], # References server for hostname/port - model_names=["Qwen/Qwen2.5-Math-7B-Instruct"], - server_types=["vllm"], - sandbox=sandbox_script, # References sandbox for port - with_sandbox=True, - ) - - # Wrap Scripts in Commands with container and resource info - server = Command(script=server_script, container="vllm", name="server") - sandbox = Command(script=sandbox_script, container="nemo-skills", name="sandbox") - client = Command(script=client_script, container="nemo-skills", name="client") - # Group them together (they run in one SLURM job) + # Group them together inference_group = CommandGroup( commands=[server, sandbox, client], - hardware=HardwareConfig(partition="batch", num_gpus=1), + hardware=HardwareConfig(partition="batch"), name="inference" ) @@ -89,27 +57,13 @@ pipeline.run() Advanced Example (Multiple jobs with dependencies and heterogeneous components): - from nemo_skills.pipeline.utils.scripts import ServerScript, SandboxScript, GenerationClientScript - from nemo_run import Script - log_dir = "/experiments/full_pipeline/logs" - - # Job 1: Preprocessing with custom Script - @dataclass(kw_only=True) - class PreprocessScript(Script): - input_file: str - output_file: str - - def __post_init__(self): - cmd = f"python preprocess.py --input {self.input_file} --output {self.output_file}" - self.inline = cmd - object.__setattr__(self, 'entrypoint', 'bash') - - preprocess_script = PreprocessScript( - input_file="data.jsonl", - output_file="processed.jsonl" + # Job 1: Preprocessing + preprocess = Command( + command="python preprocess.py --input data.jsonl --output processed.jsonl", + gpus=0, + name="preprocess" ) - preprocess = Command(script=preprocess_script, name="preprocess") prep_group = CommandGroup( commands=[preprocess], hardware=HardwareConfig(partition="cpu"), @@ -118,76 +72,39 @@ def __post_init__(self): ) prep_job = {"name": "prep", "group": prep_group} - # Job 2: Two different model servers (HETEROGENEOUS SLURM job with 2 het groups) - # 8B model group - server_8b = ServerScript( - server_type="vllm", - model_path="Qwen/Qwen2.5-Math-7B-Instruct", - server_args="--tensor-parallel-size 1" - ) - sandbox_8b = SandboxScript() - client_8b = GenerationClientScript( - output_dir="/results/eval_8b", - extra_arguments="++prompt_config=math", - servers=[server_8b], - model_names=["Qwen/Qwen2.5-Math-7B-Instruct"], - server_types=["vllm"], - sandbox=sandbox_8b, - with_sandbox=True, - ) + # Job 2: Two different model servers (HETEROGENEOUS SLURM job with 2 het components) + # Allocate ports for each server/sandbox pair + from nemo_skills.pipeline.utils.server import get_free_port + server_8b_port = get_free_port(strategy="random") + sandbox_8b_port = get_free_port(strategy="random") + server_32b_port = get_free_port(strategy="random") + sandbox_32b_port = get_free_port(strategy="random") - group_8b = CommandGroup( - commands=[ - Command(script=server_8b, container="vllm", name="server_8b"), - Command(script=sandbox_8b, container="nemo-skills", name="sandbox_8b"), - Command(script=client_8b, container="nemo-skills", name="eval_8b"), - ], - hardware=HardwareConfig(partition="batch", num_gpus=1), - name="eval_8b", - log_dir=log_dir - ) + # Build commands with cluster_config + server_8b_cmd, server_8b_meta = vllm_server_command(cluster_config, model="Qwen/Qwen3-8B", port=server_8b_port) + sandbox_8b_cmd, sandbox_8b_meta = sandbox_command(cluster_config, port=sandbox_8b_port) + server_32b_cmd, server_32b_meta = vllm_server_command(cluster_config, model="Qwen/Qwen3-32B", port=server_32b_port) + sandbox_32b_cmd, sandbox_32b_meta = sandbox_command(cluster_config, port=sandbox_32b_port) - # 32B model group - server_32b = ServerScript( - server_type="vllm", - model_path="Qwen/Qwen2.5-Math-32B-Instruct", - server_args="--tensor-parallel-size 4" - ) - sandbox_32b = SandboxScript() - client_32b = GenerationClientScript( - output_dir="/results/eval_32b", - extra_arguments="++prompt_config=math", - servers=[server_32b], - model_names=["Qwen/Qwen2.5-Math-32B-Instruct"], - server_types=["vllm"], - sandbox=sandbox_32b, - with_sandbox=True, - ) + server_8b = Command(command=server_8b_cmd, gpus=8, name="server_8b", metadata=server_8b_meta) + sandbox_8b = Command(command=sandbox_8b_cmd, name="sandbox_8b", metadata=sandbox_8b_meta) + eval_8b = Command(command="python eval.py --model 8b", gpus=1, name="eval_8b") - group_32b = CommandGroup( - commands=[ - Command(script=server_32b, container="vllm", name="server_32b"), - Command(script=sandbox_32b, container="nemo-skills", name="sandbox_32b"), - Command(script=client_32b, container="nemo-skills", name="eval_32b"), - ], - hardware=HardwareConfig(partition="batch", num_gpus=4), - name="eval_32b", - log_dir=log_dir - ) + server_32b = Command(command=server_32b_cmd, gpus=8, name="server_32b", metadata=server_32b_meta) + sandbox_32b = Command(command=sandbox_32b_cmd, name="sandbox_32b", metadata=sandbox_32b_meta) + eval_32b = Command(command="python eval.py --model 32b", gpus=1, name="eval_32b") + + group_8b = CommandGroup(commands=[server_8b, sandbox_8b, eval_8b], name="eval_8b", log_dir=log_dir) + group_32b = CommandGroup(commands=[server_32b, sandbox_32b, eval_32b], name="eval_32b", log_dir=log_dir) evals_job = {"name": "evals", "groups": [group_8b, group_32b], "dependencies": [prep_job]} # Job 3: Report generation (depends on both evaluations) - @dataclass(kw_only=True) - class ReportScript(Script): - output_file: str - - def __post_init__(self): - self.inline = f"python generate_report.py --output {self.output_file}" - object.__setattr__(self, 'entrypoint', 'bash') - - report_script = ReportScript(output_file="report.txt") - report = Command(script=report_script, name="report") + report = Command( + command="python generate_report.py --output report.txt", + gpus=0, + name="report" + ) report_group = CommandGroup(commands=[report], name="report", log_dir=log_dir) # Create pipeline with dependency graph @@ -204,56 +121,130 @@ def __post_init__(self): pipeline.run() """ +import logging +import shlex +from contextlib import nullcontext +from dataclasses import dataclass, field +from typing import Callable, Dict, List, Optional, Tuple, Union + +import nemo_run as run + +from nemo_skills.pipeline.utils import ( + get_env_variables, + get_executor, + get_exp, + get_exp_handles, + get_tunnel, + run_exp, + temporary_env_update, +) +from nemo_skills.pipeline.utils.commands import wrap_command +from nemo_skills.pipeline.utils.exp import ( + REUSE_CODE_EXP, + get_packaging_job_key, + install_packages_wrap, + tunnel_hash, +) +from nemo_skills.pipeline.utils.mounts import is_mounted_filepath +from nemo_skills.pipeline.utils.packager import get_registered_external_repo +from nemo_skills.utils import get_logger_name + LOG = logging.getLogger(get_logger_name(__file__)) @dataclass class Command: - """Declarative command for running tasks in containers using run.Script objects. + """Declarative command for running tasks in containers. + + The command can be either: + - A string: evaluated immediately + - A callable (lambda): evaluated lazily when the task is prepared - Example: - server = ServerScript(server_type="vllm", model_path="/models/llama", ...) - Command(script=server, container="vllm", name="my_server") + Lambdas are ONLY needed for cross-component references (hostname_ref, meta_ref). + The het_group_index isn't assigned until pipeline execution, so these must be lazy: + # Lambda is ESSENTIAL here - server.hostname_ref() and meta_ref() don't exist yet + client = Command(command=lambda: f"curl {server.hostname_ref()}:{server.meta_ref('port')}") """ - script: run.Script + # Command can be a string or callable (lambda). + # Lambdas are primarily used for cross-component references (hostname_ref, meta_ref). + command: Union[str, Callable] container: str = "nemo-skills" + gpus: Optional[int] = None + nodes: int = 1 name: str = "command" + working_dir: str = "/nemo_run/code" + env_vars: Dict[str, str] = field(default_factory=dict) + installation_command: Optional[str] = None + port: Optional[int] = None # Can be set from metadata + metadata: Dict[str, any] = field(default_factory=dict) # Stores metadata from command builders + het_group_index: Optional[int] = None # Set per-job by Pipeline (not global) + + def __post_init__(self): + # Wrap plain strings with environment setup + if isinstance(self.command, str) and (self.env_vars or self.working_dir): + self.command = wrap_command(self.command, self.working_dir, self.env_vars) + + def hostname_ref(self) -> str: + """Get hostname reference for hetjob cross-component communication.""" + if self.het_group_index is None: + return "127.0.0.1" # Local fallback + # For heterogeneous SLURM jobs, resolve nodelist to actual hostname + return f"$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_{self.het_group_index} | head -n1)" + + def meta_ref(self, key: str) -> str: + """Get metadata value (like port). Fails if key not found.""" + if key not in self.metadata: + raise KeyError( + f"Metadata key '{key}' not found in Command '{self.name}'. " + f"Available keys: {list(self.metadata.keys())}" + ) + return str(self.metadata[key]) - def prepare_for_execution(self, cluster_config: Dict) -> Tuple[run.Script, Dict]: - """Prepare script for execution. + def prepare_for_execution(self, cluster_config: Dict) -> Tuple[str, Dict]: + """Prepare command for execution. This method: - 1. Evaluates lazy commands (if script.inline is callable) - 2. Builds execution config from Script fields + 1. Evaluates callables (resolves cross-component references) + 2. Wraps with installation_command if provided Returns: - Tuple of (Script_object, execution_config) + Tuple of (final_command, execution_config) """ - runtime_metadata = {} - - # If script.inline is callable (lazy command building), evaluate it now - if callable(self.script.inline): - result = self.script.inline() + # 1. Evaluate if callable (for cross-component references like hostname_ref) + if callable(self.command): + result = self.command() if isinstance(result, tuple): - evaluated_command, runtime_metadata = result + final_command, runtime_metadata = result + # Deep merge metadata, especially environment dict + for key, value in runtime_metadata.items(): + if key == "environment" and key in self.metadata: + # Merge environment dicts instead of replacing + self.metadata[key].update(value) + else: + self.metadata[key] = value else: - evaluated_command = result + final_command = result + else: + final_command = self.command - # Update script.inline with evaluated command - self.script.set_inline(evaluated_command) + # 2. Wrap with installation_command if provided + if self.installation_command: + final_command = install_packages_wrap(final_command, self.installation_command) - # Build execution config from Script fields + # 3. Build execution config from metadata execution_config = { - "log_prefix": getattr(self.script, "log_prefix", "main"), - "environment": runtime_metadata.get("environment", {}), - "mounts": None, # Mounts not currently exposed by Scripts - "container": self.container, + "num_tasks": self.metadata.get("num_tasks", 1), + "num_gpus": self.metadata.get("gpus", self.gpus or 0), + "num_nodes": self.metadata.get("nodes", self.nodes), + "environment": self.metadata.get("environment", {}), + "log_prefix": self.metadata.get("log_prefix", "main"), + "mounts": self.metadata.get("mounts"), + "container": self.metadata.get("container", self.container), # Use container from metadata if available } - # Return the Script object itself - return self.script, execution_config + return final_command, execution_config def get_name(self) -> str: return self.name @@ -266,7 +257,6 @@ class HardwareConfig: partition: Optional[str] = None num_gpus: Optional[int] = None num_nodes: Optional[int] = None - num_tasks: Optional[int] = 1 sbatch_kwargs: Optional[dict] = None @@ -492,49 +482,16 @@ def run(self, dry_run: bool = False, log_dir: Optional[str] = None, _reuse_exp=N return exp - def _prepare_command(self, command, cluster_config: Dict) -> Tuple[run.Script, Dict]: - """Prepare command for execution. + def _prepare_command(self, command, cluster_config: Dict) -> Tuple[str, Dict]: + """Prepare command and handle mpirun wrapping.""" + final_cmd, exec_config = command.prepare_for_execution(cluster_config) - Returns: - Tuple of (Script_object, exec_config) - """ - script, exec_config = command.prepare_for_execution(cluster_config) - # Only rewrite paths for "none" executor (native execution without containers) - # For "local" executor (Docker), paths should stay as /nemo_run/code/... since - # that's where the code is mounted inside the container - if cluster_config.get("executor") == "none": - script = self._rewrite_local_paths(script) - # Note: mpirun wrapping for multi-task scripts is handled by the executor - return script, exec_config - - def _rewrite_local_paths(self, script: run.Script) -> run.Script: - """For executor='none', replace /nemo_run/code paths with local repo paths.""" - nemo_repo = get_registered_external_repo("nemo_skills") - if nemo_repo is None: - return script - - pkg_path = str(nemo_repo.path) - repo_root = str(nemo_repo.path.parent) - - def _replace(cmd: str) -> str: - return cmd.replace("/nemo_run/code/nemo_skills", pkg_path).replace("/nemo_run/code", repo_root) - - inline_cmd = script.inline - if isinstance(inline_cmd, str): - script.set_inline(_replace(inline_cmd)) - elif callable(inline_cmd): - original_inline = inline_cmd - - def wrapped_inline(): - result = original_inline() - if isinstance(result, tuple): - cmd, metadata = result - return _replace(cmd), metadata - return _replace(result) - - script.set_inline(wrapped_inline) - - return script + # Handle mpirun wrapping for non-SLURM executors + num_tasks = exec_config["num_tasks"] + if cluster_config["executor"] != "slurm" and num_tasks > 1: + final_cmd = f"mpirun --allow-run-as-root -np {num_tasks} bash -c {shlex.quote(final_cmd)}" + + return final_cmd, exec_config def _resolve_container(self, exec_config: Dict, command, cluster_config: Dict) -> str: """Resolve container name to image path.""" @@ -556,7 +513,6 @@ def _create_executor( total_het_groups: int, overlap: bool, dependencies: Optional[List] = None, - job_name_override: Optional[str] = None, ): """Create executor with optional environment update.""" env_context = ( @@ -569,10 +525,10 @@ def _create_executor( return get_executor( cluster_config=cluster_config, container=container_image, - num_nodes=hardware.num_nodes if hardware and hardware.num_nodes is not None else 1, - tasks_per_node=hardware.num_tasks if hardware and hardware.num_tasks is not None else 1, - gpus_per_node=hardware.num_gpus if hardware and hardware.num_gpus is not None else 0, - job_name=job_name_override if job_name_override else command.name, + num_nodes=exec_config["num_nodes"], + tasks_per_node=exec_config["num_tasks"], + gpus_per_node=exec_config["num_gpus"], + job_name=command.name, log_dir=log_dir, log_prefix=exec_config["log_prefix"], partition=hardware.partition if hardware else None, @@ -611,105 +567,81 @@ def _plan_and_add_job( if log_dir is None: raise ValueError(f"CommandGroup '{groups[0].name}' must have log_dir set, or provide it to pipeline.run()") - scripts: List[run.Script] = [] + commands: List[str] = [] executors: List = [] het_group_indices: List[int] = [] - # Assign het_group_index values before evaluating any commands so cross-references - # (e.g., hostname_ref) see the correct indices regardless of processing order. - for het_idx, group in enumerate(groups): - for command in group.commands: - command.script.het_group_index = het_idx if heterogeneous else None - - # Prepare commands once and collect runtime data for a second pass where we - # construct executors. This ensures all scripts have resolved cross-references. - prepared_commands: List[Dict] = [] + # In heterogeneous jobs, collect environment from all commands for cross-component refs shared_env_vars: Dict[str, str] = {} + if heterogeneous: + for het_idx, group in enumerate(groups): + for command in group.commands: + _, exec_config_probe = command.prepare_for_execution(cluster_config) + shared_env_vars.update(exec_config_probe.get("environment", {})) + + # Share packager across executors for efficiency (single-group only) + shared_packager = None + # Build commands and executors for het_idx, group in enumerate(groups): has_multiple_components = len(group.commands) > 1 total_het_groups = ( len(groups) if heterogeneous else (len(group.commands) if has_multiple_components else 1) ) - for comp_idx, command in enumerate(group.commands): - script, exec_config = self._prepare_command(command, cluster_config) - - if isinstance(script.inline, str): - if cluster_config.get("executor") not in ("none", "local"): - script.set_inline(wrap_python_path(script.inline)) - - prepared_commands.append( - { - "het_idx": het_idx, - "comp_idx": comp_idx, - "group": group, - "command": command, - "script": script, - "exec_config": exec_config, - "total_het_groups": total_het_groups, - "overlap": len(group.commands) > 1, - } - ) - - if heterogeneous: - shared_env_vars.update(exec_config.get("environment", {})) - - # Share packager across executors for efficiency (single-group only) - shared_packager = None - - # Build commands and executors using prepared data - for entry in prepared_commands: - het_idx = entry["het_idx"] - comp_idx = entry["comp_idx"] - group = entry["group"] - command = entry["command"] - script = entry["script"] - exec_config = entry["exec_config"] - total_het_groups = entry["total_het_groups"] - overlap = entry["overlap"] - - scripts.append(script) - - # Merge shared environment for heterogeneous jobs - if heterogeneous and shared_env_vars: - exec_config["environment"].update(shared_env_vars) - - # Resolve container and create executor - container_image = self._resolve_container(exec_config, command, cluster_config) - # Pass external dependencies only to the first executor (SLURM doesn't support per-component dependencies in hetjobs) - exec_dependencies = external_deps if (het_idx == 0 and comp_idx == 0) else None - - # Always use group.name for SLURM job name (consistent across all components) - # The group name is set to task_name in generate.py, without component suffixes - # Component names (like {task_name}_server, {task_name}_sandbox) are only used for log_prefix - job_name_for_slurm = group.name - - executor = self._create_executor( - command, - exec_config, - container_image, - cluster_config, - log_dir, - group.hardware, - heterogeneous, - het_idx if heterogeneous else comp_idx, - total_het_groups, - overlap, - dependencies=exec_dependencies, - job_name_override=job_name_for_slurm, + # For single-group jobs with multiple components, allow job-level GPU override for sbatch allocation + job_level_gpus = ( + group.hardware.num_gpus if (not heterogeneous and has_multiple_components and group.hardware) else None ) - # Share packager across executors for single-group jobs - if not heterogeneous: - if comp_idx == 0 and het_idx == 0: - shared_packager = executor.packager + for comp_idx, command in enumerate(group.commands): + # Assign het_group_index ONLY for heterogeneous jobs (per-job, not global) + # Non-heterogeneous jobs use localhost, so het_group_index should remain None + if heterogeneous: + command.het_group_index = het_idx else: - executor.packager = shared_packager + command.het_group_index = None + + final_cmd, exec_config = self._prepare_command(command, cluster_config) + commands.append(final_cmd) + + # Adjust GPU allocation (first component gets job-level GPUs for sbatch) for single-group jobs + exec_config["num_gpus"] = exec_config["num_gpus"] or 0 + if (not heterogeneous) and (comp_idx == 0) and (job_level_gpus is not None): + exec_config["num_gpus"] = job_level_gpus + + # Merge shared environment for heterogeneous jobs + if heterogeneous and shared_env_vars: + exec_config["environment"].update(shared_env_vars) + + # Resolve container and create executor + container_image = self._resolve_container(exec_config, command, cluster_config) + # Pass external dependencies only to the first executor (SLURM doesn't support per-component dependencies in hetjobs) + exec_dependencies = external_deps if (het_idx == 0 and comp_idx == 0) else None + executor = self._create_executor( + command, + exec_config, + container_image, + cluster_config, + log_dir, + group.hardware, + heterogeneous, + het_idx if heterogeneous else comp_idx, + total_het_groups, + (len(group.commands) > 1), + dependencies=exec_dependencies, + ) - executors.append(executor) - if heterogeneous: - het_group_indices.append(het_idx) + # Share packager across executors for single-group jobs + if not heterogeneous: + if comp_idx == 0 and het_idx == 0: + shared_packager = executor.packager + else: + executor.packager = shared_packager + + executors.append(executor) + if heterogeneous: + het_group_indices.append(het_idx) # For heterogeneous jobs, set het_group_indices on the first executor if heterogeneous and executors: @@ -744,7 +676,13 @@ def _plan_and_add_job( # If reuse_code=False, clear cache REUSE_CODE_EXP.pop(tunnel_hash(tunnel), None) - # Note: Path replacements for executor="none" are no longer needed with Script interface + # Handle executor="none" path replacements (single-group only) + if (not heterogeneous) and cluster_config["executor"] == "none": + for idx in range(len(commands)): + commands[idx] = commands[idx].replace( + "/nemo_run/code/nemo_skills", str(get_registered_external_repo("nemo_skills").path) + ) + commands[idx] = commands[idx].replace("/nemo_run/code", "./") # Ray metadata handling if self.with_ray and cluster_config["executor"] == "slurm": @@ -755,24 +693,19 @@ def _plan_and_add_job( # Add to experiment and return task ID # Note: Internal dependencies (task handles from same experiment) go to exp.add() # External dependencies (SLURM job IDs from other experiments) go to executor - if (not heterogeneous) and len(scripts) == 1: - # Single script - pass directly to exp.add() - if metadata: - scripts[0].metadata = metadata + if (not heterogeneous) and len(commands) == 1: task_id = exp.add( - scripts[0], + run.Script(inline=commands[0], metadata=metadata), executor=executors[0], name="nemo-run", dependencies=internal_deps, ) else: - # Multiple scripts or heterogeneous job - # Apply metadata to first script only - if metadata: - scripts[0].metadata = metadata - task_id = exp.add( - scripts, + [ + run.Script(inline=cmd, metadata=(metadata if idx == 0 else None)) + for idx, cmd in enumerate(commands) + ], executor=executors, name="nemo-run", dependencies=internal_deps, diff --git a/nemo_skills/pipeline/utils/generation.py b/nemo_skills/pipeline/utils/generation.py index 8ae4e96bb5..cd576053c1 100644 --- a/nemo_skills/pipeline/utils/generation.py +++ b/nemo_skills/pipeline/utils/generation.py @@ -17,7 +17,6 @@ import shlex import subprocess from collections import defaultdict -from typing import Any, List, Optional, Union from nemo_skills.pipeline.utils.cluster import get_tunnel from nemo_skills.pipeline.utils.mounts import get_unmounted_path @@ -27,81 +26,6 @@ LOG = logging.getLogger(get_logger_name(__file__)) -def normalize_models_config( - model: Optional[Union[str, List[str]]], -) -> List[str]: - """ - Normalize model specification to list. - - Handles both scalar and list inputs: - - CLI (Typer): Converts single values to single-element lists automatically - - Python API: Accepts both strings and lists - - Args: - model: Model path(s) - string or list from Python API, list from CLI - - Returns: - List of model paths - - Raises: - ValueError: If model is None or empty - """ - if model is None: - raise ValueError("Must specify --model") - - # Handle string (Python API with single model) - if isinstance(model, str): - return [model] - - # Handle list - if len(model) == 0: - raise ValueError("Must specify --model") - return list(model) - - -def normalize_parameter( - param_value: Any, - num_models: int, - param_name: str, -) -> List[Any]: - """ - Normalize a parameter to a per-model list. - - Handles both scalar and list inputs for flexible usage: - - CLI (Typer): Converts single values to single-element lists automatically - - Python API: Accepts both scalars and lists directly - - Broadcast logic: - - Scalar value: Broadcast to all models [value] * num_models - - Single-element list: Broadcast to all models - - Multi-element list: Must match num_models exactly - - Args: - param_value: Parameter value (scalar or list) - num_models: Number of models - param_name: Name of parameter (for error messages) - - Returns: - List of parameter values (one per model) - - Raises: - ValueError: If list length doesn't match num_models - """ - if not isinstance(param_value, list): - return [param_value] * num_models - - if len(param_value) == num_models: - return list(param_value) - - if len(param_value) == 1: - return param_value * num_models - - raise ValueError( - f"Parameter {param_name} has {len(param_value)} values but {num_models} models specified. " - f"Must be 1 value (broadcast) or {num_models} values (per-model)." - ) - - def get_chunked_rs_filename( output_dir: str, random_seed: int = None, @@ -370,20 +294,8 @@ def get_generation_cmd( wandb_parameters=None, with_sandbox: bool = False, script: str = "nemo_skills.inference.generate", - # Optional: for multi-model generation - server_addresses: Optional[List[str]] = None, - model_names: Optional[List[str]] = None, - server_types: Optional[List[str]] = None, ): - """Construct the generation command for language model inference. - - Supports both single-model and multi-model generation. For multi-model: - - server_addresses: List of server addresses (one per model) - - model_names: List of model names (one per model) - - server_types: List of server types (one per model) - - For single-model, server config is passed via extra_arguments. - """ + """Construct the generation command for language model inference.""" if input_file is None and input_dir is None: raise ValueError("Either input_file or input_dir must be provided.") if input_file is not None and input_dir is not None: @@ -401,7 +313,6 @@ def get_generation_cmd( output_dir=output_dir, random_seed=random_seed, ) - # Preamble for generation commands: added at executor/declarative level cmd = "export HYDRA_FULL_ERROR=1 && " # Separate Hydra config args (--config-*) from override args (++) @@ -416,22 +327,6 @@ def get_generation_cmd( else: # It's a module name, use -m flag cmd += f"python -m {script} {hydra_config_args} {common_args} " - - # Add multi-model configuration if provided - if server_addresses is not None and model_names is not None: - num_models = len(model_names) - if num_models > 1: - # Multi-model: pass server configuration as lists - model_names_arg = ",".join(model_names) - cmd += f"++server.model=[{model_names_arg}] " - - server_types_arg = ",".join(server_types) - cmd += f"++server.server_type=[{server_types_arg}] " - - server_addresses_arg = ",".join(server_addresses) - cmd += f"++server.base_url=[{server_addresses_arg}] " - # For n=1: server config is already in extra_arguments from configure_client - job_end_cmd = "" if random_seed is not None and input_dir is None: # if input_dir is not None, we default to greedy generations diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py deleted file mode 100644 index 4e37a6b594..0000000000 --- a/nemo_skills/pipeline/utils/scripts.py +++ /dev/null @@ -1,419 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Script classes for NeMo-Skills pipeline components. - -These classes wrap NeMo-Run's run.Script interface to provide typed, reusable -job components (servers, clients, sandboxes) with explicit fields and -cross-component reference support for heterogeneous jobs. - -Example: - # Create a server script with automatic port allocation - server = ServerScript( - server_type="vllm", - model_path="/models/llama-8b", - cluster_config=cluster_config, - num_gpus=8, - ) - - # Create a client that references the server - client = GenerationClientScript( - output_dir="/results", - input_file="/data/input.jsonl", - server=server, # Cross-component reference - ) - - # Use in Command objects - Command(script=server, container="vllm", ...) - Command(script=client, container="nemo-skills", ...) -""" - -import logging -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union - -import nemo_run as run - -from nemo_skills.pipeline.utils.commands import sandbox_command -from nemo_skills.pipeline.utils.exp import install_packages_wrap -from nemo_skills.pipeline.utils.generation import get_generation_cmd -from nemo_skills.pipeline.utils.server import get_free_port, get_server_command -from nemo_skills.utils import get_logger_name - -if TYPE_CHECKING: - # Avoid circular imports for type hints - pass - -LOG = logging.getLogger(get_logger_name(__file__)) - - -@dataclass -class BaseJobScript(run.Script): - """Base class for job component scripts with heterogeneous job support. - - This class provides: - - het_group_index tracking for cross-component references in heterogeneous SLURM jobs - - hostname_ref() method for getting hostnames in het jobs - - Common pattern for Script initialization - - Attributes: - het_group_index: Index in heterogeneous job group (set by Pipeline at runtime) - """ - - het_group_index: Optional[int] = field(default=None, init=False, repr=False) - installation_command: Optional[str] = None - entrypoint: str = field(default="bash", init=False) - - def __post_init__(self): - """Wrap inline command with installation_command if provided.""" - if not self.installation_command: - return - - if callable(self.inline): - original_inline = self.inline - - def wrapped_inline(): - result = original_inline() - if isinstance(result, tuple): - command, metadata = result - return install_packages_wrap(command, self.installation_command), metadata - return install_packages_wrap(result, self.installation_command) - - self.set_inline(wrapped_inline) - elif isinstance(self.inline, str): - self.set_inline(install_packages_wrap(self.inline, self.installation_command)) - - def set_inline(self, command: Union[str, Callable, run.Script]) -> None: - """Set the inline command safely on frozen dataclass.""" - object.__setattr__(self, "inline", command) - - def hostname_ref(self) -> str: - """Get hostname reference for hetjob cross-component communication. - - Returns a shell variable reference that resolves to the master node hostname - for this het group. Uses environment variables automatically exported by nemo-run: - SLURM_MASTER_NODE_HET_GROUP_0, SLURM_MASTER_NODE_HET_GROUP_1, etc. - - These are set via: - export SLURM_MASTER_NODE_HET_GROUP_N=$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_N | head -n1) - """ - if self.het_group_index is None: - return "127.0.0.1" # Local fallback for non-heterogeneous jobs - - # Use the environment variable exported by nemo-run - return f"${{SLURM_MASTER_NODE_HET_GROUP_{self.het_group_index}:-localhost}}" - - -@dataclass(kw_only=True) -class ServerScript(BaseJobScript): - """Script for model inference servers (vLLM, TRT-LLM, SGLang, etc.). - - This script wraps server command builders and provides: - - Automatic port allocation if not specified - - Type-safe server configuration - - Cross-component address sharing (get_address()) - - Resource requirement tracking (num_gpus, num_nodes, num_tasks) - - Attributes: - server_type: Type of server (vllm, trtllm, sglang, megatron, openai, etc.) - model_path: Path to model weights or model name for API services - cluster_config: Cluster configuration dictionary - num_gpus: Number of GPUs required (default: 8) - num_nodes: Number of nodes required (default: 1) - server_args: Additional server-specific arguments - server_entrypoint: Custom server entrypoint script (optional) - port: Server port (allocated automatically if None) - allocate_port: Whether to allocate port automatically (default: True) - num_tasks: Number of MPI tasks (computed in __post_init__) - log_prefix: Prefix for log files (default: "server") - - Example: - # Basic usage - server = ServerScript( - server_type="vllm", - model_path="/models/llama-3-8b", - cluster_config=cluster_config, - num_gpus=8, - ) - - # Access allocated port - print(f"Server will run on port {server.port}") - - # Get full address for client connection - address = server.get_address() # Returns "hostname:port" - """ - - server_type: str - model_path: str - cluster_config: Dict - num_gpus: int = 8 - num_nodes: int = 1 - server_args: str = "" - server_entrypoint: Optional[str] = None # Custom server entrypoint script - port: Optional[int] = None - allocate_port: bool = True - - # Computed fields (set in __post_init__) - num_tasks: int = field(init=False, repr=False) - log_prefix: str = field(default="server", init=False) - - def __post_init__(self): - """Initialize server script. - - - Allocates port if not provided - - Builds server command using get_server_command() - - Sets self.inline to the command string - - Computes num_tasks from server command builder - """ - # Allocate port if not provided - if self.port is None and self.allocate_port: - self.port = get_free_port(strategy="random") - LOG.debug(f"Allocated port {self.port} for {self.server_type} server") - - # Build server command - cmd, self.num_tasks = get_server_command( - server_type=self.server_type, - num_gpus=self.num_gpus, - num_nodes=self.num_nodes, - model_path=self.model_path, - cluster_config=self.cluster_config, - server_port=self.port, - server_args=self.server_args, - server_entrypoint=self.server_entrypoint, - ) - - self.set_inline(cmd) - super().__post_init__() - - def get_address(self) -> str: - """Get server address for client connections. - - Returns hostname:port string that clients can use to connect. - In heterogeneous jobs, hostname_ref() returns a bash expression - that resolves at runtime. - - Returns: - Server address in format "hostname:port" - - Example: - # Use in client command - client_cmd = f"python client.py --server-url http://{server.get_address()}" - """ - return f"{self.hostname_ref()}:{self.port}" - - -@dataclass(kw_only=True) -class SandboxScript(BaseJobScript): - """Script for code execution sandbox container. - - The sandbox provides a secure environment for executing LLM-generated code. - This script wraps sandbox command builders and provides: - - Automatic port allocation - - Mount configuration (can optionally keep mounts, though risky) - - Type-safe sandbox configuration - - Attributes: - cluster_config: Cluster configuration dictionary - port: Sandbox port (allocated automatically if None) - keep_mounts: Whether to keep filesystem mounts (default: False, risky if True). - Note: This is stored for documentation but actually handled at - the executor level, not in the sandbox command itself. - allocate_port: Whether to allocate port automatically (default: True) - log_prefix: Prefix for log files (default: "sandbox") - - Example: - sandbox = SandboxScript( - cluster_config=cluster_config, - keep_mounts=False, # Safer: sandbox has no access to mounted paths - ) - - # Client can reference sandbox port - client = GenerationClientScript(..., sandbox=sandbox) - """ - - cluster_config: Dict - port: Optional[int] = None - keep_mounts: bool = False - allocate_port: bool = True - env_overrides: Optional[List[str]] = None # Extra env vars in KEY=VALUE form - log_prefix: str = field(default="sandbox", init=False) - - def __post_init__(self): - """Initialize sandbox script. - - - Allocates port if not provided - - Builds sandbox command using sandbox_command() - - Sets self.inline to a callable that returns command and environment vars - """ - # Allocate port if not provided - if self.port is None and self.allocate_port: - self.port = get_free_port(strategy="random") - LOG.debug(f"Allocated port {self.port} for sandbox") - - # Build sandbox command and metadata (including environment vars) - # Note: keep_mounts is handled at the executor level, not in the command itself - cmd, metadata = sandbox_command( - cluster_config=self.cluster_config, - port=self.port, - ) - - # Use a callable to return both command and environment variables - # This ensures the sandbox's LISTEN_PORT and NGINX_PORT are properly set - def build_cmd() -> Tuple[str, Dict]: - env = dict(metadata.get("environment", {})) - # Apply user-specified environment overrides - if self.env_overrides: - for override in self.env_overrides: - key, value = override.split("=", 1) - env[key] = value - return cmd, {"environment": env} - - self.set_inline(build_cmd) - super().__post_init__() - - -@dataclass(kw_only=True) -class GenerationClientScript(BaseJobScript): - """Script for LLM generation/inference client. - - This script wraps generation command builders and provides: - - Cross-component references to multiple servers and sandbox - - Lazy command building for runtime hostname resolution - - Type-safe generation configuration - - Environment variable handling for sandbox/server communication - - Attributes: - output_dir: Directory for output files - input_file: Input JSONL file (mutually exclusive with input_dir) - input_dir: Input directory (mutually exclusive with input_file) - extra_arguments: Additional arguments for generation script - random_seed: Random seed for sampling (optional) - chunk_id: Chunk ID for parallel processing (optional) - num_chunks: Total number of chunks (required if chunk_id set) - preprocess_cmd: Command to run before generation (optional) - postprocess_cmd: Command to run after generation (optional) - wandb_parameters: WandB logging configuration (optional) - with_sandbox: Whether sandbox is enabled - script: Module or file path for generation script (default: nemo_skills.inference.generate) - servers: List of ServerScript references (None for pre-hosted servers) - server_addresses_prehosted: Addresses for pre-hosted servers (parallel to servers list) - model_names: Model names for multi-model generation (optional) - server_types: Server types for multi-model generation (optional) - sandbox: Reference to SandboxScript for cross-component communication (optional) - log_prefix: Prefix for log files (default: "main") - - Examples: - # Single server - client = GenerationClientScript( - output_dir="/results", - input_file="/data/input.jsonl", - servers=[server_script], - model_names=["llama-8b"], - server_types=["vllm"], - ) - - # Multi-model with self-hosted and pre-hosted servers - client = GenerationClientScript( - output_dir="/results", - input_file="/data/input.jsonl", - servers=[server1, server2, None], # None = pre-hosted - server_addresses_prehosted=["", "", "https://api.openai.com"], - model_names=["llama-8b", "llama-70b", "gpt-4"], - server_types=["vllm", "vllm", "openai"], - sandbox=sandbox_script, - with_sandbox=True, - ) - """ - - output_dir: str - input_file: Optional[str] = None - input_dir: Optional[str] = None - extra_arguments: str = "" - random_seed: Optional[int] = None - chunk_id: Optional[int] = None - num_chunks: Optional[int] = None - preprocess_cmd: Optional[str] = None - postprocess_cmd: Optional[str] = None - wandb_parameters: Optional[Dict] = None - with_sandbox: bool = False - script: str = "nemo_skills.inference.generate" - - # Cross-component references for single/multi-model - servers: Optional[List[Optional["ServerScript"]]] = None - server_addresses_prehosted: Optional[List[str]] = None - model_names: Optional[List[str]] = None - server_types: Optional[List[str]] = None - sandbox: Optional["SandboxScript"] = None - - log_prefix: str = field(default="main", init=False) - - def __post_init__(self): - """Initialize generation client script with lazy command building. - - Builds command lazily via a callable that is evaluated when het_group_index - is assigned, allowing hostname_ref() to resolve correctly for heterogeneous jobs. - - This works for both cases: - - With cross-refs: Resolves server hostnames and sandbox ports at runtime - - Without cross-refs: Just builds the command string (no runtime resolution needed) - """ - - def build_cmd() -> Tuple[str, Dict]: - """Build command at runtime when cross-refs are resolved.""" - env_vars = {} - - # Add sandbox port to environment if sandbox is referenced - if self.sandbox: - env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) - - # Build server addresses if servers are provided - server_addresses = None - if self.servers is not None: - server_addresses = [] - for server_idx, server_script in enumerate(self.servers): - if server_script is not None: - # Self-hosted: construct address from hostname and port refs - addr = f"{server_script.hostname_ref()}:{server_script.port}" - else: - # Pre-hosted: use the address from server_addresses_prehosted - addr = self.server_addresses_prehosted[server_idx] - server_addresses.append(addr) - - # Build generation command - cmd = get_generation_cmd( - output_dir=self.output_dir, - input_file=self.input_file, - input_dir=self.input_dir, - extra_arguments=self.extra_arguments, - random_seed=self.random_seed, - chunk_id=self.chunk_id, - num_chunks=self.num_chunks, - preprocess_cmd=self.preprocess_cmd, - postprocess_cmd=self.postprocess_cmd, - wandb_parameters=self.wandb_parameters, - with_sandbox=self.with_sandbox, - script=self.script, - # Multi-model parameters (None for single-model) - server_addresses=server_addresses, - model_names=self.model_names, - server_types=self.server_types, - ) - - # Return command and runtime metadata (environment vars) - return cmd, {"environment": env_vars} - - # Always use lazy command building - self.set_inline(build_cmd) - super().__post_init__() diff --git a/tests/gpu-tests/test_eval.py b/tests/gpu-tests/test_eval.py index 05dccf6b51..31c8f2cccf 100644 --- a/tests/gpu-tests/test_eval.py +++ b/tests/gpu-tests/test_eval.py @@ -44,7 +44,7 @@ "mbpp", "mmau-pro", "asr-leaderboard", - "mrcr", + "aalcr", # Has tokenization mismatch issues "audiobench", "librispeech-pc", } diff --git a/tests/test_declarative_pipeline.py b/tests/test_declarative_pipeline.py index 9d76fd4721..92117e403d 100644 --- a/tests/test_declarative_pipeline.py +++ b/tests/test_declarative_pipeline.py @@ -16,7 +16,6 @@ import json import os -from typing import Callable, Optional from unittest.mock import MagicMock, patch import pytest @@ -27,83 +26,127 @@ from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline -class DummyScript: - """Minimal run.Script stand-in for unit tests.""" - - def __init__(self, inline: str | Callable | None = "echo test"): - self.inline = inline - self.log_prefix = "main" - self.metadata = {} - self.het_group_index: Optional[int] = None - - def set_inline(self, inline): - self.inline = inline - - def hostname_ref(self) -> str: - if self.het_group_index is None: - return "127.0.0.1" - return f"${{SLURM_MASTER_NODE_HET_GROUP_{self.het_group_index}:-localhost}}" +class TestCommand: + """Test Command class functionality.""" + def test_command_basic_string(self): + """Test creating a Command with a simple string.""" + cmd = Command(command="echo hello", name="test") + assert cmd.name == "test" + assert cmd.container == "nemo-skills" + assert cmd.gpus is None + assert cmd.nodes == 1 -def make_command(*, inline: str | Callable | None = "echo test", name: str = "cmd", script: DummyScript | None = None): - """Helper to build Command objects with DummyScript instances.""" - script_obj = script or DummyScript(inline=inline) - return Command(script=script_obj, name=name) + def test_command_with_metadata(self): + """Test Command with metadata passed separately.""" + cmd = Command(command="echo hello", name="server", metadata={"port": 8080, "log_prefix": "server"}) + assert cmd.metadata["port"] == 8080 + assert cmd.metadata["log_prefix"] == "server" + # Command gets wrapped with working_dir by default + assert "echo hello" in cmd.command + def test_command_with_callable(self): + """Test Command with callable that returns tuple.""" -class TestCommand: - """Tests for the new Script-based Command wrapper.""" + def make_cmd(): + return ("echo world", {"port": 5000}) - def test_command_basic_script(self): - cmd = make_command(inline="echo hello", name="test") - assert cmd.name == "test" - assert cmd.container == "nemo-skills" - assert cmd.script.inline == "echo hello" + cmd = Command(command=make_cmd, name="dynamic") + assert callable(cmd.command) + assert cmd.name == "dynamic" def test_command_prepare_for_execution_string(self): - cmd = make_command(inline="python script.py", name="test") + """Test prepare_for_execution with string command.""" + cmd = Command(command="python script.py", gpus=2, name="test") cluster_config = {"executor": "local", "containers": {}} - script_obj, exec_config = cmd.prepare_for_execution(cluster_config) + final_cmd, exec_config = cmd.prepare_for_execution(cluster_config) - assert script_obj.inline == "python script.py" - assert exec_config["log_prefix"] == "main" - assert exec_config["environment"] == {} + assert "python script.py" in final_cmd + assert exec_config["num_gpus"] == 2 + assert exec_config["num_nodes"] == 1 + assert exec_config["num_tasks"] == 1 def test_command_prepare_for_execution_callable(self): - script = DummyScript(inline=lambda: "echo test") - cmd = make_command(name="test", script=script) + """Test prepare_for_execution with callable command.""" + + def make_cmd(): + return "echo test" + + cmd = Command(command=make_cmd, name="test") cluster_config = {"executor": "local", "containers": {}} - script_obj, _ = cmd.prepare_for_execution(cluster_config) - assert script_obj.inline == "echo test" + final_cmd, exec_config = cmd.prepare_for_execution(cluster_config) + + assert final_cmd == "echo test" def test_command_prepare_for_execution_callable_with_metadata(self): + """Test prepare_for_execution with callable returning tuple.""" + def make_cmd(): - return ("echo metadata", {"environment": {"VAR": "value"}}) + return ("echo metadata", {"num_tasks": 4, "environment": {"VAR": "value"}}) - script = DummyScript(inline=make_cmd) - cmd = make_command(name="test", script=script) + cmd = Command(command=make_cmd, name="test") cluster_config = {"executor": "local", "containers": {}} - _, exec_config = cmd.prepare_for_execution(cluster_config) + final_cmd, exec_config = cmd.prepare_for_execution(cluster_config) + assert final_cmd == "echo metadata" + assert exec_config["num_tasks"] == 4 assert exec_config["environment"]["VAR"] == "value" - def test_command_hostname_ref_none(self): - script = DummyScript() - cmd = make_command(name="test", script=script) + def test_command_meta_ref(self): + """Test meta_ref for accessing metadata.""" + cmd = Command(command="echo test", name="server", metadata={"port": 8080, "host": "localhost"}) + + assert cmd.meta_ref("port") == "8080" + assert cmd.meta_ref("host") == "localhost" + + def test_command_meta_ref_missing_key(self): + """Test meta_ref with missing key raises KeyError.""" + cmd = Command(command="echo test", name="test") - assert script.hostname_ref() == "127.0.0.1" - assert cmd.get_name() == "test" + with pytest.raises(KeyError, match="Metadata key 'port' not found"): + cmd.meta_ref("port") + + def test_command_hostname_ref_none(self): + """Test hostname_ref returns localhost when het_group_index is None.""" + cmd = Command(command="echo test", name="test") + assert cmd.het_group_index is None + assert cmd.hostname_ref() == "127.0.0.1" def test_command_hostname_ref_heterogeneous(self): - script = DummyScript() - script.het_group_index = 2 - make_command(name="test", script=script) + """Test hostname_ref returns SLURM variable when het_group_index is set.""" + cmd = Command(command="echo test", name="test") + cmd.het_group_index = 2 - hostname = script.hostname_ref() - assert "${SLURM_MASTER_NODE_HET_GROUP_2" in hostname + hostname = cmd.hostname_ref() + assert "$SLURM_JOB_NODELIST_HET_GROUP_2" in hostname + assert "scontrol" in hostname + + def test_command_with_installation_command(self): + """Test Command with installation_command.""" + cmd = Command(command="python script.py", installation_command="pip install package", name="test") + cluster_config = {"executor": "local", "containers": {}} + + final_cmd, _ = cmd.prepare_for_execution(cluster_config) + + # Installation command should be wrapped around the main command + assert "pip install package" in final_cmd + assert "python script.py" in final_cmd + + def test_command_env_vars_wrapping(self): + """Test that env_vars and working_dir are applied to string commands.""" + cmd = Command( + command="python script.py", + env_vars={"MY_VAR": "value"}, + working_dir="/custom/path", + name="test", + ) + + # The command should be wrapped with env setup + assert "export MY_VAR=value" in cmd.command + assert "cd /custom/path" in cmd.command class TestCommandGroup: @@ -111,8 +154,8 @@ class TestCommandGroup: def test_commandgroup_basic(self): """Test creating a basic CommandGroup.""" - cmd1 = make_command(inline="echo 1", name="cmd1") - cmd2 = make_command(inline="echo 2", name="cmd2") + cmd1 = Command(command="echo 1", name="cmd1") + cmd2 = Command(command="echo 2", name="cmd2") group = CommandGroup(commands=[cmd1, cmd2], name="test_group") @@ -122,7 +165,7 @@ def test_commandgroup_basic(self): def test_commandgroup_with_hardware(self): """Test CommandGroup with HardwareConfig.""" - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") hardware = HardwareConfig(partition="batch", sbatch_kwargs={"time_min": "01:00:00"}, num_gpus=8) group = CommandGroup(commands=[cmd], hardware=hardware, name="gpu_group") @@ -133,7 +176,7 @@ def test_commandgroup_with_hardware(self): def test_commandgroup_with_log_dir(self): """Test CommandGroup with log_dir.""" - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], log_dir="/logs/test", name="group") assert group.log_dir == "/logs/test" @@ -144,7 +187,7 @@ class TestPipeline: def test_pipeline_with_single_job(self): """Test Pipeline with single job.""" - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -161,10 +204,10 @@ def test_pipeline_with_single_job(self): def test_pipeline_with_jobs(self): """Test Pipeline with jobs parameter (full format with dependencies).""" - cmd1 = make_command(inline="echo 1", name="cmd1") + cmd1 = Command(command="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = make_command(inline="echo 2", name="cmd2") + cmd2 = Command(command="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") job1 = {"name": "job1", "group": group1} @@ -189,7 +232,7 @@ def test_pipeline_requires_jobs(self): def test_pipeline_with_run_after(self): """Test Pipeline with run_after parameter.""" - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -205,7 +248,7 @@ def test_pipeline_with_run_after(self): def test_pipeline_with_run_after_list(self): """Test Pipeline with run_after as list.""" - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -221,7 +264,7 @@ def test_pipeline_with_run_after_list(self): def test_pipeline_cluster_config_passed_directly(self): """Test that cluster_config is passed directly (no more string resolution).""" - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") cluster_config = {"executor": "local", "containers": {}} @@ -256,7 +299,7 @@ def test_pipeline_run_basic(self, mock_run_exp, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create pipeline - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") pipeline = Pipeline( name="test", cluster_config=mock_config, jobs=[{"name": "job1", "group": group}], skip_hf_home_check=True @@ -286,10 +329,10 @@ def test_pipeline_run_with_dependencies(self, mock_run_exp, mock_env_vars, mock_ mock_get_exp.return_value.__enter__.return_value = mock_exp # Create pipeline with internal dependencies - cmd1 = make_command(inline="echo 1", name="cmd1") + cmd1 = Command(command="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = make_command(inline="echo 2", name="cmd2") + cmd2 = Command(command="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") job1 = {"name": "job1", "group": group1, "dependencies": []} @@ -332,7 +375,7 @@ def test_pipeline_hf_home_validation(self, mock_get_executor, mock_is_mounted, m mock_exp.add.return_value = "handle" mock_get_exp.return_value.__enter__.return_value = mock_exp - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") pipeline = Pipeline(name="test", cluster_config=mock_config, jobs=[{"name": "job1", "group": group}]) @@ -348,7 +391,7 @@ def test_pipeline_hf_home_missing(self, mock_env_vars): mock_config = {"executor": "slurm", "containers": {}} mock_env_vars.return_value = {} # No HF_HOME - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") # Should raise in __init__ now, not run() @@ -363,7 +406,7 @@ def test_pipeline_hf_home_not_mounted(self, mock_is_mounted, mock_env_vars): mock_env_vars.return_value = {"HF_HOME": "/hf"} mock_is_mounted.return_value = False - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") # Should raise in __init__ now, not run() @@ -389,8 +432,8 @@ def test_het_group_index_non_heterogeneous(self, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create single-group job with multiple components - cmd1 = make_command(inline="echo 1", name="cmd1") - cmd2 = make_command(inline="echo 2", name="cmd2") + cmd1 = Command(command="echo 1", name="cmd1") + cmd2 = Command(command="echo 2", name="cmd2") group = CommandGroup(commands=[cmd1, cmd2], name="group", log_dir="/logs") pipeline = Pipeline( @@ -399,10 +442,10 @@ def test_het_group_index_non_heterogeneous(self, mock_env_vars, mock_get_exp): pipeline.run(dry_run=True) # Both commands should have None het_group_index (localhost communication) - assert cmd1.script.het_group_index is None - assert cmd2.script.het_group_index is None - assert cmd1.script.hostname_ref() == "127.0.0.1" - assert cmd2.script.hostname_ref() == "127.0.0.1" + assert cmd1.het_group_index is None + assert cmd2.het_group_index is None + assert cmd1.hostname_ref() == "127.0.0.1" + assert cmd2.hostname_ref() == "127.0.0.1" @patch("nemo_skills.pipeline.utils.declarative.get_exp") @patch("nemo_skills.pipeline.utils.declarative.get_env_variables") @@ -419,10 +462,10 @@ def test_het_group_index_heterogeneous(self, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create multi-group heterogeneous job - cmd1 = make_command(inline="echo 1", name="cmd1") + cmd1 = Command(command="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = make_command(inline="echo 2", name="cmd2") + cmd2 = Command(command="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") jobs = [{"name": "hetjob", "groups": [group1, group2]}] @@ -430,10 +473,10 @@ def test_het_group_index_heterogeneous(self, mock_env_vars, mock_get_exp): pipeline.run(dry_run=True) # Commands should have het_group_index 0 and 1 - assert cmd1.script.het_group_index == 0 - assert cmd2.script.het_group_index == 1 - assert "SLURM_MASTER_NODE_HET_GROUP_0" in cmd1.script.hostname_ref() - assert "SLURM_MASTER_NODE_HET_GROUP_1" in cmd2.script.hostname_ref() + assert cmd1.het_group_index == 0 + assert cmd2.het_group_index == 1 + assert "$SLURM_JOB_NODELIST_HET_GROUP_0" in cmd1.hostname_ref() + assert "$SLURM_JOB_NODELIST_HET_GROUP_1" in cmd2.hostname_ref() @patch("nemo_skills.pipeline.utils.declarative.get_exp") @patch("nemo_skills.pipeline.utils.declarative.get_env_variables") @@ -450,16 +493,16 @@ def test_het_group_index_per_job_not_global(self, mock_env_vars, mock_get_exp): mock_get_exp.return_value.__enter__.return_value = mock_exp # Create two separate heterogeneous jobs - cmd1 = make_command(inline="echo 1", name="cmd1") + cmd1 = Command(command="echo 1", name="cmd1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/logs") - cmd2 = make_command(inline="echo 2", name="cmd2") + cmd2 = Command(command="echo 2", name="cmd2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/logs") - cmd3 = make_command(inline="echo 3", name="cmd3") + cmd3 = Command(command="echo 3", name="cmd3") group3 = CommandGroup(commands=[cmd3], name="group3", log_dir="/logs") - cmd4 = make_command(inline="echo 4", name="cmd4") + cmd4 = Command(command="echo 4", name="cmd4") group4 = CommandGroup(commands=[cmd4], name="group4", log_dir="/logs") jobs = [ @@ -470,10 +513,10 @@ def test_het_group_index_per_job_not_global(self, mock_env_vars, mock_get_exp): pipeline.run(dry_run=True) # Both jobs should have het_group_index starting from 0 - assert cmd1.script.het_group_index == 0 - assert cmd2.script.het_group_index == 1 - assert cmd3.script.het_group_index == 0 # Starts from 0 again! - assert cmd4.script.het_group_index == 1 + assert cmd1.het_group_index == 0 + assert cmd2.het_group_index == 1 + assert cmd3.het_group_index == 0 # Starts from 0 again! + assert cmd4.het_group_index == 1 class TestDependencyResolution: @@ -493,7 +536,7 @@ def test_dependency_none_handling(self, mock_env_vars, mock_get_exp): mock_exp.add.return_value = "handle" mock_get_exp.return_value.__enter__.return_value = mock_exp - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") jobs = [{"name": "job", "group": group, "dependencies": None}] @@ -516,7 +559,7 @@ def test_pipeline_run_after_applies_to_jobs(self, mock_env_vars, mock_get_exp): mock_exp.add.return_value = "handle" mock_get_exp.return_value.__enter__.return_value = mock_exp - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group", log_dir="/logs") pipeline = Pipeline( @@ -546,7 +589,7 @@ def test_pipeline_job_missing_group_or_groups(self): def test_commandgroup_missing_log_dir(self): """Test that CommandGroup without log_dir raises error during execution.""" mock_config = {"executor": "none", "containers": {}} - cmd = make_command(inline="echo test", name="cmd") + cmd = Command(command="echo test", name="cmd") group = CommandGroup(commands=[cmd], name="group") # No log_dir pipeline = Pipeline(name="test", cluster_config=mock_config, jobs=[{"name": "job1", "group": group}]) @@ -583,14 +626,14 @@ def test_multiple_internal_dependencies(self): } # Job 1 and Job 2: independent - cmd1 = make_command(inline="echo job1", name="job1") + cmd1 = Command(command="echo job1", name="job1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/tmp/logs") - cmd2 = make_command(inline="echo job2", name="job2") + cmd2 = Command(command="echo job2", name="job2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/tmp/logs") # Job 3: depends on both job1 and job2 - cmd3 = make_command(inline="echo job3", name="job3") + cmd3 = Command(command="echo job3", name="job3") group3 = CommandGroup(commands=[cmd3], name="group3", log_dir="/tmp/logs") job1_spec = {"name": "job1", "group": group1} @@ -672,11 +715,11 @@ def mock_get_executor(**kwargs): } # Job 1: depends on external experiment - cmd1 = make_command(inline="echo job1", name="job1") + cmd1 = Command(command="echo job1", name="job1") group1 = CommandGroup(commands=[cmd1], name="group1", log_dir="/tmp/logs") # Job 2: depends on job1 (internal) AND external experiment - cmd2 = make_command(inline="echo job2", name="job2") + cmd2 = Command(command="echo job2", name="job2") group2 = CommandGroup(commands=[cmd2], name="group2", log_dir="/tmp/logs") job1_spec = { @@ -888,38 +931,35 @@ def capture_env_update(cluster_config, updates): # Debug: print what we captured print(f"Captured env updates: {env_updates_captured}") - # Verify both sandbox and client environment variables are captured - assert len(env_updates_captured) >= 2, ( - f"Expected at least 2 environment updates (sandbox + client), got {len(env_updates_captured)}: {env_updates_captured}" - ) - - # Find the sandbox and client environment updates - sandbox_env = None + # Find the client and sandbox environment updates client_env = None + sandbox_env = None + for env_update in env_updates_captured: - if "LISTEN_PORT" in env_update and "NGINX_PORT" in env_update: - sandbox_env = env_update if "NEMO_SKILLS_SANDBOX_PORT" in env_update: client_env = env_update + elif "LISTEN_PORT" in env_update and "NGINX_PORT" in env_update: + sandbox_env = env_update - # Verify sandbox got LISTEN_PORT and NGINX_PORT - assert sandbox_env is not None, ( - f"LISTEN_PORT/NGINX_PORT not set for sandbox command: {env_updates_captured}" + # Verify client got NEMO_SKILLS_SANDBOX_PORT (old behavior: exp.py line 493) + # This is the key fix - ensuring sandbox port is passed to client + assert client_env is not None, ( + f"Client environment update not found. Captured updates: {env_updates_captured}\n" + f"This means NEMO_SKILLS_SANDBOX_PORT was not set for the client command, " + f"so the Sandbox class cannot connect to the sandbox server." ) - assert sandbox_env["LISTEN_PORT"] == sandbox_env["NGINX_PORT"], ( - f"LISTEN_PORT and NGINX_PORT should match: {sandbox_env}" + assert "NEMO_SKILLS_SANDBOX_PORT" in client_env, ( + "NEMO_SKILLS_SANDBOX_PORT not set for client command" ) - # Verify client got NEMO_SKILLS_SANDBOX_PORT - assert client_env is not None, ( - f"NEMO_SKILLS_SANDBOX_PORT not set for client command: {env_updates_captured}" + # Verify sandbox got its environment vars (old behavior: exp.py lines 525-538) + assert sandbox_env is not None, ( + f"Sandbox environment update not found. Captured: {env_updates_captured}" ) + assert "LISTEN_PORT" in sandbox_env, "LISTEN_PORT not set for sandbox" + assert "NGINX_PORT" in sandbox_env, "NGINX_PORT not set for sandbox" - # Verify the ports match between sandbox and client - assert client_env["NEMO_SKILLS_SANDBOX_PORT"] == sandbox_env["LISTEN_PORT"], ( - f"Sandbox port mismatch: client has {client_env['NEMO_SKILLS_SANDBOX_PORT']}, " - f"sandbox has {sandbox_env['LISTEN_PORT']}" - ) + # This test verifies the fix works end-to-end through the actual generate() function if __name__ == "__main__": diff --git a/tests/test_generation.py b/tests/test_generation.py index 2693d62241..b69b526a0e 100644 --- a/tests/test_generation.py +++ b/tests/test_generation.py @@ -16,12 +16,12 @@ # running most things through subprocess since that's how it's usually used import subprocess +from unittest.mock import MagicMock import pytest from nemo_skills.evaluation.metrics import ComputeMetrics -from nemo_skills.pipeline.generate import _create_job_unified -from nemo_skills.pipeline.utils.scripts import ServerScript +from nemo_skills.pipeline.generate import _create_commandgroup_from_config def test_eval_gsm8k_api(tmp_path): @@ -153,42 +153,36 @@ def test_generate_openai_format(tmp_path, format): assert len(data[1]["generation"]) > 0 -def test_server_metadata_from_num_tasks(tmp_path): +def test_server_metadata_from_num_tasks(): """Test that metadata dict is properly created from server command returning (cmd, num_tasks).""" + mock_server_fn = MagicMock(return_value=("python server.py", 4)) cluster_config = { - "containers": { - "vllm": "apitest/vllm", - "nemo-skills": "apitest/nemo-skills", - "sandbox": "apitest/sandbox", - }, - "executor": "none", + "containers": {"vllm": "nvcr.io/nvidia/nemo:vllm", "nemo-skills": "nvcr.io/nvidia/nemo:skills"}, + "executor": "slurm", } server_config = { "server_type": "vllm", "num_gpus": 8, "num_nodes": 1, - "model_path": str(tmp_path / "model"), + "model_path": "/models/test", "server_port": 5000, - "server_args": "", } - generation_params = {"output_dir": "/tmp/out"} - groups = _create_job_unified( - models=[server_config["model_path"]], - server_configs=[server_config], - generation_params=generation_params, + cmd_group = _create_commandgroup_from_config( + generation_cmd="python generate.py", + server_config=server_config, + with_sandbox=False, + sandbox_port=None, cluster_config=cluster_config, installation_command=None, - with_sandbox=False, + get_server_command_fn=mock_server_fn, partition=None, keep_mounts_for_sandbox=False, task_name="test-task", log_dir="/tmp/logs", ) - server_cmd = groups[0].commands[0] - assert isinstance(server_cmd.script, ServerScript) - assert server_cmd.script.num_tasks >= 1 - assert server_cmd.script.num_gpus == server_config["num_gpus"] - assert groups[0].hardware.num_gpus == server_config["num_gpus"] - assert groups[0].hardware.num_tasks == server_cmd.script.num_tasks + server_cmd = cmd_group.commands[0] + assert isinstance(server_cmd.metadata, dict) + assert server_cmd.metadata["num_tasks"] == 4 + assert server_cmd.metadata["gpus"] == 8 diff --git a/tests/test_nemo_evaluator_pipeline.py b/tests/test_nemo_evaluator_pipeline.py index 0f333ab748..22ac250882 100644 --- a/tests/test_nemo_evaluator_pipeline.py +++ b/tests/test_nemo_evaluator_pipeline.py @@ -17,14 +17,8 @@ import pytest -from nemo_skills.pipeline.nemo_evaluator import ( - EvaluatorClientScript, -) -from nemo_skills.pipeline.nemo_evaluator import ( - nemo_evaluator as nemo_evaluator_fn, -) +from nemo_skills.pipeline.nemo_evaluator import nemo_evaluator as nemo_evaluator_fn from nemo_skills.pipeline.utils.declarative import Command, CommandGroup -from nemo_skills.pipeline.utils.scripts import ServerScript @pytest.fixture @@ -137,8 +131,9 @@ def test_no_servers_external_urls( # Verify client command client_cmd = group.commands[0] assert isinstance(client_cmd, Command) - assert client_cmd.name.startswith("evaluator-test-client-0") - assert isinstance(client_cmd.script, EvaluatorClientScript) + assert "evaluator-test-0" in client_cmd.name + assert client_cmd.gpus is None # No GPUs when no hosted servers + assert client_cmd.nodes == 1 # Verify hardware config assert group.hardware is not None @@ -186,17 +181,16 @@ def test_main_server_hosted( server_cmd = group.commands[0] assert isinstance(server_cmd, Command) assert "server" in server_cmd.name - assert isinstance(server_cmd.script, ServerScript) - assert server_cmd.script.num_gpus == 8 - assert server_cmd.script.log_prefix == "server" - assert server_cmd.script.port is not None + assert server_cmd.gpus == 8 + assert server_cmd.nodes == 1 + assert "port" in server_cmd.metadata + assert server_cmd.metadata["log_prefix"] == "server" # Verify client command client_cmd = group.commands[1] assert isinstance(client_cmd, Command) assert "client" in client_cmd.name - assert isinstance(client_cmd.script, EvaluatorClientScript) - assert callable(client_cmd.script.inline) # Should be lambda for cross-component refs + assert callable(client_cmd.command) # Should be lambda for cross-component refs # Verify hardware config (should use server GPUs) assert group.hardware.num_gpus == 8 @@ -241,16 +235,14 @@ def test_judge_server_hosted( judge_cmd = group.commands[0] assert isinstance(judge_cmd, Command) assert "judge-server" in judge_cmd.name - assert isinstance(judge_cmd.script, ServerScript) - assert judge_cmd.script.num_gpus == 32 - assert judge_cmd.script.log_prefix == "judge-server" + assert judge_cmd.gpus == 32 + assert judge_cmd.metadata["log_prefix"] == "judge-server" # Verify client command client_cmd = group.commands[1] assert isinstance(client_cmd, Command) assert "client" in client_cmd.name - assert isinstance(client_cmd.script, EvaluatorClientScript) - assert callable(client_cmd.script.inline) # Should be lambda for cross-component refs + assert callable(client_cmd.command) # Should be lambda for cross-component refs # Verify hardware config (should use judge server GPUs) assert group.hardware.num_gpus == 32 @@ -308,22 +300,19 @@ def test_both_servers_hosted_separate_groups( server_cmd = server_group.commands[0] assert isinstance(server_cmd, Command) assert "server" in server_cmd.name - assert isinstance(server_cmd.script, ServerScript) - assert server_cmd.script.num_gpus == 8 + assert server_cmd.gpus == 8 # Verify client command in first group client_cmd = server_group.commands[1] assert isinstance(client_cmd, Command) assert "client" in client_cmd.name - assert isinstance(client_cmd.script, EvaluatorClientScript) - assert callable(client_cmd.script.inline) # Lambda for cross-component refs + assert callable(client_cmd.command) # Lambda for cross-component refs # Verify judge server command in second group judge_cmd = judge_group.commands[0] assert isinstance(judge_cmd, Command) assert "judge-server" in judge_cmd.name - assert isinstance(judge_cmd.script, ServerScript) - assert judge_cmd.script.num_gpus == 32 + assert judge_cmd.gpus == 32 @patch("nemo_skills.pipeline.nemo_evaluator.Pipeline")