diff --git a/nemo_skills/dataset/mmau-pro/closed_form/__init__.py b/nemo_skills/dataset/mmau-pro/closed_form/__init__.py index 4a3921f31d..3fe4b84ef1 100644 --- a/nemo_skills/dataset/mmau-pro/closed_form/__init__.py +++ b/nemo_skills/dataset/mmau-pro/closed_form/__init__.py @@ -18,5 +18,5 @@ # NVEmbed judge configuration for closed-form evaluation JUDGE_PIPELINE_ARGS = { - "judge_type": "nvembed", + "judge_step": "nemo_skills.pipeline.judges.nvembed_judge::create_judge_tasks", } diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py index 44eed3f8a6..9edd529599 100644 --- a/nemo_skills/pipeline/eval.py +++ b/nemo_skills/pipeline/eval.py @@ -41,177 +41,6 @@ class SingleNodeMode(str, enum.Enum): parallel = "parallel" -def _create_comet_judge_tasks( - exp, - expname, - benchmark, - judge_pipeline_args, - rerun_done, - log_dir, - server_parameters, - cluster_config, - judge_server_gpus, - judge_server_nodes, - partition, - run_after, - reuse_code_exp, - reuse_code, - dependent_tasks, - all_tasks, - _task_dependencies, - installation_command, - skip_hf_home_check, - sbatch_kwargs, -): - """Create tasks for Comet judge evaluation.""" - from nemo_skills.pipeline.utils.generation import get_remaining_jobs - - output_dir_path = judge_pipeline_args.get("output_dir") - input_file = judge_pipeline_args.get("input_file") - comet_model_path = judge_pipeline_args.get("judge_model") - - # Determine seeds to check - if input_file is None: - num_seeds = judge_pipeline_args.get("num_random_seeds", 1) - random_seeds = list(range(num_seeds)) - else: - random_seeds = [None] - - remaining_jobs = get_remaining_jobs( - cluster_config=cluster_config, - output_dir=output_dir_path, - random_seeds=random_seeds, - chunk_ids=[None], # No chunking for judge task - rerun_done=rerun_done, - ) - - if not remaining_jobs or all(not chunks for chunks in remaining_jobs.values()): - LOG.info(f"Skipping Comet judge for {benchmark} - all output files and .done markers exist") - return [] - - # Build command to run xCOMET-XXL judge script - script_args = [f"--output-dir {output_dir_path} --comet-model-path {comet_model_path}"] - - if input_file is None: - input_dir = judge_pipeline_args.get("input_dir") - script_args.append(f"--input-dir {input_dir}") - script_args.append(f"--num-seeds {num_seeds}") - else: - script_args.append(f"--input-file {input_file}") - - run_cmd = f"pip install unbabel-comet && python3 -I /nemo_run/code/nemo_skills/evaluation/evaluator/comet.py {' '.join(script_args)}" - - # Create task with GPU support for Comet - judge_task = pipeline_utils.add_task( - exp, - cmd=run_cmd, - task_name=f"{expname}-{benchmark}-comet-judge", - log_dir=log_dir + "/judge", - container=cluster_config["containers"]["vllm"], - cluster_config=cluster_config, - num_gpus=judge_server_gpus or 1, - num_nodes=judge_server_nodes or 1, - partition=partition, - run_after=run_after, - reuse_code_exp=reuse_code_exp, - reuse_code=reuse_code, - task_dependencies=( - dependent_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies - ), - installation_command=installation_command, - skip_hf_home_check=skip_hf_home_check, - sbatch_kwargs=sbatch_kwargs, - ) - return [judge_task] - - -def _create_nvembed_judge_tasks( - exp, - expname, - benchmark, - judge_pipeline_args, - rerun_done, - log_dir, - server_parameters, - cluster_config, - judge_server_gpus, - judge_server_nodes, - partition, - run_after, - reuse_code_exp, - reuse_code, - dependent_tasks, - all_tasks, - _task_dependencies, - installation_command, - skip_hf_home_check, - sbatch_kwargs, -): - """Create tasks for NVEmbed judge evaluation.""" - from nemo_skills.pipeline.utils.generation import get_remaining_jobs - - output_dir_path = judge_pipeline_args.get("output_dir") - input_file = judge_pipeline_args.get("input_file") - - # Determine seeds to check - if input_file is None: - num_seeds = judge_pipeline_args.get("num_random_seeds", 1) - random_seeds = list(range(num_seeds)) - else: - random_seeds = [None] - - remaining_jobs = get_remaining_jobs( - cluster_config=cluster_config, - output_dir=output_dir_path, - random_seeds=random_seeds, - chunk_ids=[None], # No chunking for judge task - rerun_done=rerun_done, - ) - - if not remaining_jobs or all(not chunks for chunks in remaining_jobs.values()): - LOG.info(f"Skipping NVEmbed judge for {benchmark} - all output files and .done markers exist") - return [] - - # Build command to run NVEmbed judge script - script_args = [f"--output-dir {output_dir_path}"] - - if input_file is None: - input_dir = judge_pipeline_args.get("input_dir") - script_args.append(f"--input-dir {input_dir}") - script_args.append(f"--num-seeds {num_seeds}") - else: - script_args.append(f"--input-file {input_file}") - - # Add skip-existing flag unless rerun_done is set - if not rerun_done: - script_args.append("--skip-existing") - - run_cmd = f"python3 -I /nemo_run/code/nemo_skills/evaluation/evaluator/nvembed_judge.py {' '.join(script_args)}" - - # Create task with GPU support for NVEmbed - judge_task = pipeline_utils.add_task( - exp, - cmd=run_cmd, - task_name=f"{expname}-{benchmark}-nvembed-judge", - log_dir=log_dir + "/judge", - container=cluster_config["containers"]["vllm"], - cluster_config=cluster_config, - num_gpus=judge_server_gpus or 1, - num_nodes=judge_server_nodes or 1, - partition=partition, - run_after=run_after, - reuse_code_exp=reuse_code_exp, - reuse_code=reuse_code, - task_dependencies=( - dependent_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies - ), - installation_command=installation_command, - skip_hf_home_check=skip_hf_home_check, - sbatch_kwargs=sbatch_kwargs, - ) - return [judge_task] - - def _create_llm_judge_tasks( ctx, expname, @@ -325,7 +154,6 @@ def eval( help="Path to the entrypoint of the server. " "If not specified, will use the default entrypoint for the server type.", ), - judge_type: str = typer.Option("llm", help="Type of judge to use: 'llm' (default) or 'nvembed'"), judge_model: str = typer.Option(None, help="Path to the model to be used as a judge (if applicable)"), judge_server_address: str = typer.Option(None, help="Address of the server hosting the judge model"), judge_server_type: pipeline_utils.SupportedServers = typer.Option( @@ -519,7 +347,7 @@ def eval( "generation_type": judge_generation_type, "generation_module": judge_generation_module, } - eval_requires_judge = any(param_value for param_value in cli_judge_pipeline_args.values()) or judge_type != "llm" + eval_requires_judge = any(param_value for param_value in cli_judge_pipeline_args.values()) # Prepare cluster config and mount paths cluster_config = pipeline_utils.get_cluster_config(cluster, config_dir) @@ -643,43 +471,29 @@ def eval( benchmark_args.eval_subfolder = benchmark_args.eval_subfolder[4:] judge_pipeline_args["output_dir"] = str(Path(output_dir) / benchmark_args.eval_subfolder) - # Check for per-benchmark judge_type, fall back to global judge_type - benchmark_judge_type = judge_pipeline_args.pop("judge_type", judge_type) + # judge_step is a :: path to the judge creator function (locate() convention). + # Benchmarks set this directly in JUDGE_PIPELINE_ARGS; falls back to None for LLM judge. + judge_creator_path = judge_pipeline_args.pop("judge_step", None) - # Create judge tasks based on judge type - if benchmark_judge_type == "nvembed": - judge_tasks = _create_nvembed_judge_tasks( - exp=exp, - expname=expname, - benchmark=benchmark, - judge_pipeline_args=judge_pipeline_args, - rerun_done=rerun_done, - log_dir=log_dir, - server_parameters=server_parameters, - cluster_config=cluster_config, - judge_server_gpus=judge_server_gpus, - judge_server_nodes=judge_server_nodes, - partition=partition, - run_after=run_after, - reuse_code_exp=reuse_code_exp, - reuse_code=reuse_code, - dependent_tasks=dependent_tasks, - all_tasks=all_tasks, - _task_dependencies=_task_dependencies, - installation_command=installation_command, - skip_hf_home_check=skip_hf_home_check, - sbatch_kwargs=sbatch_kwargs, - ) - elif benchmark_judge_type == "comet": - judge_pipeline_args["judge_model"] = judge_model - judge_tasks = _create_comet_judge_tasks( + # Pass judge_model through so judge implementations can access it if needed (e.g. comet) + if judge_model: + judge_pipeline_args.setdefault("judge_model", judge_model) + + if judge_creator_path: + # Use locate() to dynamically load judge creator function + from nemo_skills.dataset.utils import locate + + judge_creator_fn = locate(judge_creator_path) + + # Call with standardized parameters + judge_tasks = judge_creator_fn( exp=exp, expname=expname, - benchmark=benchmark, + benchmarks=[benchmark], judge_pipeline_args=judge_pipeline_args, rerun_done=rerun_done, log_dir=log_dir, - server_parameters=server_parameters, + output_dir=output_dir, cluster_config=cluster_config, judge_server_gpus=judge_server_gpus, judge_server_nodes=judge_server_nodes, diff --git a/nemo_skills/pipeline/judges/__init__.py b/nemo_skills/pipeline/judges/__init__.py new file mode 100644 index 0000000000..2a379ddf4f --- /dev/null +++ b/nemo_skills/pipeline/judges/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Judge implementations for evaluation pipeline.""" diff --git a/nemo_skills/pipeline/judges/comet_judge.py b/nemo_skills/pipeline/judges/comet_judge.py new file mode 100644 index 0000000000..da3b6c3acb --- /dev/null +++ b/nemo_skills/pipeline/judges/comet_judge.py @@ -0,0 +1,133 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Comet judge implementation for translation quality evaluation.""" + +import logging + +from nemo_skills.pipeline.utils import add_task +from nemo_skills.pipeline.utils.generation import get_remaining_jobs +from nemo_skills.utils import get_logger_name + +LOG = logging.getLogger(get_logger_name(__file__)) + + +def create_judge_tasks( + exp, + expname, + benchmarks, + judge_pipeline_args, + rerun_done, + log_dir, + output_dir, + cluster_config, + judge_server_gpus, + judge_server_nodes, + partition, + run_after, + reuse_code_exp, + reuse_code, + dependent_tasks, + all_tasks, + _task_dependencies, + installation_command, + skip_hf_home_check, + sbatch_kwargs, +): + """Create tasks for Comet judge evaluation. + + Args: + exp: NeMo-Run experiment object + expname: Name of the experiment + benchmarks: List of benchmarks to evaluate (typically single benchmark) + judge_pipeline_args: Configuration for judge pipeline + rerun_done: Whether to rerun already completed jobs + log_dir: Directory for logs + output_dir: Output directory (unused, kept for interface compatibility) + cluster_config: Cluster configuration dict + judge_server_gpus: Number of GPUs for judge + judge_server_nodes: Number of nodes for judge + partition: SLURM partition + run_after: Dependencies to run after + reuse_code_exp: Experiment to reuse code from + reuse_code: Whether to reuse code + dependent_tasks: List of dependent tasks + all_tasks: List of all tasks + _task_dependencies: Additional task dependencies + installation_command: Installation command + skip_hf_home_check: Whether to skip HF_HOME check + sbatch_kwargs: Additional sbatch kwargs + + Returns: + List of judge tasks created + """ + benchmark = benchmarks[0] # Comet judge works on single benchmark + + output_dir_path = judge_pipeline_args.get("output_dir") + input_file = judge_pipeline_args.get("input_file") + comet_model_path = judge_pipeline_args.get("judge_model") + + # Determine seeds to check + if input_file is None: + num_seeds = judge_pipeline_args.get("num_random_seeds", 1) + random_seeds = list(range(num_seeds)) + else: + random_seeds = [None] + + remaining_jobs = get_remaining_jobs( + cluster_config=cluster_config, + output_dir=output_dir_path, + random_seeds=random_seeds, + chunk_ids=[None], # No chunking for judge task + rerun_done=rerun_done, + ) + + if not remaining_jobs or all(not chunks for chunks in remaining_jobs.values()): + LOG.info(f"Skipping Comet judge for {benchmark} - all output files and .done markers exist") + return [] + + # Build command to run xCOMET-XXL judge script + script_args = [f"--output-dir {output_dir_path} --comet-model-path {comet_model_path}"] + + if input_file is None: + input_dir = judge_pipeline_args.get("input_dir") + script_args.append(f"--input-dir {input_dir}") + script_args.append(f"--num-seeds {num_seeds}") + else: + script_args.append(f"--input-file {input_file}") + + run_cmd = f"pip install unbabel-comet && python3 -I /nemo_run/code/nemo_skills/evaluation/evaluator/comet.py {' '.join(script_args)}" + + # Create task with GPU support for Comet + judge_task = add_task( + exp, + cmd=run_cmd, + task_name=f"{expname}-{benchmark}-comet-judge", + log_dir=log_dir + "/judge", + container=cluster_config["containers"]["vllm"], + cluster_config=cluster_config, + num_gpus=judge_server_gpus or 1, + num_nodes=judge_server_nodes or 1, + partition=partition, + run_after=run_after, + reuse_code_exp=reuse_code_exp, + reuse_code=reuse_code, + task_dependencies=( + dependent_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies + ), + installation_command=installation_command, + skip_hf_home_check=skip_hf_home_check, + sbatch_kwargs=sbatch_kwargs, + ) + return [judge_task] diff --git a/nemo_skills/pipeline/judges/nvembed_judge.py b/nemo_skills/pipeline/judges/nvembed_judge.py new file mode 100644 index 0000000000..f4010e4d9e --- /dev/null +++ b/nemo_skills/pipeline/judges/nvembed_judge.py @@ -0,0 +1,136 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NVEmbed judge implementation for embedding-based similarity matching.""" + +import logging + +from nemo_skills.pipeline.utils import add_task +from nemo_skills.pipeline.utils.generation import get_remaining_jobs +from nemo_skills.utils import get_logger_name + +LOG = logging.getLogger(get_logger_name(__file__)) + + +def create_judge_tasks( + exp, + expname, + benchmarks, + judge_pipeline_args, + rerun_done, + log_dir, + output_dir, + cluster_config, + judge_server_gpus, + judge_server_nodes, + partition, + run_after, + reuse_code_exp, + reuse_code, + dependent_tasks, + all_tasks, + _task_dependencies, + installation_command, + skip_hf_home_check, + sbatch_kwargs, +): + """Create tasks for NVEmbed judge evaluation. + + Args: + exp: NeMo-Run experiment object + expname: Name of the experiment + benchmarks: List of benchmarks to evaluate (typically single benchmark) + judge_pipeline_args: Configuration for judge pipeline + rerun_done: Whether to rerun already completed jobs + log_dir: Directory for logs + output_dir: Output directory (unused, kept for interface compatibility) + cluster_config: Cluster configuration dict + judge_server_gpus: Number of GPUs for judge + judge_server_nodes: Number of nodes for judge + partition: SLURM partition + run_after: Dependencies to run after + reuse_code_exp: Experiment to reuse code from + reuse_code: Whether to reuse code + dependent_tasks: List of dependent tasks + all_tasks: List of all tasks + _task_dependencies: Additional task dependencies + installation_command: Installation command + skip_hf_home_check: Whether to skip HF_HOME check + sbatch_kwargs: Additional sbatch kwargs + + Returns: + List of judge tasks created + """ + benchmark = benchmarks[0] # NVEmbed judge works on single benchmark + + output_dir_path = judge_pipeline_args.get("output_dir") + input_file = judge_pipeline_args.get("input_file") + + # Determine seeds to check + if input_file is None: + num_seeds = judge_pipeline_args.get("num_random_seeds", 1) + random_seeds = list(range(num_seeds)) + else: + random_seeds = [None] + + remaining_jobs = get_remaining_jobs( + cluster_config=cluster_config, + output_dir=output_dir_path, + random_seeds=random_seeds, + chunk_ids=[None], # No chunking for judge task + rerun_done=rerun_done, + ) + + if not remaining_jobs or all(not chunks for chunks in remaining_jobs.values()): + LOG.info(f"Skipping NVEmbed judge for {benchmark} - all output files and .done markers exist") + return [] + + # Build command to run NVEmbed judge script + script_args = [f"--output-dir {output_dir_path}"] + + if input_file is None: + input_dir = judge_pipeline_args.get("input_dir") + script_args.append(f"--input-dir {input_dir}") + script_args.append(f"--num-seeds {num_seeds}") + else: + script_args.append(f"--input-file {input_file}") + + # Add skip-existing flag unless rerun_done is set + if not rerun_done: + script_args.append("--skip-existing") + + run_cmd = f"python3 -I /nemo_run/code/nemo_skills/evaluation/evaluator/nvembed_judge.py {' '.join(script_args)}" + + # Create task with GPU support for NVEmbed + judge_task = add_task( + exp, + cmd=run_cmd, + task_name=f"{expname}-{benchmark}-nvembed-judge", + log_dir=log_dir + "/judge", + container=cluster_config["containers"]["vllm"], + cluster_config=cluster_config, + num_gpus=judge_server_gpus or 1, + num_nodes=judge_server_nodes or 1, + partition=partition, + run_after=run_after, + reuse_code_exp=reuse_code_exp, + reuse_code=reuse_code, + task_dependencies=( + dependent_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies + ), + installation_command=installation_command, + skip_hf_home_check=skip_hf_home_check, + sbatch_kwargs=sbatch_kwargs, + ) + return [judge_task]