Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0a79fda
WIP add run.Script interface
gwarmstrong Nov 13, 2025
069e2eb
ENH actually pass script to nemo-run
gwarmstrong Nov 13, 2025
b5cfa55
ENH actually pass script to nemo-run
gwarmstrong Nov 13, 2025
fae9c52
ENH actually pass script to nemo-run
gwarmstrong Nov 13, 2025
d371be9
Make run.Script refactor multi model
gwarmstrong Nov 13, 2025
0d1ec46
MAINT convert run.Script to expose multi model
gwarmstrong Nov 13, 2025
b8238f8
MAINT convert examples to use run.Script
gwarmstrong Nov 18, 2025
5bca483
MAINT some small consolidations for generation
gwarmstrong Nov 18, 2025
5691132
resolve some sbatch differences
gwarmstrong Nov 19, 2025
874ba1d
MAINT simplify hardware specificaiton
gwarmstrong Nov 20, 2025
7534d1f
MAINT simplify task naming scheme
gwarmstrong Nov 20, 2025
4834302
ENH make method explicit
gwarmstrong Nov 21, 2025
ca594a2
MAINT update group name
gwarmstrong Nov 21, 2025
1e6da25
MAINT simplify entrypoint
gwarmstrong Nov 21, 2025
91a1587
FIX hetgroup references
gwarmstrong Nov 21, 2025
223a482
Merge branch 'main' of github.com:NVIDIA/NeMo-Skills into georgea/ref…
gwarmstrong Nov 21, 2025
363be21
FIX update nemo_evaluator with new run.Script syntax
gwarmstrong Nov 21, 2025
93aea1e
TST fix tests for updated internals
gwarmstrong Nov 21, 2025
5624ea0
FIX declarative pipeline tests
gwarmstrong Nov 22, 2025
a737dfc
FIX mounting nemo run in local execution context
gwarmstrong Nov 22, 2025
b8de6c3
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Nov 22, 2025
8162f17
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 4, 2025
c79188a
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 5, 2025
8e35ca7
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 10, 2025
1ffc6dc
FIX local executor path rewrite
gwarmstrong Dec 11, 2025
47e3f34
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 11, 2025
dc083a2
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 12, 2025
7517ac8
FIX pass sandbox metadata
gwarmstrong Dec 12, 2025
58ce8ef
FIX tests
gwarmstrong Dec 12, 2025
47d0e6f
add heartbeat to gpu tests
gwarmstrong Dec 12, 2025
02dcbd7
MAINT update hearbeat
gwarmstrong Dec 12, 2025
9c1a5f6
try skip mrcr
gwarmstrong Dec 12, 2025
cce3822
MAINT fix nemo-evaluator
gwarmstrong Dec 12, 2025
17de525
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 12, 2025
17634ea
fix sandbox send args
gwarmstrong Dec 13, 2025
1c32167
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 16, 2025
cef4d1b
Merge branch 'main' into georgea/refactor-generate-run-script
gwarmstrong Dec 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/gpu_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ jobs:
cd ${{ github.run_id }}
nvidia-smi
set -o pipefail # this will make sure next line returns non-0 exit code if tests fail
./tests/gpu-tests/run_qwen.sh
# Run heartbeat in background, capture its PID, and ensure cleanup
(while true; do sleep 60; echo "[HEARTBEAT] $(date '+%Y-%m-%d %H:%M:%S') - still running..."; done) &
HEARTBEAT_PID=$!
# Run tests and capture exit code
EXIT_CODE=0
./tests/gpu-tests/run_qwen.sh || EXIT_CODE=$?
# Kill heartbeat and exit with test result
kill $HEARTBEAT_PID 2>/dev/null || true
exit $EXIT_CODE
- name: Cleanup
if: always()
run: |
Expand Down
452 changes: 289 additions & 163 deletions nemo_skills/pipeline/generate.py

Large diffs are not rendered by default.

217 changes: 95 additions & 122 deletions nemo_skills/pipeline/nemo_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,20 @@

import copy
import logging
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional

import typer
from nemo_evaluator_launcher.api import RunConfig
from nemo_evaluator_launcher.common.helpers import get_eval_factory_command
from nemo_evaluator_launcher.common.mapping import get_task_from_mapping, load_tasks_mapping
from omegaconf import DictConfig, OmegaConf
from omegaconf import OmegaConf

import nemo_skills.pipeline.utils as pipeline_utils
from nemo_skills.pipeline.app import app, typer_unpacker
from nemo_skills.pipeline.utils.commands import vllm_server_command
from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline
from nemo_skills.pipeline.utils.scripts import BaseJobScript, ServerScript
from nemo_skills.utils import get_logger_name, setup_logging

LOG = logging.getLogger(get_logger_name(__file__))
Expand Down Expand Up @@ -289,8 +289,8 @@ def nemo_evaluator(
expname=expname,
idx=idx,
task_name=task.name,
launcher_run_cfg=launcher_run_cfg,
task_cfg=task,
launcher_run_cfg=OmegaConf.to_container(launcher_run_cfg, resolve=True),
task_cfg=OmegaConf.to_container(task, resolve=True),
task_definition=task_definition,
base_output_root=base_output_root,
eval_image=eval_image,
Expand Down Expand Up @@ -443,10 +443,8 @@ def _create_serving_command_obj(
idx: int,
task_name: str,
) -> Command:
"""Create a Command object for a hosted serving component (main or judge server).
"""Create a `Command` backed by a `ServerScript` for a hosted serving component.

This function wraps vllm_server_command and standardizes container selection,
logging prefixes, and metadata for both main and judge servers.

Args:
cluster_config: Cluster configuration dictionary
Expand All @@ -464,54 +462,53 @@ def _create_serving_command_obj(
task_name: Task name for naming

Returns:
Command object configured for the serving component
Command: A Command object whose `script` is a configured `ServerScript`.
"""
stype = (server_type or "vllm").lower()
sargs = args or ""
if stype != "vllm":
LOG.warning("Only vllm server_type is supported currently; got %s", stype)

cmd_str, meta = vllm_server_command(
server_script = ServerScript(
server_type=stype,
model_path=model or "",
cluster_config=cluster_config,
model=model, # type: ignore[arg-type]
num_gpus=gpus,
num_nodes=nodes or 1,
server_args=args or "",
server_entrypoint=entrypoint,
port=port,
server_type=stype,
gpus=gpus,
nodes=nodes,
args=sargs,
entrypoint=entrypoint,
allocate_port=port is None,
)

# Resolve container fallback when not explicitly provided
# Judge servers get a distinct log prefix for clarity
if is_judge:
server_script.log_prefix = "judge-server"

if not container:
container = cluster_config["containers"][stype]

log_prefix = "judge-server" if is_judge else "server"
name_role = "judge-server" if is_judge else "server"

return Command(
command=cmd_str,
script=server_script,
container=container,
gpus=gpus,
nodes=nodes or 1,
name=f"{expname}-{name_role}-{idx}-{task_name}",
metadata={
**meta,
"gpus": gpus,
"log_prefix": log_prefix,
},
)


@dataclass
class _TaskCreationContext:
"""Local helper to pass around the information about the task and easier logic sharing."""
"""Local helper to pass around the information about the task and easier logic sharing.

Note: launcher_run_cfg and task_cfg are stored as plain dicts (not OmegaConf) to allow
serialization by nemo_run/fiddle. Convert back to DictConfig if OmegaConf operations are needed.
"""

expname: str
idx: int
task_name: str
launcher_run_cfg: RunConfig
task_cfg: DictConfig
launcher_run_cfg: dict # Stored as plain dict for serialization compatibility
task_cfg: dict # Stored as plain dict for serialization compatibility
task_definition: dict
base_output_root: Optional[str]
eval_image: str
Expand Down Expand Up @@ -630,113 +627,34 @@ def _build_judge_server_if_needed(ctx: _TaskCreationContext) -> Optional[Command
def _build_client_command(
ctx: _TaskCreationContext, main_server_cmd: Optional[Command], judge_server_cmd: Optional[Command]
) -> Command:
"""Build Command for evaluator client.

The client command behavior depends on server hosting:
- If servers are co-hosted: Uses lambda factory to resolve runtime URLs via hostname_ref/meta_ref
- If using external servers: Uses static URLs from server_base_url/judge_server_base_url
- If no servers: Uses URLs from evaluator config or defaults
"""Create the evaluator client `Command` using `EvaluatorClientScript`.

Args:
ctx: Task creation context with all configuration
main_server_cmd: Main server Command if self-hosted, None otherwise
judge_server_cmd: Judge server Command if self-hosted, None otherwise

Returns:
Command object for evaluator client
Command: A Command whose script builds the evaluator CLI at runtime
"""
if ctx.hosting_server or ctx.hosting_judge:
# Co-hosted servers: Use lambda factory to resolve runtime URLs
# The lambda is evaluated at execution time when het_group_index is assigned
def _client_cmd_factory():
waits: List[str] = []
target_url: Optional[str] = None
judge_url: Optional[str] = None

# Build main server URL from runtime references
if ctx.hosting_server and main_server_cmd is not None:
server_host = main_server_cmd.hostname_ref()
server_port_val = main_server_cmd.meta_ref("port")
base_url = f"http://{server_host}:{server_port_val}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{ctx.server_health_path}"))
target_url = f"{base_url}{ctx.server_api_path}"

# Build judge server URL from runtime references
if ctx.hosting_judge and judge_server_cmd is not None:
jhost = judge_server_cmd.hostname_ref()
jport = judge_server_cmd.meta_ref("port")
jbase = f"http://{jhost}:{jport}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{jbase}{ctx.judge_server_health_path}"))
judge_url = f"{jbase}{ctx.judge_server_api_path}"

# Wait for servers to be ready, then run evaluator
wait_cmd = " && ".join(waits) if waits else "true"
cmd = _build_task_cmd(
task_name=ctx.task_name,
launcher_run_cfg=ctx.launcher_run_cfg,
task_cfg=ctx.task_cfg,
task_definition=ctx.task_definition,
expname=ctx.expname,
base_output_root=ctx.base_output_root,
url_override=target_url,
model_id=ctx.server_model,
judge_url_override=judge_url,
judge_model_id=ctx.judge_server_model,
)
return f"{wait_cmd} && {cmd}"

return Command(
command=_client_cmd_factory,
container=ctx.eval_image,
gpus=ctx.job_gpus or None,
nodes=ctx.job_nodes or 1,
name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}",
metadata={
"log_prefix": "main",
"environment": ctx.env_vars,
"gpus": ctx.job_gpus or None,
},
)

# No hosted servers: Use external URLs or config defaults
server_url = None
if ctx.with_external_server and ctx.server_base_url:
server_url = ctx.server_base_url.rstrip("/") + ctx.server_api_path
judge_url = None
if ctx.with_external_judge and ctx.judge_server_base_url:
judge_url = ctx.judge_server_base_url.rstrip("/") + ctx.judge_server_api_path

eval_cmd = _build_task_cmd(
task_name=ctx.task_name,
launcher_run_cfg=ctx.launcher_run_cfg,
task_cfg=ctx.task_cfg,
task_definition=ctx.task_definition,
expname=ctx.expname,
base_output_root=ctx.base_output_root,
url_override=server_url,
model_id=ctx.server_model,
judge_url_override=judge_url,
judge_model_id=ctx.judge_server_model,
client_script = EvaluatorClientScript(
ctx=ctx,
main_server_script=main_server_cmd.script if main_server_cmd else None,
judge_server_script=judge_server_cmd.script if judge_server_cmd else None,
)

return Command(
command=eval_cmd,
script=client_script,
container=ctx.eval_image,
gpus=None,
nodes=ctx.job_nodes or 1,
name=f"{ctx.expname}-{ctx.idx}-{ctx.task_name}",
metadata={
"log_prefix": "main",
"environment": ctx.env_vars,
"gpus": ctx.job_gpus or None,
},
name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}",
)


def _build_task_cmd(
task_name: str,
launcher_run_cfg: DictConfig,
task_cfg: DictConfig,
launcher_run_cfg: dict,
task_cfg: dict,
task_definition: dict,
expname: str,
base_output_root: Optional[str],
Expand All @@ -752,8 +670,8 @@ def _build_task_cmd(

Args:
task_name: Task identifier (e.g., "ifeval", "gpqa_diamond")
launcher_run_cfg: Global evaluator configuration from RunConfig
task_cfg: Task-specific configuration (may include task-level overrides)
launcher_run_cfg: Global evaluator configuration (as plain dict)
task_cfg: Task-specific configuration (as plain dict, may include task-level overrides)
task_definition: Task definition from mapping (container, harness info)
expname: Experiment name for output directory structure
base_output_root: Base directory for task outputs
Expand All @@ -771,7 +689,9 @@ def _build_task_cmd(
- Judge: config.params.extra.judge.url
Output directory is set to: {base_output_root}/{expname}/nemo-evaluator-results/{task_name}
"""
task_cfg_copy = copy.deepcopy(task_cfg)
# Convert back to DictConfig for OmegaConf operations
launcher_run_cfg = OmegaConf.create(launcher_run_cfg)
task_cfg_copy = OmegaConf.create(copy.deepcopy(task_cfg))
if url_override:
OmegaConf.update(task_cfg_copy, "overrides", {"target.api_endpoint.url": url_override}, force_add=True)

Expand Down Expand Up @@ -806,3 +726,56 @@ def _build_task_cmd(
cmd_struct = get_eval_factory_command(launcher_run_cfg, task_cfg_copy, task_definition)

return cmd_struct.cmd


@dataclass(kw_only=True)
class EvaluatorClientScript(BaseJobScript):
"""run.Script implementation for nemo-evaluator client with runtime server resolution."""

ctx: _TaskCreationContext
main_server_script: Optional[ServerScript] = None
judge_server_script: Optional[ServerScript] = None
log_prefix: str = field(default="main", init=False)

def __post_init__(self):
def build_command():
waits: List[str] = []
target_url: Optional[str] = None
judge_url: Optional[str] = None

if self.ctx.hosting_server and self.main_server_script is not None:
server_host = self.main_server_script.hostname_ref()
base_url = f"http://{server_host}:{self.main_server_script.port}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{self.ctx.server_health_path}"))
target_url = f"{base_url}{self.ctx.server_api_path}"
elif self.ctx.with_external_server and self.ctx.server_base_url:
target_url = self.ctx.server_base_url.rstrip("/") + self.ctx.server_api_path

if self.ctx.hosting_judge and self.judge_server_script is not None:
judge_host = self.judge_server_script.hostname_ref()
judge_base = f"http://{judge_host}:{self.judge_server_script.port}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{judge_base}{self.ctx.judge_server_health_path}"))
judge_url = f"{judge_base}{self.ctx.judge_server_api_path}"
elif self.ctx.with_external_judge and self.ctx.judge_server_base_url:
judge_url = self.ctx.judge_server_base_url.rstrip("/") + self.ctx.judge_server_api_path

cmd = _build_task_cmd(
task_name=self.ctx.task_name,
launcher_run_cfg=self.ctx.launcher_run_cfg,
task_cfg=self.ctx.task_cfg,
task_definition=self.ctx.task_definition,
expname=self.ctx.expname,
base_output_root=self.ctx.base_output_root,
url_override=target_url,
model_id=self.ctx.server_model,
judge_url_override=judge_url,
judge_model_id=self.ctx.judge_server_model,
)

wait_cmd = " && ".join(waits) if waits else None
final_cmd = f"{wait_cmd} && {cmd}" if wait_cmd else cmd
env_vars = copy.deepcopy(self.ctx.env_vars)
return final_cmd, {"environment": env_vars}

self.set_inline(build_command)
super().__post_init__()
2 changes: 2 additions & 0 deletions nemo_skills/pipeline/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
get_chunked_rs_filename,
get_generation_cmd,
get_remaining_jobs,
normalize_models_config,
normalize_parameter,
wrap_cmd,
)
from nemo_skills.pipeline.utils.mounts import (
Expand Down
Loading