diff --git a/dockerfiles/sandbox/start-with-nginx.sh b/dockerfiles/sandbox/start-with-nginx.sh index 3333343e82..a4f8a77a92 100644 --- a/dockerfiles/sandbox/start-with-nginx.sh +++ b/dockerfiles/sandbox/start-with-nginx.sh @@ -12,12 +12,21 @@ echo "Workers: $NUM_WORKERS, Nginx port: $NGINX_PORT" # Override nginx config for multi-worker mode (single mode uses original config) echo "Configuring nginx for multi-worker load balancing..." -# Force session affinity settings: 1 process per worker with minimal cheaper -UWSGI_PROCESSES=1 -UWSGI_CHEAPER=1 -export UWSGI_PROCESSES -export UWSGI_CHEAPER -echo "Forced UWSGI settings for session affinity: PROCESSES=$UWSGI_PROCESSES, CHEAPER=$UWSGI_CHEAPER" + +# Allow callers to opt-out of single-process state-preserving mode where each worker is given one process +: "${STATEFUL_SANDBOX:=1}" +if [ "$STATEFUL_SANDBOX" -eq 1 ]; then + UWSGI_PROCESSES=1 + UWSGI_CHEAPER=1 +else + # In stateless mode, honour caller-supplied values + : "${UWSGI_PROCESSES:=1}" + : "${UWSGI_CHEAPER:=1}" +fi + +export UWSGI_PROCESSES UWSGI_CHEAPER + +echo "UWSGI settings: PROCESSES=$UWSGI_PROCESSES, CHEAPER=$UWSGI_CHEAPER" # Validate and fix uwsgi configuration if [ -z "$UWSGI_PROCESSES" ]; then diff --git a/nemo_skills/code_execution/local_sandbox/start_local_sandbox.sh b/nemo_skills/code_execution/local_sandbox/start_local_sandbox.sh index bebc6aa077..db40d3c5ce 100755 --- a/nemo_skills/code_execution/local_sandbox/start_local_sandbox.sh +++ b/nemo_skills/code_execution/local_sandbox/start_local_sandbox.sh @@ -22,5 +22,7 @@ docker build --tag=${SANDBOX_NAME} --build-arg="NUM_WORKERS=$((`nproc --all`))" echo "Multi-worker mode: Starting $((`nproc --all`)) workers with session affinity" docker run --network=host \ --memory=${NEMO_SKILLS_SANDBOX_MEM_LIMIT:-"16g"} \ + ${UWSGI_CPU_AFFINITY:+-e UWSGI_CPU_AFFINITY=${UWSGI_CPU_AFFINITY}} \ + ${UWSGI_PROCESSES:+-e UWSGI_PROCESSES=${UWSGI_PROCESSES}} \ --restart unless-stopped \ --name=local-sandbox ${SANDBOX_NAME} diff --git a/nemo_skills/dataset/ioi24/__init__.py b/nemo_skills/dataset/ioi24/__init__.py index a6f7f91e89..f3121341a7 100644 --- a/nemo_skills/dataset/ioi24/__init__.py +++ b/nemo_skills/dataset/ioi24/__init__.py @@ -17,3 +17,12 @@ DATASET_GROUP = "code" METRICS_TYPE = "ioi" EVAL_ARGS = "++eval_type=ioi" + +# environment variables required by this benchmark +SANDBOX_ENV_VARS = [ + "UWSGI_PROCESSES=1024", + "UWSGI_CPU_AFFINITY=8", + "UWSGI_CHEAPER=1023", + "NUM_WORKERS=1", + "STATEFUL_SANDBOX=0", +] diff --git a/nemo_skills/dataset/ioi24/prepare.py b/nemo_skills/dataset/ioi24/prepare.py index 8e5df75b81..656e480b60 100644 --- a/nemo_skills/dataset/ioi24/prepare.py +++ b/nemo_skills/dataset/ioi24/prepare.py @@ -42,14 +42,11 @@ entries.append( { "id": x, - "run": run_code, - "compile": compile_code, "name": item["name"], "ioi_id": item["id"], "subtask": item["subtask"], "question": item["problem"], - "score": item["score"], - "grader_files": item["grader_files"], + "subtask_score": item["score"], } ) @@ -78,8 +75,11 @@ tests[test_name] = test_cases[problem_id][test_name] final_structure[problem_id][subtask] = { "tests": tests, - "score": entry["score"], + "subtask_score": entry["score"], "score_precision": entry["score_precision"], + "run": run_code, + "compile": compile_code, + "grader_files": entry["grader_files"], } with open(os.path.join(data_dir, f"{args.split}_metadata.json"), "w") as f: diff --git a/nemo_skills/dataset/ioi25/__init__.py b/nemo_skills/dataset/ioi25/__init__.py new file mode 100644 index 0000000000..b4311554a9 --- /dev/null +++ b/nemo_skills/dataset/ioi25/__init__.py @@ -0,0 +1,32 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +todo: We are working on providing the data files that are necessary to run IOI25 evaluation. +""" + +# settings that define how evaluation should be done by default (all can be changed from cmdline) +GENERATION_ARGS = "++prompt_config=generic/default" +DATASET_GROUP = "code" +METRICS_TYPE = "ioi" +EVAL_ARGS = "++eval_type=ioi" + +# environment variables required by this benchmark +SANDBOX_ENV_VARS = [ + "UWSGI_PROCESSES=1024", + "UWSGI_CPU_AFFINITY=8", + "UWSGI_CHEAPER=1023", + "NUM_WORKERS=1", + "STATEFUL_SANDBOX=0", +] diff --git a/nemo_skills/evaluation/evaluator/__init__.py b/nemo_skills/evaluation/evaluator/__init__.py index 2471a937bb..d8d6656dd4 100644 --- a/nemo_skills/evaluation/evaluator/__init__.py +++ b/nemo_skills/evaluation/evaluator/__init__.py @@ -26,7 +26,7 @@ ) from nemo_skills.evaluation.evaluator.ifbench import eval_ifbench from nemo_skills.evaluation.evaluator.ifeval import eval_if -from nemo_skills.evaluation.evaluator.ioi import eval_ioi +from nemo_skills.evaluation.evaluator.ioi import IOIEvaluator from nemo_skills.evaluation.evaluator.livecodebench import eval_livecodebench from nemo_skills.evaluation.evaluator.math import ( Lean4ProofEvaluator, @@ -58,7 +58,6 @@ def dummy_eval(cfg): "livecodebench_pro": eval_livecodebench_pro, "scicode": eval_scicode, "mrcr": eval_mrcr, - "ioi": eval_ioi, "bigcodebench": eval_bigcodebench, "ojbench": eval_ojbench, "human_eval_infilling": eval_human_eval_infilling, @@ -70,6 +69,7 @@ def dummy_eval(cfg): "lean4-proof": Lean4ProofEvaluator, "lean4-statement": Lean4StatementEvaluator, # Other evaluators can be added here as they're converted to classes + "ioi": IOIEvaluator, } # Validation: Ensure no overlap between class and function maps diff --git a/nemo_skills/evaluation/evaluator/ioi.py b/nemo_skills/evaluation/evaluator/ioi.py index d46c62116b..c883b96e0b 100644 --- a/nemo_skills/evaluation/evaluator/ioi.py +++ b/nemo_skills/evaluation/evaluator/ioi.py @@ -16,107 +16,150 @@ import multiprocessing import os import re +import threading +import time +from typing import Dict from nemo_skills.code_execution.sandbox import LocalSandbox +from nemo_skills.evaluation.evaluator.base import BaseEvaluator from nemo_skills.file_utils import jdump from nemo_skills.utils import nested_dataclass, unroll_files @nested_dataclass(kw_only=True) class IOIEvaluatorConfig: - # Directory where metadata files are located. - test_dir: str = "" + test_file: str = "test_metadata.json" + num_workers: int = 16 # number of test workers + test_batch_size: int = 16 # number of tests to run concurrently + overwrite: bool = False - # Metadata file name or absolute path (default: {split}_metadata.json). - test_file: str = "{split}_metadata.json" - num_workers: int = 4 # number of test workers - test_batch_size: int = 5 # number of tests to run concurrently +_precompile_loop_tls = threading.local() +worker_sandbox = None # type: ignore +worker_loop = asyncio.new_event_loop() +asyncio.set_event_loop(worker_loop) -def init_worker(sandbox_arg): - global worker_sandbox - worker_sandbox = sandbox_arg - global worker_loop - worker_loop = asyncio.new_event_loop() - asyncio.set_event_loop(worker_loop) +def _sandbox_exec_sync(sandbox: LocalSandbox, cmd: str, *, language: str = "shell", timeout: int = 120): + """Run sandbox.execute_code synchronously with a persistent event loop. + Re-creating and immediately closing a loop for every call can leave background + tasks (e.g., httpx/anyio socket reads) unfinished, causing "Event loop is + closed" errors. We therefore maintain a single loop for all such + pre-compile operations. + """ + loop = getattr(_precompile_loop_tls, "loop", None) + if loop is None or loop.is_closed(): + loop = asyncio.new_event_loop() + _precompile_loop_tls.loop = loop -def run_test_case(task_args: dict, worker_id: int) -> dict: - global worker_sandbox + # Use the loop within this thread exclusively. + return loop.run_until_complete(sandbox.execute_code(cmd, language=language, timeout=timeout))[0] - unique_dir = f"/tmp/ioi_run_{worker_id}_{os.getpid()}" - try: - # 1. Create all necessary files in one batch command - grader_files = task_args.get("grader_files", []) - file_creation_commands = [f"mkdir -p {unique_dir}/graders"] +def wait_for_sandbox(sandbox, timeout: int = 240, poll: float = 1.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + resp = _sandbox_exec_sync(sandbox, "echo hello world", language="shell", timeout=10) + if resp.get("stdout", "").strip() == "hello world": + return + except Exception: + pass + time.sleep(poll) + raise RuntimeError(f"Sandbox not ready after waiting {timeout}s") - file_creation_commands.append( - f""" -cat <<'_EOT_' > {unique_dir}/graders/{task_args["problem_id"]}.cpp -{task_args["generated_code"]} -_EOT_ -""" - ) - for filepath, content in grader_files: - dir_name = os.path.dirname(filepath) - if dir_name: - file_creation_commands.append(f"mkdir -p {unique_dir}/{dir_name}") - file_creation_commands.append( - f""" -cat <<'_EOT_' > {unique_dir}/{filepath} -{content} -_EOT_ -""" - ) +def init_worker(): + """Per-process initializer: set up an event loop for httpx/asyncio calls.""" + global worker_sandbox, worker_loop + worker_sandbox = None # lazily initialised when first used + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) - file_creation_commands.append( - f""" -cat <<'_EOT_' > {unique_dir}/compile.sh -{task_args["compile_code"]} -_EOT_ -chmod +x {unique_dir}/compile.sh -""" - ) - file_creation_commands.append( - f""" -cat <<'_EOT_' > {unique_dir}/run.sh -{task_args["run_code"]} -_EOT_ -chmod +x {unique_dir}/run.sh -""" - ) +def _precompile_grader( + problem_name: str, grader_files, compile_code: str, run_code: str, sandbox: LocalSandbox +) -> str: + """Precompile checker/grader for a problem once and return the directory path.""" + # Ensure sandbox belongs to this thread; if not, create a local one. + if getattr(sandbox, "_owner_tid", None) != threading.get_ident(): + sandbox = LocalSandbox() + wait_for_sandbox(sandbox) + sandbox._owner_tid = threading.get_ident() + + pre_dir = f"/tmp/ioi_pre_{problem_name}_{os.getpid()}" + # Build shell script to create files and invoke compile.sh. + creation_cmds = [ + f"mkdir -p {pre_dir}/graders", + ] + # Dump grader related files + for filepath, content in grader_files: + dir_name = os.path.dirname(filepath) + if dir_name: + creation_cmds.append(f"mkdir -p {pre_dir}/{dir_name}") + creation_cmds.append(f"cat <<'_EOT_' > {pre_dir}/{filepath}\n{content}\n_EOT_\n") + + # Write compile.sh and run.sh as provided (needed later in workers) + creation_cmds.append( + f"cat <<'_EOT_' > {pre_dir}/compile.sh\n{compile_code}\n_EOT_\nchmod +x {pre_dir}/compile.sh\n" + ) + creation_cmds.append(f"cat <<'_EOT_' > {pre_dir}/run.sh\n{run_code}\n_EOT_\nchmod +x {pre_dir}/run.sh\n") + + setup_script = "\n".join(creation_cmds) + # 1. create files + _sandbox_exec_sync(sandbox, setup_script, language="shell", timeout=120) + + # 2. run compile.sh but ignore final failure when problem cpp missing + _sandbox_exec_sync(sandbox, f"cd {pre_dir} && ./compile.sh || true", language="shell", timeout=120) + + return pre_dir - file_creation_commands.append( - f""" -cat <<'_EOT_' > {unique_dir}/input.txt -{task_args["test_input"]} -_EOT_ -""" - ) +def run_test_case(task_args: dict, worker_id: int) -> dict: + # Use high-resolution timestamp to guarantee uniqueness across parallel calls. + unique_dir = f"/tmp/ioi_run_{worker_id}_{os.getpid()}_{time.time_ns()}" + + try: + # 1. Create all necessary files in one batch command + precompiled_dir = task_args.get("precompiled_dir") + # Step 1: prepare the working directory and copy shared pre-compiled artifacts first + file_creation_commands = [ + # Create the unique run directory itself + f"mkdir -p {unique_dir}", + # Ensure `graders/` directory exists + f"mkdir -p {unique_dir}/graders", + f"cp -r {precompiled_dir}/* {unique_dir}/", + # Next write the contestant's generated solution into the graders folder so it is not overwritten + f"cat <<'_EOT_' > {unique_dir}/graders/{task_args['problem_id']}.cpp\n{task_args['generated_code']}\n_EOT_\n", + ] + + # Prepare input and expected output files + file_creation_commands.append(f"cat <<'_EOT_' > {unique_dir}/input.txt\n{task_args['test_input']}\n_EOT_\n") file_creation_commands.append( - f""" -cat <<'_EOT_' > {unique_dir}/correct_output.txt -{task_args["test_output"]} -_EOT_ -""" + f"cat <<'_EOT_' > {unique_dir}/correct_output.txt\n{task_args['test_output']}\n_EOT_\n" ) setup_script = "\n".join(file_creation_commands) + sandbox = LocalSandbox() setup_result, _ = worker_loop.run_until_complete( - worker_sandbox.execute_code(setup_script, language="shell", timeout=120) + sandbox.execute_code(setup_script, language="shell", timeout=120) ) if setup_result.get("stderr"): raise Exception(f"File setup failed: {setup_result['stderr']}") - # 2. Compile the code - compile_command = f"cd {unique_dir} && ./compile.sh" + # 2. Compile only the problem solution (skip checker/grader recompilation) + # Compile the solution together with optional grader/stub sources without + # recompiling the checker/manager again. + compile_command = ( + f"cd {unique_dir} && " + f'SRC="graders/{task_args["problem_id"]}.cpp"; ' + f'[ -e graders/grader.cpp ] && SRC="$SRC graders/grader.cpp"; ' + f'[ -e graders/stub.cpp ] && SRC="$SRC graders/stub.cpp"; ' + f"g++ -DEVAL -std=gnu++17 -O2 -pipe -s -o graders/{task_args['problem_id']} $SRC" + ) compile_result, _ = worker_loop.run_until_complete( - worker_sandbox.execute_code(compile_command, language="shell", timeout=120) + sandbox.execute_code(compile_command, language="shell", timeout=120) ) result = { @@ -125,6 +168,7 @@ def run_test_case(task_args: dict, worker_id: int) -> dict: "compile_stderr": compile_result.get("stderr", ""), "run_stdout": "", "run_stderr": "", + "error": "", "score": 0.0, } @@ -134,7 +178,7 @@ def run_test_case(task_args: dict, worker_id: int) -> dict: # 3. Run the code run_command = f"cd {unique_dir} && ./run.sh" run_result, _ = worker_loop.run_until_complete( - worker_sandbox.execute_code(run_command, language="shell", timeout=120) + sandbox.execute_code(run_command, language="shell", timeout=120) ) run_stdout = run_result.get("stdout", "") @@ -158,10 +202,11 @@ def run_test_case(task_args: dict, worker_id: int) -> dict: return {"score": 0.0, "output": "", "error": str(e)} finally: + # 4. Clean up the directory + # Fire and forget; ignore return values try: - worker_loop.run_until_complete( - worker_sandbox.execute_code(f"rm -rf {unique_dir}", language="shell", timeout=120) - ) + sandbox = LocalSandbox() + worker_loop.run_until_complete(sandbox.execute_code(f"rm -rf {unique_dir}", language="shell", timeout=120)) except Exception: pass @@ -199,116 +244,153 @@ def add_includes(code: str, problem_id: str) -> str: return code_header + code + ("\n" + dummy if dummy else "") -def eval_ioi(cfg): - eval_config = IOIEvaluatorConfig(_init_nested=True, **cfg.eval_config) - sandbox = LocalSandbox() - batch_size = eval_config.test_batch_size - - # Resolve metadata path. - if not (os.path.isabs(eval_config.test_file) and os.path.exists(eval_config.test_file)): - fname = eval_config.test_file.format(split=cfg.split) - search_dir = None - if eval_config.test_dir: - search_dir = eval_config.test_dir - if cfg.data_dir: - search_dir = os.path.join(cfg.data_dir, "ioi24") - if search_dir is None: - raise ValueError("Either data_dir or eval_config.test_dir must be specified.") - eval_config.test_file = os.path.join(search_dir, fname) - - if not os.path.exists(eval_config.test_file): - raise ValueError( - f"Could not find tests file at {eval_config.test_file}. " - "Provide an absolute path as eval_config.test_file, eval_config.test_dir, or data_dir." - ) +class IOIEvaluator(BaseEvaluator): + def __init__(self, config: dict, num_parallel_requests: int = 10): + super().__init__(config, num_parallel_requests) + self.eval_cfg = IOIEvaluatorConfig(_init_nested=True, **config) + + # Heavy runtime resources are lazily initialized within _evaluate_entry. + self.sandbox = None # type: ignore + self.metadata = None # type: ignore + self.precompiled_cache: Dict[str, str] = {} + self.pool = None # type: ignore + + async def _initialize_runtime(self): + """Asynchronously create sandbox and related runtime state on first use.""" + if self.sandbox is not None: + return # Already initialized + + # Run blocking setup in a background thread to avoid nested event‐loop issues. + def _setup(): + sbox = LocalSandbox() + wait_for_sandbox(sbox) + # Remember the thread id that owns this sandbox instance. + sbox._owner_tid = threading.get_ident() + + if not os.path.exists(self.eval_cfg.test_file): + raise FileNotFoundError( + f"Metadata file {self.eval_cfg.test_file} does not exist." + " This file is generated when preparing the IOI dataset, and found in the dataset directory. " + " Please provide a valid parameter for ++eval_config.test_file=x when running IOI Evaluation." + ) + with open(self.eval_cfg.test_file, "r") as f: + metadata_local = json.load(f) + pool_local = multiprocessing.Pool( + processes=self.eval_cfg.test_batch_size, + initializer=init_worker, + ) - with open(eval_config.test_file) as f: - metadata = json.load(f) + return sbox, metadata_local, pool_local + + self.sandbox, self.metadata, self.pool = await asyncio.to_thread(_setup) + + # Internal helper + async def _evaluate_entry(self, entry: dict) -> dict: + # Ensure runtime (sandbox, metadata, pool, etc.) is ready for evaluation. + await self._initialize_runtime() + completion = add_includes(extract_final_cpp_block(entry["generation"]), entry["ioi_id"]) + + pid = entry["ioi_id"] + + # Retrieve helper scripts and grader resources from metadata instead of the dataset entry. + problem_metadata = self.metadata[entry["name"]] + subtask_meta = problem_metadata[entry["subtask"]] + compile_code = subtask_meta["compile"] + run_code = subtask_meta["run"] + grader_files = subtask_meta["grader_files"] + + if pid not in self.precompiled_cache: + self.precompiled_cache[pid] = await asyncio.to_thread( + _precompile_grader, + pid, + grader_files, + compile_code, + run_code, + self.sandbox, + ) + pre_dir = self.precompiled_cache[pid] + + subtask_state = { + st: { + "score": data["subtask_score"], + "precision": data["score_precision"], + "outputs": [], + "scores": [], + "passed": True, + } + for st, data in problem_metadata.items() + } - pool = multiprocessing.Pool(processes=batch_size, initializer=init_worker, initargs=(sandbox,)) + all_tests = [(st, tname, t) for st, data in problem_metadata.items() for tname, t in data["tests"].items()] + + batch_size = self.eval_cfg.test_batch_size + + for i in range(0, len(all_tests), batch_size): + batch = [t for t in all_tests[i : i + batch_size] if subtask_state[t[0]]["passed"]] + if not batch: + continue + + tasks = [] + for _, _, test_data in batch: + tasks.append( + { + "generated_code": completion, + "problem_id": pid, + "precompiled_dir": pre_dir, + "test_input": test_data["input"], + "test_output": test_data["output"], + } + ) + + # map with unique worker id argument + results = await asyncio.to_thread( + self.pool.starmap, run_test_case, [(ta, idx) for idx, ta in enumerate(tasks)] + ) - for jsonl_file in unroll_files(cfg.input_files): - samples = [] - with open(jsonl_file) as f: - for line in f: - sample = json.loads(line) - samples.append(sample) + for (subtask, test_name, _), result in zip(batch, results): + st = subtask_state[subtask] + result["test_name"] = test_name + st["outputs"].append(result) + st["scores"].append(float(result.get("score", 0))) + if float(result.get("score", 0)) == 0.0: + st["passed"] = False + + # Debug prints similar to original implementation + if not result.get("compile_success", True): + print( + f"Compile failed for problem '{entry['name']}', test '{test_name}':\n" + f"--- STDOUT ---\n{result.get('compile_stdout', '').strip()}\n" + f"--- STDERR ---\n{result.get('compile_stderr', '').strip()}\n" + ) + + test_case_results = {} + for st, data in subtask_state.items(): + score = round(min(data["scores"]) * data["score"], data["precision"]) if data["scores"] else 0.0 + test_case_results[st] = {"score": score, "outputs": data["outputs"]} + + return { + "name": entry["name"], + "subtask": entry["subtask"], + "test_case_results": test_case_results, + } - if len(samples) == 0: - raise ValueError( - f"No samples found in the file {jsonl_file}.\n" - f"Make sure the file contains jsonl data with 'codes' key which is a list containing " - f"individual code samples." - ) + async def eval_full(self, input_files): # type: ignore[override] + for jsonl_file in unroll_files(input_files): + with open(jsonl_file, "r", encoding="utf-8") as f: + all_samples = [json.loads(line) for line in f] - outputs = [] - for x, entry in enumerate(samples): - print(f"Evaluating {x}/{len(samples)}") - completion = extract_final_cpp_block(entry["generation"]) - completion = add_includes(completion, entry["ioi_id"]) - - test_case_results = {} - problem_name = entry["name"] - problem_metadata = metadata[problem_name] - for subtask, subtask_data in problem_metadata.items(): - tests = subtask_data["tests"] - subtask_score = subtask_data["score"] - subtask_score_precision = subtask_data["score_precision"] - subtask_passed = True - subtask_outputs = [] - test_items = list(tests.items()) - - scores = [] - for i in range(0, len(test_items), batch_size): - batch = test_items[i : i + batch_size] - tasks = [] - for local_idx, (test_name, test_data) in enumerate(batch): - task_args = { - "generated_code": completion, - "problem_id": entry["ioi_id"], - "grader_files": entry["grader_files"], - "run_code": entry["run"], - "compile_code": entry["compile"], - "test_input": test_data["input"], - "test_output": test_data["output"], - } - tasks.append((task_args, local_idx)) - results = pool.starmap(run_test_case, tasks) - - for (test_name, _), result in zip(batch, results): - result_with_name = dict(result) - result_with_name["test_name"] = test_name - subtask_outputs.append(result_with_name) - scores.append(float(result["score"])) - if float(result["score"]) == 0.0: - # break early as we failed this test case. - subtask_passed = False - break - if not subtask_passed: - break - - effective_score = round(min([score for score in scores]) * subtask_score, subtask_score_precision) - test_case_results[subtask] = {"score": effective_score, "outputs": subtask_outputs} - - outputs.append( - { - "name": entry["name"], - "subtask": entry["subtask"], - "test_case_results": test_case_results, - } - ) + tasks = [self._evaluate_entry(s) for s in all_samples] + outputs = await asyncio.gather(*tasks) + + for s, o in zip(all_samples, outputs): + s["test_case_results"] = o["test_case_results"] + s["eval_status"] = o["eval_status"] - for s, o in zip(samples, outputs): - s["test_case_results"] = o["test_case_results"] - jdump(samples, jsonl_file, mode="wt") + jdump(all_samples, jsonl_file, mode="wt") - total_passed = 0 - total_problems = len(outputs) - for o in outputs: - for subtask_result in o["test_case_results"].values(): - if subtask_result["score"] > 0: - total_passed += 1 - print(f"Subtasks passed: {total_passed} out of {total_problems * len(metadata[o['name']])}") + if self.pool is not None: + self.pool.close() + self.pool.join() - pool.close() - pool.join() + async def eval_single(self, data_point: dict): + return await self._evaluate_entry(data_point) diff --git a/nemo_skills/evaluation/metrics/ioi_metrics.py b/nemo_skills/evaluation/metrics/ioi_metrics.py index 745ab0eca9..a2028f6a6d 100644 --- a/nemo_skills/evaluation/metrics/ioi_metrics.py +++ b/nemo_skills/evaluation/metrics/ioi_metrics.py @@ -38,13 +38,11 @@ def get_problem_score(self, submissions) -> float: """ if not submissions: return 0.0 - subtasks = list(submissions[0]["test_case_results"].keys()) - subtask_scores = {subtask: 0 for subtask in subtasks} + subtask_scores = {} for submission in submissions: - test_case_results = submission["test_case_results"] - for subtask, result in test_case_results.items(): - subtask_scores[subtask] = max(subtask_scores[subtask], result["score"]) + for subtask, result in submission["test_case_results"].items(): + subtask_scores[subtask] = max(subtask_scores.get(subtask, 0), result["score"]) return sum(subtask_scores.values()), subtask_scores def simulate_round_robin_score(self, submissions) -> float: @@ -71,27 +69,21 @@ def simulate_round_robin_score(self, submissions) -> float: selected = sorted_submissions[:50] # for each subtask, take the maximum score among the selected submissions - subtasks = list(submissions[0]["test_case_results"].keys()) - subtask_scores = {subtask: 0 for subtask in subtasks} + subtask_scores = {} for submission in selected: for subtask, result in submission["test_case_results"].items(): - subtask_scores[subtask] = max(subtask_scores[subtask], result["score"]) + subtask_scores[subtask] = max(subtask_scores.get(subtask, 0), result["score"]) return sum(subtask_scores.values()) def get_metrics(self): - """ - Computes two metrics across all problems: - 1. total_score: For each problem, compute the score by summing, for each subtask, - the maximum score over all submissions. - 2. round_robin_score: For each problem, compute the round robin score as described - in simulate_round_robin_score (which limits the submissions to 50 per problem). - Returns a dict with both metrics. - """ total_score = total_round_robin = 0.0 - for _, submissions in self.predictions_by_problem.items(): - score, _ = self.get_problem_score(submissions) + self.problem_scores = {} + for name, submissions in self.predictions_by_problem.items(): + score, subtasks = self.get_problem_score(submissions) + self.problem_scores[name] = (score, subtasks) total_score += score total_round_robin += self.simulate_round_robin_score(submissions) + self.print_problem_scores() metrics_dict = super().get_metrics() for m in metrics_dict.values(): m["total_score"], m["round_robin_score"] = str(total_score), str(total_round_robin) @@ -100,3 +92,16 @@ def get_metrics(self): def reset(self): super().reset() self.predictions_by_problem = defaultdict(list) + self.problem_scores = {} + + def print_problem_scores(self): + print("---------------------------------Problem and subtask scores---------------------------------") + for name, (achieved_total, achieved_subtasks) in self.problem_scores.items(): + submissions = self.predictions_by_problem[name] + max_subtasks = {} + for sub in submissions: + max_subtasks[sub["subtask"]] = sub["subtask_score"] + max_total = sum(max_subtasks.values()) + print(f"# {name}: {achieved_total}/{max_total}") + for subtask, achieved in achieved_subtasks.items(): + print(f" {subtask}: {achieved}/{max_subtasks[subtask]}") diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py index 607ca28f06..6b78c1e175 100644 --- a/nemo_skills/pipeline/eval.py +++ b/nemo_skills/pipeline/eval.py @@ -342,6 +342,7 @@ def eval( job_server_config, job_server_address, job_server_command, + job_sandbox_env_overrides, ) = job_args prev_tasks = _task_dependencies @@ -361,6 +362,7 @@ def eval( with_sandbox=job_needs_sandbox or with_sandbox, keep_mounts_for_sandbox=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox, sandbox_port=None if get_random_port else 6000, + sandbox_env_overrides=job_sandbox_env_overrides, run_after=run_after, reuse_code_exp=reuse_code_exp, reuse_code=reuse_code, diff --git a/nemo_skills/pipeline/utils/eval.py b/nemo_skills/pipeline/utils/eval.py index 071be72505..1b387cc10a 100644 --- a/nemo_skills/pipeline/utils/eval.py +++ b/nemo_skills/pipeline/utils/eval.py @@ -20,6 +20,7 @@ from copy import deepcopy from dataclasses import dataclass, field from pathlib import Path +from typing import Dict import nemo_skills.pipeline.utils as pipeline_utils from nemo_skills.dataset.utils import get_dataset_module, import_from_path @@ -102,6 +103,8 @@ class BenchmarkArgs: score_module: str | None = None job_ids: list[int] = field(default_factory=list) remaining_jobs: list[dict] = field(default_factory=list) + # Per-benchmark sandbox environment overrides in KEY=VALUE form + sandbox_env_overrides: list[str] = field(default_factory=list) @property def requires_judge(self): @@ -185,6 +188,10 @@ def get_benchmark_args_from_module( benchmark_module, "KEEP_MOUNTS_FOR_SANDBOX", False, override_dict ) + # Collect any benchmark-specific environment variables + env_vars_from_module = getattr(benchmark_module, "SANDBOX_ENV_VARS", []) + sandbox_env_overrides = list(env_vars_from_module) if env_vars_from_module else [] + generation_module = get_arg_from_module_or_dict( benchmark_module, "GENERATION_MODULE", "nemo_skills.inference.generate", override_dict ) @@ -231,6 +238,7 @@ def get_benchmark_args_from_module( num_chunks=num_chunks, eval_subfolder=eval_subfolder, benchmark_group=benchmark_group, + sandbox_env_overrides=sandbox_env_overrides, ) @@ -517,6 +525,23 @@ def prepare_eval_commands( job_needs_sandbox_to_keep_mounts = any( benchmarks_dict[b].keep_mounts_for_sandbox for b in job_benchmarks ) + # Aggregate per-job sandbox env overrides from participating benchmarks (first key wins) + ordered_benchmarks = [b for b in benchmarks_dict.keys() if b in job_benchmarks] + env_map: Dict[str, str] = {} + env_source: Dict[str, str] = {} + for b in ordered_benchmarks: + for override in benchmarks_dict[b].sandbox_env_overrides: + key, value = override.split("=", 1) + if key in env_map and env_map[key] != value: + raise ValueError( + f"Conflicting sandbox environment overrides for key '{key}': " + f"'{env_map[key]}' (from {env_source[key]}) vs '{value}' (from {b}). " + "Please submit the benchmarks as separate jobs or increase num_jobs so they do not share a job." + ) + env_map[key] = value + env_source[key] = b + job_sandbox_env_overrides = [f"{k}={v}" for k, v in env_map.items()] + # TODO: move to a dataclass job_batches.append( ( @@ -528,6 +553,7 @@ def prepare_eval_commands( job_server_address, # a check above guarantees that this is the same for all tasks in a job generation_task.get_server_command_fn(), + job_sandbox_env_overrides, ) ) job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( diff --git a/nemo_skills/pipeline/utils/exp.py b/nemo_skills/pipeline/utils/exp.py index b2a89ded13..f150d4fce8 100644 --- a/nemo_skills/pipeline/utils/exp.py +++ b/nemo_skills/pipeline/utils/exp.py @@ -366,6 +366,7 @@ def add_task( installation_command: str | None = None, skip_hf_home_check: bool | None = None, dry_run: bool = False, + sandbox_env_overrides: list[str] | None = None, ): """Wrapper for nemo-run exp.add to help setting up executors and dependencies. @@ -525,6 +526,10 @@ def add_task( "LISTEN_PORT": sandbox_port, "NGINX_PORT": sandbox_port, } + if sandbox_env_overrides: + for override in sandbox_env_overrides: + key, value = override.split("=", 1) + sandbox_env_updates.setdefault(key, value) current_env_vars = cluster_config.get("env_vars", []).copy() for override in current_env_vars: if "PYTHONPATH" in override: