diff --git a/docs/evaluation/eval-kit.md b/docs/evaluation/eval-kit.md deleted file mode 100644 index f5658edae3..0000000000 --- a/docs/evaluation/eval-kit.md +++ /dev/null @@ -1,282 +0,0 @@ -# VLMEvalKit Integration (eval_kit) - -This page explains how to run VLMEvalKit benchmarks through NeMo Skills using the `eval_kit` generation module. This enables evaluating Megatron multimodal models on VLMEvalKit's benchmark collection (MMBench, LibriSpeech, TedLium, etc.) without leaving the NeMo Skills pipeline. - -## Overview - -Two inference modes are available: - -| Mode | How it works | When to use | -|------|-------------|-------------| -| **mcore** | Megatron model loaded in-process via `torchrun` (no HTTP server) | Megatron checkpoints | -| **vllm** | NeMo Skills starts a vLLM server, VLMEvalKit connects as client | HF models served by vLLM | - -Both modes use the same pipeline command — the only difference is the `++model_type` flag. - -## Prerequisites - -Before running eval_kit benchmarks, you need four things set up: - -### 1. VLMEvalKit source code (local) - -The `vlmeval/` directory from VLMEvalKit gets packaged and shipped to the cluster automatically. You need a local clone: - -```bash -# Clone VLMEvalKit (NVIDIA internal fork with MultiModalMCore support) -git clone VLMEvalKitMcore /path/to/VLMEvalKitMcore -``` - -Then set the environment variable **before running any `ns eval` command**: - -```bash -export NEMO_SKILLS_VLMEVALKIT_PATH=/path/to/VLMEvalKitMcore -``` - -!!! important - This path is read **locally at submission time**. The pipeline packages the `vlmeval/` subdirectory and rsyncs it to the cluster. It does NOT need to exist on the cluster. - -### 2. eval_kit container on the cluster - -The eval_kit container must have PyTorch, Megatron, and VLMEvalKit dependencies pre-installed. 
This container can be found in container storage. Add it to your cluster config: - -```yaml -# cluster_configs/my_cluster.yaml -containers: - eval_kit: /path/to/eval-kit-nemo-skills.sqsh - # ... other containers -``` - -### 3. Megatron path (for mcore mode) - -The container needs access to a Megatron-LM installation. Set it in your cluster config: - -```yaml -env_vars: - - MEGATRON_PATH=/path/to/megatron-lm - - PYTHONPATH=/path/to/megatron-lm -``` - -And ensure the path is mounted: - -```yaml -mounts: - - /host/path/to/megatron-lm:/host/path/to/megatron-lm -``` - -### 4. VLMEvalKit dataset cache (for benchmarks that download from HuggingFace) - -VLMEvalKit downloads benchmark data on first use. Set a persistent cache directory: - -```yaml -env_vars: - - LMUData=/path/to/vlmevalkit_cache -``` - -## Running eval_kit Benchmarks - -### Mode 1: Megatron in-process (mcore) - -This is the primary mode. The model runs directly inside the `torchrun` process — no separate server. - -```bash -export NEMO_SKILLS_VLMEVALKIT_PATH=/path/to/VLMEvalKitMcore - -ns eval \ - --cluster=my_cluster \ - --output_dir=/path/to/results \ - --benchmarks=eval_kit.LibriSpeech_test_clean \ - --server_type=megatron \ - --server_gpus=8 \ - --server_container=/path/to/eval-kit-nemo-skills.sqsh \ - ++model_type=mcore \ - ++model_config=/path/to/config.yaml \ - ++load_dir=/path/to/checkpoint/TP_1/ -``` - -Key parameters: - -| Parameter | Purpose | -|-----------|---------| -| `--benchmarks=eval_kit.<dataset>` | VLMEvalKit dataset name (e.g., `LibriSpeech_test_clean`, `MMBench_DEV_EN`, `TedLium_ASR_Test`) | - | `++model_type=mcore` | Triggers self-contained mode (no HTTP server, model loaded in-process) | -| `++model_config=<path>` | Path to Megatron model YAML config on the cluster | -| `++load_dir=<path>` | Path to Megatron checkpoint directory on the cluster | -| `--server_gpus=8` | Number of GPUs allocated to the torchrun process | -| `--server_container=<path>` | Container with Megatron + VLMEvalKit dependencies | - -!!! 
note - `--server_gpus` controls GPU allocation even though no server is started. In mcore mode, these GPUs go directly to the `torchrun` main task. - -!!! note - `--model` is **not needed** for mcore mode — the model is specified via `++model_config` and `++load_dir`. - -### Mode 2: vLLM server - -The pipeline starts a vLLM server, and VLMEvalKit's `VLLMLocal` client connects to it. - -```bash -export NEMO_SKILLS_VLMEVALKIT_PATH=/path/to/VLMEvalKitMcore - -ns eval \ - --cluster=my_cluster \ - --output_dir=/path/to/results \ - --benchmarks=eval_kit.MMBench_DEV_EN \ - --model=Qwen/Qwen2-Audio-7B-Instruct \ - --server_type=vllm \ - --server_gpus=2 \ - --server_container=/path/to/vllm-audio.sqsh \ - --main_container=/path/to/eval-kit-nemo-skills.sqsh \ - --server_args="--max-model-len 8192 --trust-remote-code" \ - ++model_type=vllm \ - ++model_name=qwen2-audio-7b -``` - -Key differences from mcore mode: - -| Parameter | Purpose | -|-----------|---------| -| `--model=<model>` | HuggingFace model name or path (vLLM downloads/loads it) | -| `++model_type=vllm` | VLMEvalKit uses its `VLLMLocal` client | -| `++model_name=<name>` | Model identifier used by VLMEvalKit for result naming | -| `--main_container=<path>` | Container for the eval_kit client (must have `vlmeval`). Separate from the vLLM server container | -| `--server_container=<path>` | Container for the vLLM server | - -!!! warning - The vLLM server container and the eval_kit client container are different. Use `--server_container` for vLLM and `--main_container` for the eval_kit client that needs `vlmeval`. - -## Available Benchmarks - -Any VLMEvalKit dataset can be used with the `eval_kit.` prefix. 
Examples: - -### Audio / ASR - -| Benchmark name | Dataset | -|---|---| -| `eval_kit.LibriSpeech_test_clean` | LibriSpeech test-clean (2,620 samples) | -| `eval_kit.LibriSpeech_test_other` | LibriSpeech test-other | -| `eval_kit.TedLium_ASR_Test` | TED-LIUM | -| `eval_kit.GigaSpeech_ASR_test` | GigaSpeech | -| `eval_kit.VoxPopuli_ASR_test` | VoxPopuli | -| `eval_kit.AMI_ASR_Test` | AMI meeting transcription | -| `eval_kit.SPGISpeech_ASR_test` | SPGISpeech | -| `eval_kit.Earnings22_ASR_Test` | Earnings22 | - -### Vision-Language - -| Benchmark name | Dataset | -|---|---| -| `eval_kit.MMBench_DEV_EN` | MMBench English dev | -| `eval_kit.MME` | MME perception + cognition | -| `eval_kit.MMMU_DEV_VAL` | MMMU dev+val | -| `eval_kit.MathVista_MINI` | MathVista mini | - -The full list depends on your VLMEvalKit version. Check `vlmeval/dataset/` for all supported datasets. - -## mcore_skills: NeMo Skills Data + Megatron In-Process - -For benchmarks that already have NeMo Skills JSONL data (like `asr-leaderboard`), you can use the `mcore_skills` generation type. This reads NeMo Skills data and prompts but uses MultiModalMCore for inference (no server). 
- -```bash -export NEMO_SKILLS_VLMEVALKIT_PATH=/path/to/VLMEvalKitMcore - -ns eval \ - --cluster=my_cluster \ - --output_dir=/path/to/results \ - --benchmarks=asr-leaderboard \ - --split=librispeech_clean \ - --data_dir=/data \ - --generation_type=mcore_skills \ - --server_type=megatron \ - --server_gpus=8 \ - --server_container=/path/to/eval-kit-nemo-skills.sqsh \ - ++model_config=/path/to/config.yaml \ - ++load_dir=/path/to/checkpoint/TP_1/ \ - ++tokenizer=/path/to/tokenizer -``` - -Key differences from eval_kit: - -| | eval_kit | mcore_skills | -|---|---|---| -| Data source | VLMEvalKit downloads from HuggingFace | NeMo Skills JSONL from `--data_dir` | -| Prompts | VLMEvalKit's built-in prompts | NeMo Skills prompt templates | -| Evaluation | VLMEvalKit's `dataset.evaluate()` | ASR WER via VLMEvalKit's `asr_wer()` | -| Benchmarks | Any VLMEvalKit dataset | Any NeMo Skills benchmark with JSONL | -| Flag | `--benchmarks=eval_kit.` | `--generation_type=mcore_skills` | - -## Cluster Config Example - -Here is a complete cluster config section for eval_kit support: - -```yaml -containers: - eval_kit: /path/to/eval-kit-nemo-skills.sqsh - megatron: /path/to/megatron-container.sqsh - vllm: /path/to/vllm-container.sqsh - # ... 
other containers - -mounts: - - /path/to/megatron-lm:/path/to/megatron-lm - - /path/to/data:/data - - /path/to/hf_cache:/workspace_hf/hf_cache - - /path/to/vlmevalkit_cache:/path/to/vlmevalkit_cache - -env_vars: - - MEGATRON_PATH=/path/to/megatron-lm - - PYTHONPATH=/path/to/megatron-lm - - LMUData=/path/to/vlmevalkit_cache - - HF_HOME=/workspace_hf/hf_cache - - HYDRA_FULL_ERROR=1 - - CUDA_DEVICE_MAX_CONNECTIONS=1 -``` - -## Understanding Results - -After evaluation completes, results are in `<output_dir>/eval-results/`: - -``` -<output_dir>/ -└── eval-results/ - └── eval_kit.LibriSpeech_test_clean/ - ├── output.jsonl # Per-sample results (generation + expected_answer) - ├── eval_kit_metrics.json # Aggregate metrics from VLMEvalKit - └── metrics.json # NeMo Skills summary -``` - -The `eval_kit_metrics.json` contains VLMEvalKit's computed metrics. For ASR benchmarks this is typically: - -```json -{ - "result": " Dataset WER (%) Metric\n0 LibriSpeechDataset 1.555811 WER" -} -``` - -## Troubleshooting - -### `No module named 'megatron.core'` - -The `MEGATRON_PATH` or `PYTHONPATH` is not set correctly in the cluster config `env_vars`. Ensure both point to a Megatron-LM installation that contains `megatron/core/`. - -### `env variable RD_TABLEBENCH_SRC is missing` - -Some VLMEvalKit versions have a hard assert on this environment variable at import time. Fix: use the stable VLMEvalKitMcore version, or set `RD_TABLEBENCH_SRC=/tmp` in your cluster config env_vars. - -### `ModuleNotFoundError: No module named 'vlmeval'` - -The `NEMO_SKILLS_VLMEVALKIT_PATH` was not set when you ran `ns eval`, so the `vlmeval/` directory was not packaged. Set it and re-run: - -```bash -export NEMO_SKILLS_VLMEVALKIT_PATH=/path/to/VLMEvalKitMcore -ns eval ... 
-``` - -### Installation command for missing dependencies - -If the eval_kit container is missing some Python packages, use `--installation_command`: - -```bash ---installation_command "pip install --no-deps pylatexenc==2.10" -``` - -This runs inside the container before the main task starts. diff --git a/docs/evaluation/index.md b/docs/evaluation/index.md index 6a54960b39..32f885d4d9 100644 --- a/docs/evaluation/index.md +++ b/docs/evaluation/index.md @@ -12,7 +12,6 @@ We support many popular benchmarks and it's easy to add new in the future. The f - [**Multilingual**](./multilingual.md): e.g. [mmlu-prox](./multilingual.md#mmlu-prox), [flores-200](./multilingual.md#flores-200), [wmt24pp](./multilingual.md#wmt24pp) - [**Speech & Audio**](./speech-audio.md): e.g. [asr-leaderboard](./speech-audio.md#asr-leaderboard), [mmau-pro](./speech-audio.md#mmau-pro) - [**Vision-Language Models (VLM)**](./vlm.md): e.g. [mmmu-pro](./vlm.md#mmmu-pro) -- [**VLMEvalKit Integration (eval_kit)**](./eval-kit.md): Run VLMEvalKit benchmarks via Megatron in-process or vLLM - [**Speculative Decoding (SD)**](./speculative-decoding.md): e.g. [SPEED-Bench](./speculative-decoding.md#SPEED-Bench) See [nemo_skills/dataset](https://github.com/NVIDIA-NeMo/Skills/blob/main/nemo_skills/dataset) where each folder is a benchmark we support. diff --git a/nemo_skills/dataset/eval_kit/__init__.py b/nemo_skills/dataset/eval_kit/__init__.py deleted file mode 100644 index a8c29e8977..0000000000 --- a/nemo_skills/dataset/eval_kit/__init__.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# VLMEvalKit integration module. -# Benchmarks are referenced as eval_kit.<dataset>, e.g. eval_kit.MMBench_DEV_EN -# The sub-benchmark name after the eval_kit. prefix is dynamically resolved and passed to VLMEvalKit. - -GENERATION_MODULE = "nemo_skills.inference.eval.eval_kit" -METRICS_TYPE = "eval_kit" -GENERATION_ARGS = "" -NUM_SAMPLES = 0 # VLMEvalKit inference is deterministic; no random seeds - -# No JSONL input file; VLMEvalKit manages its own data via build_dataset() -SKIP_INPUT_FILE = True - -# Note: SELF_CONTAINED_TASK is NOT set here because it depends on model_type. -# For mcore mode (Megatron in-process), the pipeline sets self_contained_task=True -# at runtime based on ++model_type=mcore in extra_arguments. -# For vllm mode, the standard NeMo Skills server/client flow is used. - - -def get_extra_generation_args(benchmark): - """Return extra generation args for the given benchmark name. - - Extracts the VLMEvalKit dataset name from the dotted benchmark name - (e.g. eval_kit.MMBench_DEV_EN -> ++vlm_dataset=MMBench_DEV_EN). - """ - if "." not in benchmark: - raise ValueError( - f"eval_kit benchmark must be in 'eval_kit.<dataset>' format, got '{benchmark}'. 
" - f"Example: eval_kit.MMBench_DEV_EN, eval_kit.LibriSpeech_test_clean" - ) - sub = benchmark.split(".", 1)[1] - return f" ++vlm_dataset={sub} " diff --git a/nemo_skills/dataset/utils.py b/nemo_skills/dataset/utils.py index 5239dd3c0e..d918ba086e 100644 --- a/nemo_skills/dataset/utils.py +++ b/nemo_skills/dataset/utils.py @@ -161,13 +161,6 @@ def _load_external_dataset(dataset_path): def get_default_dataset_module(dataset): data_path = "/nemo_run/code/nemo_skills/dataset" - - # For dotted names like eval_kit.MMBench_DEV_EN, import the parent package. - # The sub-benchmark part is handled by the module's get_extra_generation_args(). - if dataset.startswith("eval_kit."): - dataset_module = importlib.import_module("nemo_skills.dataset.eval_kit") - return dataset_module, data_path - dataset_module = importlib.import_module(f"nemo_skills.dataset.{dataset}") return dataset_module, data_path diff --git a/nemo_skills/evaluation/evaluator/audio.py b/nemo_skills/evaluation/evaluator/audio.py index 6d8af5b1b6..f212859ff1 100644 --- a/nemo_skills/evaluation/evaluator/audio.py +++ b/nemo_skills/evaluation/evaluator/audio.py @@ -505,35 +505,13 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic """Evaluate single sample based on task_type. 
Returns dict of updates to merge.""" updates = {} task_type = sample.get("task_type", "unknown") - generation_raw = sample.get("generation") - generation = generation_raw.strip() if isinstance(generation_raw, str) else "" + generation = sample["generation"].strip() expected_answer = sample.get("expected_answer", "").strip() # Strip helpful prefixes for ASR tasks (e.g., "The audio says: ...") if config.strip_helpful_prefixes: generation = strip_helpful_prefixes(generation) - # Normalise AudioBench speech-translation task types (ST-EN-ZH -> Translation) - _ASR_TYPES = {"ASR", "ASR-ZH", "ASR-PC", "ASR_LEADERBOARD"} - _TRANSLATION_TYPES = {"AST", "Translation"} - # AudioBench speech translation types: ST-{src}-{tgt} - if task_type.startswith("ST-"): - _TRANSLATION_TYPES.add(task_type) - - if task_type in (_ASR_TYPES | _TRANSLATION_TYPES | {"CER"}) and not generation: - base = { - "is_correct": False, - "error": "missing_generation", - } - if task_type in _TRANSLATION_TYPES: - return {**base, "bleu": 0.0} - if task_type == "CER": - return {**base, "cer": 1.0} - if task_type == "ASR-PC": - return {**base, "wer": 1.0, "wer_c": 1.0, "wer_pc": 1.0, "per": 1.0} - # ASR / ASR-ZH / ASR_LEADERBOARD - return {**base, "wer": 1.0} - if task_type == "ASR-PC": mode = resolve_asr_normalization_mode(config) metrics = evaluate_asr_pc( @@ -544,7 +522,7 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic ) updates.update(metrics) - elif task_type in {"ASR", "ASR-ZH"}: + elif task_type == "ASR": mode = resolve_asr_normalization_mode(config) metrics = evaluate_asr(expected_answer, generation, normalization_mode=mode) updates.update(metrics) @@ -566,7 +544,7 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic updates[f"wer_{metric_suffix}"] = ref_metrics["wer"] updates[f"is_correct_{metric_suffix}"] = ref_metrics["is_correct"] - elif task_type in _TRANSLATION_TYPES: + elif task_type in ["AST", "Translation"]: metrics = 
evaluate_translation(expected_answer, generation) updates.update(metrics) @@ -583,13 +561,6 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic metrics = evaluate_pc_rate(expected_answer, generation) updates.update(metrics) - elif task_type == "MathQA": - # AudioBench MathQA: exact string match after normalization - gen_norm = generation.strip().lower() - ref_norm = expected_answer.strip().lower() - updates["is_correct"] = gen_norm == ref_norm - updates["predicted_answer"] = generation - else: if "requires_judge" not in sample: updates["requires_judge"] = True diff --git a/nemo_skills/evaluation/metrics/eval_kit_metrics.py b/nemo_skills/evaluation/metrics/eval_kit_metrics.py deleted file mode 100644 index ffa760826a..0000000000 --- a/nemo_skills/evaluation/metrics/eval_kit_metrics.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -from pathlib import Path - -from nemo_skills.evaluation.metrics.base import BaseMetrics - - -class EvalKitMetrics(BaseMetrics): - """Metrics class for VLMEvalKit benchmarks. - - VLMEvalKit computes its own aggregate metrics during evaluation. - This class reads pre-computed aggregates from eval_kit_metrics.json - (written by EvalKitGenerationTask) rather than computing per-sample metrics. 
- The per-sample JSONL is still read by ComputeMetrics for the update() loop, - but we only count entries here -- the real metrics come from the JSON file. - - Note: ComputeMetrics only calls setup() on the "_all_" calculator. When - the data contains ``subset_for_metrics``, additional per-subset calculator - instances are created but never receive a setup() call. We use a - class-level ``_shared_metrics_file`` so that those subset instances can - still locate the eval_kit_metrics.json discovered by the "_all_" instance. - """ - - # Shared across all instances so subset calculators can find the file - # even though only the "_all_" calculator receives setup(). - _shared_metrics_file: Path | None = None - - def __init__(self, **kwargs): - super().__init__(compute_no_answer=False) - self.eval_kit_metrics_file = None - - def setup(self, input_files): - """Find the eval_kit_metrics.json in the same directory as the input files.""" - if input_files: - # input_files are like ['/path/to/eval-results/eval_kit.MMBench_DEV_EN/output.jsonl'] - metrics_dir = Path(input_files[0]).parent - candidate = metrics_dir / "eval_kit_metrics.json" - if candidate.exists(): - self.eval_kit_metrics_file = candidate - EvalKitMetrics._shared_metrics_file = candidate - else: - # Reset stale shared path so a previous run's file isn't reused. - EvalKitMetrics._shared_metrics_file = None - - def update(self, predictions): - """Count entries but don't compute per-sample metrics.""" - self.total += 1 - - def get_metrics(self): - """Return pre-computed VLMEvalKit aggregate metrics.""" - metrics_dict = {} - - # Load pre-computed metrics from VLMEvalKit. - # Fall back to the class-level shared file for subset calculators - # that never received a setup() call. 
- eval_kit_results = {} - effective_file = self.eval_kit_metrics_file or EvalKitMetrics._shared_metrics_file - if effective_file and effective_file.exists(): - with open(effective_file, "rt", encoding="utf-8") as f: - eval_kit_results = json.load(f) - - # Build the metrics in NeMo Skills format - agg_dict = {"num_entries": self.total} - - # Flatten VLMEvalKit results into the metrics dict - for key, value in eval_kit_results.items(): - if isinstance(value, dict): - # Nested results (e.g., per-category scores) - for sub_key, sub_value in value.items(): - if isinstance(sub_value, (int, float)): - agg_dict[f"{key}_{sub_key}"] = sub_value - elif isinstance(value, (int, float)): - agg_dict[key] = value - - metrics_dict["greedy"] = agg_dict - return metrics_dict - - def metrics_to_print(self): - return None - - def evaluations_to_print(self): - return ["greedy"] diff --git a/nemo_skills/evaluation/metrics/map_metrics.py b/nemo_skills/evaluation/metrics/map_metrics.py index 00332bde61..92f9f3282c 100644 --- a/nemo_skills/evaluation/metrics/map_metrics.py +++ b/nemo_skills/evaluation/metrics/map_metrics.py @@ -30,7 +30,6 @@ SweBenchMetrics, ) from nemo_skills.evaluation.metrics.critpt_metrics import CritPtMetrics -from nemo_skills.evaluation.metrics.eval_kit_metrics import EvalKitMetrics from nemo_skills.evaluation.metrics.gradingbench_metrics import GradingBenchMetrics from nemo_skills.evaluation.metrics.hleaa_metrics import HLEAAMetrics from nemo_skills.evaluation.metrics.icpc_metrics import ICPCMetrics @@ -88,7 +87,6 @@ "compute-eval": ComputeEvalMetrics, "gradingbench": GradingBenchMetrics, "critpt": CritPtMetrics, - "eval_kit": EvalKitMetrics, "specdec": SpecdecMetrics, } diff --git a/nemo_skills/evaluation/metrics/translation_metrics.py b/nemo_skills/evaluation/metrics/translation_metrics.py index 8f5be0bdeb..5a819152cd 100644 --- a/nemo_skills/evaluation/metrics/translation_metrics.py +++ b/nemo_skills/evaluation/metrics/translation_metrics.py @@ -16,6 +16,7 @@ from 
collections import defaultdict import numpy as np +from sacrebleu import corpus_bleu from nemo_skills.evaluation.metrics.base import BaseMetrics, as_float @@ -34,8 +35,6 @@ class TranslationMetrics(BaseMetrics): # TODO: add support for other translation metrics, such as MetricX def get_metrics(self): - from sacrebleu import corpus_bleu - metrics_dict = {} for key in self.translation_dict: src_lang, tgt_lang = key.split("->") diff --git a/nemo_skills/inference/eval/eval_kit.py b/nemo_skills/inference/eval/eval_kit.py deleted file mode 100644 index 420250d55d..0000000000 --- a/nemo_skills/inference/eval/eval_kit.py +++ /dev/null @@ -1,561 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""VLMEvalKit integration for NeMo Skills. - -This module implements a self-contained generation task that uses VLMEvalKit's -inference and evaluation pipeline. Two modes are supported: - -1. Megatron in-process (model_type=mcore): VLMEvalKit's MultiModalMCore loads - and runs the Megatron model directly. No NeMo Skills server is started. - -2. vLLM client (model_type=vllm): NeMo Skills starts a vLLM server normally, - and VLMEvalKit's VLLMLocal connects to it as a client. - -Benchmarks are referenced as eval_kit. in NeMo Skills, -e.g. 
--benchmarks eval_kit.MMBench_DEV_EN -""" - -import json -import logging -import os -import pickle -import threading -from dataclasses import field -from pathlib import Path - -import hydra -from omegaconf import MISSING - -try: - from nemo_skills.inference.generate import GenerationTask -except ImportError: - # On the cluster, GenerationTask may not be importable due to missing deps - # (nemo_run, litellm, etc.). The inheritance is only needed for the pipeline's - # __func__ check which runs locally. On the cluster we just need a base class. - GenerationTask = object - -from nemo_skills.utils import get_logger_name, nested_dataclass - -LOG = logging.getLogger(get_logger_name(__file__)) - -# VLMEvalKit (vlmeval) is packaged alongside Skills code by nemo_run when -# NEMO_SKILLS_VLMEVALKIT_PATH is set (see eval.py extra_package_dirs logic). -# It lands at /nemo_run/code/vlmeval/ on the cluster, importable via PYTHONPATH. -# func-timeout is installed at job start via --installation_command in the run script. -# No venv-based requirements are needed (get_generation_requirements returns None). 
- - -@nested_dataclass(kw_only=True) -class EvalKitConfig: - """Configuration for VLMEvalKit generation task.""" - - # VLMEvalKit dataset name (injected by pipeline from benchmark name) - vlm_dataset: str = MISSING - - # Model configuration - model_type: str = "mcore" # "mcore" or "vllm" - model_config: str | None = None # Path to YAML config for mcore - load_dir: str | None = None # Checkpoint directory for mcore - load_ckpt: str | None = None # Specific checkpoint for mcore - server_url: str | None = None # URL for vLLM server (vllm mode) - model_name: str | None = None # Model name for vLLM - - # Inference parameters - reasoning: bool = False - temperature: float = 1.0 - top_k: int = 1 - top_p: float = 0.95 - - # Video dataset parameters - nframe: int = 16 - fps: int = -1 - nframe_max: int = -1 - use_subtitle: bool = False - media_dir: str = "./" - - # Evaluation parameters - eval_mode: str = "all" # "all", "infer", or "eval" - judge: str | None = None - judge_nproc: int = 4 - judge_retry: int = 3 - - # Output configuration (populated by the pipeline) - work_dir: str = "./outputs" - output_file: str = "" - skip_filled: bool = False # Accepted from pipeline but unused (VLMEvalKit has its own resume) - - # Fields accepted from pipeline but unused by eval_kit (avoids Hydra errors from common_args) - eval_config: dict = field(default_factory=dict) - - -cs = hydra.core.config_store.ConfigStore.instance() -cs.store(name="base_eval_kit_config", node=EvalKitConfig) - - -class EvalKitGenerationTask(GenerationTask): - """Generation task using VLMEvalKit. - - Supports two modes: - - mcore: Self-contained, no external server. Pipeline sets - self_contained_task=True so no server is started. - - vllm: Pipeline starts a vLLM server normally. This task overrides - ``configure_client_overrides`` to translate the server address into - eval_kit's flat config fields (``++server_url``, ``++model_name``) - instead of the standard nested ``++server.*`` overrides. 
- """ - - # --- Declarative pipeline attributes (read generically by pipeline/eval.py) --- - CONTAINER_KEY = "eval_kit" - USE_TORCHRUN = True - - @classmethod - def is_self_contained(cls, extra_arguments: str = "") -> bool: - """Self-contained only when user explicitly requests mcore mode. - - Note: EvalKitConfig.model_type defaults to "mcore" at runtime, but - at submission time we check explicit user intent. Without the flag - the pipeline assumes vllm (server-based) mode. - """ - return "++model_type=mcore" in extra_arguments - - @classmethod - def configure_client_overrides(cls, *, host: str, port: int, model: str, server_type: str) -> str: - """Return Hydra overrides for connecting to an already-running server. - - EvalKitConfig uses flat fields (server_url, model_name) rather than - the standard nested ``server.*`` group, so we translate here. - """ - return f"++server_url=http://{host}:{port} ++model_name={model} ++model_type=vllm " - - @classmethod - def get_env_prefix(cls) -> str: - """Shell env setup prepended before the main command (Megatron/VLMEvalKit needs).""" - return ( - 'export LMUData="${LMUData:-${LMUDATA:-}}" && ' - "export LD_LIBRARY_PATH=/opt/hpcx/ucx/lib:${LD_LIBRARY_PATH:-} && " - "export MKL_THREADING_LAYER=GNU && " - "export OMP_NUM_THREADS=1 && " - "export MKL_NUM_THREADS=1 && " - "ldconfig && " - # Create empty .env so VLMEvalKit's load_env() doesn't emit ERROR logs. 
- "touch /nemo_run/code/.env 2>/dev/null; " - ) - - @classmethod - def get_extra_package_dirs(cls) -> list[str]: - """Directories to package alongside nemo_run code (VLMEvalKit vlmeval/).""" - vlmevalkit_path = os.environ.get("NEMO_SKILLS_VLMEVALKIT_PATH") - if vlmevalkit_path: - pkg = os.path.join(vlmevalkit_path, "vlmeval") - if os.path.isdir(pkg): - return [pkg] - return [] - - @classmethod - def get_generation_default_args(cls): - return "" - - @classmethod - def get_generation_requirements(cls): - # VLMEvalKit is installed via --installation_command (pip install from mounted source). - # No additional venv-based requirements needed. - return None - - def __init__(self, cfg: EvalKitConfig): - self.cfg = cfg - - # Validate environment - lmu_data = os.environ.get("LMUData") - if not lmu_data: - raise ValueError( - "LMUData environment variable must be set for eval_kit benchmarks. " - "Add LMUData=/mounted/path to your cluster config env_vars." - ) - - # Build model FIRST so that initialize_megatron() sets up the - # distributed process group before we need dist.barrier() for - # rank-0-first dataset download. 
- if cfg.model_type == "mcore": - from vlmeval.vlm.multimodal_mcore.model import MultiModalMCore - - if not cfg.model_config: - raise ValueError("model_config is required for mcore model_type.") - self.model = MultiModalMCore( - model_config=cfg.model_config, - load_dir=cfg.load_dir, - load_ckpt=cfg.load_ckpt, - reasoning=cfg.reasoning, - ) - self.model_name = f"mcore_{Path(cfg.model_config).stem}" - elif cfg.model_type == "vllm": - from vlmeval.vlm.vllm_local import VLLMLocal - - if not cfg.server_url: - raise ValueError("server_url is required for vllm model_type.") - self.model = VLLMLocal( - vllm_url=cfg.server_url, - autospawn=False, - model_name=cfg.model_name or "vllm_local", - reasoning_mode=cfg.reasoning, - temperature=cfg.temperature, - top_k=cfg.top_k, - top_p=cfg.top_p, - ) - self.model_name = cfg.model_name or "vllm_local" - else: - raise ValueError(f"Unknown model_type: {cfg.model_type}. Must be 'mcore' or 'vllm'.") - - # Build dataset after model so the distributed process group is available - # for the rank-0-first download pattern (run.py:428-433). 
- from vlmeval.dataset import build_dataset - - dataset_kwargs = self._build_dataset_kwargs() - rank = int(os.environ.get("RANK", 0)) - world_size = int(os.environ.get("WORLD_SIZE", 1)) - - if world_size > 1: - import torch.distributed as dist - - if rank == 0: - build_dataset(cfg.vlm_dataset, **dataset_kwargs) - dist.barrier() - - self.dataset = build_dataset(cfg.vlm_dataset, **dataset_kwargs) - if self.dataset is None: - raise ValueError(f"VLMEvalKit dataset '{cfg.vlm_dataset}' is not valid.") - - self.work_dir = os.path.join(cfg.work_dir, "eval_kit_work", cfg.vlm_dataset) - os.makedirs(self.work_dir, exist_ok=True) - - # Async JSONL writer state - self._async_stop = threading.Event() - self._async_written_indices = set() - self._async_lock = threading.Lock() - self._async_thread = None - - # ------------------------------------------------------------------ - # Incremental JSONL writer (mirrors NeMo Skills' -async pattern) - # ------------------------------------------------------------------ - - def _build_index_to_meta(self): - """Build a lookup from dataset index -> {question, answer} for JSONL rows.""" - meta = {} - df = self.dataset.data - for _, row in df.iterrows(): - idx = row["index"] - meta[idx] = { - "question": str(row["question"]) if "question" in row.index else "", - "expected_answer": str(row["answer"]) if "answer" in row.index else "", - } - return meta - - def _pkl_to_prediction(self, value): - """Extract the prediction string from a pkl entry (str or dict).""" - if isinstance(value, dict) and "prediction" in value: - return str(value["prediction"]) - return str(value) - - def _async_writer_loop(self, pkl_path, index_meta, output_path, poll_interval=5): - """Background thread: poll the pkl file and append new entries to JSONL.""" - while not self._async_stop.is_set(): - self._flush_pkl_to_jsonl(pkl_path, index_meta, output_path) - self._async_stop.wait(timeout=poll_interval) - # Final flush after inference signals stop - 
self._flush_pkl_to_jsonl(pkl_path, index_meta, output_path) - - def _flush_pkl_to_jsonl(self, pkl_path, index_meta, output_path): - """Read the pkl, find new entries, append them to the JSONL file.""" - if not os.path.exists(pkl_path): - return - try: - with open(pkl_path, "rb") as f: - data = pickle.load(f) - except Exception: - # pkl may be mid-write; skip this cycle - return - if not isinstance(data, dict): - return - - new_entries = [] - with self._async_lock: - for idx, value in data.items(): - if idx not in self._async_written_indices: - self._async_written_indices.add(idx) - meta = index_meta.get(idx, {}) - new_entries.append( - { - "generation": self._pkl_to_prediction(value), - "expected_answer": meta.get("expected_answer", ""), - "question": meta.get("question", ""), - } - ) - - if new_entries: - with open(output_path, "a", encoding="utf-8") as f: - for entry in new_entries: - f.write(json.dumps(entry) + "\n") - LOG.info( - "Async JSONL: flushed %d new entries (total %d)", len(new_entries), len(self._async_written_indices) - ) - - def _start_async_writer(self): - """Start the background JSONL writer if output_file is configured.""" - if not self.cfg.output_file: - return - rank = int(os.environ.get("RANK", 0)) - if rank != 0: - return - - world_size = int(os.environ.get("WORLD_SIZE", 1)) - ds_name = self.dataset.dataset_name - pkl_path = os.path.join(self.work_dir, f"0{world_size}_{ds_name}.pkl") - - output_dir = Path(self.cfg.output_file).parent - output_dir.mkdir(parents=True, exist_ok=True) - - # Clear any previous async file - async_path = self.cfg.output_file - if os.path.exists(async_path): - os.remove(async_path) - - index_meta = self._build_index_to_meta() - - self._async_stop.clear() - self._async_written_indices.clear() - self._async_thread = threading.Thread( - target=self._async_writer_loop, - args=(pkl_path, index_meta, async_path), - daemon=True, - ) - self._async_thread.start() - LOG.info("Started async JSONL writer, monitoring %s", 
pkl_path) - - def _stop_async_writer(self): - """Stop the background writer and wait for final flush.""" - if self._async_thread is None: - return - self._async_stop.set() - self._async_thread.join(timeout=30) - self._async_thread = None - LOG.info("Async JSONL writer stopped (%d entries written)", len(self._async_written_indices)) - - def _build_dataset_kwargs(self): - """Build dataset kwargs mirroring VLMEvalKit's run.py:390-425.""" - from vlmeval.smp import listinstr - - kwargs = {} - ds = self.cfg.vlm_dataset - - if ds in ["MMLongBench_DOC", "DUDE", "DUDE_MINI", "SLIDEVQA", "SLIDEVQA_MINI"]: - kwargs["model"] = self.cfg.model_name or self.cfg.model_config or "" - - if ds in ( - "Video-MME", - "Video-MME-With-Audio", - "WorldSense-AVLM", - "MetropolisVideoDataset", - "WorldSense", - "avqa_val", - ): - kwargs["use_subtitle"] = self.cfg.use_subtitle - if ds in ( - "Video-MME", - "MetropolisVideoDataset", - "MLVU", - "LongVideoBench", - "MMBench-Video", - "MVBench", - "MLVU_MCQ", - "PAI-Bench-U", - ): - kwargs["nframe"] = self.cfg.nframe - if ds in [ - "Video-MME", - "MLVU", - "LongVideoBench", - "WorldSense", - "avqa_val", - "MMBench-Video", - "MVBench", - "MLVU_MCQ", - "PAI-Bench-U", - ]: - kwargs["fps"] = self.cfg.fps - if ds in [ - "Video-MME", - "MLVU", - "LongVideoBench", - "WorldSense", - "avqa_val", - "MLVU_MCQ", - "MMBench-Video", - "PAI-Bench-U", - ]: - kwargs["nframe_max"] = self.cfg.nframe_max - if ds in ["ANet-RTL", "Charades-STA"]: - kwargs["nframe"] = self.cfg.nframe - - if listinstr(["Video-MME-With-Audio", "DailyOmni", "WorldSense-AVLM", "JensenKeyNote"], ds): - kwargs["media_dir"] = self.cfg.media_dir - - return kwargs - - def generate(self): - """Run VLMEvalKit inference and evaluation.""" - from vlmeval.inference import infer_data_job - from vlmeval.inference_mt import infer_data_job_mt - from vlmeval.inference_video import infer_data_job_video - from vlmeval.smp import get_pred_file_format - - dataset = self.dataset - ds_name = 
dataset.dataset_name - pred_format = get_pred_file_format() - result_file_base = f"{self.model_name}_{ds_name}.{pred_format}" - - rank = int(os.environ.get("RANK", 0)) - - # Start incremental JSONL writer before inference begins - self._start_async_writer() - - # Dispatch to correct inference function (mirrors run.py:453-488) - try: - if self.cfg.eval_mode != "eval": - if dataset.MODALITY == "VIDEO": - self.model = infer_data_job_video( - model=self.model, - work_dir=self.work_dir, - model_name=self.model_name, - dataset=dataset, - result_file_name=result_file_base, - strip_think=not self.cfg.reasoning, - reasoning_flag=self.cfg.reasoning, - ) - elif dataset.TYPE == "MT": - self.model = infer_data_job_mt( - model=self.model, - work_dir=self.work_dir, - model_name=self.model_name, - dataset=dataset, - ) - else: - self.model = infer_data_job( - model=self.model, - work_dir=self.work_dir, - model_name=self.model_name, - dataset=dataset, - strip_think=not self.cfg.reasoning, - reasoning_flag=self.cfg.reasoning, - ) - finally: - self._stop_async_writer() - - # Evaluate (mirrors run.py:490-548) - eval_result = {} - if self.cfg.eval_mode != "infer" and rank == 0: - from vlmeval.smp import get_pred_file_path - - result_file = get_pred_file_path(self.work_dir, self.model_name, ds_name, use_env_format=True) - judge_kwargs = { - "nproc": self.cfg.judge_nproc, - "verbose": False, - "retry": self.cfg.judge_retry, - } - if self.cfg.judge: - judge_kwargs["model"] = self.cfg.judge - - if os.path.exists(result_file): - try: - eval_result = dataset.evaluate(result_file, **judge_kwargs) - except KeyError as e: - if e.args and e.args[0] == "model": - LOG.warning( - "Dataset %s requires a judge model for evaluation (e.g. MathVista). " - "Skipping evaluation. Set ++judge=<model> (e.g. gpt-4o) to enable. 
" - "Inference output was still written.", - ds_name, - ) - eval_result = {} - else: - raise - if eval_result is None: - eval_result = {} - - # Convert to NeMo Skills format and write outputs (rank 0 only) - if rank == 0: - self._convert_to_nemo_skills_format(eval_result) - - # Write .done file for pipeline tracking - if self.cfg.output_file: - Path(f"{self.cfg.output_file}.done").touch() - - def _convert_to_nemo_skills_format(self, eval_result): - """Rewrite the final ordered JSONL output and eval_kit_metrics.json. - - The async writer has already been producing incremental JSONL during - inference. Here we overwrite with the authoritative, properly-ordered - result that VLMEvalKit merged from all ranks. - """ - if not self.cfg.output_file: - return - - from vlmeval.smp import get_pred_file_path - from vlmeval.smp import load as vlm_load - - output_dir = Path(self.cfg.output_file).parent - output_dir.mkdir(parents=True, exist_ok=True) - - # Write JSONL (required by summarize_results to find output*jsonl files) - result_file = get_pred_file_path( - self.work_dir, - self.model_name, - self.dataset.dataset_name, - use_env_format=True, - ) - if os.path.exists(result_file): - df = vlm_load(result_file) - with open(self.cfg.output_file, "w", encoding="utf-8") as f: - for _, row in df.iterrows(): - entry = { - "generation": str(row["prediction"]) if "prediction" in row.index else "", - "expected_answer": str(row["answer"]) if "answer" in row.index else "", - "question": str(row["question"]) if "question" in row.index else "", - } - f.write(json.dumps(entry) + "\n") - LOG.info("Wrote final ordered JSONL to %s (%d entries)", self.cfg.output_file, len(df)) - else: - LOG.warning("VLMEvalKit result file not found: %s", result_file) - - # Write aggregate metrics for EvalKitMetrics to read - # eval_result can be a dict or a pandas DataFrame (e.g. 
ASR); avoid "if eval_result" for DataFrame - if eval_result is not None: - metrics_data = eval_result if isinstance(eval_result, dict) else {"result": str(eval_result)} - metrics_path = output_dir / "eval_kit_metrics.json" - with open(metrics_path, "w", encoding="utf-8") as f: - json.dump(metrics_data, f, indent=2, default=str) - LOG.info("Wrote eval_kit metrics to %s", metrics_path) - - -GENERATION_TASK_CLASS = EvalKitGenerationTask - - -@hydra.main(version_base=None, config_name="base_eval_kit_config") -def main(cfg: EvalKitConfig): - cfg = EvalKitConfig(_init_nested=True, **cfg) - task = EvalKitGenerationTask(cfg) - task.generate() - - -if __name__ == "__main__": - main() diff --git a/nemo_skills/inference/factory.py b/nemo_skills/inference/factory.py index 93f5bbe193..cd29bbd2c5 100644 --- a/nemo_skills/inference/factory.py +++ b/nemo_skills/inference/factory.py @@ -19,12 +19,10 @@ class GenerationType(str, Enum): generate = "generate" math_judge = "math_judge" check_contamination = "check_contamination" - mcore_skills = "mcore_skills" GENERATION_MODULE_MAP = { GenerationType.generate: "nemo_skills.inference.generate", GenerationType.math_judge: "nemo_skills.inference.llm_math_judge", GenerationType.check_contamination: "nemo_skills.inference.check_contamination", - GenerationType.mcore_skills: "nemo_skills.inference.mcore_skills", } diff --git a/nemo_skills/inference/generate.py b/nemo_skills/inference/generate.py index fcf0a94446..3122f2ceeb 100644 --- a/nemo_skills/inference/generate.py +++ b/nemo_skills/inference/generate.py @@ -273,38 +273,6 @@ def _get_disallowed_params(self): class GenerationTask: - # --- Declarative pipeline attributes --- - # Subclasses can override to declare their runtime needs generically. - # The pipeline reads these instead of hardcoding module-name checks. - - # Container key in cluster_config["containers"]; None means use "nemo-skills" default. 
- CONTAINER_KEY: str | None = None - - # Whether to wrap the command with torchrun for multi-GPU data-parallel inference. - USE_TORCHRUN: bool = False - - @classmethod - def is_self_contained(cls, extra_arguments: str = "") -> bool: - """Whether this task manages its own model (no NeMo Skills server). - - Override in subclasses. *extra_arguments* is the raw CLI extra args string - so that the decision can depend on runtime flags (e.g. model_type). - """ - return False - - @classmethod - def get_env_prefix(cls) -> str: - """Shell commands prepended before the main command (e.g. env exports). - - Return an empty string if no special environment is needed. - """ - return "" - - @classmethod - def get_extra_package_dirs(cls) -> list[str]: - """Extra directories to package alongside nemo_run code.""" - return [] - @classmethod def get_generation_default_args(cls) -> str: """ diff --git a/nemo_skills/inference/mcore_skills.py b/nemo_skills/inference/mcore_skills.py deleted file mode 100644 index 5db561d1b6..0000000000 --- a/nemo_skills/inference/mcore_skills.py +++ /dev/null @@ -1,547 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""NeMo Skills generation via VLMEvalKit MultiModalMCore in-process. - -This module implements Option A from the plan: read NeMo Skills JSONL, fill -prompts with NeMo Skills prompt config, run inference through MultiModalMCore -synchronously, write NeMo Skills-format JSONL. 
No HTTP server; evaluation -remains NeMo Skills metrics on the output. - -When run with torchrun (multi-GPU), all ranks participate in model.generate(); -only rank 0 performs file I/O. -""" - -import json -import logging -import os -import re -from dataclasses import field -from pathlib import Path - -import hydra -from omegaconf import MISSING -from tqdm import tqdm - -from nemo_skills.prompt.utils import get_prompt -from nemo_skills.utils import chunk_data, get_logger_name, nested_dataclass - -LOG = logging.getLogger(get_logger_name(__file__)) - -try: - from nemo_skills.inference.generate import GenerationTask -except ImportError: - GenerationTask = None - -if GenerationTask is not None: - _get_server_command_fn = GenerationTask.get_server_command_fn -else: - - @classmethod - def _get_server_command_fn(cls): - from nemo_skills.pipeline.utils import get_server_command - - return get_server_command - - -@nested_dataclass(kw_only=True) -class MegatronMCoreConfig: - """Configuration for MegatronMCore NeMo Skills generation.""" - - input_file: str = MISSING - output_file: str = MISSING - - # Prompt config for text-only data (used by fill_prompt). Not needed when the - # input JSONL already contains OpenAI-format 'messages' (e.g. asr-leaderboard). - prompt_config: str | None = None - - # Tokenizer for prompt filling (format_as_string=True). HF model name or path. - # Required when prompt_config is set; optional for messages-only data. - tokenizer: str | None = None - - # MultiModalMCore model - model_config: str = MISSING - load_dir: str | None = None - load_ckpt: str | None = None - reasoning: bool = False - - # Prompt options (mirror GenerationTaskConfig where needed) - code_tags: str | None = None - examples_type: str | None = None - system_message: str | None = None - start_assistant_response_key: str | None = None - chat_template_kwargs: dict = field(default_factory=dict) - - # Base directory to resolve relative audio/image paths (e.g. NEMO_SKILLS_DATA_DIR). 
- data_dir: str = "" - - # Generation limits and resume - max_samples: int = -1 - skip_filled: bool = False - num_chunks: int | None = None - chunk_id: int | None = None - - # Output - generation_key: str = "generation" - add_generation_stats: bool = True - async_position_key: str = "_async_position" - - dry_run: bool = False - - # Dataset name passed to MultiModalMCore.generate() — used by VLMEvalKit internally - # for dataset-specific logic (e.g. video tile config). Defaults to "nemo_skills". - dataset_name: str = "nemo_skills" - - # Accepted from pipeline/dataset modules but unused by mcore_skills (avoid Hydra errors). - # These come via ++key=value overrides from dataset modules (e.g. asr-leaderboard). - eval_config: dict = field(default_factory=dict) - eval_type: str | None = None - prompt_format: str | None = None - enable_audio: bool = False - - -def _make_mcore_model(cfg: MegatronMCoreConfig): - from vlmeval.vlm.multimodal_mcore.model import MultiModalMCore - - return MultiModalMCore( - model_config=cfg.model_config, - load_dir=cfg.load_dir, - load_ckpt=cfg.load_ckpt, - reasoning=cfg.reasoning, - ) - - -class MegatronMCoreGenerationTask: - """Generation task using NeMo Skills data + prompts and VLMEvalKit MultiModalMCore in-process.""" - - get_server_command_fn = _get_server_command_fn - - # --- Declarative pipeline attributes (read generically by pipeline/eval.py) --- - CONTAINER_KEY = "eval_kit" - USE_TORCHRUN = True - # Metrics are computed by VLMEvalKit (asr_wer etc.) and saved as - # eval_kit_metrics.json — tell the summarize step to use EvalKitMetrics. 
- METRICS_TYPE_OVERRIDE = "eval_kit" - - @classmethod - def is_self_contained(cls, extra_arguments: str = "") -> bool: - """Always self-contained (in-process MultiModalMCore, no HTTP server).""" - return True - - @classmethod - def get_env_prefix(cls) -> str: - """Shell env setup prepended before the main command (Megatron/VLMEvalKit needs).""" - return ( - 'export LMUData="${LMUData:-${LMUDATA:-}}" && ' - "export LD_LIBRARY_PATH=/opt/hpcx/ucx/lib:${LD_LIBRARY_PATH:-} && " - "export MKL_THREADING_LAYER=GNU && " - "export OMP_NUM_THREADS=1 && " - "export MKL_NUM_THREADS=1 && " - "ldconfig && " - # Create empty .env so VLMEvalKit's load_env() doesn't emit ERROR logs. - "touch /nemo_run/code/.env 2>/dev/null; " - ) - - @classmethod - def get_extra_package_dirs(cls) -> list[str]: - """Directories to package alongside nemo_run code (VLMEvalKit vlmeval/).""" - vlmevalkit_path = os.environ.get("NEMO_SKILLS_VLMEVALKIT_PATH") - if vlmevalkit_path: - pkg = os.path.join(vlmevalkit_path, "vlmeval") - if os.path.isdir(pkg): - return [pkg] - return [] - - @classmethod - def get_generation_default_args(cls): - return "" - - @classmethod - def get_generation_requirements(cls): - return None - - def __init__(self, cfg: MegatronMCoreConfig): - self.cfg = cfg - # Prompt is only needed for text-only data (no 'messages' field). - # For multimodal data with OpenAI-format messages, _build_mcore_messages - # extracts content directly — no prompt template required. 
- if cfg.prompt_config: - self.prompt = get_prompt( - prompt_config=cfg.prompt_config, - tokenizer=cfg.tokenizer, - code_tags=cfg.code_tags, - examples_type=cfg.examples_type, - system_message=cfg.system_message, - ) - else: - self.prompt = None - self.model = _make_mcore_model(cfg) - - def load_data(self): - data = [] - with open(self.cfg.input_file, "rt", encoding="utf-8") as fin: - for line in fin: - data.append(json.loads(line)) - if self.cfg.num_chunks is not None and self.cfg.chunk_id is not None: - data, self.cfg.output_file = chunk_data(data, self.cfg.output_file, self.cfg.chunk_id, self.cfg.num_chunks) - LOG.info( - "Chunking: %d chunks, processing chunk %d; samples in chunk: %d", - self.cfg.num_chunks, - self.cfg.chunk_id, - len(data), - ) - if self.cfg.max_samples > 0: - data = data[: self.cfg.max_samples] - return data - - def skip_completed_samples(self, data: list) -> list: - if not self.cfg.skip_filled or not Path(self.cfg.output_file).exists(): - return data - filled = 0 - with open(self.cfg.output_file, "rt", encoding="utf-8") as fin: - for _ in fin: - filled += 1 - if filled >= len(data): - return [] - return data[filled:] - - def fill_prompt(self, data_point: dict, data: list) -> str: - from copy import deepcopy - - data_point = deepcopy(data_point) - filled = self.prompt.fill( - data_point, - start_assistant_response_key=self.cfg.start_assistant_response_key, - chat_template_kwargs=self.cfg.chat_template_kwargs or {}, - format_as_string=True, - ) - return filled if isinstance(filled, str) else str(filled) - - def _get_data_dir(self) -> str: - """Return the effective data_dir from cfg or eval_config.""" - data_dir = getattr(self.cfg, "data_dir", None) or "" - if not data_dir and getattr(self.cfg, "eval_config", None): - data_dir = self.cfg.eval_config.get("data_dir") or "" - return data_dir - - def _resolve_path(self, path: str) -> str: - """Resolve a media file path, handling relative paths and mount mismatches. - - 1. 
Relative paths are joined with data_dir. - 2. Absolute paths that don't exist on disk are retried relative to data_dir - (handles mount mismatches, e.g. JSONL has /dataset/... but data is at /data/...). - """ - if not path: - return path - data_dir = self._get_data_dir() - if not os.path.isabs(path): - if data_dir: - return os.path.join(data_dir, path) - return path - # Absolute path — use as-is if it exists - if os.path.exists(path): - return path - # Absolute path doesn't exist — try stripping the first directory component - # and re-rooting under data_dir (e.g. /dataset/asr-leaderboard/... → /data/asr-leaderboard/...) - if data_dir: - # Strip leading /mount_name/ to get the relative portion - parts = path.strip("/").split("/", 1) - if len(parts) == 2: - relative = parts[1] - candidate = os.path.join(data_dir, relative) - if os.path.exists(candidate): - return candidate - return path - - def _build_mcore_messages(self, data_point: dict) -> list | None: - """Convert a NeMo Skills data point into MultiModalMCore message list. - - If the data point has a 'messages' field (OpenAI format), converts it to - list[dict] with types: "text", "image", "sound". - - Only user/assistant message text is included — system messages are skipped - because MultiModalMCore's generate_inner() builds its own prompt template - with system/user roles internally. - - If no 'messages' field, returns None (caller should use fill_prompt for text-only). - """ - messages = data_point.get("messages") - if not messages: - return None - - mcore: list[dict] = [] - text_parts: list[str] = [] - - for msg in messages: - if not isinstance(msg, dict): - continue - role = msg.get("role", "") - content = msg.get("content", "") - - # Skip system messages — generate_inner builds its own system prompt. 
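The two-step fallback in the removed `_resolve_path` can be isolated as follows. `exists` is injectable here purely so the sketch can be exercised without real files (the deleted code calls `os.path.exists` directly):

```python
import os

def resolve_media_path(path, data_dir, exists=os.path.exists):
    """Sketch of the mount-mismatch fallback for media paths.

    Relative paths are joined onto data_dir; absolute paths missing on
    disk are retried with their first component re-rooted under data_dir
    (e.g. /dataset/asr/x.wav becomes <data_dir>/asr/x.wav).
    """
    if not path:
        return path
    if not os.path.isabs(path):
        return os.path.join(data_dir, path) if data_dir else path
    if exists(path):
        return path  # absolute and present: use as-is
    if data_dir:
        parts = path.strip("/").split("/", 1)
        if len(parts) == 2:
            candidate = os.path.join(data_dir, parts[1])
            if exists(candidate):
                return candidate
    return path  # give up; let the media loader raise a clear error
```

Returning the original path on failure (rather than raising) keeps the error close to the actual file read, where the message names the file that could not be opened.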
- if role == "system": - continue - - # Audio: single or multiple - if "audio" in msg: - audio = msg["audio"] - if isinstance(audio, dict) and "path" in audio: - path = self._resolve_path(audio["path"]) - mcore.append({"type": "sound", "value": path, "sample_rate": 16000}) - if "audios" in msg: - for audio in msg["audios"]: - if isinstance(audio, dict) and "path" in audio: - path = self._resolve_path(audio["path"]) - mcore.append({"type": "sound", "value": path, "sample_rate": 16000}) - - # Content: str or list of content items (text, image_url) - if isinstance(content, str): - if content.strip(): - text_parts.append(content.strip()) - elif isinstance(content, list): - for item in content: - if isinstance(item, dict): - if item.get("type") == "text" and "text" in item: - text_parts.append(item["text"].strip()) - elif item.get("type") == "image_url": - image_url = item.get("image_url") or {} - url = image_url.get("url", "") - if url.startswith("file://"): - path = url[7:] - else: - path = url - if path: - path = self._resolve_path(path) - mcore.append({"type": "image", "value": path}) - - combined_text = "\n".join(t for t in text_parts if t) - if combined_text: - mcore.append({"type": "text", "value": combined_text}) - - if not mcore: - return None - return mcore - - def dump_outputs(self, outputs: list, fout): - for out in outputs: - fout.write(json.dumps(out) + "\n") - - @staticmethod - def _strip_thinking_tags(text: str) -> str: - """Strip <think>...</think> tags (including empty ones) from model output.""" - return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() - - def _generate_for_sample(self, data_point: dict, data: list) -> str: - """Run model inference for a single data point. 
Returns generated text.""" - message_list = self._build_mcore_messages(data_point) - if message_list is not None: - raw = self.model.generate(message_list, dataset=self.cfg.dataset_name) - return self._strip_thinking_tags(raw) - if self.prompt is None: - raise ValueError( - "Data point has no 'messages' field and prompt_config is not set. " - "Either provide ++prompt_config for text-only data or ensure " - "the input JSONL contains OpenAI-format 'messages'." - ) - prompt_str = self.fill_prompt(data_point, data) - raw = self.model.generate( - [{"type": "text", "value": prompt_str}], - dataset=self.cfg.dataset_name, - ) - return self._strip_thinking_tags(raw) - - def generate(self): - import sys - - # Use Megatron DP rank/size for data sharding (matches VLMEvalKit pattern). - # With data_parallel=True in generate_and_post_process, each DP rank runs - # generation independently on its shard while TP ranks synchronise internally. - dp_rank = self.model.get_dp_rank() - dp_size = self.model.get_dp_size() - - output_dir = Path(self.cfg.output_file).absolute().parent - if dp_rank == 0: - output_dir.mkdir(parents=True, exist_ok=True) - - data = self.load_data() - data = self.skip_completed_samples(data) - if not data: - if dp_rank == 0: - LOG.info("No data to process, skipping generation") - return - if self.cfg.dry_run: - if dp_rank == 0: - LOG.info("Dry run: would process %d samples", len(data)) - return - - # Round-robin shard by dp_rank (same strategy as VLMEvalKit infer_data). - my_indices = list(range(dp_rank, len(data), dp_size)) - my_data = [data[i] for i in my_indices] - - if dp_rank == 0: - LOG.info( - "Data parallelism: dp_size=%d, total=%d, this rank=%d samples", - dp_size, - len(data), - len(my_data), - ) - - # Per-rank output file — visible during the run so progress can be - # monitored (e.g. ``wc -l output_rank*.jsonl``). Contains a - # ``_dp_global_idx`` field used for ordered merging at the end. 
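The DP sharding and `_dp_global_idx`-based merge implemented by the removed `generate()` boil down to two small helpers (names here are illustrative):

```python
def shard_round_robin(data, dp_rank, dp_size):
    """Round-robin shard: rank r gets items r, r + dp_size, r + 2*dp_size, ...

    Returns (global_indices, shard) so each result can be tagged with the
    index it came from, exactly like the _dp_global_idx field.
    """
    indices = list(range(dp_rank, len(data), dp_size))
    return indices, [data[i] for i in indices]

def merge_ordered(tagged_results):
    """Merge (global_idx, result) pairs from all ranks into dataset order."""
    return [res for _, res in sorted(tagged_results)]
```

Round-robin (rather than contiguous blocks) keeps per-rank work balanced when sample difficulty drifts across the dataset, and sorting by the tagged global index makes the merge independent of which rank finished first.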
- rank_file = output_dir / f"output_rank{dp_rank}.jsonl" - - # Suppress VLMEvalKit's per-sample print() on non-primary DP ranks to - # avoid 8x duplicate output in logs. - _real_stdout = sys.stdout - if dp_rank != 0: - sys.stdout = open(os.devnull, "w") - - try: - with open(rank_file, "w", encoding="utf-8") as fout: - iterator = tqdm(my_data, desc=f"mcore_skills[dp{dp_rank}]") if dp_rank == 0 else my_data - for local_idx, data_point in enumerate(iterator): - global_idx = my_indices[local_idx] - gen = self._generate_for_sample(data_point, data) - output = { - "_dp_global_idx": global_idx, - self.cfg.generation_key: gen, - **{k: v for k, v in data_point.items() if k != self.cfg.async_position_key}, - } - fout.write(json.dumps(output) + "\n") - fout.flush() - finally: - if dp_rank != 0: - sys.stdout.close() - sys.stdout = _real_stdout - - # Barrier: wait for all DP ranks to finish writing. - import torch.distributed as dist - - if dist.is_initialized(): - dist.barrier() - - # Rank 0 merges per-rank files into the final ordered output. - if dp_rank == 0: - all_results: dict[int, str] = {} - for r in range(dp_size): - rf = output_dir / f"output_rank{r}.jsonl" - if rf.exists() and rf.stat().st_size > 0: - with open(rf, "rt", encoding="utf-8") as fin: - for line in fin: - entry = json.loads(line) - idx = entry.pop("_dp_global_idx") - all_results[idx] = json.dumps(entry) - - mode = "a" if self.cfg.skip_filled and Path(self.cfg.output_file).exists() else "w" - merged_lines = [all_results[idx] + "\n" for idx in sorted(all_results.keys())] - with open(self.cfg.output_file, mode, encoding="utf-8") as fout: - fout.writelines(merged_lines) - LOG.info( - "Merged %d results from %d DP ranks into %s", - len(all_results), - dp_size, - self.cfg.output_file, - ) - - # Clean up per-rank files after successful merge. - for r in range(dp_size): - rf = output_dir / f"output_rank{r}.jsonl" - rf.unlink(missing_ok=True) - - # Evaluate using VLMEvalKit (same as eval_kit.py does). 
- # Done BEFORE marking .done so failed metrics prevent false completion. - self._evaluate_results() - - Path(f"{self.cfg.output_file}.done").touch() - - def _evaluate_results(self): - """Compute metrics using VLMEvalKit's evaluation functions. - - Uses the same asr_wer() that eval_kit.py calls via dataset.evaluate(), - so metrics are identical. Saves eval_kit_metrics.json (consumed by - EvalKitMetrics in the summarize step). - """ - output_file = self.cfg.output_file - if not output_file or not Path(output_file).exists(): - return - - output_path = Path(output_file) - - try: - from vlmeval.dataset.avlm.utils import asr_wer - - # Read entries and build VLMEvalKit-format results list - entries = [] - results = [] - with open(output_file, "rt", encoding="utf-8") as fin: - for line in fin: - entry = json.loads(line) - # Strip leftover tags (older runs may have them) - gen_key = self.cfg.generation_key - gen = entry.get(gen_key, "") - cleaned = self._strip_thinking_tags(gen) - if cleaned != gen: - entry[gen_key] = cleaned - entries.append(entry) - results.append( - { - "gt": entry.get("expected_answer", ""), - "pred": entry[gen_key], - } - ) - - # Re-write output.jsonl with cleaned generations - with open(output_file, "w", encoding="utf-8") as fout: - for entry in entries: - fout.write(json.dumps(entry) + "\n") - - # Compute WER using VLMEvalKit (same function as eval_kit path) - wer_score = asr_wer(results) - LOG.info("ASR WER: %.2f%%", wer_score) - - # Save as eval_kit_metrics.json (same format eval_kit.py writes) - metrics = {"wer": wer_score} - metrics_file = output_path.parent / "eval_kit_metrics.json" - with open(metrics_file, "w", encoding="utf-8") as f: - json.dump(metrics, f, indent=2) - LOG.info("Metrics saved to %s", metrics_file) - - except ImportError: - LOG.warning( - "VLMEvalKit asr_wer not available — skipping eval-kit-style metrics. " - "The summarize_results job will compute metrics separately." 
- ) - except Exception: - LOG.exception("Inline metrics computation failed") - - -GENERATION_TASK_CLASS = MegatronMCoreGenerationTask - -cs = hydra.core.config_store.ConfigStore.instance() -cs.store(name="base_mcore_skills_config", node=MegatronMCoreConfig) - - -@hydra.main(version_base=None, config_name="base_mcore_skills_config") -def main(cfg: MegatronMCoreConfig): - cfg = MegatronMCoreConfig(_init_nested=True, **cfg) - task = MegatronMCoreGenerationTask(cfg) - task.generate() - - -if __name__ == "__main__": - import nemo_skills.utils as utils - - utils.setup_logging() - main() diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py index 69d2e27aeb..37e9c38095 100644 --- a/nemo_skills/pipeline/eval.py +++ b/nemo_skills/pipeline/eval.py @@ -36,34 +36,6 @@ LOG = logging.getLogger(get_logger_name(__file__)) -def _apply_task_overrides(combined_cmd, task_classes, job_num_gpus, cluster_config): - """Apply env/torchrun/container overrides declared by generation task classes. - - Returns (modified_cmd, container). 
-    """
-    # Environment prefix (first non-empty wins; jobs are not mixed across task types)
-    for tc in task_classes:
-        prefix = tc.get_env_prefix() if hasattr(tc, "get_env_prefix") else ""
-        if prefix:
-            combined_cmd = f"{prefix}{combined_cmd}"
-            break
-
-    # Torchrun for multi-GPU data-parallel inference
-    if any(getattr(tc, "USE_TORCHRUN", False) for tc in task_classes):
-        if job_num_gpus and int(job_num_gpus) > 1:
-            combined_cmd = combined_cmd.replace("python -m ", f"torchrun --nproc_per_node {job_num_gpus} -m ", 1)
-
-    # Container selection (task class CONTAINER_KEY, falling back to nemo-skills default)
-    container = cluster_config["containers"]["nemo-skills"]
-    for tc in task_classes:
-        key = getattr(tc, "CONTAINER_KEY", None)
-        if key and key in cluster_config.get("containers", {}):
-            container = cluster_config["containers"][key]
-            break
-
-    return combined_cmd, container
-
-
 class SingleNodeMode(str, enum.Enum):
     sequential = "sequential"
     parallel = "parallel"
@@ -444,20 +416,6 @@ def eval(
         sbatch_kwargs = parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min)
 
     get_random_port = pipeline_utils.should_get_random_port(server_gpus, exclusive)
-
-    # Build extra_package_dirs: include any dirs declared by generation task classes
-    extra_pkg_dirs = []
-    seen_pkg_dirs = set()
-    for ba in benchmarks_dict.values():
-        task_cls = ba.generation_task_class
-        if task_cls is not None and hasattr(task_cls, "get_extra_package_dirs"):
-            for pkg_dir in task_cls.get_extra_package_dirs():
-                if pkg_dir not in seen_pkg_dirs:
-                    seen_pkg_dirs.add(pkg_dir)
-                    extra_pkg_dirs.append(pkg_dir)
-                    LOG.info("Packaging extra dir from %s: %s", task_cls.__name__, pkg_dir)
-    extra_pkg_dirs = extra_pkg_dirs or None
-
     has_tasks = False
     job_id_to_tasks = {}
     benchmark_to_judge_tasks = {}
@@ -476,34 +434,20 @@ def eval(
                 job_server_address,
                 job_server_command,
                 job_sandbox_env_overrides,
-                job_num_gpus,
             ) = job_args
 
             prev_tasks = _task_dependencies
             for _ in range(dependent_jobs + 1):
                 has_tasks = True
-                combined_cmd = pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode))
-
-                # Apply env/torchrun/container overrides from generation task classes
-                job_task_classes = [
-                    benchmarks_dict[b].generation_task_class
-                    for b in job_benchmarks
-                    if benchmarks_dict[b].generation_task_class is not None
-                ]
-                combined_cmd, job_container = _apply_task_overrides(
-                    combined_cmd, job_task_classes, job_num_gpus, cluster_config
-                )
-
                 new_task = pipeline_utils.add_task(
                     exp,
-                    cmd=combined_cmd,
+                    cmd=pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode)),
                     task_name=f"{expname}-{'-'.join(job_benchmarks)}",
                     log_dir=log_dir,
-                    container=main_container or job_container,
+                    container=main_container or cluster_config["containers"]["nemo-skills"],
                     cluster_config=cluster_config,
                     partition=partition,
                     account=account,
-                    num_gpus=job_num_gpus,
                     server_config=job_server_config,
                     with_sandbox=job_needs_sandbox or with_sandbox,
                     keep_mounts_for_sandbox=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox,
@@ -517,7 +461,6 @@ def eval(
                         prev_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies
                     ),
                     get_server_command=job_server_command,
-                    extra_package_dirs=extra_pkg_dirs,
                     sbatch_kwargs=sbatch_kwargs,
                     installation_command=installation_command,
                     skip_hf_home_check=skip_hf_home_check,
@@ -666,8 +609,6 @@ def eval(
             command += f" --wandb_group={wandb_group} "
         if wandb_project:
             command += f" --wandb_project={wandb_project} "
-        if data_dir:
-            command += f" --data_dir={data_dir} "
         if metrics_kwargs:
             command += f" --metrics_kwargs='{kwargs_to_string(metrics_kwargs)}' "
 
diff --git a/nemo_skills/pipeline/utils/eval.py b/nemo_skills/pipeline/utils/eval.py
index 0edc91b50f..ff1ab345fe 100644
--- a/nemo_skills/pipeline/utils/eval.py
+++ b/nemo_skills/pipeline/utils/eval.py
@@ -30,24 +30,10 @@
 LOG = logging.getLogger(get_logger_name(__file__))
 
 
-def _resolve_generation_task_class(module_name: str):
-    """Import a generation module and return its GENERATION_TASK_CLASS, or None."""
-    try:
-        if module_name.endswith(".py") or os.sep in module_name:
-            path_suffix = ".py" if not module_name.endswith(".py") else ""
-            mod = import_from_path(module_name + path_suffix)
-        else:
-            mod = importlib.import_module(module_name)
-        return getattr(mod, "GENERATION_TASK_CLASS", None)
-    except (ImportError, ModuleNotFoundError):
-        LOG.debug("Could not resolve GENERATION_TASK_CLASS from %s", module_name)
-        return None
-
-
 @dataclass
 class BenchmarkArgs:
     name: str
-    input_file: str | None
+    input_file: str
     generation_args: str
     judge_args: str
     judge_pipeline_args: dict
@@ -60,14 +46,10 @@ class BenchmarkArgs:
     metrics_type: str | None = None
     benchmark_group: str | None = None
     score_module: str | None = None
-    self_contained_task: bool = False
-    num_gpus: int | None = None  # For self-contained tasks that need GPU allocation on the main task
     job_ids: list[int] = field(default_factory=list)
     remaining_jobs: list[dict] = field(default_factory=list)
     # Per-benchmark sandbox environment overrides in KEY=VALUE form
     sandbox_env_overrides: list[str] = field(default_factory=list)
-    # Resolved GENERATION_TASK_CLASS (populated by prepare_eval_commands)
-    generation_task_class: type | None = None
 
     @property
     def requires_judge(self):
@@ -109,57 +91,50 @@ def get_benchmark_args_from_module(
     local_data_path=None,
     data_dir=None,
 ):
-    skip_input_file = getattr(benchmark_module, "SKIP_INPUT_FILE", False)
-    self_contained_task = getattr(benchmark_module, "SELF_CONTAINED_TASK", False)
-
     if split is None:
         split = get_arg_from_module_or_dict(benchmark_module, "EVAL_SPLIT", "test", override_dict)
 
-    input_file = None
-    if not skip_input_file:
-        if not is_on_cluster:
-            if pipeline_utils.is_mounted_filepath(cluster_config, data_path) or cluster_config["executor"] == "none":
-                input_file = f"{data_path}/{benchmark.replace('.', '/')}/{split}.jsonl"
-                if local_data_path is not None:
-                    unmounted_path = f"{local_data_path}/{benchmark.replace('.', '/')}/{split}.jsonl"
-                else:
-                    unmounted_input_file = pipeline_utils.get_unmounted_path(cluster_config, input_file)
-                    unmounted_path = str(
-                        Path(__file__).parents[3] / unmounted_input_file.replace("/nemo_run/code/", "")
-                    )
-            else:
-                # will be copied over in this case as it must come from extra datasets
-                input_file = f"/nemo_run/code/{Path(data_path).name}/{benchmark.replace('.', '/')}/{split}.jsonl"
-                unmounted_path = Path(data_path) / benchmark.replace(".", "/") / f"{split}.jsonl"
-        else:
-            # on cluster we will always use the mounted path
+    if not is_on_cluster:
+        if pipeline_utils.is_mounted_filepath(cluster_config, data_path) or cluster_config["executor"] == "none":
             input_file = f"{data_path}/{benchmark.replace('.', '/')}/{split}.jsonl"
-            unmounted_path = pipeline_utils.get_unmounted_path(cluster_config, input_file)
-
-        unmounted_path = str(unmounted_path)
-        # When data_dir is specified, use it for both input_file and the existence check
-        # data_dir is always assumed to be a mounted path
-        if data_dir:
-            data_dir_unmounted = pipeline_utils.get_unmounted_path(cluster_config, data_dir)
-            input_file = f"{data_dir}/{benchmark.replace('.', '/')}/{split}.jsonl"
-            check_path = f"{data_dir_unmounted}/{benchmark.replace('.', '/')}/{split}.jsonl"
-        else:
-            check_path = unmounted_path
-        # checking if data file exists (can check locally as well)
-        if is_on_cluster:
-            if not pipeline_utils.cluster_path_exists(cluster_config, check_path):
-                raise ValueError(
-                    f"Data file {check_path} does not exist on cluster. "
-                    "Please check the benchmark and split parameters. "
-                    "Did you forget to run prepare data commands or add data_dir argument?"
-                )
+            if local_data_path is not None:
+                unmounted_path = f"{local_data_path}/{benchmark.replace('.', '/')}/{split}.jsonl"
+            else:
+                unmounted_input_file = pipeline_utils.get_unmounted_path(cluster_config, input_file)
+                unmounted_path = str(Path(__file__).parents[3] / unmounted_input_file.replace("/nemo_run/code/", ""))
         else:
-            if not Path(check_path).exists():
-                raise ValueError(
-                    f"Data file {check_path} does not exist locally. "
-                    "Please check the benchmark and split parameters. "
-                    "Did you forget to run prepare data commands or add data_dir argument?"
-                )
+            # will be copied over in this case as it must come from extra datasets
+            input_file = f"/nemo_run/code/{Path(data_path).name}/{benchmark.replace('.', '/')}/{split}.jsonl"
+            unmounted_path = Path(data_path) / benchmark.replace(".", "/") / f"{split}.jsonl"
+    else:
+        # on cluster we will always use the mounted path
+        input_file = f"{data_path}/{benchmark.replace('.', '/')}/{split}.jsonl"
+        unmounted_path = pipeline_utils.get_unmounted_path(cluster_config, input_file)
+
+    unmounted_path = str(unmounted_path)
+    # When data_dir is specified, use it for both input_file and the existence check
+    # data_dir is always assumed to be a mounted path
+    if data_dir:
+        data_dir_unmounted = pipeline_utils.get_unmounted_path(cluster_config, data_dir)
+        input_file = f"{data_dir}/{benchmark.replace('.', '/')}/{split}.jsonl"
+        check_path = f"{data_dir_unmounted}/{benchmark.replace('.', '/')}/{split}.jsonl"
+    else:
+        check_path = unmounted_path
+    # checking if data file exists (can check locally as well)
+    if is_on_cluster:
+        if not pipeline_utils.cluster_path_exists(cluster_config, check_path):
+            raise ValueError(
+                f"Data file {check_path} does not exist on cluster. "
+                "Please check the benchmark and split parameters. "
+                "Did you forget to run prepare data commands or add data_dir argument?"
+            )
+    else:
+        if not Path(check_path).exists():
+            raise ValueError(
+                f"Data file {check_path} does not exist locally. "
+                "Please check the benchmark and split parameters. "
+                "Did you forget to run prepare data commands or add data_dir argument?"
+            )
 
     # this is deprecated, should remove in the future
     prompt_config = get_arg_from_module_or_dict(benchmark_module, "PROMPT_CONFIG", "", override_dict=override_dict)
@@ -171,11 +146,6 @@ def get_benchmark_args_from_module(
     if eval_args:
         generation_args = f"{eval_args} {generation_args}"
     generation_args += f" ++eval_config.split={split} "
-
-    # Let the dataset module inject extra generation args (e.g. ++vlm_dataset=)
-    if hasattr(benchmark_module, "get_extra_generation_args"):
-        generation_args += benchmark_module.get_extra_generation_args(benchmark)
-
     requires_sandbox = get_arg_from_module_or_dict(benchmark_module, "REQUIRES_SANDBOX", False, override_dict)
     keep_mounts_for_sandbox = get_arg_from_module_or_dict(
         benchmark_module, "KEEP_MOUNTS_FOR_SANDBOX", False, override_dict
@@ -232,7 +202,6 @@ def get_benchmark_args_from_module(
         benchmark_group=benchmark_group,
         metrics_type=metrics_type,
         sandbox_env_overrides=sandbox_env_overrides,
-        self_contained_task=self_contained_task,
     )
 
 
@@ -397,24 +366,6 @@ def prepare_eval_commands(
         ):
             LOG.warning("Found benchmark (%s) which requires sandbox to keep mounts, enabling it.", benchmark)
 
-    # Resolve GENERATION_TASK_CLASS for each benchmark and query declarative attributes.
-    # Each task class declares its own is_self_contained(), get_env_prefix(), etc.
-    for ba in benchmarks_dict.values():
-        effective_module = generation_module or ba.generation_module
-        task_cls = _resolve_generation_task_class(effective_module)
-        ba.generation_task_class = task_cls
-        if task_cls is not None and hasattr(task_cls, "is_self_contained"):
-            if task_cls.is_self_contained(extra_arguments):
-                ba.self_contained_task = True
-                if server_parameters["server_gpus"]:
-                    ba.num_gpus = server_parameters["server_gpus"]
-        # Allow task class to override metrics_type (e.g. mcore_skills uses
-        # VLMEvalKit evaluation and writes eval_kit_metrics.json).
-        if task_cls is not None and hasattr(task_cls, "METRICS_TYPE_OVERRIDE"):
-            ba.metrics_type = task_cls.METRICS_TYPE_OVERRIDE
-
-    has_self_contained = any(ba.self_contained_task for ba in benchmarks_dict.values())
-
     total_evals = 0
     for benchmark, benchmark_args in benchmarks_dict.items():
         if benchmark_args.num_samples == 0:
@@ -445,12 +396,6 @@ def prepare_eval_commands(
         # if num_jobs is -1, we run all benchmarks in parallel
         num_jobs = total_evals
 
-    # Self-contained tasks (e.g., eval_kit mcore mode) bypass the server/client split
-    # and manage their own GPU allocation, so each benchmark must get its own job (no grouping).
-    if has_self_contained and num_jobs != total_evals:
-        LOG.info("Self-contained tasks detected, forcing num_jobs = total_evals (no job grouping).")
-        num_jobs = total_evals
-
     if num_jobs == 0:
         return benchmarks_dict, []
 
@@ -463,7 +408,6 @@ def prepare_eval_commands(
     cur_job_idx = 0
     get_random_port = pipeline_utils.should_get_random_port(server_parameters["server_gpus"], exclusive)
-
     job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client(
         **server_parameters,
         extra_arguments=extra_arguments,
         get_random_port=get_random_port,
     )
@@ -517,31 +461,10 @@ def prepare_eval_commands(
                 "which is not supported for evaluation when grouping jobs."
             )
 
-        # Self-contained tasks don't use NeMo Skills server, so skip
-        # server-related args that configure_client adds to job_extra_arguments.
-        # Tasks with configure_client_overrides translate server params into
-        # their own config format (e.g. eval_kit uses flat ++server_url instead
-        # of nested ++server.* overrides).
-        if benchmark_args.self_contained_task:
-            effective_extra_args = extra_arguments
-        elif hasattr(generation_task, "configure_client_overrides"):
-            # rsplit to handle URLs like http://host:port (takes last colon)
-            host, port = (job_server_address or "localhost:5000").rsplit(":", 1)
-            model = server_parameters["model"]
-            server_type = server_parameters["server_type"]
-            task_overrides = generation_task.configure_client_overrides(
-                host=host,
-                port=int(port),
-                model=model,
-                server_type=server_type,
-            )
-            effective_extra_args = f"{task_overrides} {extra_arguments}"
-        else:
-            effective_extra_args = job_extra_arguments
         full_extra_arguments = (
             f"{generation_task.get_generation_default_args()} "
             f"{benchmark_args.generation_args} "
-            f"{effective_extra_args} "
+            f"{job_extra_arguments} "
         )
 
         cmd = pipeline_utils.get_generation_cmd(
@@ -581,43 +504,30 @@ def prepare_eval_commands(
                     env_source[key] = b
             job_sandbox_env_overrides = [f"{k}={v}" for k, v in env_map.items()]
 
-            # For self-contained tasks, override server config and get num_gpus
-            job_num_gpus = None
-            is_self_contained_job = any(benchmarks_dict[b].self_contained_task for b in job_benchmarks)
-            if is_self_contained_job:
-                effective_server_config = None
-                for b in job_benchmarks:
-                    if benchmarks_dict[b].num_gpus is not None:
-                        job_num_gpus = benchmarks_dict[b].num_gpus
-                        break
-            else:
-                effective_server_config = job_server_config
-
             # TODO: move to a dataclass
             job_batches.append(
                 (
                     job_cmds,
-                    sorted(job_benchmarks),
+                    job_benchmarks,
                     job_needs_sandbox,
                     job_needs_sandbox_to_keep_mounts,
-                    effective_server_config,
+                    job_server_config,
                     job_server_address,
                     # a check above guarantees that this is the same for all tasks in a job
                     generation_task.get_server_command_fn(),
                     job_sandbox_env_overrides,
-                    job_num_gpus,
                 )
             )
-            for job_benchmark in job_benchmarks:
-                benchmarks_dict[job_benchmark].job_ids.append(cur_job_idx)
-            cur_job_idx += 1
-            job_cmds = []
-            job_benchmarks = set()
             job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client(
                 **server_parameters,
                 extra_arguments=extra_arguments,
                 get_random_port=get_random_port,
             )
+            for job_benchmark in job_benchmarks:
+                benchmarks_dict[job_benchmark].job_ids.append(cur_job_idx)
+            cur_job_idx += 1
+            job_cmds = []
+            job_benchmarks = set()
 
         cur_eval += 1
 
diff --git a/nemo_skills/pipeline/utils/generation.py b/nemo_skills/pipeline/utils/generation.py
index 812120655c..4432181cff 100644
--- a/nemo_skills/pipeline/utils/generation.py
+++ b/nemo_skills/pipeline/utils/generation.py
@@ -136,15 +136,9 @@ def build_requirements_venv_cmd(requirements: list[str]) -> str:
         'mkdir -p "$VENV_ROOT" && '
         'if [ ! -f "$READY_FILE" ]; then '
         '  if mkdir "$LOCK_DIR" 2>/dev/null; then '
-        "    if command -v uv >/dev/null 2>&1; then "
-        '      if ! uv venv --system-site-packages "$VENV_DIR"; then rmdir "$LOCK_DIR"; exit 1; fi; '
-        '      . "$VENV_DIR/bin/activate"; '
-        '      if ! uv pip install -r "$REQS_FILE"; then rmdir "$LOCK_DIR"; exit 1; fi; '
-        "    else "
-        '      if ! python3 -m venv --system-site-packages "$VENV_DIR"; then rmdir "$LOCK_DIR"; exit 1; fi; '
-        '      . "$VENV_DIR/bin/activate"; '
-        '      if ! python3 -m pip install -r "$REQS_FILE"; then rmdir "$LOCK_DIR"; exit 1; fi; '
-        "    fi; "
+        '    if ! uv venv --system-site-packages "$VENV_DIR"; then rmdir "$LOCK_DIR"; exit 1; fi; '
+        '    . "$VENV_DIR/bin/activate"; '
+        '    if ! uv pip install -r "$REQS_FILE"; then rmdir "$LOCK_DIR"; exit 1; fi; '
         '    touch "$READY_FILE"; '
         '    rmdir "$LOCK_DIR"; '
         "  else "
@@ -440,6 +434,8 @@ def get_generation_cmd(
     If requirements are provided, a per-requirements uv venv is prepared
     and activated before running the generation command.
     """
+    if input_file is None and input_dir is None:
+        raise ValueError("Either input_file or input_dir must be provided.")
     if input_file is not None and input_dir is not None:
         raise ValueError("Please provide either input_file or input_dir, not both.")
 
@@ -462,9 +458,7 @@
     hydra_config_args, override_args = separate_hydra_args(extra_arguments)
 
     # Handle file paths vs module names
-    common_args = f"++skip_filled=True ++output_file={output_file}"
-    if input_file is not None:
-        common_args += f" ++input_file={input_file}"
+    common_args = f"++skip_filled=True ++input_file={input_file} ++output_file={output_file}"
     if script.endswith(".py") or os.sep in script:
         # It's a file path, run it directly with .py extension
         script_path = script if script.endswith(".py") else f"{script}.py"
diff --git a/requirements/eval-kit.txt b/requirements/eval-kit.txt
deleted file mode 100644
index dcfc99670b..0000000000
--- a/requirements/eval-kit.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-# VLMEvalKit (vlmeval) is installed at job start via --installation_command
-# in run_eval_kit.sh (pip install from mounted cluster source).
-# This file is kept as a placeholder; no venv-based requirements are needed.