Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions .github/workflows/gpu_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,7 @@ jobs:
cd ${{ github.run_id }}
nvidia-smi
set -o pipefail # this will make sure next line returns non-0 exit code if tests fail
# Run heartbeat in background, capture its PID, and ensure cleanup
(while true; do sleep 60; echo "[HEARTBEAT] $(date '+%Y-%m-%d %H:%M:%S') - still running..."; done) &
HEARTBEAT_PID=$!
# Run tests and capture exit code
EXIT_CODE=0
./tests/gpu-tests/run_qwen.sh || EXIT_CODE=$?
# Kill heartbeat and exit with test result
kill $HEARTBEAT_PID 2>/dev/null || true
exit $EXIT_CODE
./tests/gpu-tests/run_qwen.sh
- name: Cleanup
if: always()
run: |
Expand Down
452 changes: 163 additions & 289 deletions nemo_skills/pipeline/generate.py

Large diffs are not rendered by default.

217 changes: 122 additions & 95 deletions nemo_skills/pipeline/nemo_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,20 @@

import copy
import logging
from dataclasses import dataclass, field
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

import typer
from nemo_evaluator_launcher.api import RunConfig
from nemo_evaluator_launcher.common.helpers import get_eval_factory_command
from nemo_evaluator_launcher.common.mapping import get_task_from_mapping, load_tasks_mapping
from omegaconf import OmegaConf
from omegaconf import DictConfig, OmegaConf

import nemo_skills.pipeline.utils as pipeline_utils
from nemo_skills.pipeline.app import app, typer_unpacker
from nemo_skills.pipeline.utils.commands import vllm_server_command
from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline
from nemo_skills.pipeline.utils.scripts import BaseJobScript, ServerScript
from nemo_skills.utils import get_logger_name, setup_logging

LOG = logging.getLogger(get_logger_name(__file__))
Expand Down Expand Up @@ -289,8 +289,8 @@ def nemo_evaluator(
expname=expname,
idx=idx,
task_name=task.name,
launcher_run_cfg=OmegaConf.to_container(launcher_run_cfg, resolve=True),
task_cfg=OmegaConf.to_container(task, resolve=True),
launcher_run_cfg=launcher_run_cfg,
task_cfg=task,
task_definition=task_definition,
base_output_root=base_output_root,
eval_image=eval_image,
Expand Down Expand Up @@ -443,8 +443,10 @@ def _create_serving_command_obj(
idx: int,
task_name: str,
) -> Command:
"""Create a `Command` backed by a `ServerScript` for a hosted serving component.
"""Create a Command object for a hosted serving component (main or judge server).

This function wraps vllm_server_command and standardizes container selection,
logging prefixes, and metadata for both main and judge servers.

Args:
cluster_config: Cluster configuration dictionary
Expand All @@ -462,53 +464,54 @@ def _create_serving_command_obj(
task_name: Task name for naming

Returns:
Command: A Command object whose `script` is a configured `ServerScript`.
Command object configured for the serving component
"""
stype = (server_type or "vllm").lower()
sargs = args or ""
if stype != "vllm":
LOG.warning("Only vllm server_type is supported currently; got %s", stype)

server_script = ServerScript(
server_type=stype,
model_path=model or "",
cmd_str, meta = vllm_server_command(
cluster_config=cluster_config,
num_gpus=gpus,
num_nodes=nodes or 1,
server_args=args or "",
server_entrypoint=entrypoint,
model=model, # type: ignore[arg-type]
port=port,
allocate_port=port is None,
server_type=stype,
gpus=gpus,
nodes=nodes,
args=sargs,
entrypoint=entrypoint,
)

# Judge servers get a distinct log prefix for clarity
if is_judge:
server_script.log_prefix = "judge-server"

# Resolve container fallback when not explicitly provided
if not container:
container = cluster_config["containers"][stype]

log_prefix = "judge-server" if is_judge else "server"
name_role = "judge-server" if is_judge else "server"

return Command(
script=server_script,
command=cmd_str,
container=container,
gpus=gpus,
nodes=nodes or 1,
name=f"{expname}-{name_role}-{idx}-{task_name}",
metadata={
**meta,
"gpus": gpus,
"log_prefix": log_prefix,
},
)


@dataclass
class _TaskCreationContext:
"""Local helper to pass around the information about the task and easier logic sharing.

Note: launcher_run_cfg and task_cfg are stored as plain dicts (not OmegaConf) to allow
serialization by nemo_run/fiddle. Convert back to DictConfig if OmegaConf operations are needed.
"""
"""Local helper to pass around the information about the task and easier logic sharing."""

expname: str
idx: int
task_name: str
launcher_run_cfg: dict # Stored as plain dict for serialization compatibility
task_cfg: dict # Stored as plain dict for serialization compatibility
launcher_run_cfg: RunConfig
task_cfg: DictConfig
task_definition: dict
base_output_root: Optional[str]
eval_image: str
Expand Down Expand Up @@ -627,34 +630,113 @@ def _build_judge_server_if_needed(ctx: _TaskCreationContext) -> Optional[Command
def _build_client_command(
ctx: _TaskCreationContext, main_server_cmd: Optional[Command], judge_server_cmd: Optional[Command]
) -> Command:
"""Create the evaluator client `Command` using `EvaluatorClientScript`.
"""Build Command for evaluator client.

The client command behavior depends on server hosting:
- If servers are co-hosted: Uses lambda factory to resolve runtime URLs via hostname_ref/meta_ref
- If using external servers: Uses static URLs from server_base_url/judge_server_base_url
- If no servers: Uses URLs from evaluator config or defaults

Args:
ctx: Task creation context with all configuration
main_server_cmd: Main server Command if self-hosted, None otherwise
judge_server_cmd: Judge server Command if self-hosted, None otherwise

Returns:
Command: A Command whose script builds the evaluator CLI at runtime
Command object for evaluator client
"""
if ctx.hosting_server or ctx.hosting_judge:
# Co-hosted servers: Use lambda factory to resolve runtime URLs
# The lambda is evaluated at execution time when het_group_index is assigned
def _client_cmd_factory():
waits: List[str] = []
target_url: Optional[str] = None
judge_url: Optional[str] = None

client_script = EvaluatorClientScript(
ctx=ctx,
main_server_script=main_server_cmd.script if main_server_cmd else None,
judge_server_script=judge_server_cmd.script if judge_server_cmd else None,
# Build main server URL from runtime references
if ctx.hosting_server and main_server_cmd is not None:
server_host = main_server_cmd.hostname_ref()
server_port_val = main_server_cmd.meta_ref("port")
base_url = f"http://{server_host}:{server_port_val}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{ctx.server_health_path}"))
target_url = f"{base_url}{ctx.server_api_path}"

# Build judge server URL from runtime references
if ctx.hosting_judge and judge_server_cmd is not None:
jhost = judge_server_cmd.hostname_ref()
jport = judge_server_cmd.meta_ref("port")
jbase = f"http://{jhost}:{jport}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{jbase}{ctx.judge_server_health_path}"))
judge_url = f"{jbase}{ctx.judge_server_api_path}"

# Wait for servers to be ready, then run evaluator
wait_cmd = " && ".join(waits) if waits else "true"
cmd = _build_task_cmd(
task_name=ctx.task_name,
launcher_run_cfg=ctx.launcher_run_cfg,
task_cfg=ctx.task_cfg,
task_definition=ctx.task_definition,
expname=ctx.expname,
base_output_root=ctx.base_output_root,
url_override=target_url,
model_id=ctx.server_model,
judge_url_override=judge_url,
judge_model_id=ctx.judge_server_model,
)
return f"{wait_cmd} && {cmd}"

return Command(
command=_client_cmd_factory,
container=ctx.eval_image,
gpus=ctx.job_gpus or None,
nodes=ctx.job_nodes or 1,
name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}",
metadata={
"log_prefix": "main",
"environment": ctx.env_vars,
"gpus": ctx.job_gpus or None,
},
)

# No hosted servers: Use external URLs or config defaults
server_url = None
if ctx.with_external_server and ctx.server_base_url:
server_url = ctx.server_base_url.rstrip("/") + ctx.server_api_path
judge_url = None
if ctx.with_external_judge and ctx.judge_server_base_url:
judge_url = ctx.judge_server_base_url.rstrip("/") + ctx.judge_server_api_path

eval_cmd = _build_task_cmd(
task_name=ctx.task_name,
launcher_run_cfg=ctx.launcher_run_cfg,
task_cfg=ctx.task_cfg,
task_definition=ctx.task_definition,
expname=ctx.expname,
base_output_root=ctx.base_output_root,
url_override=server_url,
model_id=ctx.server_model,
judge_url_override=judge_url,
judge_model_id=ctx.judge_server_model,
)

return Command(
script=client_script,
command=eval_cmd,
container=ctx.eval_image,
name=f"{ctx.expname}-client-{ctx.idx}-{ctx.task_name}",
gpus=None,
nodes=ctx.job_nodes or 1,
name=f"{ctx.expname}-{ctx.idx}-{ctx.task_name}",
metadata={
"log_prefix": "main",
"environment": ctx.env_vars,
"gpus": ctx.job_gpus or None,
},
)


def _build_task_cmd(
task_name: str,
launcher_run_cfg: dict,
task_cfg: dict,
launcher_run_cfg: DictConfig,
task_cfg: DictConfig,
task_definition: dict,
expname: str,
base_output_root: Optional[str],
Expand All @@ -670,8 +752,8 @@ def _build_task_cmd(

Args:
task_name: Task identifier (e.g., "ifeval", "gpqa_diamond")
launcher_run_cfg: Global evaluator configuration (as plain dict)
task_cfg: Task-specific configuration (as plain dict, may include task-level overrides)
launcher_run_cfg: Global evaluator configuration from RunConfig
task_cfg: Task-specific configuration (may include task-level overrides)
task_definition: Task definition from mapping (container, harness info)
expname: Experiment name for output directory structure
base_output_root: Base directory for task outputs
Expand All @@ -689,9 +771,7 @@ def _build_task_cmd(
- Judge: config.params.extra.judge.url
Output directory is set to: {base_output_root}/{expname}/nemo-evaluator-results/{task_name}
"""
# Convert back to DictConfig for OmegaConf operations
launcher_run_cfg = OmegaConf.create(launcher_run_cfg)
task_cfg_copy = OmegaConf.create(copy.deepcopy(task_cfg))
task_cfg_copy = copy.deepcopy(task_cfg)
if url_override:
OmegaConf.update(task_cfg_copy, "overrides", {"target.api_endpoint.url": url_override}, force_add=True)

Expand Down Expand Up @@ -726,56 +806,3 @@ def _build_task_cmd(
cmd_struct = get_eval_factory_command(launcher_run_cfg, task_cfg_copy, task_definition)

return cmd_struct.cmd


@dataclass(kw_only=True)
class EvaluatorClientScript(BaseJobScript):
"""run.Script implementation for nemo-evaluator client with runtime server resolution."""

ctx: _TaskCreationContext
main_server_script: Optional[ServerScript] = None
judge_server_script: Optional[ServerScript] = None
log_prefix: str = field(default="main", init=False)

def __post_init__(self):
def build_command():
waits: List[str] = []
target_url: Optional[str] = None
judge_url: Optional[str] = None

if self.ctx.hosting_server and self.main_server_script is not None:
server_host = self.main_server_script.hostname_ref()
base_url = f"http://{server_host}:{self.main_server_script.port}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{base_url}{self.ctx.server_health_path}"))
target_url = f"{base_url}{self.ctx.server_api_path}"
elif self.ctx.with_external_server and self.ctx.server_base_url:
target_url = self.ctx.server_base_url.rstrip("/") + self.ctx.server_api_path

if self.ctx.hosting_judge and self.judge_server_script is not None:
judge_host = self.judge_server_script.hostname_ref()
judge_base = f"http://{judge_host}:{self.judge_server_script.port}"
waits.append(pipeline_utils.get_server_wait_cmd(f"{judge_base}{self.ctx.judge_server_health_path}"))
judge_url = f"{judge_base}{self.ctx.judge_server_api_path}"
elif self.ctx.with_external_judge and self.ctx.judge_server_base_url:
judge_url = self.ctx.judge_server_base_url.rstrip("/") + self.ctx.judge_server_api_path

cmd = _build_task_cmd(
task_name=self.ctx.task_name,
launcher_run_cfg=self.ctx.launcher_run_cfg,
task_cfg=self.ctx.task_cfg,
task_definition=self.ctx.task_definition,
expname=self.ctx.expname,
base_output_root=self.ctx.base_output_root,
url_override=target_url,
model_id=self.ctx.server_model,
judge_url_override=judge_url,
judge_model_id=self.ctx.judge_server_model,
)

wait_cmd = " && ".join(waits) if waits else None
final_cmd = f"{wait_cmd} && {cmd}" if wait_cmd else cmd
env_vars = copy.deepcopy(self.ctx.env_vars)
return final_cmd, {"environment": env_vars}

self.set_inline(build_command)
super().__post_init__()
2 changes: 0 additions & 2 deletions nemo_skills/pipeline/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
get_chunked_rs_filename,
get_generation_cmd,
get_remaining_jobs,
normalize_models_config,
normalize_parameter,
wrap_cmd,
)
from nemo_skills.pipeline.utils.mounts import (
Expand Down
Loading