diff --git a/nemo_skills/dataset/mmau-pro/closed_form/__init__.py b/nemo_skills/dataset/mmau-pro/closed_form/__init__.py index 4e3b424d84..4390c1d887 100644 --- a/nemo_skills/dataset/mmau-pro/closed_form/__init__.py +++ b/nemo_skills/dataset/mmau-pro/closed_form/__init__.py @@ -16,6 +16,7 @@ METRICS_TYPE = "mmau_pro_closed_form" SCORE_MODULE = "nemo_skills.evaluation.metrics.mmau_pro_metrics" GENERATION_ARGS = "++prompt_format=openai" +EVAL_ARGS = "++eval_type=mmau-pro" # NVEmbed judge configuration for closed-form evaluation JUDGE_PIPELINE_ARGS = { diff --git a/nemo_skills/dataset/mmau-pro/open_ended/__init__.py b/nemo_skills/dataset/mmau-pro/open_ended/__init__.py index 22773d6fed..c5f09272d2 100644 --- a/nemo_skills/dataset/mmau-pro/open_ended/__init__.py +++ b/nemo_skills/dataset/mmau-pro/open_ended/__init__.py @@ -23,4 +23,4 @@ "server_type": "openai", "server_address": "https://integrate.api.nvidia.com/v1", } -JUDGE_ARGS = "++prompt_config=judge/speechlm ++generation_key=judgement" +JUDGE_ARGS = "++prompt_config=judge/mmau-pro ++generation_key=judgement" diff --git a/nemo_skills/dataset/mmau-pro/prepare.py b/nemo_skills/dataset/mmau-pro/prepare.py index a6f04d621b..0ea66ec2b7 100644 --- a/nemo_skills/dataset/mmau-pro/prepare.py +++ b/nemo_skills/dataset/mmau-pro/prepare.py @@ -75,8 +75,8 @@ def format_entry(entry, with_audio=False): if category == "open": content = entry["question"] elif choices and len(choices) > 1: - options_text = "\n".join(f"{chr(65 + i)}. {choice}" for i, choice in enumerate(choices)) - content = f"{entry['question']}\n\n{options_text}" + options_text = "\n".join(f"{chr(65 + i)}) {choice}" for i, choice in enumerate(choices)) + content = f"{entry['question']}\n\n{options_text}\n\nRespond with the complete text of the correct option, not just the letter." else: content = entry["question"] @@ -84,13 +84,18 @@ def format_entry(entry, with_audio=False): if entry.get("audio_path"): audio_path = entry["audio_path"] - - if isinstance(audio_path, list) and audio_path: - user_message["audios"] = [{"path": path, "duration": 10.0} for path in audio_path] - elif isinstance(audio_path, str): - user_message["audio"] = {"path": audio_path, "duration": 10.0} - - formatted_entry["messages"] = [user_message] + # Prepend /dataset/mmau-pro/ to make paths absolute for cluster + if len(audio_path) == 1: + user_message["audio"] = {"path": f"/dataset/mmau-pro/{audio_path[0]}"} + else: + user_message["audios"] = [{"path": f"/dataset/mmau-pro/{path}"} for path in audio_path] + + # Don't use /no_think for open-ended questions to allow reasoning + system_content = "You are a helpful assistant." + if category != "open": + system_content += " /no_think" + + formatted_entry["messages"] = [{"role": "system", "content": system_content}, user_message] return formatted_entry diff --git a/nemo_skills/evaluation/metrics/mmau_pro_metrics.py b/nemo_skills/evaluation/metrics/mmau_pro_metrics.py index f079049cc1..000dbcf13f 100644 --- a/nemo_skills/evaluation/metrics/mmau_pro_metrics.py +++ b/nemo_skills/evaluation/metrics/mmau_pro_metrics.py @@ -13,14 +13,52 @@ # limitations under the License. import logging +import re + +import numpy as np from nemo_skills.evaluation.metrics.base import BaseMetrics, as_int, as_percentage -from nemo_skills.evaluation.metrics.utils import is_correct_judgement from nemo_skills.utils import get_logger_name LOG = logging.getLogger(get_logger_name(__file__)) +def extract_multicriteria_scores(judgement_text: str) -> dict[str, float]: + """Extract multi-criteria scores (1-5 scale) from LLM judge evaluation. + + Expected format: + CORRECTNESS: [score] - [justification] + RELEVANCE: [score] - [justification] + COMPLETENESS: [score] - [justification] + CLARITY: [score] - [justification] + OVERALL: [score] - [overall assessment] + + Returns: + Dictionary with keys: correctness, relevance, completeness, clarity, overall + Defaults to 3.0 if score not found. + """ + scores = {} + + patterns = { + "correctness": r"CORRECTNESS:\s*(\d+(?:\.\d+)?)", + "relevance": r"RELEVANCE:\s*(\d+(?:\.\d+)?)", + "completeness": r"COMPLETENESS:\s*(\d+(?:\.\d+)?)", + "clarity": r"CLARITY:\s*(\d+(?:\.\d+)?)", + "overall": r"OVERALL:\s*(\d+(?:\.\d+)?)", + } + + for criterion, pattern in patterns.items(): + match = re.search(pattern, judgement_text, re.IGNORECASE) + scores[criterion] = float(match.group(1)) if match else 3.0 + + # Fallback: compute overall if missing or still 3.0 + if "overall" not in scores or scores["overall"] == 3.0: + criteria_scores = [scores.get(k, 3.0) for k in ["correctness", "relevance", "completeness", "clarity"]] + scores["overall"] = sum(criteria_scores) / len(criteria_scores) + + return scores + + class MMAUProMetrics(BaseMetrics): """Metrics class for MMAU-Pro benchmark (all subgroups).""" @@ -28,16 +66,24 @@ def __init__(self, compute_no_answer: bool = True, max_k: int = 1): super().__init__(compute_no_answer=compute_no_answer) self.max_k = max_k + # Track multi-criteria scores for open-ended questions (1-5 scale) + self.multicriteria_scores = { + "correctness": [], + "relevance": [], + "completeness": [], + "clarity": [], + "overall": [], + } + def _get_score_dict(self, prediction: dict) -> dict[str, bool | int | float]: """Extract correctness scores from prediction.""" score_dict = {} - # Open-ended: extract from judge result + # Open-ended: use LLM judge correctness score >= 3 as correct if "judgement" in prediction: - judge_result = is_correct_judgement(prediction["judgement"]) - score_dict["judge_correct"] = judge_result - score_dict["correct"] = judge_result - # Closed-form and instruction following: use is_correct + multicriteria = extract_multicriteria_scores(prediction["judgement"]) + score_dict["correct"] = multicriteria.get("correctness", 3.0) >= 3.0 + # Closed-form / instruction-following: use binary correctness elif "is_correct" in prediction: score_dict["correct"] = prediction["is_correct"] else: @@ -58,24 +104,61 @@ def get_incorrect_sample(self, prediction: dict) -> dict: def update(self, predictions): """Update metrics with new predictions.""" super().update(predictions) + predicted_answers = [pred.get("generation", None).strip() or None for pred in predictions] self._compute_pass_at_k(predictions=predictions, predicted_answers=predicted_answers) self._compute_majority_at_k(predictions=predictions, predicted_answers=predicted_answers) + # Collect multi-criteria scores for open-ended questions + for pred in predictions: + if "judgement" in pred: + multicriteria = extract_multicriteria_scores(pred["judgement"]) + for criterion in self.multicriteria_scores: + self.multicriteria_scores[criterion].append(multicriteria.get(criterion, 3.0)) + def get_metrics(self): """Get computed metrics.""" metrics_dict = super().get_metrics() + for agg_mode, agg_metrics in metrics_dict.items(): - # Ensure avg_tokens is always present for MMAU-Pro + # Ensure avg_tokens is present if "avg_tokens" not in agg_metrics: agg_metrics["avg_tokens"] = 0 if "no_answer" in agg_metrics: agg_metrics["no_answer"] = agg_metrics["no_answer"] / 2.0 - # Set success_rate from correct or judge_correct - if "judge_correct" in agg_metrics: - agg_metrics["success_rate"] = agg_metrics["judge_correct"] + + # Add multi-criteria averages for open-ended (convert 1-5 scale to percentage) + if self.multicriteria_scores["overall"]: + for criterion in self.multicriteria_scores: + scores = self.multicriteria_scores[criterion] + if scores: + # Convert 1-5 scale to 0-100 percentage scale + avg_score = np.mean(scores) + std_score = np.std(scores) + agg_metrics[f"avg_{criterion}"] = (avg_score / 5.0) * 100 + agg_metrics[f"std_{criterion}"] = (std_score / 5.0) * 100 + + # Set correct and success_rate to avg_correctness for open-ended + agg_metrics["correct"] = agg_metrics["avg_correctness"] + agg_metrics["success_rate"] = agg_metrics["avg_correctness"] + + # Calculate good/poor response rates based on overall >= 4 or <= 2 + overall_scores = self.multicriteria_scores["overall"] + good_responses = sum(1 for score in overall_scores if score >= 4.0) + poor_responses = sum(1 for score in overall_scores if score <= 2.0) + + agg_metrics["good_response_rate"] = (good_responses / len(overall_scores)) * 100 + agg_metrics["poor_response_rate"] = (poor_responses / len(overall_scores)) * 100 + + # For closed-form / instruction-following: use binary correctness elif "correct" in agg_metrics: agg_metrics["success_rate"] = agg_metrics["correct"] + + # Round all numeric values to 2 decimal places + for key, value in agg_metrics.items(): + if isinstance(value, float) and not isinstance(value, bool): + agg_metrics[key] = round(value, 2) + return metrics_dict def metrics_to_print(self): @@ -87,5 +170,20 @@ def metrics_to_print(self): } if self.compute_no_answer: base_metrics["no_answer"] = as_percentage + + # Add multi-criteria metrics for open-ended questions (now in percentage format) + if self.multicriteria_scores["overall"]: + base_metrics.update( + { + "avg_overall": as_percentage, + "avg_correctness": as_percentage, + "avg_relevance": as_percentage, + "avg_completeness": as_percentage, + "avg_clarity": as_percentage, + "good_response_rate": as_percentage, + "poor_response_rate": as_percentage, + } + ) + base_metrics["num_entries"] = as_int return base_metrics diff --git a/nemo_skills/inference/generate.py b/nemo_skills/inference/generate.py index aae36c7351..fd46cc3a9b 100644 --- a/nemo_skills/inference/generate.py +++ b/nemo_skills/inference/generate.py @@ -38,6 +38,8 @@ supports_single_eval, ) from nemo_skills.inference.model import ( + AudioProcessor, + AudioProcessorConfig, ParallelThinkingConfig, get_code_execution_model, get_model, @@ -186,9 +188,18 @@ class GenerateSolutionsConfig: # If True, will enable litellm disk cache (useful for keeping intermediate results in case of job timelimit failures) enable_litellm_cache: bool = False + # List of content types to drop from messages (e.g., base64 audio) to keep output files smaller + drop_content_types: list[str] = field(default_factory=lambda: ["audio_url"]) + + # Audio processing configuration (EXPERIMENTAL) + # Set to enable audio file preprocessing (file->base64 conversion, chunking for long audio) + # Example: ++audio.data_dir=/path/to/audio ++audio.enable_chunking=true + audio: AudioProcessorConfig | None = None + # Evaluation setup if requested. If eval_type is set to None, evaluation is skipped eval_type: str | None = None # "lean4-proof", "math", etc. eval_config: dict = field(default_factory=dict) # Config for the evaluator + dataset_group: str | None = None # "math", "code", "speechlm", etc. from benchmark's DATASET_GROUP def __post_init__(self): self._post_init_validate_data() @@ -393,6 +404,32 @@ def setup_llm(self): else: llm = get_model(**self.cfg.server, tokenizer=self.tokenizer) + # Audio wrapper (preprocesses messages before they reach the model) + # Auto-enable for audio benchmarks on vLLM (eval_type=audio OR dataset_group=speechlm) + should_enable_audio = self.cfg.audio is not None or ( + self.cfg.server.get("server_type", "").lower() == "vllm" + and (self.cfg.eval_type == "audio" or self.cfg.dataset_group == "speechlm") + ) + + if should_enable_audio: + audio_supported_servers = {"vllm"} + server_type = self.cfg.server.get("server_type", "").lower() + if server_type not in audio_supported_servers: + raise ValueError( + f"Audio processing is not supported for server_type='{server_type}'. " + f"Supported server types: {audio_supported_servers}" + ) + + # Use provided config or create default + audio_config = self.cfg.audio if self.cfg.audio is not None else AudioProcessorConfig() + + llm = AudioProcessor( + llm, + audio_config, + eval_config=dict(self.cfg.eval_config), + eval_type=self.cfg.eval_type, + ) + if self.cfg.parallel_thinking.mode is not None: # We don't want to override these key variables which overlap with self.cfg inference_override_config = { @@ -519,12 +556,45 @@ def fill_prompt(self, data_point, data): filled_prompt[-1]["content"] += self.cfg.prompt_suffix else: filled_prompt += self.cfg.prompt_suffix + + # Copy audio fields from data_point to the user message for audio models + if isinstance(filled_prompt, list): + for msg in filled_prompt: + if msg.get("role") == "user": + if "audio" in data_point: + msg["audio"] = data_point["audio"] + break + return filled_prompt + def preprocess_prompt(self, prompt: str | list[dict]) -> str | list[dict]: + """Preprocess the prompt before sending to the model. + + Override this method to add custom preprocessing logic. + Audio conversion is handled by the AudioProcessor wrapper. + """ + return prompt + def dump_outputs(self, outputs, data_points, fout): for output in outputs: fout.write(json.dumps(output) + "\n") + def drop_binary_data(self, output): + """Remove binary data (like base64 audio) from messages to keep output files smaller.""" + # Skip if output doesn't have messages (e.g., text completion mode or error cases) + if "messages" not in output: + return + + for message in output["messages"]: + # Skip if content is not a list (e.g., string content in system messages) + if not isinstance(message.get("content"), list): + continue + + # Filter out content types specified in drop_content_types config + message["content"] = [ + content for content in message["content"] if content.get("type") not in self.cfg.drop_content_types + ] + async def postprocess_single_output(self, output, original_data_point): # to make it easier to follow up with other generations and limit accidental errors, we are adding # all of the original data to the output file alongside the new generations @@ -540,6 +610,9 @@ async def postprocess_single_output(self, output, original_data_point): for key in output: original_data_point.pop(key, None) output.update(original_data_point) + + self.drop_binary_data(output) + if self.cfg.parse_reasoning: parse_reasoning( output, @@ -570,13 +643,20 @@ async def process_single_datapoint(self, data_point, all_data): # Already a dict from Hydra inference_params = dict(self.cfg.inference) + prompt = self.fill_prompt(data_point, all_data) + prompt = self.preprocess_prompt(prompt) + generation_params = { **inference_params, **self.extra_generate_params, - "prompt": self.fill_prompt(data_point, all_data), + "prompt": prompt, "stop_phrases": [self.cfg.stop_phrase] if self.cfg.stop_phrase else None, } + # Pass task_type for audio chunking + if isinstance(self.llm, AudioProcessor) and "task_type" in data_point: + generation_params["task_type"] = data_point["task_type"] + if self.cfg.code_execution: if self.cfg.override_max_code_executions and self.cfg.total_code_executions_in_prompt is not None: generation_params["max_code_executions"] = data_point["total_code_executions"] diff --git a/nemo_skills/inference/model/__init__.py b/nemo_skills/inference/model/__init__.py index bd2e246499..214c28b500 100644 --- a/nemo_skills/inference/model/__init__.py +++ b/nemo_skills/inference/model/__init__.py @@ -12,6 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Model wrappers and factories for inference. + +This module provides: +- Base model implementations (VLLMModel, OpenAIModel, etc.) +- Wrappers for additional capabilities: + - AudioProcessor: Audio file preprocessing and chunking + - CodeExecutionWrapper: Code execution in sandboxes + - ToolCallingWrapper: Tool/function calling support + - ParallelThinkingTask: Parallel thinking/reasoning +- Factory functions: get_model, get_code_execution_model, get_tool_calling_model, etc. +""" + import dataclasses from nemo_skills.mcp.utils import locate @@ -19,6 +31,9 @@ # NIM models (speech) from .asr_nim import ASRNIMModel + +# Audio processing +from .audio_processor import AudioProcessor, AudioProcessorConfig from .azure import AzureOpenAIModel # Base classes @@ -80,6 +95,33 @@ def get_model(server_type, tokenizer=None, model_class: str | None = None, **kwa return loaded_class(tokenizer=tokenizer, **kwargs) +def get_audio_model(server_type, audio_config: dict | AudioProcessorConfig | None = None, **kwargs): + """A helper function to create a model with audio processing capabilities. + + This wraps the base model with AudioProcessor for handling audio files in messages. + + Args: + server_type: The type of server (vllm, sglang, openai, etc.) + audio_config: Audio processing configuration. Can be: + - None: No audio processing (returns base model) + - dict: Will be converted to AudioProcessorConfig + - AudioProcessorConfig: Used directly + **kwargs: Additional arguments passed to get_model() + + Returns: + Model optionally wrapped with AudioProcessor + """ + model = get_model(server_type=server_type, **kwargs) + + if audio_config is None: + return model + + if isinstance(audio_config, dict): + audio_config = AudioProcessorConfig(**audio_config) + + return AudioProcessor(model, audio_config) + + def get_code_execution_model(server_type, tokenizer=None, code_execution=None, sandbox=None, **kwargs): """A helper function to make it easier to set server through cmd.""" model = get_model(server_type=server_type, tokenizer=tokenizer, **kwargs) diff --git a/nemo_skills/inference/model/audio_processor.py b/nemo_skills/inference/model/audio_processor.py new file mode 100644 index 0000000000..29c49b80b1 --- /dev/null +++ b/nemo_skills/inference/model/audio_processor.py @@ -0,0 +1,374 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Audio processing wrapper for multimodal models. + +This module provides an AudioProcessor wrapper that can be composed with any +BaseModel to add audio preprocessing capabilities. It handles: +- Converting audio file paths to base64-encoded audio_url format +- Chunking long audio files for models with duration limits +- Aggregating results from chunked audio processing +""" + +import base64 +import logging +import os + +from nemo_skills.utils import get_logger_name, nested_dataclass + +LOG = logging.getLogger(get_logger_name(__file__)) + + +@nested_dataclass(kw_only=True) +class AudioProcessorConfig: + """Configuration for audio preprocessing. + + Attributes: + data_dir: Base directory for resolving relative audio file paths. + enable_chunking: Whether to chunk long audio files. + chunk_task_types: If None, chunk all task types; if specified, only chunk these. + chunk_threshold_sec: Audio duration threshold (in seconds) above which to chunk. + """ + + data_dir: str = "" + enable_chunking: bool = True + chunk_task_types: list[str] | None = None + chunk_threshold_sec: int = 30 + + +def audio_file_to_base64(audio_file_path: str) -> str: + """Encodes an audio file into a base64 string.""" + with open(audio_file_path, "rb") as audio_file: + audio_content = audio_file.read() + return base64.b64encode(audio_content).decode("utf-8") + + +def load_audio_file(audio_file_path: str): + """Load audio file and return array and sampling rate.""" + import soundfile as sf + + audio_array, sampling_rate = sf.read(audio_file_path) + return audio_array, sampling_rate + + +def chunk_audio(audio_array, sampling_rate, chunk_duration_sec=30): + """Chunk audio array into segments of specified duration. + + Args: + audio_array: Audio data as numpy array + sampling_rate: Sampling rate in Hz + chunk_duration_sec: Duration of each chunk in seconds + + Returns: + List of audio chunks + """ + import numpy as np + + chunk_samples = int(chunk_duration_sec * sampling_rate) + num_chunks = int(np.ceil(len(audio_array) / chunk_samples)) + + chunks = [] + for i in range(num_chunks): + start = i * chunk_samples + end = min((i + 1) * chunk_samples, len(audio_array)) + chunks.append(audio_array[start:end]) + + return chunks + + +def save_audio_chunk_to_base64(audio_chunk, sampling_rate) -> str: + """Save audio chunk to temporary file and convert to base64. + + Args: + audio_chunk: Audio data as numpy array + sampling_rate: Sampling rate in Hz + + Returns: + Base64 encoded audio string + """ + import tempfile + + import soundfile as sf + + # Create temporary file + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: + tmp_path = tmp_file.name + sf.write(tmp_path, audio_chunk, sampling_rate) + + try: + # Read and encode + with open(tmp_path, "rb") as f: + audio_content = f.read() + encoded = base64.b64encode(audio_content).decode("utf-8") + finally: + # Clean up + if os.path.exists(tmp_path): + os.unlink(tmp_path) + + return encoded + + +class AudioProcessor: + """Wraps any model to add audio preprocessing capabilities. + + This wrapper handles: + - Converting audio file paths in messages to base64-encoded audio_url format + - Chunking long audio files and aggregating results + - Passing through all other requests unchanged + + Example usage: + model = get_model(server_type="vllm", ...) + audio_model = AudioProcessor(model, AudioProcessorConfig(), eval_config={...}, eval_type="audio") + result = await audio_model.generate_async(prompt=messages, ...) + """ + + def __init__( + self, + model, + config: AudioProcessorConfig, + eval_config: dict | None = None, + eval_type: str | None = None, + ): + """Initialize AudioProcessor wrapper. + + Args: + model: The underlying model to wrap (must have generate_async method) + config: Audio processing configuration + eval_config: Optional eval config dict (contains "data_dir" key) for inferring data_dir + eval_type: Optional eval type string for inferring data_dir + """ + self.model = model + self.config = config + + # Resolve data_dir: explicit config takes precedence, then infer from eval_config + if config.data_dir: + self.data_dir = config.data_dir + elif eval_config is not None and eval_type is not None: + eval_data_dir = eval_config.get("data_dir") + if eval_data_dir is not None: + self.data_dir = os.path.join(eval_data_dir, eval_type) + else: + self.data_dir = "" + else: + self.data_dir = "" + + # Expose common model attributes for compatibility + if hasattr(model, "model_name_or_path"): + self.model_name_or_path = model.model_name_or_path + if hasattr(model, "tokenizer"): + self.tokenizer = model.tokenizer + + async def generate_async( + self, + prompt: str | list[dict] | None = None, + task_type: str = None, + **kwargs, + ) -> dict: + """Generate with automatic audio preprocessing and chunking. + + If the prompt contains audio that needs chunking, processes each chunk + separately and aggregates results. Otherwise, converts audio to base64 + and passes through to the underlying model. + + Args: + prompt: Either a string (text completion) or list of messages (chat) + task_type: Optional task type for chunking filtering + **kwargs: Additional arguments passed to the underlying model + + Returns: + Generation result dict with 'generation' key and optional metadata + """ + if isinstance(prompt, list): + messages = prompt + needs_chunking, audio_path, duration = self._check_chunking_needed(messages, task_type) + + if needs_chunking: + return await self._generate_with_chunking(messages, audio_path, duration, **kwargs) + + # Convert audio fields to base64 format + messages = self._prepare_audio_messages(messages) + prompt = messages + + return await self.model.generate_async(prompt=prompt, **kwargs) + + def _prepare_audio_messages(self, messages: list[dict]) -> list[dict]: + """Convert audio file references in messages to base64-encoded audio_url format. + + Handles 'audio' or 'audios' keys in messages and converts them to + base64-encoded audio_url content items. + + CRITICAL: Audio must come BEFORE text for Qwen models to transcribe correctly. + """ + prepared_messages = [] + + for message in messages: + msg = message.copy() + + if "audio" not in msg and "audios" not in msg: + prepared_messages.append(msg) + continue + + # Convert content to list format if needed + content = msg.get("content", "") + if isinstance(content, str): + text_content = [{"type": "text", "text": content}] + elif isinstance(content, list): + text_content = content + else: + raise TypeError(f"Unexpected content type: {type(content)}") + + # Build audio content items + audio_items = [] + + if "audio" in msg: + audio = msg["audio"] + audio_path = os.path.join(self.data_dir, audio["path"]) + base64_audio = audio_file_to_base64(audio_path) + audio_items.append( + {"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{base64_audio}"}} + ) + del msg["audio"] + elif "audios" in msg: + for audio in msg["audios"]: + audio_path = os.path.join(self.data_dir, audio["path"]) + base64_audio = audio_file_to_base64(audio_path) + audio_items.append( + {"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{base64_audio}"}} + ) + del msg["audios"] + + # Audio items BEFORE text content (required for Qwen models) + msg["content"] = audio_items + text_content + prepared_messages.append(msg) + + return prepared_messages + + def _check_chunking_needed(self, messages: list[dict], task_type: str = None) -> tuple[bool, str, float]: + """Check if audio in messages needs chunking. + + Returns: + Tuple of (needs_chunking, audio_path, duration) + """ + if not self.config.enable_chunking: + return False, None, 0.0 + + # Check if task type should be chunked (if filter is specified) + if self.config.chunk_task_types is not None: + if task_type not in self.config.chunk_task_types: + return False, None, 0.0 + + # Find audio in messages + for msg in messages: + if msg.get("role") == "user": + audio_info = msg.get("audio") or (msg.get("audios", [{}])[0] if msg.get("audios") else {}) + if audio_info and "path" in audio_info: + audio_path = os.path.join(self.data_dir, audio_info["path"]) + + if not os.path.exists(audio_path): + return False, None, 0.0 + + # Load audio to check duration + try: + audio_array, sampling_rate = load_audio_file(audio_path) + duration = len(audio_array) / sampling_rate + + if duration > self.config.chunk_threshold_sec: + return True, audio_path, duration + except Exception: + pass + + return False, None, 0.0 + + async def _generate_with_chunking( + self, + messages: list[dict], + audio_path: str, + duration: float, + tokens_to_generate: int | None = None, + **kwargs, + ) -> dict: + """Generate by chunking long audio and aggregating results. + + Args: + messages: Original messages containing audio reference + audio_path: Path to the audio file to chunk + duration: Duration of audio in seconds + tokens_to_generate: Max tokens per chunk + **kwargs: Additional generation parameters + + Returns: + Aggregated result with combined generation from all chunks + """ + audio_array, sampling_rate = load_audio_file(audio_path) + chunks = chunk_audio(audio_array, sampling_rate, self.config.chunk_threshold_sec) + + LOG.info(f"Chunking audio ({duration:.1f}s) into {len(chunks)} chunks of {self.config.chunk_threshold_sec}s") + + chunk_results = [] + result = None + + for chunk_idx, audio_chunk in enumerate(chunks): + chunk_messages = [] + + for msg in messages: + msg_copy = msg.copy() + + if msg_copy.get("role") == "user" and ("audio" in msg_copy or "audios" in msg_copy): + chunk_base64 = save_audio_chunk_to_base64(audio_chunk, sampling_rate) + + content = msg_copy.get("content", "") + if isinstance(content, str): + text_content = [{"type": "text", "text": content}] + else: + text_content = content + + # Add audio chunk at the beginning (before text) + msg_copy["content"] = [ + {"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{chunk_base64}"}} + ] + text_content + + # Remove original audio fields + msg_copy.pop("audio", None) + msg_copy.pop("audios", None) + + chunk_messages.append(msg_copy) + + result = await self.model.generate_async( + prompt=chunk_messages, tokens_to_generate=tokens_to_generate, **kwargs + ) + + generation = result.get("generation", "") + chunk_results.append(generation.strip()) + + # Aggregate results + aggregated_text = " ".join(chunk_results) + + if result: + final_result = result.copy() + final_result["generation"] = aggregated_text + final_result["num_audio_chunks"] = len(chunks) + final_result["audio_duration"] = duration + else: + final_result = { + "generation": aggregated_text, + "num_audio_chunks": len(chunks), + "audio_duration": duration, + } + + return final_result + + # Proxy other common methods to the underlying model + def __getattr__(self, name): + """Proxy attribute access to the underlying model.""" + return getattr(self.model, name) diff --git a/nemo_skills/inference/model/vllm.py b/nemo_skills/inference/model/vllm.py index e9a2146520..ecf75617e1 100644 --- a/nemo_skills/inference/model/vllm.py +++ b/nemo_skills/inference/model/vllm.py @@ -25,8 +25,11 @@ class VLLMModel(BaseModel): - def __init__(self, **kwargs): - super().__init__(**kwargs) + """VLLM-compatible model client. + + This is a clean OpenAI-compatible client for VLLM servers. + For audio processing capabilities, wrap this model with AudioProcessor. + """ def _get_tokenizer_endpoint(self): """ diff --git a/nemo_skills/pipeline/utils/eval.py b/nemo_skills/pipeline/utils/eval.py index 736d1c7cb6..9245337064 100644 --- a/nemo_skills/pipeline/utils/eval.py +++ b/nemo_skills/pipeline/utils/eval.py @@ -131,6 +131,12 @@ def get_benchmark_args_from_module( if eval_args: generation_args = f"{eval_args} {generation_args}" generation_args += f" ++eval_config.split={split} " + + # Pass dataset_group from benchmark config if defined + dataset_group = get_arg_from_module_or_dict(benchmark_module, "DATASET_GROUP", None, override_dict=override_dict) + if dataset_group: + generation_args += f" ++dataset_group={dataset_group} " + requires_sandbox = get_arg_from_module_or_dict(benchmark_module, "REQUIRES_SANDBOX", False, override_dict) keep_mounts_for_sandbox = get_arg_from_module_or_dict( benchmark_module, "KEEP_MOUNTS_FOR_SANDBOX", False, override_dict diff --git a/nemo_skills/prompt/config/judge/mmau-pro.yaml b/nemo_skills/prompt/config/judge/mmau-pro.yaml new file mode 100644 index 0000000000..5339e4ab0d --- /dev/null +++ b/nemo_skills/prompt/config/judge/mmau-pro.yaml @@ -0,0 +1,30 @@ +# Judge prompt configuration for Speech/Audio Language Model evaluation +# Used for evaluating open-ended responses in MMAU-Pro benchmark +# Uses multi-criteria scoring on 1-5 scale + +user: |- + You are an expert evaluator for audio and speech-related questions. Please evaluate the quality of a model's response to a question. + + Question: {question} + + Reference Answer: {expected_answer} + + Model Response: {generation} + + Please evaluate the model response on the following criteria and provide scores from 1-5 (where 5 is best): + + 1. **Correctness**: How factually accurate is the response compared to the reference? + 2. **Relevance**: How well does the response address the specific question asked? + 3. **Completeness**: Does the response cover all important aspects mentioned in the reference? + 4. **Clarity**: How clear and well-structured is the response? + + For each criterion, provide: + - A score from 1-5 + - A brief justification (1-2 sentences) + + Format your response as: + CORRECTNESS: [score] - [justification] + RELEVANCE: [score] - [justification] + COMPLETENESS: [score] - [justification] + CLARITY: [score] - [justification] + OVERALL: [average score] - [overall assessment] diff --git a/nemo_skills/prompt/config/judge/speechlm.yaml b/nemo_skills/prompt/config/judge/speechlm.yaml deleted file mode 100644 index 4862558145..0000000000 --- a/nemo_skills/prompt/config/judge/speechlm.yaml +++ /dev/null @@ -1,28 +0,0 @@ -# Judge prompt configuration for Speech/Audio Language Model evaluation -# Used for evaluating open-ended responses in MMAU-Pro benchmark -# Follows nemo-skills standard Yes/No judgement pattern - -user: |- - You are an expert evaluator for audio and speech-related questions. Please evaluate whether the model's response correctly answers the question. - - Question: {question} - - Reference Answer: {expected_answer} - - Model Response: {generation} - - Your task is to determine if the model's response is correct based on the reference answer. Consider: - - 1. **Factual Accuracy**: Is the information in the response factually correct? - 2. **Relevance**: Does the response address the specific question asked? - 3. **Completeness**: Does the response cover the key points from the reference answer? - - Please first explain your reasoning in 2-3 sentences, then provide your final judgement. - - Your final judgement must be either "Yes" or "No": - - "Yes" if the model response is correct and adequately answers the question - - "No" if the model response is incorrect, irrelevant, or inadequate - - Format your response as: - Reasoning: [Your explanation] - Judgement: [Yes or No] diff --git a/tests/gpu-tests/run_qwen.sh b/tests/gpu-tests/run_qwen.sh index 10201cf9ed..01803c1dae 100755 --- a/tests/gpu-tests/run_qwen.sh +++ b/tests/gpu-tests/run_qwen.sh @@ -20,6 +20,10 @@ pytest tests/gpu-tests/test_contamination.py -s -x # Tool calling tests (uses same Qwen3-4B-Instruct model) pytest tests/gpu-tests/test_tool_calling.py -s -x +# Audio tests (requires Qwen2.5-Omni model) +export NEMO_SKILLS_TEST_HF_MODEL=Qwen/Qwen2.5-Omni-7B +pytest tests/gpu-tests/test_vllm_audio.py -s -x + # TODO: Add fast context retry tests # pytest tests/gpu-tests/test_context_retry.py -s -x diff --git a/tests/gpu-tests/test_vllm_audio.py b/tests/gpu-tests/test_vllm_audio.py new file mode 100644 index 0000000000..2f5d33ef94 --- /dev/null +++ b/tests/gpu-tests/test_vllm_audio.py @@ -0,0 +1,83 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import shutil +import subprocess +import tempfile +from pathlib import Path + +import pytest +from utils import require_env_var + + +@pytest.mark.gpu +def test_vllm_audio_generation(): + """Integration test: Generate with vLLM server using audio input.""" + model_path = require_env_var("NEMO_SKILLS_TEST_HF_MODEL") + model_type = require_env_var("NEMO_SKILLS_TEST_MODEL_TYPE") + + output_dir = f"/tmp/nemo-skills-tests/{model_type}/vllm-audio-generation" + # Clean up output directory + if Path(output_dir).exists(): + shutil.rmtree(output_dir) + + # Create test input file with audio + with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: + test_data = [ + { + "question": "Transcribe this audio", + "audio": {"path": "/nemo_run/code/tests/slurm-tests/asr_nim/wavs/t2_16.wav"}, + }, + { + "question": "What is in this audio?", + "audio": {"path": "/nemo_run/code/tests/slurm-tests/asr_nim/wavs/t3_16.wav"}, + }, + ] + for item in test_data: + f.write(json.dumps(item) + "\n") + input_file = f.name + + try: + cmd = ( + f"ns generate " + f" --cluster test-local --config_dir {Path(__file__).absolute().parent} " + f" --model {model_path} " + f" --output_dir {output_dir} " + f" --server_type vllm " + f" --server_gpus 1 " + f" --server_nodes 1 " + f" --server_args '--enforce-eager' " + f" --input_file={input_file} " + f" ++prompt_config=generic/default " + f" ++skip_filled=False " + ) + subprocess.run(cmd, shell=True, check=True) + + # Verify output exists and has audio-related generation + with open(f"{output_dir}/output.jsonl") as fin: + lines = fin.readlines() + + assert len(lines) == 2, "Should have 2 output lines" + + for line in lines: + data = json.loads(line) + assert "generation" in data, "Should have generation field" + assert len(data["generation"]) > 0, "Generation should not be empty" + # If model supports audio, generation should contain something + print(f"Generated: {data['generation']}") + + finally: + # Cleanup temp file + Path(input_file).unlink(missing_ok=True) diff --git a/tests/test_vllm_audio.py b/tests/test_vllm_audio.py new file mode 100644 index 0000000000..05cc2645eb --- /dev/null +++ b/tests/test_vllm_audio.py @@ -0,0 +1,101 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import os +import tempfile +from unittest.mock import patch + +import pytest + +from nemo_skills.inference.model.vllm import VLLMModel, audio_file_to_base64 + + +def test_audio_file_to_base64(): + """Test basic audio file encoding to base64.""" + with tempfile.NamedTemporaryFile(mode="wb", suffix=".wav", delete=False) as f: + test_content = b"RIFF" + b"\x00" * 100 + f.write(test_content) + temp_path = f.name + + try: + result = audio_file_to_base64(temp_path) + assert isinstance(result, str) + assert len(result) > 0 + decoded = base64.b64decode(result) + assert decoded == test_content + finally: + os.unlink(temp_path) + + +@pytest.fixture +def mock_vllm_model(): + """Create a mock VLLMModel for testing audio preprocessing.""" + with patch.object(VLLMModel, "__init__", lambda self, **kwargs: None): + model = VLLMModel() + model.data_dir = "" + return model + + +def test_content_text_to_list_with_audio(mock_vllm_model, tmp_path): + """Test converting string content with audio to list format. + + CRITICAL: Audio must come BEFORE text for Qwen Audio to transcribe correctly. + """ + audio_path = tmp_path / "test.wav" + with open(audio_path, "wb") as f: + f.write(b"RIFF" + b"\x00" * 100) + + # Set data_dir to tmp_path parent so path resolution works + mock_vllm_model.data_dir = str(tmp_path) + + message = {"role": "user", "content": "Describe this audio", "audio": {"path": "test.wav"}} + + result = mock_vllm_model.content_text_to_list(message) + + assert isinstance(result["content"], list) + assert len(result["content"]) == 2 + assert result["content"][0]["type"] == "audio_url" + assert result["content"][0]["audio_url"]["url"].startswith("data:audio/wav;base64,") + assert result["content"][1]["type"] == "text" + + +def test_content_text_to_list_with_multiple_audios(mock_vllm_model, tmp_path): + """Test handling message with multiple audio files. + + CRITICAL: Audio must come BEFORE text for Qwen Audio to transcribe correctly. + """ + audio_paths = [] + for i in range(2): + audio_path = tmp_path / f"test_{i}.wav" + with open(audio_path, "wb") as f: + f.write(b"RIFF" + b"\x00" * 100) + audio_paths.append(f"test_{i}.wav") + + mock_vllm_model.data_dir = str(tmp_path) + + message = { + "role": "user", + "content": "Compare these", + "audios": [{"path": audio_paths[0]}, {"path": audio_paths[1]}], + } + + result = mock_vllm_model.content_text_to_list(message) + + assert isinstance(result["content"], list) + assert len(result["content"]) == 3 + # Audio MUST come before text for Qwen Audio + assert result["content"][0]["type"] == "audio_url" + assert result["content"][1]["type"] == "audio_url" + assert result["content"][2]["type"] == "text"