diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py
index 1f1557a4f9..ac12716124 100644
--- a/nemo_skills/pipeline/eval.py
+++ b/nemo_skills/pipeline/eval.py
@@ -372,6 +372,12 @@ def eval(
         None,
         help="Number of chunks to split the dataset into. If None, will not chunk the dataset.",
     ),
+    gpus_per_node: int = typer.Option(
+        1,
+        help="Number of GPUs per node for multi-instance mode. "
+        "When > 1, launches multiple server instances (one per GPU) within a single job. "
+        "Requires num_chunks to be a multiple of gpus_per_node.",
+    ),
     chunk_ids: str = typer.Option(
         None,
         help="List of explicit chunk ids to run. Separate with , or .. to specify range. "
@@ -581,6 +587,7 @@ def eval(
         eval_requires_judge=eval_requires_judge,
         generation_type=generation_type,
         generation_module=generation_module,
+        gpus_per_node=gpus_per_node,
     )
 
     sbatch_kwargs = parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min)
@@ -605,9 +612,14 @@ def eval(
             job_server_address,
             job_server_command,
             job_sandbox_env_overrides,
+            job_gpus_per_node,
         ) = job_args
         prev_tasks = _task_dependencies
 
+        # Add gpus_per_node to server config for multi-instance mode
+        if job_server_config and job_gpus_per_node > 1:
+            job_server_config["gpus_per_node"] = job_gpus_per_node
+
         for _ in range(dependent_jobs + 1):
             has_tasks = True
             new_task = pipeline_utils.add_task(
@@ -617,6 +629,7 @@ def eval(
                 log_dir=log_dir,
                 container=cluster_config["containers"]["nemo-skills"],
                 cluster_config=cluster_config,
+                num_tasks=job_gpus_per_node,
                 partition=partition,
                 server_config=job_server_config,
                 with_sandbox=job_needs_sandbox or with_sandbox,
diff --git a/nemo_skills/pipeline/utils/eval.py b/nemo_skills/pipeline/utils/eval.py
index 9750659e5b..37c81825f7 100644
--- a/nemo_skills/pipeline/utils/eval.py
+++ b/nemo_skills/pipeline/utils/eval.py
@@ -267,11 +267,20 @@ def prepare_eval_commands(
     eval_requires_judge,
     generation_type=None,
     generation_module=None,
+    gpus_per_node: int = 1,
 ):
     # TODO: there is a bit too much code duplication here and logic is quite dense, should try to refactor
     # TODO: should we allow setting num chunks per benchmark when not using groups? Maybe benchmark:rs_num:num_chunks?
+    # Validate gpus_per_node for multi-instance mode
+    if gpus_per_node > 1:
+        if num_chunks is None:
+            raise ValueError("gpus_per_node > 1 requires num_chunks to be specified")
+        if num_chunks % gpus_per_node != 0:
+            raise ValueError(f"num_chunks ({num_chunks}) must be a multiple of gpus_per_node ({gpus_per_node})")
+        LOG.info(f"Multi-instance mode: {gpus_per_node} GPUs per node, {num_chunks // gpus_per_node} jobs")
+
     if generation_type is not None:
         if generation_module is not None:
             raise ValueError("Cannot specify both generation_module and generation_type. ")
") @@ -354,7 +363,12 @@ def prepare_eval_commands( rerun_done=rerun_done, ) for seed_idx, (seed, benchmark_chunk_ids) in enumerate(benchmark_args.remaining_jobs.items()): - total_evals += len(benchmark_chunk_ids) + # Multi-instance mode: count unique base chunks (each base chunk = 1 job) + if gpus_per_node > 1: + base_chunks = set((cid // gpus_per_node) * gpus_per_node for cid in benchmark_chunk_ids) + total_evals += len(base_chunks) + else: + total_evals += len(benchmark_chunk_ids) if num_jobs < 0: # if num_jobs is -1, we run all benchmarks in parallel @@ -376,6 +390,7 @@ def prepare_eval_commands( **server_parameters, extra_arguments=extra_arguments, get_random_port=get_random_port, + gpus_per_node=gpus_per_node, ) cur_eval = 0 @@ -398,7 +413,18 @@ def prepare_eval_commands( random_seed=seed, chunk_id=None, ) - for chunk_id in benchmark_chunk_ids: + # Multi-instance mode: compute which base chunks need to run + # If ANY chunk in a batch is incomplete, we run the entire batch (base_chunk) + if gpus_per_node > 1: + base_chunks_to_run = set() + for cid in benchmark_chunk_ids: + base_chunk = (cid // gpus_per_node) * gpus_per_node + base_chunks_to_run.add(base_chunk) + chunks_to_process = sorted(base_chunks_to_run) + else: + chunks_to_process = benchmark_chunk_ids + + for chunk_id in chunks_to_process: job_benchmarks.add(benchmark) effective_generation_module = generation_module or benchmark_args.generation_module @@ -431,12 +457,17 @@ def prepare_eval_commands( f"{job_extra_arguments} " ) + # Multi-instance mode: use shell expression for chunk_id + effective_chunk_id = chunk_id + if gpus_per_node > 1: + effective_chunk_id = f"$(({chunk_id} + $SLURM_LOCALID))" + cmd = pipeline_utils.get_generation_cmd( input_file=benchmark_args.input_file, output_dir=benchmark_output_dir, extra_arguments=full_extra_arguments, random_seed=seed, - chunk_id=chunk_id, + chunk_id=effective_chunk_id, num_chunks=benchmark_args.num_chunks, script=generation_module or benchmark_args.generation_module, requirements=requirements, @@ -480,12 +511,14 @@ def prepare_eval_commands( # a check above guarantees that this is the same for all tasks in a job generation_task.get_server_command_fn(), job_sandbox_env_overrides, + gpus_per_node, # client num_tasks for multi-instance mode ) ) job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( **server_parameters, extra_arguments=extra_arguments, get_random_port=get_random_port, + gpus_per_node=gpus_per_node, ) for job_benchmark in job_benchmarks: benchmarks_dict[job_benchmark].job_ids.append(cur_job_idx) diff --git a/nemo_skills/pipeline/utils/exp.py b/nemo_skills/pipeline/utils/exp.py index 0f9ec7f123..d22fddec6a 100644 --- a/nemo_skills/pipeline/utils/exp.py +++ b/nemo_skills/pipeline/utils/exp.py @@ -127,7 +127,7 @@ def stdout(self) -> Path: @property def srun_stdout(self) -> Path: - return Path(self.folder) / f"{self.srun_prefix}%j_srun.log" + return Path(self.folder) / f"{self.srun_prefix}%j_%t_srun.log" @property def stderr(self) -> Path: @@ -135,7 +135,7 @@ def stderr(self) -> Path: @property def srun_stderr(self) -> Path: - return Path(self.folder) / f"{self.srun_prefix}%j_srun.log" + return Path(self.folder) / f"{self.srun_prefix}%j_%t_srun.log" @property def ls_term(self) -> str: @@ -144,7 +144,7 @@ def ls_term(self) -> str: The command used to list the files is ls -1 {ls_term} 2> /dev/null """ assert self.folder - return os.path.join(self.folder, "*%j_srun.log") + return os.path.join(self.folder, "*%j_*_srun.log") 
 @dataclass(kw_only=True)
@@ -314,13 +314,30 @@ def get_executor(
     srun_args = [
         "--no-container-mount-home",
         "--mpi=pmix",
-        "--wait=10",
         # we need to be explicit about this in srun as commands might need to run in parallel
         f"--ntasks-per-node={tasks_per_node}",
         f"--nodes={num_nodes}",
         # NeMo-run should take care of this, but we'll put it here temporarily
         f"--container-env={','.join([k.strip() for k in env_vars.keys()])}",
     ]
+    # IMPORTANT:
+    # Slurm's `srun --wait=<N>` terminates the job step if other tasks are still
+    # running <N> seconds after the first task exits.
+    #
+    # `nemo_run` adds `--wait=60` by default; for multi-instance runs (e.g., chunked
+    # evaluation) tasks can finish at very different times (some may exit quickly
+    # due to `++skip_filled=True`), which causes Slurm to kill still-running tasks.
+    #
+    # We override this with a large wait by default for multi-instance mode.
+    # You can customize via cluster config:
+    #   srun_wait_seconds: <seconds>
+    srun_wait_seconds = cluster_config.get("srun_wait_seconds")
+    if srun_wait_seconds is None and tasks_per_node > 1:
+        # Use a reasonably large wait (1 hour) so long-running ranks aren't killed just
+        # because other ranks finished earlier.
+        srun_wait_seconds = 60 * 60
+    if srun_wait_seconds is not None:
+        srun_args.append(f"--wait={int(srun_wait_seconds)}")
     if overlap:
         srun_args.append("--overlap")
     if not cluster_config.get("disable_gpus_per_node", False) and gpus_per_node is not None:
diff --git a/nemo_skills/pipeline/utils/generation.py b/nemo_skills/pipeline/utils/generation.py
index 4432181cff..5d45159016 100644
--- a/nemo_skills/pipeline/utils/generation.py
+++ b/nemo_skills/pipeline/utils/generation.py
@@ -495,8 +495,15 @@ def get_generation_cmd(
         cmd += "++wait_for_sandbox=true "
 
     if chunk_id is not None:
-        cmd += f" ++num_chunks={num_chunks} ++chunk_id={chunk_id} "
-        output_file = get_chunked_rs_filename(output_dir, random_seed=random_seed, chunk_id=chunk_id)
+        # Check if chunk_id is a shell expression (e.g., "$((0 + $SLURM_LOCALID))")
+        is_shell_expr = isinstance(chunk_id, str) and "$" in str(chunk_id)
+
+        if is_shell_expr:
+            # For shell expressions, use double quotes so shell expands the expression
+            cmd += f' ++num_chunks={num_chunks} "++chunk_id={chunk_id}" '
+        else:
+            cmd += f" ++num_chunks={num_chunks} ++chunk_id={chunk_id} "
+
         donefiles = []
         # we are always waiting for all chunks in num_chunks, no matter chunk_ids in
         # the current run (as we don't want to merge partial jobs)
@@ -505,10 +512,23 @@ def get_generation_cmd(
             donefile = f"{filename}.done"
             donefiles.append(donefile)
 
-        if job_end_cmd:
-            job_end_cmd += f" && touch {donefiles[chunk_id]} "
+        if is_shell_expr:
+            # For shell expression, compute the donefile path at runtime
+            # Get the base pattern with _chunk_0 and replace with shell expression
+            base_donefile = donefiles[0]  # e.g., /path/output_chunk_0.jsonl.done
+            # Replace "_chunk_0.jsonl" with "_chunk_$((expr)).jsonl" where expr is expanded by shell
+            # Extract the expression part (e.g., "0 + $SLURM_LOCALID" from "$((0 + $SLURM_LOCALID))")
+            donefile_pattern = base_donefile.replace("_chunk_0.jsonl", f"_chunk_{chunk_id}.jsonl")
+            if job_end_cmd:
+                job_end_cmd += f' && touch "{donefile_pattern}" '
+            else:
+                job_end_cmd = f'touch "{donefile_pattern}" '
         else:
-            job_end_cmd = f"touch {donefiles[chunk_id]} "
+            output_file = get_chunked_rs_filename(output_dir, random_seed=random_seed, chunk_id=chunk_id)
+            if job_end_cmd:
+                job_end_cmd += f" && touch {donefiles[chunk_id]} "
+            else:
+                job_end_cmd = f"touch {donefiles[chunk_id]} "
 
         # getting file name as if there is no chunking since that's where we want to merge
         merged_output_file = get_chunked_rs_filename(output_dir=output_dir, random_seed=random_seed)
@@ -582,6 +602,7 @@ def configure_client(
     get_random_port: bool,
     extra_arguments: str,
     server_container: str | None = None,
+    gpus_per_node: int = 1,
 ):
     """
     Utility function to configure a client for the model inference server.
@@ -597,6 +618,7 @@ def configure_client(
         get_random_port: Whether to get a random port for the server.
        extra_arguments: Extra arguments to pass to the command.
         server_container: Container to use for the server.
+        gpus_per_node: Number of GPUs per node for multi-instance mode.
 
     Returns:
         A tuple containing:
@@ -625,9 +647,16 @@ def configure_client(
         }
         if server_container:
             server_config["container"] = server_container
-        extra_arguments = (
-            f"++server.host=127.0.0.1 ++server.port={server_port} ++server.model={model} {extra_arguments}"
-        )
+        if gpus_per_node > 1:
+            # Multi-instance mode: port is computed at runtime based on SLURM_LOCALID
+            extra_arguments = (
+                f"++server.host=127.0.0.1 "
+                f'"++server.port=$(({server_port} + $SLURM_LOCALID))" ++server.model={model} {extra_arguments}'
+            )
+        else:
+            extra_arguments = (
+                f"++server.host=127.0.0.1 ++server.port={server_port} ++server.model={model} {extra_arguments}"
+            )
     else:  # model is hosted elsewhere
         server_config = None
         extra_arguments = f"++server.base_url={server_address} ++server.model={model} {extra_arguments}"
diff --git a/nemo_skills/pipeline/utils/server.py b/nemo_skills/pipeline/utils/server.py
index 87abca4a99..124d3e7a6d 100644
--- a/nemo_skills/pipeline/utils/server.py
+++ b/nemo_skills/pipeline/utils/server.py
@@ -120,9 +120,17 @@ def get_server_command(
     server_port: int,
     server_args: str = "",
     server_entrypoint: str | None = None,
+    gpus_per_node: int = 1,
 ):
     num_tasks = num_gpus
 
+    if gpus_per_node > 1 and server_type != "generic":
+        raise ValueError(
+            f"Multi-instance mode (gpus_per_node={gpus_per_node}) is only supported for "
+            f"server_type='generic', but got server_type='{server_type}'. "
+            f"Use gpus_per_node=1 or switch to server_type='generic'."
+        )
+
     # check if the model path is mounted if not vllm, sglang, or trtllm;
     # vllm, sglang, trtllm can also pass model name as "model_path" so we need special processing
     if server_type not in ["vllm", "sglang", "trtllm", "generic"]:
@@ -209,15 +217,29 @@ def get_server_command(
     elif server_type == "generic":
         if not server_entrypoint:
             raise ValueError("For 'generic' server type, 'server_entrypoint' must be specified.")
-        server_start_cmd = (
-            f"{server_entrypoint} "
-            f" --model {model_path} "
-            f" --num_gpus {num_gpus} "
-            f" --num_nodes {num_nodes} "
-            f" --port {server_port} "
-            f" {server_args} "
-        )
-        num_tasks = 1
+        if gpus_per_node > 1:
+            # Multi-instance mode: each SLURM task gets its own GPU and port
+            server_start_cmd = (
+                f"echo 'SLURM_LOCALID='$SLURM_LOCALID' SLURM_PROCID='$SLURM_PROCID && "
+                f"export CUDA_VISIBLE_DEVICES=${{SLURM_LOCALID:-0}} && "
+                f"{server_entrypoint} "
+                f" --model {model_path} "
+                f" --num_gpus 1 "
+                f" --num_nodes 1 "
+                f" --port $(({server_port} + ${{SLURM_LOCALID:-0}})) "
+                f" {server_args} "
+            )
+            num_tasks = gpus_per_node
+        else:
+            server_start_cmd = (
+                f"{server_entrypoint} "
+                f" --model {model_path} "
+                f" --num_gpus {num_gpus} "
+                f" --num_nodes {num_nodes} "
+                f" --port {server_port} "
+                f" {server_args} "
+            )
+            num_tasks = 1
     else:
         raise ValueError(f"Server type '{server_type}' not supported for model inference.")
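
Illustration (not part of the patch): a minimal, standalone sketch of the chunk-to-rank and port-to-rank mapping the diff relies on. All concrete values (num_chunks=8, gpus_per_node=4, server_port=5000, and the remaining chunk ids) are hypothetical; at runtime the same arithmetic is done by the shell via $SLURM_LOCALID, while this snippet does it in Python.

# Hypothetical values for illustration only.
num_chunks = 8
gpus_per_node = 4
server_port = 5000

# Enforced by the validation added in prepare_eval_commands.
assert num_chunks % gpus_per_node == 0

# Chunks that still need to run (e.g., after ++skip_filled=True skipped the rest).
remaining_chunks = [1, 2, 6]

# Each job is keyed by its "base chunk"; if any chunk in a batch is incomplete,
# the whole batch of gpus_per_node chunks is scheduled again.
base_chunks = sorted({(cid // gpus_per_node) * gpus_per_node for cid in remaining_chunks})
print(base_chunks)  # [0, 4]

for base_chunk in base_chunks:  # one job per base chunk
    for local_rank in range(gpus_per_node):  # SLURM_LOCALID within the job step
        chunk_id = base_chunk + local_rank  # shell: $((base_chunk + $SLURM_LOCALID))
        port = server_port + local_rank  # shell: $((server_port + $SLURM_LOCALID))
        print(f"rank {local_rank}: chunk_id={chunk_id}, server port={port}")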
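A second illustration-only snippet, showing the strings the shell-expression path in get_generation_cmd would produce. The donefile path reuses the hypothetical example from the patch comment, and base_chunk/num_chunks are made up.

# Hypothetical inputs for illustration only.
base_chunk = 4
num_chunks = 8
chunk_id = f"$(({base_chunk} + $SLURM_LOCALID))"

# Same detection the patch uses.
is_shell_expr = isinstance(chunk_id, str) and "$" in str(chunk_id)
assert is_shell_expr

# The ++chunk_id override is double-quoted so the job's shell expands it per rank.
cmd = f' ++num_chunks={num_chunks} "++chunk_id={chunk_id}" '
print(cmd.strip())
# ++num_chunks=8 "++chunk_id=$((4 + $SLURM_LOCALID))"

# The donefile pattern is rewritten the same way, so each rank touches its own chunk's donefile.
base_donefile = "/path/output_chunk_0.jsonl.done"  # donefiles[0]; path is hypothetical
donefile_pattern = base_donefile.replace("_chunk_0.jsonl", f"_chunk_{chunk_id}.jsonl")
print(f'touch "{donefile_pattern}"')
# touch "/path/output_chunk_$((4 + $SLURM_LOCALID)).jsonl.done"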