diff --git a/docs/evaluation/index.md b/docs/evaluation/index.md
index b1c191963b..32f885d4d9 100644
--- a/docs/evaluation/index.md
+++ b/docs/evaluation/index.md
@@ -12,6 +12,7 @@ We support many popular benchmarks and it's easy to add new in the future. The f
 - [**Multilingual**](./multilingual.md): e.g. [mmlu-prox](./multilingual.md#mmlu-prox), [flores-200](./multilingual.md#flores-200), [wmt24pp](./multilingual.md#wmt24pp)
 - [**Speech & Audio**](./speech-audio.md): e.g. [asr-leaderboard](./speech-audio.md#asr-leaderboard), [mmau-pro](./speech-audio.md#mmau-pro)
 - [**Vision-Language Models (VLM)**](./vlm.md): e.g. [mmmu-pro](./vlm.md#mmmu-pro)
+- [**Speculative Decoding (SD)**](./speculative-decoding.md): e.g. [SPEED-Bench](./speculative-decoding.md#speed-bench)
 
 See [nemo_skills/dataset](https://github.com/NVIDIA-NeMo/Skills/blob/main/nemo_skills/dataset) where each folder is a benchmark we support.
diff --git a/docs/evaluation/speculative-decoding.md b/docs/evaluation/speculative-decoding.md
new file mode 100644
index 0000000000..d795e2f78e
--- /dev/null
+++ b/docs/evaluation/speculative-decoding.md
@@ -0,0 +1,96 @@
+# Speculative Decoding
+
+This section details how to evaluate speculative decoding (SD) benchmarks.
+SD has emerged as a leading technique for accelerating LLM inference: a smaller draft model proposes multiple future tokens, which the larger target model verifies in a single forward pass, significantly increasing system throughput.
+
+For all SD benchmarks we measure two metrics of draft accuracy/quality: acceptance length (AL) and acceptance rate (AR).
+A third metric in this group is the conditional acceptance rate (or per-position acceptance rate), which measures the acceptance rate at a given draft position, conditioned on all previous draft tokens having been accepted.
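The metric definitions above can be made concrete with a small sketch. This is illustrative only (it is not part of nemo-skills, and exact definitions can differ slightly between serving frameworks); it assumes we know, for each drafting step, how many of the `draft_len` proposed tokens were accepted:

```python
# Illustrative computation of SD draft metrics from per-step acceptance
# counts. The input numbers below are made up for the example.
def sd_metrics(accepted_per_step: list[int], draft_len: int) -> tuple[float, float, list[float]]:
    steps = len(accepted_per_step)
    accepted = sum(accepted_per_step)
    # Acceptance length: average tokens emitted per target forward pass
    # (accepted draft tokens plus the one token the target model samples itself).
    al = accepted / steps + 1
    # Acceptance rate: fraction of all proposed draft tokens that were accepted.
    ar = accepted / (steps * draft_len)
    # Conditional (per-position) acceptance rate: P(token at position i is
    # accepted | all tokens at positions 0..i-1 were accepted).
    cond = []
    for i in range(draft_len):
        reached = sum(1 for a in accepted_per_step if a >= i)  # all earlier positions accepted
        accepted_i = sum(1 for a in accepted_per_step if a > i)  # ...and position i accepted too
        cond.append(accepted_i / reached if reached else 0.0)
    return al, ar, cond

# Four verification steps with 3 draft tokens each: AL = 6/4 + 1 = 2.5, AR = 6/12 = 0.5.
al, ar, cond = sd_metrics([3, 1, 2, 0], draft_len=3)
```

Note that servers may report AR under a slightly different convention (e.g., per-step rather than per-token), so values from different frameworks are not always directly comparable.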
+
+For more advanced evaluation of SD, including throughput and per-category metrics, please use the evaluation framework [here](https://github.com/NVIDIA/Model-Optimizer/tree/main/examples/specdec_bench).
+
+
+## How we evaluate
+
+!!! note
+    The current evaluation supports only SGLang and vLLM servers.
+
+The evaluation follows this process:
+
+1. Read the SD metrics from the server's `/metrics` endpoint.
+2. Send the benchmark's prompts to the server.
+3. Read the `/metrics` endpoint again and subtract the values from step (1) to obtain the average SD metrics (AL, AR, etc.).
+
+!!! note
+    For the `local` executor with an SGLang server, we also support a flow that writes a metrics file per request to a local path; the SD metrics are then calculated from these files. This provides per-request metrics, which can be useful in some cases. More information on this feature can be found in the [SGLang documentation](https://docs.sglang.io/advanced_features/server_arguments.html#requestmetricsexporter-configuration).
+
+
+## Supported Benchmarks
+
+### SPEED-Bench
+
+- The benchmark is defined in [`nemo_skills/dataset/speed-bench/__init__.py`](https://github.com/NVIDIA-NeMo/Skills/blob/main/nemo_skills/dataset/speed-bench/__init__.py)
+- The original benchmark source is [here](https://huggingface.co/datasets/nvidia/SPEED-Bench).
+- NOTICE: This dataset is governed by the [NVIDIA Evaluation Dataset License Agreement](https://huggingface.co/datasets/nvidia/SPEED-Bench/blob/main/License.pdf). For each dataset a user elects to use, the user is responsible for checking if the dataset license is fit for the intended purpose. The `prepare_data` script automatically fetches data from all the source datasets.
+
+#### Data preparation
+
+See an example data preparation command in the [main evaluation docs](../evaluation/index.md#using-data-on-cluster).
+
+```shell
+ns prepare_data speed-bench --data_dir=<data_dir> --cluster=<cluster>
+```
+
+Other supported options:
+
+ * **config**: select which config to prepare; can be one of the splits in the dataset (e.g., `qualitative`, `throughput_2k`) or `all` to prepare all of the configs.
+
+
+#### Evaluation command
+
+An example of running Llama 3.3 70B with the external draft Llama 3.2 1B using SGLang and a draft length of 3:
+
+```bash
+ns eval \
+    --cluster=<cluster> \
+    --data_dir=<data_dir> \
+    --output_dir=<output_dir> \
+    --benchmarks=speed-bench \
+    --model=meta-llama/Llama-3.3-70B-Instruct \
+    --server_args="--speculative-algorithm STANDALONE --speculative-draft-model-path meta-llama/Llama-3.2-1B-Instruct --speculative-num-steps 3 --speculative-eagle-topk 1 --torch-compile-max-bs 32 --max-running-requests 32 --cuda-graph-max-bs 32 --mem-fraction-static 0.8" \
+    --server_nodes=1 \
+    --server_gpus=8 \
+    --server_type=sglang \
+    ++inference.tokens_to_generate=1024
+```
+
+Example evaluation metrics:
+
+```
+--------------------------------------------- speed-bench ----------------------------------------------
+evaluation_mode | num_entries | avg_tokens | gen_seconds | spec_acceptance_length | spec_acceptance_rate
+pass@1 | 880 | 464 | 139 | 2.78 | 69.38
+```
+
+An example of running Llama 3.3 70B with EAGLE3 using vLLM and a draft length of 3:
+
+```bash
+ns eval \
+    --cluster=<cluster> \
+    --data_dir=<data_dir> \
+    --output_dir=<output_dir> \
+    --benchmarks=speed-bench \
+    --model=meta-llama/Llama-3.3-70B-Instruct \
+    --server_args="--speculative-config '{\"method\": \"eagle3\", \"num_speculative_tokens\": 3, \"model\": \"nvidia/Llama-3.3-70B-Instruct-Eagle3\"}'" \
+    --server_nodes=1 \
+    --server_gpus=8 \
+    --server_type=vllm \
+    ++inference.tokens_to_generate=1024
+```
+
+Example evaluation metrics:
+
+```
+--------------------------------------------- speed-bench ----------------------------------------------
+evaluation_mode | num_entries | avg_tokens | gen_seconds | spec_acceptance_length | spec_acceptance_rate
+pass@1 | 880 | 463 | 104 |
2.37 | 45.52 +``` \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index c395b0aabe..1532b6cbad 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -89,6 +89,7 @@ nav: - evaluation/vlm.md - evaluation/other-benchmarks.md - evaluation/robustness.md + - evaluation/speculative-decoding.md - External benchmarks: evaluation/external-benchmarks.md - Agentic Inference: - agentic_inference/parallel_thinking.md diff --git a/nemo_skills/dataset/speed-bench/__init__.py b/nemo_skills/dataset/speed-bench/__init__.py new file mode 100644 index 0000000000..e62e22ad40 --- /dev/null +++ b/nemo_skills/dataset/speed-bench/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# settings that define how evaluation should be done by default (all can be changed from cmdline) +REQUIRES_DATA_DIR = True +METRICS_TYPE = "specdec" +EVAL_SPLIT = "qualitative" +GENERATION_ARGS = "++prompt_format=openai ++eval_type=specdec ++inference.include_response=true" +GENERATION_MODULE = "nemo_skills.inference.eval.specdec" diff --git a/nemo_skills/dataset/speed-bench/prepare.py b/nemo_skills/dataset/speed-bench/prepare.py new file mode 100644 index 0000000000..6226f0c909 --- /dev/null +++ b/nemo_skills/dataset/speed-bench/prepare.py @@ -0,0 +1,636 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import random +import re +from enum import Enum +from pathlib import Path +from typing import Any, Literal, get_args + +import numpy as np +import pandas as pd +import tiktoken +from datasets import Dataset, concatenate_datasets, load_dataset + +DATASET_CONFIG = Literal[ + "qualitative", "throughput_1k", "throughput_2k", "throughput_8k", "throughput_16k", "throughput_32k" +] + +TURNS_PLACEHOLDER = "FULL BENCHMARK DATA SHOULD BE FETCHED FROM THE SOURCE USING SPECDEC_BENCH" +HLE_RNG = np.random.default_rng(42) + + +class BenchmarkDataset(str, Enum): + """Enum for benchmark datasets used in SPEED-Bench. + + Each enum value represents a HuggingFace dataset identifier used for + loading external benchmark datasets. 
+ """ + + BAMBOO = "RUCAIBox/BAMBOO" + CNN_DAILYMAIL = "abisee/cnn_dailymail" + HLE = "cais/hle" + LIVECODEBENCH = "livecodebench/code_generation_lite" + CODE_CONTESTS = "deepmind/code_contests" + MTBENCH_101 = "mtbench101/mt-bench-101" + OPUS100 = "Helsinki-NLP/opus-100" + CHATRAG_BENCH = "nvidia/ChatRAG-Bench" + MMLU_PRO = "TIGER-Lab/MMLU-Pro" + ADALEVAL_STACKSELECT = "AdaLEval/stackselect" + ADALEVAL_TEXTSORT = "AdaLEval/textsort" + ROLEBENCH = "ZenMoore/RoleBench" + ROLEBENCH_ROLES = "ZenMoore/RoleBench/roles" + COSER = "Neph0s/CoSER" + + +DATASETS_AND_LOADERS_FUNCTIONS = { + BenchmarkDataset.BAMBOO.value: lambda dataset_name, config_name: load_dataset( + "json", data_files={"test": config_name}, split="test" + ), + BenchmarkDataset.CNN_DAILYMAIL.value: lambda dataset_name, config_name: load_dataset( + dataset_name, config_name, split="test" + ), + BenchmarkDataset.HLE.value: lambda dataset_name, config_name: load_dataset( + dataset_name, split="test", revision="021a3d71f516a7ac28ceb8d284969902edf1edeb" + ) + if config_name != "train_test_split" + else load_dataset( + dataset_name, split="test", revision="021a3d71f516a7ac28ceb8d284969902edf1edeb" + ).train_test_split(test_size=0.5, shuffle=True, seed=42), + BenchmarkDataset.LIVECODEBENCH.value: lambda dataset_name, config_name: load_dataset( + "json", + data_files={ + "test": [ + f"https://huggingface.co/datasets/livecodebench/code_generation_lite/resolve/0fe84c3912ea0c4d4a78037083943e8f0c4dd505/{file_name}.jsonl" + for file_name in ["test", "test2", "test3", "test4", "test5", "test6"] + ] + }, + split="test", + ), + BenchmarkDataset.CODE_CONTESTS.value: lambda dataset_name, config_name: load_dataset( + dataset_name, split="test", revision="802411c3010cb00d1b05bad57ca77365a3c699d6" + ), + BenchmarkDataset.MTBENCH_101.value: lambda dataset_name, config_name: load_dataset( + "json", data_files={"test": config_name}, split="test" + ), + BenchmarkDataset.OPUS100.value: lambda dataset_name, config_name: 
load_dataset( + dataset_name, config_name, split="test", revision="805090dc28bf78897da9641cdf08b61287580df9" + ), + BenchmarkDataset.CHATRAG_BENCH.value: lambda dataset_name, config_names: concatenate_datasets( + [ + load_dataset(dataset_name, config_name, split="test", revision="af6c7d420ddddf21f54f8ab3394bbf462aad2577") + for config_name in config_names + ] + ), + BenchmarkDataset.MMLU_PRO.value: lambda dataset_name, config_name: load_dataset( + dataset_name, split="test", revision="30527804ea8854662078e457808040d872ecdf29" + ), + BenchmarkDataset.ADALEVAL_STACKSELECT.value: lambda dataset_name, config_name: load_dataset( + "json", data_files={"test": config_name}, split="test" + ), + BenchmarkDataset.ADALEVAL_TEXTSORT.value: lambda dataset_name, config_name: load_dataset( + "json", data_files={"test": config_name}, split="test" + ), + BenchmarkDataset.ROLEBENCH.value: lambda dataset_name, config_name: pd.read_json(config_name, lines=True), + BenchmarkDataset.ROLEBENCH_ROLES.value: lambda dataset_name, config_name: load_dataset( + "json", data_files={"test": config_name}, split="test" + ), + BenchmarkDataset.COSER.value: lambda dataset_name, config_name: load_dataset( + "json", data_files={"test": config_name.replace("tree", "raw") + "/test/test_set.json"}, split="test" + ), +} + +EXTERNAL_DATASETS = dict() + + +def _get_external_dataset(dataset_name: str, config_name: str = "default"): + full_name = f"{dataset_name}_{config_name}" + if full_name not in EXTERNAL_DATASETS: + EXTERNAL_DATASETS[full_name] = DATASETS_AND_LOADERS_FUNCTIONS[dataset_name](dataset_name, config_name) + if config_name == "train_test_split": + EXTERNAL_DATASETS[full_name] = ( + EXTERNAL_DATASETS[full_name]["train"], + EXTERNAL_DATASETS[full_name]["test"], + ) + return EXTERNAL_DATASETS[full_name] + + +def _generate_stackselect_prompt(question: str, answers: list[str], answer: str, num_tokens: int) -> str: + random.seed(42) + encoder = tiktoken.get_encoding("o200k_base") + # Original prompt 
as given in Ada-LEval paper: https://arxiv.org/pdf/2404.06480 + prompt = """ +You are an AI assistant. Your job is to find out the most helpful answer to a given question. +Each time, you will be provided with a question and n answers to this question. +Each answer begins with an 'A' and a number(e.g. A4), which represents its designation. +You need to determine which answer is the most helpful one to the question. +The case sample is shown below and you should give me the answer in the format exactly the same as the sample. + +However, you should NOT focus on the content of sample answer. + +Sample Input (format only): + +The question is given below. +XXX(The content of question) +Possible answers are given below. +A1: +XXX(The content of answer 1) +A2: +XXX(The content of answer 2) +. +. +. +An: +XXX(The content of answer n) +Now the answers are over, please decide which answer is the most helpful one to the question. +You must give me the designation of the MOST helpful answer and the reason why you choose this answer. +For every other answer, you must give me the reason why you do not choose this answer. + +Sample Output (format only): + +Answer: The designation of the most helpful answer.(e.g. A4 means answer 4 is the most helpful answer) +Explanation: +A4: The reason why you choose this answer. +A1: The reason why you do not choose this answer. +A2: The reason why you do not choose this answer. +. +. +. +An: The reason why you do not choose this answer. +""" + prompt += "The question is given below.\n" + prompt += question + "\n\n" + prompt += "Possible answers are given below.\n" + tokens_prompt = len(encoder.encode(prompt, disallowed_special=())) + end_prompt = "Now the answers are over, please decide which answer is the most helpful one to the question. 
\n" + end_prompt += ( + "You must give me the designation of the MOST helpful answer and the reason why you choose this answer.\n" + ) + end_prompt += "For every other answer, you must give me the reason why you do not choose this answer.\n" + end_prompt_tokens = len(encoder.encode(end_prompt, disallowed_special=())) + correct_answer_i = int(answer.strip("A")) - 1 + correct_answer_tokens = len( + encoder.encode(answer + ":\n\n" + answers[correct_answer_i] + "\n\n", disallowed_special=()) + ) + all_tokens = tokens_prompt + end_prompt_tokens + correct_answer_tokens + answers_to_add_stop = 0 + for i, answer in enumerate(answers): + if i == correct_answer_i: + continue + answer_to_add = f"A{i + 1}:\n\n{answer}\n\n" + answer_to_add_tokens = len(encoder.encode(answer_to_add, disallowed_special=())) + if all_tokens + answer_to_add_tokens > num_tokens: + break + answers_to_add_stop = i + all_tokens += answer_to_add_tokens + answers_to_add = ( + answers[: answers_to_add_stop + 1] + if answers_to_add_stop >= correct_answer_i + else [answers[correct_answer_i]] + answers[: answers_to_add_stop + 1] + ) + random.shuffle(answers_to_add) + for i, answer in enumerate(answers_to_add): + prompt += f"A{i + 1}:\n\n{answer}\n\n" + prompt += end_prompt + return prompt + + +def _generate_textsort_prompt(prompt: str) -> str: + # Original prompt as given in Ada-LEval paper: https://arxiv.org/pdf/2404.06480 + original_instruction = "\n You are an AI assistant. Your job is to sort multiple book sections into the correct order.\n Each time, you will be provided with 4 pieces of text.\n These texts form a continuous part of a book, but are provided in random order.\n You need to find the correct order and return the answer in a string.\n For example, if you output [4, 1, 3, 2], that means the correct order is: Part 4 -> Part 1 -> Part 3 -> Part 2.\n You will also be provided with the neighboring paragraphs before and after the 4 pieces of texts. 
\n\n The case sample is shown below and you should give me the answer in the format exactly the same as the sample. \n\n However, you should NOT focus on the content of sample answer. \n\n Please do NOT output any extra content. \n Sample Input (format only): \n\n Before: XXX (Text before the continuous book part)\n\n\n Part 1: XXX\n\n\n Part 2: XXX\n\n\n Part 3: XXX\n\n\n Part 4: XXX\n\n\n After: XXX (Text after the continuous book part)\n\n\n Sample Output (format only): \n\n Answer: [4, 1, 3, 2] \n\n\n\n" + + new_instruction = """ +You are an AI assistant. Your job is to sort multiple book sections into the correct order. +Each time, you will be provided with 4 pieces of text. +These texts form a continuous part of a book, but are provided in random order. +You need to find the correct order and write the all the parts in the correct order. +For example, if the correct order is: Part 4 -> Part 1 -> Part 3 -> Part 2, you need to answer with a continous text of all the parts in the correct order. +You should NOT change the text, just write it in the order it should appear. +You will also be provided with the neighboring paragraphs before and after the 4 pieces of texts. +You should NOT output the before and after paragraphs, just the text in the correct order. + +The case sample is shown below and you should give me the answer in the format exactly the same as the sample. + +However, you should NOT focus on the content of sample answer. + +Please do NOT output any extra content. 
+ +Sample Input (format only): + +Before: BBB (Text before the continuous book part) + + +Part 1: XXX + + +Part 2: YYY + + +Part 3: ZZZ + + +Part 4: WWW + + +After: AAA (Text after the continuous book part) + +Sample Output (format only): + +Answer: + + +WWW + +XXX + +ZZZ + +YYY +""" + return prompt.replace(original_instruction, new_instruction, 1) + + +def _generate_writing_prompt(contents: list[str]) -> str: + content = "\n\n".join([f"START CONTENT {i + 1}\n\n{content}\n\nEND CONTENT" for i, content in enumerate(contents)]) + # Inspired by the prompt used in BAMBOO paper: https://arxiv.org/pdf/2309.13345 + prompt = f""" +I want you to act as a long dialogue completer. +Given a long dialogue(s), your objectives are: +1. Add one speaker mentioned in the past dialogue(s) at the end of the last sentence of each dialogue (between START CONTENT and END CONTENT) to complete the sentence and ensure its semantic integrity. At here, the added word must be a person's name which appears in the dialogue. +2. Continue the dialogue(s) with one or more speakers who appeared in the dialogue(s) before. Be coherent with the previous dialogue(s) and be creative in your response. +The content of the dialogue(s) is given below. 
+
+
+{content}
+"""
+    return prompt
+
+
+def _pad_or_truncate_prompt(prompt: str, target_num_tokens: int, padding: str = "Answer now please.\n") -> str:
+    encoder = tiktoken.get_encoding("o200k_base")
+
+    tokens = encoder.encode(prompt, disallowed_special=())
+    current_num_tokens = len(tokens)
+
+    if current_num_tokens > target_num_tokens:
+        # Truncate if too long
+        return encoder.decode(tokens[:target_num_tokens])
+    elif current_num_tokens < target_num_tokens:
+        # Add padding if too short
+        padding_tokens = encoder.encode(padding, disallowed_special=())
+        tokens_needed = target_num_tokens - current_num_tokens
+        # Calculate how many full padding sequences we need
+        num_padding_repeats = (tokens_needed + len(padding_tokens) - 1) // len(padding_tokens)
+        padded_prompt = prompt + (padding * num_padding_repeats)
+        # Truncate to exact target length
+        padded_tokens = encoder.encode(padded_prompt, disallowed_special=())
+        return encoder.decode(padded_tokens[:target_num_tokens])
+    else:
+        return prompt
+
+
+def _generate_bamboo_prompt(external_dataset: "Dataset", num_tokens: int) -> str:
+    prompt = _generate_writing_prompt(external_dataset["content"])
+    return _pad_or_truncate_prompt(prompt, num_tokens)
+
+
+def _generate_chatrag_bench_prompt(external_dataset: "Dataset") -> list[str]:
+    prompt = (
+        "Please give a full and complete answer for the questions. \n\nContext:\n{context}\n\nQuestion:\n{question}"
+    )
+    context = "\n\n".join([ctx["text"] for ctx in external_dataset["ctxs"][0]])
+    questions = [message["content"] for message in external_dataset["messages"][0] if message["role"] == "user"]
+
+    return [prompt.format(context=context, question=questions[0])] + questions[1:]
+
+
+def _generate_coser_prompt(external_dataset: "Dataset") -> str:
+    rng = np.random.default_rng(seed=12347)
+    # Original prompt as given in the CoSER paper
+    prompt = """You are {character} from {book_name}.
+==={character}'s Profile=== +{character_profile} +===Current Scenario=== +{scenario} +===Information about the other Characters=== +{other_character_profiles_str} +===Your Inner Thoughts=== +{motivation} + +===Requirements=== +Your output should include **thought**, **speech**, and **action**. Use [your thought] +for thoughts, which others can't see, e.g. [I'm terrified, but I must appear strong.]. Use +(your action) for actions, which others can see, such as (watches silently, trying to control +her fear and anger).""" + character = rng.choice(external_dataset["major_characters"][0]) + character_profile = external_dataset["character_profiles"][0][character] + scenario = external_dataset["scenario"][0] + book_name = external_dataset["book"][0] + motivation = next( + ( + key_character["motivation"] + for key_character in external_dataset["key_characters"][0] + if key_character["name"] == character + ), + "No motivation provided", + ) + other_character_profiles_str = "\n\n".join( + [ + f"{character_name}: {character_profile}" + for character_name, character_profile in external_dataset["character_profiles"][0].items() + if character_name != character and character_profile is not None + ] + ) + return prompt.format( + character=character, + character_profile=character_profile, + book_name=book_name, + scenario=scenario, + other_character_profiles_str=other_character_profiles_str, + motivation=motivation, + ) + + +def _generate_mmlu_pro_prompt(external_dataset: "Dataset", subject: str) -> str: + def get_question_and_options(question, options): + options = [(chr(ord("A") + i), a) for i, a in enumerate(options)] + options_str = "\n".join([f"({letter}) {option}" for letter, option in options]) + return f"Question: {question}\n\nOptions: {options_str}\n\n" + + # Original prompt as given in MMLU-Pro paper: https://arxiv.org/pdf/2406.01574 + prompt = 'The following are multiple choice questions (with answers) about {subject}. 
Think step by step and then finish your answer with "the answer is (X)" where X is the correct letter choice.\n\n' + first_question = prompt.format(subject=subject) + get_question_and_options( + external_dataset["question"][0], external_dataset["options"][0] + ) + return [first_question] + [ + get_question_and_options(question, options) + for question, options in zip(external_dataset["question"][1:], external_dataset["options"][1:]) + ] + + +def _generate_hle_prompt( + example: dict[str, Any], hle_train: "pd.DataFrame", num_tokens: int, rng: "np.random.Generator" +) -> str: + encoder = tiktoken.get_encoding("o200k_base") + prompt = ( + "Please answer the question below.\n\nHere are some examples of question and answer pairs in the category of " + + example["category"] + + ":\n\n" + ) + prompt_tokens = encoder.encode(prompt) + example_tokens = encoder.encode(example["question"]) + current_num_tokens = len(prompt_tokens) + len(example_tokens) + hle_train_category = hle_train[hle_train["category"] == example["category"]] + + while current_num_tokens < num_tokens: + hle_train_category_sample = hle_train_category.sample(1, random_state=rng) + prompt += hle_train_category_sample["demonstration"].iloc[0] + current_num_tokens += len(hle_train_category_sample["tokens"].iloc[0]) + prompt_tokens += list(hle_train_category_sample["tokens"].iloc[0]) + + return encoder.decode(prompt_tokens[: num_tokens - len(example_tokens) + 1] + example_tokens) + + +def _get_num_tokens_from_config(speed_config: DATASET_CONFIG | str) -> int: + match = re.search(r"throughput_(\d+)k", speed_config) + if match: + return int(match.group(1)) * 1000 + else: + raise ValueError(f"Could not determine num_tokens from speed_config: {speed_config}") + + +def _fetch_all_turns_data(example: dict[str, Any], speed_config: DATASET_CONFIG | str) -> dict[str, Any]: + turns = example["turns"] + if not turns[0].startswith(TURNS_PLACEHOLDER): + return example + + if BenchmarkDataset.BAMBOO.value in 
example["source"]: + num_tokens = _get_num_tokens_from_config(speed_config) + src_ids = [int(match) for match in re.findall(r"_(\d+)", example["src_id"])] + external_dataset = _get_external_dataset(BenchmarkDataset.BAMBOO.value, config_name=example["source"]) + external_dataset = external_dataset.select(src_ids) + example["turns"] = [_generate_bamboo_prompt(external_dataset, num_tokens)] + + elif BenchmarkDataset.CNN_DAILYMAIL.value in example["source"]: + external_dataset = _get_external_dataset(BenchmarkDataset.CNN_DAILYMAIL.value, config_name="3.0.0").to_pandas() + src_id = example["src_id"] + article = external_dataset[external_dataset["id"] == src_id]["article"].iloc[0] + example["turns"] = [example["turns"][0].removeprefix(f"{TURNS_PLACEHOLDER}\n\n").format(article=article)] + + elif BenchmarkDataset.HLE.value in example["source"]: + if "qualitative" in speed_config: + external_dataset = _get_external_dataset(BenchmarkDataset.HLE.value, config_name="test").to_pandas() + src_id = example["src_id"] + example["turns"] = [external_dataset[external_dataset["id"] == src_id]["question"].iloc[0]] + elif "throughput" in speed_config: + num_tokens = _get_num_tokens_from_config(speed_config) + hle_train, hle_test = _get_external_dataset(BenchmarkDataset.HLE.value, config_name="train_test_split") + hle_train = hle_train.to_pandas() + hle_train = hle_train[hle_train["image"] == ""] + hle_train["demonstration"] = hle_train.apply( + lambda e: "Question: " + e["question"] + "\n\nAnswer: " + e["rationale"] + "\n\n", axis=1 + ) + hle_train["tokens"] = hle_train["demonstration"].apply( + lambda e: tiktoken.get_encoding("o200k_base").encode(e, disallowed_special=()) + ) + src_id = example["src_id"] + hle_test = hle_test.to_pandas() + external_dataset_example = hle_test[hle_test["id"] == src_id].iloc[0] + example["turns"] = [_generate_hle_prompt(external_dataset_example, hle_train, num_tokens, HLE_RNG)] + else: + raise ValueError(f"Invalid speed_config: {speed_config}") + + elif 
BenchmarkDataset.LIVECODEBENCH.value in example["source"]: + external_dataset = _get_external_dataset(BenchmarkDataset.LIVECODEBENCH.value, config_name="test").to_pandas() + src_id = example["src_id"] + external_dataset_example = external_dataset[external_dataset["question_id"] == src_id].iloc[0] + example["turns"] = [ + example["turns"][0] + .removeprefix(f"{TURNS_PLACEHOLDER}\n\n") + .format( + question=external_dataset_example["question_content"], + starter_code=external_dataset_example["starter_code"], + ) + ] + + elif BenchmarkDataset.CODE_CONTESTS.value in example["source"]: + external_dataset = _get_external_dataset(BenchmarkDataset.CODE_CONTESTS.value, config_name="test").to_pandas() + src_id = example["src_id"] + external_dataset_example = external_dataset[external_dataset["name"] == src_id].iloc[0] + example["turns"] = [ + example["turns"][0] + .removeprefix(f"{TURNS_PLACEHOLDER}\n\n") + .format(question=external_dataset_example["description"]) + ] + + elif BenchmarkDataset.MTBENCH_101.value in example["source"]: + external_dataset = _get_external_dataset(BenchmarkDataset.MTBENCH_101.value, config_name=example["source"]) + src_id = example["src_id"].rsplit("_", 1)[1] + external_dataset_example = external_dataset.select([int(src_id)]) + example["turns"] = [entry["user"] for entry in external_dataset_example["history"][0]] + + elif BenchmarkDataset.OPUS100.value in example["source"]: + _, config_name, src_id = example["src_id"].split("_") + external_dataset = _get_external_dataset(BenchmarkDataset.OPUS100.value, config_name=config_name) + external_dataset_example = external_dataset.select([int(src_id)]) + example["turns"] = [ + example["turns"][0] + .removeprefix(f"{TURNS_PLACEHOLDER}\n\n") + .format(question=external_dataset_example["translation"][0]) + ] + + elif BenchmarkDataset.CHATRAG_BENCH.value in example["source"]: + external_dataset = _get_external_dataset( + BenchmarkDataset.CHATRAG_BENCH.value, config_name=["hybridial", "sqa"] + ) + src_id = 
example["src_id"].rsplit("_", 1)[1] + external_dataset_example = external_dataset.select([int(src_id)]) + example["turns"] = _generate_chatrag_bench_prompt(external_dataset_example) + + elif BenchmarkDataset.MMLU_PRO.value in example["source"]: + external_dataset = _get_external_dataset(BenchmarkDataset.MMLU_PRO.value, config_name="test") + src_id = int(example["src_id"].split("(")[1].split(",")[0]) + external_dataset_example = external_dataset.select(range(src_id, src_id + len(example["turns"]))) + example["turns"] = _generate_mmlu_pro_prompt(external_dataset_example, example["sub_category"]) + + elif BenchmarkDataset.ADALEVAL_STACKSELECT.value in example["source"]: + num_tokens = _get_num_tokens_from_config(speed_config) + external_dataset = _get_external_dataset( + BenchmarkDataset.ADALEVAL_STACKSELECT.value, config_name=example["source"] + ).to_pandas() + src_id = example["src_id"] + external_dataset_example = external_dataset[external_dataset["question_id"] == src_id].iloc[0] + example["turns"] = [ + _pad_or_truncate_prompt( + _generate_stackselect_prompt( + question=external_dataset_example["question"], + answers=external_dataset_example["all_answers"], + answer=external_dataset_example["answer"], + num_tokens=num_tokens, + ), + num_tokens, + ) + ] + + elif BenchmarkDataset.ADALEVAL_TEXTSORT.value in example["source"]: + num_tokens = _get_num_tokens_from_config(speed_config) + external_dataset = _get_external_dataset( + BenchmarkDataset.ADALEVAL_TEXTSORT.value, config_name=example["source"] + ) + src_id = example["src_id"].split("_")[1] + external_dataset_example = external_dataset.select([int(src_id)]) + example["turns"] = [ + _pad_or_truncate_prompt(_generate_textsort_prompt(external_dataset_example["prompt"][0]), num_tokens) + ] + + elif BenchmarkDataset.ROLEBENCH.value in example["source"]: + config_name = example["src_id"].split("_")[1] + external_dataset = _get_external_dataset( + BenchmarkDataset.ROLEBENCH.value, + 
config_name=example["source"].replace("tree", "raw") + f"/{config_name}/role_specific/test.jsonl", + ) + roles_dataset = _get_external_dataset( + BenchmarkDataset.ROLEBENCH_ROLES.value, + config_name="https://huggingface.co/datasets/ZenMoore/RoleBench/raw/a57ed54f9613921e4a5f1b63601a558cd5acf971/profiles-eng/desc.json", + ) + src_ids = [int(match) for match in re.findall(r"_(\d+)", example["src_id"])][: len(example["turns"])] + external_dataset_example = external_dataset.iloc[src_ids] + role_name = external_dataset_example["role"].iloc[0] + role_description_and_catchphrases = roles_dataset[role_name][0] + example["turns"] = [ + example["turns"][0] + .removeprefix(f"{TURNS_PLACEHOLDER}\n\n") + .format(role_name=role_name, role_description_and_catchphrases=role_description_and_catchphrases) + + "\n" + + external_dataset_example["question"].iloc[0] + ] + [ + question.removeprefix(f"{role_name}, ").removeprefix(f" {role_name},") + for question in external_dataset_example["question"].iloc[1:] + ] + + elif BenchmarkDataset.COSER.value in example["source"]: + external_dataset = _get_external_dataset(BenchmarkDataset.COSER.value, config_name=example["source"]) + src_id = example["src_id"].split("_")[1] + external_dataset_example = external_dataset.select([int(src_id)]) + example["turns"] = [_generate_coser_prompt(external_dataset_example)] + + return example + + +def _resolve_external_data(dataset: Dataset, speed_config: DATASET_CONFIG | str) -> Dataset: + """Resolve all external data references in the dataset. + + Applies ``_fetch_all_turns_data`` to every example so that turn + placeholders are replaced with fully-resolved prompt text. + + Args: + dataset: The HuggingFace dataset with potentially unresolved turns. + speed_config: The SPEED-Bench config name used to determine + token-length parameters for throughput configs. + + Returns: + The dataset with all turns fully resolved. 
+    """
+    return dataset.map(
+        _fetch_all_turns_data, fn_kwargs={"speed_config": speed_config}, desc=f"Preparing config {speed_config}"
+    )
+
+
+def prepare_data(args: argparse.Namespace) -> None:
+    """Prepare and save benchmark data to disk.
+
+    Downloads each requested SPEED-Bench config, resolves all external data
+    references, converts the resolved turns into chat messages, and saves the
+    result as a JSONL file so that subsequent benchmark runs can load
+    directly from disk without re-downloading.
+
+    Args:
+        args: Parsed CLI arguments containing the config selection and the
+            output directory.
+    """
+    configs = get_args(DATASET_CONFIG) if args.config == "all" else [args.config]
+
+    for config in configs:
+        dataset = load_dataset("nvidia/SPEED-Bench", config, split="test")
+        dataset = _resolve_external_data(dataset, config)
+        dataset = dataset.map(
+            lambda example: {"messages": [{"role": "user", "content": turn} for turn in example["turns"]]},
+            remove_columns=["turns"],
+        )
+        output_path = args.output_dir / f"{config}.jsonl"
+        dataset.to_json(output_path)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Download and prepare SPEED-Bench dataset for nemo-skills evaluation.",
+    )
+    parser.add_argument(
+        "--config",
+        type=str,
+        default="all",
+        choices=list(get_args(DATASET_CONFIG)) + ["all"],
+        help='SPEED-Bench configuration to prepare. Use "all" to prepare all configs. 
(default: %(default)s)', + ) + parser.add_argument( + "--output_dir", + type=Path, + default=Path(__file__).parent, + help="Directory to save the prepared dataset files (default: %(default)s)", + ) + + args = parser.parse_args() + prepare_data(args) diff --git a/nemo_skills/evaluation/evaluator/__init__.py b/nemo_skills/evaluation/evaluator/__init__.py index 6e1365e856..caeaaf95a3 100644 --- a/nemo_skills/evaluation/evaluator/__init__.py +++ b/nemo_skills/evaluation/evaluator/__init__.py @@ -41,6 +41,7 @@ "bigcodebench": "nemo_skills.evaluation.evaluator.code:eval_bigcodebench", "human_eval_infilling": "nemo_skills.evaluation.evaluator.code:eval_human_eval_infilling", "mmau-pro": "nemo_skills.evaluation.evaluator.mmau_pro:eval_mmau_pro", + "specdec": "nemo_skills.evaluation.evaluator.specdec:eval_specdec", } # Class-based evaluators: eval_type -> "module_path:ClassName" diff --git a/nemo_skills/evaluation/evaluator/specdec.py b/nemo_skills/evaluation/evaluator/specdec.py new file mode 100644 index 0000000000..1d1ca7eb0b --- /dev/null +++ b/nemo_skills/evaluation/evaluator/specdec.py @@ -0,0 +1,103 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
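The `eval_specdec` evaluator below stamps acceptance metrics that reduce to simple arithmetic over before/after deltas of the server's cumulative counters. A minimal sketch of that arithmetic, using hypothetical counter values (not taken from any real run):

```python
# Hypothetical before/after snapshots of cumulative spec-decode counters.
before = {"num_drafts": 100, "num_draft_tokens": 400, "num_accepted_tokens": 280}
after = {"num_drafts": 300, "num_draft_tokens": 1200, "num_accepted_tokens": 920}

# Counters are cumulative, so the benchmark-only activity is the delta.
delta = {key: after[key] - before[key] for key in before}

# AL = 1 + accepted / drafts: the "+1" is the target model's verified token,
# which is emitted even when every draft token is rejected.
acceptance_length = 1 + delta["num_accepted_tokens"] / delta["num_drafts"]

# AR = accepted / proposed, reported as a percentage.
acceptance_rate = delta["num_accepted_tokens"] / delta["num_draft_tokens"] * 100

print(acceptance_length)  # 4.2
print(acceptance_rate)    # 80.0
```

With these numbers the delta is 200 drafts, 800 proposed tokens, and 640 accepted tokens, so AL = 1 + 640/200 = 4.2 and AR = 80%, matching what the evaluator would stamp onto each data point.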
+ +import json +import logging +import os +from typing import Any + +from nemo_skills.evaluation.evaluator.base import BaseEvaluatorConfig +from nemo_skills.utils import get_logger_name, nested_dataclass + +LOG = logging.getLogger(get_logger_name(__file__)) + + +@nested_dataclass(kw_only=True) +class SpecdecEvaluatorConfig(BaseEvaluatorConfig): + """Config for the speculative-decoding evaluator. + + Attributes: + specdec_stats: Pre-computed speculative decoding delta statistics + (acceptance_length, acceptance_rate, per_position_acceptance_rates, + etc.). Injected by :class:`SpecdecGenerationTask` after computing + the before/after delta from the server's ``/metrics`` endpoint. + """ + + specdec_stats: dict[str, Any] + + def __post_init__(self): + if not isinstance(self.specdec_stats, dict): + raise TypeError(f"specdec_stats must be a dictionary, got {type(self.specdec_stats)}") + + +def eval_specdec(cfg: dict[str, Any]) -> None: + """Evaluate speculative decoding performance using pre-computed delta stats. + + This evaluator receives speculative decoding statistics that were computed + by :class:`SpecdecGenerationTask` using a before/after delta of the + server's Prometheus ``/metrics`` counters. + + It stamps each data point in the output JSONL with the computed metrics so + that the downstream :class:`SpecdecMetrics` class can aggregate them. + + Metrics stamped onto each data point: + + * ``acceptance_length`` — ``1 + delta_accepted / delta_drafts`` + * ``acceptance_rate`` — ``(delta_accepted / delta_draft_tokens) * 100`` + * ``num_drafts`` — number of draft rounds during this run + * ``draft_tokens`` — total draft tokens proposed during this run + * ``accepted_tokens`` — total tokens accepted during this run + * ``per_position_acceptance_rates`` — list of per-position acceptance + probabilities + + Args: + cfg: Evaluator configuration dict. Must contain ``input_file`` and + ``specdec_stats`` keys. 
+ """ + eval_config = SpecdecEvaluatorConfig(**cfg) + + # ------------------------------------------------------------------ + # 1. Read output file + # ------------------------------------------------------------------ + jsonl_file = eval_config.input_file + with open(jsonl_file, "rt", encoding="utf-8") as fin: + data = [json.loads(line) for line in fin] + + # ------------------------------------------------------------------ + # 2. Inject pre-computed spec-decode stats into each data point + # ------------------------------------------------------------------ + stats = eval_config.specdec_stats + LOG.info( + "Stamping spec-decode stats onto %d data points: " + "acceptance_length=%.4f, acceptance_rate=%.2f%%, num_drafts=%d", + len(data), + stats["acceptance_length"], + stats["acceptance_rate"], + stats["num_drafts"], + ) + for sample in data: + for key, value in stats.items(): + if key not in sample: + sample[key] = value + + # ------------------------------------------------------------------ + # 3. Write back + # ------------------------------------------------------------------ + tmp_file = jsonl_file + "-tmp" + with open(tmp_file, "wt", encoding="utf-8") as fout: + for sample in data: + fout.write(json.dumps(sample) + "\n") + + os.replace(tmp_file, jsonl_file) + LOG.info("Speculative decoding evaluation complete. 
Updated %d entries.", len(data)) diff --git a/nemo_skills/evaluation/metrics/map_metrics.py b/nemo_skills/evaluation/metrics/map_metrics.py index fae7308ec4..92f9f3282c 100644 --- a/nemo_skills/evaluation/metrics/map_metrics.py +++ b/nemo_skills/evaluation/metrics/map_metrics.py @@ -43,6 +43,7 @@ from nemo_skills.evaluation.metrics.physics_metrics import PhysicsMetrics from nemo_skills.evaluation.metrics.ruler_metrics import RulerMetrics from nemo_skills.evaluation.metrics.simpleqa_metrics import SimpleQAMetrics +from nemo_skills.evaluation.metrics.specdec_metrics import SpecdecMetrics from nemo_skills.evaluation.metrics.translation_metrics import TranslationMetrics METRICS_MAP = { @@ -86,6 +87,7 @@ "compute-eval": ComputeEvalMetrics, "gradingbench": GradingBenchMetrics, "critpt": CritPtMetrics, + "specdec": SpecdecMetrics, } diff --git a/nemo_skills/evaluation/metrics/specdec_metrics.py b/nemo_skills/evaluation/metrics/specdec_metrics.py new file mode 100644 index 0000000000..d62d08e6ca --- /dev/null +++ b/nemo_skills/evaluation/metrics/specdec_metrics.py @@ -0,0 +1,98 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
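The per-position rates described in the `SpecdecMetrics` docstring below relate directly to AL: since every accepted token is counted at exactly one draft position, AL equals 1 plus the sum of the per-position rates. A sketch with hypothetical deltas (illustrative numbers only):

```python
# Hypothetical per-position accepted-token deltas over 200 draft rounds.
delta_drafts = 200
delta_per_pos = {0: 180, 1: 140, 2: 90}  # tokens accepted at draft position i

# Fraction of draft rounds in which the token at position i was accepted.
per_position_acceptance_rates = [delta_per_pos[p] / delta_drafts for p in sorted(delta_per_pos)]

# Consistency check: AL = 1 + total accepted / drafts, which equals
# 1 + sum of the per-position rates.
delta_accepted = sum(delta_per_pos.values())
acceptance_length = 1 + delta_accepted / delta_drafts

print(per_position_acceptance_rates)  # [0.9, 0.7, 0.45]
print(acceptance_length)              # 3.05
```

The declining per-position rates are the typical pattern: later draft positions are only reached when all earlier tokens were accepted, so their unconditional acceptance counts shrink.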
+ +import logging + +from nemo_skills.evaluation.metrics.base import BaseMetrics, as_float, as_int, as_percentage +from nemo_skills.utils import get_logger_name + +LOG = logging.getLogger(get_logger_name(__file__)) + + +class SpecdecMetrics(BaseMetrics): + """Metrics for SPEED-Bench speculative decoding evaluation. + + Reads per-data-point speculative decoding statistics that were computed + by the generation task using a before/after delta of the server's + Prometheus ``/metrics`` counters and stamped by the ``eval_specdec`` + evaluator. + + Key metrics (computed from Prometheus counter deltas): + + * **acceptance_length** (AL) — ``1 + delta_accepted / delta_drafts``. + The "+1" accounts for the target model's verified token that is always + emitted even on a full rejection. + * **acceptance_rate** — ``(delta_accepted / delta_draft_tokens) * 100``. + * **per_position_acceptance_rates** — per-position acceptance probability, + computed as ``delta_per_pos[i] / delta_drafts``. + + """ + + def __init__(self): + super().__init__(compute_no_answer=False) + + def _get_score_dict(self, prediction: dict) -> dict[str, bool | int | float]: + return { + "spec_draft_tokens": prediction["draft_tokens"], + "spec_accepted_tokens": prediction["accepted_tokens"], + "spec_num_drafts": prediction["num_drafts"], + "spec_acceptance_length": prediction["acceptance_length"], + "spec_acceptance_rate": prediction["acceptance_rate"], + } + + def update(self, predictions: list[dict]) -> None: + """Update the evaluation results with the current element. + + Args: + predictions: Aggregated predictions across all generations. + Each prediction should contain speculative decoding metrics + stamped by the ``eval_specdec`` evaluator. 
+ """ + super().update(predictions) + self._compute_pass_at_k( + predictions=predictions, predicted_answers=[pred.get("generation", None) for pred in predictions] + ) + + def get_metrics(self) -> dict: + """Get all computed metrics including speculative decoding statistics. + + Returns: + Nested dict of evaluation mode → metric name → value. + """ + metrics_dict = {} + + for agg_mode, agg_metric_dict in self.eval_dict.items(): + metrics_dict[agg_mode] = {} + self.update_common_metrics(metrics_dict[agg_mode]) + for metric_key, metric_value in agg_metric_dict.items(): + if metric_key.startswith("spec_"): + metrics_dict[agg_mode][metric_key] = metric_value / self.total + else: + metrics_dict[agg_mode][metric_key] = metric_value + self._add_std_metrics(metrics_dict) + + return metrics_dict + + def metrics_to_print(self) -> dict: + """Control which metrics are displayed in the summary table.""" + metrics_to_print = { + "num_entries": as_int, + "avg_tokens": as_int, + "gen_seconds": as_int, + "spec_acceptance_length": as_float, + "spec_acceptance_rate": as_float, + } + if self.compute_no_answer: + metrics_to_print["no_answer"] = as_percentage + return metrics_to_print diff --git a/nemo_skills/inference/eval/specdec.py b/nemo_skills/inference/eval/specdec.py new file mode 100644 index 0000000000..12691db824 --- /dev/null +++ b/nemo_skills/inference/eval/specdec.py @@ -0,0 +1,784 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import glob +import json +import logging +import os +import sys +from dataclasses import dataclass, field +from typing import Any + +import hydra +import requests + +from nemo_skills.inference.generate import ( + GenerationTask, + GenerationTaskConfig, +) +from nemo_skills.inference.model import server_params +from nemo_skills.utils import get_help_message, get_logger_name, nested_dataclass, setup_logging + +LOG = logging.getLogger(get_logger_name(__file__)) + + +class SpecDecodeMetricsError(Exception): + """Exception raised when fetching speculative decoding metrics fails.""" + + def __init__(self, message: str): + super().__init__(message) + self.message = message + + def __str__(self): + return self.message + + +# --------------------------------------------------------------------------- +# Speculative decoding metrics from the server's Prometheus endpoint +# --------------------------------------------------------------------------- + + +@dataclass +class SpecDecodeMetrics: + """Unified speculative decoding snapshot scraped from ``/metrics``.""" + + # VLLM counters + num_drafts: int = 0 + num_draft_tokens: int = 0 + num_accepted_tokens: int = 0 + accepted_per_pos: dict[int, int] = field(default_factory=dict) + + # SGLang gauges and counters + spec_accept_length: float = 0.0 + spec_accept_rate: float = 0.0 + num_requests: int = 0 + generation_tokens: int = 0 + + +def _fetch_metrics_text(base_url: str) -> str | None: + """Fetch raw Prometheus text from ``/metrics`` endpoint.""" + metrics_url = f"{base_url.rstrip('/')}/metrics" + try: + response = requests.get(metrics_url, timeout=30) + if response.status_code != 200: + LOG.warning("Metrics endpoint returned status %d", response.status_code) + return None + return response.text + except requests.RequestException as exc: + LOG.warning("Failed to fetch metrics from %s: %s", metrics_url, 
exc) + return None + + +def fetch_vllm_spec_decode_metrics(base_url: str) -> SpecDecodeMetrics: + """Fetch speculative decoding metrics from the server's ``/metrics`` endpoint. + + Parses Prometheus text-format exposition looking for VLLM + ``vllm:spec_decode_*`` counters including per-position acceptance counts. + + Args: + base_url: Server root URL, e.g. ``http://127.0.0.1:5000``. + + Returns: + :class:`SpecDecodeMetrics` with the scraped counters. + """ + text = _fetch_metrics_text(base_url) + if text is None: + message = "Failed to fetch metrics from the server" + LOG.error(message) + raise SpecDecodeMetricsError(message) + + metrics = SpecDecodeMetrics() + found_spec_decode = False + pos_label = 'position="' + + for line in text.split("\n"): + line = line.strip() + if not line or line.startswith("#"): + continue + if not line.startswith("vllm:spec_decode"): + continue + + found_spec_decode = True + if "_created" in line: + continue + + parts = line.split() + if not parts: + continue + + with contextlib.suppress(ValueError): + metric_value = int(float(parts[-1])) + if "num_drafts" in line: + metrics.num_drafts += metric_value + elif "num_draft_tokens" in line: + metrics.num_draft_tokens += metric_value + elif "num_accepted_tokens_per_pos" in line: + if pos_label in line: + start = line.index(pos_label) + len(pos_label) + end = line.index('"', start) + pos = int(line[start:end]) + metrics.accepted_per_pos[pos] = metrics.accepted_per_pos.get(pos, 0) + metric_value + elif "num_accepted_tokens" in line: + metrics.num_accepted_tokens += metric_value + + if not found_spec_decode: + message = "No vllm:spec_decode_* metrics found on the server (speculative decoding may not be enabled)." + LOG.error(message) + raise SpecDecodeMetricsError(message) + + return metrics + + +def find_sglang_metrics_file(metrics_dir: str) -> str | None: + """Find the most recent SGLang metrics file in the given directory. 
+ + SGLang creates files like ``sglang-request-metrics-YYYYMMDD_HH.log``. + + Args: + metrics_dir: Directory containing SGLang metrics files. + + Returns: + Path to the most recent metrics file, or ``None`` if not found. + """ + if not os.path.isdir(metrics_dir): + LOG.warning("SGLang metrics directory does not exist: %s", metrics_dir) + return None + + pattern = os.path.join(metrics_dir, "sglang-request-metrics-*.log") + files = glob.glob(pattern) + if not files: + LOG.warning("No SGLang metrics files found matching pattern: %s", pattern) + return None + + # Return the most recently modified file + latest_file = max(files, key=os.path.getmtime) + LOG.info("Using SGLang metrics file: %s", latest_file) + return latest_file + + +def fetch_sglang_spec_decode_metrics(base_url: str) -> SpecDecodeMetrics: + """Fetch speculative decoding metrics from an SGLang server's ``/metrics`` endpoint. + + Parses Prometheus text-format exposition looking for ``sglang:spec_accept_*`` + gauges and ``sglang:num_requests_total`` / ``sglang:generation_tokens_total`` + counters. + + Args: + base_url: Server root URL, e.g. ``http://127.0.0.1:5000``. + + Returns: + :class:`SpecDecodeMetrics` with the scraped values. 
+ """ + text = _fetch_metrics_text(base_url) + if text is None: + message = "Failed to fetch metrics from the server" + LOG.error(message) + raise SpecDecodeMetricsError(message) + + metrics = SpecDecodeMetrics() + found_spec = False + + for line in text.split("\n"): + line = line.strip() + if not line or line.startswith("#"): + continue + + parts = line.split() + if len(parts) < 2: + continue + + with contextlib.suppress(ValueError): + if "sglang:spec_accept_length{" in line or line.startswith("sglang:spec_accept_length "): + metrics.spec_accept_length = float(parts[-1]) + found_spec = True + elif "sglang:spec_accept_rate{" in line or line.startswith("sglang:spec_accept_rate "): + metrics.spec_accept_rate = float(parts[-1]) + found_spec = True + elif "sglang:num_requests_total{" in line or line.startswith("sglang:num_requests_total "): + metrics.num_requests = int(float(parts[-1])) + elif "sglang:generation_tokens_total{" in line or line.startswith("sglang:generation_tokens_total "): + metrics.generation_tokens = int(float(parts[-1])) + + if not found_spec: + message = "No sglang:spec_accept_* metrics found on the server (speculative decoding may not be enabled)." 
+ LOG.error(message) + raise SpecDecodeMetricsError(message) + return metrics + + +def _build_specdec_stats( + *, + num_drafts: int, + draft_tokens: int, + accepted_tokens: int, + acceptance_rate_fraction: float, + acceptance_length: float, + per_position_acceptance_rates: list[float] | None = None, +) -> dict[str, Any]: + """Build a normalized spec-decode payload for evaluator injection.""" + return { + "num_drafts": num_drafts, + "draft_tokens": draft_tokens, + "accepted_tokens": accepted_tokens, + "acceptance_rate": acceptance_rate_fraction * 100, + "acceptance_length": acceptance_length, + "per_position_acceptance_rates": per_position_acceptance_rates or [], + } + + +def _compute_weighted_delta( + *, + before_avg: float, + after_avg: float, + before_count: int, + after_count: int, +) -> float | None: + """Compute benchmark-only average from cumulative weighted averages.""" + delta_count = after_count - before_count + if delta_count <= 0: + return None + if before_count == 0: + return after_avg + weighted_after = after_avg * after_count + weighted_before = before_avg * before_count + return (weighted_after - weighted_before) / delta_count + + +def compute_sglang_spec_decode_delta( + before: SpecDecodeMetrics, + after: SpecDecodeMetrics, +) -> dict[str, Any] | None: + """Compute benchmark-specific acceptance metrics from two SGLang snapshots. + + SGLang exposes ``spec_accept_length`` and ``spec_accept_rate`` as **gauges** + (running averages from the server's perspective). Combined with the + ``num_requests_total`` and ``generation_tokens_total`` **counters** we can + back out the benchmark-specific averages:: + + weighted_al_after = al_after x n_after + weighted_al_before = al_before x n_before + benchmark_al = (weighted_al_after - weighted_al_before) + / (n_after - n_before) + + If the before snapshot had 0 requests (fresh server), the after values are + returned as-is. + + Args: + before: Snapshot taken before generation. 
+ after: Snapshot taken after generation. + + Returns: + Dictionary with ``acceptance_length``, ``acceptance_rate``, counters, + etc., or ``None`` if no requests were generated. + """ + delta_requests = after.num_requests - before.num_requests + delta_gen_tokens = after.generation_tokens - before.generation_tokens + + if delta_requests <= 0: + LOG.warning( + "SGLang: no new requests between before (%d) and after (%d) snapshots.", + before.num_requests, + after.num_requests, + ) + return None + + acceptance_length = _compute_weighted_delta( + before_avg=before.spec_accept_length, + after_avg=after.spec_accept_length, + before_count=before.num_requests, + after_count=after.num_requests, + ) + acceptance_rate_fraction = _compute_weighted_delta( + before_avg=before.spec_accept_rate, + after_avg=after.spec_accept_rate, + before_count=before.num_requests, + after_count=after.num_requests, + ) + if acceptance_length is None or acceptance_rate_fraction is None: + LOG.warning("SGLang: failed to compute weighted delta from request counters.") + return None + + LOG.info( + "SGLang Prometheus delta: requests=%d, gen_tokens=%d, acceptance_length=%.4f, acceptance_rate=%.2f%%", + delta_requests, + delta_gen_tokens, + acceptance_length, + acceptance_rate_fraction * 100, + ) + + return _build_specdec_stats( + num_drafts=delta_requests, + draft_tokens=delta_gen_tokens, + accepted_tokens=int(delta_gen_tokens * acceptance_rate_fraction) if delta_gen_tokens > 0 else 0, + acceptance_rate_fraction=acceptance_rate_fraction, + acceptance_length=acceptance_length, + ) + + +def compute_vllm_spec_decode_delta( + before: SpecDecodeMetrics, + after: SpecDecodeMetrics, +) -> dict[str, Any] | None: + """Compute the delta of speculative decoding counters between two snapshots. + + Uses the before/after pattern to isolate metrics to only the generation + run (since Prometheus counters are cumulative). 
+ + Computes: + * **acceptance_rate** — ``delta_accepted / delta_draft_tokens * 100`` + * **acceptance_length** — ``1 + delta_accepted / delta_drafts`` + (the "+1" accounts for the target model's verified token that + is always emitted even on a full rejection) + * **per_position_acceptance_rates** — per-position acceptance + probability, computed as ``delta_per_pos[i] / delta_drafts`` + + Args: + before: Counters snapshot taken before generation. + after: Counters snapshot taken after generation. + + Returns: + Dictionary with computed spec decode statistics, or ``None`` if the + delta contains no meaningful data. + """ + delta_drafts = after.num_drafts - before.num_drafts + delta_draft_tokens = after.num_draft_tokens - before.num_draft_tokens + delta_accepted = after.num_accepted_tokens - before.num_accepted_tokens + + # Compute per-position acceptance rates + per_pos_rates: list[float] = [] + if delta_drafts > 0: + positions = sorted(set(before.accepted_per_pos.keys()) | set(after.accepted_per_pos.keys())) + for pos in positions: + before_val = before.accepted_per_pos.get(pos, 0) + after_val = after.accepted_per_pos.get(pos, 0) + delta_pos = after_val - before_val + per_pos_rates.append(delta_pos / delta_drafts) + + if delta_draft_tokens <= 0: + LOG.warning( + "No speculative decoding activity detected during generation " + "(delta_draft_tokens=%d). 
Metrics will be empty.", + delta_draft_tokens, + ) + return None + + acceptance_rate_fraction = delta_accepted / delta_draft_tokens + acceptance_length = 1 + delta_accepted / delta_drafts if delta_drafts > 0 else 0.0 + + return _build_specdec_stats( + num_drafts=delta_drafts, + draft_tokens=delta_draft_tokens, + accepted_tokens=delta_accepted, + acceptance_rate_fraction=acceptance_rate_fraction, + acceptance_length=acceptance_length, + per_position_acceptance_rates=per_pos_rates, + ) + + +# --------------------------------------------------------------------------- +# Generation config & task +# --------------------------------------------------------------------------- + + +@nested_dataclass(kw_only=True) +class SpecdecGenerationConfig(GenerationTaskConfig): + """SPEED-Bench generation config for speculative decoding evaluation. + + Extends the standard generation config to inject server connection + information into the evaluator so it can query the server's ``/metrics`` + Prometheus endpoint after generation completes. + + For the full list of supported parameters, use + ``python -m nemo_skills.inference.generate --help`` + """ + + # Directory to write SGLang metrics to. If not specified, will use a + # temporary directory created at launch. This is the only reliable way + # to share the tempdir between the pipeline process (which builds the + # server command) and this generation worker process. 
+    metrics_file_dir: str | None = None
+    max_concurrent_requests: int = 32
+
+    def _post_init_validate_server(self):
+        super()._post_init_validate_server()
+        assert self.server["server_type"] in ["sglang", "vllm"], (
+            f"server_type must be either 'sglang' or 'vllm' for specdec generation, got '{self.server['server_type']}'"
+        )
+
+
+cs = hydra.core.config_store.ConfigStore.instance()
+cs.store(name="base_specdec_generation_config", node=SpecdecGenerationConfig)
+
+
+class SpecdecGenerationTask(GenerationTask):
+    """Custom generation task for SPEED-Bench speculative decoding evaluation.
+
+    Captures speculative decoding counters from the server's ``/metrics``
+    Prometheus endpoint *before* and *after* generation, then computes the
+    delta to derive accurate acceptance length (AL), acceptance rate, and
+    per-position conditional acceptance rates.
+    """
+
+    _sglang_metrics_dir: str | None = None
+
+    def __init__(self, cfg: SpecdecGenerationConfig):
+        super().__init__(cfg)
+        self._before_metrics: SpecDecodeMetrics | None = None
+
+    @classmethod
+    def _ensure_sglang_metrics_dir(cls) -> str:
+        """Return (and lazily create) a unique temp directory for SGLang metrics."""
+        if cls._sglang_metrics_dir is None:
+            import tempfile
+
+            cls._sglang_metrics_dir = tempfile.mkdtemp(prefix="sglang-metrics-")
+            LOG.info("Created SGLang metrics temp directory: %s", cls._sglang_metrics_dir)
+        return cls._sglang_metrics_dir
+
+    @classmethod
+    def get_generation_default_args(cls) -> str:
+        """Pass the SGLang metrics temp-directory to the generation worker.
+
+        Called in the **pipeline** process. The returned hydra override is
+        appended to the generation command, so the worker can read the same
+        path from ``self.cfg.metrics_file_dir``.
+        """
+        metrics_dir = cls._ensure_sglang_metrics_dir()
+        return f"++metrics_file_dir={metrics_dir}"
+
+    @classmethod
+    def get_server_command_fn(cls) -> callable:
+        """Return a wrapper around the default server command builder. 
+
+        When the server type is ``sglang``, the wrapper automatically appends
+        ``--enable-metrics --export-metrics-to-file --export-metrics-to-file-dir``
+        so that SGLang writes per-request speculative decoding metrics that the
+        evaluator can aggregate after generation.
+        """
+        from nemo_skills.pipeline.utils import get_server_command
+
+        metrics_dir = cls._ensure_sglang_metrics_dir()
+        sglang_metrics_args = f"--enable-metrics --export-metrics-to-file --export-metrics-to-file-dir {metrics_dir}"
+
+        def specdec_server_command(
+            server_type,
+            num_gpus,
+            num_nodes,
+            model_path,
+            cluster_config,
+            server_port,
+            server_args="",
+            server_entrypoint=None,
+        ):
+            if server_type == "sglang":
+                server_args = f"{server_args} {sglang_metrics_args}".strip()
+            return get_server_command(
+                server_type=server_type,
+                num_gpus=num_gpus,
+                num_nodes=num_nodes,
+                model_path=model_path,
+                cluster_config=cluster_config,
+                server_port=server_port,
+                server_args=server_args,
+                server_entrypoint=server_entrypoint,
+            )
+
+        return specdec_server_command
+
+    def inject_sglang_metrics(
+        self,
+        metrics_file_path: str,
+    ) -> dict[str, Any] | None:
+        """Inject SGLang metrics into the generation output.
+
+        SGLang writes per-request metrics as JSON lines when launched with
+        ``--export-metrics-to-file``. Each line contains:
+
+        - ``id``: Request ID (matches the ``rid`` in request_parameters)
+        - ``spec_accept_length``: Acceptance length for this request
+        - ``spec_accept_rate``: Acceptance rate for this request
+        - ``spec_accept_token_num``: Number of accepted tokens
+        - ``spec_draft_token_num``: Number of draft tokens
+        - ``spec_verify_ct``: Number of verification steps (drafts)
+
+        Args:
+            metrics_file_path: Path to the SGLang metrics JSONL file. All
+                entries in the file are aggregated, which assumes the server
+                is dedicated to the benchmark run. 
+ + Returns: + Dictionary with aggregated spec decode statistics, or ``None`` if no + matching requests were found or metrics are unavailable. + """ + if not os.path.exists(metrics_file_path): + LOG.warning("SGLang metrics file not found: %s, skipping metrics injection", metrics_file_path) + return None + + metrics: dict[str, Any] = {} + with open(metrics_file_path, "rt", encoding="utf-8") as fin: + for i, line in enumerate(fin): + try: + metric = json.loads(line) + metrics[metric["id"]] = { + "acceptance_length": metric["spec_accept_length"], + "acceptance_rate": metric["spec_accept_rate"] * 100, + "accepted_tokens": metric["spec_accept_token_num"], + "draft_tokens": metric["spec_draft_token_num"], + "num_drafts": metric["spec_verify_ct"], + } + except json.JSONDecodeError: + LOG.warning(f"Failed to parse JSON line {i} for metrics injection, skipping") + continue + + data_points = [] + with open(self.cfg.output_file, "rt", encoding="utf-8") as fin: + for i, line in enumerate(fin): + try: + data_points.append(json.loads(line)) + except json.JSONDecodeError: + LOG.warning(f"Failed to parse JSON line {i} for metrics injection, skipping") + continue + + with open(self.cfg.output_file, "wt", encoding="utf-8") as fout: + for data_point in data_points: + if all(response_id in metrics for response_id in data_point["response_ids"]): + data_point.update( + { + "num_drafts": sum( + metrics[response_id]["num_drafts"] for response_id in data_point["response_ids"] + ), + "draft_tokens": sum( + metrics[response_id]["draft_tokens"] for response_id in data_point["response_ids"] + ), + "accepted_tokens": sum( + metrics[response_id]["accepted_tokens"] for response_id in data_point["response_ids"] + ), + "acceptance_rate": sum( + metrics[response_id]["acceptance_rate"] for response_id in data_point["response_ids"] + ) + / len(data_point["response_ids"]), + "acceptance_length": sum( + metrics[response_id]["acceptance_length"] for response_id in data_point["response_ids"] + ) + / 
len(data_point["response_ids"]),
+                        }
+                    )
+                else:
+                    LOG.warning(
+                        "No metrics found for response_ids: %s, skipping data point", data_point["response_ids"]
+                    )
+                fout.write(json.dumps(data_point) + "\n")
+
+        try:
+            return_value = {
+                "num_drafts": sum([data_point["num_drafts"] for data_point in data_points]),
+                "draft_tokens": sum([data_point["draft_tokens"] for data_point in data_points]),
+                "accepted_tokens": sum([data_point["accepted_tokens"] for data_point in data_points]),
+                "acceptance_rate": sum([data_point["acceptance_rate"] for data_point in data_points])
+                / len(data_points),
+                "acceptance_length": sum([data_point["acceptance_length"] for data_point in data_points])
+                / len(data_points),
+                "per_position_acceptance_rates": [],
+            }
+        except KeyError:
+            LOG.warning("Metrics injection failed for some data points, skipping")
+            return None
+        return return_value
+
+    async def process_single_datapoint(self, data_point, all_data, prompt_format="openai"):
+        """Handle single-turn and multi-turn generation.
+
+        For each turn in the data point, the base-class implementation is
+        invoked on the conversation so far, the generated assistant message is
+        appended to the running conversation, and token counts are
+        accumulated. A single-turn data point is simply the one-iteration
+        case of this loop.
+
+        Also tracks request IDs for SGLang metrics matching. 
+ """ + + messages = [] + responses = [] + + for message in data_point["messages"]: + messages.append(message) + new_data_point = {"messages": messages} + current_response = await super().process_single_datapoint( + new_data_point, all_data, prompt_format=prompt_format + ) + if "response" in current_response: + raw_response = current_response.pop("response") + current_response["response_id"] = raw_response.id + responses.append(current_response) + messages.append({"role": "assistant", "content": current_response["generation"]}) + + # Return aggregated results + return { + "generation": [response["generation"] for response in responses], + "num_generated_tokens": sum([response["num_generated_tokens"] for response in responses]), + "response_ids": [response["response_id"] for response in responses], + } + + def _get_server_base_address(self) -> str: + """Derive the server base address from the config. + + Returns: + Server base address string, e.g. ``http://127.0.0.1:5000``. + """ + return self.cfg.server.get("base_url") or f"http://{self.cfg.server['host']}:{self.cfg.server['port']}" + + def wait_for_server(self): + """Wait for the server, then snapshot speculative decoding counters. + + For VLLM: Captures the "before" VLLM Prometheus counters + (``vllm:spec_decode_*``) so that after generation we can compute a + clean delta isolated to our generation run. + + For SGLang: Captures the "before" SGLang Prometheus gauges + (``sglang:spec_accept_length``, ``sglang:spec_accept_rate``) and + counters (``sglang:num_requests_total``, etc.) for the same purpose. + If the metrics file (``--export-metrics-to-file``) is available after + generation, it takes priority over the Prometheus delta. 
+ """ + super().wait_for_server() + server_type = self.cfg.server["server_type"] + base_url = self._get_server_base_address() + + if server_type == "sglang": + # SGLang: Fetch "before" snapshot from Prometheus /metrics + LOG.info("Fetching BEFORE SGLang spec-decode metrics from %s/metrics", base_url) + self._before_metrics = fetch_sglang_spec_decode_metrics(base_url) + LOG.info( + "SGLang before snapshot: accept_length=%.4f, accept_rate=%.4f, num_requests=%d, gen_tokens=%d", + self._before_metrics.spec_accept_length, + self._before_metrics.spec_accept_rate, + self._before_metrics.num_requests, + self._before_metrics.generation_tokens, + ) + return + + # VLLM: Fetch before snapshot + LOG.info("Fetching BEFORE VLLM spec-decode metrics from %s/metrics", base_url) + self._before_metrics = fetch_vllm_spec_decode_metrics(base_url) + LOG.info( + "Before snapshot: drafts=%d, draft_tokens=%d, accepted=%d, per_pos_keys=%s", + self._before_metrics.num_drafts, + self._before_metrics.num_draft_tokens, + self._before_metrics.num_accepted_tokens, + sorted(self._before_metrics.accepted_per_pos.keys()), + ) + + def run_batch_evaluation(self): + """Fetch after-metrics, compute delta, then run the evaluator. + + For **VLLM**: Uses Prometheus ``/metrics`` endpoint with before/after + delta on the ``vllm:spec_decode_*`` counters. + + For **SGLang**: Two strategies are attempted in order: + + 1. **Metrics file (preferred)** — if the SGLang server was launched + with ``--export-metrics-to-file``, read the per-request JSONL log + and aggregate speculative decoding statistics. + 2. **Prometheus before/after fallback** — fetch + ``sglang:spec_accept_length``, ``sglang:spec_accept_rate`` (gauges) + and ``sglang:num_requests_total`` (counter) *before* and *after* + generation, then back out the benchmark-specific averages from the + weighted delta. 
+
+        The ``eval_specdec`` evaluator receives the pre-computed stats in
+        ``eval_config["specdec_stats"]`` and stamps them onto each data point
+        in the output JSONL.
+        """
+        server_address = self._get_server_base_address()
+        server_type = self.cfg.server["server_type"]
+
+        specdec_stats: dict[str, Any] | None = None
+
+        if server_type == "sglang":
+            # ----- Strategy 1: Metrics file (preferred) -----
+            metrics_dir = getattr(self.cfg, "metrics_file_dir", None)
+            if metrics_dir:
+                metrics_file = find_sglang_metrics_file(metrics_dir)
+                if metrics_file:
+                    specdec_stats = self.inject_sglang_metrics(metrics_file)
+                else:
+                    LOG.warning("Could not find SGLang metrics file in %s", metrics_dir)
+
+            # ----- Strategy 2: Prometheus before/after fallback -----
+            if specdec_stats is None:
+                LOG.info("Falling back to SGLang Prometheus before/after delta…")
+                LOG.info("Fetching AFTER SGLang spec-decode metrics from %s/metrics", server_address)
+                after_sglang = fetch_sglang_spec_decode_metrics(server_address)
+
+                specdec_stats = compute_sglang_spec_decode_delta(self._before_metrics, after_sglang)
+                LOG.info(
+                    "SGLang Prometheus delta: acceptance_length=%.4f, "
+                    "acceptance_rate=%.2f%%, num_drafts=%d, draft_tokens=%d",
+                    specdec_stats["acceptance_length"],
+                    specdec_stats["acceptance_rate"],
+                    specdec_stats["num_drafts"],
+                    specdec_stats["draft_tokens"],
+                )
+        else:
+            # VLLM: Fetch "after" snapshot and compute delta
+            LOG.info("Fetching AFTER spec-decode metrics from %s/metrics", server_address)
+            after_metrics = fetch_vllm_spec_decode_metrics(server_address)
+
+            specdec_stats = compute_vllm_spec_decode_delta(self._before_metrics, after_metrics)
+            LOG.info(
+                "Spec-decode delta: drafts=%d, draft_tokens=%d, accepted=%d, "
+                "acceptance_rate=%.2f%%, acceptance_length=%.4f",
+                specdec_stats["num_drafts"],
+                specdec_stats["draft_tokens"],
+                specdec_stats["accepted_tokens"],
+                specdec_stats["acceptance_rate"],
+                specdec_stats["acceptance_length"],
+            )
+            if specdec_stats["per_position_acceptance_rates"]:
+                LOG.info(
+                    "Per-position acceptance rates: %s",
+                    [f"{r:.4f}" for r in specdec_stats["per_position_acceptance_rates"]],
+                )
+
+        # Inject into eval_config for the evaluator
+        self.cfg.eval_config["specdec_stats"] = specdec_stats
+
+        super().run_batch_evaluation()
+
+
+GENERATION_TASK_CLASS = SpecdecGenerationTask
+
+
+@hydra.main(version_base=None, config_name="base_specdec_generation_config")
+def specdec_generation(cfg: SpecdecGenerationConfig):
+    cfg = SpecdecGenerationConfig(_init_nested=True, **cfg)
+    LOG.info("Config used: %s", cfg)
+
+    task = SpecdecGenerationTask(cfg)
+    task.generate()
+
+
+HELP_MESSAGE = get_help_message(
+    SpecdecGenerationConfig,
+    server_params=server_params(),
+)
+
+if __name__ == "__main__":
+    if "--help" in sys.argv or "-h" in sys.argv:
+        print(HELP_MESSAGE)
+    else:
+        setup_logging()
+        specdec_generation()
diff --git a/nemo_skills/inference/server/serve_sglang.py b/nemo_skills/inference/server/serve_sglang.py
index 1015c191d9..6383112e15 100644
--- a/nemo_skills/inference/server/serve_sglang.py
+++ b/nemo_skills/inference/server/serve_sglang.py
@@ -14,6 +14,7 @@
 
 import argparse
 import subprocess
+from shlex import join
 
 
 def main():
@@ -32,7 +33,7 @@ def main():
         if args.dist_init_addr is None:
             raise ValueError("dist_init_addr must be specified for multi-node setup")
 
-    extra_arguments = f"{' '.join(unknown)}"
+    extra_arguments = join(unknown)
 
     print(f"Deploying model {args.model}")
     print("Starting OpenAI Server")
diff --git a/nemo_skills/inference/server/serve_vllm.py b/nemo_skills/inference/server/serve_vllm.py
index 1c9b7636e3..c27f090f8a 100644
--- a/nemo_skills/inference/server/serve_vllm.py
+++ b/nemo_skills/inference/server/serve_vllm.py
@@ -14,6 +14,7 @@
 
 import argparse
 import subprocess
+from shlex import join
 
 
 def main():
@@ -25,7 +26,7 @@ def main():
     parser.add_argument("--no_verbose", action="store_true", help="Print verbose logs")
     args, unknown = parser.parse_known_args()
 
-    extra_arguments = f"{' '.join(unknown)}"
+    extra_arguments = join(unknown)
 
     print(f"Deploying model {args.model}")
     print("Starting OpenAI Server")
diff --git a/tests/gpu-tests/test_eval.py b/tests/gpu-tests/test_eval.py
index 098e359154..3836b37aa6 100644
--- a/tests/gpu-tests/test_eval.py
+++ b/tests/gpu-tests/test_eval.py
@@ -55,6 +55,8 @@
     "compute-eval",
    # CritPt requires exactly 70 submissions and external API key (ARTIFICIAL_ANALYSIS_API_KEY)
     "critpt",
+    # SPEED-Bench downloads dozens of large external HF datasets, exhausting CI runner disk space
+    "speed-bench",
 }
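The `' '.join(unknown)` → `shlex.join(unknown)` change in the two server scripts guards against extra arguments that contain spaces or shell metacharacters: `shlex.join` quotes each token so the assembled command line round-trips safely through a shell. A small standard-library sketch of the difference (the `unknown` list is a made-up example of what `parse_known_args()` might leave over):

```python
from shlex import join, split

# Leftover server arguments, one token per element, some containing spaces
unknown = ["--served-model-name", "my model", "--max-model-len", "4096"]

naive = " ".join(unknown)  # "my model" loses its grouping once re-split
safe = join(unknown)       # quotes only the tokens that need it

print(naive)  # --served-model-name my model --max-model-len 4096
print(safe)   # --served-model-name 'my model' --max-model-len 4096

# Round-trip check: shlex.split recovers the original argument list
assert split(safe) == unknown
assert split(naive) != unknown
```

`shlex.join` is available in Python 3.8+; for earlier versions the equivalent is `" ".join(shlex.quote(t) for t in unknown)`.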
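The Prometheus before/after flow in `wait_for_server` / `run_batch_evaluation` boils down to differencing two cumulative counter snapshots and deriving averages from the delta. A minimal illustrative sketch, where the dataclass and the AL/AR formulas are assumptions for illustration (one common convention), not the actual `fetch_vllm_spec_decode_metrics` / `compute_vllm_spec_decode_delta` implementations:

```python
from dataclasses import dataclass


@dataclass
class SpecDecodeSnapshot:
    # Cumulative counters as scraped from the server's /metrics endpoint
    num_drafts: int           # number of draft/verify steps
    num_draft_tokens: int     # total tokens proposed by the draft model
    num_accepted_tokens: int  # total draft tokens accepted by the target model


def spec_decode_delta(before: SpecDecodeSnapshot, after: SpecDecodeSnapshot) -> dict:
    """Isolate one run's statistics by differencing cumulative counters."""
    drafts = after.num_drafts - before.num_drafts
    draft_tokens = after.num_draft_tokens - before.num_draft_tokens
    accepted = after.num_accepted_tokens - before.num_accepted_tokens
    return {
        "num_drafts": drafts,
        "draft_tokens": draft_tokens,
        "accepted_tokens": accepted,
        # AR: fraction of proposed draft tokens that were accepted, in percent
        "acceptance_rate": 100.0 * accepted / draft_tokens if draft_tokens else 0.0,
        # AL: average tokens emitted per verify step (accepted draft tokens
        # plus the one token the target model always produces itself)
        "acceptance_length": 1.0 + accepted / drafts if drafts else 0.0,
    }


before = SpecDecodeSnapshot(num_drafts=100, num_draft_tokens=400, num_accepted_tokens=240)
after = SpecDecodeSnapshot(num_drafts=150, num_draft_tokens=600, num_accepted_tokens=340)
stats = spec_decode_delta(before, after)
print(stats["acceptance_rate"], stats["acceptance_length"])  # 50.0 3.0
```

Differencing a snapshot taken right after `wait_for_server` against one taken after generation is what isolates the benchmark's own traffic from warm-up requests or anything else the server processed earlier.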